Skip to content

[feature](RoutineLoad) Support the Amazon Kinesis#61325

Open
0AyanamiRei wants to merge 43 commits intoapache:masterfrom
0AyanamiRei:feature-routineload-AWS_Kinesis
Open

[feature](RoutineLoad) Support the Amazon Kinesis#61325
0AyanamiRei wants to merge 43 commits intoapache:masterfrom
0AyanamiRei:feature-routineload-AWS_Kinesis

Conversation

@0AyanamiRei
Copy link
Copy Markdown
Contributor

@0AyanamiRei 0AyanamiRei commented Mar 14, 2026

What problem does this PR solve?

Issue Number: close #xxx

Related PR: this pr should merge after #62184

Problem Summary:

support the Amazon Kinesis for routine load.

CREATE ROUTINE LOAD [db_name.]job_name ON table_name
[load_properties]
[job_properties]
FROM KINESIS
(
    "aws.region" = "your_region",
    "kinesis_stream" = "your_stream_name",
    "aws.access_key" = "your_access_key",
    "aws.secret_key" = "your_secret_key"
);

compare AWS Kinesis with Kafka:

Kinesis Stream <=> Kafka Topic
shards <=> partition

doc pr:apache/doris-website#3521

Release note

None

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason
  • Behavior changed:

    • No.
    • Yes.
  • Does this need documentation?

    • No.
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

@hello-stephen
Copy link
Copy Markdown
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@0AyanamiRei
Copy link
Copy Markdown
Contributor Author

run buildall

@hello-stephen
Copy link
Copy Markdown
Contributor

Cloud UT Coverage Report

Increment line coverage 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 79.24% (1798/2269)
Line Coverage 64.56% (32298/50026)
Region Coverage 65.44% (16166/24702)
Branch Coverage 55.88% (8615/15416)

@hello-stephen
Copy link
Copy Markdown
Contributor

FE UT Coverage Report

Increment line coverage 1.15% (10/866) 🎉
Increment coverage report
Complete coverage report

@liaoxin01
Copy link
Copy Markdown
Contributor

/review

Copy link
Copy Markdown
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found 3 correctness issues in this PR.

  1. be/src/load/stream_load/stream_load_executor.cpp: the Kinesis error path does not reset ctx->kinesis_info->cmt_sequence_number when plan execution fails. KinesisDataConsumerGroup::start_all() has already copied the last consumed sequence numbers into the context before the fragment/txn result is known, so a failed attempt can leave advanced progress in memory for the retried task. Kafka explicitly rewinds here; Kinesis needs the same protection to avoid skipping records after an aborted batch.

  2. fe/fe-core/src/main/java/org/apache/doris/load/routineload/KinesisRoutineLoadJob.java: cloud mode is not blocked, but the cloud routine-load transaction path is still Kafka-only. RoutineLoadManager can create Kinesis jobs in cloud mode, KinesisTaskInfo sets cloudCluster, and then TxnUtil.rlTaskTxnCommitAttachmentToPb() still casts attachment.getProgress() to KafkaProgress. The first cloud Kinesis commit will therefore fail with a ClassCastException instead of persisting progress.

  3. regression-test/conf/regression-conf.groovy: the new load_p0/kinesis_routine_load directory is added to excludeDirectories, so the entire new regression suite is skipped by default. That leaves the feature effectively untested in CI even though the PR adds many Kinesis cases.

Critical checkpoint conclusions:

  • Goal of the task / correctness / proof: The PR clearly aims to add end-to-end Kinesis routine-load support, but the current code does not fully achieve that because retry safety and cloud-mode behavior are still broken. Tests were added, but the regression config currently excludes them.
  • Small / clear / focused: Not fully. This is a broad cross-layer feature (FE, BE, thrift/proto, persistence, tests), so the risk is naturally high and missing parity with existing Kafka paths matters.
  • Concurrency: The feature introduces new consumer-group and concurrent progress-tracking paths. I did not find a primary lock-order bug in the reviewed hunks, but the retry/reset issue shows the lifecycle between consumer progress and transaction failure is not yet safe.
  • Lifecycle / initialization: No static initialization issue identified in the reviewed code.
  • Configuration items: New Kinesis properties are added. I did not validate dynamic-config behavior because these are routine-load job properties rather than mutable process configs.
  • Compatibility changes: Yes. New thrift/proto enums and structs are introduced. The most concrete compatibility/runtime gap I found is the cloud transaction attachment path remaining Kafka-specific.
  • Parallel code paths: Yes. Kafka already resets progress on failure and has a complete cloud progress path; Kinesis needs the same treatment.
  • Special conditional checks: The explicit Config.isCloudMode() branches in Kinesis code currently lead to incomplete behavior and should either be implemented fully or rejected early.
  • Test coverage: Functional tests were added, including restart/error scenarios, but they are excluded in regression-conf.groovy, so effective coverage is currently missing.
  • Observability: Basic logs are present; no major observability blocker found in the reviewed paths.
  • Transaction / persistence: Yes, this feature touches transaction commit attachments and persisted progress. The cloud attachment serialization path is currently incorrect for Kinesis.
  • Data write / modification: Yes. Retry safety is not guaranteed because failed attempts can retain advanced Kinesis progress.
  • FE/BE variable passing: The new FE/BE Kinesis progress structures are mostly wired, but cloud-specific passing is incomplete.
  • Performance: I did not identify a primary performance regression worth blocking on relative to the correctness issues above.
  • Other issues: No additional blocker beyond the three findings above.

Because of the issues above, I cannot consider the current implementation correct yet.

@0AyanamiRei
Copy link
Copy Markdown
Contributor Author

/review

Copy link
Copy Markdown
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Findings:

  1. be/src/load/routine_load/kinesis_conf.cpp: timestamp-based start positions are sent as AFTER_SEQUENCE_NUMBER, so FE datetime inputs cannot work.
  2. fe/fe-core/src/main/java/org/apache/doris/load/routineload/kinesis/KinesisDataSourceProperties.java: custom endpoint support is unreachable because no SQL property can populate endpoint.
  3. fe/fe-core/src/main/java/org/apache/doris/load/routineload/KinesisRoutineLoadJob.java: SHOW CREATE ROUTINE LOAD emits Kinesis property names that the parser does not accept, so the statement is not round-trippable.
  4. fe/fe-core/src/main/java/org/apache/doris/load/routineload/KinesisRoutineLoadJob.java: ALTER on paused jobs can leave customKinesisShards stale, so resume may schedule shards from the old stream/shard set.

Critical checkpoints:

  • Goal of current task: add end-to-end Kinesis routine load support. Conclusion: not fully met because the four issues above break valid start-position, endpoint, show-create, and alter/resume flows.
  • Modification size/focus: mostly focused on Kinesis paths, but the remaining gaps are in core user-visible flows.
  • Concurrency: reviewed new BE consumer-group/queue paths; no definite new deadlock or lock-order bug found in the inspected code.
  • Lifecycle/static initialization: no cross-TU static initialization issue found in the reviewed paths.
  • Configuration items: applicable. Conclusion: endpoint configuration is declared in FE/BE but not actually plumbed from SQL; timestamp config semantics are also inconsistent across FE and BE.
  • Incompatible/protocol changes: applicable. Conclusion: FE/BE thrift/proto additions exist, but FE-to-BE semantics are incomplete for timestamp-based positions.
  • Functionally parallel paths: applicable. Conclusion: Kafka/Kinesis split is explicit; no additional mirrored-path bug confirmed beyond the Kinesis-specific issues above.
  • Special conditional checks: no major issue beyond the property-name/position handling bugs.
  • Test coverage: insufficient for this feature. Missing coverage for timestamp start positions, custom endpoint configuration, SHOW CREATE round-trip, and ALTER + resume with changed explicit shards.
  • Observability: new BE metrics are a positive addition; no blocking observability gap found.
  • Transaction/persistence: no confirmed shared-nothing EditLog/replay omission found in the reviewed path.
  • Data writes/modifications: not a storage-engine write-path change, but routine-load job state/progress can become incorrect after ALTER.
  • FE-BE variable passing: incomplete for timestamp start-position semantics and endpoint parsing.
  • Performance: no primary performance blocker identified relative to the correctness issues.
  • Other issues: none beyond the findings above.

Summary opinion: the PR should not be merged as-is because these issues affect correctness and operability of the new Kinesis routine load feature.

@0AyanamiRei 0AyanamiRei requested a review from zclllyybb as a code owner April 7, 2026 18:36
@0AyanamiRei
Copy link
Copy Markdown
Contributor Author

run buildall

@hello-stephen
Copy link
Copy Markdown
Contributor

FE UT Coverage Report

Increment line coverage 15.05% (135/897) 🎉
Increment coverage report
Complete coverage report

@0AyanamiRei
Copy link
Copy Markdown
Contributor Author

/review

Copy link
Copy Markdown
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found 2 correctness issues.

  1. be/src/load/stream_load/stream_load_context.h: Kinesis progress is initialized/reset to the requested begin sequence number, but BE later resumes numeric positions with AFTER_SEQUENCE_NUMBER. If a task commits without consuming any records from a shard, the next task will skip the first requested record instead of retrying from it.
  2. be/src/load/routine_load/data_consumer.cpp: shard discovery uses a single ListShards call and ignores pagination. Streams whose shard list spans multiple pages will silently lose later shards from FE scheduling.

Critical checkpoint conclusions:

  • Goal of current task: Adds Kinesis routine load support end to end. The implementation is close, but these two bugs mean it does not yet reliably preserve correct consumption progress and full shard discovery.
  • Modification size/focus: Broad but still centered on routine-load Kinesis support.
  • Concurrency: Reviewed FE job locking and BE consumer-group threading; I did not find a lock-order or thread-safety bug in the touched paths.
  • Lifecycle/static initialization: No static initialization issue found. Consumer/task lifecycle is understandable, but the Kinesis progress lifecycle has the bug above.
  • Configuration items: No new Doris config item requiring dynamic-config review.
  • Incompatible changes: FE/BE thrift/proto additions are additive; no storage-format incompatibility found in the supported shared-nothing path.
  • Functionally parallel paths: Kafka remains the parallel path. Kinesis should mirror Kafka's progress semantics more closely; currently the numeric-sequence resume path diverges incorrectly.
  • Special conditional checks: No extra issue beyond the two reported logic branches.
  • Test coverage: FE unit tests and regression cases were added, but coverage is still missing for explicit numeric start positions with an empty first batch and for paginated shard discovery.
  • Observability: Added metrics/logging are adequate for the new BE path.
  • Transaction/persistence: Shared-nothing routine-load attachment wiring is present for the supported mode; no additional supported-mode edit-log gap was confirmed in this review.
  • Data writes/modifications: Not safe yet because the explicit-sequence resume bug can skip data.
  • FE/BE variable passing: The new variables are wired through the supported FE/BE path.
  • Performance: No major hot-path performance regression identified beyond the correctness issue in shard discovery.
  • Other issues: None beyond the 2 findings above.

Overall opinion: not ready to merge until these correctness issues are fixed.

Copy link
Copy Markdown
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found 5 issues that should be addressed before this feature is considered correct end-to-end.

  1. SHOW CREATE ROUTINE LOAD prints Kinesis property keys that the analyzer does not accept, so the generated SQL cannot recreate the job.
  2. BE advances committed Kinesis sequence numbers before records are durably handed off, which can skip unread rows at batch boundaries.
  3. BE shard discovery ignores ListShards pagination and can miss shards on larger streams.
  4. Explicit shard start sequence numbers are treated as exclusive (AFTER_SEQUENCE_NUMBER), so the user-specified first record is skipped.
  5. The cloud routine-load txn attachment protobuf path is still Kafka-only, so Kinesis progress is not serialized/replayed correctly there.

Critical checkpoint conclusions:

  • Goal of current task: Partially achieved. The PR adds FE/BE Kinesis routine-load plumbing and tests, but the issues above mean core correctness and operability are still incomplete.
  • Small, clear, focused change: No. The change spans FE, BE, thrift/proto, persistence, metrics, thirdparty, and regression, which increases the need for exhaustive path parity checks.
  • Concurrency: Not safe yet. The BE consumer/queue/progress interaction can commit progress ahead of actual ingestion at task stop boundaries.
  • Lifecycle / static initialization: No special lifecycle or static-init issue stood out in the touched code.
  • Configuration items: New Kinesis/AWS properties were added, but property naming is inconsistent between emitted SQL and accepted SQL.
  • Incompatible / protocol changes: Not fully handled. New routine-load protocol/persistence paths were added, but the cloud protobuf attachment path remains Kafka-only.
  • Parallel code paths: Not fully updated. Shared-nothing thrift handling supports Kinesis, while the cloud transaction serialization path does not.
  • Special conditional checks: The new shard/open-shard assumptions rely on BE behavior that still has edge-case gaps such as paginated discovery.
  • Test coverage: Incomplete. The added tests do not cover SHOW CREATE round-trip, explicit sequence-number semantics, batch-boundary progress safety, or cloud attachment serialization.
  • Observability: Acceptable but incomplete for debugging skipped-record scenarios; the new metrics help, but they do not prevent the correctness bug above.
  • Transaction / persistence: Not fully correct because Kinesis routine-load progress is not wired through the cloud protobuf attachment path.
  • Data writes / modifications: Not fully correct because committed progress can move past data that was not actually loaded.
  • New FE-BE variables/path propagation: Not fully covered across all paths; shared-nothing is wired, cloud protobuf is not.
  • Performance review: I did not find a blocking hot-path regression, but missing shard pagination will create functional starvation on larger streams.
  • Other issues: None beyond the findings above.

Residual risk if those are fixed: closed-shard filtering after split/merge still deserves another pass because FE now assumes shard discovery returns only OPEN shards.

### What problem does this PR solve?

Issue Number: None

Related PR: apache#61325

Problem Summary: Fix Kinesis routine load so empty batches do not advance numeric shard sequence progress, and shard discovery walks every ListShards page instead of only the first response.

### Release note

None

### Check List (For Author)

- Test: No need to test (with reason)
    - Per local agent.md constraints for this turn, no tests or UTs were run.
- Behavior changed: Yes (with explanation)
    - Kinesis routine load now keeps numeric begin positions stable across empty batches and discovers shards across paginated ListShards responses.
- Does this need documentation: No
### What problem does this PR solve?

Issue Number: None

Related PR: apache#61325

Problem Summary: Align the Kinesis reset_sequence_numbers helper with the formatter output to avoid style-only CI diffs.

### Release note

None

### Check List (For Author)

- Test: No need to test (with reason)
    - Formatting-only change; per local agent.md constraints, no tests were run.
- Behavior changed: No
- Does this need documentation: No
@0AyanamiRei
Copy link
Copy Markdown
Contributor Author

/review

Copy link
Copy Markdown
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found 2 issues.

  1. be/src/load/routine_load/data_consumer.cpp: ListShards still appends every shard returned by AWS, but the comment and FE logic assume this list contains only OPEN shards. Kinesis ListShards also returns non-expired CLOSED parent shards after split/merge, so the next reschedule will re-add those retired parents to openKinesisShards, initialize missing progress from TRIM_HORIZON, and re-consume old data.
  2. fe/fe-core/src/main/java/org/apache/doris/load/routineload/KinesisRoutineLoadJob.java: lag cache updates use Math::max, which keeps the historical worst MillisBehindLatest forever. Once a shard has ever lagged, SHOW ROUTINE LOAD / getStatistic() can keep reporting inflated totalMillisBehindLatest and maxMillisBehindLatest even after the consumer has caught up, which breaks observability and any operator decisions based on these values.

Critical checkpoints:

  • Goal / correctness: The PR adds Kinesis routine load end-to-end, but the current code does not fully meet the goal because resharding can trigger duplicate consumption and lag reporting becomes stale. Existing tests do not cover either case.
  • Change size / focus: The feature is large and touches FE, BE, thrift/proto, metrics, and regression tests; the risky parts are concentrated in shard lifecycle and lag bookkeeping.
  • Concurrency: I did not find a primary locking/deadlock bug in the reviewed paths, but the shared state assumptions between FE shard tracking and BE ListShards output are incorrect.
  • Lifecycle / persistence: FE gson registration is present. I did notice cloud PB conversion is still Kafka-only, but Kinesis is explicitly blocked in cloud mode here, so that is not a blocking issue for this PR.
  • Config / compatibility: New FE-BE protocol fields are added on both sides; no immediate mixed-path omission found in the shared-nothing flow reviewed.
  • Parallel code paths: Kafka has analogous logic, but Kinesis-specific shard lifecycle differs enough that it needs its own closed-shard filtering and lag cache semantics.
  • Special checks: The OPEN-shard assumption needs to be enforced in code, not only in comments.
  • Test coverage: FE unit tests exist, but I did not see coverage for split/merge parent-shard filtering or for lag cache freshness after catching up.
  • Observability: Metrics were added, but the lag values exposed to users are currently wrong after any transient backlog.
  • Transaction / persistence / writes: No additional blocking issue found in commit attachment handling for the non-cloud path.
  • FE/BE variable passing: The new Kinesis thrift/proto fields used in the shared-nothing path appear wired through.
  • Performance: No primary performance blocker found beyond the correctness issues above.
  • Other issues: None beyond the two findings below.

### What problem does this PR solve?

Issue Number: None

Related PR: apache#61325

Problem Summary: Make the Kinesis SHOW ROUTINE LOAD regression case align with actual Doris behavior by avoiding Groovy GString/String assertion mismatches, waiting for open shard display fields to be populated, and using supported SHOW ROUTINE LOAD syntax.

### Release note

None

### Check List (For Author)

- Test: Regression test
    - ./run-regression-test.sh --run -d load_p0/kinesis_routine_load -s test_kinesis_show_routine_load
- Behavior changed: No
- Does this need documentation: No
@0AyanamiRei
Copy link
Copy Markdown
Contributor Author

/review

Copy link
Copy Markdown
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found 2 issues.

  1. High: fe/fe-core/src/main/java/org/apache/doris/datasource/kinesis/KinesisUtil.java sends Kinesis metadata requests to any load-available BE, but this PR adds a brand-new kinesis_meta_request protobuf field and BE handler. During a rolling upgrade, an old BE will ignore the unknown field and still return OK, so FE gets an empty shard list and the job can stay unscheduled or pause even though the stream is healthy. This path needs a capability/version gate or a fail-fast check instead of silently accepting pre-Kinesis BEs.
  2. Medium: fe/fe-core/src/main/java/org/apache/doris/load/routineload/KinesisRoutineLoadJob.java emits "kinesis_region" from getDataSourceProperties(), but the analyzer only accepts aws.region. SHOW CREATE ROUTINE LOAD therefore produces SQL that cannot recreate the same Kinesis job.

Critical checkpoint conclusions:

  • Goal of current task: Partially met. The PR wires Kinesis routine load through FE/BE and adds tests, but the mixed-version FE/BE path and SHOW CREATE round-trip are still broken.
  • Small, clear, focused change: Reasonably focused for a new data source, but protocol compatibility handling is missing for the new FE/BE surface.
  • Concurrency review: No blocking-under-lock or obvious lock-order issue stood out in the reviewed FE/BE Kinesis paths.
  • Lifecycle/static initialization: No special lifecycle or static-init-order issue found in reviewed code.
  • Configuration items: No critical dynamic-config issue found in the reviewed Kinesis path.
  • Incompatible changes / compatibility: Not satisfied. New thrift/proto/enum protocol additions are not guarded for mixed-version clusters.
  • Parallel code paths: Kafka and Kinesis separation is fine, but the Kinesis metadata path should preserve the same operational safety expectations as existing routine-load metadata fetches.
  • Special conditional checks: The shard open/closed handling is documented well enough in the reviewed code.
  • Test coverage: There is useful FE UT and regression coverage for shard lifecycle, but no coverage for mixed-version routing or SHOW CREATE round-trip.
  • Observability: Added logging/metrics are adequate for the reviewed path.
  • Transaction / persistence: No additional edit-log or replay correctness issue found beyond the points above.
  • Data writes / modifications: No additional data-write atomicity issue found in the reviewed path beyond the points above.
  • FE/BE variable passing: The new Kinesis fields are wired through the reviewed thrift/proto paths.
  • Performance: No critical hot-path regression identified from the reviewed snippets.
  • Other issues: The two findings above are the main blocking risks I found in this review.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants