Skip to content

Commit 71668fe

Browse files
simorenohCopilot
andauthored
[Cosmos] Port read-item to use Driver (#4053)
This PR will contain the work needed in order to make the changes to port over an initial method (`read_item`) to use the underlying driver as the connection. For now, I am sharing a spec of the proposed changes, in the hopes that this same spec will work to migrate all other remaining methods after we verify this one works. The spec file can be found in the PR to facilitate review, but will also be the description below. Actual code implementation to follow. # SDK-to-Driver Cutover: Design Specification ## Overview This document describes the design for routing `azure_data_cosmos` SDK operations through the `azure_data_cosmos_driver` execution engine, replacing the legacy gateway pipeline path. The first operation cut over is `ContainerClient::read_item`, which serves as the **reference pattern** for all subsequent operations. ### Context Prior to this work, the Cosmos SDK had two separate execution paths: - **Gateway pipeline** (`azure_data_cosmos`): The SDK handled auth, routing, retries, and request construction via `CosmosRequest` → `GatewayPipeline` → HTTP. - **Driver** (`azure_data_cosmos_driver`): A newer execution engine with its own transport, routing, and operation model (`CosmosOperation` + `OperationOptions`). Previously used only in driver-level tests. [PR #4005](#4005) bridged the two worlds by having `ContainerClient::new()` call `driver.resolve_container()` for eager metadata resolution. This PR takes the next step: routing the first data operation through the driver. ### Goal Make the SDK client a **thin wrapper** over the driver. The SDK translates public-facing types into driver concepts, delegates execution, and translates the response back. All real work (auth, routing, retries, transport) happens inside `driver.execute_operation()`. ## Architecture ### Data Flow ```text User calls: container_client.read_item(pk, id, options) │ ┌─────────▼────────────┐ │ SDK ContainerClient │ └─────────┬────────────┘ │ ┌───────────────────┼───────────────────┐ │ │ │ PartitionKey ItemOptions ContainerRef (SDK type) (SDK type) (driver type, │ │ stored on client) │ │ │ ▼ ▼ ▼ into_driver_pk() item_options_to_ ItemReference:: │ operation_options() from_name() │ │ │ └───────────────────┼───────────────────┘ │ ┌─────────▼──────────┐ │ CosmosOperation:: │ │ read_item() │ └─────────┬──────────┘ │ ┌─────────▼───────────┐ │ driver.execute_ │ │ operation(op, opts)│ │ │ │ (auth, routing, │ │ retries, HTTP) │ └─────────┬───────────┘ │ ┌─────────▼───────────┐ │ driver_response_ │ │ to_cosmos_response │ └─────────┬───────────┘ │ ┌─────────▼───────────┐ │ CosmosResponse<T> │ │ (SDK public type) │ └─────────────────────┘ ``` ### Key Principle The SDK's public API does not change. `read_item` retains the same signature, return type, and observable behavior. This is a pure internal refactor. ## Design Decision: Driver as Required Infrastructure An alternative approach was explored where the driver is **optional** — stored as `Option<Arc<CosmosDriver>>` on `CosmosClient`, `DatabaseClient`, and `ContainerClient`. In that model, each operation checks at runtime whether a driver is available: if so, it takes the driver path; otherwise, it falls back to the legacy gateway pipeline. Container metadata resolution is also optional and failure is silently ignored. We chose **not** to take that approach, since we want to verify the behavior of the driver being used only and this single method will serve as the test. In this design, the driver is **required**: - `CosmosClient` stores `Arc<CosmosDriver>` (not `Option`). - `ContainerClient::new()` eagerly resolves container metadata via the driver and returns `Result` — if resolution fails, the client cannot be created. - Operations have a **single codepath** through the driver, with no gateway fallback. ### Rationale The purpose of this cutover is to validate that the driver can fully replace the gateway pipeline for each operation. A fallback path undermines that goal: - **Testability:** If the driver path can silently fall back to the gateway, we can't be 100% sure that the driver path is exercised in production or tests. Failures would be hidden rather than surfaced. - **Correctness:** A dual-codepath design requires maintaining behavioral parity between two implementations indefinitely. A single path is easier to reason about, test, and debug. - **Options fidelity:** A fallback path tempts skipping the options translation (e.g., passing empty `OperationOptions` on the driver path), which silently drops user-configured session tokens, etags, and excluded regions. - **Response fidelity:** A minimal fallback implementation may skip reconstructing response headers from the driver's typed response, causing callers to get `None` for `request_charge()`, `session_token()`, and `etag()`. The cutover is intentionally incremental — one operation at a time. Operations that haven't been cut over yet continue using the gateway pipeline naturally (they don't call the driver). This gives us the gradual rollout benefit without the complexity of runtime branching within a single operation. ## Type Translation Decisions ### PartitionKey (SDK → Driver) The SDK and driver define **separate `PartitionKey` types** with identical structure but in different crates. Both represent a JSON array of typed values (string, number, bool, null). **Approach:** Added `into_driver_partition_key()` on the SDK's `PartitionKey` that maps each `InnerPartitionKeyValue` variant to the driver's `PartitionKeyValue`. **Driver change required:** Made `PartitionKeyValue` `pub` (was `pub(crate)`) so the SDK crate can construct `Vec<PartitionKeyValue>` for the conversion. **Future consideration:** Once Ashley's options alignment work unifies these types, this conversion can be eliminated, and we can just use the Driver's definitions the way we did with the ContainerReference. ```rust // SDK partition_key.rs pub(crate) fn into_driver_partition_key(self) -> driver::PartitionKey { let driver_values: Vec<DriverPKV> = self.0.into_iter() .map(|v| match v.0 { InnerPartitionKeyValue::String(s) => DriverPKV::from(s), InnerPartitionKeyValue::Number(n) => DriverPKV::from(n), InnerPartitionKeyValue::Bool(b) => DriverPKV::from(b), InnerPartitionKeyValue::Null => DriverPKV::from(Option::<String>::None), // ... }) .collect(); DriverPK::from(driver_values) } ``` ### ItemOptions → OperationOptions The SDK's `ItemOptions` (item-scoped request options) maps to the driver's `OperationOptions` field-by-field. The types in each field differ between crates, so values are bridged via their string representations. | SDK `ItemOptions` field | Driver `OperationOptions` | Conversion | | --- | --- | --- | | `session_token: Option<SessionToken>` | `.with_session_token()` | `DriverSessionToken::new(token.to_string())` | | `if_match_etag: Option<Etag>` | `.with_etag_condition()` | `Precondition::if_match(ETag::new(etag.to_string()))` | | `custom_headers: HashMap<...>` | `.with_custom_headers()` | Passed through directly (types are the same) | | `excluded_regions: Option<Vec<RegionName>>` | `.with_excluded_regions()` | `Region::new(name.to_string())` for each | | `content_response_on_write_enabled: bool` | *Ignored for reads* | Driver always returns body for point reads | **Driver change required:** Added `custom_headers` support to `OperationOptions` (new field, setter, getter) and wired it into `build_transport_request` in `operation_pipeline.rs`. Custom headers may be removed in the future as we analyze which options are truly needed. ### Response Bridge (Driver → SDK) The driver returns an untyped `CosmosResponse { body: Vec<u8>, headers: CosmosResponseHeaders, status: CosmosStatus }`. The SDK returns a typed `CosmosResponse<T>` wrapping `azure_core::Response<T>`. **Approach:** Reconstruct the SDK response from driver parts: ```rust pub(crate) fn driver_response_to_cosmos_response<T>( driver_response: DriverResponse, ) -> CosmosResponse<T> { let status_code = driver_response.status().status_code(); let headers = cosmos_response_headers_to_headers(driver_response.headers()); let body = driver_response.into_body(); let raw = RawResponse::from_bytes(status_code, headers, Bytes::from(body)); let typed: Response<T> = raw.into(); CosmosResponse::new(typed, None) } ``` The header conversion maps each typed `CosmosResponseHeaders` field back to its raw header name/value pair (reverse of the driver's `from_headers()` parser). **Caveat:** Only headers that the driver explicitly parses are preserved (activity ID, request charge, session token, etag, continuation, item count, substatus). Any other server headers are lost. This covers all standard Cosmos response metadata. We will probably come back to this when we do the work on verifying the headers we want. ### CosmosRequest → Optional The SDK's `CosmosResponse<T>` previously held the original `CosmosRequest` — a gateway pipeline concept with no driver equivalent. The driver uses `CosmosOperation` + `OperationOptions` instead, which are consumed during execution. **Decision:** Made the `request` field `Option<CosmosRequest>`: - Gateway-routed operations (all methods not yet cut over) continue setting `Some(request)`. - Driver-routed operations set `None`. - The field is only accessed behind `#[cfg(feature = "fault_injection")]` and marked `#[allow(dead_code)]`. - A TODO comment marks it for removal once all operations are on the driver. ## Structural Changes ### ContainerClient Added two fields to `ContainerClient` so `read_item` can reach the driver at execution time: ```rust pub struct ContainerClient { // ... existing fields ... driver: Arc<CosmosDriver>, // retained from new() container_ref: ContainerReference, // cloned before passing to ContainerConnection } ``` Previously, the driver was discarded after `new()` and `ContainerReference` was buried inside `ContainerConnection`. ### driver_bridge Module New private module at `src/driver_bridge.rs` containing: - `driver_response_to_cosmos_response<T>()` — response conversion - `item_options_to_operation_options()` — options translation - `driver_response_headers_to_headers()` — converts the driver's typed response headers (e.g., `activity_id: Option<ActivityId>`, `request_charge: Option<RequestCharge>`) into raw `azure_core::Headers` key-value pairs for the SDK response This module is the shared foundation for all future operation cutover. When cutting over `create_item`, `delete_item`, etc., they reuse the same bridge functions. ## Applying This Pattern to Other Operations To cut over another item operation (e.g., `create_item`), follow this template: 1. **Build the operation:** Use the appropriate `CosmosOperation::*` factory method (e.g., `CosmosOperation::create_item(container_ref, pk)`). 2. **Attach the body:** For write operations, serialize the item to bytes and call `.with_body(bytes)` on the operation. 3. **Translate options:** Reuse `item_options_to_operation_options()` from `driver_bridge.rs`. For write-specific options (e.g., `content_response_on_write_enabled`), extend the bridge function. 4. **Execute:** Call `self.driver.execute_operation(operation, driver_options).await?`. 5. **Bridge response:** Reuse `driver_response_to_cosmos_response(driver_response)`. The public method signature should not change. ## Files Changed | File | Change | | --- | --- | | `azure_data_cosmos_driver/src/options/operation_options.rs` | Added `custom_headers` field + setter/getter | | `azure_data_cosmos_driver/src/driver/pipeline/operation_pipeline.rs` | Wired custom headers into request construction | | `azure_data_cosmos_driver/src/models/partition_key.rs` | Made `PartitionKeyValue` `pub` | | `azure_data_cosmos_driver/src/models/mod.rs` | Re-exported `PartitionKeyValue` | | `azure_data_cosmos/src/driver_bridge.rs` | **New** — shared conversion module | | `azure_data_cosmos/src/clients/container_client.rs` | Added `driver`/`container_ref` fields; rewrote `read_item` | | `azure_data_cosmos/src/models/cosmos_response.rs` | Made `request` field optional | | `azure_data_cosmos/src/partition_key.rs` | Added `into_driver_partition_key()` | | `azure_data_cosmos/src/options/mod.rs` | Added `pub(crate)` accessors for bridge | | `azure_data_cosmos/src/pipeline/mod.rs` | Updated `CosmosResponse::new` call site | | `azure_data_cosmos/src/lib.rs` | Registered `mod driver_bridge` | ## Open Items and Future Work - **Options alignment:** Ashley is working on aligning SDK options with the driver's options model. Once complete, the `ItemOptions` → `OperationOptions` translation may simplify or become unnecessary. - **PartitionKey unification:** The dual `PartitionKey` types and `into_driver_partition_key()` conversion should be eliminated once the types are unified. - **`CosmosRequest` removal:** Once all operations are routed through the driver, the `Option<CosmosRequest>` field on `CosmosResponse<T>` can be removed entirely. - **`custom_headers` review:** The `custom_headers` field on `OperationOptions` was added for feature parity. It may be removed as we analyze which options are truly needed at the driver level. - **Remaining operations:** `create_item`, `delete_item`, `replace_item`, `upsert_item`, `patch_item`, and query operations should follow the same pattern established here. ## Fault Injection Wiring When cutting `read_item` over to the driver, the SDK's fault injection tests initially failed because the two execution paths (gateway and driver) have **independent fault injection systems**. This section documents how they were connected. ### Problem The SDK and driver each have their own fault injection module (`azure_data_cosmos::fault_injection` and `azure_data_cosmos_driver::fault_injection`). They define parallel but separate types (`FaultInjectionRule`, `FaultInjectionCondition`, `FaultInjectionResult`, etc.) with identical variants but different Rust types. Prior to this work, only the gateway pipeline received fault injection rules — the driver was built without them. ### Solution: Rule Translation with Shared State The bridge module (`driver_bridge.rs`) includes `sdk_fi_rules_to_driver_fi_rules()`, which translates SDK fault injection rules into driver fault injection rules. The translation covers: - `FaultOperationType` — variant-by-variant match (identical variant names) - `FaultInjectionErrorType` — variant-by-variant match - `FaultInjectionCondition` — `RegionName` → `Region`, operation type and container ID mapped directly - `FaultInjectionResult` — `Duration` → `Option<Duration>`, probability copied - Timing fields — `start_time: Instant` → `Option<Instant>`, `end_time` and `hit_limit` copied ### Shared Mutable State SDK `FaultInjectionRule` has `enabled: Arc<AtomicBool>` and `hit_count: Arc<AtomicU32>` that tests mutate at runtime (`.disable()`, `.enable()`, `.hit_count()`). The driver's `FaultInjectionRuleBuilder` accepts external `Arc`s via `with_shared_state()`, so both the SDK gateway path and the driver path reference the **same atomic state**. This means: - Calling `.disable()` on the SDK rule also disables it in the driver - Hit counts are shared — both paths increment the same counter - Tests that toggle rules or assert hit counts work correctly across both paths ### Wiring in `CosmosClientBuilder` In `CosmosClientBuilder::build()`: 1. Before the `FaultInjectionClientBuilder` is consumed for the gateway transport, `rules()` extracts a reference to the SDK rules 2. `sdk_fi_rules_to_driver_fi_rules()` translates them to driver rules with shared state 3. The translated rules are passed to `CosmosDriverRuntimeBuilder::with_fault_injection_rules()` 4. The SDK's `fault_injection` Cargo feature now forwards to the driver's `fault_injection` feature ### Test Patterns for Future Cutover When cutting over additional operations, **no additional fault injection wiring is needed** — it's handled once at the `CosmosClientBuilder` level. However, tests that assert `request_url()` need to handle `None` for driver-routed operations: ```rust // Gateway-routed operations return Some(url) // Driver-routed operations return None if let Some(url) = response.request_url() { assert_eq!(url.host_str().unwrap(), expected_endpoint); } ``` ### `custom_response` Translation Translation of `CustomResponse` (synthetic HTTP responses) is not yet implemented. None of the current tests use custom responses for `ReadItem` operations. When needed, the bridge function should be extended to translate `CustomResponse` fields (`status_code`, `headers`, `body`). ### Consolidating to Driver Fault Injection After Cutover The current dual-system architecture (SDK fault injection + driver fault injection + translation bridge) exists only because the cutover is incremental — some operations still go through the gateway while others go through the driver. Once **all** operations are routed through the driver: 1. **Drop `azure_data_cosmos::fault_injection`** — the SDK's HTTP-client-level fault interception module becomes unreachable. Delete the entire `src/fault_injection/` directory. 2. **Re-export driver types** — the SDK re-exports the driver's fault injection types directly: ```rust #[cfg(feature = "fault_injection")] pub use azure_data_cosmos_driver::fault_injection; ``` 3. **Remove the translation layer** — `sdk_fi_rules_to_driver_fi_rules()` in `driver_bridge.rs` and the `shared_enabled()`/`shared_hit_count()` accessors on the SDK rule are no longer needed. 4. **Simplify `CosmosClientBuilder`** — `with_fault_injection()` accepts `Vec<Arc<driver::FaultInjectionRule>>` directly and passes them to `CosmosDriverRuntimeBuilder::with_fault_injection_rules()`. No translation, no cloning, no intermediary builder. 5. **Update tests** — tests construct driver `FaultInjectionRule` directly (same builders, same API) instead of SDK rules. At that point the SDK has **no fault injection logic of its own** — it's a pass-through to the driver, matching the overall "SDK as thin wrapper" goal. The driver is the single source of truth for all transport-related concerns including fault injection. --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent e0397f0 commit 71668fe

File tree

26 files changed

+1084
-131
lines changed

26 files changed

+1084
-131
lines changed

sdk/cosmos/.cspell.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
"cpus",
3333
"cstring",
3434
"ctest",
35+
"cutover",
3536
"Daad",
3637
"dedicatedgateway",
3738
"derefs",
@@ -205,4 +206,4 @@
205206
]
206207
}
207208
]
208-
}
209+
}

sdk/cosmos/azure_data_cosmos/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ key_auth = [
4545
] # Enables support for key-based authentication (Primary Keys and Resource Tokens)
4646
hmac_rust = ["azure_core/hmac_rust"]
4747
hmac_openssl = ["azure_core/hmac_openssl"]
48-
fault_injection = [] # Enables support for fault injection testing
48+
fault_injection = ["azure_data_cosmos_driver/fault_injection"] # Enables support for fault injection testing
4949
allow_invalid_certificates = ["reqwest", "reqwest/native-tls", "azure_data_cosmos_driver/reqwest_native_tls"] # Enables accepting invalid TLS certificates (e.g., for emulator testing)
5050

5151
[package.metadata.docs.rs]

sdk/cosmos/azure_data_cosmos/docs/sdk-to-driver-cutover.md

Lines changed: 401 additions & 0 deletions
Large diffs are not rendered by default.

sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::routing::global_partition_endpoint_manager::GlobalPartitionEndpointMa
2424
use crate::routing::partition_key_range_cache::PartitionKeyRangeCache;
2525
use azure_core::http::headers::AsHeaders;
2626
use azure_core::http::Context;
27+
use azure_data_cosmos_driver::models::{ContainerReference, CosmosOperation, ItemReference};
2728
use azure_data_cosmos_driver::CosmosDriver;
2829
use serde::{de::DeserializeOwned, Serialize};
2930

@@ -37,6 +38,8 @@ pub struct ContainerClient {
3738
pipeline: Arc<GatewayPipeline>,
3839
container_connection: Arc<ContainerConnection>,
3940
container_id: String,
41+
driver: Arc<CosmosDriver>,
42+
container_ref: ContainerReference,
4043
}
4144

4245
impl ContainerClient {
@@ -73,7 +76,7 @@ impl ContainerClient {
7376
pipeline.clone(),
7477
partition_key_range_cache,
7578
global_partition_endpoint_manager.clone(),
76-
container_ref,
79+
container_ref.clone(),
7780
));
7881

7982
Ok(Self {
@@ -82,6 +85,8 @@ impl ContainerClient {
8285
pipeline,
8386
container_connection,
8487
container_id: container_id.to_string(),
88+
driver,
89+
container_ref,
8590
})
8691
}
8792

@@ -544,23 +549,43 @@ impl ContainerClient {
544549
item_id: &str,
545550
options: Option<ItemOptions>,
546551
) -> azure_core::Result<ItemResponse<T>> {
547-
let mut options = options.unwrap_or_default();
552+
let options = options.unwrap_or_default();
548553

549-
// Read APIs should always return the item, ignoring whatever the user set.
550-
options = options.with_content_response_on_write_enabled(true);
554+
// Build the driver's item reference from our stored container metadata.
555+
let item_ref = ItemReference::from_name(
556+
&self.container_ref,
557+
partition_key.into().into_driver_partition_key(),
558+
item_id.to_owned(),
559+
);
551560

552-
let link = self.items_link.item(item_id);
553-
let excluded_regions = options.excluded_regions.clone();
554-
let mut cosmos_request = CosmosRequest::builder(OperationType::Read, link)
555-
.partition_key(partition_key.into())
556-
.excluded_regions(excluded_regions)
557-
.build()?;
558-
options.apply_headers(&mut cosmos_request.headers);
561+
// Create the driver operation.
562+
let mut operation = CosmosOperation::read_item(item_ref);
559563

560-
self.container_connection
561-
.send(cosmos_request, Context::default())
562-
.await
563-
.map(|r| ItemResponse::new(r))
564+
// Wire session token and etag from SDK options onto the operation.
565+
if let Some(session_token) = options.session_token() {
566+
operation = operation.with_session_token(session_token.to_string());
567+
}
568+
if let Some(etag) = options.if_match_etag() {
569+
operation = operation.with_precondition(
570+
azure_data_cosmos_driver::models::Precondition::if_match(
571+
azure_data_cosmos_driver::models::ETag::new(etag.to_string()),
572+
),
573+
);
574+
}
575+
576+
// Translate SDK options to driver options.
577+
let driver_options = crate::driver_bridge::item_options_to_operation_options(&options);
578+
579+
// Execute through the driver.
580+
let driver_response = self
581+
.driver
582+
.execute_operation(operation, driver_options)
583+
.await?;
584+
585+
// Bridge the driver response to the SDK response type.
586+
Ok(ItemResponse::new(
587+
crate::driver_bridge::driver_response_to_cosmos_response(driver_response),
588+
))
564589
}
565590

566591
/// Deletes an item from the container.

sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client_builder.rs

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -203,16 +203,24 @@ impl CosmosClientBuilder {
203203
let base_client: Option<Arc<dyn azure_core::http::HttpClient>> = None;
204204

205205
#[cfg(feature = "fault_injection")]
206-
let transport: Option<azure_core::http::Transport> =
207-
if let Some(fault_builder) = self.fault_injection_builder {
208-
let fault_builder = match base_client {
209-
Some(client) => fault_builder.with_inner_client(client),
210-
None => fault_builder,
211-
};
212-
Some(fault_builder.build())
213-
} else {
214-
base_client.map(azure_core::http::Transport::new)
206+
let (transport, driver_fi_rules): (
207+
Option<azure_core::http::Transport>,
208+
Vec<std::sync::Arc<azure_data_cosmos_driver::fault_injection::FaultInjectionRule>>,
209+
) = if let Some(fault_builder) = self.fault_injection_builder {
210+
// Translate rules for the driver before the builder is consumed.
211+
let driver_rules =
212+
crate::driver_bridge::sdk_fi_rules_to_driver_fi_rules(fault_builder.rules());
213+
let fault_builder = match base_client {
214+
Some(client) => fault_builder.with_inner_client(client),
215+
None => fault_builder,
215216
};
217+
(Some(fault_builder.build()), driver_rules)
218+
} else {
219+
(
220+
base_client.map(azure_core::http::Transport::new),
221+
Vec::new(),
222+
)
223+
};
216224
#[cfg(not(feature = "fault_injection"))]
217225
let transport: Option<azure_core::http::Transport> =
218226
base_client.map(azure_core::http::Transport::new);
@@ -322,7 +330,14 @@ impl CosmosClientBuilder {
322330
// should be shared across clients targeting the same account to avoid duplicate
323331
// background tasks and connection pools. See https://github.com/Azure/azure-sdk-for-rust/issues/3908
324332
let driver_account = build_driver_account(endpoint, driver_credential);
325-
let driver_runtime = CosmosDriverRuntimeBuilder::new().build().await?;
333+
#[allow(unused_mut)]
334+
let mut driver_runtime_builder = CosmosDriverRuntimeBuilder::new();
335+
#[cfg(feature = "fault_injection")]
336+
if !driver_fi_rules.is_empty() {
337+
driver_runtime_builder =
338+
driver_runtime_builder.with_fault_injection_rules(driver_fi_rules);
339+
}
340+
let driver_runtime = driver_runtime_builder.build().await?;
326341
let driver = driver_runtime
327342
.get_or_create_driver(driver_account, None)
328343
.await?;

0 commit comments

Comments
 (0)