diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 4368099725c..bc9a5979d4c 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -25,7 +25,7 @@ java.library.path: "/usr/local/lib:/opt/local/lib:/usr/lib:/usr/lib64"
storm.local.dir: "storm-local"
storm.log4j2.conf.dir: "log4j2"
storm.zookeeper.servers:
- - "localhost"
+ - "localhost"
storm.zookeeper.port: 2181
storm.zookeeper.root: "/storm"
storm.zookeeper.session.timeout: 20000
@@ -52,7 +52,7 @@ storm.nimbus.retry.intervalceiling.millis: 60000
storm.nimbus.zookeeper.acls.check: true
storm.nimbus.zookeeper.acls.fixup: true
-storm.auth.simple-white-list.users: []
+storm.auth.simple-white-list.users: [ ]
storm.cluster.state.store: "org.apache.storm.cluster.ZKStateStorageFactory"
storm.meta.serialization.delegate: "org.apache.storm.serialization.GzipThriftSerializationDelegate"
storm.codedistributor.class: "org.apache.storm.codedistributor.LocalFileSystemCodeDistributor"
@@ -62,7 +62,7 @@ storm.health.check.timeout.ms: 5000
storm.disable.symlinks: false
### nimbus.* configs are for the master
-nimbus.seeds : ["localhost"]
+nimbus.seeds: [ "localhost" ]
nimbus.thrift.port: 6627
nimbus.thrift.threads: 64
nimbus.thrift.max_buffer_size: 1048576
@@ -163,10 +163,10 @@ storm.blobstore.acl.validation.enabled: false
### supervisor.* configs are for node supervisors
# Define the amount of workers that can be run on this machine. Each worker is assigned a port to use for communication
supervisor.slots.ports:
- - 6700
- - 6701
- - 6702
- - 6703
+ - 6700
+ - 6701
+ - 6702
+ - 6703
supervisor.childopts: "-Xmx256m"
supervisor.run.worker.as.user: false
#how long supervisor will wait to ensure that a worker process is started
@@ -184,8 +184,8 @@ supervisor.worker.heartbeats.max.timeout.secs: 600
#For topology configurable heartbeat timeout, maximum allowed heartbeat timeout.
worker.max.timeout.secs: 600
supervisor.enable: true
-supervisor.supervisors: []
-supervisor.supervisors.commands: []
+supervisor.supervisors: [ ]
+supervisor.supervisors.commands: [ ]
supervisor.memory.capacity.mb: 4096.0
#By convention 1 cpu core should be about 100, but this can be adjusted if needed
# using 100 makes it simple to set the desired value to the capacity measurement
@@ -278,6 +278,8 @@ topology.max.task.parallelism: null
topology.max.spout.pending: null # ideally should be larger than topology.producer.batch.size. (esp. if topology.batch.flush.interval.millis=0)
topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
+topology.stats.ewma.enable: false
+topology.stats.ewma.smoothing.factor: 0.0625
topology.builtin.metrics.bucket.size.secs: 60
topology.fall.back.on.java.serialization: false
topology.worker.childopts: null
@@ -287,16 +289,16 @@ topology.worker.shared.thread.pool.size: 4
# Spout Wait Strategy - employed when there is no data to produce
topology.spout.wait.strategy: "org.apache.storm.policy.WaitStrategyProgressive"
-topology.spout.wait.park.microsec : 100 # park time for org.apache.storm.policy.WaitStrategyPark. Busy spins if set to 0.
+topology.spout.wait.park.microsec: 100 # park time for org.apache.storm.policy.WaitStrategyPark. Busy spins if set to 0.
topology.spout.wait.progressive.level1.count: 0 # number of iterations to spend in level 1 [no sleep] of WaitStrategyProgressive, before progressing to level 2
topology.spout.wait.progressive.level2.count: 0 # number of iterations to spend in level 2 [parkNanos(1)] of WaitStrategyProgressive, before progressing to level 3
topology.spout.wait.progressive.level3.sleep.millis: 1 # sleep duration for idling iterations in level 3 of WaitStrategyProgressive
# Bolt Wait Strategy - employed when there is no data in its receive buffer to process
-topology.bolt.wait.strategy : "org.apache.storm.policy.WaitStrategyProgressive"
+topology.bolt.wait.strategy: "org.apache.storm.policy.WaitStrategyProgressive"
-topology.bolt.wait.park.microsec : 100 # park time for org.apache.storm.policy.WaitStrategyPark. Busy spins if set to 0.
+topology.bolt.wait.park.microsec: 100 # park time for org.apache.storm.policy.WaitStrategyPark. Busy spins if set to 0.
topology.bolt.wait.progressive.level1.count: 1 # number of iterations to spend in level 1 [no sleep] of WaitStrategyProgressive, before progressing to level 2
topology.bolt.wait.progressive.level2.count: 1000 # number of iterations to spend in level 2 [parkNanos(1)] of WaitStrategyProgressive, before progressing to level 3
@@ -363,7 +365,7 @@ blacklist.scheduler.assume.supervisor.bad.based.on.bad.slot: true
dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
-pacemaker.servers: []
+pacemaker.servers: [ ]
pacemaker.port: 6699
pacemaker.base.threads: 10
pacemaker.max.threads: 50
@@ -371,12 +373,12 @@ pacemaker.client.max.threads: 2
pacemaker.thread.timeout: 10
pacemaker.childopts: "-Xmx1024m"
pacemaker.auth.method: "NONE"
-pacemaker.kerberos.users: []
+pacemaker.kerberos.users: [ ]
pacemaker.thrift.message.size.max: 10485760
#default storm daemon metrics reporter plugins
storm.daemon.metrics.reporter.plugins:
- - "org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter"
+ - "org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter"
storm.daemon.metrics.reporter.interval.secs: 10
storm.metricstore.class: "org.apache.storm.metricstore.rocksdb.RocksDbStore"
@@ -399,8 +401,8 @@ storm.cgroup.inherit.cpuset.configs: false
# Configs for CGroup support
storm.cgroup.hierarchy.dir: "/cgroup/storm_resources"
storm.cgroup.resources:
- - "cpu"
- - "memory"
+ - "cpu"
+ - "memory"
storm.cgroup.hierarchy.name: "storm"
storm.supervisor.cgroup.rootdir: "storm"
storm.cgroup.cgexec.cmd: "/bin/cgexec"
@@ -419,12 +421,12 @@ storm.worker.min.cpu.pcore.percent: 0.0
storm.topology.classpath.beginning.enabled: false
worker.metrics:
- "CGroupMemory": "org.apache.storm.metrics2.cgroup.CGroupMemoryUsage"
- "CGroupMemoryLimit": "org.apache.storm.metrics2.cgroup.CGroupMemoryLimit"
- "CGroupCpu": "org.apache.storm.metrics2.cgroup.CGroupCpu"
- "CGroupCpuGuarantee": "org.apache.storm.metrics2.cgroup.CGroupCpuGuarantee"
- "CGroupCpuGuaranteeByCfsQuota": "org.apache.storm.metrics2.cgroup.CGroupCpuGuaranteeByCfsQuota"
- "CGroupCpuStat": "org.apache.storm.metrics2.cgroup.CGroupCpuStat"
+ "CGroupMemory": "org.apache.storm.metrics2.cgroup.CGroupMemoryUsage"
+ "CGroupMemoryLimit": "org.apache.storm.metrics2.cgroup.CGroupMemoryLimit"
+ "CGroupCpu": "org.apache.storm.metrics2.cgroup.CGroupCpu"
+ "CGroupCpuGuarantee": "org.apache.storm.metrics2.cgroup.CGroupCpuGuarantee"
+ "CGroupCpuGuaranteeByCfsQuota": "org.apache.storm.metrics2.cgroup.CGroupCpuGuaranteeByCfsQuota"
+ "CGroupCpuStat": "org.apache.storm.metrics2.cgroup.CGroupCpuStat"
# The number of buckets for running statistics
num.stat.buckets: 20
diff --git a/dev-tools/simulators/jitter_aware_scheduler_sim.py b/dev-tools/simulators/jitter_aware_scheduler_sim.py
new file mode 100644
index 00000000000..8e0a1dd7a35
--- /dev/null
+++ b/dev-tools/simulators/jitter_aware_scheduler_sim.py
@@ -0,0 +1,545 @@
+"""
+JCQueue Discrete Event Simulator
+=================================
+Compares two Apache Storm JCQueue scheduling strategies:
+
+ 1. FIFO pure — poll one tuple at a time, O(1), no reordering
+ 2. FIFO + batch — poll N tuples, sort by downstream EWMA jitter
+ (lowest-jitter task served first, mirrors
+ TaskJitterComparator), pay a flat reorder_cost
+ in ticks before draining the batch
+
+Event model
+-----------
+ - Each producer fires ARRIVE events at Poisson-like intervals
+ (Uniform[1, arrive_rate]).
+ - The consumer fires CONSUME events; it is single-threaded and
+ processes consume_cost ticks per tuple.
+ - The event heap drives a proper discrete-event loop — arrival and
+ consumption advance on independent clocks, so queue depth and
+ back-pressure emerge naturally.
+
+Metrics
+-------
+ Throughput — events consumed / total ticks (rolling window + overall)
+ Latency — clock-ticks from tuple birth to end-of-consume (P50/P95/P99/avg)
+ EWMA jitter — per child-task inter-delivery deviation from its EWMA prediction;
+ jitter_t = |actual_inter - ewma_before_update|,
+ global_jitter = EWMA(jitter_t) across all child tasks
+
+Usage
+-----
+    python jitter_aware_scheduler_sim.py                      # defaults
+    python jitter_aware_scheduler_sim.py --events 5000 \\
+        --batch-n 8 --reorder-cost 5 --alpha 0.1
+    python jitter_aware_scheduler_sim.py --plot               # requires matplotlib
+    python jitter_aware_scheduler_sim.py --sweep reorder-cost # sensitivity sweep
+"""
+
+import argparse
+import heapq
+import random
+import statistics
+import sys
+from dataclasses import dataclass
+from typing import List, Optional
+
+
+# ---------------------------------------------------------------------------
+# Constants
+# ---------------------------------------------------------------------------
+
+ARRIVE = 0
+CONSUME = 1
+
+
+# ---------------------------------------------------------------------------
+# Result container
+# ---------------------------------------------------------------------------
+
+@dataclass
+class SimResult:
+ name: str
+ throughput: float # events / total ticks
+ avg_latency: float # mean ticks from birth to consume
+ p50_latency: float
+ p95_latency: float
+ p99_latency: float
+ avg_jitter: float # final global EWMA jitter
+ tp_windows: List[float] # throughput snapshot per window
+ jit_windows: List[float] # ewma jitter snapshot per window
+ lat_all: List[float] # raw per-event latencies (for CDF)
+
+
+# ---------------------------------------------------------------------------
+# Core simulator
+# ---------------------------------------------------------------------------
+
+def simulate(
+ *,
+ n_producers: int = 4,
+ queue_size: int = 64,
+ arrive_rate: int = 3, # max ticks between successive arrivals per producer
+ consume_cost: int = 12, # ticks to process one tuple
+ use_batch: bool = False,
+ batch_n: int = 8, # reorder buffer size (JCQueue: REORDER_BUFFER_SIZE)
+ reorder_cost: int = 5, # overhead ticks for the batch sort
+ alpha: float = 0.10, # EWMA smoothing factor
+ n_events: int = 5000, # tuples to consume before stopping
+ n_tasks: int = 6, # distinct downstream task ids
+ task_skew: float = 0.40, # fraction of extra traffic sent to hot task-0
+ window_size: int = 50, # events between metric snapshots
+ seed: Optional[int] = 42,
+ name: str = "sim",
+) -> SimResult:
+ """
+ Run one simulation and return a SimResult.
+
+ The consumer is intentionally slower than the aggregate producer rate
+ (consume_cost > arrive_rate) so the queue builds pressure and latency
+ accumulates meaningfully.
+ """
+ rng = random.Random(seed)
+
+ # Task weight distribution: task-0 is hot
+ weights = [1.0] * n_tasks
+ weights[0] += task_skew * (n_tasks - 1)
+ total_w = sum(weights)
+ cum_w = [sum(weights[: i + 1]) / total_w for i in range(n_tasks)]
+
+ def pick_task() -> int:
+ r = rng.random()
+ for i, cw in enumerate(cum_w):
+ if r <= cw:
+ return i
+ return n_tasks - 1
+
+ # Event heap: (time, event_type, payload)
+ heap: list = []
+ for pid in range(n_producers):
+ t = rng.randint(1, max(1, arrive_rate))
+ heapq.heappush(heap, (t, ARRIVE, pid))
+
+ queue: List[dict] = [] # {born: int, task: int}
+ consumer_free: int = 0
+ consumed: int = 0
+ produced: int = 0
+ produce_limit = int(n_events * 1.5)
+
+ latencies: List[float] = []
+
+ # Per child-task delivery cadence.
+ # task_ewma[t] : EWMA of inter-delivery interval to child task t.
+ # Seeded on first delivery; no jitter on first sample.
+ # task_last_seen[t] : clock tick of the last tuple delivered to task t.
+ # jitter per sample = |actual_inter - ewma_before_update| (prediction error)
+ # global_jitter : EWMA of per-sample jitter across all child tasks.
+ task_ewma: List[float] = [0.0] * n_tasks
+ task_last_seen: List[int] = [-1] * n_tasks
+ global_jitter: float = 0.0
+
+ # Windowed snapshot accumulators
+ tp_windows: List[float] = []
+ jit_windows: List[float] = []
+ win_consumed: int = 0
+ win_start: int = 0
+
+ # Main event loop
+ while consumed < n_events:
+ if not heap:
+ break
+
+ now, etype, payload = heapq.heappop(heap)
+
+ if etype == ARRIVE:
+ pid = payload
+ if produced < produce_limit:
+ if len(queue) < queue_size:
+ queue.append({"born": now, "task": pick_task()})
+ produced += 1
+
+ # Reschedule producer
+ nxt = now + max(1, rng.randint(1, arrive_rate))
+ heapq.heappush(heap, (nxt, ARRIVE, pid))
+
+ # Wake consumer if idle
+ if consumer_free <= now and queue:
+ heapq.heappush(heap, (now, CONSUME, None))
+
+ elif etype == CONSUME:
+ if not queue or consumed >= n_events:
+ consumer_free = now
+ continue
+
+ start = max(now, consumer_free)
+
+ if use_batch:
+ take = min(batch_n, len(queue))
+ batch = queue[:take]
+ del queue[:take]
+ if take > 1:
+ # Mirror TaskJitterComparator: sort by child-task EWMA inter-delivery
+ # interval ascending — the most frequent / stable downstream tasks
+ # are served first, smoothing their receive cadence.
+ batch.sort(key=lambda t: task_ewma[t["task"]])
+ start += reorder_cost
+ else:
+ batch = [queue.pop(0)]
+
+ t_end = start
+ for item in batch:
+ t_end += consume_cost
+ lat = t_end - item["born"]
+ latencies.append(float(lat))
+
+ # Update child-task delivery cadence and compute jitter.
+ # inter : actual gap between consecutive deliveries to this task.
+ # ewma_pred : what the EWMA predicted before seeing this sample.
+ # jitter : absolute prediction error |actual - predicted|.
+            #   On first delivery the EWMA is seeded with consume_cost (no
+            #   jitter recorded — there is no prior prediction to compare).
+ tk = item["task"]
+ if task_last_seen[tk] >= 0:
+ inter = t_end - task_last_seen[tk]
+ ewma_pred = task_ewma[tk] # prediction BEFORE update
+ task_ewma[tk] = alpha * inter + (1.0 - alpha) * ewma_pred
+ jitter = abs(inter - ewma_pred) # deviation from prediction
+ global_jitter = alpha * jitter + (1.0 - alpha) * global_jitter
+ else:
+ # First delivery: seed EWMA, no jitter sample
+ task_ewma[tk] = float(consume_cost)
+ task_last_seen[tk] = t_end
+
+ consumed += 1
+ win_consumed += 1
+
+ if win_consumed >= window_size:
+ elapsed = t_end - win_start or 1
+ tp_windows.append(win_consumed / elapsed)
+ jit_windows.append(global_jitter)
+ win_consumed = 0
+ win_start = t_end
+
+ if consumed >= n_events:
+ break
+
+ consumer_free = t_end
+
+ if queue and consumed < n_events:
+ heapq.heappush(heap, (consumer_free, CONSUME, None))
+
+ # Flush trailing window
+ if win_consumed > 0:
+ elapsed = consumer_free - win_start or 1
+ tp_windows.append(win_consumed / elapsed)
+ jit_windows.append(global_jitter)
+
+ total_time = consumer_free or 1
+
+ def percentile(data: List[float], p: float) -> float:
+ if not data:
+ return 0.0
+ s = sorted(data)
+ return s[max(0, int(len(s) * p / 100) - 1)]
+
+ return SimResult(
+ name=name,
+ throughput=consumed / total_time,
+ avg_latency=statistics.mean(latencies) if latencies else 0.0,
+ p50_latency=percentile(latencies, 50),
+ p95_latency=percentile(latencies, 95),
+ p99_latency=percentile(latencies, 99),
+ avg_jitter=global_jitter,
+ tp_windows=tp_windows,
+ jit_windows=jit_windows,
+ lat_all=latencies,
+ )
+
+
+# ---------------------------------------------------------------------------
+# Text report
+# ---------------------------------------------------------------------------
+
+def _bar(val: float, ref: float, width: int = 28) -> str:
+ if ref <= 0:
+ return "░" * width
+ filled = int(width * min(val / ref, 1.0))
+ return "█" * filled + "░" * (width - filled)
+
+
+def _delta(fifo_val: float, batch_val: float, lower_is_better: bool) -> str:
+ if fifo_val == 0:
+ return " n/a"
+ diff_pct = (batch_val - fifo_val) / fifo_val * 100
+ sign = "+" if diff_pct >= 0 else ""
+ better = (diff_pct < 0) if lower_is_better else (diff_pct > 0)
+ tag = "✓ better" if better else "✗ worse "
+ return f"batch {sign}{diff_pct:5.1f}% ({tag})"
+
+
+def print_report(fifo: SimResult, batch: SimResult, cfg: dict) -> None:
+ W = 74
+ sep = "─" * W
+
+ print(f"\n{'JCQueue Discrete Event Simulation':^{W}}")
+ print(sep)
+ print(
+ f" producers={cfg['n_producers']} queue_size={cfg['queue_size']}"
+ f" arrive_rate={cfg['arrive_rate']} consume_cost={cfg['consume_cost']}"
+ )
+ print(
+ f" batch_n={cfg['batch_n']} reorder_cost={cfg['reorder_cost']}"
+ f" alpha={cfg['alpha']} skew={cfg['task_skew']:.0%}"
+ f" events={cfg['n_events']}"
+ )
+ print(sep)
+ print(f" {'Metric':<28} {'FIFO pure':>10} {'Batch+reorder':>13} Delta")
+ print(sep)
+
+ rows = [
+ ("Throughput (evt/tick)", fifo.throughput, batch.throughput, False),
+ ("Avg latency (ticks)", fifo.avg_latency, batch.avg_latency, True),
+ ("P50 latency (ticks)", fifo.p50_latency, batch.p50_latency, True),
+ ("P95 latency (ticks)", fifo.p95_latency, batch.p95_latency, True),
+ ("P99 latency (ticks)", fifo.p99_latency, batch.p99_latency, True),
+ ("Avg EWMA jitter", fifo.avg_jitter, batch.avg_jitter, True),
+ ]
+ for label, fv, bv, lib in rows:
+ d = _delta(fv, bv, lib)
+ print(f" {label:<28} {fv:>10.4f} {bv:>13.4f} {d}")
+
+ print(sep)
+ print("\n Latency bars (normalised to FIFO P99):\n")
+ ref = fifo.p99_latency or 1.0
+ for label, fv, bv in [
+ ("P50", fifo.p50_latency, batch.p50_latency),
+ ("P95", fifo.p95_latency, batch.p95_latency),
+ ("P99", fifo.p99_latency, batch.p99_latency),
+ ]:
+ print(f" FIFO {label} {_bar(fv, ref)} {fv:,.0f} ticks")
+ print(f" Batch {label} {_bar(bv, ref)} {bv:,.0f} ticks")
+ print()
+
+ print(sep)
+ n = min(20, len(fifo.tp_windows), len(batch.tp_windows))
+ print(f"\n Throughput windows (first {n}):\n")
+ print(f" {'Win':>4} {'FIFO':>10} {'Batch':>10} {'Delta%':>8}")
+ for i in range(n):
+ fv = fifo.tp_windows[i]
+ bv = batch.tp_windows[i]
+ d = (bv - fv) / fv * 100 if fv else 0.0
+ sign = "+" if d >= 0 else ""
+ print(f" {i+1:>4} {fv:>10.5f} {bv:>10.5f} {sign}{d:>6.1f}%")
+
+ print(f"\n{sep}\n")
+
+
+# ---------------------------------------------------------------------------
+# Sensitivity sweep
+# ---------------------------------------------------------------------------
+
+_SWEEP_RANGES = {
+ "reorder-cost": [0, 2, 5, 10, 15, 20],
+ "batch-n": [2, 4, 8, 12, 16],
+ "alpha": [0.05, 0.10, 0.20, 0.30, 0.50],
+ "task-skew": [0.0, 0.2, 0.4, 0.6, 0.8, 1.0],
+ "consume-cost": [4, 8, 12, 16, 24],
+}
+
+
+def run_sweep(param: str, values: list, base_cfg: dict) -> None:
+ print(f"\n{'─'*74}")
+ print(f" Sensitivity sweep: {param}")
+ print(f"{'─'*74}")
+ print(
+ f" {'Value':>8} {'FIFO tp':>9} {'Batch tp':>9}"
+ f" {'FIFO lat':>9} {'Batch lat':>9}"
+ f" {'FIFO jit':>9} {'Batch jit':>9}"
+ )
+ print(f"{'─'*74}")
+ for v in values:
+ cfg = dict(base_cfg)
+ cfg[param.replace("-", "_")] = v
+ fifo = simulate(**cfg, use_batch=False, name="fifo")
+ batch = simulate(**cfg, use_batch=True, name="batch")
+ print(
+ f" {str(v):>8} {fifo.throughput:>9.5f} {batch.throughput:>9.5f}"
+ f" {fifo.avg_latency:>9.1f} {batch.avg_latency:>9.1f}"
+ f" {fifo.avg_jitter:>9.3f} {batch.avg_jitter:>9.3f}"
+ )
+ print()
+
+
+# ---------------------------------------------------------------------------
+# Matplotlib plots
+# ---------------------------------------------------------------------------
+
+def plot_results(fifo: SimResult, batch: SimResult, cfg: dict) -> None:
+ try:
+ import matplotlib.pyplot as plt
+ import matplotlib.gridspec as gridspec
+ except ImportError:
+ print("matplotlib not installed: pip install matplotlib")
+ return
+
+ BLUE = "#378ADD"
+ GREEN = "#1D9E75"
+
+ fig = plt.figure(figsize=(14, 10))
+ title = (
+ f"JCQueue DES — N={cfg['batch_n']}, reorder_cost={cfg['reorder_cost']}, "
+ f"alpha={cfg['alpha']}, consume_cost={cfg['consume_cost']}, "
+ f"skew={cfg['task_skew']:.0%}, events={cfg['n_events']}"
+ )
+ fig.suptitle(title, fontsize=11, fontweight="bold")
+ gs = gridspec.GridSpec(2, 2, figure=fig, hspace=0.42, wspace=0.32)
+
+ n = min(len(fifo.tp_windows), len(batch.tp_windows))
+ xs = list(range(1, n + 1))
+
+ # 1. Throughput over time
+ ax1 = fig.add_subplot(gs[0, 0])
+ ax1.plot(xs, fifo.tp_windows[:n], label="FIFO pure", color=BLUE, lw=1.5)
+ ax1.plot(xs, batch.tp_windows[:n], label="Batch+reorder", color=GREEN, lw=1.5, ls="--")
+ ax1.set_title("Throughput (evt/tick)", fontsize=11)
+ ax1.set_xlabel("Window index")
+ ax1.set_ylabel("Events / tick")
+ ax1.legend(fontsize=9)
+ ax1.grid(alpha=0.3)
+
+ # 2. EWMA Jitter over time
+ ax2 = fig.add_subplot(gs[0, 1])
+ ax2.plot(xs, fifo.jit_windows[:n], label="FIFO pure", color=BLUE, lw=1.5)
+ ax2.plot(xs, batch.jit_windows[:n], label="Batch+reorder", color=GREEN, lw=1.5, ls="--")
+ ax2.set_title("EWMA Jitter over time", fontsize=11)
+ ax2.set_xlabel("Window index")
+ ax2.set_ylabel("Jitter (ticks)")
+ ax2.legend(fontsize=9)
+ ax2.grid(alpha=0.3)
+
+ # 3. Latency percentile bars
+ ax3 = fig.add_subplot(gs[1, 0])
+ pct_labels = ["P50", "P95", "P99"]
+ fifo_vals = [fifo.p50_latency, fifo.p95_latency, fifo.p99_latency]
+ batch_vals = [batch.p50_latency, batch.p95_latency, batch.p99_latency]
+ xp = range(len(pct_labels))
+ w = 0.35
+ ax3.bar([i - w/2 for i in xp], fifo_vals, width=w, label="FIFO pure", color=BLUE, alpha=0.85)
+ ax3.bar([i + w/2 for i in xp], batch_vals, width=w, label="Batch+reorder", color=GREEN, alpha=0.85)
+ ax3.set_xticks(list(xp))
+ ax3.set_xticklabels(pct_labels)
+ ax3.set_title("Latency percentiles", fontsize=11)
+ ax3.set_ylabel("Ticks")
+ ax3.legend(fontsize=9)
+ ax3.grid(axis="y", alpha=0.3)
+
+ # 4. Latency CDF
+ ax4 = fig.add_subplot(gs[1, 1])
+ for res, label, c in [
+ (fifo, "FIFO pure", BLUE),
+ (batch, "Batch+reorder", GREEN),
+ ]:
+ s = sorted(res.lat_all)
+ cdf = [i / len(s) for i in range(len(s))]
+ ax4.plot(s, cdf, label=label, color=c, lw=1.5)
+ for q in (0.50, 0.95, 0.99):
+ ax4.axhline(q, color="gray", lw=0.7, ls=":")
+ ax4.set_title("Latency CDF", fontsize=11)
+ ax4.set_xlabel("Latency (ticks)")
+ ax4.set_ylabel("Cumulative probability")
+ ax4.legend(fontsize=9)
+ ax4.grid(alpha=0.3)
+
+ out = "jcqueue_sim.png"
+ plt.savefig(out, dpi=150, bbox_inches="tight")
+ print(f"Plot saved → {out}")
+ plt.show()
+
+
+# ---------------------------------------------------------------------------
+# CLI
+# ---------------------------------------------------------------------------
+
+def build_parser() -> argparse.ArgumentParser:
+ p = argparse.ArgumentParser(
+ description="JCQueue DES: FIFO pure vs FIFO+batch reorder",
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+ )
+ p.add_argument("--producers", type=int, default=4,
+ help="Number of concurrent producer threads")
+ p.add_argument("--queue-size", type=int, default=64,
+ help="Bounded recvQueue capacity")
+ p.add_argument("--arrive-rate", type=int, default=3,
+ help="Max inter-arrival ticks per producer (Uniform[1,R])")
+ p.add_argument("--consume-cost", type=int, default=12,
+ help="Ticks to process one tuple (consumer speed)")
+ p.add_argument("--batch-n", type=int, default=8,
+ help="Reorder buffer size N")
+ p.add_argument("--reorder-cost", type=int, default=5,
+ help="Extra ticks overhead for sorting a batch")
+ p.add_argument("--alpha", type=float, default=0.10,
+ help="EWMA smoothing factor alpha in (0,1)")
+ p.add_argument("--events", type=int, default=5000,
+ help="Total tuples to consume")
+ p.add_argument("--tasks", type=int, default=6,
+ help="Number of distinct downstream task ids")
+ p.add_argument("--task-skew", type=float, default=0.40,
+ help="Extra traffic fraction to hot task-0 (0=uniform)")
+ p.add_argument("--window", type=int, default=50,
+ help="Events per metric snapshot window")
+ p.add_argument("--seed", type=int, default=42,
+ help="Random seed (0 = non-deterministic)")
+ p.add_argument("--plot", action="store_true",
+ help="Render matplotlib charts (pip install matplotlib)")
+ p.add_argument("--sweep", type=str, default=None, metavar="PARAM",
+ help=(
+ "Sensitivity sweep over PARAM. Choices: "
+ + ", ".join(_SWEEP_RANGES.keys())
+ ))
+ return p
+
+
+def main() -> None:
+ args = build_parser().parse_args()
+ seed = args.seed if args.seed != 0 else None
+
+ base_cfg = dict(
+ n_producers = args.producers,
+ queue_size = args.queue_size,
+ arrive_rate = args.arrive_rate,
+ consume_cost = args.consume_cost,
+ batch_n = args.batch_n,
+ reorder_cost = args.reorder_cost,
+ alpha = args.alpha,
+ n_events = args.events,
+ n_tasks = args.tasks,
+ task_skew = args.task_skew,
+ window_size = args.window,
+ seed = seed,
+ )
+
+ if args.sweep:
+ param = args.sweep
+ if param not in _SWEEP_RANGES:
+ print(f"Unknown sweep param '{param}'. Choose from: {list(_SWEEP_RANGES)}")
+ sys.exit(1)
+ run_sweep(param, _SWEEP_RANGES[param], base_cfg)
+ return
+
+ print(f"\nRunning FIFO pure ...", end=" ", flush=True)
+ fifo = simulate(**base_cfg, use_batch=False, name="FIFO pure")
+ print("done.")
+
+ print(f"Running Batch+reorder (N={args.batch_n}, cost={args.reorder_cost}) ...",
+ end=" ", flush=True)
+ batch = simulate(**base_cfg, use_batch=True, name="Batch+reorder")
+ print("done.")
+
+ print_report(fifo, batch, base_cfg)
+
+ if args.plot:
+ plot_results(fifo, batch, base_cfg)
+
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file
diff --git a/docs/Metrics.md b/docs/Metrics.md
index 4a2db2728b7..fe13e9d80f5 100644
--- a/docs/Metrics.md
+++ b/docs/Metrics.md
@@ -180,15 +180,22 @@ Similar to the tuple counting metrics storm also collects average latency metric
##### `__complete-latency`
-The complete latency is just for spouts. It is the average amount of time it took for `ack` or `fail` to be called for a tuple after it was emitted. If acking is disabled this metric is likely to be blank or 0 for all values, and should be ignored.
+The complete latency is just for spouts. It is the average amount of time it took for `ack` or `fail` to be called for a
+tuple after it was emitted. If acking is disabled this metric is likely to be blank or 0 for all values, and should be
+ignored.
##### `__execute-latency`
-This is just for bolts. It is the average amount of time that the bolt spent in the call to the `execute` method. The higher this gets, the lower the throughput of tuples per bolt instance.
+This is just for bolts. It is the average amount of time that the bolt spent in the call to the `execute` method. The
+higher this gets, the lower the throughput of tuples per bolt instance.
##### `__process-latency`
-This is also just for bolts. It is the average amount of time between when `execute` was called to start processing a tuple, to when it was acked or failed by the bolt. If your bolt is a very simple bolt and the processing is synchronous then `__process-latency` and `__execute-latency` should be very close to one another, with process latency being slightly smaller. If you are doing a join or have asynchronous processing then it may take a while for a tuple to be acked so the process latency would be higher than the execute latency.
+This is also just for bolts. It is the average amount of time between when `execute` was called to start processing a
+tuple, to when it was acked or failed by the bolt. If your bolt is a very simple bolt and the processing is synchronous
+then `__process-latency` and `__execute-latency` should be very close to one another, with process latency being
+slightly smaller. If you are doing a join or have asynchronous processing then it may take a while for a tuple to be
+acked so the process latency would be higher than the execute latency.
##### `__skipped-max-spout-ms`
@@ -207,6 +214,35 @@ This metric indicates the overflow count last time BP status was sent, with a mi
This metric records how much time a spout was idle because the topology was deactivated. This is the total time in milliseconds, not the average amount of time and is not sub-sampled.
+#### Tuple Jitter Metrics
+To activate jitter-based metrics, `topology.stats.ewma.enable` must be set to `true`, which switches the system to a jitter estimation based on an Exponential Moving Average (EWMA).
+In this model, jitter is dynamically updated by weighting new latency samples against historical data using a smoothing factor (`topology.stats.ewma.smoothing.factor`).
+This parameter, which defaults to 0.0625 (equivalent to $1/16$ or a 4-bit right shift), determines the metric's reactivity: higher values make the jitter more sensitive to recent spikes, while lower values prioritize long-term stability.
+Operators should be aware that enabling this feature triples the gauge count for every component-stream pair per task; this significant increase in metric cardinality can impact TSDB storage and costs, so backend capacity should be verified before deployment.
+
+##### `__complete-jitter`
+
+This metric is specific to spouts. It measures the variation (jitter) in the total completion time (end-to-end latency)
+of tuples, calculated using the exponentially weighted moving average (EWMA) algorithm as defined in RFC 1889 §A.8 / RFC 3550 §A.8.
+While `__complete-latency` indicates the average amount of time it took for a tuple to be fully processed by the
+topology (from emission to the final ack), the jitter metric quantifies the consistency of that process. If acking is
+disabled, this metric is likely to be blank or 0 and should be ignored.
+
+##### `__execute-jitter`
+
+This metric is specific to bolts. It measures the variation (jitter) in the time spent within the execute method,
+calculated using the exponentially weighted moving average (EWMA) algorithm as defined in RFC 1889 §A.8 / RFC 3550 §A.8.
+While `__execute-latency` provides the average time spent in the execute call, the jitter metric quantifies the
+predictability of that execution time. It is a critical indicator of computational "smoothness".
+
+##### `__process-jitter`
+
+This metric is specific to bolts. It measures the variation (jitter) in the process latency, calculated using the
+exponentially weighted moving average (EWMA) algorithm as defined in RFC 1889 §A.8 / RFC 3550 §A.8.
+While `__process-latency` provides the average time a tuple spends being processed, the jitter metric quantifies the
+stability of that processing time. It helps identify "noisy" execution environments where processing times fluctuate
+significantly, even if the average remains within acceptable limits.
+
#### Error Reporting Metrics
Storm also collects error reporting metrics for bolts and spouts.
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index e332b726b28..cb4b1e764b6 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -350,13 +350,13 @@ public class Config extends HashMap {
*
comp-1 cannot exist on same worker as comp-2 or comp-3, and at most "2" comp-1 on same node
*
comp-2 and comp-4 cannot be on same worker (missing comp-1 is implied from comp-1 constraint)
*/
- @IsExactlyOneOf(valueValidatorClasses = { ListOfListOfStringValidator.class, RasConstraintsTypeValidator.class })
+ @IsExactlyOneOf(valueValidatorClasses = {ListOfListOfStringValidator.class, RasConstraintsTypeValidator.class})
public static final String TOPOLOGY_RAS_CONSTRAINTS = "topology.ras.constraints";
/**
@@ -424,17 +424,17 @@ public class Config extends HashMap {
*
*
* 1. If not setting this variable or setting it as null,
- * a. If RAS is not used:
- * Nimbus will set it to {@link Config#TOPOLOGY_WORKERS}.
- * b. If RAS is used:
- * Nimbus will set it to (the estimate number of workers * {@link Config#TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER}).
- * {@link Config#TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER} is default to be 1 if not set.
+ * a. If RAS is not used:
+ * Nimbus will set it to {@link Config#TOPOLOGY_WORKERS}.
+ * b. If RAS is used:
+ * Nimbus will set it to (the estimate number of workers * {@link Config#TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER}).
+ * {@link Config#TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER} is default to be 1 if not set.
* 2. If this variable is set to 0,
- * then Storm will immediately ack tuples as soon as they come off the spout,
- * effectively disabling reliability.
+ * then Storm will immediately ack tuples as soon as they come off the spout,
+ * effectively disabling reliability.
* 3. If this variable is set to a positive integer,
- * Storm will not honor {@link Config#TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER} setting.
- * Instead, nimbus will set it as (this variable / estimate num of workers).
+ * Storm will not honor {@link Config#TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER} setting.
+ * Instead, nimbus will set it as (this variable / estimate num of workers).
*
*/
@IsInteger
@@ -465,7 +465,7 @@ public class Config extends HashMap {
*
Note that EventLoggerBolt takes care of all the implementations of IEventLogger, hence registering many
* implementations (especially they're implemented as 'blocking' manner) would slow down overall topology.
*/
- @IsListEntryCustom(entryValidatorClasses = { EventLoggerRegistryValidator.class })
+ @IsListEntryCustom(entryValidatorClasses = {EventLoggerRegistryValidator.class})
public static final String TOPOLOGY_EVENT_LOGGER_REGISTER = "topology.event.logger.register";
/**
* How many executors to spawn for event logger.
@@ -543,7 +543,7 @@ public class Config extends HashMap {
* it's parallelism is configurable.
*/
- @IsListEntryCustom(entryValidatorClasses = { MetricRegistryValidator.class })
+ @IsListEntryCustom(entryValidatorClasses = {MetricRegistryValidator.class})
public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register";
/**
* Enable tracking of network message byte counts per source-destination task. This is off by default as it creates tasks^2 metric
@@ -596,6 +596,58 @@ public class Config extends HashMap {
*/
@IsPositiveNumber
public static final String TOPOLOGY_STATS_SAMPLE_RATE = "topology.stats.sample.rate";
+ /**
+ * Enables streaming jitter (EWMA) calculation (RFC 1889 §A.8).
+ *
+ * @see RFC 1889 §A.8
+ */
+ @IsBoolean
+ public static final String TOPOLOGY_STATS_EWMA_ENABLE = "topology.stats.ewma.enable";
+ /**
+ * The smoothing factor (alpha) used for exponential jitter calculation (RFC 1889 §A.8). The default value is set to 1/16.
+ *
+ * @see RFC 1889 §A.8
+ */
+ @CustomValidator(validatorClass = ConfigValidation.ZeroOneOpenIntervalValidator.class)
+ public static final String TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR = "topology.stats.ewma.smoothing.factor";
+ /**
+ * Flag to enable or disable the feedback channel for upstream communication.
+ * When true, components can send unanchored tuples back to their source tasks.
+ */
+ @IsBoolean
+ public static final String TOPOLOGY_UPSTREAM_FEEDBACK_ENABLE = "topology.upstream.feedback.enable";
+ /**
+ * The specific stream ID used for upstream feedback communication.
+ * Defaults to "__feedback" if not explicitly configured.
+ */
+ @IsString
+ public static final String TOPOLOGY_UPSTREAM_FEEDBACK_STREAM_ID = "topology.upstream.feedback.stream";
+ /**
+ * Configuration for the sampling rate of upstream feedback messages within the topology.
+ *
+ *
+ * <p>This ratio defines the probability with which a task will emit a feedback tuple
+ * (containing metrics such as EWMA jitter stats) back to its parent tasks.
+ * This mechanism allows parent tasks to receive performance signals from downstream
+ * components to facilitate adaptive flow control or load balancing.
+ *
+ *
+ * <p>Validation: Must be a double value within the open interval (0.0, 1.0).
+ * Values of 0.0 (disabled) or 1.0 (every tuple) are rejected by the
+ * {@link ConfigValidation.ZeroOneOpenIntervalValidator} to prevent improper
+ * configuration of the feedback loop.
+ *
+ *
+ * <p>Impact:
+ *
+ * <ul>
+ * <li>Higher values provide more precise, real-time performance data but increase
+ * network overhead and CPU usage on the control plane.</li>
+ * <li>Lower values minimize the "observer effect" on the topology's throughput
+ * while still providing statistical snapshots of health.</li>
+ * </ul>
+ *
+ *
+ *
+ * Defaults to 0.1 if not explicitly configured.
+ */
+ @CustomValidator(validatorClass = ConfigValidation.ZeroOneOpenIntervalValidator.class)
+ public static final String TOPOLOGY_UPSTREAM_FEEDBACK_RATIO = "topology.upstream.feedback.ratio";
/**
* The time period that builtin metrics data in bucketed into.
*/
@@ -754,6 +806,12 @@ public class Config extends HashMap {
@IsPositiveNumber
@NotNull
public static final String TOPOLOGY_BACKPRESSURE_CHECK_MILLIS = "topology.backpressure.check.millis";
+ /**
+ * Predicts backpressure by consuming the jitter stats of downstream tasks, producing deterministic flows.
+ * To produce the stats, `topology.upstream.feedback.enable` must also be enabled.
+ */
+ @IsBoolean
+ public static final String TOPOLOGY_BACKPRESSURE_PREDICTION_ENABLE = "topology.backpressure.prediction.enable";
/**
* How often to send flush tuple to the executors for flushing out batched events.
*/
@@ -833,14 +891,14 @@ public class Config extends HashMap {
* Topology central logging sensitivity to determine who has access to logs in central logging system. The possible values are: S0 -
* Public (open to all users on grid) S1 - Restricted S2 - Confidential S3 - Secret (default.)
*/
- @IsString(acceptedValues = { "S0", "S1", "S2", "S3" })
+ @IsString(acceptedValues = {"S0", "S1", "S2", "S3"})
public static final String TOPOLOGY_LOGGING_SENSITIVITY = "topology.logging.sensitivity";
/**
* Log file the user can use to configure Log4j2.
* Can be a resource in the jar (specified with classpath:/path/to/resource) or a file.
* This configuration is applied in addition to the regular worker log4j2 configuration.
* The configs are merged according to the rules here:
- * https://logging.apache.org/log4j/2.x/manual/configuration.html#CompositeConfiguration
+ * https://logging.apache.org/log4j/2.x/manual/configuration.html#CompositeConfiguration
*/
@IsString
public static final String TOPOLOGY_LOGGING_CONFIG_FILE = "topology.logging.config";
@@ -884,7 +942,8 @@ public class Config extends HashMap {
* Alternatively set {@code storm.scheduler} to {@code org.apache.storm.scheduler.resource.ResourceAwareScheduler}
* using {@link Config#TOPOLOGY_SCHEDULER_STRATEGY} set to
* {@code org.apache.storm.scheduler.resource.strategies.scheduling.RoundRobinResourceAwareStrategy}
- * */
+ *
+ */
@IsInteger
@IsPositiveNumber
public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
@@ -1434,22 +1493,34 @@ public class Config extends HashMap {
@IsString
public static final String STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME = "storm.zookeeper.topology.auth.scheme";
- /** Enable SSL/TLS for ZooKeeper client connection. */
+ /**
+ * Enable SSL/TLS for ZooKeeper client connection.
+ */
@IsBoolean
public static final String ZK_SSL_ENABLE = "storm.zookeeper.ssl.enable";
- /** Keystore location for ZooKeeper client connection over SSL. */
+ /**
+ * Keystore location for ZooKeeper client connection over SSL.
+ */
@IsString
public static final String STORM_ZOOKEEPER_SSL_KEYSTORE_PATH = "storm.zookeeper.ssl.keystore.path";
- /** Keystore password for ZooKeeper client connection over SSL. */
+ /**
+ * Keystore password for ZooKeeper client connection over SSL.
+ */
@IsString
public static final String STORM_ZOOKEEPER_SSL_KEYSTORE_PASSWORD = "storm.zookeeper.ssl.keystore.password";
- /** Truststore location for ZooKeeper client connection over SSL. */
+ /**
+ * Truststore location for ZooKeeper client connection over SSL.
+ */
@IsString
public static final String STORM_ZOOKEEPER_SSL_TRUSTSTORE_PATH = "storm.zookeeper.ssl.truststore.path";
- /** Truststore password for ZooKeeper client connection over SSL. */
+ /**
+ * Truststore password for ZooKeeper client connection over SSL.
+ */
@IsString
public static final String STORM_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD = "storm.zookeeper.ssl.truststore.password";
- /** Enable or disable hostname verification.*/
+ /**
+ * Enable or disable hostname verification.
+ */
@IsBoolean
public static final String STORM_ZOOKEEPER_SSL_HOSTNAME_VERIFICATION = "storm.zookeeper.ssl.hostnameVerification";
/**
@@ -1462,13 +1533,13 @@ public class Config extends HashMap {
/**
* Configure the topology metrics reporters to be used on workers.
*/
- @IsListEntryCustom(entryValidatorClasses = { MetricReportersValidator.class })
+ @IsListEntryCustom(entryValidatorClasses = {MetricReportersValidator.class})
public static final String TOPOLOGY_METRICS_REPORTERS = "topology.metrics.reporters";
/**
* A list of system metrics reporters that will get added to each topology.
*/
- @IsListEntryCustom(entryValidatorClasses = { MetricReportersValidator.class })
+ @IsListEntryCustom(entryValidatorClasses = {MetricReportersValidator.class})
public static final String STORM_TOPOLOGY_METRICS_SYSTEM_REPORTERS = "storm.topology.metrics.system.reporters";
/**
@@ -1476,7 +1547,7 @@ public class Config extends HashMap {
* Use {@link Config#TOPOLOGY_METRICS_REPORTERS} instead.
*/
@Deprecated(forRemoval = true, since = "2.0.0")
- @IsListEntryCustom(entryValidatorClasses = { MetricReportersValidator.class })
+ @IsListEntryCustom(entryValidatorClasses = {MetricReportersValidator.class})
public static final String STORM_METRICS_REPORTERS = "storm.metrics.reporters";
/**
@@ -1511,6 +1582,7 @@ public class Config extends HashMap {
public static final String BLOBSTORE_HDFS_PRINCIPAL = "blobstore.hdfs.principal";
/**
* keytab for nimbus/supervisor to use to access secure hdfs for the blobstore.
+ *
* @Deprecated Use {@link Config#STORM_HDFS_LOGIN_KEYTAB} instead.
*/
@Deprecated
@@ -1753,7 +1825,7 @@ public class Config extends HashMap {
*/
@IsInteger
public static final String STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS =
- "storm.messaging.netty.client_worker_threads";
+ "storm.messaging.netty.client_worker_threads";
/**
* Netty based messaging: Enables TLS connections between workers.
@@ -1808,7 +1880,7 @@ public class Config extends HashMap {
*/
@IsString
public static final String STORM_MESSAGING_NETTY_TLS_CLIENT_TRUSTSTORE_PASSWORD =
- "storm.messaging.netty.tls.client.truststore.password";
+ "storm.messaging.netty.tls.client.truststore.password";
/**
* Netty based messaging: Specifies the client keystore when TLS is enabled.
@@ -1821,7 +1893,7 @@ public class Config extends HashMap {
*/
@IsString
public static final String STORM_MESSAGING_NETTY_TLS_CLIENT_KEYSTORE_PASSWORD =
- "storm.messaging.netty.tls.client.keystore.password";
+ "storm.messaging.netty.tls.client.keystore.password";
/**
* Netty based messaging: Specifies the protocols TLS is enabled.
@@ -1830,7 +1902,7 @@ public class Config extends HashMap {
public static final String STORM_MESSAGING_NETTY_TLS_SSL_PROTOCOLS = "storm.messaging.netty.tls.ssl.protocols";
/**
- /**
+ *
* Netty based messaging: The number of milliseconds that a Netty client will retry flushing messages that are already
* buffered to be sent.
*/
@@ -1915,7 +1987,7 @@ public class Config extends HashMap {
@IsPositiveNumber
@IsInteger
public static final String STORM_BLOBSTORE_DEPENDENCY_JAR_UPLOAD_CHUNK_SIZE_BYTES =
- "storm.blobstore.dependency.jar.upload.chunk.size.bytes";
+ "storm.blobstore.dependency.jar.upload.chunk.size.bytes";
/**
* FQCN of a class that implements {@code ISubmitterHook} @see ISubmitterHook for details.
*/
@@ -1924,8 +1996,8 @@ public class Config extends HashMap {
/**
* Impersonation user ACL config entries.
*/
- @IsMapEntryCustom(keyValidatorClasses = { ConfigValidation.StringValidator.class },
- valueValidatorClasses = { ConfigValidation.ImpersonationAclUserEntryValidator.class })
+ @IsMapEntryCustom(keyValidatorClasses = {ConfigValidation.StringValidator.class},
+ valueValidatorClasses = {ConfigValidation.ImpersonationAclUserEntryValidator.class})
public static final String NIMBUS_IMPERSONATION_ACL = "nimbus.impersonation.acl";
/**
* A whitelist of the RAS scheduler strategies allowed by nimbus. Should be a list of fully-qualified class names or null to allow all.
@@ -2430,6 +2502,7 @@ public static String getBlobstoreHDFSPrincipal(Map conf) throws UnknownHostExcep
/**
* Get the hostname substituted hdfs principal.
+ *
* @param conf the storm Configuration
* @return the principal
* @throws UnknownHostException on UnknowHostException
@@ -2458,6 +2531,7 @@ public static String getHdfsPrincipal(Map conf) throws UnknownHo
/**
* Get the hdfs keytab.
+ *
* @param conf the storm Configuration
* @return the keytab
*/
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
index b3cfd90d4d6..f9445bf2ef7 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -360,6 +360,19 @@ public static void addEventLogger(Map conf, StormTopology topolo
topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, eventLoggerBolt);
}
+ public static void addUpstreamFeedback(Map<String, Object> conf, StormTopology topology) {
+ Integer numExecutors = ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS),
+ ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
+ if (numExecutors == null || numExecutors == 0) {
+ return;
+ }
+ String feedbackStreamId = ConfigUtils.upstreamFeedbackStreamId(conf);
+ for (Object component : allComponents(topology).values()) {
+ ComponentCommon common = getComponentCommon(component);
+ common.put_to_streams(feedbackStreamId, Thrift.outputFields(eventLoggerBoltFields()));
+ }
+ }
+
@SuppressWarnings("unchecked")
public static Map metricsConsumerBoltSpecs(Map conf, StormTopology topology) {
Map metricsConsumerBolts = new HashMap<>();
@@ -464,6 +477,10 @@ public static boolean hasEventLoggers(Map topoConf) {
return eventLoggerNum == null || ObjectReader.getInt(eventLoggerNum) > 0;
}
+ public static boolean hasUpstreamFeedback(Map<String, Object> topoConf) {
+ return ConfigUtils.upstreamFeedbackEnable(topoConf);
+ }
+
public static int numStartExecutors(Object component) throws InvalidTopologyException {
ComponentCommon common = getComponentCommon(component);
return Thrift.getParallelismHint(common);
@@ -538,6 +555,9 @@ protected StormTopology systemTopologyImpl(Map topoConf, StormTo
if (hasEventLoggers(topoConf)) {
addEventLogger(topoConf, ret);
}
+ if (hasUpstreamFeedback(topoConf)) {
+ addUpstreamFeedback(topoConf, ret);
+ }
addMetricComponents(topoConf, ret);
addSystemComponents(topoConf, ret);
addMetricStreams(ret);
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
index 45a1f2d7e3a..8020445229b 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/Task.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
@@ -216,6 +216,27 @@ public void sendUnanchored(String stream, List
+ *
+ * @param task The {@link Task} associated with this update.
+ * @param tuple The {@link TupleImpl} emitted by the upstream feedback builder.
+ */
+ public void updateChildEwmaStats(Task task, TupleImpl tuple) {
+ if (!this.upstreamFeedbackEnabled || tuple == null) {
+ return;
+ }
+
+ List<Object> values = tuple.getValues();
+ if (values == null || values.size() < 2) {
+ LOG.warn("Feedback tuple for task {} has insufficient elements (size={})",
+ task.getTaskId(), values == null ? 0 : values.size());
+ return;
+ }
+
+ // Safe type check replaces unchecked cast and suppression
+ if (!(values.get(0) instanceof IMetricsConsumer.TaskInfo taskInfo)) {
+ LOG.warn("Unexpected type at index 0 in feedbackTuple for task {}: {}",
+ task.getTaskId(), values.get(0) == null ? "null" : values.get(0).getClass().getName());
+ return;
+ }
+
+ if (!(values.get(1) instanceof Collection<?> rawDataPoints)) {
+ LOG.warn("Unexpected type at index 1 in feedbackTuple for task {}: {}",
+ task.getTaskId(), values.get(1) == null ? "null" : values.get(1).getClass().getName());
+ return;
+ }
+
+ int taskId = task.getTaskId();
+ int childTaskId = taskInfo.srcTaskId;
+
+ // Filter to only valid DataPoint instances, safely skipping any unexpected elements
+ List<IMetricsConsumer.DataPoint> dataPoints = rawDataPoints.stream()
+ .filter(IMetricsConsumer.DataPoint.class::isInstance)
+ .map(IMetricsConsumer.DataPoint.class::cast)
+ .toList();
+
+ if (!dataPoints.isEmpty()) {
+ Map<String, IMetricsConsumer.DataPoint> metricsMap =
+ getOrCreateMetricsMap(taskId, childTaskId);
+
+ for (IMetricsConsumer.DataPoint dp : dataPoints) {
+ metricsMap.put(dp.name, dp);
+ }
+
+ // Invalidate cached averages for this taskId since underlying data changed
+ avgCache.remove(taskId);
+ }
+ }
+
+ /**
+ * Retrieves the collected metric statistics for all child tasks of a given parent task.
+ *
+ * @param taskId The ID of the parent task.
+ * @return A map of childTaskId to metric name to {@link IMetricsConsumer.DataPoint}.
+ * Returns an empty map if feedback is disabled or no data exists.
+ */
+ public Map<Integer, Map<String, IMetricsConsumer.DataPoint>> getChildEwmaStats(int taskId) {
+ if (!this.upstreamFeedbackEnabled) {
+ return Collections.emptyMap();
+ }
+ return this.childEwmaStats.getOrDefault(taskId, Collections.emptyMap());
+ }
+
+ /**
+ * Calculates the average value for each metric across all child tasks
+ * associated with the given parent task.
+ *
+ *
+ * <p>Results are cached per {@code taskId} and recomputed only when the
+ * underlying data has changed (i.e. after a call to {@link #updateChildEwmaStats}).
+ *
+ * @param taskId The ID of the parent task.
+ * @return A map of metric name to computed average value.
+ * Returns an empty map if no stats are found or feedback is disabled.
+ */
+ public Map<String, Double> getChildEwmaAvgStats(int taskId) {
+ Map<Integer, Map<String, IMetricsConsumer.DataPoint>> taskStats = this.getChildEwmaStats(taskId);
+ if (taskStats.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ return avgCache.computeIfAbsent(taskId, this::computeAvgStats);
+ }
+
+ private Map<String, Double> computeAvgStats(int taskId) {
+ Map<Integer, Map<String, IMetricsConsumer.DataPoint>> taskStats = this.getChildEwmaStats(taskId);
+
+ Map<String, StatsAccumulator> accumulators = new HashMap<>();
+
+ for (Map<String, IMetricsConsumer.DataPoint> childMetrics : taskStats.values()) {
+ for (Map.Entry<String, IMetricsConsumer.DataPoint> entry : childMetrics.entrySet()) {
+ if (entry.getValue().value instanceof Number n) {
+ accumulators.merge(
+ entry.getKey(),
+ new StatsAccumulator(n.doubleValue(), 1),
+ StatsAccumulator::combine
+ );
+ }
+ }
+ }
+
+ return accumulators.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> e.getValue().average()
+ ));
+ }
+
+ private Map<String, IMetricsConsumer.DataPoint> getOrCreateMetricsMap(int taskId, int childTaskId) {
+ return childEwmaStats
+ .computeIfAbsent(taskId, k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(childTaskId, k -> new ConcurrentHashMap<>());
+ }
+
+ // Functional pattern
+ private record StatsAccumulator(double sum, int count) {
+ StatsAccumulator combine(StatsAccumulator other) {
+ return new StatsAccumulator(this.sum + other.sum, this.count + other.count);
+ }
+
+ double average() {
+ return count > 0 ? sum / count : 0.0;
+ }
+ }
+
+ private List<IMetricsConsumer.DataPoint> buildEwmaDataPoints(int taskId, Set<String> metrics) {
+ if (metrics == null || metrics.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<IMetricsConsumer.DataPoint> dataPoints = new ArrayList<>(metrics.size());
+ Map<String, Gauge> allGauges = workerData.getMetricRegistry().getTaskGauges(taskId);
+
+ if (allGauges == null || allGauges.isEmpty()) {
+ return dataPoints;
+ }
+
+ for (String metricName : metrics) {
+ Gauge gauge = allGauges.get(metricName);
+
+ if (gauge != null) {
+ Object v = (gauge instanceof PerReporterGauge)
+ ? ((PerReporterGauge) gauge).getValueForReporter(this)
+ : gauge.getValue();
+ if (v instanceof Number) {
+ dataPoints.add(new IMetricsConsumer.DataPoint(metricName, v));
+ }
+ }
+ }
+ return dataPoints;
+ }
+
// updates v1 metric dataPoints with v2 metric API data
private void addV2Metrics(int taskId, List dataPoints, int interval) {
if (!enableV2MetricsDataPoints) {
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index 273bab5e69f..9f099ded740 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -198,6 +198,13 @@ public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
outputCollector.flush();
} else if (Constants.METRICS_TICK_STREAM_ID.equals(streamId)) {
metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
+ } else if (this.upstreamFeedbackStreamId.equals(streamId)) {
+ if (!this.upstreamFeedbackEnabled) {
+ LOG.debug("Upstream feedback skipped.");
+ } else {
+ // update internal metrics
+ this.updateChildEwmaStats(idToTask.get(taskId - idToTaskBase), tuple);
+ }
} else {
IBolt boltObject = (IBolt) idToTask.get(taskId - idToTaskBase).getTaskObject();
boolean isSampled = sampler.getAsBoolean();
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
index 886a00c7f89..5fa89f6e3d5 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
@@ -12,13 +12,18 @@
package org.apache.storm.executor.bolt;
+import static org.apache.storm.metrics2.TaskMetrics.EWMA_METRICS_SET;
+
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.function.Supplier;
import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.Task;
import org.apache.storm.executor.ExecutorTransfer;
import org.apache.storm.hooks.info.BoltAckInfo;
@@ -29,6 +34,7 @@
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
@@ -43,6 +49,9 @@ public class BoltOutputCollectorImpl implements IOutputCollector {
private final int taskId;
private final Random random;
private final boolean isEventLoggers;
+ private final boolean isUpstreamFeedback;
+ private final String upstreamFeedbackStreamId;
+ private Supplier<Boolean> upstreamFeedbackRate;
private final ExecutorTransfer xsfer;
private final boolean isDebug;
private boolean ackingEnabled;
@@ -57,6 +66,19 @@ public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, Random rand
this.ackingEnabled = ackingEnabled;
this.isDebug = isDebug;
this.xsfer = executor.getExecutorTransfer();
+
+ // configure the upstream feedback if enabled.
+ Map<String, Object> conf = executor.getTopoConf();
+ if (StormCommon.hasUpstreamFeedback(conf)) {
+ this.isUpstreamFeedback = true;
+ this.upstreamFeedbackStreamId = ConfigUtils.upstreamFeedbackStreamId(conf);
+ double ratio = ConfigUtils.upstreamFeedbackRatio(conf);
+ this.upstreamFeedbackRate = () -> random.nextDouble() < ratio;
+ } else {
+ // explicitly declare
+ this.isUpstreamFeedback = false;
+ this.upstreamFeedbackStreamId = "__NOT_SET__";
+ }
}
@Override
@@ -93,6 +115,7 @@ private List boltEmit(String streamId, Collection anchors, List<
MessageId msgId;
if (ackingEnabled && anchors != null) {
final Map anchorsToIds = new HashMap<>();
+ final boolean sendUpstreamFeedback = isUpstreamFeedback && upstreamFeedbackRate.get();
for (Tuple a : anchors) { // perf critical path. would be nice to avoid iterator allocation here and below
Set rootIds = a.getMessageId().getAnchorsToIds().keySet();
if (rootIds.size() > 0) {
@@ -102,6 +125,11 @@ private List boltEmit(String streamId, Collection anchors, List<
putXor(anchorsToIds, rootId, edgeId);
}
}
+ if (sendUpstreamFeedback) {
+ int parentTask = a.getSourceTask();
+ task.sendUnanchoredFeedback(upstreamFeedbackStreamId, executor.buildUpstreamFeedbackTuple(taskId, EWMA_METRICS_SET),
+ parentTask, xsfer, executor.getPendingEmits());
+ }
}
msgId = MessageId.makeId(anchorsToIds);
} else {
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
index 734fca2a2d6..880c22e7d41 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -321,6 +321,13 @@ public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
if (pendingForId != null) {
pending.put(id, pendingForId);
}
+ } else if (this.upstreamFeedbackStreamId.equals(streamId)) {
+ if (!this.upstreamFeedbackEnabled) {
+ LOG.debug("Upstream feedback skipped.");
+ } else {
+ // update internal metrics
+ this.updateChildEwmaStats(idToTask.get(taskId - idToTaskBase), tuple);
+ }
} else {
Long id = (Long) tuple.getValue(0);
Long timeDeltaMs = (Long) tuple.getValue(1);
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/EwmaGauge.java b/storm-client/src/jvm/org/apache/storm/metrics2/EwmaGauge.java
new file mode 100644
index 00000000000..857e34215f0
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/EwmaGauge.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import static org.apache.storm.utils.ConfigUtils.RFC1889_ALPHA;
+
+import com.codahale.metrics.Gauge;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Lock-free jitter estimator following RFC 1889 §A.8 / RFC 3550 §A.8.
+ * The jitter accumulator is stored as raw IEEE 754 bits in an AtomicLong
+ * so that CAS can be used without locks.
+ * Thread safety: addValue is lock-free; getValue is wait-free.
+ */
+public class EwmaGauge implements Gauge<Double> {
+
+ private static final long UNSEEDED = Long.MIN_VALUE;
+ private static final long ZERO_BITS = Double.doubleToLongBits(0.0);
+
+ private final AtomicLong lastTransit = new AtomicLong(UNSEEDED);
+ private final AtomicLong jitterBits = new AtomicLong(ZERO_BITS);
+ private final double alpha;
+
+ EwmaGauge(double alpha) {
+ if (alpha <= 0.0 || alpha >= 1.0 || Double.isNaN(alpha)) {
+ throw new IllegalArgumentException(
+ "alpha must be in (0, 1), got: " + alpha);
+ }
+ this.alpha = alpha;
+ }
+
+ EwmaGauge() {
+ this(RFC1889_ALPHA); // 1.0 / 16.0
+ }
+
+ /**
+ * Update the jitter estimate.
+ *
+ * @param transitMs transit time for this tuple: {@code arrival - timestamp}
+ * Negative values are silently ignored.
+ */
+ public void addValue(long transitMs) {
+ if (transitMs < 0) {
+ return;
+ }
+ // Seed on the very first packet: store transit, nothing to diff against yet.
+ if (lastTransit.compareAndSet(UNSEEDED, transitMs)) {
+ return;
+ }
+ long prev = lastTransit.getAndSet(transitMs);
+ // Safe from Math.abs(Long.MIN_VALUE) pathology: both transitMs and prev
+ // are >= 0 (enforced by the negative-guard), so their
+ // difference is in [-Long.MAX_VALUE, Long.MAX_VALUE].
+ double d = Math.abs(transitMs - prev);
+ long currentBits;
+ long updatedBits;
+ do {
+ currentBits = jitterBits.get();
+ double currentJitter = Double.longBitsToDouble(currentBits);
+ double updatedJitter = currentJitter + alpha * (d - currentJitter);
+ updatedBits = Double.doubleToLongBits(updatedJitter);
+ } while (!jitterBits.compareAndSet(currentBits, updatedBits));
+ }
+
+ /**
+ * Returns the current jitter estimate in timestamp units.
+ */
+ @Override
+ public Double getValue() {
+ return Double.longBitsToDouble(jitterBits.get());
+ }
+}
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
index 0ac3a5e9492..9565467302c 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
@@ -12,10 +12,14 @@
package org.apache.storm.metrics2;
-import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
+
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.Utils;
@@ -27,12 +31,24 @@ public class TaskMetrics {
private static final String METRIC_NAME_TRANSFERRED = "__transfer-count";
private static final String METRIC_NAME_EXECUTED = "__execute-count";
private static final String METRIC_NAME_PROCESS_LATENCY = "__process-latency";
+ public static final String METRIC_NAME_PROCESS_JITTER = "__process-jitter";
private static final String METRIC_NAME_COMPLETE_LATENCY = "__complete-latency";
+ public static final String METRIC_NAME_COMPLETE_JITTER = "__complete-jitter";
private static final String METRIC_NAME_EXECUTE_LATENCY = "__execute-latency";
+ public static final String METRIC_NAME_EXECUTE_JITTER = "__execute-jitter";
private static final String METRIC_NAME_CAPACITY = "__capacity";
+ public static final Set<String> EWMA_METRICS_SET = Set.of(
+ METRIC_NAME_PROCESS_JITTER,
+ METRIC_NAME_COMPLETE_JITTER,
+ METRIC_NAME_EXECUTE_JITTER
+ );
+
private final ConcurrentMap rateCounters = new ConcurrentHashMap<>();
- private final ConcurrentMap<String, RollingAverageGauge> gauges = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Gauge<Double>> gauges = new ConcurrentHashMap<>();
+ // Gauge supplier singleton factories
+ private final Supplier<EwmaGauge> ewmaGaugeFactory;
+ private final Supplier<RollingAverageGauge> rollingAverageGaugeFactory;
private final String topologyId;
private final String componentId;
@@ -40,6 +56,7 @@ public class TaskMetrics {
private final Integer workerPort;
private final StormMetricRegistry metricRegistry;
private final int samplingRate;
+ private final boolean ewmaEnable;
public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid,
@@ -50,6 +67,10 @@ public TaskMetrics(WorkerTopologyContext context, String componentId, Integer ta
this.taskId = taskid;
this.workerPort = context.getThisWorkerPort();
this.samplingRate = ConfigUtils.samplingRate(topoConf);
+ double ewmaSmoothingFactor = ConfigUtils.ewmaSmoothingFactor(topoConf);
+ this.ewmaEnable = ConfigUtils.ewmaEnable(topoConf);
+ this.rollingAverageGaugeFactory = RollingAverageGauge::new;
+ this.ewmaGaugeFactory = () -> new EwmaGauge(ewmaSmoothingFactor);
}
public void setCapacity(double capacity) {
@@ -67,6 +88,12 @@ public void spoutAckedTuple(String streamId, long latencyMs) {
metricName = METRIC_NAME_COMPLETE_LATENCY + "-" + streamId;
RollingAverageGauge gauge = this.getRollingAverageGauge(metricName, streamId);
gauge.addValue(latencyMs);
+
+ if (this.ewmaEnable) {
+ metricName = METRIC_NAME_COMPLETE_JITTER + "-" + streamId;
+ EwmaGauge ewmaGauge = this.getExponentialWeightedMovingAverageGauge(metricName, streamId);
+ ewmaGauge.addValue(latencyMs);
+ }
}
public void boltAckedTuple(String sourceComponentId, String sourceStreamId, long latencyMs) {
@@ -78,6 +105,12 @@ public void boltAckedTuple(String sourceComponentId, String sourceStreamId, long
metricName = METRIC_NAME_PROCESS_LATENCY + "-" + key;
RollingAverageGauge gauge = this.getRollingAverageGauge(metricName, sourceStreamId);
gauge.addValue(latencyMs);
+
+ if (this.ewmaEnable) {
+ metricName = METRIC_NAME_PROCESS_JITTER + "-" + key;
+ EwmaGauge ewmaGauge = this.getExponentialWeightedMovingAverageGauge(metricName, sourceStreamId);
+ ewmaGauge.addValue(latencyMs);
+ }
}
public void spoutFailedTuple(String streamId) {
@@ -117,6 +150,12 @@ public void boltExecuteTuple(String sourceComponentId, String sourceStreamId, lo
metricName = METRIC_NAME_EXECUTE_LATENCY + "-" + key;
RollingAverageGauge gauge = this.getRollingAverageGauge(metricName, sourceStreamId);
gauge.addValue(latencyMs);
+
+ if (this.ewmaEnable) {
+ metricName = METRIC_NAME_EXECUTE_JITTER + "-" + key;
+ EwmaGauge ewmaGauge = this.getExponentialWeightedMovingAverageGauge(metricName, sourceStreamId);
+ ewmaGauge.addValue(latencyMs);
+ }
}
private RateCounter getRateCounter(String metricName, String streamId) {
@@ -135,18 +174,54 @@ private RateCounter getRateCounter(String metricName, String streamId) {
}
private RollingAverageGauge getRollingAverageGauge(String metricName, String streamId) {
- RollingAverageGauge gauge = this.gauges.get(metricName);
- if (gauge == null) {
+ return getOrCreateGauge(metricName, streamId, RollingAverageGauge.class, this.rollingAverageGaugeFactory);
+ }
+
+ private EwmaGauge getExponentialWeightedMovingAverageGauge(String metricName, String streamId) {
+ return getOrCreateGauge(metricName, streamId, EwmaGauge.class, this.ewmaGaugeFactory);
+ }
+
+ private <G extends Gauge<?>> G getOrCreateGauge(
+ String metricName,
+ String streamId,
+ Class<G> gaugeClass,
+ Supplier<G> factory) {
+
+ Object existing = this.gauges.get(metricName);
+ if (existing == null) {
synchronized (this) {
- gauge = this.gauges.get(metricName);
- if (gauge == null) {
- gauge = new RollingAverageGauge();
- metricRegistry.gauge(metricName, gauge, this.topologyId, this.componentId,
- streamId, this.taskId, this.workerPort);
- this.gauges.put(metricName, gauge);
+ existing = this.gauges.get(metricName);
+ if (existing == null) {
+ G created = factory.get();
+ registerGauge(metricName, streamId, created);
+ this.gauges.put(metricName, created);
+ return created;
}
}
}
- return gauge;
+
+ if (!gaugeClass.isInstance(existing)) {
+ throw new IllegalStateException(
+ "Metric '" + metricName + "' is registered as "
+ + existing.getClass().getName()
+ + " but expected " + gaugeClass.getName());
+ }
+
+ return gaugeClass.cast(existing);
+ }
+
+ /*
+ * Safe cast: G is bounded by Gauge<?> in the signature of getOrCreateGauge,
+ * so every instance of G is by definition a Gauge.
+ * The cast to raw Gauge is required because metricRegistry.gauge() does not
+ * accept Gauge<?>; the wildcard is not compatible with the type parameter T
+ * expected by the external API. Type-safety is guaranteed by the bound
+ * <G extends Gauge<?>> declared at the call site.
+ */
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private void registerGauge(String metricName, String streamId, Gauge<?> gauge) {
+ metricRegistry.gauge(metricName, (Gauge) gauge, this.topologyId,
+ this.componentId, streamId, this.taskId, this.workerPort);
}
+
}
diff --git a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
index 9357cc562cf..f326f6ff4e6 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -39,6 +39,9 @@ public class ConfigUtils {
public static final String FILE_SEPARATOR = File.separator;
public static final String STORM_HOME = "storm.home";
public static final String RESOURCES_SUBDIR = "resources";
+ public static final double RFC1889_ALPHA = 1.0 / 16.0;
+ public static final String UPSTREAM_FEEDBACK_STREAM_ID = "__feedback";
+ public static final double UPSTREAM_FEEDBACK_RATIO = 0.1;
private static final Set passwordConfigKeys = new HashSet<>();
@@ -175,6 +178,66 @@ public static int samplingRate(Map conf) {
throw new IllegalArgumentException("Illegal topology.stats.sample.rate in conf: " + rate);
}
+ public static double ewmaSmoothingFactor(Map<String, Object> conf) {
+ Object value = conf.get(Config.TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR);
+ if (value == null) {
+ return RFC1889_ALPHA;
+ }
+ double alpha = ObjectReader.getDouble(value);
+ if (alpha > 0.0 && alpha < 1.0) {
+ return alpha;
+ }
+ throw new IllegalArgumentException(
+ "Illegal " + Config.TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR
+ + " in conf: " + alpha + " must be in (0, 1)");
+ }
+
+ public static boolean ewmaEnable(Map<String, Object> conf) {
+ Object value = conf.get(Config.TOPOLOGY_STATS_EWMA_ENABLE);
+ if (value == null) {
+ return false;
+ }
+ return ObjectReader.getBoolean(value, false);
+ }
+
+ public static boolean upstreamFeedbackEnable(Map<String, Object> conf) {
+ Object value = conf.get(Config.TOPOLOGY_UPSTREAM_FEEDBACK_ENABLE);
+ if (value == null) {
+ return false;
+ }
+ return ObjectReader.getBoolean(value, false);
+ }
+
+ public static boolean backpressurePredictionEnable(Map<String, Object> conf) {
+ Object value = conf.get(Config.TOPOLOGY_BACKPRESSURE_PREDICTION_ENABLE);
+ if (value == null) {
+ return false;
+ }
+ return ObjectReader.getBoolean(value, false);
+ }
+
+ public static String upstreamFeedbackStreamId(Map<String, Object> conf) {
+ Object value = conf.get(Config.TOPOLOGY_UPSTREAM_FEEDBACK_STREAM_ID);
+ if (value == null) {
+ return UPSTREAM_FEEDBACK_STREAM_ID;
+ }
+ return ObjectReader.getString(value);
+ }
+
+ public static double upstreamFeedbackRatio(Map<String, Object> conf) {
+ Object value = conf.get(Config.TOPOLOGY_UPSTREAM_FEEDBACK_RATIO);
+ if (value == null) {
+ return UPSTREAM_FEEDBACK_RATIO;
+ }
+ double ratio = ObjectReader.getDouble(value);
+ if (ratio > 0.0 && ratio < 1.0) {
+ return ratio;
+ }
+ throw new IllegalArgumentException(
+ "Illegal " + Config.TOPOLOGY_UPSTREAM_FEEDBACK_RATIO
+ + " in conf: " + ratio + " must be in (0, 1)");
+ }
+
public static BooleanSupplier mkStatsSampler(Map conf) {
return evenSampler(samplingRate(conf));
}
diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
index 6aa668f566d..8d5da0629d6 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
@@ -20,18 +20,23 @@
import java.io.Closeable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import org.apache.storm.executor.Executor;
import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.policy.IWaitStrategy;
import org.apache.storm.shade.org.jctools.queues.MessagePassingQueue;
import org.apache.storm.shade.org.jctools.queues.MpscArrayQueue;
import org.apache.storm.shade.org.jctools.queues.MpscUnboundedArrayQueue;
+import org.apache.storm.tuple.AddressedTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class JCQueue implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(JCQueue.class);
+ private static final int REORDER_BUFFER_SIZE = 4;
+
private final ExitCondition continueRunning = () -> true;
private final List jcqMetrics = new ArrayList<>();
private final MpscArrayQueue recvQueue;
@@ -43,6 +48,11 @@ public class JCQueue implements Closeable {
private final ThreadLocal thdLocalBatcher = new ThreadLocal(); // ensure 1 instance per producer thd.
private final IWaitStrategy backPressureWaitStrategy;
private final String queueName;
+ private final AddressedTuple[] reorderingBuffer = new AddressedTuple[REORDER_BUFFER_SIZE];
+
+ // The TaskJitterComparator is not a mandatory field. It is required for the power of two adaptive task selection.
+ private TaskJitterComparator taskJitterComparator;
+ private boolean enablePredictiveBackpressure = false;
 public JCQueue(String queueName, String metricNamePrefix, int size, int overflowLimit, int producerBatchSz,
 IWaitStrategy backPressureWaitStrategy, String topologyId, String componentId, List taskIds,
@@ -66,6 +76,16 @@ public String getQueueName() {
 return queueName;
 }
+ public void enablePredictiveBackpressure(Executor executor) {
+ this.taskJitterComparator = new TaskJitterComparator(executor);
+ this.enablePredictiveBackpressure = true;
+ }
+
+ public void disablePredictiveBackpressure() {
+ this.enablePredictiveBackpressure = false;
+ this.taskJitterComparator = null; // gc
+ }
+
@Override
public void close() {
for (JCQueueMetrics jcQueueMetric : jcqMetrics) {
@@ -106,12 +126,18 @@ public double getQueueLoad() {
private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws InterruptedException {
int drainCount = 0;
while (exitCond.keepRunning()) {
- Object tuple = recvQueue.poll();
- if (tuple == null) {
- break;
+ if (consumer instanceof Executor && this.taskJitterComparator != null) {
+ int drained = drainExecutorPriorityBatch(consumer);
+ if (drained == 0) break;
+ drainCount += drained;
+ } else {
+ Object tuple = recvQueue.poll();
+ if (tuple == null) {
+ break;
+ }
+ consumer.accept(tuple);
+ ++drainCount;
}
- consumer.accept(tuple);
- ++drainCount;
}
int overflowDrainCount = 0;
@@ -128,6 +154,31 @@ private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws Interr
return total;
}
+ private int drainExecutorPriorityBatch(Consumer consumer) {
+ int count = 0;
+
+ // For larger values of reorderingBuffer.length a heap would be better (with a batch of 4 objects a plain array is the best implementation)
+ for (int i = 0; i < reorderingBuffer.length; i++) {
+ Object tuple = recvQueue.poll();
+ if (tuple == null) break;
+ reorderingBuffer[count++] = (AddressedTuple) tuple;
+ }
+
+ if (count == 0) return 0;
+
+ if (count > 1) {
+ // Dual-Pivot Quicksort or Insertion Sort
+ Arrays.sort(reorderingBuffer, 0, count, this.taskJitterComparator);
+ }
+
+ for (int i = 0; i < count; i++) {
+ consumer.accept(reorderingBuffer[i]);
+ reorderingBuffer[i] = null; // gc
+ }
+
+ return count;
+ }
+
// Non Blocking. returns true/false indicating success/failure. Fails if full.
private boolean tryPublishInternal(Object obj) {
if (recvQueue.offer(obj)) {
diff --git a/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java b/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java
index 54445b5eb49..ac28be8cf30 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java
@@ -128,6 +128,8 @@ public static Double getDouble(Object o, Double defaultValue) {
}
if (o instanceof Number) {
return ((Number) o).doubleValue();
+ } else if (o instanceof String) {
+ return Double.parseDouble((String) o);
} else {
throw new IllegalArgumentException("Don't know how to convert (" + o + ") to double");
}
diff --git a/storm-client/src/jvm/org/apache/storm/utils/TaskJitterComparator.java b/storm-client/src/jvm/org/apache/storm/utils/TaskJitterComparator.java
new file mode 100644
index 00000000000..cf30247d885
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/utils/TaskJitterComparator.java
@@ -0,0 +1,44 @@
+package org.apache.storm.utils;
+
+import static org.apache.storm.metrics2.TaskMetrics.METRIC_NAME_EXECUTE_JITTER;
+import static org.apache.storm.metrics2.TaskMetrics.METRIC_NAME_PROCESS_JITTER;
+
+import java.util.Comparator;
+import java.util.Map;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.tuple.AddressedTuple;
+
+public class TaskJitterComparator implements Comparator<AddressedTuple> {
+ private final Executor executor;
+
+ public TaskJitterComparator(Executor executor) {
+ this.executor = executor;
+ }
+ @Override
+ public int compare(AddressedTuple t1, AddressedTuple t2) {
+ int taskId1 = t1.getDest();
+ int taskId2 = t2.getDest();
+
+ if (taskId1 == taskId2) {
+ return 0;
+ }
+
+ Map<String, Double> task1Stats = executor.getChildEwmaAvgStats(taskId1);
+ Map<String, Double> task2Stats = executor.getChildEwmaAvgStats(taskId2);
+
+ double processJitter1 = task1Stats.getOrDefault(METRIC_NAME_PROCESS_JITTER, Double.MAX_VALUE);
+ double processJitter2 = task2Stats.getOrDefault(METRIC_NAME_PROCESS_JITTER, Double.MAX_VALUE);
+
+ // compare process jitter
+ if (processJitter1 < processJitter2) {
+ return -1;
+ } else if (processJitter1 > processJitter2) {
+ return 1;
+ }
+
+ // fallback on execution jitter (it means that the network jitter is not affecting too much the whole process)
+ double execJitter1 = task1Stats.getOrDefault(METRIC_NAME_EXECUTE_JITTER, Double.MAX_VALUE);
+ double execJitter2 = task2Stats.getOrDefault(METRIC_NAME_EXECUTE_JITTER, Double.MAX_VALUE);
+ return Double.compare(execJitter1, execJitter2);
+ }
+}
\ No newline at end of file
diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
index 4175ee9f4a7..8b3fb830ddf 100644
--- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
+++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
@@ -30,6 +30,7 @@
import java.util.stream.Collectors;
import org.apache.storm.Config;
+import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Utils;
import org.apache.storm.validation.ConfigValidationAnnotations.ValidatorParams;
import org.slf4j.Logger;
@@ -849,6 +850,23 @@ public void validateField(String name, Object o) {
}
}
+ public static class ZeroOneOpenIntervalValidator extends Validator {
+ @Override
+ public void validateField(String name, Object o) {
+ if (o == null) {
+ return;
+ }
+ // ObjectReader.getDouble(o) handles the type conversion and will throw an
+ // IllegalArgumentException if the value cannot be parsed as a number.
+ double alpha = ObjectReader.getDouble(o);
+ if (alpha > 0.0 && alpha < 1.0) {
+ return;
+ }
+ throw new IllegalArgumentException(
+ "Field " + name + " must be a number in the open interval (0, 1), got: " + o);
+ }
+ }
+
public static class CustomIsExactlyOneOfValidators extends Validator {
private Class>[] subValidators;
private List validatorClassNames;
diff --git a/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java b/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
index 3c335308da5..31b9c483446 100644
--- a/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
+++ b/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
@@ -206,6 +206,34 @@ public void testTopologyStatsSampleRateIsFloat() {
ConfigValidation.validateFields(conf);
}
+ @Test
+ public void testTopologyStatsEwmaEnableIsBoolean() {
+ Map<String, Object> conf = new HashMap<>();
+ // optional configuration
+ ConfigValidation.validateFields(conf);
+ conf.put(Config.TOPOLOGY_STATS_EWMA_ENABLE, true);
+ ConfigValidation.validateFields(conf);
+ conf.put(Config.TOPOLOGY_STATS_EWMA_ENABLE, false);
+ ConfigValidation.validateFields(conf);
+ }
+
+ @Test
+ public void testTopologyStatsEwmaSmoothingFactorCustomValidator() {
+ Map<String, Object> conf = new HashMap<>();
+ // optional configuration
+ ConfigValidation.validateFields(conf);
+ conf.put(Config.TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR, 0.1);
+ ConfigValidation.validateFields(conf);
+ conf.put(Config.TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR, "0.1");
+ ConfigValidation.validateFields(conf);
+ conf.put(Config.TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR, 0.9);
+ ConfigValidation.validateFields(conf);
+ for (Object notAllowedValue : new Object[]{0.0, -0.1, 1.9, "1.9"}) {
+ conf.put(Config.TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR, notAllowedValue);
+ assertThrows(IllegalArgumentException.class, () -> ConfigValidation.validateFields(conf));
+ }
+ }
+
@Test
public void testWorkerChildoptsIsStringOrStringList() {
Map conf = new HashMap<>();
diff --git a/storm-client/test/jvm/org/apache/storm/metrics2/EwmaGaugeTest.java b/storm-client/test/jvm/org/apache/storm/metrics2/EwmaGaugeTest.java
new file mode 100644
index 00000000000..ed1b5009ceb
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/metrics2/EwmaGaugeTest.java
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class EwmaGaugeTest {
+
+ private static final double DELTA = 1e-9;
+
+ @Nested
+ @DisplayName("Construction")
+ class ConstructionTest {
+
+ @Test
+ @DisplayName("Default constructor uses RFC 1889 alpha (1/16)")
+ void defaultAlpha() {
+ EwmaGauge gauge = new EwmaGauge();
+ gauge.addValue(0L);
+ gauge.addValue(16L); // D = 16 ; J = 0 + (16 - 0) * (1/16) = 1.0
+ assertEquals(1.0, gauge.getValue(), DELTA);
+ }
+
+ @Test
+ @DisplayName("Invalid alpha values throw IllegalArgumentException")
+ void invalidAlphaThrows() {
+ double[] invalidAlphas = {
+ 0.0, 1.0, -0.1, 1.1,
+ Double.NaN, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY
+ };
+ for (double alpha : invalidAlphas) {
+ assertThrows(IllegalArgumentException.class,
+ () -> new EwmaGauge(alpha),
+ "Expected IllegalArgumentException for alpha=" + alpha);
+ }
+ }
+
+ @Test
+ @DisplayName("Valid alpha boundary values are accepted")
+ void validAlphaAccepted() {
+ double[] validAlphas = {0.001, 0.0625, 0.5, 0.999};
+ for (double alpha : validAlphas) {
+ assertNotNull(new EwmaGauge(alpha),
+ "Expected no exception for alpha=" + alpha);
+ }
+ }
+ }
+
+
+ @Nested
+ @DisplayName("Cold-start semantics")
+ class ColdStartTest {
+
+ private EwmaGauge gauge;
+
+ @BeforeEach
+ void setUp() {
+ gauge = new EwmaGauge();
+ }
+
+ @Test
+ @DisplayName("getValue() returns 0.0 before any sample")
+ void noSamples() {
+ assertEquals(0.0, gauge.getValue(), DELTA);
+ }
+
+ @Test
+ @DisplayName("getValue() returns 0.0 after exactly one sample (seed only)")
+ void oneSample() {
+ gauge.addValue(100L);
+ assertEquals(0.0, gauge.getValue(), DELTA);
+ }
+
+ }
+
+ @Nested
+ @DisplayName("EWMA formula RFC 1889 §A.8")
+ class FormulaTest {
+
+ @Test
+ @DisplayName("Single update: J = 0 + (D - 0) * alpha")
+ void singleDeviation() {
+ EwmaGauge gauge = new EwmaGauge(0.5);
+ gauge.addValue(0L);
+ gauge.addValue(10L);
+ assertEquals(5.0, gauge.getValue(), DELTA);
+ }
+
+ @Test
+ @DisplayName("Manual step-by-step verification against reference values")
+ void manualSteps() {
+ EwmaGauge gauge = new EwmaGauge(0.5);
+
+ gauge.addValue(0L); // seed
+
+ // Step 1: transit=10, prev=0, D=10, J = 0 + (10-0) * 0.5 = 5.0
+ gauge.addValue(10L);
+ assertEquals(5.0, gauge.getValue(), DELTA, "Step 1");
+
+ // Step 2: transit=0, prev=10, D=10, J = 5.0 + (10-5.0) * 0.5 = 7.5
+ gauge.addValue(0L);
+ assertEquals(7.5, gauge.getValue(), DELTA, "Step 2");
+
+ // Step 3: transit=10, prev=0, D=10, J = 7.5 + (10-7.5) * 0.5 = 8.75
+ gauge.addValue(10L);
+ assertEquals(8.75, gauge.getValue(), DELTA, "Step 3");
+ }
+
+ @Test
+ @DisplayName("Zero deviation decays jitter toward zero")
+ void zeroDeviationDecays() {
+ EwmaGauge gauge = new EwmaGauge(0.5);
+ gauge.addValue(0L); // 0
+ gauge.addValue(10L); // 0 + 5*alpha = 2.5
+ double afterFirst = gauge.getValue();
+ assertEquals(afterFirst, gauge.getValue(), DELTA);
+
+ gauge.addValue(10L); // 2.5 - 2.5*alpha = 2.5 - 1.25 = 1.25
+ assertEquals(afterFirst * 0.5, gauge.getValue(), DELTA);
+ }
+
+ }
+
+
+ @Nested
+ @DisplayName("Negative value guard")
+ class NegativeValueTest {
+
+ @Test
+ @DisplayName("Negative transit values are silently ignored before seed")
+ void negativeIgnoredBeforeSeed() {
+ EwmaGauge gauge = new EwmaGauge();
+ gauge.addValue(-1L);
+ gauge.addValue(-100L);
+ assertEquals(0.0, gauge.getValue(), DELTA);
+ }
+
+ @Test
+ @DisplayName("Negative value after seed does not corrupt lastTransit")
+ void negativeAfterSeedIgnored() {
+ EwmaGauge gauge = new EwmaGauge(0.5);
+ gauge.addValue(10L);
+ gauge.addValue(-5L);
+ gauge.addValue(20L);
+ assertEquals(5.0, gauge.getValue(), DELTA);
+ }
+ }
+
+
+ @Nested
+ @DisplayName("getValue() preserves EWMA across calls")
+ class GetValueIdempotentTest {
+
+ @Test
+ @DisplayName("Repeated getValue() without new samples returns same estimate")
+ void repeatedGetValueStable() {
+ EwmaGauge gauge = new EwmaGauge(0.5);
+ gauge.addValue(0L);
+ gauge.addValue(10L);
+ double first = gauge.getValue();
+
+ assertEquals(first, gauge.getValue(), DELTA, "Second call");
+ assertEquals(first, gauge.getValue(), DELTA, "Third call");
+ }
+
+ @Test
+ @DisplayName("EWMA accumulates correctly across multiple reporting windows")
+ void acrossReportingWindows() {
+ EwmaGauge gauge = new EwmaGauge(0.5);
+ gauge.addValue(0L);
+
+ gauge.addValue(10L);
+ assertEquals(5.0, gauge.getValue(), DELTA, "Window 1");
+
+ gauge.addValue(0L);
+ assertEquals(7.5, gauge.getValue(), DELTA, "Window 2");
+
+ gauge.addValue(10L);
+ assertEquals(8.75, gauge.getValue(), DELTA, "Window 3");
+ }
+ }
+
+ @Nested
+ @DisplayName("thread safe")
+ class ConcurrencyTest {
+
+ @Test
+ @DisplayName("Concurrent addValue() calls do not corrupt state")
+ void concurrentAddValue() throws InterruptedException {
+ EwmaGauge gauge = new EwmaGauge();
+ int threads = 8;
+ int samplesPerThread = 10_000;
+ CountDownLatch ready = new CountDownLatch(threads);
+ CountDownLatch start = new CountDownLatch(1);
+ ExecutorService pool = Executors.newFixedThreadPool(threads);
+
+ for (int t = 0; t < threads; t++) {
+ final long base = t * 10L;
+ pool.submit(() -> {
+ ready.countDown();
+ try {
+ start.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ for (int i = 0; i < samplesPerThread; i++) {
+ gauge.addValue(base + (i % 10));
+ }
+ });
+ }
+
+ ready.await();
+ start.countDown();
+ pool.shutdown();
+ assertTrue(pool.awaitTermination(10, TimeUnit.SECONDS),
+ "Executor did not terminate — possible deadlock");
+
+ double value = gauge.getValue();
+ assertTrue(value >= 0.0, "Jitter must be non-negative, got: " + value);
+ assertTrue(Double.isFinite(value), "Jitter must be finite, got: " + value);
+ }
+
+ @Test
+ @DisplayName("Concurrent getValue() and addValue() do not deadlock")
+ void concurrentGetAndAdd() throws Exception {
+ EwmaGauge gauge = new EwmaGauge();
+ ExecutorService pool = Executors.newFixedThreadPool(2);
+ CountDownLatch done = new CountDownLatch(2);
+
+ Future<?> writer = pool.submit(() -> {
+ for (int i = 0; i < 50_000; i++) {
+ gauge.addValue(i % 100);
+ }
+ done.countDown();
+ });
+
+ Future<?> reader = pool.submit(() -> {
+ for (int i = 0; i < 1_000; i++) {
+ double v = gauge.getValue();
+ assertTrue(v >= 0.0 && Double.isFinite(v),
+ "getValue() returned invalid result: " + v);
+ }
+ done.countDown();
+ });
+
+ assertTrue(done.await(10, TimeUnit.SECONDS),
+ "Test did not complete within timeout — possible deadlock");
+
+ writer.get();
+ reader.get();
+ pool.shutdown();
+ }
+
+ @Test
+ @DisplayName("Only one thread seeds lastTransit all same value gives zero jitter")
+ void seedRace() throws InterruptedException {
+ EwmaGauge gauge = new EwmaGauge();
+ int threads = 16;
+ CountDownLatch ready = new CountDownLatch(threads);
+ CountDownLatch start = new CountDownLatch(1);
+ ExecutorService pool = Executors.newFixedThreadPool(threads);
+
+ for (int t = 0; t < threads; t++) {
+ pool.submit(() -> {
+ ready.countDown();
+ try {
+ start.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ gauge.addValue(42L);
+ });
+ }
+
+ ready.await();
+ start.countDown();
+ pool.shutdown();
+ assertTrue(pool.awaitTermination(5, TimeUnit.SECONDS),
+ "Executor did not terminate — possible deadlock");
+
+ assertEquals(0.0, gauge.getValue(), DELTA);
+ }
+ }
+
+ @Nested
+ @DisplayName("Edge cases")
+ class EdgeCaseTest {
+
+ @Test
+ @DisplayName("Long.MAX_VALUE transit does not overflow deviation")
+ void maxLongTransit() {
+ EwmaGauge gauge = new EwmaGauge(0.5);
+ gauge.addValue(0L);
+ gauge.addValue(Long.MAX_VALUE);
+ double value = gauge.getValue();
+ assertTrue(value > 0.0, "Jitter should be positive");
+ assertTrue(Double.isFinite(value), "Jitter should be finite");
+ }
+
+ @Test
+ @DisplayName("Zero transit time is valid and produces zero deviation")
+ void zeroTransit() {
+ EwmaGauge gauge = new EwmaGauge(0.5);
+ gauge.addValue(0L);
+ gauge.addValue(0L);
+ assertEquals(0.0, gauge.getValue(), DELTA);
+ }
+
+ @Test
+ @DisplayName("Large number of samples does not overflow LongAdder")
+ void manySamples() {
+ EwmaGauge gauge = new EwmaGauge();
+ gauge.addValue(0L);
+ for (int i = 1; i <= 100_000; i++) {
+ gauge.addValue(i % 2 == 0 ? 0L : 10L);
+ }
+ double value = gauge.getValue();
+ assertTrue(value > 0.0, "Jitter should be positive after many samples");
+ assertTrue(value <= 10.0, "Jitter cannot exceed max deviation of 10");
+ assertTrue(Double.isFinite(value), "Jitter must be finite");
+ }
+ }
+}
diff --git a/storm-client/test/jvm/org/apache/storm/metrics2/TaskMetricsTest.java b/storm-client/test/jvm/org/apache/storm/metrics2/TaskMetricsTest.java
new file mode 100644
index 00000000000..c812994280c
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/metrics2/TaskMetricsTest.java
@@ -0,0 +1,485 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Gauge;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.ConfigUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+
+@ExtendWith(MockitoExtension.class)
+class TaskMetricsTest {
+
+ private static final String TOPOLOGY_ID = "test-topology-1";
+ private static final String COMPONENT_ID = "test-bolt";
+ private static final Integer TASK_ID = 42;
+ private static final Integer WORKER_PORT = 6700;
+ private static final String STREAM_ID = "default";
+ private static final String SOURCE_COMP = "source-spout";
+ private static final int SAMPLING_RATE = 1;
+ private static final double EWMA_FACTOR = 0.3;
+
+ @Mock private WorkerTopologyContext context;
+ @Mock private StormMetricRegistry metricRegistry;
+ @Mock private RateCounter rateCounter;
+
+ private Map<String, Object> topoConf;
+
+ private TaskMetrics buildTaskMetrics(boolean ewmaEnabled) {
+ try (MockedStatic<ConfigUtils> cfgUtils = mockStatic(ConfigUtils.class)) {
+ cfgUtils.when(() -> ConfigUtils.samplingRate(topoConf)).thenReturn(SAMPLING_RATE);
+ cfgUtils.when(() -> ConfigUtils.ewmaSmoothingFactor(topoConf)).thenReturn(EWMA_FACTOR);
+ cfgUtils.when(() -> ConfigUtils.ewmaEnable(topoConf)).thenReturn(ewmaEnabled);
+
+ return new TaskMetrics(context, COMPONENT_ID, TASK_ID, metricRegistry, topoConf);
+ }
+ }
+
+ @BeforeEach
+ void setUp() {
+ when(context.getStormId()).thenReturn(TOPOLOGY_ID);
+ when(context.getThisWorkerPort()).thenReturn(WORKER_PORT);
+
+ topoConf = new HashMap<>();
+
+ when(metricRegistry.rateCounter(anyString(), anyString(), anyString(),
+ anyInt(), anyInt(), anyString())).thenReturn(rateCounter);
+ }
+
+ @Test
+ void spoutAckedTuple_incrementsAckCounter() {
+ when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(),
+ anyString(), anyString(), anyInt(), anyInt())).thenReturn(null);
+ TaskMetrics tm = buildTaskMetrics(false);
+
+ tm.spoutAckedTuple(STREAM_ID, 100L);
+
+ verify(rateCounter).inc(SAMPLING_RATE);
+ }
+
+ @Test
+ void spoutAckedTuple_registersCompleteLatencyGauge() {
+ when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(),
+ anyString(), anyString(), anyInt(), anyInt())).thenReturn(null);
+ TaskMetrics tm = buildTaskMetrics(false);
+
+ tm.spoutAckedTuple(STREAM_ID, 200L);
+
+ verify(metricRegistry, atLeastOnce()).gauge(
+ contains("__complete-latency"), any(Gauge.class),
+ anyString(), anyString(), anyString(), anyInt(), anyInt());
+ }
+
+ @Test
+ void spoutAckedTuple_withEwmaEnabled_registersJitterGauge() {
+ when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(),
+ anyString(), anyString(), anyInt(), anyInt())).thenReturn(null);
+ TaskMetrics tm = buildTaskMetrics(true);
+
+ tm.spoutAckedTuple(STREAM_ID, 150L);
+
+ verify(metricRegistry, atLeastOnce()).gauge(
+ contains("__complete-jitter"), any(Gauge.class),
+ anyString(), anyString(), anyString(), anyInt(), anyInt());
+ }
+
+ @Test
+ void spoutAckedTuple_withEwmaDisabled_doesNotRegisterJitterGauge() {
+ when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(),
+ anyString(), anyString(), anyInt(), anyInt())).thenReturn(null);
+ TaskMetrics tm = buildTaskMetrics(false);
+
+ tm.spoutAckedTuple(STREAM_ID, 150L);
+
+ verify(metricRegistry, never()).gauge(
+ contains("__complete-rfc1889a-jitter"), any(Gauge.class),
+ anyString(), anyString(), anyString(), anyInt(), anyInt());
+ }
+
+ @Test
+ void boltAckedTuple_incrementsAckCounter() {
+ when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(),
+ anyString(), anyString(), anyInt(), anyInt())).thenReturn(null);
+ TaskMetrics tm = buildTaskMetrics(false);
+
+ tm.boltAckedTuple(SOURCE_COMP, STREAM_ID, 50L);
+
+ verify(rateCounter).inc(SAMPLING_RATE);
+ }
+
+ @Test
+ void boltAckedTuple_registersProcessLatencyGauge() {
+ when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(),
+ anyString(), anyString(), anyInt(), anyInt())).thenReturn(null);
+ TaskMetrics tm = buildTaskMetrics(false);
+
+ tm.boltAckedTuple(SOURCE_COMP, STREAM_ID, 50L);
+
+ verify(metricRegistry, atLeastOnce()).gauge(
+ contains("__process-latency"), any(Gauge.class),
+ anyString(), anyString(), anyString(), anyInt(), anyInt());
+ }
+
+ @Test
+ void boltAckedTuple_withEwmaEnabled_registersJitterGauge() {
+ when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(),
+ anyString(), anyString(), anyInt(), anyInt())).thenReturn(null);
+ TaskMetrics tm = buildTaskMetrics(true);
+
+ tm.boltAckedTuple(SOURCE_COMP, STREAM_ID, 75L);
+
+ verify(metricRegistry, atLeastOnce()).gauge(
+ contains("__process-jitter"), any(Gauge.class),
+ anyString(), anyString(), anyString(), anyInt(), anyInt());
+ }
+
+ @Test
+ void boltAckedTuple_metricKeyIncludesSourceComponentAndStream() {
+ when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(),
+ anyString(), anyString(), anyInt(), anyInt())).thenReturn(null);
+ TaskMetrics tm = buildTaskMetrics(false);
+
+ tm.boltAckedTuple(SOURCE_COMP, STREAM_ID, 50L);
+
+ verify(metricRegistry).rateCounter(
+ contains(SOURCE_COMP + ":" + STREAM_ID),
+ anyString(), anyString(), anyInt(), anyInt(), anyString());
+ }
+
+ @Test
+ void spoutFailedTuple_incrementsFailCounter() {
+ TaskMetrics tm = buildTaskMetrics(false);
+
+ tm.spoutFailedTuple(STREAM_ID);
+
+ verify(rateCounter).inc(SAMPLING_RATE);
+ }
+
+    /** The spout fail counter must be registered as "__fail-count-&lt;stream&gt;" with full task context. */
+    @Test
+    void spoutFailedTuple_usesCorrectMetricName() {
+        TaskMetrics metrics = buildTaskMetrics(false);
+        String expectedName = "__fail-count-" + STREAM_ID;
+
+        metrics.spoutFailedTuple(STREAM_ID);
+
+        verify(metricRegistry).rateCounter(
+                eq(expectedName),
+                eq(TOPOLOGY_ID), eq(COMPONENT_ID), eq(TASK_ID), eq(WORKER_PORT), eq(STREAM_ID));
+    }
+
+    /** A failed bolt tuple must feed the fail-count rate counter. */
+    @Test
+    void boltFailedTuple_incrementsFailCounter() {
+        TaskMetrics metrics = buildTaskMetrics(false);
+
+        metrics.boltFailedTuple(SOURCE_COMP, STREAM_ID);
+
+        verify(rateCounter).inc(SAMPLING_RATE);
+    }
+
+    /** Bolt fail counters must be keyed by "sourceComponent:streamId". */
+    @Test
+    void boltFailedTuple_metricKeyIncludesSourceComponentAndStream() {
+        TaskMetrics metrics = buildTaskMetrics(false);
+        String expectedName = "__fail-count-" + SOURCE_COMP + ":" + STREAM_ID;
+
+        metrics.boltFailedTuple(SOURCE_COMP, STREAM_ID);
+
+        verify(metricRegistry).rateCounter(
+                eq(expectedName),
+                anyString(), anyString(), anyInt(), anyInt(), anyString());
+    }
+
+    /** Emitting a tuple must bump the emit rate counter by the sampling rate. */
+    @Test
+    void emittedTuple_incrementsEmitCounter() {
+        TaskMetrics metrics = buildTaskMetrics(false);
+
+        metrics.emittedTuple(STREAM_ID);
+
+        verify(rateCounter).inc(SAMPLING_RATE);
+    }
+
+    /** The emit counter must be registered as "__emit-count-&lt;stream&gt;" with full task context. */
+    @Test
+    void emittedTuple_usesCorrectMetricName() {
+        TaskMetrics metrics = buildTaskMetrics(false);
+        String expectedName = "__emit-count-" + STREAM_ID;
+
+        metrics.emittedTuple(STREAM_ID);
+
+        verify(metricRegistry).rateCounter(
+                eq(expectedName),
+                eq(TOPOLOGY_ID), eq(COMPONENT_ID), eq(TASK_ID), eq(WORKER_PORT), eq(STREAM_ID));
+    }
+
+ @Test
+ void transferredTuples_incrementsByAmountTimesSamplingRate() {
+ TaskMetrics tm = buildTaskMetrics(false);
+ int amount = 5;
+
+ tm.transferredTuples(STREAM_ID, amount);
+
+ verify(rateCounter).inc(amount * SAMPLING_RATE);
+ }
+
+    /** The transfer counter must be registered as "__transfer-count-&lt;stream&gt;". */
+    @Test
+    void transferredTuples_usesCorrectMetricName() {
+        TaskMetrics metrics = buildTaskMetrics(false);
+        String expectedName = "__transfer-count-" + STREAM_ID;
+
+        metrics.transferredTuples(STREAM_ID, 3);
+
+        verify(metricRegistry).rateCounter(
+                eq(expectedName),
+                anyString(), anyString(), anyInt(), anyInt(), anyString());
+    }
+
+    /** Executing a tuple on a bolt must bump the execute rate counter. */
+    @Test
+    void boltExecuteTuple_incrementsExecuteCounter() {
+        // Stub gauge registration so the latency-tracking path returns harmlessly.
+        when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(),
+                anyString(), anyString(), anyInt(), anyInt())).thenReturn(null);
+        TaskMetrics metrics = buildTaskMetrics(false);
+
+        metrics.boltExecuteTuple(SOURCE_COMP, STREAM_ID, 30L);
+
+        verify(rateCounter).inc(SAMPLING_RATE);
+    }
+
+ @Test
+ void boltExecuteTuple_registersExecuteLatencyGauge() {
+ when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(),
+ anyString(), anyString(), anyInt(), anyInt())).thenReturn(null);
+ TaskMetrics tm = buildTaskMetrics(false);
+
+ tm.boltExecuteTuple(SOURCE_COMP, STREAM_ID, 30L);
+
+ verify(metricRegistry, atLeastOnce()).gauge(
+ contains("__execute-latency"), any(Gauge.class),
+ anyString(), anyString(), anyString(), anyInt(), anyInt());
+ }
+
+    /** With EWMA enabled, a bolt execute must also register an "__execute-jitter" gauge. */
+    @Test
+    void boltExecuteTuple_withEwmaEnabled_registersJitterGauge() {
+        when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(),
+                anyString(), anyString(), anyInt(), anyInt())).thenReturn(null);
+        // true => EWMA/jitter tracking on.
+        TaskMetrics metrics = buildTaskMetrics(true);
+
+        metrics.boltExecuteTuple(SOURCE_COMP, STREAM_ID, 30L);
+
+        verify(metricRegistry, atLeastOnce()).gauge(
+                contains("__execute-jitter"), any(Gauge.class),
+                anyString(), anyString(), anyString(), anyInt(), anyInt());
+    }
+
+    /**
+     * With EWMA disabled, executing must NOT register any execute-jitter gauge.
+     * The name fragment must match the one verified by the EWMA-enabled twin
+     * test, otherwise this negative check is vacuous.
+     */
+    @Test
+    void boltExecuteTuple_withEwmaDisabled_doesNotRegisterJitterGauge() {
+        when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(),
+                anyString(), anyString(), anyInt(), anyInt())).thenReturn(null);
+        TaskMetrics tm = buildTaskMetrics(false);
+
+        tm.boltExecuteTuple(SOURCE_COMP, STREAM_ID, 30L);
+
+        // Was contains("__execute-rfc1889a-jitter"), which can never match the
+        // "__execute-jitter" gauge asserted in the enabled-path test, so the
+        // never() verification passed even when a jitter gauge WAS registered.
+        verify(metricRegistry, never()).gauge(
+                contains("__execute-jitter"), any(Gauge.class),
+                anyString(), anyString(), anyString(), anyInt(), anyInt());
+    }
+
+    /** Each distinct stream must get its own emit rate counter. */
+    @Test
+    void differentStreams_produceSeparateRateCounters() {
+        TaskMetrics metrics = buildTaskMetrics(false);
+
+        metrics.emittedTuple("stream-A");
+        metrics.emittedTuple("stream-B");
+
+        // One registration per stream, each tagged with its own stream id.
+        verify(metricRegistry).rateCounter(
+                eq("__emit-count-stream-A"),
+                anyString(), anyString(), anyInt(), anyInt(), eq("stream-A"));
+        verify(metricRegistry).rateCounter(
+                eq("__emit-count-stream-B"),
+                anyString(), anyString(), anyInt(), anyInt(), eq("stream-B"));
+    }
+
+    /** Repeated emits on the same stream must reuse the cached rate counter. */
+    @Test
+    void rateCounter_registeredOnlyOnceForSameMetricName() {
+        TaskMetrics metrics = buildTaskMetrics(false);
+
+        // Three emits, but the registry should only be asked once.
+        metrics.emittedTuple(STREAM_ID);
+        metrics.emittedTuple(STREAM_ID);
+        metrics.emittedTuple(STREAM_ID);
+
+        verify(metricRegistry, times(1)).rateCounter(
+                eq("__emit-count-" + STREAM_ID),
+                anyString(), anyString(), anyInt(), anyInt(), anyString());
+    }
+
+    /** Repeated spout acks on the same stream must reuse the cached latency gauge. */
+    @Test
+    void gauge_registeredOnlyOnceForSameMetricName() {
+        when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(),
+                anyString(), anyString(), anyInt(), anyInt())).thenReturn(null);
+        TaskMetrics metrics = buildTaskMetrics(false);
+
+        // Three acks, but only one gauge registration is expected.
+        metrics.spoutAckedTuple(STREAM_ID, 10L);
+        metrics.spoutAckedTuple(STREAM_ID, 20L);
+        metrics.spoutAckedTuple(STREAM_ID, 30L);
+
+        verify(metricRegistry, times(1)).gauge(
+                contains("__complete-latency"), any(Gauge.class),
+                anyString(), anyString(), anyString(), anyInt(), anyInt());
+    }
+
+    /**
+     * Twenty threads racing through emittedTuple must produce exactly one
+     * rate-counter registration (the registration path must be thread-safe).
+     */
+    @Test
+    void concurrentEmittedTuple_registersRateCounterExactlyOnce() throws InterruptedException {
+        TaskMetrics tm = buildTaskMetrics(false);
+        int threadCount = 20;
+        CountDownLatch ready = new CountDownLatch(threadCount);
+        CountDownLatch start = new CountDownLatch(1);
+        CountDownLatch done = new CountDownLatch(threadCount);
+
+        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
+        try {
+            for (int i = 0; i < threadCount; i++) {
+                pool.submit(() -> {
+                    ready.countDown();
+                    try {
+                        start.await();
+                    } catch (InterruptedException e) {
+                        // Restore the interrupt flag instead of swallowing it.
+                        Thread.currentThread().interrupt();
+                    }
+                    tm.emittedTuple(STREAM_ID);
+                    done.countDown();
+                });
+            }
+
+            ready.await();
+            start.countDown();
+            assertTrue(done.await(5, TimeUnit.SECONDS));
+        } finally {
+            // Shut the pool down even when the assertion fails, so no threads leak.
+            pool.shutdownNow();
+        }
+
+        verify(metricRegistry, times(1)).rateCounter(
+                eq("__emit-count-" + STREAM_ID),
+                anyString(), anyString(), anyInt(), anyInt(), anyString());
+    }
+
+    /**
+     * Twenty threads racing through spoutAckedTuple must produce exactly one
+     * complete-latency gauge registration (getOrCreateGauge must be thread-safe).
+     */
+    @Test
+    void concurrentSpoutAckedTuple_registersGaugeExactlyOnce() throws InterruptedException {
+        when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(),
+                anyString(), anyString(), anyInt(), anyInt())).thenReturn(null);
+        TaskMetrics tm = buildTaskMetrics(false);
+        int threadCount = 20;
+        CountDownLatch ready = new CountDownLatch(threadCount);
+        CountDownLatch start = new CountDownLatch(1);
+        CountDownLatch done = new CountDownLatch(threadCount);
+
+        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
+        try {
+            for (int i = 0; i < threadCount; i++) {
+                pool.submit(() -> {
+                    ready.countDown();
+                    try {
+                        start.await();
+                    } catch (InterruptedException e) {
+                        // Restore the interrupt flag instead of swallowing it.
+                        Thread.currentThread().interrupt();
+                    }
+                    tm.spoutAckedTuple(STREAM_ID, 100L);
+                    done.countDown();
+                });
+            }
+
+            ready.await();
+            start.countDown();
+            assertTrue(done.await(5, TimeUnit.SECONDS));
+        } finally {
+            // Shut the pool down even when the assertion fails, so no threads leak.
+            pool.shutdownNow();
+        }
+
+        verify(metricRegistry, times(1)).gauge(
+                contains("__complete-latency"), any(Gauge.class),
+                anyString(), anyString(), anyString(), anyInt(), anyInt());
+    }
+
+    /** Re-requesting a gauge of the same type must reuse it rather than throw or re-register. */
+    @Test
+    void getOrCreateGauge_sameTypeReusedWithoutThrowing() {
+        when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(),
+                anyString(), anyString(), anyInt(), anyInt())).thenReturn(null);
+        TaskMetrics metrics = buildTaskMetrics(false);
+
+        metrics.spoutAckedTuple(STREAM_ID, 10L);
+        metrics.spoutAckedTuple(STREAM_ID, 20L);
+
+        verify(metricRegistry, times(1)).gauge(
+                contains("__complete-latency"), any(Gauge.class),
+                anyString(), anyString(), anyString(), anyInt(), anyInt());
+    }
+
+    /** The bolt ack counter must be named "__ack-count-&lt;source&gt;:&lt;stream&gt;" with full task context. */
+    @Test
+    void boltAckedTuple_metricNameFormat() {
+        when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(),
+                anyString(), anyString(), anyInt(), anyInt())).thenReturn(null);
+        TaskMetrics metrics = buildTaskMetrics(false);
+        String expectedName = "__ack-count-" + SOURCE_COMP + ":" + STREAM_ID;
+
+        metrics.boltAckedTuple(SOURCE_COMP, STREAM_ID, 10L);
+
+        verify(metricRegistry).rateCounter(
+                eq(expectedName),
+                eq(TOPOLOGY_ID), eq(COMPONENT_ID), eq(TASK_ID), eq(WORKER_PORT), eq(STREAM_ID));
+    }
+
+    /** The bolt execute counter must be named "__execute-count-&lt;source&gt;:&lt;stream&gt;". */
+    @Test
+    void boltExecuteTuple_metricNameFormat() {
+        when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(),
+                anyString(), anyString(), anyInt(), anyInt())).thenReturn(null);
+        TaskMetrics metrics = buildTaskMetrics(false);
+        String expectedName = "__execute-count-" + SOURCE_COMP + ":" + STREAM_ID;
+
+        metrics.boltExecuteTuple(SOURCE_COMP, STREAM_ID, 10L);
+
+        verify(metricRegistry).rateCounter(
+                eq(expectedName),
+                eq(TOPOLOGY_ID), eq(COMPONENT_ID), eq(TASK_ID), eq(WORKER_PORT), eq(STREAM_ID));
+    }
+
+    /** The bolt fail counter must be named "__fail-count-&lt;source&gt;:&lt;stream&gt;". */
+    @Test
+    void boltFailedTuple_metricNameFormat() {
+        TaskMetrics metrics = buildTaskMetrics(false);
+        String expectedName = "__fail-count-" + SOURCE_COMP + ":" + STREAM_ID;
+
+        metrics.boltFailedTuple(SOURCE_COMP, STREAM_ID);
+
+        verify(metricRegistry).rateCounter(
+                eq(expectedName),
+                eq(TOPOLOGY_ID), eq(COMPONENT_ID), eq(TASK_ID), eq(WORKER_PORT), eq(STREAM_ID));
+    }
+
+    /** The task's identifying fields must be forwarded verbatim to the registry. */
+    @Test
+    void contextFields_propagatedCorrectlyToRegistry() {
+        TaskMetrics tm = buildTaskMetrics(false);
+
+        tm.emittedTuple(STREAM_ID);
+
+        // Captors must be parameterized: a raw ArgumentCaptor's capture()
+        // returns Object, which does not satisfy rateCounter's String/int
+        // parameters (unchecked / compile error with raw types).
+        ArgumentCaptor<String> topoCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<String> compCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<Integer> taskCaptor = ArgumentCaptor.forClass(Integer.class);
+        ArgumentCaptor<Integer> portCaptor = ArgumentCaptor.forClass(Integer.class);
+
+        verify(metricRegistry).rateCounter(
+                anyString(),
+                topoCaptor.capture(), compCaptor.capture(),
+                taskCaptor.capture(), portCaptor.capture(),
+                anyString());
+
+        assertEquals(TOPOLOGY_ID, topoCaptor.getValue());
+        assertEquals(COMPONENT_ID, compCaptor.getValue());
+        assertEquals(TASK_ID, taskCaptor.getValue());
+        assertEquals(WORKER_PORT, portCaptor.getValue());
+    }
+}