@@ -202,7 +202,9 @@ impl HealthProbe for GetDatabaseAccountProbe {
202202 body : None ,
203203 auth_context ,
204204 execution_context : ExecutionContext :: HealthCheckProbe ,
205- deadline : None , // Per-request timeout deferred; uses client-level timeout.
205+ // Per-request timeout will be wired when PR #3871 lands.
206+ // Until then, uses the client-level metadata timeout.
207+ deadline : None ,
206208 };
207209
208210 // Use a lightweight DiagnosticsContextBuilder for the probe.
@@ -267,25 +269,18 @@ pub(crate) struct HealthMonitorConfig {
267269 /// Default: 300s (5 minutes), matching the Python SDK.
268270 pub refresh_interval : Duration ,
269271
270- /// Maximum number of retries per probe attempt .
271- /// Default: 3, matching the Python SDK.
272+ /// Maximum number of probe attempts per endpoint per sweep.
273+ /// Default: 3 total attempts (i.e. up to 2 retries), matching the Python SDK.
272274 /// Used by the probe's own retry loop; the transport pipeline handles
273275 /// 429 throttle retries independently.
274- pub max_probe_retries : u32 ,
275-
276- /// Timeout for individual probe requests. Currently uses the
277- /// client-level metadata timeout from `ConnectionPoolOptions`.
278- /// Per-request timeout override is deferred.
279- /// Default: 5s.
280- pub probe_timeout : Duration ,
276+ pub max_probe_attempts : u32 ,
281277}
282278
283279impl Default for HealthMonitorConfig {
284280 fn default () -> Self {
285281 Self {
286282 refresh_interval : Duration :: from_secs (300 ),
287- max_probe_retries : 3 ,
288- probe_timeout : Duration :: from_secs (5 ),
283+ max_probe_attempts : 3 ,
289284 }
290285 }
291286}
@@ -295,14 +290,25 @@ impl Default for HealthMonitorConfig {
295290/// Spawned by `CosmosDriver` during initialization. Runs a periodic
296291/// sweep that probes all relevant endpoints and updates the
297292/// `AccountEndpointStateStore`.
293+ ///
294+ /// On drop, the `CancellationToken` is cancelled and the background
295+ /// task exits (see §8).
298296pub (crate ) struct EndpointHealthMonitor {
299297 config : HealthMonitorConfig ,
300298 probe : Arc <dyn HealthProbe >,
301299 endpoint_state_store : Arc <AccountEndpointStateStore >,
302300 account_metadata_cache : Arc <AccountMetadataCache >,
301+ /// Cancellation token to signal the background task to stop.
302+ shutdown : CancellationToken ,
303303 /// Handle to the background sweep task. Dropped on shutdown.
304304 _sweep_handle : JoinHandle <()>,
305305}
306+
307+ impl Drop for EndpointHealthMonitor {
308+ fn drop (& mut self ) {
309+ self . shutdown. cancel ();
310+ }
311+ }
306312```
307313
308314### 3.4 Endpoint Selection — Which Endpoints to Probe
@@ -356,11 +362,11 @@ Health probes use a two-layer retry approach matching the pipeline architecture:
356362
357363``` rust
358364// Probe retry is a simple loop in run_sweep():
359- for attempt in 0 ..= config . max_probe_retries {
365+ for attempt in 0 .. config . max_probe_attempts {
360366 let result = probe . probe (& ep ). await ;
361367 match & result {
362368 ProbeResult :: Healthy { .. } => return result ,
363- ProbeResult :: Unhealthy { .. } if attempt < config . max_probe_retries => {
369+ ProbeResult :: Unhealthy { .. } if attempt + 1 < config . max_probe_attempts => {
364370 tokio :: time :: sleep (backoff_delay (attempt )). await ;
365371 }
366372 _ => return result ,
@@ -376,6 +382,9 @@ for attempt in 0..=config.max_probe_retries {
376382impl EndpointHealthMonitor {
377383 /// The main background loop. Runs an initial sweep immediately,
378384 /// then periodically until the monitor is dropped.
385+ ///
386+ /// Uses `azure_core` async primitives (not tokio directly) to remain
387+ /// async-runtime agnostic.
379388 async fn sweep_loop (
380389 config : HealthMonitorConfig ,
381390 probe : Arc <dyn HealthProbe >,
@@ -393,9 +402,15 @@ impl EndpointHealthMonitor {
393402 . await ;
394403
395404 loop {
396- tokio :: select! {
397- _ = tokio :: time :: sleep (config . refresh_interval) => {},
398- _ = shutdown . cancelled () => break ,
405+ // Wait for either the refresh interval to elapse or shutdown,
406+ // using azure_core async primitives (runtime-agnostic).
407+ let sleep = azure_core :: task :: sleep (config . refresh_interval);
408+ let cancelled = shutdown . cancelled ();
409+ futures :: pin_mut! (sleep , cancelled );
410+
411+ match futures :: future :: select (sleep , cancelled ). await {
412+ futures :: future :: Either :: Left (_ ) => {},
413+ futures :: future :: Either :: Right (_ ) => break ,
399414 }
400415
401416 Self :: run_sweep (
@@ -445,21 +460,18 @@ impl EndpointHealthMonitor {
445460
446461 let results = futures :: future :: join_all (probe_futures ). await ;
447462
448- // Apply results to endpoint state.
449- // We build a single new state snapshot with all changes applied atomically.
450- let mut new_state = (* state ). clone ();
463+ // Apply results per-endpoint against the latest state to avoid
464+ // overwriting concurrent reactive updates (no snapshot+swap).
451465 for (endpoint , result ) in & results {
452466 match result {
453467 ProbeResult :: Healthy { .. } => {
454- // Remove from unavailable set if present.
455- new_state . unavailable_endpoints. remove (endpoint );
468+ endpoint_state_store . mark_available (endpoint );
456469 }
457470 ProbeResult :: Unhealthy { failure , .. } => {
458- // Mark unavailable (or keep unavailable if already marked).
459- new_state . unavailable_endpoints. entry (endpoint . clone ())
460- . or_insert_with (|| {
461- (Instant :: now (), UnavailableReason :: ServiceUnavailable )
462- });
471+ endpoint_state_store . mark_unavailable (
472+ endpoint ,
473+ UnavailableReason :: ServiceUnavailable ,
474+ );
463475 tracing :: warn! (
464476 endpoint = % endpoint ,
465477 reason = % failure . message,
@@ -470,8 +482,6 @@ impl EndpointHealthMonitor {
470482 }
471483 }
472484 }
473-
474- endpoint_state_store . swap (new_state );
475485 }
476486}
477487```
@@ -500,7 +510,7 @@ impl EndpointHealthMonitor {
500510 account_metadata_cache : Arc <AccountMetadataCache >,
501511 ) -> Self {
502512 let shutdown = CancellationToken :: new ();
503- let sweep_handle = tokio :: spawn (Self :: sweep_loop (
513+ let sweep_handle = azure_core :: task :: spawn (Self :: sweep_loop (
504514 config . clone (),
505515 Arc :: clone (& probe ),
506516 Arc :: clone (& endpoint_state_store ),
@@ -513,6 +523,7 @@ impl EndpointHealthMonitor {
513523 probe ,
514524 endpoint_state_store ,
515525 account_metadata_cache ,
526+ shutdown ,
516527 _sweep_handle : sweep_handle ,
517528 }
518529 }
@@ -602,7 +613,7 @@ same `AccountEndpointStateStore`:
602613This means:
603614- Reactive failures are detected instantly (no waiting for next sweep).
604615- Proactive sweeps detect recovery (no waiting for expiration timer).
605- - Both write atomically via ` swap() ` on the same store.
616+ - Both apply per-endpoint mutations to the same store (no lost updates).
606617
607618### 4.4 Diagnostics Integration
608619
@@ -652,20 +663,16 @@ New option group nested within `DriverOptions`:
652663pub struct HealthCheckOptions {
653664 /// Interval between background health sweeps. Default: 300s (5 minutes).
654665 refresh_interval : Duration ,
655- /// Maximum retries per probe attempt (probe-level retry loop).
666+ /// Maximum probe attempts per endpoint per sweep (probe-level retry loop).
656667 /// Default: 3.
657- max_probe_retries : u32 ,
658- /// Timeout for individual probe requests. Currently uses client-level
659- /// metadata timeout; per-request override is deferred. Default: 5s.
660- probe_timeout : Duration ,
668+ max_probe_attempts : u32 ,
661669}
662670
663671impl Default for HealthCheckOptions {
664672 fn default () -> Self {
665673 Self {
666674 refresh_interval : Duration :: from_secs (300 ),
667- max_probe_retries : 3 ,
668- probe_timeout : Duration :: from_secs (5 ),
675+ max_probe_attempts : 3 ,
669676 }
670677 }
671678}
@@ -680,13 +687,8 @@ impl HealthCheckOptions {
680687 self
681688 }
682689
683- pub fn with_max_probe_retries (mut self , retries : u32 ) -> Self {
684- self . max_probe_retries = retries ;
685- self
686- }
687-
688- pub fn with_probe_timeout (mut self , timeout : Duration ) -> Self {
689- self . probe_timeout = timeout ;
690+ pub fn with_max_probe_attempts (mut self , attempts : u32 ) -> Self {
691+ self . max_probe_attempts = attempts ;
690692 self
691693 }
692694}
@@ -756,12 +758,12 @@ This is the key design benefit of the `HealthProbe` trait abstraction.
756758
757759## 8. Shutdown & Resource Cleanup
758760
759- The ` EndpointHealthMonitor ` uses a ` CancellationToken ` to signal the background task to
760- stop. When the ` CosmosDriver ` is dropped:
761+ The ` EndpointHealthMonitor ` implements ` Drop ` to cancel the background task.
762+ When the monitor is dropped:
761763
762- 1 . The ` CancellationToken ` is cancelled .
763- 2 . The ` JoinHandle ` is dropped (the sweep loop exits on next ` tokio::select! ` ) .
764- 3 . Any in-flight probe requests complete naturally (they have a timeout).
764+ 1 . ` Drop::drop ` cancels the ` CancellationToken ` .
765+ 2 . The sweep loop detects cancellation and exits .
766+ 3 . Any in-flight probe requests complete naturally (they have a client-level timeout).
765767
766768No explicit ` shutdown() ` method is needed — the RAII pattern via ` Drop ` handles cleanup.
767769
0 commit comments