Skip to content

Commit 8f2fc95

Browse files
committed
fix(eventhubs): add owner_level support, fix partition expiration, and add partition revocation
- Pass owner_level to the AMQP receiver link as com.microsoft:epoch for broker-enforced exclusive partition access.
- Raise the default partition expiration from 10s to 60s so the load balancer properly detects stale ownerships.
- Add a partition revocation mechanism: when dispatch() detects partitions lost to other consumers, it revokes the corresponding PartitionClient via an AtomicBool flag that callers check with is_revoked().
- Update the processor example to demonstrate is_revoked() usage.
1 parent fa1dd0c commit 8f2fc95

File tree

3 files changed

+108
-3
lines changed

3 files changed

+108
-3
lines changed

sdk/eventhubs/azure_messaging_eventhubs/examples/eventhubs_processor_client.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7171

7272
let checkpoint_store = Arc::new(InMemoryCheckpointStore::new());
7373
let processor = EventProcessor::builder()
74+
// Setting owner_level enables epoch-based partition ownership.
75+
// The Event Hub broker will disconnect any existing receiver on
76+
// the same partition when a new receiver with the same or higher
77+
// owner level connects, preventing duplicate event processing
78+
// across multiple consumer instances.
79+
.with_owner_level(0)
7480
.build(consumer, checkpoint_store)
7581
.await?;
7682
let background_processor = BackgroundProcessor::new(processor.clone());
@@ -90,6 +96,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
9096
let mut event_stream = partition_client.stream_events();
9197
let mut event_count = 0;
9298
while let Some(event) = event_stream.next().await {
99+
// Check if the partition was reassigned to another consumer
100+
// during rebalancing. When revoked, stop processing so the
101+
// partition client can be re-acquired.
102+
if partition_client.is_revoked() {
103+
println!("Partition was reassigned, stopping.");
104+
break;
105+
}
93106
println!("Received message {event_count}");
94107
event_count += 1;
95108
if event_count > 10 {

sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/partition_client.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,12 @@ use azure_core_amqp::{message::AmqpAnnotationKey, AmqpValue};
1212
use futures::Stream;
1313
use std::{
1414
pin::Pin,
15-
sync::{Arc, OnceLock, Weak},
15+
sync::{
16+
atomic::{AtomicBool, Ordering},
17+
Arc, OnceLock, Weak,
18+
},
1619
};
17-
use tracing::{debug, trace, warn};
20+
use tracing::{debug, info, trace, warn};
1821

1922
/// Represents a client for interacting with a specific partition in Event Hubs.
2023
///
@@ -26,6 +29,7 @@ pub struct PartitionClient {
2629
client_details: ConsumerClientDetails,
2730
event_receiver: OnceLock<EventReceiver>,
2831
consumers: Weak<ProcessorConsumersMap>,
32+
revoked: Arc<AtomicBool>,
2933
}
3034

3135
// It's safe to use the PartitionClient from multiple threads simultaneously.
@@ -45,6 +49,7 @@ impl PartitionClient {
4549
client_details,
4650
event_receiver: OnceLock::new(),
4751
consumers,
52+
revoked: Arc::new(AtomicBool::new(false)),
4853
}
4954
}
5055

@@ -56,6 +61,25 @@ impl PartitionClient {
5661
&self.partition_id
5762
}
5863

64+
/// Marks this partition client as revoked.
65+
///
66+
/// When a partition is reassigned to another consumer by the load balancer,
67+
/// the event processor calls this method to signal that this client should
68+
/// stop processing events. Callers should check [`is_revoked`](Self::is_revoked)
69+
/// between event receives and stop when it returns `true`.
///
/// This method only logs at `info` level and sets the `revoked` flag; it
/// does not itself interrupt a stream that is already being consumed.
70+
pub fn revoke(&self) {
71+
info!("Revoking partition client for partition {}", self.partition_id);
72+
// NOTE(review): `Relaxed` looks sufficient because the flag is only polled
// and no other data appears to be published through it — confirm.
self.revoked.store(true, Ordering::Relaxed);
73+
}
74+
75+
/// Returns `true` if this partition client has been revoked.
76+
///
77+
/// A revoked partition client indicates that the partition has been
78+
/// reassigned to another consumer and this client should stop processing.
///
/// This is a plain atomic read of the flag set by [`revoke`](Self::revoke);
/// it takes no lock, so it can be called between every event receive.
79+
pub fn is_revoked(&self) -> bool {
80+
self.revoked.load(Ordering::Relaxed)
81+
}
82+
5983
/// Receives events from the partition.
6084
///
6185
/// This method returns a stream of `ReceivedEventData` wrapped in a `Result`.

sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/processor.rs

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ pub struct EventProcessor {
5050
next_partition_client_sender: Sender<Arc<PartitionClient>>,
5151
client_details: ConsumerClientDetails,
5252
prefetch: u32,
53+
owner_level: Option<i64>,
5354
update_interval: Duration,
5455
start_positions: StartPositions,
5556
is_running: std::sync::Mutex<bool>,
@@ -62,6 +63,7 @@ struct EventProcessorOptions {
6263
update_interval: Duration,
6364
start_positions: StartPositions,
6465
prefetch: u32,
66+
owner_level: Option<i64>,
6567
partition_ids: Vec<String>,
6668
}
6769

@@ -123,6 +125,38 @@ impl ProcessorConsumersMap {
123125
info!("Consumers for partition now: {:?}", consumers.keys());
124126
Ok(())
125127
}
128+
129+
/// Returns the IDs of all partitions that currently have an entry in the consumers map.
///
/// The keys are cloned into a `Vec` while the consumers mutex is held. The
/// liveness of each entry is not checked here, so an ID is reported for as
/// long as its entry remains in the map.
///
/// # Errors
///
/// Returns an error if the consumers mutex cannot be locked.
130+
fn get_active_partition_ids(&self) -> Result<Vec<String>> {
131+
let consumers = self
132+
.consumers
133+
.lock()
134+
.map_err(|_| EventHubsError::with_message("Could not lock consumers mutex."))?;
135+
Ok(consumers.keys().cloned().collect())
136+
}
137+
138+
/// Revokes and removes partition clients for partitions that are no longer
139+
/// owned by this processor instance.
140+
///
141+
/// This is called during dispatch when the load balancer indicates that
142+
/// a partition has been reassigned to another consumer. The partition client
143+
/// is marked as revoked (so the consumer can detect it via `is_revoked()`)
144+
/// and removed from the consumers map (so a new client can be created if the
145+
/// partition is later reassigned back).
///
/// IDs in `partition_ids` that have no entry in the map are ignored. An entry
/// whose client has already been dropped is still removed, but there is no
/// client left to revoke.
///
/// # Errors
///
/// Returns an error if the consumers mutex cannot be locked.
146+
fn revoke_partition_clients(&self, partition_ids: &[String]) -> Result<()> {
147+
let mut consumers = self
148+
.consumers
149+
.lock()
150+
.map_err(|_| EventHubsError::with_message("Could not lock consumers mutex."))?;
151+
for partition_id in partition_ids {
152+
// Remove the entry first so the map no longer reports this partition.
if let Some(weak) = consumers.remove(partition_id) {
153+
// The map stores weak references; only a client that is still
// alive can be signalled.
if let Some(client) = weak.upgrade() {
154+
client.revoke();
155+
}
156+
}
157+
}
158+
Ok(())
159+
}
126160
}
127161

128162
//pub(crate) type ConsumersType = std::sync::Mutex<HashMap<String, Arc<PartitionClient>>>;
@@ -163,6 +197,7 @@ impl EventProcessor {
163197
))),
164198
client_details,
165199
prefetch: options.prefetch,
200+
owner_level: options.owner_level,
166201
update_interval: options.update_interval,
167202
start_positions: options.start_positions,
168203
next_partition_client_sender: sender,
@@ -293,6 +328,25 @@ impl EventProcessor {
293328
e
294329
})?;
295330

331+
// Detect partitions that were stolen by another consumer.
332+
// Compare the set of partitions we currently own (from load_balance)
333+
// against active partition clients. Revoke any clients for partitions
334+
// we no longer own so they stop processing.
335+
let owned_ids: std::collections::HashSet<&str> =
336+
ownerships.iter().map(|o| o.partition_id.as_str()).collect();
337+
let active_ids = consumers.get_active_partition_ids()?;
338+
let stolen: Vec<String> = active_ids
339+
.into_iter()
340+
.filter(|id| !owned_ids.contains(id.as_str()))
341+
.collect();
342+
if !stolen.is_empty() {
343+
info!(
344+
"Partitions no longer owned, revoking: {}",
345+
stolen.join(", ")
346+
);
347+
consumers.revoke_partition_clients(&stolen)?;
348+
}
349+
296350
let checkpoints = self.get_checkpoint_map().await;
297351
let checkpoints = checkpoints.map_err(|e| {
298352
error!("Error in getting checkpoint map: {:?}", e);
@@ -366,6 +420,7 @@ impl EventProcessor {
366420
Some(OpenReceiverOptions {
367421
start_position: Some(start_position),
368422
prefetch: Some(self.prefetch),
423+
owner_level: self.owner_level,
369424
..Default::default()
370425
}),
371426
)
@@ -527,7 +582,7 @@ pub mod builders {
527582

528583
const DEFAULT_PREFETCH: u32 = 300;
529584
const DEFAULT_UPDATE_INTERVAL: Duration = Duration::seconds(30);
530-
const DEFAULT_PARTITION_EXPIRATION_DURATION: Duration = Duration::seconds(10);
585+
const DEFAULT_PARTITION_EXPIRATION_DURATION: Duration = Duration::seconds(60);
531586

532587
/// Builder for creating an `EventProcessor`.
533588
/// This builder allows you to configure various options for the event processor,
@@ -564,6 +619,7 @@ pub mod builders {
564619
start_positions: Option<StartPositions>,
565620
max_partition_count: Option<usize>,
566621
prefetch: Option<u32>,
622+
owner_level: Option<i64>,
567623
load_balancing_strategy: Option<super::ProcessorStrategy>,
568624
partition_expiration_duration: Option<Duration>,
569625
}
@@ -611,6 +667,17 @@ pub mod builders {
611667
self
612668
}
613669

670+
/// Sets the owner level (epoch) for partition receivers.
671+
///
672+
/// When set, the Event Hub broker enforces exclusive access: a new
673+
/// receiver with the same or higher owner level will disconnect any
674+
/// existing receiver on the same partition. This prevents duplicate
675+
/// processing when partitions are rebalanced across consumers.
///
/// The value is stored as `Some(owner_level)` on the builder, and the
/// builder is returned so that further `with_*` calls can be chained.
676+
pub fn with_owner_level(mut self, owner_level: i64) -> Self {
677+
self.owner_level = Some(owner_level);
678+
self
679+
}
680+
614681
/// Sets the partition expiration duration for the event processor.
615682
pub fn with_partition_expiration_duration(
616683
mut self,
@@ -647,6 +714,7 @@ pub mod builders {
647714
update_interval: self.update_interval.unwrap_or(DEFAULT_UPDATE_INTERVAL),
648715
start_positions: self.start_positions.unwrap_or_default(),
649716
prefetch: self.prefetch.unwrap_or(DEFAULT_PREFETCH),
717+
owner_level: self.owner_level,
650718
partition_ids: eh_properties.partition_ids,
651719
},
652720
)

0 commit comments

Comments
 (0)