Skip to content

Commit 23911b3

Browse files
committed
[ENH]: Execute task with no backfill or incremental
1 parent 4104603 commit 23911b3

29 files changed

+3253
-1416
lines changed

go/pkg/sysdb/coordinator/coordinator.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,16 @@ func (s *Coordinator) FlushCollectionCompactionAndAttachedFunction(
297297
return s.catalog.FlushCollectionCompactionAndAttachedFunction(ctx, flushCollectionCompaction, attachedFunctionID, runNonce, completionOffset)
298298
}
299299

300+
func (s *Coordinator) FlushCollectionCompactionAndAttachedFunctionExtended(
301+
ctx context.Context,
302+
collectionCompactions []*model.FlushCollectionCompaction,
303+
attachedFunctionID uuid.UUID,
304+
runNonce uuid.UUID,
305+
completionOffset int64,
306+
) (*model.ExtendedFlushCollectionInfo, error) {
307+
return s.catalog.FlushCollectionCompactionAndAttachedFunctionExtended(ctx, collectionCompactions, attachedFunctionID, runNonce, completionOffset)
308+
}
309+
300310
func (s *Coordinator) ListCollectionsToGc(ctx context.Context, cutoffTimeSecs *uint64, limit *uint64, tenantID *string, minVersionsIfAlive *uint64) ([]*model.CollectionToGc, error) {
301311
return s.catalog.ListCollectionsToGc(ctx, cutoffTimeSecs, limit, tenantID, minVersionsIfAlive)
302312
}

go/pkg/sysdb/coordinator/create_task_test.go

Lines changed: 181 additions & 65 deletions
Large diffs are not rendered by default.

go/pkg/sysdb/coordinator/heap_client_integration_test.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,8 @@ func (suite *HeapClientIntegrationTestSuite) TestAttachFunctionPushesScheduleToH
176176
})
177177
suite.NoError(err, "Should attached function successfully")
178178
suite.NotNil(response)
179-
suite.NotEmpty(response.Id, "Attached function ID should be returned")
179+
suite.NotNil(response.AttachedFunction)
180+
suite.NotEmpty(response.AttachedFunction.Id, "Attached function ID should be returned")
180181

181182
// Get updated heap summary
182183
updatedSummary, err := suite.heapClient.Summary(ctx, &coordinatorpb.HeapSummaryRequest{})
@@ -263,7 +264,8 @@ func (suite *HeapClientIntegrationTestSuite) TestPartialTaskRecovery_HybridAppro
263264
return
264265
}
265266
suite.NotNil(taskResp)
266-
originalTaskID := taskResp.Id
267+
suite.NotNil(taskResp.AttachedFunction)
268+
originalTaskID := taskResp.AttachedFunction.Id
267269
suite.T().Logf("Created fully initialized task: %s", originalTaskID)
268270

269271
// STEP 2: Directly UPDATE database to make task partial (simulate Phase 3 failure)
@@ -363,7 +365,8 @@ func (suite *HeapClientIntegrationTestSuite) TestPartialTaskCleanup_ThenRecreate
363365
return
364366
}
365367
suite.NotNil(taskResp)
366-
suite.T().Logf("Created task: %s", taskResp.Id)
368+
suite.NotNil(taskResp.AttachedFunction)
369+
suite.T().Logf("Created task: %s", taskResp.AttachedFunction.Id)
367370

368371
// STEP 2: Call CleanupExpiredPartialAttachedFunctions (with short timeout to test it doesn't affect complete tasks)
369372
cleanupResp, err := suite.sysdbClient.CleanupExpiredPartialAttachedFunctions(ctx, &coordinatorpb.CleanupExpiredPartialAttachedFunctionsRequest{
@@ -381,12 +384,12 @@ func (suite *HeapClientIntegrationTestSuite) TestPartialTaskCleanup_ThenRecreate
381384
})
382385
suite.NoError(err, "Task should still exist after cleanup")
383386
suite.NotNil(getResp)
384-
suite.Equal(taskResp.Id, getResp.AttachedFunction.Id)
387+
suite.Equal(taskResp.AttachedFunction.Id, getResp.AttachedFunction.Id)
385388
suite.T().Logf("Task still exists after cleanup: %s", getResp.AttachedFunction.Id)
386389

387390
// STEP 4: Delete the task
388391
_, err = suite.sysdbClient.DetachFunction(ctx, &coordinatorpb.DetachFunctionRequest{
389-
AttachedFunctionId: taskResp.Id,
392+
AttachedFunctionId: taskResp.AttachedFunction.Id,
390393
DeleteOutput: true,
391394
})
392395
suite.NoError(err, "Should delete task")
@@ -403,8 +406,9 @@ func (suite *HeapClientIntegrationTestSuite) TestPartialTaskCleanup_ThenRecreate
403406
})
404407
suite.NoError(err, "Should be able to recreate task after deletion")
405408
suite.NotNil(taskResp2)
406-
suite.NotEqual(taskResp.Id, taskResp2.Id, "New task should have different ID")
407-
suite.T().Logf("Successfully recreated task: %s", taskResp2.Id)
409+
suite.NotNil(taskResp2.AttachedFunction)
410+
suite.NotEqual(taskResp.AttachedFunction.Id, taskResp2.AttachedFunction.Id, "New task should have different ID")
411+
suite.T().Logf("Successfully recreated task: %s", taskResp2.AttachedFunction.Id)
408412
}
409413

410414
func TestHeapClientIntegrationSuite(t *testing.T) {

go/pkg/sysdb/coordinator/model/collection.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@ type FlushCollectionInfo struct {
105105
AttachedFunctionCompletionOffset *int64
106106
}
107107

108+
type ExtendedFlushCollectionInfo struct {
109+
Collections []*FlushCollectionInfo
110+
}
111+
108112
func FilterCollection(collection *Collection, collectionID types.UniqueID, collectionName *string) bool {
109113
if collectionID != types.NilUniqueID() && collectionID != collection.ID {
110114
return false

go/pkg/sysdb/coordinator/table_catalog.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1789,6 +1789,76 @@ func (tc *Catalog) FlushCollectionCompactionAndAttachedFunction(
17891789
return flushCollectionInfo, nil
17901790
}
17911791

1792+
// FlushCollectionCompactionAndAttachedFunctionExtended atomically updates multiple collection compaction data
1793+
// and attached function completion offset in a single transaction.
1794+
// NOTE: This does NOT advance next_nonce - that is done separately by AdvanceAttachedFunction.
1795+
// This only updates the completion_offset to record how far we've processed.
1796+
// This is only supported for versioned collections (the modern/default path).
1797+
func (tc *Catalog) FlushCollectionCompactionAndAttachedFunctionExtended(
1798+
ctx context.Context,
1799+
collectionCompactions []*model.FlushCollectionCompaction,
1800+
attachedFunctionID uuid.UUID,
1801+
runNonce uuid.UUID,
1802+
completionOffset int64,
1803+
) (*model.ExtendedFlushCollectionInfo, error) {
1804+
if !tc.versionFileEnabled {
1805+
// Attached-function-based compactions are only supported with versioned collections
1806+
log.Error("FlushCollectionCompactionAndAttachedFunctionExtended is only supported for versioned collections")
1807+
return nil, errors.New("attached-function-based compaction requires versioned collections")
1808+
}
1809+
1810+
if len(collectionCompactions) == 0 {
1811+
return nil, errors.New("at least one collection compaction is required")
1812+
}
1813+
1814+
flushInfos := make([]*model.FlushCollectionInfo, 0, len(collectionCompactions))
1815+
1816+
err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
1817+
var err error
1818+
// Get the transaction from context to pass to FlushCollectionCompactionForVersionedCollection
1819+
tx := dbcore.GetDB(txCtx)
1820+
1821+
// Handle all collection compactions
1822+
for _, collectionCompaction := range collectionCompactions {
1823+
log.Info("FlushCollectionCompactionAndAttachedFunctionExtended", zap.String("collection_id", collectionCompaction.ID.String()))
1824+
flushInfo, err := tc.FlushCollectionCompactionForVersionedCollection(txCtx, collectionCompaction, tx)
1825+
if err != nil {
1826+
return err
1827+
}
1828+
flushInfos = append(flushInfos, flushInfo)
1829+
}
1830+
1831+
// Update ONLY completion_offset - next_nonce was already advanced by AdvanceAttachedFunction
1832+
// We still validate runNonce to ensure we're updating the correct nonce
1833+
err = tc.metaDomain.AttachedFunctionDb(txCtx).UpdateCompletionOffset(attachedFunctionID, runNonce, completionOffset)
1834+
if err != nil {
1835+
return err
1836+
}
1837+
1838+
return nil
1839+
})
1840+
1841+
if err != nil {
1842+
return nil, err
1843+
}
1844+
1845+
// Populate attached function fields with authoritative values from database
1846+
for _, flushInfo := range flushInfos {
1847+
flushInfo.AttachedFunctionCompletionOffset = &completionOffset
1848+
}
1849+
1850+
// Log with first collection ID (typically the output collection)
1851+
log.Info("FlushCollectionCompactionAndAttachedFunctionExtended",
1852+
zap.String("first_collection_id", collectionCompactions[0].ID.String()),
1853+
zap.Int("collection_count", len(collectionCompactions)),
1854+
zap.String("attached_function_id", attachedFunctionID.String()),
1855+
zap.Int64("completion_offset", completionOffset))
1856+
1857+
return &model.ExtendedFlushCollectionInfo{
1858+
Collections: flushInfos,
1859+
}, nil
1860+
}
1861+
17921862
func (tc *Catalog) validateVersionFile(versionFile *coordinatorpb.CollectionVersionFile, collectionID string, version int64) error {
17931863
if versionFile.GetCollectionInfoImmutable().GetCollectionId() != collectionID {
17941864
log.Error("collection id mismatch", zap.String("collection_id", collectionID), zap.String("version_file_collection_id", versionFile.GetCollectionInfoImmutable().GetCollectionId()))

0 commit comments

Comments
 (0)