Skip to content

Commit 1d414c3

Browse files
tvaron3Copilot
andcommitted
[Cosmos] ThroughputControl API
Add SDK-level throughput control API for server-side priority-based execution and throughput buckets (issue #3904). New SDK types: - ThroughputControlGroup enum with PriorityBased and ThroughputBucket variants, hiding the driver's unimplemented ClientSide variant - Re-exports PriorityLevel, ThroughputControlGroupName, ContainerReference Registration and resolution: - CosmosClientOptions::with_throughput_control_group() for registration - ThroughputControlGroupRegistry threaded through CosmosClient, DatabaseClient, and ContainerClient - apply_throughput_control() on ItemWriteOptions, QueryOptions, and BatchOptions resolves group name to x-ms-cosmos-priority-level and x-ms-cosmos-throughput-bucket headers Driver: - ContainerReference::stub() doc-hidden helper for unit testing Tests cover registration errors, header application for priority and bucket groups, default group resolution, and empty registry behavior. Fixes #3904 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 4d39ccc commit 1d414c3

File tree

8 files changed

+568
-86
lines changed

8 files changed

+568
-86
lines changed

sdk/cosmos/azure_data_cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
### Features Added
66

77
- Added `CustomResponseBuilder` and `FaultInjectionRule::hit_count()` APIs for fault injection, enabling ergonomic construction of synthetic HTTP responses and test verification of rule activation counts. ([#3888](https://github.com/Azure/azure-sdk-for-rust/pull/3888))
8+
- Added throughput control API: `ThroughputControlGroup` enum with `PriorityBased` and `ThroughputBucket` variants, plus re-exported `PriorityLevel`, `ThroughputControlGroupName`, and `ContainerReference` from the driver. Users can register throughput control groups on `CosmosClientOptions` via `with_throughput_control_group()` to configure priority-based execution and throughput bucket server features. ([#4078](https://github.com/Azure/azure-sdk-for-rust/pull/4078))
89

910
### Breaking Changes
1011

sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@
22
// Licensed under the MIT License.
33

44
use crate::{
5-
clients::OffersClient,
5+
clients::{ClientContext, OffersClient},
66
models::{
77
BatchResponse, ContainerProperties, CosmosResponse, ItemResponse, ResourceResponse,
88
ThroughputProperties,
99
},
1010
options::{BatchOptions, QueryOptions, ReadContainerOptions},
11-
pipeline::GatewayPipeline,
1211
resource_context::{ResourceLink, ResourceType},
1312
transactional_batch::TransactionalBatch,
1413
DeleteContainerOptions, FeedItemIterator, ItemReadOptions, ItemWriteOptions, PartitionKey,
@@ -19,13 +18,10 @@ use std::sync::Arc;
1918
use crate::cosmos_request::CosmosRequest;
2019
use crate::handler::container_connection::ContainerConnection;
2120
use crate::operation_context::OperationType;
22-
use crate::routing::global_endpoint_manager::GlobalEndpointManager;
23-
use crate::routing::global_partition_endpoint_manager::GlobalPartitionEndpointManager;
2421
use crate::routing::partition_key_range_cache::PartitionKeyRangeCache;
2522
use azure_core::http::headers::AsHeaders;
2623
use azure_core::http::Context;
2724
use azure_data_cosmos_driver::models::{ContainerReference, CosmosOperation, ItemReference};
28-
use azure_data_cosmos_driver::CosmosDriver;
2925
use serde::{de::DeserializeOwned, Serialize};
3026

3127
/// A client for working with a specific container in a Cosmos DB account.
@@ -35,30 +31,27 @@ use serde::{de::DeserializeOwned, Serialize};
3531
pub struct ContainerClient {
3632
link: ResourceLink,
3733
items_link: ResourceLink,
38-
pipeline: Arc<GatewayPipeline>,
3934
container_connection: Arc<ContainerConnection>,
4035
container_id: String,
41-
driver: Arc<CosmosDriver>,
4236
container_ref: ContainerReference,
37+
context: ClientContext,
4338
}
4439

4540
impl ContainerClient {
4641
pub(crate) async fn new(
47-
pipeline: Arc<GatewayPipeline>,
42+
context: ClientContext,
4843
database_link: &ResourceLink,
4944
container_id: &str,
5045
database_id: &str,
51-
driver: Arc<CosmosDriver>,
52-
global_endpoint_manager: Arc<GlobalEndpointManager>,
53-
global_partition_endpoint_manager: Arc<GlobalPartitionEndpointManager>,
5446
) -> azure_core::Result<Self> {
5547
let link = database_link
5648
.feed(ResourceType::Containers)
5749
.item(container_id);
5850
let items_link = link.feed(ResourceType::Documents);
5951

6052
// Eagerly resolve immutable container metadata from the driver.
61-
let container_ref = driver
53+
let container_ref = context
54+
.driver
6255
.resolve_container(database_id, container_id)
6356
.await
6457
.map_err(|e| {
@@ -68,25 +61,24 @@ impl ContainerClient {
6861
})?;
6962

7063
let partition_key_range_cache = Arc::from(PartitionKeyRangeCache::new(
71-
pipeline.clone(),
64+
context.pipeline.clone(),
7265
database_link.clone(),
73-
global_endpoint_manager.clone(),
66+
context.global_endpoint_manager.clone(),
7467
));
7568
let container_connection = Arc::from(ContainerConnection::new(
76-
pipeline.clone(),
69+
context.pipeline.clone(),
7770
partition_key_range_cache,
78-
global_partition_endpoint_manager.clone(),
71+
context.global_partition_endpoint_manager.clone(),
7972
container_ref.clone(),
8073
));
8174

8275
Ok(Self {
8376
link,
8477
items_link,
85-
pipeline,
8678
container_connection,
8779
container_id: container_id.to_string(),
88-
driver,
8980
container_ref,
81+
context,
9082
})
9183
}
9284

@@ -190,7 +182,7 @@ impl ContainerClient {
190182
.resource_id
191183
.expect("service should always return a '_rid' for a container");
192184

193-
let offers_client = OffersClient::new(self.pipeline.clone(), resource_id);
185+
let offers_client = OffersClient::new(self.context.pipeline.clone(), resource_id);
194186
offers_client.read(Context::default()).await
195187
}
196188

@@ -222,7 +214,7 @@ impl ContainerClient {
222214
.resource_id
223215
.expect("service should always return a '_rid' for a container");
224216

225-
let offers_client = OffersClient::new(self.pipeline.clone(), resource_id);
217+
let offers_client = OffersClient::new(self.context.pipeline.clone(), resource_id);
226218
offers_client
227219
.replace(Context::default(), throughput)
228220
.await
@@ -332,6 +324,11 @@ impl ContainerClient {
332324
.excluded_regions(excluded_regions)
333325
.build()?;
334326
options.apply_headers(&mut cosmos_request.headers);
327+
options.apply_throughput_control(
328+
&mut cosmos_request.headers,
329+
&self.context.throughput_control_groups,
330+
&self.container_ref,
331+
);
335332

336333
self.container_connection
337334
.send(cosmos_request, Context::default())
@@ -420,6 +417,11 @@ impl ContainerClient {
420417
.excluded_regions(excluded_regions)
421418
.build()?;
422419
options.apply_headers(&mut cosmos_request.headers);
420+
options.apply_throughput_control(
421+
&mut cosmos_request.headers,
422+
&self.context.throughput_control_groups,
423+
&self.container_ref,
424+
);
423425

424426
self.container_connection
425427
.send(cosmos_request, Context::default())
@@ -510,6 +512,11 @@ impl ContainerClient {
510512
.excluded_regions(excluded_regions)
511513
.build()?;
512514
options.apply_headers(&mut cosmos_request.headers);
515+
options.apply_throughput_control(
516+
&mut cosmos_request.headers,
517+
&self.context.throughput_control_groups,
518+
&self.container_ref,
519+
);
513520

514521
return self
515522
.container_connection
@@ -575,6 +582,7 @@ impl ContainerClient {
575582

576583
// Execute through the driver.
577584
let driver_response = self
585+
.context
578586
.driver
579587
.execute_operation(operation, options.operation)
580588
.await?;
@@ -620,6 +628,11 @@ impl ContainerClient {
620628
.excluded_regions(excluded_regions)
621629
.build()?;
622630
options.apply_headers(&mut cosmos_request.headers);
631+
options.apply_throughput_control(
632+
&mut cosmos_request.headers,
633+
&self.context.throughput_control_groups,
634+
&self.container_ref,
635+
);
623636

624637
self.container_connection
625638
.send(cosmos_request, Context::default())
@@ -704,9 +717,14 @@ impl ContainerClient {
704717
headers.insert(name, value);
705718
}
706719
options.apply_headers(&mut headers);
720+
options.apply_throughput_control(
721+
&mut headers,
722+
&self.context.throughput_control_groups,
723+
&self.container_ref,
724+
);
707725

708726
crate::query::executor::QueryExecutor::new(
709-
self.pipeline.clone(),
727+
self.context.pipeline.clone(),
710728
self.items_link.clone(),
711729
Context::default(),
712730
query,
@@ -771,6 +789,11 @@ impl ContainerClient {
771789
.json(batch.operations())
772790
.build()?;
773791
options.apply_headers(&mut cosmos_request.headers);
792+
options.apply_throughput_control(
793+
&mut cosmos_request.headers,
794+
&self.context.throughput_control_groups,
795+
&self.container_ref,
796+
);
774797

775798
self.container_connection
776799
.send(cosmos_request, Context::default())

sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client.rs

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,15 @@
22
// Licensed under the MIT License.
33

44
use crate::{
5-
clients::DatabaseClient,
5+
clients::{ClientContext, DatabaseClient},
66
cosmos_request::CosmosRequest,
77
models::{DatabaseProperties, ResourceResponse},
88
operation_context::OperationType,
9-
pipeline::GatewayPipeline,
109
resource_context::ResourceLink,
11-
routing::{
12-
global_endpoint_manager::GlobalEndpointManager,
13-
global_partition_endpoint_manager::GlobalPartitionEndpointManager,
14-
},
1510
CreateDatabaseOptions, FeedItemIterator, Query, QueryDatabasesOptions,
1611
};
1712
use azure_core::http::{Context, Url};
18-
use azure_data_cosmos_driver::CosmosDriver;
1913
use serde::Serialize;
20-
use std::sync::Arc;
2114

2215
pub use super::cosmos_client_builder::CosmosClientBuilder;
2316

@@ -70,10 +63,7 @@ pub use super::cosmos_client_builder::CosmosClientBuilder;
7063
#[derive(Debug, Clone)]
7164
pub struct CosmosClient {
7265
pub(crate) databases_link: ResourceLink,
73-
pub(crate) pipeline: Arc<GatewayPipeline>,
74-
pub(crate) driver: Arc<CosmosDriver>,
75-
pub(crate) global_endpoint_manager: Arc<GlobalEndpointManager>,
76-
pub(crate) global_partition_endpoint_manager: Arc<GlobalPartitionEndpointManager>,
66+
pub(crate) context: ClientContext,
7767
}
7868

7969
impl CosmosClient {
@@ -105,18 +95,12 @@ impl CosmosClient {
10595
/// # Arguments
10696
/// * `id` - The ID of the database.
10797
pub fn database_client(&self, id: &str) -> DatabaseClient {
108-
DatabaseClient::new(
109-
self.pipeline.clone(),
110-
id,
111-
self.driver.clone(),
112-
self.global_endpoint_manager.clone(),
113-
self.global_partition_endpoint_manager.clone(),
114-
)
98+
DatabaseClient::new(self.context.clone(), id)
11599
}
116100

117101
/// Gets the endpoint of the database account this client is connected to.
118102
pub fn endpoint(&self) -> &Url {
119-
&self.pipeline.endpoint
103+
&self.context.pipeline.endpoint
120104
}
121105

122106
/// Executes a query against databases in the account.
@@ -149,7 +133,7 @@ impl CosmosClient {
149133
_options: Option<QueryDatabasesOptions>,
150134
) -> azure_core::Result<FeedItemIterator<DatabaseProperties>> {
151135
crate::query::executor::QueryExecutor::new(
152-
self.pipeline.clone(),
136+
self.context.pipeline.clone(),
153137
self.databases_link.clone(),
154138
Context::default(),
155139
query.into(),
@@ -184,7 +168,8 @@ impl CosmosClient {
184168
.json(&RequestBody { id })
185169
.build()?;
186170

187-
self.pipeline
171+
self.context
172+
.pipeline
188173
.send(cosmos_request, Context::default())
189174
.await
190175
.map(ResourceResponse::new)

sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client_builder.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
//! Builder for creating [`CosmosClient`] instances.
55
66
use crate::{
7+
clients::ClientContext,
78
pipeline::{AuthorizationPolicy, CosmosHeadersPolicy, GatewayPipeline},
89
resource_context::{ResourceLink, ResourceType},
910
CosmosAccountReference, CosmosClient, CosmosClientOptions, CosmosCredential, RoutingStrategy,
@@ -318,6 +319,8 @@ impl CosmosClientBuilder {
318319
},
319320
));
320321

322+
let throughput_control_groups = self.options.throughput_control_groups.clone();
323+
321324
let pipeline = Arc::new(GatewayPipeline::new(
322325
endpoint.clone(),
323326
pipeline_core,
@@ -355,10 +358,13 @@ impl CosmosClientBuilder {
355358

356359
Ok(CosmosClient {
357360
databases_link: ResourceLink::root(ResourceType::Databases),
358-
pipeline,
359-
driver,
360-
global_endpoint_manager,
361-
global_partition_endpoint_manager,
361+
context: ClientContext {
362+
pipeline,
363+
driver,
364+
global_endpoint_manager,
365+
global_partition_endpoint_manager,
366+
throughput_control_groups,
367+
},
362368
})
363369
}
364370
}

0 commit comments

Comments
 (0)