Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/core/azure_core/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub use pager::{ItemIterator, PageIterator, Pager};
pub use pipeline::*;
pub use poller::Poller;
pub use request::{Body, Request, RequestContent};
pub use response::{AsyncRawResponse, AsyncResponse, RawResponse, Response};
pub use response::{AsyncRawResponse, AsyncResponse, AsyncResponseBody, RawResponse, Response};

pub use typespec_client_core::http::response;
pub use typespec_client_core::http::{
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/typespec_client_core/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub use models::*;
pub use options::*;
pub use pipeline::*;
pub use request::{Body, Request, RequestContent};
pub use response::{AsyncRawResponse, RawResponse, Response};
pub use response::{AsyncRawResponse, AsyncResponseBody, RawResponse, Response};
pub use sanitizer::*;

use std::borrow::Cow::{self, Borrowed, Owned};
Expand Down
5 changes: 5 additions & 0 deletions sdk/storage/azure_storage_blob/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@

### Breaking Changes

- Revised `download()` on `BlobClient` with the following breaking changes:
- Now uses managed (multi-part) download logic for optimal performance on single-shot and parallel range transfers.
- Returns `Result<BlobClientDownloadResult>` instead of `Result<AsyncResponse<BlobClientDownloadResult>>`.
- The previous `BlobClientDownloadResultHeaders` trait was removed.

### Bugs Fixed

### Other Changes
Expand Down
2 changes: 1 addition & 1 deletion sdk/storage/azure_storage_blob/assets.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "rust",
"Tag": "rust/azure_storage_blob_a66159c81f",
"Tag": "rust/azure_storage_blob_0221cdcd7a",
"TagPrefix": "rust/azure_storage_blob"
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Download the file
println!("\nDownloading blob '{}'...", blob_name);
let response = blob_client.download(None).await?;
let (_, _, body) = response.deconstruct();
let downloaded_content = body.collect().await?;
let downloaded_content = response.body.collect().await?;

// Print the contents to stdout
println!("\n=== File Contents ===");
Expand Down
10 changes: 5 additions & 5 deletions sdk/storage/azure_storage_blob/perf/download_blob_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl DownloadBlobTest {
async fn collect_stream(&self, blob_client: BlobClient) -> azure_core::Result<()> {
let response = blob_client.download(None).await?;

let mut body = response.into_body();
let mut body = response.body;

while let Some(result) = body.next().await {
// We don't actually care about the contents of the blob for this test, we just want to download it.
Expand All @@ -136,7 +136,7 @@ impl DownloadBlobTest {
let response = blob_client.download(None).await?;

let mut buffer = vec![0u8; self.size];
response.into_body().collect_into(&mut buffer).await?;
response.body.collect_into(&mut buffer).await?;
black_box(buffer);
Ok(())
}
Expand All @@ -148,7 +148,7 @@ impl DownloadBlobTest {
async fn collect_blob(&self, blob_client: BlobClient) -> azure_core::Result<Bytes> {
let response = blob_client.download(None).await?;

let body = response.into_body().collect().await?;
let body = response.body.collect().await?;
Ok(black_box(body))
}

Expand All @@ -157,7 +157,7 @@ impl DownloadBlobTest {
async fn collect_blob_simple(&self, blob_client: BlobClient) -> azure_core::Result<Bytes> {
let response = blob_client.download(None).await?;

let mut body = response.into_body();
let mut body = response.body;

let mut final_result: Vec<u8> = Vec::new();
while let Some(res) = body.next().await {
Expand All @@ -171,7 +171,7 @@ impl DownloadBlobTest {
async fn collect_blob_bytes_mut(&self, blob_client: BlobClient) -> azure_core::Result<Bytes> {
let response = blob_client.download(None).await?;

let mut body = response.into_body();
let mut body = response.body;

let mut final_result = BytesMut::new();

Expand Down
148 changes: 80 additions & 68 deletions sdk/storage/azure_storage_blob/src/clients/blob_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ pub use crate::generated::clients::{BlobClient, BlobClientOptions};

use crate::{
generated::clients::BlobClient as GeneratedBlobClient,
generated::models::BlobClientDownloadInternalOptions,
logging::apply_storage_logging_defaults,
models::{
http_ranges::IntoRangeHeader, method_options::BlobClientManagedDownloadOptions,
BlobClientDownloadOptions, BlobClientDownloadResult, BlobClientUploadOptions,
BlobClientUploadResult, StorageErrorCode,
http_ranges::IntoRangeHeader, BlobClientDownloadOptions, BlobClientDownloadResult,
BlobClientUploadOptions, BlobClientUploadResult, StorageErrorCode,
},
partitioned_transfer::{self, PartitionedDownloadBehavior},
pipeline::StorageHeadersPolicy,
Expand All @@ -21,13 +21,12 @@ use azure_core::{
error::ErrorKind,
http::{
policies::{auth::BearerTokenAuthorizationPolicy, Policy},
response::{AsyncResponse, PinnedStream},
AsyncRawResponse, NoFormat, Pipeline, RequestContent, StatusCode, Url, UrlExt,
AsyncRawResponse, ClientMethodOptions, NoFormat, Pipeline, RequestContent, StatusCode, Url,
UrlExt,
},
tracing, Bytes, Result,
};
use std::sync::Arc;
use std::{num::NonZero, ops::Range};
use std::{num::NonZero, ops::Range, sync::Arc};

impl BlobClient {
/// Creates a new BlobClient, using Entra ID authentication.
Expand Down Expand Up @@ -113,67 +112,27 @@ impl BlobClient {
})
}

/// The managed download operation retrieves the content of an existing blob.
/// Downloads a blob directly into a caller-provided buffer.
///
/// # Arguments
///
/// * `options` - Optional parameters for the request.
pub async fn managed_download(
&self,
options: Option<BlobClientManagedDownloadOptions<'_>>,
) -> Result<PinnedStream> {
let options = options.unwrap_or_default();
let parallel = options.parallel.unwrap_or(DEFAULT_PARALLEL);
let partition_size = options.partition_size.unwrap_or(DEFAULT_PARTITION_SIZE);
// construct exhaustively to ensure we catch new options when added
let get_range_options = BlobClientDownloadOptions {
encryption_algorithm: options.encryption_algorithm,
encryption_key: options.encryption_key,
encryption_key_sha256: options.encryption_key_sha256,
if_match: options.if_match,
if_modified_since: options.if_modified_since,
if_none_match: options.if_none_match,
if_tags: options.if_tags,
if_unmodified_since: options.if_unmodified_since,
lease_id: options.lease_id,
// TODO: method_options: options.method_options,
range: None,
range_get_content_crc64: options.range_get_content_crc64,
range_get_content_md5: options.range_get_content_md5,
snapshot: options.snapshot,
structured_body_type: options.structured_body_type,
timeout: options.timeout,
version_id: options.version_id,
..Default::default()
};

let client = GeneratedBlobClient {
endpoint: self.endpoint.clone(),
pipeline: self.pipeline.clone(),
version: self.version.clone(),
tracer: self.tracer.clone(),
};
let client = BlobClientDownloadBehavior::new(client, get_range_options);

partitioned_transfer::download(options.range, parallel, partition_size, Arc::new(client))
.await
}

/// The managed download operation retrieves the content of an existing blob.
/// Unlike [`BlobClient::download`], which allocates and returns the blob data, this method
/// writes the content directly into `buffer`. The blob is fetched in parallel range requests and assembled in-place.
///
/// # Arguments
///
/// * `buffer` - The buffer to write the blob content into. Must be large enough to hold the requested range or the entire blob.
/// * `options` - Optional parameters for the request.
pub async fn managed_download_into(
pub async fn download_into(
&self,
buffer: &mut [u8],
options: Option<BlobClientManagedDownloadOptions<'_>>,
options: Option<BlobClientDownloadOptions<'_>>,
) -> Result<usize> {
let options = options.unwrap_or_default();
let parallel = options.parallel.unwrap_or(DEFAULT_PARALLEL);
let partition_size = options.partition_size.unwrap_or(DEFAULT_PARTITION_SIZE);
// construct exhaustively to ensure we catch new options when added
let get_range_options = BlobClientDownloadOptions {
let parallel = options.parallel.unwrap_or(DEFAULT_DOWNLOAD_PARALLEL);
let partition_size = options
.partition_size
.unwrap_or(DEFAULT_DOWNLOAD_PARTITION_SIZE);
// Construct exhaustively to catch new options.
let get_range_options = BlobClientDownloadInternalOptions {
encryption_algorithm: options.encryption_algorithm,
encryption_key: options.encryption_key,
encryption_key_sha256: options.encryption_key_sha256,
Expand All @@ -183,15 +142,16 @@ impl BlobClient {
if_tags: options.if_tags,
if_unmodified_since: options.if_unmodified_since,
lease_id: options.lease_id,
// TODO: method_options: options.method_options,
method_options: ClientMethodOptions {
context: options.method_options.context.into_owned(),
},
range: None,
range_get_content_crc64: options.range_get_content_crc64,
range_get_content_md5: options.range_get_content_md5,
snapshot: options.snapshot,
structured_body_type: options.structured_body_type,
timeout: options.timeout,
version_id: options.version_id,
..Default::default()
};

let client = GeneratedBlobClient {
Expand Down Expand Up @@ -289,14 +249,65 @@ impl BlobClient {
})
}

/// Downloads a blob from the service, including its metadata and properties.
/// Downloads a blob and its contents from the service.
///
/// This operation performs a managed (multi-part) download, splitting the blob into
/// parallel range requests for better performance on large blobs. The returned
/// [`BlobClientDownloadResult::body`] contains the complete blob data, while
/// [`BlobClientDownloadResult::properties`] and /// [`BlobClientDownloadResult::headers`]
/// reflect only the initial response's metadata and properties.
///
/// # Arguments
///
/// * `options` - Optional configuration for the request.
#[tracing::function("Storage.Blob.Blob.download")]
pub async fn download(
&self,
options: Option<BlobClientDownloadOptions<'_>>,
) -> Result<AsyncResponse<BlobClientDownloadResult>> {
self.download_internal(options).await
) -> Result<BlobClientDownloadResult> {
let options = options.unwrap_or_default();
let parallel = options.parallel.unwrap_or(DEFAULT_DOWNLOAD_PARALLEL);
let partition_size = options
.partition_size
.unwrap_or(DEFAULT_DOWNLOAD_PARTITION_SIZE);
// Construct exhaustively to catch new options.
let get_range_options = BlobClientDownloadInternalOptions {
encryption_algorithm: options.encryption_algorithm,
encryption_key: options.encryption_key,
encryption_key_sha256: options.encryption_key_sha256,
if_match: options.if_match,
if_modified_since: options.if_modified_since,
if_none_match: options.if_none_match,
if_tags: options.if_tags,
if_unmodified_since: options.if_unmodified_since,
lease_id: options.lease_id,
// requires into_owned due to BlobClientDownloadBehavior w/ 'static Behavior
method_options: ClientMethodOptions {
context: options.method_options.context.into_owned(),
},
range: None,
range_get_content_crc64: options.range_get_content_crc64,
range_get_content_md5: options.range_get_content_md5,
snapshot: options.snapshot,
structured_body_type: options.structured_body_type,
timeout: options.timeout,
version_id: options.version_id,
};
let inner_client = GeneratedBlobClient {
endpoint: self.endpoint.clone(),
pipeline: self.pipeline.clone(),
version: self.version.clone(),
tracer: self.tracer.clone(),
};
let behavior = BlobClientDownloadBehavior::new(inner_client, get_range_options);
let response = partitioned_transfer::download(
options.range,
parallel,
partition_size,
Arc::new(behavior),
)
.await?;
BlobClientDownloadResult::from_headers(response)
}

/// Uploads content to a block blob, overwriting any existing blob by default.
Expand Down Expand Up @@ -340,15 +351,16 @@ impl BlobClient {
}

// unwrap evaluated at compile time
const DEFAULT_PARALLEL: NonZero<usize> = NonZero::new(4).unwrap();
const DEFAULT_PARTITION_SIZE: NonZero<usize> = NonZero::new(4 * 1024 * 1024).unwrap();
const DEFAULT_DOWNLOAD_PARALLEL: NonZero<usize> = NonZero::new(4).unwrap();
const DEFAULT_DOWNLOAD_PARTITION_SIZE: NonZero<usize> = NonZero::new(4 * 1024 * 1024).unwrap();

struct BlobClientDownloadBehavior<'a> {
client: GeneratedBlobClient,
options: BlobClientDownloadOptions<'a>,
options: BlobClientDownloadInternalOptions<'a>,
}

impl<'a> BlobClientDownloadBehavior<'a> {
fn new(client: GeneratedBlobClient, options: BlobClientDownloadOptions<'a>) -> Self {
fn new(client: GeneratedBlobClient, options: BlobClientDownloadInternalOptions<'a>) -> Self {
Self { client, options }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl BlockBlobClient {
let options = options.unwrap_or_default();
let parallel = options.parallel.unwrap_or(DEFAULT_PARALLEL);
let partition_size = options.partition_size.unwrap_or(DEFAULT_PARTITION_SIZE);
// construct exhaustively to ensure we catch new options when added
// Construct exhaustively to catch new options.
let oneshot_options = BlockBlobClientUploadInternalOptions {
blob_cache_control: options.blob_cache_control.clone(),
blob_content_disposition: options.blob_content_disposition.clone(),
Expand Down
Loading