kafka-consumer(ticdc): tolerate replayed resolved and DDL events (#12596) #12621

ti-chi-bot wants to merge 1 commit into pingcap:release-8.1
Conversation
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
This cherry pick PR is for a release branch and has not yet been approved by triage owners.
Code Review
This pull request introduces a Kafka consumer writer for TiCDC, handling message decoding, DDL/DML events, and partition progress tracking. The review identifies a critical data race caused by sharing a stateful decoder across partitions, and further suggests clarifying the DDL fallback log message and removing a busy-wait loop in the event-flushing logic to reduce CPU usage.
```go
for i := 0; i < int(o.partitionNum); i++ {
	if err != nil {
		log.Panic("cannot create the decoder", zap.Error(err))
	}
	w.progresses[i] = newPartitionProgress(int32(i), decoder)
}
```
A single decoder instance is being shared across all partitions. Decoders, such as simple.Decoder or open.BatchDecoder, are stateful and not safe for concurrent use. Since messages from different partitions can be processed concurrently, this will lead to data races.
A new decoder instance should be created for each partition to ensure thread safety.
```go
for i := 0; i < int(o.partitionNum); i++ {
	decoder, err := NewDecoder(ctx, o, db)
	if err != nil {
		log.Panic("cannot create the decoder", zap.Error(err))
	}
	w.progresses[i] = newPartitionProgress(int32(i), decoder)
}
```

```go
log.Warn("DDL CommitTs < maxCommitTsDDL.CommitTs",
	zap.Uint64("commitTs", ddl.CommitTs),
	zap.Uint64("maxCommitTs", w.ddlWithMaxCommitTs.CommitTs),
	zap.String("DDL", ddl.Query))
```
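The suggested fix can be sketched as a minimal standalone program. The `decoder` struct and `newProgresses` helper below are hypothetical stand-ins for the real `simple.Decoder`/`open.BatchDecoder` and the consumer's constructor; the point is only that each partition gets its own decoder instance, so concurrent partition goroutines never share decoder state:

```go
package main

import "fmt"

// decoder is a stand-in for a stateful codec decoder such as
// simple.Decoder or open.BatchDecoder (this struct is hypothetical).
type decoder struct {
	pending []string // internal buffer that makes the decoder stateful
}

// partitionProgress pairs one partition with its own decoder so that
// concurrent partition goroutines never share decoder state.
type partitionProgress struct {
	partition int32
	decoder   *decoder
}

// newProgresses mirrors the suggested fix: construct a fresh decoder
// inside the loop, one per partition.
func newProgresses(partitionNum int) []*partitionProgress {
	progresses := make([]*partitionProgress, partitionNum)
	for i := 0; i < partitionNum; i++ {
		d := &decoder{} // one decoder per partition, no sharing
		progresses[i] = &partitionProgress{partition: int32(i), decoder: d}
	}
	return progresses
}

func main() {
	ps := newProgresses(3)
	// every partition owns a distinct decoder instance
	fmt.Println(ps[0].decoder != ps[1].decoder && ps[1].decoder != ps[2].decoder)
}
```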
The log message "DDL CommitTs < maxCommitTsDDL.CommitTs" is a bit confusing as maxCommitTsDDL is not a variable in this context, and it seems to contain a typo. Consider rephrasing for clarity.
```diff
-log.Warn("DDL CommitTs < maxCommitTsDDL.CommitTs",
-	zap.Uint64("commitTs", ddl.CommitTs),
-	zap.Uint64("maxCommitTs", w.ddlWithMaxCommitTs.CommitTs),
-	zap.String("DDL", ddl.Query))
+log.Warn("DDL commitTs fallback detected, ignore it",
+	zap.Uint64("commitTs", ddl.CommitTs),
+	zap.Uint64("maxCommitTs", w.ddlWithMaxCommitTs.CommitTs),
+	zap.String("DDL", ddl.Query))
```
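The condition that log message guards can be sketched in isolation: a DDL whose commitTs is older than the maximum commitTs seen so far is a replayed (duplicate) delivery and can be ignored. The `shouldSkipDDL` function name below is illustrative, not from the PR:

```go
package main

import "fmt"

// shouldSkipDDL reports whether a DDL event is a commitTs fallback,
// i.e. a replayed delivery that should be ignored rather than treated
// as a fatal error. (Hypothetical helper, not from the consumer code.)
func shouldSkipDDL(commitTs, maxCommitTs uint64) bool {
	return commitTs < maxCommitTs
}

func main() {
	fmt.Println(shouldSkipDDL(5, 10))  // replayed DDL: skip
	fmt.Println(shouldSkipDDL(12, 10)) // newer DDL: process
}
```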
```go
if flushedResolvedTs {
	return
}
```
Codecov Report

✅ All modified and coverable lines are covered by tests.

```
@@           Coverage Diff            @@
##   release-8.1     #12621   +/-   ##
======================================
  Coverage       ?   63.4259%
======================================
  Files          ?      166
  Lines          ?    14174
  Branches       ?        0
======================================
  Hits           ?     8990
  Misses         ?     4618
  Partials       ?      566
```
@ti-chi-bot: The following test failed.
This is an automated cherry-pick of #12596
What problem does this PR solve?
Issue Number: close #12595
What is changed and how it works?
Tolerate replayed resolved-ts and DDL events in `cmd/kafka-consumer` as duplicate delivery instead of a fatal error.

Check List
Tests
Questions
Will it cause performance regression or break compatibility?
No. This only makes the standalone Kafka consumer tolerate duplicate MQ delivery in line with TiCDC's at-least-once behavior.
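The at-least-once tolerance described above can be sketched as a small watermark check: a replayed resolved-ts event (ts at or below the current watermark) is treated as duplicate delivery and ignored rather than aborting the consumer. The `advanceResolvedTs` helper is illustrative, not the consumer's actual API:

```go
package main

import "fmt"

// advanceResolvedTs sketches duplicate-delivery tolerance: a resolved
// ts at or below the current watermark is a replayed event and is
// ignored; only a newer ts advances the watermark. (Hypothetical
// helper, not from cmd/kafka-consumer.)
func advanceResolvedTs(watermark, ts uint64) (uint64, bool) {
	if ts <= watermark {
		return watermark, false // duplicate delivery: ignore, don't error out
	}
	return ts, true
}

func main() {
	wm := uint64(100)
	wm, advanced := advanceResolvedTs(wm, 90) // replayed event
	fmt.Println(wm, advanced)
	wm, advanced = advanceResolvedTs(wm, 120) // fresh event
	fmt.Println(wm, advanced)
}
```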
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note