Skip to content

Commit 014d0df

Browse files
tvaron3Copilot
andcommitted
[Cosmos] ThroughputControl API
Add SDK-level throughput control API for server-side priority-based execution and throughput buckets (issue #3904). New SDK types: - ThroughputControlGroup enum with PriorityBased and ThroughputBucket variants, hiding the driver's unimplemented ClientSide variant - Re-exports PriorityLevel, ThroughputControlGroupName, ContainerReference Registration and resolution: - CosmosClientOptions::with_throughput_control_group() for registration - ThroughputControlGroupRegistry threaded through CosmosClient, DatabaseClient, and ContainerClient - apply_throughput_control() on ItemWriteOptions, QueryOptions, and BatchOptions resolves group name to x-ms-cosmos-priority-level and x-ms-cosmos-throughput-bucket headers Driver: - ContainerReference::stub() doc-hidden helper for unit testing Tests cover registration errors, header application for priority and bucket groups, default group resolution, and empty registry behavior. Fixes #3904 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 4d39ccc commit 014d0df

File tree

13 files changed

+303
-341
lines changed

13 files changed

+303
-341
lines changed

sdk/cosmos/azure_data_cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
### Features Added
66

77
- Added `CustomResponseBuilder` and `FaultInjectionRule::hit_count()` APIs for fault injection, enabling ergonomic construction of synthetic HTTP responses and test verification of rule activation counts. ([#3888](https://github.com/Azure/azure-sdk-for-rust/pull/3888))
8+
- Added throughput control API: re-exported `ThroughputControlGroupOptions`, `PriorityLevel`, `ThroughputControlGroupRegistry`, and related types from the driver. Users can register throughput control groups on `CosmosClientBuilder` via `with_throughput_control_group()` to configure priority-based execution and throughput bucket server features. ([#4078](https://github.com/Azure/azure-sdk-for-rust/pull/4078))
89

910
### Breaking Changes
1011

sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@
22
// Licensed under the MIT License.
33

44
use crate::{
5-
clients::OffersClient,
5+
clients::{ClientContext, OffersClient},
66
models::{
77
BatchResponse, ContainerProperties, CosmosResponse, ItemResponse, ResourceResponse,
88
ThroughputProperties,
99
},
1010
options::{BatchOptions, QueryOptions, ReadContainerOptions},
11-
pipeline::GatewayPipeline,
1211
resource_context::{ResourceLink, ResourceType},
1312
transactional_batch::TransactionalBatch,
1413
DeleteContainerOptions, FeedItemIterator, ItemReadOptions, ItemWriteOptions, PartitionKey,
@@ -19,13 +18,10 @@ use std::sync::Arc;
1918
use crate::cosmos_request::CosmosRequest;
2019
use crate::handler::container_connection::ContainerConnection;
2120
use crate::operation_context::OperationType;
22-
use crate::routing::global_endpoint_manager::GlobalEndpointManager;
23-
use crate::routing::global_partition_endpoint_manager::GlobalPartitionEndpointManager;
2421
use crate::routing::partition_key_range_cache::PartitionKeyRangeCache;
2522
use azure_core::http::headers::AsHeaders;
2623
use azure_core::http::Context;
2724
use azure_data_cosmos_driver::models::{ContainerReference, CosmosOperation, ItemReference};
28-
use azure_data_cosmos_driver::CosmosDriver;
2925
use serde::{de::DeserializeOwned, Serialize};
3026

3127
/// A client for working with a specific container in a Cosmos DB account.
@@ -35,30 +31,27 @@ use serde::{de::DeserializeOwned, Serialize};
3531
pub struct ContainerClient {
3632
link: ResourceLink,
3733
items_link: ResourceLink,
38-
pipeline: Arc<GatewayPipeline>,
3934
container_connection: Arc<ContainerConnection>,
4035
container_id: String,
41-
driver: Arc<CosmosDriver>,
4236
container_ref: ContainerReference,
37+
context: ClientContext,
4338
}
4439

4540
impl ContainerClient {
4641
pub(crate) async fn new(
47-
pipeline: Arc<GatewayPipeline>,
42+
context: ClientContext,
4843
database_link: &ResourceLink,
4944
container_id: &str,
5045
database_id: &str,
51-
driver: Arc<CosmosDriver>,
52-
global_endpoint_manager: Arc<GlobalEndpointManager>,
53-
global_partition_endpoint_manager: Arc<GlobalPartitionEndpointManager>,
5446
) -> azure_core::Result<Self> {
5547
let link = database_link
5648
.feed(ResourceType::Containers)
5749
.item(container_id);
5850
let items_link = link.feed(ResourceType::Documents);
5951

6052
// Eagerly resolve immutable container metadata from the driver.
61-
let container_ref = driver
53+
let container_ref = context
54+
.driver
6255
.resolve_container(database_id, container_id)
6356
.await
6457
.map_err(|e| {
@@ -68,25 +61,24 @@ impl ContainerClient {
6861
})?;
6962

7063
let partition_key_range_cache = Arc::from(PartitionKeyRangeCache::new(
71-
pipeline.clone(),
64+
context.pipeline.clone(),
7265
database_link.clone(),
73-
global_endpoint_manager.clone(),
66+
context.global_endpoint_manager.clone(),
7467
));
7568
let container_connection = Arc::from(ContainerConnection::new(
76-
pipeline.clone(),
69+
context.pipeline.clone(),
7770
partition_key_range_cache,
78-
global_partition_endpoint_manager.clone(),
71+
context.global_partition_endpoint_manager.clone(),
7972
container_ref.clone(),
8073
));
8174

8275
Ok(Self {
8376
link,
8477
items_link,
85-
pipeline,
8678
container_connection,
8779
container_id: container_id.to_string(),
88-
driver,
8980
container_ref,
81+
context,
9082
})
9183
}
9284

@@ -190,7 +182,7 @@ impl ContainerClient {
190182
.resource_id
191183
.expect("service should always return a '_rid' for a container");
192184

193-
let offers_client = OffersClient::new(self.pipeline.clone(), resource_id);
185+
let offers_client = OffersClient::new(self.context.pipeline.clone(), resource_id);
194186
offers_client.read(Context::default()).await
195187
}
196188

@@ -222,7 +214,7 @@ impl ContainerClient {
222214
.resource_id
223215
.expect("service should always return a '_rid' for a container");
224216

225-
let offers_client = OffersClient::new(self.pipeline.clone(), resource_id);
217+
let offers_client = OffersClient::new(self.context.pipeline.clone(), resource_id);
226218
offers_client
227219
.replace(Context::default(), throughput)
228220
.await
@@ -575,6 +567,7 @@ impl ContainerClient {
575567

576568
// Execute through the driver.
577569
let driver_response = self
570+
.context
578571
.driver
579572
.execute_operation(operation, options.operation)
580573
.await?;
@@ -706,7 +699,7 @@ impl ContainerClient {
706699
options.apply_headers(&mut headers);
707700

708701
crate::query::executor::QueryExecutor::new(
709-
self.pipeline.clone(),
702+
self.context.pipeline.clone(),
710703
self.items_link.clone(),
711704
Context::default(),
712705
query,

sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client.rs

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,16 @@
22
// Licensed under the MIT License.
33

44
use crate::{
5-
clients::DatabaseClient,
5+
clients::{ClientContext, DatabaseClient},
66
cosmos_request::CosmosRequest,
77
models::{DatabaseProperties, ResourceResponse},
88
operation_context::OperationType,
9-
pipeline::GatewayPipeline,
9+
options::ThroughputControlGroupRegistry,
1010
resource_context::ResourceLink,
11-
routing::{
12-
global_endpoint_manager::GlobalEndpointManager,
13-
global_partition_endpoint_manager::GlobalPartitionEndpointManager,
14-
},
1511
CreateDatabaseOptions, FeedItemIterator, Query, QueryDatabasesOptions,
1612
};
1713
use azure_core::http::{Context, Url};
18-
use azure_data_cosmos_driver::CosmosDriver;
1914
use serde::Serialize;
20-
use std::sync::Arc;
2115

2216
pub use super::cosmos_client_builder::CosmosClientBuilder;
2317

@@ -70,10 +64,7 @@ pub use super::cosmos_client_builder::CosmosClientBuilder;
7064
#[derive(Debug, Clone)]
7165
pub struct CosmosClient {
7266
pub(crate) databases_link: ResourceLink,
73-
pub(crate) pipeline: Arc<GatewayPipeline>,
74-
pub(crate) driver: Arc<CosmosDriver>,
75-
pub(crate) global_endpoint_manager: Arc<GlobalEndpointManager>,
76-
pub(crate) global_partition_endpoint_manager: Arc<GlobalPartitionEndpointManager>,
67+
pub(crate) context: ClientContext,
7768
}
7869

7970
impl CosmosClient {
@@ -105,18 +96,20 @@ impl CosmosClient {
10596
/// # Arguments
10697
/// * `id` - The ID of the database.
10798
pub fn database_client(&self, id: &str) -> DatabaseClient {
108-
DatabaseClient::new(
109-
self.pipeline.clone(),
110-
id,
111-
self.driver.clone(),
112-
self.global_endpoint_manager.clone(),
113-
self.global_partition_endpoint_manager.clone(),
114-
)
99+
DatabaseClient::new(self.context.clone(), id)
115100
}
116101

117102
/// Gets the endpoint of the database account this client is connected to.
118103
pub fn endpoint(&self) -> &Url {
119-
&self.pipeline.endpoint
104+
&self.context.pipeline.endpoint
105+
}
106+
107+
/// Returns the throughput control group registry.
108+
///
109+
/// Groups are registered at client creation time via
110+
/// [`CosmosClientBuilder::with_throughput_control_group()`].
111+
pub fn throughput_control_groups(&self) -> &ThroughputControlGroupRegistry {
112+
self.context.driver.runtime().throughput_control_groups()
120113
}
121114

122115
/// Executes a query against databases in the account.
@@ -149,7 +142,7 @@ impl CosmosClient {
149142
_options: Option<QueryDatabasesOptions>,
150143
) -> azure_core::Result<FeedItemIterator<DatabaseProperties>> {
151144
crate::query::executor::QueryExecutor::new(
152-
self.pipeline.clone(),
145+
self.context.pipeline.clone(),
153146
self.databases_link.clone(),
154147
Context::default(),
155148
query.into(),
@@ -184,7 +177,8 @@ impl CosmosClient {
184177
.json(&RequestBody { id })
185178
.build()?;
186179

187-
self.pipeline
180+
self.context
181+
.pipeline
188182
.send(cosmos_request, Context::default())
189183
.await
190184
.map(ResourceResponse::new)

sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client_builder.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
//! Builder for creating [`CosmosClient`] instances.
55
66
use crate::{
7+
clients::ClientContext,
8+
options::ThroughputControlGroupOptions,
79
pipeline::{AuthorizationPolicy, CosmosHeadersPolicy, GatewayPipeline},
810
resource_context::{ResourceLink, ResourceType},
911
CosmosAccountReference, CosmosClient, CosmosClientOptions, CosmosCredential, RoutingStrategy,
@@ -79,6 +81,8 @@ use azure_core::http::{ClientOptions, LoggingOptions, RetryOptions};
7981
#[derive(Default)]
8082
pub struct CosmosClientBuilder {
8183
options: CosmosClientOptions,
84+
/// Throughput control groups to register on the driver runtime.
85+
throughput_control_groups: Vec<ThroughputControlGroupOptions>,
8286
/// Whether to accept invalid TLS certificates when connecting to the emulator.
8387
#[cfg(feature = "allow_invalid_certificates")]
8488
allow_emulator_invalid_certificates: bool,
@@ -139,6 +143,16 @@ impl CosmosClientBuilder {
139143
self
140144
}
141145

146+
/// Registers a throughput control group on the driver runtime.
147+
///
148+
/// Groups define throughput policies (priority level, throughput bucket) that
149+
/// are applied to requests referencing the group name via
150+
/// [`OperationOptions::throughput_control_group_names`](crate::OperationOptions::throughput_control_group_names).
151+
pub fn with_throughput_control_group(mut self, group: ThroughputControlGroupOptions) -> Self {
152+
self.throughput_control_groups.push(group);
153+
self
154+
}
155+
142156
/// Builds the [`CosmosClient`] with the specified account reference and region selection strategy.
143157
///
144158
/// The account reference bundles an endpoint and credential. You can create one using
@@ -348,17 +362,24 @@ impl CosmosClientBuilder {
348362
driver_runtime_builder =
349363
driver_runtime_builder.with_fault_injection_rules(driver_fi_rules);
350364
}
365+
for group in self.throughput_control_groups {
366+
driver_runtime_builder = driver_runtime_builder
367+
.register_throughput_control_group(group)
368+
.map_err(|e| azure_core::Error::new(azure_core::error::ErrorKind::Other, e))?;
369+
}
351370
let driver_runtime = driver_runtime_builder.build().await?;
352371
let driver = driver_runtime
353372
.get_or_create_driver(driver_account, None)
354373
.await?;
355374

356375
Ok(CosmosClient {
357376
databases_link: ResourceLink::root(ResourceType::Databases),
358-
pipeline,
359-
driver,
360-
global_endpoint_manager,
361-
global_partition_endpoint_manager,
377+
context: ClientContext {
378+
pipeline,
379+
driver,
380+
global_endpoint_manager,
381+
global_partition_endpoint_manager,
382+
},
362383
})
363384
}
364385
}

0 commit comments

Comments
 (0)