Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/storage/azure_storage_blob/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ tokio = { workspace = true, optional = true, features = [
workspace = true

[dev-dependencies]
async-trait.workspace = true
azure_core_opentelemetry.workspace = true
azure_core_test = { workspace = true, features = [
"tracing",
Expand Down
217 changes: 214 additions & 3 deletions sdk/storage/azure_storage_blob/tests/blob_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@

use azure_core::{
error::ErrorKind,
http::{headers::CONTENT_TYPE, ClientOptions, RequestContent, StatusCode, Url},
http::{
headers::CONTENT_TYPE, response::PinnedStream, Body, ClientOptions, RequestContent,
StatusCode, Url,
},
stream::SeekableStream,
time::{parse_rfc3339, to_rfc3339, OffsetDateTime},
Bytes,
};
Expand All @@ -26,16 +30,18 @@ use azure_storage_blob_test::{
StorageAccount, TestPolicy,
};
use bytes::{BufMut, BytesMut};
use futures::TryStreamExt;
use futures::{io::AsyncRead, StreamExt, TryStreamExt};
use std::{
cmp::min,
collections::HashMap,
error::Error,
num::NonZero,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
Arc, Mutex,
},
task::{Context, Poll},
time::Duration,
};
use tokio::time;
Expand Down Expand Up @@ -1151,3 +1157,208 @@ async fn test_set_blob_properties_content_headers(ctx: TestContext) -> Result<()
container_client.delete(None).await?;
Ok(())
}

// Demonstrates the friction of using a download response body as the source for a subsequent
// upload.
// `BlobClient::download` returns `AsyncResponse<BlobClientDownloadResult>`, whose body is a
// `PinnedStream`: a single-use, forward-only async byte stream.
// `BlobClient::upload` requires `RequestContent<Bytes, NoFormat>`, which internally becomes a
// `Body` that must implement `SeekableStream` for the partitioned (multi-block) upload path.
// `SeekableStream` requires both `len()` (known upfront) and `reset()` (replay on retry),
// neither of which a streaming HTTP response body can provide. As a result, the caller is
// forced to buffer the entire download into memory before uploading, regardless of blob size.
#[recorded::test]
async fn test_download_then_upload_requires_full_buffer(
    ctx: TestContext,
) -> Result<(), Box<dyn Error>> {
    // Recording Setup
    let recording = ctx.recording();
    let container_client =
        get_container_client(recording, true, StorageAccount::Standard, None).await?;
    let source_client = container_client.blob_client(&get_blob_name(recording));
    let destination_client = container_client.blob_client("dest-blob");

    let payload = b"hello from the other side";

    // Arrange — seed the source blob.
    source_client
        .upload(RequestContent::from(payload.to_vec()), None)
        .await?;

    // Act — download the blob.
    // The response body is a PinnedStream (Pin<Box<dyn Stream<Item=Result<Bytes>> + Send>>):
    // forward-only, and it cannot be reset or asked for its length without consuming it.
    let response = source_client.download(None).await?;

    // The content-length header exists, but the body stream itself offers neither `len()` nor
    // `reset()`, and there is no conversion from the response body into a RequestContent or
    // Body::SeekableStream. The only path forward is to collect everything into memory.
    let body = response.into_body();
    let buffered: Bytes = body.collect().await?;

    // Only after full buffering can the data be handed to `upload`.
    let content = RequestContent::from(buffered.to_vec());
    destination_client.upload(content, None).await?;

    // Assert — round-trip fidelity.
    let echoed = destination_client
        .download(None)
        .await?
        .into_body()
        .collect()
        .await?;
    assert_eq!(payload.as_slice(), echoed.as_ref());

    container_client.delete(None).await?;
    Ok(())
}

// ---- Helper for test_download_then_upload_workaround ----

/// A [`SeekableStream`] adapter over a downloaded response body.
///
/// Wraps the `PinnedStream` obtained from a `BlobClient::download` response in a type that
/// satisfies `SeekableStream`, so the bytes can feed an upload without first buffering the
/// entire blob in memory.
///
/// How each `SeekableStream` requirement is met:
/// - **`len()`** — the content-length from response headers is known before the body is
///   consumed, satisfying `SeekableStream::len()` without buffering.
/// - **`Clone`** — required by `SeekableStream` via `DynClone`. Satisfied using
///   `Arc<Mutex<...>>`; a clone shares the same underlying stream.
/// - **`reset()`** — always errors. An HTTP response stream cannot be replayed. Upload only
///   calls `reset()` when retrying a failed block — this workaround is safe on the happy
///   path but will surface an explicit error if a retry is triggered mid-upload.
/// - **`AsyncRead`** — bridges `Stream<Item=Result<Bytes>>` to byte-level reads, keeping a
///   one-chunk buffer to handle the case where a stream chunk is larger than the read buffer.
#[derive(Clone)]
struct DownloadBodySeekableStream {
    /// Shared stream state; the `Arc<Mutex<...>>` exists to satisfy the `Clone` bound.
    inner: Arc<Mutex<DownloadBodyInner>>,
    /// Total body length, captured from the `content-length` response header.
    len: u64,
}

/// Interior state shared (via `Arc<Mutex<...>>`) between clones of
/// [`DownloadBodySeekableStream`].
struct DownloadBodyInner {
    /// The response body stream being drained; `None` is treated as end-of-stream by the
    /// `AsyncRead` bridge.
    stream: Option<PinnedStream>,
    /// Leftover bytes from the last chunk when it didn't fully fit the read buffer.
    remainder: Bytes,
}

/// Manual `Debug` that reports only `len`; the inner stream state is intentionally elided
/// (hence `finish_non_exhaustive`, which renders a trailing `..`).
impl std::fmt::Debug for DownloadBodySeekableStream {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = formatter.debug_struct("DownloadBodySeekableStream");
        builder.field("len", &self.len);
        builder.finish_non_exhaustive()
    }
}

/// Bridges the chunked `Stream<Item = Result<Bytes>>` to byte-level `AsyncRead`.
///
/// Fixes over the naive bridge:
/// - The stream is *fused*: once it yields `Poll::Ready(None)` it is dropped, so a later
///   `poll_read` never polls a completed stream (the `Stream` contract leaves that behavior
///   unspecified).
/// - Empty chunks are skipped: returning `Ok(0)` for a zero-length `Ok(bytes)` chunk would be
///   misread as EOF by `AsyncRead` consumers.
/// - A zero-length destination buffer short-circuits to `Ok(0)` without touching the stream.
impl AsyncRead for DownloadBodySeekableStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<std::io::Result<usize>> {
        // An empty destination can never make progress; don't consume stream data for it.
        if buf.is_empty() {
            return Poll::Ready(Ok(0));
        }
        let mut inner = self.inner.lock().unwrap();
        // Drain leftover bytes from the previous chunk before fetching the next one.
        if !inner.remainder.is_empty() {
            let n = buf.len().min(inner.remainder.len());
            buf[..n].copy_from_slice(&inner.remainder[..n]);
            inner.remainder = inner.remainder.slice(n..);
            return Poll::Ready(Ok(n));
        }
        loop {
            // `None` here means the stream was already exhausted and fused below: EOF.
            let Some(stream) = inner.stream.as_mut() else {
                return Poll::Ready(Ok(0));
            };
            match stream.poll_next_unpin(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => {
                    // Fuse: drop the stream so it is never polled again after completion.
                    inner.stream = None;
                    return Poll::Ready(Ok(0));
                }
                Poll::Ready(Some(Err(e))) => {
                    return Poll::Ready(Err(std::io::Error::new(
                        std::io::ErrorKind::Other,
                        e.to_string(),
                    )))
                }
                Poll::Ready(Some(Ok(bytes))) => {
                    // Skip zero-length chunks; Ok(0) would signal EOF prematurely.
                    if bytes.is_empty() {
                        continue;
                    }
                    let n = buf.len().min(bytes.len());
                    buf[..n].copy_from_slice(&bytes[..n]);
                    if bytes.len() > n {
                        inner.remainder = bytes.slice(n..);
                    }
                    return Poll::Ready(Ok(n));
                }
            }
        }
    }
}

#[async_trait::async_trait]
impl SeekableStream for DownloadBodySeekableStream {
async fn reset(&mut self) -> azure_core::Result<()> {
Err(azure_core::Error::with_message(
azure_core::error::ErrorKind::Io,
"download response body cannot be reset; upload will fail if a retry is triggered",
))
}

fn len(&self) -> u64 {
self.len
}
}

// Demonstrates Ben's workaround: wrap the download response body in a custom `SeekableStream`
// to avoid the forced full in-memory buffer of `test_download_then_upload_requires_full_buffer`.
// Bytes flow directly from the response stream into the upload without an intermediate
// allocation, costing one stream chunk in memory (~64 KiB) instead of the full blob size.
//
// The boilerplate here shows what the type system asks for today:
// - A concrete struct (~30 lines) bridging Stream to AsyncRead with a partial-chunk buffer.
// - An Arc<Mutex<...>> to satisfy Clone (required by SeekableStream, never called by upload).
// - A reset() stub that honestly documents the retry limitation.
// - content_length() read from headers *before* deconstruct(), because the body provides no len().
#[recorded::test]
async fn test_download_then_upload_workaround(ctx: TestContext) -> Result<(), Box<dyn Error>> {
// Recording Setup
let recording = ctx.recording();
let container_client =
get_container_client(recording, true, StorageAccount::Standard, None).await?;
let source_blob_client = container_client.blob_client(&get_blob_name(recording));
let dest_blob_client = container_client.blob_client("dest-blob-workaround");

let original_data = b"hello from the other side";

// Arrange — upload a source blob
source_blob_client
.upload(RequestContent::from(original_data.to_vec()), None)
.await?;

// Act — download.
// content_length must be read from headers *before* deconstruct() consumes the response,
// because the body stream itself has no len().
let download_response = source_blob_client.download(None).await?;
let content_length = download_response.content_length()?;
let (_, _, body) = download_response.deconstruct();

// Wrap the response body in the SeekableStream adapter.
// Box::pin(body) converts AsyncResponseBody (!Unpin internally) into a PinnedStream that
// can be polled via poll_next_unpin() from behind a Mutex in poll_read.
let stream = DownloadBodySeekableStream {
inner: Arc::new(Mutex::new(DownloadBodyInner {
stream: Some(Box::pin(body)),
remainder: Bytes::new(),
})),
len: content_length.unwrap_or(0),
};
Comment on lines +1340 to +1346
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the crux of the workaround: a custom DownloadBodySeekableStream adapter so that we can get it into a RequestContent without collecting in-memory.


// No collect() — bytes flow from the download response directly into the upload request.
dest_blob_client
.upload(Body::SeekableStream(Box::new(stream)).into(), None)
.await?;

// Assert — round-trip fidelity
let downloaded = dest_blob_client
.download(None)
.await?
.into_body()
.collect()
.await?;
assert_eq!(original_data.as_slice(), downloaded.as_ref());

container_client.delete(None).await?;
Ok(())
}
Loading