feat: Add support for exclusive rollbacks with multi writer #18448
lokeshj1703 wants to merge 6 commits into apache:master
Conversation
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this — coordinating rollbacks across multi-writer setups is an important problem. The heartbeat-based approach makes sense conceptually, but I think there's a structural issue in the branching logic: when exclusive rollback is enabled and no pending rollback exists yet (first writer), the code never schedules the rollback plan or starts a heartbeat, so the rollback will always fail. The scheduleRollback + heartbeatClient.start logic in the inner else branch is unreachable under exclusive mode. There's also a heartbeat leak if rollback execution throws. Please see inline comments for details.
| .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime)) | ||
| .findFirst()).isEmpty()) { | ||
| // Assume rollback is already executed since the commit is no longer present in the timeline | ||
| return false; |
When exclusive rollback is enabled and no pending rollback exists yet (first writer to schedule a rollback for a failed commit), this path falls through without setting rollbackPlanOption or rollbackInstantTimeOpt. Since rollbackPlanOption is initialized to Option.empty(), the method will always throw HoodieRollbackException for first-time rollbacks in exclusive mode. It looks like the scheduling logic (calling scheduleRollback + heartbeatClient.start) needs to be included in this if (config.isExclusiveRollbackEnabled()) branch as well.
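The branching this comment asks for can be sketched as a small decision function. This is only an illustration under stated assumptions: the class, enum, and method names are hypothetical and not part of Hudi, and plain booleans stand in for the real timeline and heartbeat lookups.

```java
// Hypothetical stand-in for the exclusive-rollback branching; not Hudi code.
final class ExclusiveRollbackDecision {

  enum Action { SCHEDULE_AND_EXECUTE, TAKE_OVER_PENDING, DEFER, ALREADY_ROLLED_BACK }

  static Action decide(boolean pendingRollbackExists, boolean heartbeatExpired, boolean commitInTimeline) {
    if (pendingRollbackExists) {
      // A plan exists: take it over only if its owner's heartbeat has expired.
      return heartbeatExpired ? Action.TAKE_OVER_PENDING : Action.DEFER;
    }
    if (!commitInTimeline) {
      // No plan and no commit: assume an earlier rollback already completed.
      return Action.ALREADY_ROLLED_BACK;
    }
    // First writer: no plan yet, commit still present, so schedule and execute.
    return Action.SCHEDULE_AND_EXECUTE;
  }

  public static void main(String[] args) {
    System.out.println(decide(false, false, true));
  }
}
```

The last branch is the first-writer case that the reviewed code leaves unhandled; in this sketch it is the default once the other outcomes are ruled out.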
| log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath()); | ||
| return false; | ||
| } | ||
| rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() -> Option.of(createNewInstantTime(false))); |
This if (config.isExclusiveRollbackEnabled()) check inside the else branch is unreachable — we only enter the else when isExclusiveRollbackEnabled() is false. The heartbeatClient.start() call here will never execute. I suspect this scheduling + heartbeat logic was meant to live in the exclusive-rollback branch above (to handle the case where no pending rollback exists yet).
| } | ||
| if (config.isExclusiveRollbackEnabled()) { | ||
| heartbeatClient.stop(rollbackInstantTimeOpt.get()); | ||
| } |
If table.rollback() throws an exception after heartbeatClient.start() was called, the heartbeat is never stopped — heartbeatClient.stop() is only on the success path. This could block other writers from attempting the rollback until the heartbeat naturally expires. Could you move the stop() call into a finally block to ensure cleanup on failure?
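A minimal sketch of the cleanup pattern requested here, using an in-memory stand-in for the heartbeat client. `FakeHeartbeatClient` and `rollbackWithHeartbeat` are hypothetical names for illustration; the real client writes heartbeat files and runs a timer, which this sketch does not model.

```java
import java.util.HashSet;
import java.util.Set;

final class HeartbeatCleanupSketch {

  // Hypothetical in-memory stand-in for a heartbeat client; not the Hudi class.
  static final class FakeHeartbeatClient {
    private final Set<String> active = new HashSet<>();

    void start(String instantTime) {
      active.add(instantTime);
    }

    void stop(String instantTime) {
      active.remove(instantTime);
    }

    boolean isActive(String instantTime) {
      return active.contains(instantTime);
    }
  }

  // Starts the heartbeat, runs the rollback, and always stops the heartbeat,
  // even when the rollback throws.
  static boolean rollbackWithHeartbeat(FakeHeartbeatClient heartbeat, String rollbackInstant, Runnable rollback) {
    heartbeat.start(rollbackInstant);
    try {
      rollback.run();
      return true;
    } finally {
      heartbeat.stop(rollbackInstant);
    }
  }

  public static void main(String[] args) {
    FakeHeartbeatClient heartbeat = new FakeHeartbeatClient();
    try {
      rollbackWithHeartbeat(heartbeat, "001", () -> {
        throw new IllegalStateException("rollback failed");
      });
    } catch (IllegalStateException e) {
      System.out.println("active after failure: " + heartbeat.isActive("001"));
    }
  }
}
```

With the stop call in `finally`, a failed rollback no longer leaves an active heartbeat behind, so another writer does not have to wait for expiry before retrying.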
| heartbeatClient.start(rollbackInstantTimeOpt.get()); | ||
| rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan); | ||
| } else { | ||
| // Heartbeat is still active for another writer, ignore rollback for now |
nit: // TODO: ABCDEFGHI revisit return value — looks like a placeholder that slipped in. Worth cleaning up or replacing with a real JIRA ticket reference before merging.
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for addressing the feedback — the two branching bugs (unreachable scheduling logic and dead code in the non-exclusive path) are both fixed, and the new tests cover the key multi-writer scenarios well. One prior finding remains open: heartbeatClient.stop() is still only on the success path, so a rollback failure will leak the heartbeat and block other writers. See inline comment.
🤖 Line 1239: The heartbeatClient.stop() call is still only on the success path (after return true). If table.rollback() throws, the heartbeat is never stopped, which blocks other writers from attempting the rollback until it naturally expires. This was flagged in the previous review — could you wrap the rollback execution + stop in a try/finally? Something like:
try {
  // execute rollback...
} finally {
  if (config.isExclusiveRollbackEnabled()) {
    heartbeatClient.stop(rollbackInstantTimeOpt.get());
  }
}
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
CodeRabbit Walkthrough: This PR introduces an exclusive rollback feature that coordinates multi-writer rollback execution via heartbeat state validation. When enabled, the rollback logic checks for pending rollback info and heartbeat expiry to determine whether to proceed with rollback, defer execution, or skip if the commit is already handled.
Greptile Summary: This PR adds support for exclusive rollbacks in multi-writer mode, introducing a new configuration property hoodie.rollback.enforce.single.rollback.instant (ENABLE_EXCLUSIVE_ROLLBACK). When enabled (along with a multi-writer concurrency mode), the rollback logic uses heartbeat files to coordinate between concurrent writers so that only one writer schedules and executes a rollback plan at a time. A writer that detects an existing rollback plan with an expired heartbeat takes over; one that finds an active heartbeat defers. Five new tests in TestClientRollback cover the core scenarios.
Key changes:
- HoodieWriteConfig.java: Adds ENABLE_EXCLUSIVE_ROLLBACK config property (default false, advanced) and isExclusiveRollbackEnabled() accessor that requires both the flag and a multi-writer concurrency mode.
- BaseHoodieTableServiceClient.java: Refactors the rollback() method to branch on isExclusiveRollbackEnabled(), coordinating via heartbeats (start/stop/expire-check) within a lock-protected scheduling phase.
- TestClientRollback.java: Adds buildExclusiveRollbackConfig() helper and five tests for the new exclusive rollback behaviour.
One compilation error is present (LOG instead of log) and a placeholder TODO comment was left in the code.
Greptile Confidence Score: 3/5
Not safe to merge as-is due to a compilation error (LOG vs log) in the core rollback method.
The feature design is sound and the tests are thorough, but LOG.info on line 1187 is undefined: @Slf4j generates log (lowercase) and no parent class defines LOG. This will fail to compile and is a blocking issue.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java — compilation error on line 1187 (LOG → log).
Sequence Diagram (CodeRabbit):
sequenceDiagram
    actor Client
    participant RollbackService as Rollback Service
    participant Timeline as Timeline Manager
    participant Heartbeat as Heartbeat Client
    participant Storage as Storage Layer
    Client->>RollbackService: rollback(commitTime)
    RollbackService->>Timeline: reloadActiveTimeline()
    RollbackService->>Timeline: getPendingRollbackInfo()
    alt Pending Rollback + Expired Heartbeat
        RollbackService->>Storage: deleteHeartbeatFile()
        RollbackService->>Heartbeat: start(rollbackInstantTime)
        RollbackService->>Storage: performRollback()
        RollbackService->>Heartbeat: stop(rollbackInstantTime)
        RollbackService-->>Client: return true
    else Pending Rollback + Active Heartbeat
        RollbackService-->>Client: return false
    else No Pending Rollback
        RollbackService->>Timeline: checkCommitExists(commitTime)
        alt Commit Exists
            RollbackService->>Timeline: scheduleRollback(commitTime)
            RollbackService->>Heartbeat: start(rollbackInstantTime)
            RollbackService->>Storage: performRollback()
            RollbackService->>Heartbeat: stop(rollbackInstantTime)
            RollbackService-->>Client: return true
        else Commit Absent
            RollbackService-->>Client: return false
        end
    end
Sequence Diagram (Greptile):
sequenceDiagram
    participant W1 as Writer 1
    participant W2 as Writer 2
    participant Lock as TxnManager (Lock)
    participant HB as HeartbeatClient
    participant TL as Timeline
    Note over W1,TL: Exclusive Rollback Enabled (multi-writer mode)
    W1->>Lock: beginStateChange()
    W1->>TL: reloadActiveTimeline()
    W1->>TL: getPendingRollbackInfo(commitTime)
    alt No pending rollback and commit present
        W1->>HB: start(rollbackInstantTime)
        W1->>TL: scheduleRollback(rollbackInstantTime)
        W1->>Lock: endStateChange()
        W1->>TL: table.rollback(...)
        W1->>HB: stop(rollbackInstantTime)
    else Pending rollback exists, heartbeat EXPIRED
        W1->>HB: deleteHeartbeatFile(rollbackInstantTime)
        W1->>HB: start(rollbackInstantTime)
        W1->>Lock: endStateChange()
        W1->>TL: table.rollback(...)
        W1->>HB: stop(rollbackInstantTime)
    else Pending rollback exists, heartbeat ACTIVE
        Note over W1: Another writer is rolling back
        W1->>Lock: endStateChange()
        W1-->>W1: return false (skip)
    end
    Note over W2: Writer 2 attempts same rollback later
    W2->>Lock: beginStateChange()
    W2->>TL: reloadActiveTimeline()
    alt Commit no longer in timeline
        W2->>Lock: endStateChange()
        W2-->>W2: return false (already rolled back)
    end
CodeRabbit: yihua#34 (review)
Greptile: yihua#34 (review)
| String rollbackInstantTime; | ||
| if (pendingRollbackInfo.isPresent()) { | ||
| Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty(); | ||
| Option<String> rollbackInstantTimeOpt; |
Refresh commitInstantOpt after reloading the timeline.
The exclusive branch rechecks commit presence on the reloaded timeline, but Line 1205 still uses the pre-lock commitInstantOpt. If the first snapshot missed the instant and the refreshed one finds it, this ends up dereferencing an empty option instead of scheduling the rollback.
Proposed fix
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
.findFirst());
@@
if (config.isExclusiveRollbackEnabled()) {
// Reload meta client within the lock so that the timeline is latest while executing pending rollback
table.getMetaClient().reloadActiveTimeline();
+ commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+ .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
+ .findFirst());
Option<HoodiePendingRollbackInfo> pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime());
@@
- } else if (Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
- .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
- .findFirst()).isEmpty()) {
+ } else if (commitInstantOpt.isEmpty()) {
// Assume rollback is already executed since the commit is no longer present in the timeline
return false;
} else {
Also applies to: 1179-1205
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java`
around lines 1164 - 1168, The code may use a stale commitInstantOpt after
reloading the timeline; update the logic in BaseHoodieTableServiceClient so that
after calling table.reloadActiveTimeline() and re-checking commits (the
exclusive branch around the reloaded timeline check), you re-evaluate
commitInstantOpt by re-querying
table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream() (same
filter using EQUALS.test(instant.requestedTime(), commitInstantTime)) before
dereferencing it or scheduling rollback; ensure the subsequent use of
rollbackPlanOption / rollbackInstantTimeOpt uses this refreshed
commitInstantOpt.
— CodeRabbit (original) (source:comment#3067036668)
| LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt); | ||
| HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config); | ||
| heartbeatClient.start(rollbackInstantTimeOpt.get()); | ||
| rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan); |
Always stop the rollback heartbeat when this writer started it.
If scheduleRollback(...) or table.rollback(...) throws after heartbeatClient.start(...), the heartbeat never gets stopped. In the exclusive path that leaves a live rollback heartbeat behind, so other writers will keep deferring until it expires.
Proposed fix
Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
Option<String> rollbackInstantTimeOpt;
+ boolean startedRollbackHeartbeat = false;
@@
if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config);
heartbeatClient.start(rollbackInstantTimeOpt.get());
+ startedRollbackHeartbeat = true;
rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
@@
rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() -> Option.of(createNewInstantTime(false)));
heartbeatClient.start(rollbackInstantTimeOpt.get());
+ startedRollbackHeartbeat = true;
rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
@@
- HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
- ? table.rollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), true, skipLocking)
- : table.rollback(context, rollbackInstantTimeOpt.get(), table.getMetaClient().createNewInstant(
- HoodieInstant.State.INFLIGHT, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
- false, skipLocking);
- if (timerContext != null) {
- long durationInMs = metrics.getDurationInMs(timerContext.stop());
- metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
- }
- if (config.isExclusiveRollbackEnabled()) {
- heartbeatClient.stop(rollbackInstantTimeOpt.get());
- }
- return true;
+ try {
+ HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
+ ? table.rollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), true, skipLocking)
+ : table.rollback(context, rollbackInstantTimeOpt.get(), table.getMetaClient().createNewInstant(
+ HoodieInstant.State.INFLIGHT, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
+ false, skipLocking);
+ if (timerContext != null) {
+ long durationInMs = metrics.getDurationInMs(timerContext.stop());
+ metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
+ }
+ return true;
+ } finally {
+ if (config.isExclusiveRollbackEnabled() && startedRollbackHeartbeat && rollbackInstantTimeOpt.isPresent()) {
+ heartbeatClient.stop(rollbackInstantTimeOpt.get());
+ }
+ }
Also applies to: 1203-1205, 1230-1241
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java`
around lines 1184 - 1190, The rollback heartbeat started via
heartbeatClient.start(rollbackInstantTimeOpt.get()) in
BaseHoodieTableServiceClient may be left running if subsequent calls (e.g.,
scheduleRollback(...) or table.rollback(...)) throw; wrap the sequence that
starts the heartbeat and then performs rollback-scheduling/execution in a
try/finally (or catch and rethrow) to ensure
heartbeatClient.stop(rollbackInstantTimeOpt.get()) is always called on failure.
Apply the same pattern to the other similar blocks referenced (around the
instances that call heartbeatClient.start(...) at the locations corresponding to
lines ~1203-1205 and ~1230-1241) so any early exception will stop the rollback
heartbeat before propagating the error.
— CodeRabbit (original) (source:comment#3067036671)
| if (pendingRollbackOpt.isPresent()) { | ||
| // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback | ||
| if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) { | ||
| LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt); |
Compilation error:
LOG is undefined — should be log
The class is annotated with @Slf4j, which generates a field named log (lowercase). There is no LOG field defined here or in any parent class (BaseHoodieClient also uses log). This line will fail to compile.
| LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt); | |
| log.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt); |
— Greptile (original) (source:comment#3067036686)
| } else { | ||
| // Heartbeat is still active for another writer, ignore rollback for now | ||
| // TODO: ABCDEFGHI revisit return value | ||
| return false; |
Leftover placeholder TODO comment
The comment // TODO: ABCDEFGHI revisit return value looks like a development-time placeholder that was left in accidentally. Based on the test testExclusiveRollbackDefersToActiveHeartbeat, returning false when another writer's heartbeat is active is the intended and correct behavior. If there is still an open design question here, the TODO should use a proper JIRA issue reference instead of ABCDEFGHI.
| return false; | |
| } else { | |
| // Heartbeat is still active for another writer — skip rollback for now. | |
| // The active writer will complete the rollback; caller should retry later. | |
| return false; |
— Greptile (original) (source:comment#3067036706)
7d831a6 to ccd0250
ccd0250 to c573c65
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Style & Readability Review — A few readability concerns: logging an Option object instead of its value, a suspicious TODO placeholder, DRY violation with duplicated code, and some test variable naming that could be clearer.
| } | ||
| } else if (Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream() | ||
| .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime)) | ||
| .findFirst()).isEmpty()) { |
🤖 nit: passing rollbackInstantTimeOpt (an Option) to LOG.info will print 'Option[...]' instead of the value; use .get() or extract the string first for cleaner logs.
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() -> Option.of(createNewInstantTime(false))); | ||
| if (config.isExclusiveRollbackEnabled()) { | ||
| heartbeatClient.start(rollbackInstantTimeOpt.get()); | ||
| } |
🤖 nit: the TODO comment contains 'ABCDEFGHI' which looks like a placeholder; consider replacing with a real issue reference (e.g., HUDI-#### or a meaningful comment).
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| } | ||
| rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false); | ||
| } | ||
| } finally { |
🤖 nit: this code duplicates the commit-in-timeline check from lines 1254-1257; consider extracting into a helper method or reusing the earlier result.
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| } | ||
| } finally { | ||
| if (!skipLocking) { | ||
| txnManager.endStateChange(Option.empty()); |
🤖 [Line 1306] nit: the comment 'Case where no pending rollback is present,' is incomplete (missing period and context); consider clarifying that exclusive rollback is also disabled here.
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| * (no heartbeat file present → returns 0L → always expired), the current writer should take ownership | ||
| * and execute the rollback. | ||
| */ | ||
| @Test |
🤖 nit: test variable names like p1, p2 and id11, id12 are generic; consider more descriptive names like partition1, partition2, fileId1_1 to improve test clarity.
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| final String p2 = "2016/05/02"; | ||
| final String commitTime1 = "20160501010101"; | ||
| final String commitTime2 = "20160502020601"; | ||
| final String commitTime3 = "20160506030611"; |
🤖 nit: the double-brace initialization (anonymous class with initializer block) for creating maps is verbose; if Java 9+ is available, consider using Map.of(p1, "id11", p2, "id12") instead.
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
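To make the suggestion concrete, here is a small comparison of the two styles. The class and method names are hypothetical, the partition paths are passed in as parameters, and the values `"id11"` and `"id12"` are taken from the comment above; `Map.of` requires Java 9 or later.

```java
import java.util.HashMap;
import java.util.Map;

final class PartitionFileIdMaps {

  // Double-brace initialization: creates an anonymous HashMap subclass
  // whose instance initializer block populates the map.
  static Map<String, String> doubleBrace(String partition1, String partition2) {
    return new HashMap<String, String>() {
      {
        put(partition1, "id11");
        put(partition2, "id12");
      }
    };
  }

  // Java 9+ factory: same entries, no extra class, but the result is immutable.
  static Map<String, String> mapOf(String partition1, String partition2) {
    return Map.of(partition1, "id11", partition2, "id12");
  }

  public static void main(String[] args) {
    Map<String, String> a = doubleBrace("2016/05/01", "2016/05/02");
    Map<String, String> b = mapOf("2016/05/01", "2016/05/02");
    System.out.println(a.equals(b));
  }
}
```

One trade-off to keep in mind: `Map.of` returns an unmodifiable map and rejects null keys, null values, and duplicate keys, so it only fits tests that never mutate the map afterwards.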
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for contributing! The exclusive rollback concept is solid, but there are a couple of significant issues in the control flow. Most critically, when exclusive rollback is enabled and no pending rollback plan exists yet (the first-writer-to-rollback case), the code falls through without scheduling a rollback — resulting in a HoodieRollbackException. Relatedly, there's dead code in the non-exclusive branch that checks isExclusiveRollbackEnabled(), suggesting the intended flow got split incorrectly. There's also a heartbeat leak on exception during rollback execution. See inline comments for details.
| .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime)) | ||
| .findFirst()).isEmpty()) { | ||
| // Assume rollback is already executed since the commit is no longer present in the timeline | ||
| return false; |
🤖 When exclusive rollback is enabled, no pending rollback exists, but the commit IS still in the timeline, neither rollbackPlanOption nor rollbackInstantTimeOpt gets assigned — they remain Option.empty(). The code falls through to rollbackPlanOption.isPresent() which is false, throwing a HoodieRollbackException. This is the first-writer-to-rollback case and it's completely unhandled. It seems like the scheduling logic in the else branch (line 1290–1298) was intended to also be reachable from the exclusive path, but the if/else structure prevents it.
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| return false; | ||
| } | ||
| rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() -> Option.of(createNewInstantTime(false))); | ||
| if (config.isExclusiveRollbackEnabled()) { |
🤖 This if (config.isExclusiveRollbackEnabled()) check is unreachable — we're inside the else branch of if (config.isExclusiveRollbackEnabled()) at line 1268. The heartbeatClient.start(...) call will never execute. This looks related to the missing case above where the exclusive path doesn't schedule a new rollback.
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| if (config.isExclusiveRollbackEnabled()) { | ||
| heartbeatClient.stop(rollbackInstantTimeOpt.get()); | ||
| } | ||
| return true; |
🤖 If table.rollback() throws an exception, the heartbeat started at line 1277 is never stopped — the heartbeatClient.stop() at line 1327 is only reached on the success path. A leaked heartbeat timer + file would prevent other writers from performing the rollback (they'd see an active heartbeat). Could you move the stop() into a finally block?
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config); | ||
| heartbeatClient.start(rollbackInstantTimeOpt.get()); | ||
| rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan); | ||
| } else { |
🤖 HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config) only deletes if config.getFailedWritesCleanPolicy().isLazy(). Is this intentional for the exclusive rollback use case? If the policy isn't lazy, the stale heartbeat file from the previous writer won't be deleted before heartbeatClient.start() overwrites it. Might be fine since start() overwrites, but wanted to confirm the intent.
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| // TODO: ABCDEFGHI revisit return value | ||
| return false; | ||
| } | ||
| } else if (Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream() |
🤖 // TODO: ABCDEFGHI revisit return value — looks like a placeholder that shouldn't go into the final PR.
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Style & Readability Review — A few actionable suggestions: the log message should extract the Option value for clarity, one check duplicates logic from earlier in the method, a comment needs completion, and the config property name doesn't match the semantic intent of its key.
| if (pendingRollbackOpt.isPresent()) { | ||
| // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback | ||
| if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) { | ||
| LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt); |
🤖 nit: the log message passes rollbackInstantTimeOpt (an Option object) directly—it will print as Optional[...] instead of the timestamp. Extract the value with .get() or a variable for clearer debugging.
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
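The effect described in this nit can be reproduced with `java.util.Optional` as a stand-in. This is an assumption for illustration only: Hudi's own `Option` class has its own `toString`, so the exact wrapper text in real logs may differ, but the point about logging the wrapper instead of the value is the same. The class and method names below are hypothetical.

```java
import java.util.Optional;

final class OptionLoggingSketch {

  // Builds the message the same way for any argument, as a "{}" placeholder would.
  static String message(Object rollbackInstant) {
    return "Heartbeat expired for rollback instant " + rollbackInstant + ", executing rollback now";
  }

  public static void main(String[] args) {
    Optional<String> rollbackInstantTimeOpt = Optional.of("20160506030611");
    // Passing the wrapper logs its toString(), not the bare timestamp.
    System.out.println(message(rollbackInstantTimeOpt));
    // Unwrapping first logs only the timestamp.
    System.out.println(message(rollbackInstantTimeOpt.get()));
  }
}
```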
| // TODO: ABCDEFGHI revisit return value | ||
| return false; | ||
| } | ||
| } else if (Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream() |
🤖 nit: this check duplicates the lookup logic from lines 1249–1251—could you just use commitInstantOpt.isEmpty() instead to avoid code duplication and improve maintainability?
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
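One way to avoid the duplicated lookup is to put it behind a single helper and reuse its result. The sketch below uses a list of requested-time strings in place of the Hudi timeline; `CommitLookupSketch` and `findCommit` are hypothetical names, not existing Hudi APIs.

```java
import java.util.List;
import java.util.Optional;

final class CommitLookupSketch {

  // Single place that answers "is this commit still in the timeline?".
  static Optional<String> findCommit(List<String> requestedTimes, String commitInstantTime) {
    return requestedTimes.stream()
        .filter(commitInstantTime::equals)
        .findFirst();
  }

  public static void main(String[] args) {
    List<String> timeline = List.of("20160501010101", "20160502020601");
    // Look the commit up once (after any timeline reload) and reuse the result
    // for both the "already rolled back" check and the scheduling step.
    Optional<String> commitInstantOpt = findCommit(timeline, "20160502020601");
    System.out.println(commitInstantOpt.isPresent());
    System.out.println(findCommit(timeline, "20160506030611").isPresent());
  }
}
```

If the timeline is reloaded under the lock, calling the helper again after the reload also keeps the cached result from going stale.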
| return false; | ||
| } | ||
| } else { | ||
| // Case where no pending rollback is present, |
🤖 nit: the comment on line 1288 ends with a comma and looks incomplete—could you finish the thought, e.g., 'Case where no pending rollback is present, use non-exclusive mode'?
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| .withDocumentation("Enables a more efficient mechanism for rollbacks based on the marker files generated " | ||
| + "during the writes. Turned on by default."); | ||
|
|
||
| public static final ConfigProperty<String> ENABLE_EXCLUSIVE_ROLLBACK = ConfigProperty |
🤖 nit: the constant name ENABLE_EXCLUSIVE_ROLLBACK doesn't match the semantic intent of the config key hoodie.rollback.enforce.single.rollback.instant—could you rename one to be consistent, e.g., ENFORCE_EXCLUSIVE_ROLLBACK?
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for contributing this multi-writer exclusive rollback feature! The heartbeat-based coordination approach makes sense. However, I found a critical logic gap and a resource leak that should be addressed before merging — see inline comments for details.
| rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime()); | ||
| if (pendingRollbackOpt.isPresent()) { | ||
| // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback | ||
| if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) { |
🤖 When isExclusiveRollbackEnabled() is true and no pending rollback exists but the commit IS in the timeline, this block falls through with rollbackPlanOption still Option.empty() and rollbackInstantTimeOpt also empty. No rollback plan is ever scheduled, so the code hits the else branch at line ~1297 and throws HoodieRollbackException. This is the "first writer" scenario — someone needs to create the rollback plan.
The else branch below (line ~1288) has a dead isExclusiveRollbackEnabled() check that suggests the scheduling logic was intended to be shared, but due to the if/else structure it's unreachable when exclusive rollback is enabled.
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted()); | ||
| } | ||
| if (config.isExclusiveRollbackEnabled()) { | ||
| heartbeatClient.stop(rollbackInstantTimeOpt.get()); |
🤖 If table.rollback() throws an exception, heartbeatClient.stop() is never called. The heartbeat timer thread keeps running and the heartbeat file remains on storage, which blocks other writers from taking ownership of this rollback until the heartbeat naturally expires. Could you wrap the rollback execution + heartbeat stop in a try/finally so the heartbeat is always cleaned up?
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| } | ||
| } else if (Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream() | ||
| .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime)) | ||
| .findFirst()).isEmpty()) { |
🤖 This TODO (// TODO: ABCDEFGHI revisit return value) looks like a development placeholder that shouldn't be merged. Could you either resolve it or replace with a proper tracking reference (e.g., a JIRA link)?
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| } | ||
| rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() -> Option.of(createNewInstantTime(false))); | ||
| if (config.isExclusiveRollbackEnabled()) { | ||
| heartbeatClient.start(rollbackInstantTimeOpt.get()); |
🤖 This isExclusiveRollbackEnabled() check is unreachable — it's inside the else branch of if (config.isExclusiveRollbackEnabled()), so it can never be true. This appears to be a copy-paste issue and is related to the missing rollback scheduling path for the exclusive rollback case when no pending rollback exists.
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| // there is a pending rollback info available | ||
| rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan()); | ||
| rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime(); | ||
| rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime()); |
🤖 When exclusive rollback is enabled, the old fast-path that skips locking (when pendingRollbackInfo is present) is now bypassed — the code always takes the lock and re-fetches pending rollback info from the timeline. Is this intentional? It means callers like rollbackFailedWrites() that already looked up pendingRollbackInfo will have that value ignored, and the timeline reload + getPendingRollbackInfo() call inside the lock becomes the source of truth. Worth documenting this behavioral change.
| } | ||
| } else { | ||
| // Case where no pending rollback is present, | ||
| if (commitInstantOpt.isEmpty()) { |
Let's move this to L1253 and bail out early.
Then we only need to tackle two cases: the pending rollback info is already present, or we need to schedule the rollback.
This feedback applies even to code outside of your changes.
The branching is a bit confusing currently.
| // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback | ||
| if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) { | ||
| LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt); | ||
| HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config); |
We should not need to meddle with the heartbeat here. The heartbeat is just for execution.
For planning, this is what we wanted to do:
if isExclusiveRollbackEnabled
a. Ensure we are within the lock.
b. Reload the timeline.
c. If pendingRollback is empty from the call site, check for a pending rollback again. If we find one, fetch it. If we do not find one, fall through so that we eventually schedule the rollback.
Outside of isExclusiveRollbackEnabled, let's also do this:
check again for the commit to roll back (with the reloaded timeline). If it was already removed from the timeline, return early from the method.
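The planning steps above can be sketched roughly as follows. Everything here is an assumption made for illustration: `Timeline` is a hypothetical in-memory stand-in for the reloaded Hudi timeline, a `ReentrantLock` stands in for the transaction lock, and instant times are plain strings.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

// Sketch only: hypothetical types, not Hudi's real timeline or lock manager.
final class RollbackScheduleSketch {

  /** In-memory stand-in for the shared timeline that all writers see. */
  static final class Timeline {
    final Set<String> commits = new HashSet<>();
    final Map<String, String> pendingRollbackByCommit = new HashMap<>();
    int nextInstant = 100;
  }

  /**
   * Returns the rollback instant to execute, or empty if the commit is
   * already gone from the timeline (nothing left to roll back).
   */
  static Optional<String> scheduleRollback(ReentrantLock lock, Timeline timeline,
                                           String commitInstantTime,
                                           Optional<String> pendingFromCallSite) {
    // Pending rollback already known to the caller: reuse it.
    if (pendingFromCallSite.isPresent()) {
      return pendingFromCallSite;
    }
    lock.lock();
    try {
      // A real implementation would reload the timeline from storage here.
      String existing = timeline.pendingRollbackByCommit.get(commitInstantTime);
      if (existing != null) {
        // A concurrent writer scheduled the rollback first: fetch and reuse it.
        return Optional.of(existing);
      }
      if (!timeline.commits.contains(commitInstantTime)) {
        // The commit was already removed from the timeline: return early.
        return Optional.empty();
      }
      // No pending rollback found: fall through and schedule one.
      String rollbackInstant = String.valueOf(timeline.nextInstant++);
      timeline.pendingRollbackByCommit.put(commitInstantTime, rollbackInstant);
      return Optional.of(rollbackInstant);
    } finally {
      lock.unlock();
    }
  }
}
```

Because the re-check and the scheduling happen under the same lock, a second writer asking about the same commit gets the first writer's rollback instant instead of creating a new one.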
| HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent() | ||
| ? table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, skipLocking) | ||
| : table.rollback(context, rollbackInstantTime, table.getMetaClient().createNewInstant( | ||
| ? table.rollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), true, skipLocking) |
For rollback execution, we should do something like the below.
boolean isMultiWriter = config.getWriteConcurrencyMode().supportsMultiWriter();
if (isMultiWriter and ) {
acquire lock
validate heartbeat, i.e. if heartbeat is expired, throw exception.
reload timeline
if rollback is already completed, throw.
start heartbeat for rollback instant.
release lock
}
.
.
execute rollback.
Finally, once rollback is completed, within the finally block:
if multi writer, stop heartbeat.
By the way, this change is irrespective of the isExclusiveRollbackEnabled config.
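A rough sketch of that execution flow, under stated assumptions: `Heartbeats` and `Timeline` are hypothetical in-memory stand-ins, a `ReentrantLock` plays the role of the transaction lock, and the validation step is assumed to reject a heartbeat that is still live (matching the PR summary, where writers skip execution when another writer's heartbeat is present).

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

// Sketch only: every type here is a hypothetical stand-in, not Hudi's real API.
final class RollbackExecutionSketch {

  /** In-memory stand-in for heartbeat state shared by all writers. */
  static final class Heartbeats {
    final Set<String> live = new HashSet<>();

    boolean isExpired(String instant) {
      return !live.contains(instant);
    }

    void start(String instant) {
      live.add(instant);
    }

    void stop(String instant) {
      live.remove(instant);
    }
  }

  /** In-memory stand-in for the timeline's view of completed rollbacks. */
  static final class Timeline {
    final Set<String> completedRollbacks = new HashSet<>();

    void reload() {
      // A real timeline would re-read its state from storage here.
    }
  }

  static void executeRollback(ReentrantLock lock, Heartbeats heartbeats, Timeline timeline,
                              String rollbackInstant, boolean isMultiWriter, Runnable rollback) {
    boolean ownsHeartbeat = false;
    try {
      if (isMultiWriter) {
        lock.lock();
        try {
          // Assumption: a live heartbeat means another writer owns this rollback.
          if (!heartbeats.isExpired(rollbackInstant)) {
            throw new IllegalStateException("Rollback " + rollbackInstant + " is owned by another writer");
          }
          timeline.reload();
          if (timeline.completedRollbacks.contains(rollbackInstant)) {
            throw new IllegalStateException("Rollback " + rollbackInstant + " is already completed");
          }
          heartbeats.start(rollbackInstant);
          ownsHeartbeat = true;
        } finally {
          lock.unlock();
        }
      }
      // The rollback itself runs outside the lock.
      rollback.run();
      timeline.completedRollbacks.add(rollbackInstant);
    } finally {
      // Stop only a heartbeat this writer started.
      if (ownsHeartbeat) {
        heartbeats.stop(rollbackInstant);
      }
    }
  }
}
```

Tracking ownership in a local flag keeps the `finally` block from touching a heartbeat that belongs to another writer when the validation step throws.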
| + "during the writes. Turned on by default."); | ||
|
|
||
| public static final ConfigProperty<String> ENABLE_EXCLUSIVE_ROLLBACK = ConfigProperty | ||
| .key("hoodie.rollback.enforce.single.rollback.instant") |
We should always align the variable name with the config key.
| return getBoolean(ROLLBACK_USING_MARKERS_ENABLE); | ||
| } | ||
|
|
||
| public boolean isExclusiveRollbackEnabled() { |
Same here. Let's fix the naming across.
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the thorough rework — the SCHEDULE/EXECUTION phase split reads much cleaner, and all four prior findings plus nsivabalan's structural/naming feedback look addressed:
- Missing schedule path for exclusive rollback (prior L1271) → fixed by Case 2b.
- Heartbeat leak on rollback failure (prior L1323) → fixed via emittingHeartbeat + try/finally around execution.
- Placeholder TODO (prior L1283) → removed.
- Unreachable isExclusiveRollbackEnabled() check (prior L1295) → gone with the restructure.
- Lock-skip behavioral change for pendingRollbackInfo (prior L1258) → now consistent: Case 1 always skips the schedule lock, and exclusive rollback takes its own lock in the execution phase.
- Heartbeat moved to execution-only, multi-writer shape, ROLLBACK_ENFORCE_SINGLE_INSTANT / isEnforceSingleRollbackInstant() naming → all aligned with nsivabalan's suggestions.
One inline comment below on the execution-phase validation that's worth double-checking before merge. Please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here.
| heartbeatClient.start(rollbackInstantTimeOpt.get()); | ||
| emittingHeartbeat = true; | ||
| } finally { | ||
| if (!skipLocking) { |
🤖 Could this validation miss a stale-plan case? hasPendingRollback checks whether any pending rollback exists for commitInstantTime, not specifically the one in rollbackInstantTimeOpt. In Case 1 (caller-supplied pendingRollbackInfo), if that rollback was already completed by another writer and a fresh pending rollback was scheduled against the same commit, we'd pass both hasPendingRollback (the new one) and heartbeat-expired checks on our stale instant, then call table.rollback() with the stale rollbackPlanOption. Would it be safer to verify the pending rollback found here matches rollbackInstantTimeOpt.get() before taking ownership?
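To make the stale-plan concern concrete, here is a small sketch contrasting the two checks. `PendingRollback` and both helper methods are hypothetical simplifications for illustration; they are not the signatures used in the PR.

```java
import java.util.List;

// Sketch only: PendingRollback is a hypothetical, simplified view of a timeline entry.
final class PendingRollbackMatchSketch {

  static final class PendingRollback {
    final String rollbackInstantTime;
    final String commitToRollback;

    PendingRollback(String rollbackInstantTime, String commitToRollback) {
      this.rollbackInstantTime = rollbackInstantTime;
      this.commitToRollback = commitToRollback;
    }
  }

  /** Looser check: is there any pending rollback targeting this commit? */
  static boolean hasAnyPendingRollback(List<PendingRollback> pending, String commitInstantTime) {
    return pending.stream().anyMatch(p -> p.commitToRollback.equals(commitInstantTime));
  }

  /** Stricter check: is the pending rollback for this commit the one we hold a plan for? */
  static boolean hasMatchingPendingRollback(List<PendingRollback> pending, String commitInstantTime,
                                            String expectedRollbackInstantTime) {
    return pending.stream().anyMatch(p -> p.commitToRollback.equals(commitInstantTime)
        && p.rollbackInstantTime.equals(expectedRollbackInstantTime));
  }
}
```

If a writer holds a plan for rollback instant R1 but the timeline now only has a newer pending rollback R2 for the same commit, the looser check passes while the stricter one does not.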
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Nice cleanup — removing the redundant HeartbeatUtils.deleteHeartbeatFile(...) before heartbeatClient.start(...) is safe, since start() → updateHeartbeat() opens the file with overwrite=true and will replace any stale content with a fresh timestamp. No new issues flagged from this incremental pass. My prior question on the line 1321 validation (whether hasPendingRollback should match the specific rollbackInstantTimeOpt rather than just the commit instant) is in unchanged code and remains open — worth a look before merge, but it's a corner-case question, not a blocker. Please take a look at any remaining inline comments, and this should be ready for a Hudi committer or PMC member to take it from here.
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
CodeRabbit Walkthrough: This PR refactors the rollback mechanism into distinct schedule and execution phases with heartbeat-based coordination for single-instant enforcement in multi-writer mode. It introduces a new configuration property and corresponding tests for exclusive rollback behavior.
Sequence Diagram (CodeRabbit):
sequenceDiagram
participant Writer as Client/Writer
participant TxnMgr as Transaction Manager
participant Timeline as Timeline/Metadata
participant Heartbeat as Heartbeat Coordinator
participant Executor as Rollback Executor
rect rgba(100, 150, 200, 0.5)
Note over Writer,Executor: Schedule Phase
Writer->>TxnMgr: Acquire state-change lock (if !skipLocking)
TxnMgr-->>Writer: Lock acquired
Writer->>Timeline: Reload active timeline
Timeline-->>Writer: Current state
alt Pending rollback exists
Writer->>Timeline: Reuse pending rollback instant
else
Writer->>Timeline: Validate commit instant exists
Timeline-->>Writer: Commit confirmed/not found
alt Commit not in timeline
Writer-->>Writer: Return false (abort)
else
Writer->>Timeline: Schedule new rollback instant
Timeline-->>Writer: Rollback instant created
end
end
Writer->>Timeline: Plan rollback operations
Timeline-->>Writer: Plan ready
end
rect rgba(150, 100, 200, 0.5)
Note over Writer,Executor: Execution Phase
alt Single-instant enforcement enabled
Writer->>Heartbeat: Check for active competing heartbeat
Heartbeat-->>Writer: Heartbeat status
alt Heartbeat active
Writer-->>Writer: Return false (abort)
else
Writer->>Heartbeat: Start heartbeat for this rollback
Heartbeat-->>Writer: Heartbeat started
end
end
Writer->>Executor: Execute rollback plan
Executor->>Timeline: Perform rollback operations
Timeline-->>Executor: Operations complete
Executor-->>Writer: Rollback done
alt Heartbeat was started
Writer->>Heartbeat: Stop heartbeat (in finally)
end
Writer-->>Writer: Return true (success)
end
CodeRabbit: hudi-agent#9 (review)
| } | ||
| // Take ownership: delete any stale heartbeat file and start emitting. | ||
| heartbeatClient.start(rollbackInstantTimeOpt.get()); | ||
| emittingHeartbeat = true; |
Refresh commitInstantOpt after the execution-phase timeline reload.
After Line 1308 you detect whether the commit is still present, but Lines 1334-1338 still branch on the older commitInstantOpt snapshot. If another writer deleted the instant files before this writer takes over, this stale value can force deleteInstants=true and bypass the partial-rollback recovery path.
🐛 Minimal fix
table.getMetaClient().reloadActiveTimeline();
+ commitInstantOpt = Option.fromJavaOptional(
+ table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+ .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
+ .findFirst());
boolean hasPendingRollback = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime).isPresent();
- boolean commitStillPresent = table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
- .anyMatch(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime));
+ boolean commitStillPresent = commitInstantOpt.isPresent();
Also applies to: 1334-1338
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java`
around lines 1308 - 1318, The snapshot of commitInstantOpt must be refreshed
after reloading the timeline to avoid using a stale instant; after calling
table.getMetaClient().reloadActiveTimeline() (in BaseHoodieTableServiceClient
around the commit takeover logic), re-resolve commitInstantOpt from the active
timeline based on commitInstantTime (so subsequent checks and the deleteInstants
branch use the up-to-date instant presence), then proceed with
getPendingRollbackInfo(...), heartbeatClient.start(...), and setting
emittingHeartbeat; update any later uses (e.g., the decision block that sets
deleteInstants around the 1334-1338 area) to use this refreshed
commitInstantOpt.
— CodeRabbit (original) (source:comment#3114078428)
5df7d14 to 8872de6
8872de6 to fd75ecc
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #18448 +/- ##
===========================================
Coverage 68.85% 68.85%
- Complexity 28235 28468 +233
===========================================
Files 2460 2475 +15
Lines 135256 136525 +1269
Branches 16391 16595 +204
===========================================
+ Hits 93125 94006 +881
- Misses 34761 34963 +202
- Partials 7370 7556 +186
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Nice refactor extracting the schedule and heartbeat-acquire logic into helpers, and the new completed-rollback check in acquireRollbackHeartbeatIfMultiWriter addresses the stale-plan question from the prior review. nsivabalan's feedback about moving heartbeat out of the schedule phase has been incorporated; the config naming comments on HoodieWriteConfig (L303, L1587) don't appear in this diff slice so I can't verify them here. A couple of things worth double-checking in the inline comments — specifically whether commitInstantOpt gets refreshed in the right scope after the extraction, and whether the unconditional heartbeatClient.stop in the finally can run when this writer never acquired the heartbeat. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here.
| } | ||
| if (commitInstantOpt.isEmpty()) { | ||
| log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath()); | ||
| return Option.empty(); |
🤖 Reassigning the commitInstantOpt parameter here only updates the local variable — the caller's commitInstantOpt still holds the pre-reload value and is what gets passed to table.rollback(..., commitInstantOpt.get(), ...) in the execution phase. The old inlined code refreshed in main-method scope, so the rollback call saw the reloaded instant. Could you return the refreshed instant as part of the result (or otherwise propagate it) so the execution phase doesn't use a stale HoodieInstant state?
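The underlying Java behavior can be shown in isolation. In this sketch, `ScheduleResult`, `refreshInPlace`, and `scheduleAndRefresh` are hypothetical names, and instants are modeled as plain strings; the point is only that reassigning a parameter does not change the caller's reference, so the refreshed value has to be returned.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Sketch only: hypothetical names, with instants modeled as plain strings.
final class RefreshedInstantSketch {

  /** Result type that carries the refreshed commit instant back to the caller. */
  static final class ScheduleResult {
    final String rollbackInstant;
    final Optional<String> refreshedCommitInstant;

    ScheduleResult(String rollbackInstant, Optional<String> refreshedCommitInstant) {
      this.rollbackInstant = rollbackInstant;
      this.refreshedCommitInstant = refreshedCommitInstant;
    }
  }

  /** Reassigns the parameter; the caller's variable is not affected. */
  static void refreshInPlace(Optional<String> commitInstantOpt, Supplier<Optional<String>> reloadedLookup) {
    commitInstantOpt = reloadedLookup.get(); // changes only this method's local reference
  }

  /** Propagates the refreshed instant by returning it alongside the rollback instant. */
  static ScheduleResult scheduleAndRefresh(String rollbackInstant, Supplier<Optional<String>> reloadedLookup) {
    return new ScheduleResult(rollbackInstant, reloadedLookup.get());
  }
}
```

A caller that reads `refreshedCommitInstant` from the result sees the post-reload state, whereas a caller relying on `refreshInPlace` keeps whatever it looked up before the reload.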
| throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime); | ||
| } | ||
| } finally { | ||
| if (isMultiWriter) { |
🤖 The finally calls heartbeatClient.stop(rollbackInstantOpt.get().requestedTime()) whenever isMultiWriter is true, including when acquireRollbackHeartbeatIfMultiWriter returned false (another writer owns the live heartbeat) or when rollbackPlanOption was empty and we never attempted acquire. Depending on how stop is implemented (does it just stop a local timer, or does it delete the heartbeat file on storage?), this could stomp on a heartbeat owned by another writer. Could you gate the stop on a flag set only when acquireRollbackHeartbeatIfMultiWriter actually succeeded — similar to the old emittingHeartbeat flag?
| if (pendingRollbackOpt.isPresent()) { | ||
| // Case 2a: a concurrent writer already scheduled the rollback — re-use it. | ||
| return Option.of(Pair.of(pendingRollbackOpt.get().getRollbackInstant(), | ||
| Option.of(pendingRollbackOpt.get().getRollbackPlan()))); |
🤖 The reload-timeline-and-check-pending-rollback step inside the lock is now gated on isEnforceSingleRollbackInstant(). In the old code this happened unconditionally in the else branch. Is skipping it intentional for the non-enforce path? In multi-writer mode without enforcement, two writers could both miss a concurrent pending rollback and each schedule one for the same commit.
Describe the issue this Pull Request addresses
#18050
Summary and Changelog
This PR adds support for exclusive rollbacks when multi-writer is enabled. With this patch, rollback planning takes a lock and rollback execution emits a heartbeat. A writer starts a heartbeat and completes the rollback execution if no heartbeat is present or the existing heartbeat has expired. Other writers check for the heartbeat and skip rollback execution if one is present.
Impact
Adds support for exclusive rollbacks with multi writer and adds a config hoodie.rollback.enforce.single.rollback.instant to enable the same. The config by default is false.
Risk Level
Low
Documentation Update
Adds support for exclusive rollbacks with multi writer and adds a config hoodie.rollback.enforce.single.rollback.instant to enable the same. The config by default is false.
Contributor's checklist