Description
After upgrading from Apache Storm 2.8.0 to 2.8.7, the "Kafka Spouts Lag" panel in the Storm UI stops working for many topologies. The UI displays no lag information for affected spouts, and nimbus.log / ui.log are flooded with ClassCastException warnings (with a null exception message) for every Kafka spout in those topologies.
To Reproduce
- Run a topology containing one or more KafkaSpout instances.
- Configure at least one of the spout's subscribed topic-partitions so that no offset has been committed yet for the consumer group (e.g., a brand-new consumer group, a passive replication topic, or an idle partition).
- Open the topology page in Storm UI and look at the "Kafka Spouts Lag" panel.
- Observe that the panel shows no data for that spout.
- Inspect the UI/Nimbus logs.
Expected behavior
The lag panel reports per-partition lag for each Kafka spout. Partitions with no committed offset are reported gracefully (e.g., as -1) without breaking the rest of the spout's lag query. No ClassCastException warnings appear in the logs.
Actual behavior
For every affected spout, the following warning is logged on each lag refresh:
TopologySpoutLag [WARN] Exception thrown while getting lag for spout id: <spoutId>
TopologySpoutLag [WARN] Exception message:null
java.lang.ClassCastException
The spoutLagResult for those spouts is missing entirely from the /api/v1/topology/<id>/lag response, and the UI's "Kafka Spouts Lag" panel stays empty.
Identified Root cause
There are two interacting bugs introduced by the Kafka client upgrade from 3.9.0 to 4.x (commit 2958d3f):
- KafkaOffsetLagUtil.getOffsetLags (storm-kafka-monitor): the Kafka 4 API replaced consumer.committed(TopicPartition) (which returned OffsetAndMetadata or null) with consumer.committed(Set) (which returns a Map). The migration kept the shape of the old null check but applied it to the wrong object:
Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = consumer.committed(Collections.singleton(topicPartition));
long committedOffset = offsetAndMetadata != null ? offsetAndMetadata.get(topicPartition).offset() : -1;
The map itself is never null, but its value for a partition without a committed offset is null. Calling .offset() on that null value throws NullPointerException, which is caught by main().
- TopologySpoutLag.getLagResultForKafka (storm-core): captures the monitor's stdout and parses it with JSONValue.parseWithException, expecting either a JSON object or a ParseException. However, json-smart parses unquoted plain text leniently as a java.lang.String rather than throwing ParseException. The unchecked cast (Map<String, Object>) parsedResult then throws ClassCastException (with a null detail message on some JVMs), which is caught by the outer handler in lag() and surfaces as the warning above. The original error text from the monitor is lost.
In previous Storm versions, which used Kafka client 3.9.0, the committed(TopicPartition) API returned null for a partition without a committed offset, the existing null check worked, and the chain never broke, so the problem only surfaces after the 4.x upgrade.
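The first bug can be illustrated without a Kafka cluster. The following is a minimal sketch, not the actual storm-kafka-monitor code: the class name, the Offset stand-in type, and the helper method names are all hypothetical, and a plain String stands in for TopicPartition. It only mimics the shape of the committed(Set) result described above, where the map is non-null but an uncommitted partition maps to null.

```java
import java.util.HashMap;
import java.util.Map;

public class CommittedOffsetSketch {

    // Hypothetical stand-in for Kafka's OffsetAndMetadata, so the sketch
    // runs without kafka-clients on the classpath.
    static final class Offset {
        private final long offset;

        Offset(long offset) {
            this.offset = offset;
        }

        long offset() {
            return offset;
        }
    }

    // Mimics the result shape of consumer.committed(Set): the map is never
    // null, but a partition without a committed offset maps to null.
    static Map<String, Offset> committed(String partition, Long committedOffset) {
        Map<String, Offset> result = new HashMap<>();
        result.put(partition, committedOffset == null ? null : new Offset(committedOffset));
        return result;
    }

    // Null check on the map: the value may still be null, so offset() can throw.
    static long mapCheckedLookup(Map<String, Offset> committed, String partition) {
        return committed != null ? committed.get(partition).offset() : -1;
    }

    // Null check on the value retrieved from the map.
    static long valueCheckedLookup(Map<String, Offset> committed, String partition) {
        Offset value = committed.get(partition);
        return value != null ? value.offset() : -1;
    }

    public static void main(String[] args) {
        Map<String, Offset> uncommitted = committed("events-0", null);
        System.out.println("value-checked lookup: " + valueCheckedLookup(uncommitted, "events-0"));
        try {
            mapCheckedLookup(uncommitted, "events-0");
        } catch (NullPointerException e) {
            System.out.println("map-checked lookup threw NullPointerException");
        }
    }
}
```

For a partition that does have a committed offset, both lookups return the same value, which is why the problem only shows up for new consumer groups or idle partitions.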
Proposed fix
- In KafkaOffsetLagUtil, null-check the value retrieved from the map, not the map itself:
Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(Collections.singleton(topicPartition));
OffsetAndMetadata partitionOffset = committed.get(topicPartition);
long committedOffset = partitionOffset != null ? partitionOffset.offset() : -1;
- In TopologySpoutLag, defensively handle the case where JSONValue.parseWithException succeeds but yields a non-Map (e.g., a String from leniently parsed error text). Treat that as a monitor failure and surface the raw stdout as the spout's errorInfo, instead of letting the cast throw.
This way uncommitted partitions report -1 instead of crashing the whole spout's query, and any future monitor error is propagated to the UI as a useful error message rather than silently lost.
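The second change could look roughly like the sketch below. It is an illustration under stated assumptions, not the actual TopologySpoutLag code: the class and the toLagResult helper are hypothetical, the parser call is left out so the example runs without json-smart, and the exact key used for the error text in the real result map is assumed to be "errorInfo".

```java
import java.util.HashMap;
import java.util.Map;

public class LagResultSketch {

    // Hypothetical helper: takes whatever the JSON parser returned, plus the
    // raw monitor stdout, and always produces a map for the spout's result.
    @SuppressWarnings("unchecked")
    static Map<String, Object> toLagResult(Object parsedResult, String rawStdout) {
        if (parsedResult instanceof Map) {
            // Normal case: the monitor printed a JSON object.
            return (Map<String, Object>) parsedResult;
        }
        // The parser returned something else (for example a String built from
        // plain error text). Treat it as a monitor failure and keep the text.
        Map<String, Object> error = new HashMap<>();
        error.put("errorInfo", rawStdout); // key name is an assumption
        return error;
    }

    public static void main(String[] args) {
        String stdout = "Unable to get offset lags for kafka";
        // Simulates a lenient parser handing back the text as a String.
        Object leniently = stdout;
        System.out.println(toLagResult(leniently, stdout));
    }
}
```

Checking with instanceof before the cast means an unexpected parser result becomes a visible error entry for that spout rather than a ClassCastException in the outer handler.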
Environment
- Apache Storm 2.8.7
- Java 17