Skip to content

Commit 0f206f6

Browse files
[Storage] Finalized managed_download and managed_download_to drop-in replacements (#4068)
1 parent 5e42fa5 commit 0f206f6

File tree

22 files changed

+572
-329
lines changed

22 files changed

+572
-329
lines changed

sdk/core/azure_core/src/http/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub use pager::{ItemIterator, PageIterator, Pager};
1818
pub use pipeline::*;
1919
pub use poller::Poller;
2020
pub use request::{Body, Request, RequestContent};
21-
pub use response::{AsyncRawResponse, AsyncResponse, RawResponse, Response};
21+
pub use response::{AsyncRawResponse, AsyncResponse, AsyncResponseBody, RawResponse, Response};
2222

2323
pub use typespec_client_core::http::response;
2424
pub use typespec_client_core::http::{

sdk/core/typespec_client_core/src/http/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ pub use models::*;
2424
pub use options::*;
2525
pub use pipeline::*;
2626
pub use request::{Body, Request, RequestContent};
27-
pub use response::{AsyncRawResponse, RawResponse, Response};
27+
pub use response::{AsyncRawResponse, AsyncResponseBody, RawResponse, Response};
2828
pub use sanitizer::*;
2929

3030
use std::borrow::Cow::{self, Borrowed, Owned};

sdk/storage/azure_storage_blob/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@
88

99
### Breaking Changes
1010

11+
- Revised `download()` on `BlobClient` with the following breaking changes:
12+
- Now uses managed (multi-part) download logic for optimal performance on single-shot and parallel range transfers.
13+
- Returns `Result<BlobClientDownloadResult>` instead of `Result<AsyncResponse<BlobClientDownloadResult>>`.
14+
- The previous `BlobClientDownloadResultHeaders` trait was removed.
15+
1116
### Bugs Fixed
1217

1318
### Other Changes
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"AssetsRepo": "Azure/azure-sdk-assets",
33
"AssetsRepoPrefixPath": "rust",
4-
"Tag": "rust/azure_storage_blob_a66159c81f",
4+
"Tag": "rust/azure_storage_blob_0221cdcd7a",
55
"TagPrefix": "rust/azure_storage_blob"
66
}

sdk/storage/azure_storage_blob/examples/blob_storage_logging.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
147147
// Download the file
148148
println!("\nDownloading blob '{}'...", blob_name);
149149
let response = blob_client.download(None).await?;
150-
let (_, _, body) = response.deconstruct();
151-
let downloaded_content = body.collect().await?;
150+
let downloaded_content = response.body.collect().await?;
152151

153152
// Print the contents to stdout
154153
println!("\n=== File Contents ===");

sdk/storage/azure_storage_blob/perf/download_blob_test.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ impl DownloadBlobTest {
122122
async fn collect_stream(&self, blob_client: BlobClient) -> azure_core::Result<()> {
123123
let response = blob_client.download(None).await?;
124124

125-
let mut body = response.into_body();
125+
let mut body = response.body;
126126

127127
while let Some(result) = body.next().await {
128128
// We don't actually care about the contents of the blob for this test, we just want to download it.
@@ -136,7 +136,7 @@ impl DownloadBlobTest {
136136
let response = blob_client.download(None).await?;
137137

138138
let mut buffer = vec![0u8; self.size];
139-
response.into_body().collect_into(&mut buffer).await?;
139+
response.body.collect_into(&mut buffer).await?;
140140
black_box(buffer);
141141
Ok(())
142142
}
@@ -148,7 +148,7 @@ impl DownloadBlobTest {
148148
async fn collect_blob(&self, blob_client: BlobClient) -> azure_core::Result<Bytes> {
149149
let response = blob_client.download(None).await?;
150150

151-
let body = response.into_body().collect().await?;
151+
let body = response.body.collect().await?;
152152
Ok(black_box(body))
153153
}
154154

@@ -157,7 +157,7 @@ impl DownloadBlobTest {
157157
async fn collect_blob_simple(&self, blob_client: BlobClient) -> azure_core::Result<Bytes> {
158158
let response = blob_client.download(None).await?;
159159

160-
let mut body = response.into_body();
160+
let mut body = response.body;
161161

162162
let mut final_result: Vec<u8> = Vec::new();
163163
while let Some(res) = body.next().await {
@@ -171,7 +171,7 @@ impl DownloadBlobTest {
171171
async fn collect_blob_bytes_mut(&self, blob_client: BlobClient) -> azure_core::Result<Bytes> {
172172
let response = blob_client.download(None).await?;
173173

174-
let mut body = response.into_body();
174+
let mut body = response.body;
175175

176176
let mut final_result = BytesMut::new();
177177

sdk/storage/azure_storage_blob/src/clients/blob_client.rs

Lines changed: 80 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ pub use crate::generated::clients::{BlobClient, BlobClientOptions};
55

66
use crate::{
77
generated::clients::BlobClient as GeneratedBlobClient,
8+
generated::models::BlobClientDownloadInternalOptions,
89
logging::apply_storage_logging_defaults,
910
models::{
10-
http_ranges::IntoRangeHeader, method_options::BlobClientManagedDownloadOptions,
11-
BlobClientDownloadOptions, BlobClientDownloadResult, BlobClientUploadOptions,
12-
BlobClientUploadResult, StorageErrorCode,
11+
http_ranges::IntoRangeHeader, BlobClientDownloadOptions, BlobClientDownloadResult,
12+
BlobClientUploadOptions, BlobClientUploadResult, StorageErrorCode,
1313
},
1414
partitioned_transfer::{self, PartitionedDownloadBehavior},
1515
pipeline::StorageHeadersPolicy,
@@ -21,13 +21,12 @@ use azure_core::{
2121
error::ErrorKind,
2222
http::{
2323
policies::{auth::BearerTokenAuthorizationPolicy, Policy},
24-
response::{AsyncResponse, PinnedStream},
25-
AsyncRawResponse, NoFormat, Pipeline, RequestContent, StatusCode, Url, UrlExt,
24+
AsyncRawResponse, ClientMethodOptions, NoFormat, Pipeline, RequestContent, StatusCode, Url,
25+
UrlExt,
2626
},
2727
tracing, Bytes, Result,
2828
};
29-
use std::sync::Arc;
30-
use std::{num::NonZero, ops::Range};
29+
use std::{num::NonZero, ops::Range, sync::Arc};
3130

3231
impl BlobClient {
3332
/// Creates a new BlobClient, using Entra ID authentication.
@@ -113,67 +112,27 @@ impl BlobClient {
113112
})
114113
}
115114

116-
/// The managed download operation retrieves the content of an existing blob.
115+
/// Downloads a blob directly into a caller-provided buffer.
117116
///
118-
/// # Arguments
119-
///
120-
/// * `options` - Optional parameters for the request.
121-
pub async fn managed_download(
122-
&self,
123-
options: Option<BlobClientManagedDownloadOptions<'_>>,
124-
) -> Result<PinnedStream> {
125-
let options = options.unwrap_or_default();
126-
let parallel = options.parallel.unwrap_or(DEFAULT_PARALLEL);
127-
let partition_size = options.partition_size.unwrap_or(DEFAULT_PARTITION_SIZE);
128-
// construct exhaustively to ensure we catch new options when added
129-
let get_range_options = BlobClientDownloadOptions {
130-
encryption_algorithm: options.encryption_algorithm,
131-
encryption_key: options.encryption_key,
132-
encryption_key_sha256: options.encryption_key_sha256,
133-
if_match: options.if_match,
134-
if_modified_since: options.if_modified_since,
135-
if_none_match: options.if_none_match,
136-
if_tags: options.if_tags,
137-
if_unmodified_since: options.if_unmodified_since,
138-
lease_id: options.lease_id,
139-
// TODO: method_options: options.method_options,
140-
range: None,
141-
range_get_content_crc64: options.range_get_content_crc64,
142-
range_get_content_md5: options.range_get_content_md5,
143-
snapshot: options.snapshot,
144-
structured_body_type: options.structured_body_type,
145-
timeout: options.timeout,
146-
version_id: options.version_id,
147-
..Default::default()
148-
};
149-
150-
let client = GeneratedBlobClient {
151-
endpoint: self.endpoint.clone(),
152-
pipeline: self.pipeline.clone(),
153-
version: self.version.clone(),
154-
tracer: self.tracer.clone(),
155-
};
156-
let client = BlobClientDownloadBehavior::new(client, get_range_options);
157-
158-
partitioned_transfer::download(options.range, parallel, partition_size, Arc::new(client))
159-
.await
160-
}
161-
162-
/// The managed download operation retrieves the content of an existing blob.
117+
/// Unlike [`BlobClient::download`], which allocates and returns the blob data, this method
118+
/// writes the content directly into `buffer`. The blob is fetched in parallel range requests and assembled in-place.
163119
///
164120
/// # Arguments
165121
///
122+
/// * `buffer` - The buffer to write the blob content into. Must be large enough to hold the requested range or the entire blob.
166123
/// * `options` - Optional parameters for the request.
167-
pub async fn managed_download_into(
124+
pub async fn download_into(
168125
&self,
169126
buffer: &mut [u8],
170-
options: Option<BlobClientManagedDownloadOptions<'_>>,
127+
options: Option<BlobClientDownloadOptions<'_>>,
171128
) -> Result<usize> {
172129
let options = options.unwrap_or_default();
173-
let parallel = options.parallel.unwrap_or(DEFAULT_PARALLEL);
174-
let partition_size = options.partition_size.unwrap_or(DEFAULT_PARTITION_SIZE);
175-
// construct exhaustively to ensure we catch new options when added
176-
let get_range_options = BlobClientDownloadOptions {
130+
let parallel = options.parallel.unwrap_or(DEFAULT_DOWNLOAD_PARALLEL);
131+
let partition_size = options
132+
.partition_size
133+
.unwrap_or(DEFAULT_DOWNLOAD_PARTITION_SIZE);
134+
// Construct exhaustively to catch new options.
135+
let get_range_options = BlobClientDownloadInternalOptions {
177136
encryption_algorithm: options.encryption_algorithm,
178137
encryption_key: options.encryption_key,
179138
encryption_key_sha256: options.encryption_key_sha256,
@@ -183,15 +142,16 @@ impl BlobClient {
183142
if_tags: options.if_tags,
184143
if_unmodified_since: options.if_unmodified_since,
185144
lease_id: options.lease_id,
186-
// TODO: method_options: options.method_options,
145+
method_options: ClientMethodOptions {
146+
context: options.method_options.context.into_owned(),
147+
},
187148
range: None,
188149
range_get_content_crc64: options.range_get_content_crc64,
189150
range_get_content_md5: options.range_get_content_md5,
190151
snapshot: options.snapshot,
191152
structured_body_type: options.structured_body_type,
192153
timeout: options.timeout,
193154
version_id: options.version_id,
194-
..Default::default()
195155
};
196156

197157
let client = GeneratedBlobClient {
@@ -289,14 +249,65 @@ impl BlobClient {
289249
})
290250
}
291251

292-
/// Downloads a blob from the service, including its metadata and properties.
252+
/// Downloads a blob and its contents from the service.
253+
///
254+
/// This operation performs a managed (multi-part) download, splitting the blob into
255+
/// parallel range requests for better performance on large blobs. The returned
256+
/// [`BlobClientDownloadResult::body`] contains the complete blob data, while
257+
/// [`BlobClientDownloadResult::properties`] and /// [`BlobClientDownloadResult::headers`]
258+
/// reflect only the initial response's metadata and properties.
259+
///
260+
/// # Arguments
293261
///
294262
/// * `options` - Optional configuration for the request.
263+
#[tracing::function("Storage.Blob.Blob.download")]
295264
pub async fn download(
296265
&self,
297266
options: Option<BlobClientDownloadOptions<'_>>,
298-
) -> Result<AsyncResponse<BlobClientDownloadResult>> {
299-
self.download_internal(options).await
267+
) -> Result<BlobClientDownloadResult> {
268+
let options = options.unwrap_or_default();
269+
let parallel = options.parallel.unwrap_or(DEFAULT_DOWNLOAD_PARALLEL);
270+
let partition_size = options
271+
.partition_size
272+
.unwrap_or(DEFAULT_DOWNLOAD_PARTITION_SIZE);
273+
// Construct exhaustively to catch new options.
274+
let get_range_options = BlobClientDownloadInternalOptions {
275+
encryption_algorithm: options.encryption_algorithm,
276+
encryption_key: options.encryption_key,
277+
encryption_key_sha256: options.encryption_key_sha256,
278+
if_match: options.if_match,
279+
if_modified_since: options.if_modified_since,
280+
if_none_match: options.if_none_match,
281+
if_tags: options.if_tags,
282+
if_unmodified_since: options.if_unmodified_since,
283+
lease_id: options.lease_id,
284+
// requires into_owned due to BlobClientDownloadBehavior w/ 'static Behavior
285+
method_options: ClientMethodOptions {
286+
context: options.method_options.context.into_owned(),
287+
},
288+
range: None,
289+
range_get_content_crc64: options.range_get_content_crc64,
290+
range_get_content_md5: options.range_get_content_md5,
291+
snapshot: options.snapshot,
292+
structured_body_type: options.structured_body_type,
293+
timeout: options.timeout,
294+
version_id: options.version_id,
295+
};
296+
let inner_client = GeneratedBlobClient {
297+
endpoint: self.endpoint.clone(),
298+
pipeline: self.pipeline.clone(),
299+
version: self.version.clone(),
300+
tracer: self.tracer.clone(),
301+
};
302+
let behavior = BlobClientDownloadBehavior::new(inner_client, get_range_options);
303+
let response = partitioned_transfer::download(
304+
options.range,
305+
parallel,
306+
partition_size,
307+
Arc::new(behavior),
308+
)
309+
.await?;
310+
BlobClientDownloadResult::from_headers(response)
300311
}
301312

302313
/// Uploads content to a block blob, overwriting any existing blob by default.
@@ -340,15 +351,16 @@ impl BlobClient {
340351
}
341352

342353
// unwrap evaluated at compile time
343-
const DEFAULT_PARALLEL: NonZero<usize> = NonZero::new(4).unwrap();
344-
const DEFAULT_PARTITION_SIZE: NonZero<usize> = NonZero::new(4 * 1024 * 1024).unwrap();
354+
const DEFAULT_DOWNLOAD_PARALLEL: NonZero<usize> = NonZero::new(4).unwrap();
355+
const DEFAULT_DOWNLOAD_PARTITION_SIZE: NonZero<usize> = NonZero::new(4 * 1024 * 1024).unwrap();
345356

346357
struct BlobClientDownloadBehavior<'a> {
347358
client: GeneratedBlobClient,
348-
options: BlobClientDownloadOptions<'a>,
359+
options: BlobClientDownloadInternalOptions<'a>,
349360
}
361+
350362
impl<'a> BlobClientDownloadBehavior<'a> {
351-
fn new(client: GeneratedBlobClient, options: BlobClientDownloadOptions<'a>) -> Self {
363+
fn new(client: GeneratedBlobClient, options: BlobClientDownloadInternalOptions<'a>) -> Self {
352364
Self { client, options }
353365
}
354366
}

sdk/storage/azure_storage_blob/src/clients/block_blob_client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ impl BlockBlobClient {
135135
let options = options.unwrap_or_default();
136136
let parallel = options.parallel.unwrap_or(DEFAULT_PARALLEL);
137137
let partition_size = options.partition_size.unwrap_or(DEFAULT_PARTITION_SIZE);
138-
// construct exhaustively to ensure we catch new options when added
138+
// Construct exhaustively to catch new options.
139139
let oneshot_options = BlockBlobClientUploadInternalOptions {
140140
blob_cache_control: options.blob_cache_control.clone(),
141141
blob_content_disposition: options.blob_content_disposition.clone(),

0 commit comments

Comments
 (0)