
feat: Import 2PC — CommitImport/AbortImport with auto_commit and WAL broadcast#48524

Open
bigsheeper wants to merge 16 commits into milvus-io:master from bigsheeper:worktree-feat-import-replication

@bigsheeper bigsheeper commented Mar 25, 2026

Summary

Implements Two-Phase Commit (2PC) for Import in primary/secondary replication clusters. Data stays invisible (is_importing=true) until an explicit commit signal is delivered via WAL, ensuring primary and secondary clusters reach the same visible state at the same logical position.

  • New proto: CommitImport=44, RollbackImport=45 WAL message types; Uncommitted=8, Committing=9 ImportJobState values; CommitImport, AbortImport, HandleCommitVchannel RPCs on DataCoord; committed_vchannels + auto_commit fields on ImportJob
  • WAL broadcast: DataCoord broadcasts CommitImportMessage/RollbackImportMessage to all vchannels via DDL broadcast; CDC replicates to secondary clusters verbatim
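  • DDL ack callbacks: commitImportV2AckCallback performs a CAS Uncommitted→Committing (no-op if the job has already left Uncommitted); rollbackImportV2AckCallback moves the job to Failed and drops its import segments (no-op if the job is already Committing or Completed)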
  • WAL flusher: wal_flusher.dispatch() intercepts CommitImportMessage per-vchannel, calls wbMgr.FlushChannel + DataCoord.HandleCommitVchannel; no-op handler for RollbackImportMessage
  • ImportChecker: new Uncommitted case (auto-commits when auto_commit=true); new Committing case (transitions to Completed when all vchannels confirmed)
  • auto_commit option: default true (backward compatible); false lets replication platform control commit timing
  • RESTful API: POST /v2/vectordb/jobs/import/commit and POST /v2/vectordb/jobs/import/abort; GetImportProgress surfaces Uncommitted and Committing states

Out of scope: commit_timestamp propagation (handled in companion PR)

Test Plan

  • Unit tests for IsAutoCommit helper
  • Unit tests for HandleCommitVchannel idempotency
  • Unit tests for CommitImport/AbortImport RPC handlers (state validation, mutex, broadcast)
  • Unit tests for DDL ack callbacks (CAS races for commit/abort)
  • Unit tests for ImportChecker Uncommitted/Committing cases
  • Unit tests for WAL flusher dispatch (CommitImport/RollbackImport cases)
  • Unit tests for RESTful commit/abort handlers (valid jobId, invalid jobId error path)
  • GetImportProgress surfaces Uncommitted and Committing states
  • E2E: non-replication cluster with auto_commit=true (default) behaves identically to pre-2PC

issue: #48525
design doc: milvus-io/milvus-design-docs#29

🤖 Generated with Claude Code

bigsheeper and others added 16 commits March 25, 2026 15:27
- Add Uncommitted and Committing states to ImportJobState enum
- Add CommitImport/RollbackImport message types and headers to messages.proto
- Add committed_vchannels and auto_commit fields to ImportJob
- Add CommitImport, AbortImport, HandleCommitVchannel RPCs to DataCoord service
- Regenerate Go bindings from proto files
- Add codegen entries and message type properties for CommitImport/RollbackImport
- Register CommitImport/RollbackImport ack callbacks in specialized_callback.go

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Yihao Dai <yihao.dai@zilliz.com>
…service

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Yihao Dai <yihao.dai@zilliz.com>
- Add AutoCommitKey const and IsAutoCommit() helper in importutilv2/option.go
  that defaults to true when the option is absent
- Populate AutoCommit field on ImportJob at job creation time from request options
- Add stub implementations of CommitImport, AbortImport, HandleCommitVchannel
  on datacoord.Server to satisfy the updated datapb.DataCoordServer interface
- Regenerate mock_datacoord.go and mock_mixcoord.go with the three new RPC methods

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Yihao Dai <yihao.dai@zilliz.com>
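
The default-to-true behaviour described in this commit can be sketched as follows. This is not the code in importutilv2/option.go: the `keyValue` type stands in for the real option type, the key string `"auto_commit"` is assumed, and treating an unparsable value as the default is also an assumption.

```go
package main

import "strconv"

// keyValue is a stand-in for the key/value option type used by import requests.
type keyValue struct {
	Key   string
	Value string
}

// autoCommitKey is assumed to be "auto_commit"; the PR does not show the literal.
const autoCommitKey = "auto_commit"

// isAutoCommit reports whether the job should commit automatically.
// It returns true when the option is absent, so existing callers keep
// the pre-2PC behaviour.
func isAutoCommit(options []keyValue) bool {
	for _, kv := range options {
		if kv.Key != autoCommitKey {
			continue
		}
		v, err := strconv.ParseBool(kv.Value)
		if err != nil {
			// Assumption: fall back to the default on an unparsable value.
			return true
		}
		return v
	}
	return true
}
```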
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Yihao Dai <yihao.dai@zilliz.com>
- Add GetCommittedVchannels() to ImportJob interface (promoted from
  embedded *datapb.ImportJob which already has the proto field)
- Add HandleCommitVchannel to ImportMeta interface and implement on
  importMeta: acquires lock, checks idempotency, invokes callback,
  then clones job and persists updated committed_vchannels via catalog
- Add corresponding mock method to MockImportMeta
- Add TestHandleCommitVchannel covering first commit, idempotent
  re-commit (callback not re-fired), and unknown-job error path

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Yihao Dai <yihao.dai@zilliz.com>
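
A simplified sketch of the idempotent commit described above: take the lock, return early if the vchannel is already recorded, fire the callback once, then store an updated copy of the job. The types and names are hypothetical and catalog persistence is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// importJob is a reduced stand-in for the real job; only the field needed
// for the sketch is kept.
type importJob struct {
	committedVchannels []string
}

type importMeta struct {
	mu   sync.Mutex
	jobs map[int64]*importJob
}

// handleCommitVchannel records vchannel as committed for jobID.
// onFirstCommit runs only the first time a given vchannel is seen, so a
// redelivered commit message does not repeat its side effects.
func (m *importMeta) handleCommitVchannel(jobID int64, vchannel string, onFirstCommit func() error) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	job, ok := m.jobs[jobID]
	if !ok {
		return fmt.Errorf("import job %d not found", jobID)
	}
	for _, ch := range job.committedVchannels {
		if ch == vchannel {
			return nil // already committed: idempotent no-op
		}
	}
	if err := onFirstCommit(); err != nil {
		return err
	}
	// Build a fresh slice and a fresh job instead of mutating the old one.
	updated := make([]string, 0, len(job.committedVchannels)+1)
	updated = append(updated, job.committedVchannels...)
	updated = append(updated, vchannel)
	m.jobs[jobID] = &importJob{committedVchannels: updated}
	return nil
}
```

Because the callback runs while the lock is held, it must not call back into anything that takes the same lock.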
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Yihao Dai <yihao.dai@zilliz.com>
…andlers

Replace stub implementations with real logic:
- CommitImport: validates Uncommitted state, acquires per-job mutex, broadcasts
  CommitImport WAL message via control channel using existing broadcaster pattern
- AbortImport: validates job is not yet committed, acquires per-job mutex, broadcasts
  RollbackImport WAL message via control channel
- HandleCommitVchannel: delegates to importMeta.HandleCommitVchannel with a callback
  that unsets is_importing on all segments for the given job+vchannel
- Add importJobMu sync.Map to Server struct for per-job mutex serialization
- Add broadcastCommitImportMessage / broadcastRollbackImportMessage helpers
- Add setImportSegmentsVisible helper that walks import tasks to find segments
- Forward all three RPCs in mixcoord distributed server and client

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Yihao Dai <yihao.dai@zilliz.com>
- Pre-fetch segment IDs outside HandleCommitVchannel to eliminate deadlock:
  the callback was calling GetTaskBy which tries to re-acquire m.mu (read
  lock) while HandleCommitVchannel already holds m.mu (write lock).
- Replace unsafe append(proto slice) with safe copy in segment ID gathering.
- AbortImport now rejects Failed state (in addition to Committing/Completed)
  in both pre-lock and post-lock checks.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Yihao Dai <yihao.dai@zilliz.com>
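
The "safe copy" point can be illustrated with a small helper. Appending directly onto a slice owned by another object can write into its backing array when spare capacity exists, so the sketch below always gathers IDs into a newly allocated slice. The name `collectSegmentIDs` is hypothetical; in the flow described above, such a helper would run before the write lock is taken.

```go
package main

// collectSegmentIDs flattens per-task segment ID slices into a new slice
// that shares no backing array with any of the inputs.
func collectSegmentIDs(taskSegments [][]int64) []int64 {
	total := 0
	for _, ids := range taskSegments {
		total += len(ids)
	}
	out := make([]int64, 0, total)
	for _, ids := range taskSegments {
		out = append(out, ids...)
	}
	return out
}
```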
- Add ddl_callbacks_import.go with commitImportV2AckCallback and
  rollbackImportV2AckCallback methods on DDLCallbacks
- commitImportV2AckCallback: CAS Uncommitted→Committing on WAL ack;
  no-op if job already left Uncommitted (abort won the race)
- rollbackImportV2AckCallback: transitions job to Failed and drops all
  import segments via dropImportJobSegments; no-op if Committing/Completed
- Add UpdateJobStateWithCAS helper for compare-and-swap state transitions
- Register both callbacks in RegisterDDLCallbacks via registerImportCallbacks()
- Add skipped test stubs as placeholders for full integration tests

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Yihao Dai <yihao.dai@zilliz.com>
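
The commit/abort race handling above can be sketched with a mutex-guarded state field. This is a simplified model, not the UpdateJobStateWithCAS helper itself; the type and function names are hypothetical, and states are modelled as strings to avoid guessing enum values.

```go
package main

import "sync"

type jobState string

const (
	stateUncommitted jobState = "Uncommitted"
	stateCommitting  jobState = "Committing"
	stateCompleted   jobState = "Completed"
	stateFailed      jobState = "Failed"
)

type importJob struct {
	mu    sync.Mutex
	state jobState
}

// compareAndSwap moves the job to next only if it is currently in expected.
// It reports whether the transition was applied.
func (j *importJob) compareAndSwap(expected, next jobState) bool {
	j.mu.Lock()
	defer j.mu.Unlock()
	if j.state != expected {
		return false
	}
	j.state = next
	return true
}

// onCommitAck models the commit ack: Uncommitted -> Committing, or a no-op
// if the job already left Uncommitted (for example, an abort won the race).
func onCommitAck(j *importJob) bool {
	return j.compareAndSwap(stateUncommitted, stateCommitting)
}

// onRollbackAck models the rollback ack: the job becomes Failed unless it is
// already Committing or Completed.
func onRollbackAck(j *importJob) bool {
	j.mu.Lock()
	defer j.mu.Unlock()
	if j.state == stateCommitting || j.state == stateCompleted {
		return false
	}
	j.state = stateFailed
	return true
}
```

Whichever ack is processed first decides the outcome; the later one observes the changed state and does nothing.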
…eedDrop flag

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Yihao Dai <yihao.dai@zilliz.com>
…acks

- UpdateJobStateWithCAS now delegates to UpdateJobState so terminal-state
  side effects (CleanupTs, RequestedDiskSize reset) are applied correctly
- Move UpdateJobStateWithCAS from ddl_callbacks_import.go to import_job.go
  alongside other UpdateJobAction factories
- Change dropImportJobSegments receiver from *Server to *DDLCallbacks for
  consistency; field access is unchanged via the embedded *Server
- Improve post-CAS read comment to clarify why the read correctly detects
  whether the CAS applied under the mutex

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Yihao Dai <yihao.dai@zilliz.com>
…mmit support

- Add commitImportFn callback (variadic) to importChecker so *Server can
  inject broadcastCommitImportMessage without creating a circular dependency
- Add Uncommitted and Committing cases to the ticker1 switch dispatch
- Implement checkUncommittedJob: no-op when auto_commit=false; calls
  commitImportFn when auto_commit=true
- Implement checkCommittingJob: transitions Committing→Completed once all
  vchannels are present in CommittedVchannels
- Fix checkPreImportingJob empty-import fast path: goes to Uncommitted when
  auto_commit=false, Completed when auto_commit=true (backward-compatible)
- Add GetAutoCommit() bool to the ImportJob interface
- Pass broadcastCommitImportMessage to NewImportChecker in server.go
- Add suite tests: TestCheckUncommittedJob_{AutoCommitTrue,AutoCommitFalse,NilFn},
  TestCheckCommittingJob_{AllVchannelsDone,Partial},
  TestCheckPreImporting_EmptyImport_{AutoCommitFalse,AutoCommitTrue}

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Yihao Dai <yihao.dai@zilliz.com>
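
The Committing→Completed condition amounts to a set-containment check. A minimal sketch, with a hypothetical function name and plain string slices in place of the job's real accessors:

```go
package main

// allVchannelsCommitted reports whether every vchannel of the job appears
// in the committed list, regardless of order.
func allVchannelsCommitted(vchannels, committed []string) bool {
	seen := make(map[string]struct{}, len(committed))
	for _, ch := range committed {
		seen[ch] = struct{}{}
	}
	for _, ch := range vchannels {
		if _, ok := seen[ch]; !ok {
			return false
		}
	}
	return true
}
```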
…mportChecker

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Yihao Dai <yihao.dai@zilliz.com>
When StreamingNode receives CommitImportMessageV2 from the WAL, the
dispatch() method now:
1. Calls WriteBufferManager.FlushChannel (non-fatal) to flush any
   pending DML on the vchannel before signalling commit.
2. Calls MixCoord.HandleCommitVchannel RPC to notify DataCoord that
   this vchannel has committed its import job (fatal on error).
3. Returns nil without forwarding to flusherComponents.

RollbackImportMessageV2 is a complete no-op in the flusher — DataCoord
DDL ack callbacks already handle all state cleanup.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Yihao Dai <yihao.dai@zilliz.com>
…usher

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Yihao Dai <yihao.dai@zilliz.com>
- Add CommitAction/AbortAction constants to httpserver
- Add CommitImport/AbortImport to Proxy interface (types.go)
- Add CommitImport/AbortImport pass-through methods to Proxy impl
- Add commitImportJob/abortImportJob handlers to HandlersV2
- Register /jobs/import/commit and /jobs/import/abort routes
- Add CommitImport/AbortImport stubs to MockProxy and MockMixCoordClient
- Add HandleCommitVchannel stub to MockMixCoordClient (missing from prev task)
- Add TestCommitImportJob and TestAbortImportJob tests

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Yihao Dai <yihao.dai@zilliz.com>
@sre-ci-robot
Contributor

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: bigsheeper
To complete the pull request process, please assign congqixia after the PR has been reviewed.
You can assign the PR to them by writing /assign @congqixia in a comment when ready.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@sre-ci-robot added the size/XXL (denotes a PR that changes 1000+ lines) and area/internal-api labels on Mar 25, 2026
@mergify bot added the dco-passed (DCO check passed) label on Mar 25, 2026
@mergify
Contributor

mergify bot commented Mar 25, 2026

@bigsheeper This is a feature PR (feat:). Please provide a design document.

How to resolve:
Link a design doc in the PR description:

design doc: https://github.com/milvus-io/milvus-design-docs/blob/main/design_docs/your_design.md

Design documents location: https://github.com/milvus-io/milvus-design-docs/tree/main/design_docs

@mergify bot added the do-not-merge/missing-design-doc and kind/feature (issues related to feature requests from users) labels on Mar 25, 2026
@sre-ci-robot
Contributor

[ci-v2-notice]
Notice: New ci-v2 system is enabled for this PR.

To rerun ci-v2 checks, comment with:

  • /ci-rerun-code-check // for ci-v2/code-check
  • /ci-rerun-build // for ci-v2/build
  • /ci-rerun-build-all // for ci-v2/build-all (multi-arch builds)
  • /ci-rerun-buildenv // for ci-v2/build-env (build milvus-env builder images)
  • /ci-rerun-ut-integration // for ci-v2/ut-integration, will rerun ci-v2/build
  • /ci-rerun-ut-go // for ci-v2/ut-go, will rerun ci-v2/build
  • /ci-rerun-ut-cpp // for ci-v2/ut-cpp
  • /ci-rerun-ut // for all ci-v2/ut-integration, ci-v2/ut-go, ci-v2/ut-cpp, will rerun ci-v2/build
  • /ci-rerun-e2e-arm // for ci-v2/e2e-arm
  • /ci-rerun-e2e-default // for ci-v2/e2e-default
  • /ci-rerun-ciloop // for ci-v2/ciloop (build + unit tests in one pipeline)
  • /ci-rerun-gosdk // for ci-v2/go-sdk (Go SDK E2E tests, AMD)
  • /ci-rerun-gosdk-arm // for ci-v2/go-sdk-arm (Go SDK E2E tests, ARM)

If you have any questions or requests, please contact @zhikunyao.

@mergify
Contributor

mergify bot commented Mar 25, 2026

@bigsheeper Please associate the related issue to the body of your Pull Request. (eg. "issue: #")

@codecov

codecov bot commented Mar 25, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 83.98%. Comparing base (0121bbf) to head (2250b56).
⚠️ Report is 6 commits behind head on master.

Additional details and impacted files

@@             Coverage Diff             @@
##           master   #48524       +/-   ##
===========================================
+ Coverage   74.89%   83.98%    +9.09%     
===========================================
  Files        1484      627      -857     
  Lines      246977   103896   -143081     
===========================================
- Hits       184968    87259    -97709     
+ Misses      53653    16637    -37016     
+ Partials     8356        0     -8356     

Components   Coverage Δ
Client       ∅ <ø> (∅)
Core         83.98% <ø> (∅)
Go           ∅ <ø> (∅)

see 2110 files with indirect coverage changes


Labels

area/internal-api, dco-passed (DCO check passed), do-not-merge/missing-design-doc, kind/feature (issues related to feature requests from users), size/XXL (denotes a PR that changes 1000+ lines)


2 participants