Propose ReadMany API design spec for driver crate #4008
TheovanKraay wants to merge 4 commits into `Azure:main` from
Conversation
Proposes a ReadMany API for azure_data_cosmos_driver that groups items by physical partition using EPK hashing, dispatches single-item groups as point reads and multi-item groups as batched parameterized queries, and executes concurrently. Covers driver, SDK, and C FFI layer APIs with query shape selection, error handling, and test plan.
Pull request overview
This PR adds a design proposal document for a new client-side ReadMany API in azure_data_cosmos_driver, intended to be consumed by the Rust SDK and the C FFI layer. The proposal describes routing items by physical partition via EPK hashing, choosing between point reads and parameterized queries, executing work concurrently, and aggregating results/diagnostics.
Changes:
- Introduces a new ReadMany design spec covering driver, SDK, and native (C FFI) API shapes.
- Specifies the ReadMany algorithm: container metadata + PK range fetch + EPK grouping + query-shape selection + chunking + concurrent execution.
- Outlines error handling behavior and a unit/integration test plan for the eventual implementation.
> Work units execute concurrently via Tokio tasks. Default concurrency:
> `min(num_ranges, 32)`, configurable via `OperationOptions`. On first error (excluding
> point read 404), remaining tasks are cancelled and the error propagates.
The proposal states that ReadMany work units execute concurrently via Tokio tasks. In sdk/cosmos, the driver crate is expected to remain async-runtime-agnostic (use the same abstractions as azure_core), and the codebase explicitly avoids coupling the driver to Tokio (see sdk/cosmos/AGENTS.md:76-80 and the rationale in src/options/runtime_options.rs). Consider describing the implementation in terms of the azure_core/shared async runtime abstraction (or a FuturesUnordered-style approach) rather than Tokio-specific task spawning.
Suggested change:

```diff
-Work units execute concurrently via Tokio tasks. Default concurrency:
-`min(num_ranges, 32)`, configurable via `OperationOptions`. On first error (excluding
-point read 404), remaining tasks are cancelled and the error propagates.
+Work units execute concurrently via the driver's async runtime abstraction (e.g.,
+a `FuturesUnordered`-style fan-out/fan-in). Default concurrency: `min(num_ranges, 32)`,
+configurable via `OperationOptions`. On first error (excluding point read 404),
+remaining in-flight operations are cancelled and the error propagates.
```
> Work units execute concurrently via Tokio tasks. Default concurrency:
> `min(num_ranges, 32)`, configurable via `OperationOptions`. On first error (excluding
> point read 404), remaining tasks are cancelled and the error propagates.
This section says the default concurrency is configurable via OperationOptions, but the current azure_data_cosmos_driver::options::OperationOptions doesn't have any concurrency/parallelism setting. To keep the proposal actionable and consistent with the existing options model, consider naming the new knob explicitly (e.g., a ReadManyOptions struct or a new field under RuntimeOptions/OperationOptions) and clarifying whether it is per-operation vs driver-wide.
Suggested change:

```diff
-Work units execute concurrently via Tokio tasks. Default concurrency:
-`min(num_ranges, 32)`, configurable via `OperationOptions`. On first error (excluding
-point read 404), remaining tasks are cancelled and the error propagates.
+Work units execute concurrently via Tokio tasks. Default concurrency is
+`min(num_ranges, 32)` per `ReadMany` invocation. A future revision may introduce an
+explicit per-operation knob (for example, `ReadManyOptions::max_concurrency`) if callers
+need finer control. On first error (excluding point read 404), remaining tasks are
+cancelled and the error propagates.
```
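The first-error semantics quoted above (404 on a point read means "item missing", anything else propagates) can be sketched runtime-agnostically. This is a minimal sketch with illustrative names, not the driver's actual types; a point-read 404 is modeled as `Ok(None)`.

```rust
// Sketch (names illustrative): aggregate per-work-unit results, treating a
// point-read 404 as a missing item (Ok(None)) rather than an error, and
// short-circuiting on the first real error.
#[derive(Debug, PartialEq)]
enum ReadError {
    Service(u16), // non-404 status code
}

fn aggregate(results: Vec<Result<Option<String>, ReadError>>) -> Result<Vec<String>, ReadError> {
    let mut items = Vec::new();
    for res in results {
        match res {
            Ok(Some(item)) => items.push(item),
            Ok(None) => {} // point read 404: silently omitted
            Err(e) => return Err(e), // first error wins; remaining work would be cancelled
        }
    }
    Ok(items)
}

fn main() {
    let ok = aggregate(vec![Ok(Some("a".into())), Ok(None), Ok(Some("b".into()))]);
    assert_eq!(ok, Ok(vec!["a".to_string(), "b".to_string()]));

    let err = aggregate(vec![Ok(Some("a".into())), Err(ReadError::Service(429))]);
    assert_eq!(err, Err(ReadError::Service(429)));
    println!("ok");
}
```

In a real implementation the fan-in loop would drain a stream of in-flight futures, and dropping that stream on the first error is what cancels the remaining work units.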
> **Shape 1 — ID-Only IN** (PK path is `/id`):
>
> ```sql
> SELECT * FROM c WHERE c.id IN (@p0, @p1, @p2)
> ```
>
> **Shape 2 — Single-PK + ID IN** (all items share one logical PK):
>
> ```sql
> SELECT * FROM c WHERE c.myPk = @pk AND c.id IN (@p0, @p1, @p2)
> ```
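A minimal, std-only sketch of building the quoted parameterized query text (function names are hypothetical; the real builder would also emit the parameter list alongside the text):

```rust
// Sketch (hypothetical names): generate the query text for Shape 1
// (ID-only IN) and Shape 2 (single PK + ID IN).
fn in_list(count: usize) -> String {
    (0..count).map(|i| format!("@p{}", i)).collect::<Vec<_>>().join(", ")
}

fn shape1(ids: &[&str]) -> String {
    format!("SELECT * FROM c WHERE c.id IN ({})", in_list(ids.len()))
}

fn shape2(pk_property: &str, ids: &[&str]) -> String {
    format!(
        "SELECT * FROM c WHERE c.{} = @pk AND c.id IN ({})",
        pk_property,
        in_list(ids.len())
    )
}

fn main() {
    assert_eq!(
        shape1(&["a", "b", "c"]),
        "SELECT * FROM c WHERE c.id IN (@p0, @p1, @p2)"
    );
    assert_eq!(
        shape2("myPk", &["a", "b", "c"]),
        "SELECT * FROM c WHERE c.myPk = @pk AND c.id IN (@p0, @p1, @p2)"
    );
    println!("ok");
}
```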
For containers whose partition key path is /id, the logical partition key is the item id itself. In that case, a separate partition_key input that can differ from id is either redundant or invalid, and the proposed test case query_builder_id_pk_mismatch (and any fallback to Shape 3) would produce nonsensical predicates (effectively c.id = @id AND c.id = @pk). Consider requiring partition_key == id (validate and error) or omitting partition_key from ItemIdentity when PK path is /id, and keep Shape 1 as the only valid query shape for that PK definition.
> The SDK crate already has MurmurHash3 V1/V2 in `hash.rs`. **Recommended**: move EPK
> hashing into the driver crate — it's a pure function of PK values and PK definition
> with no SDK-layer dependencies. This lets the C FFI layer use ReadMany without
> re-implementing hashing.
The text says "The SDK crate already has MurmurHash3 V1/V2 in hash.rs". In the current SDK layout, MurmurHash implementations live in murmur_hash.rs, while hash.rs builds effective partition keys using those functions. Consider rewording to avoid implying the MurmurHash implementations are defined in hash.rs, and call out both modules (or the specific EPK helper you plan to move) to make the migration plan precise.
Suggested change:

```diff
-The SDK crate already has MurmurHash3 V1/V2 in `hash.rs`. **Recommended**: move EPK
-hashing into the driver crate — it's a pure function of PK values and PK definition
-with no SDK-layer dependencies. This lets the C FFI layer use ReadMany without
-re-implementing hashing.
+The SDK crate already has MurmurHash3 V1/V2 implementations in `murmur_hash.rs`, and
+`hash.rs` uses them to build effective partition keys (EPKs) from partition key values
+and the partition key definition. **Recommended**: move the EPK hashing helper
+(i.e., the pure function that computes an EPK from PK values and the PK definition)
+into the driver crate. This lets the C FFI layer use ReadMany without re-implementing
+hashing.
```
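The "pure function of PK values and PK definition" shape the suggestion describes could look like the sketch below. All names are hypothetical, and std's `DefaultHasher` stands in purely as a placeholder; the real driver function would implement MurmurHash3 V1/V2 per the PK definition version.

```rust
// Sketch of the pure-function shape for EPK computation (placeholder hash,
// NOT the real MurmurHash3-based algorithm).
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

#[allow(dead_code)]
enum PartitionKeyValue {
    String(String),
    Number(f64),
}

#[allow(dead_code)]
struct PartitionKeyDefinition {
    paths: Vec<String>, // e.g. ["/myPk"]
    version: u8,        // 1 => MurmurHash3 V1, 2 => V2 in the real implementation
}

/// Pure function of PK values and PK definition: no SDK-layer dependencies,
/// so the C FFI layer can call it without re-implementing hashing.
fn effective_partition_key(values: &[PartitionKeyValue], def: &PartitionKeyDefinition) -> Vec<u8> {
    let mut hasher = DefaultHasher::new(); // placeholder for MurmurHash3
    def.version.hash(&mut hasher);
    for v in values {
        match v {
            PartitionKeyValue::String(s) => s.hash(&mut hasher),
            PartitionKeyValue::Number(n) => n.to_bits().hash(&mut hasher),
        }
    }
    hasher.finish().to_be_bytes().to_vec()
}

fn main() {
    let def = PartitionKeyDefinition { paths: vec!["/myPk".into()], version: 2 };
    let a = effective_partition_key(&[PartitionKeyValue::String("pk1".into())], &def);
    let b = effective_partition_key(&[PartitionKeyValue::String("pk1".into())], &def);
    assert_eq!(a, b); // deterministic for equal inputs
    assert_eq!(a.len(), 8);
    println!("ok");
}
```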
> | ID | Test | Verifies |
> |----|------|----------|
> | U1 | `query_builder_id_only_in` | PK path `/id` → Shape 1 |
> | U2 | `query_builder_id_pk_mismatch` | PK path `/id`, pk ≠ id → Shape 3 |
In the unit test plan, U2: query_builder_id_pk_mismatch assumes a container with PK path /id but a pk ≠ id input. Since /id implies the partition key value is the id field, this mismatch should likely be treated as invalid input (error) rather than selecting a different query shape. Updating this test case (and the earlier Shape 1/2/3 selection rules) to reflect /id semantics will prevent an implementation from accidentally generating invalid SQL predicates.
Suggested change:

```diff
-| U2 | `query_builder_id_pk_mismatch` | PK path `/id`, pk ≠ id → Shape 3 |
+| U2 | `query_builder_id_pk_mismatch` | PK path `/id`, pk ≠ id → error (invalid input) |
```
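The validation this suggestion implies is small; a sketch with hypothetical names, under the assumption that a container with PK path `/id` must have `partition_key == id`:

```rust
// Sketch (hypothetical names): when the container's PK path is `/id`, the
// logical partition key IS the item id, so a differing partition_key is
// invalid input rather than a trigger for a fallback query shape.
#[derive(Debug, PartialEq)]
enum ValidationError {
    PkMustEqualIdForIdPkPath { id: String, pk: String },
}

fn validate(pk_path: &str, id: &str, partition_key: &str) -> Result<(), ValidationError> {
    if pk_path == "/id" && partition_key != id {
        return Err(ValidationError::PkMustEqualIdForIdPkPath {
            id: id.to_string(),
            pk: partition_key.to_string(),
        });
    }
    Ok(())
}

fn main() {
    assert_eq!(validate("/id", "item1", "item1"), Ok(()));
    assert!(validate("/id", "item1", "other").is_err());
    assert_eq!(validate("/myPk", "item1", "other"), Ok(()));
    println!("ok");
}
```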
> ```rust
> /// Order is **unspecified**.
> items: Vec<Vec<u8>>,
> request_charge: RequestCharge,
> diagnostics: Vec<OperationDiagnostics>,
> ```
There should be 1:1 cardinality between diagnostics and the response.
> ```rust
> pub async fn read_many(
>     &self,
>     container: &ContainerReference,
>     items: Vec<ItemIdentity>,
> ```
I would like to design this API to scale to item identities as well as lists of just (partial) partition keys. This is a requirement we will need to meet very soon for LSEG in Java as well. It's not super urgent, but the design should include that as a required scenario and ideally allow it without requiring yet another API.

Also, there is an ask on Java right now to allow apps to control concurrency. That seems a reasonable ask, but I guess it can be addressed via `OperationOptions`.
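One possible shape for the concurrency knob mentioned above, with the default mirroring the proposal's `min(num_ranges, 32)`. The struct and field names here are hypothetical, not existing driver API:

```rust
// Hypothetical per-operation options sketch for a caller-controlled
// concurrency limit.
#[derive(Default)]
pub struct ReadManyOptions {
    /// Maximum number of concurrently executing work units; None = default.
    pub max_concurrency: Option<usize>,
}

fn effective_concurrency(opts: &ReadManyOptions, num_ranges: usize) -> usize {
    // Default: min(num_ranges, 32); always at least 1.
    opts.max_concurrency.unwrap_or_else(|| num_ranges.min(32)).max(1)
}

fn main() {
    assert_eq!(effective_concurrency(&ReadManyOptions::default(), 4), 4);
    assert_eq!(effective_concurrency(&ReadManyOptions::default(), 100), 32);
    assert_eq!(
        effective_concurrency(&ReadManyOptions { max_concurrency: Some(8) }, 100),
        8
    );
    println!("ok");
}
```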
> ```
> │ ├─ Compute EPK hash per item (MurmurHash3 V1/V2)
> │ ├─ Binary search sorted ranges → owning range
> │ └─ HashMap<RangeId, Vec<ItemIdentity>>
> │
> ```
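The grouping step quoted above reduces to a binary search over ranges sorted by their exclusive upper EPK bound. A std-only sketch with illustrative names (simplified byte-vector EPKs, not the driver's real range model):

```rust
// Sketch (names illustrative): assign each item's EPK to its owning PK range
// via binary search, then group by range id.
use std::collections::HashMap;

#[allow(dead_code)]
struct PkRange {
    id: String,
    min_inclusive: Vec<u8>,
    max_exclusive: Vec<u8>, // ranges sorted ascending by this bound
}

fn owning_range<'a>(ranges: &'a [PkRange], epk: &[u8]) -> &'a PkRange {
    // partition_point: index of the first range whose max_exclusive > epk.
    let idx = ranges.partition_point(|r| r.max_exclusive.as_slice() <= epk);
    &ranges[idx]
}

fn group<'a>(ranges: &'a [PkRange], items: Vec<(Vec<u8>, String)>) -> HashMap<&'a str, Vec<String>> {
    let mut groups: HashMap<&str, Vec<String>> = HashMap::new();
    for (epk, item_id) in items {
        groups
            .entry(owning_range(ranges, &epk).id.as_str())
            .or_default()
            .push(item_id);
    }
    groups
}

fn main() {
    let ranges = vec![
        PkRange { id: "r0".into(), min_inclusive: vec![], max_exclusive: vec![0x80] },
        PkRange { id: "r1".into(), min_inclusive: vec![0x80], max_exclusive: vec![0xFF, 0xFF] },
    ];
    let groups = group(&ranges, vec![
        (vec![0x10], "a".into()),
        (vec![0x90], "b".into()),
        (vec![0x20], "c".into()),
    ]);
    assert_eq!(groups["r0"], vec!["a".to_string(), "c".to_string()]);
    assert_eq!(groups["r1"], vec!["b".to_string()]);
    println!("ok");
}
```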
Handling of 410 (Gone) and the partition split/merge retry policy should be described here.
> ```
> └─ 8. Aggregate: concatenate items, sum RU, merge diagnostics
> ```
>
> ### Query Shapes
Should we allow projections? That could be really efficient, but I'm not sure how commonly it would be required.
- Unified API: ReadManyInput enum supports (id,pk) and PK-only modes
- PK-only mode: Shape 4/4h queries with EPK-aware routing
- Single aggregated OperationDiagnostics (1:1 cardinality)
- Explicit max_concurrency on ReadManyOptions
- Projections support (SELECT field list instead of SELECT *)
- Detailed 410 retry: split (1002), merge (1007), stale (1000)
- Runtime-agnostic concurrency (no direct Tokio dependency)
- EPK file references: murmur_hash.rs + hash.rs
- PK path /id with pk != id is now an error, not Shape 3 fallback
- C FFI: separate function for PK-only mode
- New test cases: U20-U26 (PK-only, projections, concurrency), E11-E15
Fabian's review asked for the partition split/merge retry handling to be described explicitly. Changes:
- New '410 Retry Policy' subsection with step-by-step flow diagram for sub-status 1002 (split), 1007 (merge), and 1000 (stale collection)
- Algorithm step 7 now references retry policy instead of blanket 'cancel'
- Concurrency section clarifies 410 retries are not cancellation errors
- Retry limit bounded by existing driver retry policy (default 3 attempts)
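The 410 classification this commit describes can be sketched as a small decision function. Sub-status values 1002 (split), 1007 (merge), and 1000 (stale collection) come from the commit message above; the type and function names are hypothetical:

```rust
// Sketch (hypothetical names): classify a 410 Gone by sub-status to choose
// the retry action, bounded by a retry-attempt limit.
#[derive(Debug, PartialEq)]
enum RetryAction {
    RefreshRangesAndRegroup, // split (1002) or merge (1007): re-fetch PK ranges, re-group, retry
    RefreshContainerCache,   // stale collection (1000): refresh container metadata, retry
    Fail,                    // unknown sub-status or retries exhausted: propagate as error
}

fn on_gone(sub_status: u32, attempt: u32, max_attempts: u32) -> RetryAction {
    if attempt >= max_attempts {
        return RetryAction::Fail; // bounded by the driver retry policy (default 3 attempts)
    }
    match sub_status {
        1002 | 1007 => RetryAction::RefreshRangesAndRegroup,
        1000 => RetryAction::RefreshContainerCache,
        _ => RetryAction::Fail,
    }
}

fn main() {
    assert_eq!(on_gone(1002, 0, 3), RetryAction::RefreshRangesAndRegroup);
    assert_eq!(on_gone(1007, 1, 3), RetryAction::RefreshRangesAndRegroup);
    assert_eq!(on_gone(1000, 2, 3), RetryAction::RefreshContainerCache);
    assert_eq!(on_gone(1002, 3, 3), RetryAction::Fail);
    println!("ok");
}
```

This matches the commit's point that a 410 retry is a re-plan (refresh ranges or metadata, re-group, re-dispatch) rather than a cancellation error.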
Fabian framed projections as an open question, not a committed feature. Move to Open Questions and remove from ReadManyOptions, query shapes, files table, and test cases.
> ```rust
> ///
> /// All work units execute concurrently. Items not found are silently
> /// omitted.
> pub async fn read_many(
> ```
Here's a thought: what if this were `execute_query_operation` instead, and we defined a new `CosmosQueryOperation` type, much like `CosmosOperation`? This type would be used to represent:
- A Query (single or multi-partition).
- A ReadMany operation using item identities.
- Any other future composite operations we need to add in the future.
For most composite operations, there is a similar workflow: Using the user's input, internal logic, and the assistance of the backend, compute a query plan (issue queries [Q1, Q2, Q3, ...] to PK ranges [P1, P2, P3, ...] and merge with algorithm A).
This was the observation that led us to implementing ReadMany in the Rust Query Engine initially. A ReadMany is effectively a client-driven query plan. We don't need the backend's support to generate it, but once we've generated it, we basically have the same kind of query plan we'd get from the backend. If we create our own internal QueryPlan that can be generated from the backend response, a future library (like ServiceInterop), or synthesized directly in the driver, then we can effectively have a single query pipeline. I think the pipeline node model we've used in other SDKs could really be adapted here.
That also opens the door to more advanced composite operations in the future, like ReadMany w/Projection.
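To make the "client-driven query plan" idea above concrete, here is a type sketch. All names are hypothetical; the point is only that a plan synthesized in the driver for ReadMany and a plan derived from the backend response could share one representation and one execution pipeline:

```rust
// Sketch (hypothetical names): one internal QueryPlan, regardless of whether
// it was produced by the backend gateway, a library like ServiceInterop, or
// synthesized client-side for ReadMany.
#[allow(dead_code)]
enum MergeStrategy {
    Concat,          // unordered concatenation (ReadMany)
    OrderBy(String), // merge-sort on a property (backend ORDER BY plans)
}

struct RangeQuery {
    range_id: String,
    query_text: String,
}

struct QueryPlan {
    queries: Vec<RangeQuery>,
    merge: MergeStrategy,
}

impl QueryPlan {
    /// Synthesize a plan client-side for ReadMany: one query per owning range,
    /// merged by simple concatenation.
    fn for_read_many(groups: Vec<(String, String)>) -> QueryPlan {
        QueryPlan {
            queries: groups
                .into_iter()
                .map(|(range_id, query_text)| RangeQuery { range_id, query_text })
                .collect(),
            merge: MergeStrategy::Concat,
        }
    }
}

fn main() {
    let plan = QueryPlan::for_read_many(vec![
        ("r0".into(), "SELECT * FROM c WHERE c.id IN (@p0)".into()),
        ("r1".into(), "SELECT * FROM c WHERE c.id IN (@p0, @p1)".into()),
    ]);
    assert_eq!(plan.queries.len(), 2);
    assert!(matches!(plan.merge, MergeStrategy::Concat));
    println!("ok");
}
```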
cc @FabianMeiswinkel to get his attention on this comment.
@analogrelay - Yeah, I would not call it `execute_query_operation` but rather `execute_feed_operation`, though that is beside the point. I think it would make sense to have one entry point for all the (possibly) cross-partition operations that require continuations: ReadFeed, ChangeFeed, ReadMany, ReadAll (PK-specific ReadFeed), and Query.