Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 11 additions & 64 deletions go/pkg/sysdb/coordinator/create_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,6 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

// testMinimalUUIDv7 is the test's copy of minimalUUIDv7 from task.go
// UUIDv7 format: [timestamp (48 bits)][version (4 bits)][random (12 bits)][variant (2 bits)][random (62 bits)]
// This UUID has all zeros for timestamp and random bits, making it the minimal valid UUIDv7.
var testMinimalUUIDv7 = uuid.UUID{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // timestamp = 0 (bytes 0-5)
0x70, 0x00, // version 7 (0x7) in high nibble, low nibble = 0 (bytes 6-7)
0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // variant bits + rest = 0 (bytes 8-15)
}

// MockMemberlistStore is a mock implementation of memberlist_manager.IMemberlistStore for testing
type MockMemberlistStore struct {
mock.Mock
Expand Down Expand Up @@ -57,50 +48,6 @@ type AttachFunctionTestSuite struct {
coordinator *Coordinator
}

// setupAttachFunctionMocks sets up all the mocks for an AttachFunction call (Phases 0 and 1)
// Returns a function that can be called to capture the created attached function ID
func (suite *AttachFunctionTestSuite) setupAttachFunctionMocks(ctx context.Context, request *coordinatorpb.AttachFunctionRequest, databaseID string, functionID uuid.UUID) func(*dbmodel.AttachedFunction) bool {
inputCollectionID := request.InputCollectionId
attachedFunctionName := request.Name
outputCollectionName := request.OutputCollectionName
tenantID := request.TenantId
databaseName := request.Database
functionName := request.FunctionName

// Phase 0: No existing attached function
suite.mockMetaDomain.On("AttachedFunctionDb", ctx).Return(suite.mockAttachedFunctionDb).Once()
suite.mockAttachedFunctionDb.On("GetAnyByName", inputCollectionID, attachedFunctionName).
Return(nil, nil).Once()

// Phase 1: Create attached function in transaction
suite.mockMetaDomain.On("AttachedFunctionDb", mock.Anything).Return(suite.mockAttachedFunctionDb).Once()
suite.mockAttachedFunctionDb.On("GetAnyByName", inputCollectionID, attachedFunctionName).
Return(nil, nil).Once()

suite.mockMetaDomain.On("DatabaseDb", mock.Anything).Return(suite.mockDatabaseDb).Once()
suite.mockDatabaseDb.On("GetDatabases", tenantID, databaseName).
Return([]*dbmodel.Database{{ID: databaseID, Name: databaseName}}, nil).Once()

suite.mockMetaDomain.On("FunctionDb", mock.Anything).Return(suite.mockFunctionDb).Once()
suite.mockFunctionDb.On("GetByName", functionName).
Return(&dbmodel.Function{ID: functionID, Name: functionName}, nil).Once()

suite.mockMetaDomain.On("CollectionDb", mock.Anything).Return(suite.mockCollectionDb).Once()
suite.mockCollectionDb.On("GetCollections",
[]string{inputCollectionID}, (*string)(nil), tenantID, databaseName, (*int32)(nil), (*int32)(nil), false).
Return([]*dbmodel.CollectionAndMetadata{{Collection: &dbmodel.Collection{ID: inputCollectionID}}}, nil).Once()

suite.mockMetaDomain.On("CollectionDb", mock.Anything).Return(suite.mockCollectionDb).Once()
suite.mockCollectionDb.On("GetCollections",
[]string(nil), &outputCollectionName, tenantID, databaseName, (*int32)(nil), (*int32)(nil), false).
Return([]*dbmodel.CollectionAndMetadata{}, nil).Once()

// Return a matcher function that can be used to capture attached function data
return func(attachedFunction *dbmodel.AttachedFunction) bool {
return true
}
}

func (suite *AttachFunctionTestSuite) SetupTest() {
// Create all mocks - note: we manually control AssertExpectations
// to avoid conflicts with automatic cleanup
Expand Down Expand Up @@ -172,8 +119,8 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_SuccessfulCreation() {
// Setup mocks that will be called within the transaction (using mock.Anything for context)
// Check if attached function exists (idempotency check inside transaction)
suite.mockMetaDomain.On("AttachedFunctionDb", mock.Anything).Return(suite.mockAttachedFunctionDb).Once()
suite.mockAttachedFunctionDb.On("GetAnyByName", inputCollectionID, attachedFunctionName).
Return(nil, nil).Once()
suite.mockAttachedFunctionDb.On("GetAttachedFunctions", (*uuid.UUID)(nil), &attachedFunctionName, &inputCollectionID, false).
Return([]*dbmodel.AttachedFunction{}, nil).Once()

// Look up database
suite.mockMetaDomain.On("DatabaseDb", mock.Anything).Return(suite.mockDatabaseDb).Once()
Expand Down Expand Up @@ -289,8 +236,8 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_IdempotentRequest_Alrea

// ===== Phase 1: Transaction checks if attached function exists =====
suite.mockMetaDomain.On("AttachedFunctionDb", ctx).Return(suite.mockAttachedFunctionDb).Once()
suite.mockAttachedFunctionDb.On("GetAnyByName", inputCollectionID, attachedFunctionName).
Return(existingAttachedFunction, nil).Once()
suite.mockAttachedFunctionDb.On("GetAttachedFunctions", (*uuid.UUID)(nil), &attachedFunctionName, &inputCollectionID, false).
Return([]*dbmodel.AttachedFunction{existingAttachedFunction}, nil).Once()

// Mock transaction call
suite.mockTxImpl.On("Transaction", ctx, mock.AnythingOfType("func(context.Context) error")).
Expand Down Expand Up @@ -374,8 +321,8 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_RecoveryFlow() {

// Phase 1: Create attached function in transaction
suite.mockMetaDomain.On("AttachedFunctionDb", mock.Anything).Return(suite.mockAttachedFunctionDb).Once()
suite.mockAttachedFunctionDb.On("GetAnyByName", inputCollectionID, attachedFunctionName).
Return(nil, nil).Once()
suite.mockAttachedFunctionDb.On("GetAttachedFunctions", (*uuid.UUID)(nil), &attachedFunctionName, &inputCollectionID, false).
Return([]*dbmodel.AttachedFunction{}, nil).Once()

suite.mockMetaDomain.On("DatabaseDb", mock.Anything).Return(suite.mockDatabaseDb).Once()
suite.mockDatabaseDb.On("GetDatabases", tenantID, databaseName).
Expand Down Expand Up @@ -427,10 +374,10 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_RecoveryFlow() {

// ========== SECOND ATTEMPT: Recovery Succeeds ==========

// Phase 0: GetByName returns incomplete attached function (with ErrAttachedFunctionNotReady, which AttachFunction handles)
// Phase 0: GetAttachedFunctions returns incomplete attached function (with onlyReady=false to include all)
suite.mockMetaDomain.On("AttachedFunctionDb", ctx).Return(suite.mockAttachedFunctionDb).Once()
suite.mockAttachedFunctionDb.On("GetAnyByName", inputCollectionID, attachedFunctionName).
Return(incompleteAttachedFunction, nil).Once()
suite.mockAttachedFunctionDb.On("GetAttachedFunctions", (*uuid.UUID)(nil), &attachedFunctionName, &inputCollectionID, false).
Return([]*dbmodel.AttachedFunction{incompleteAttachedFunction}, nil).Once()

// Validate function matches
suite.mockMetaDomain.On("FunctionDb", ctx).Return(suite.mockFunctionDb).Once()
Expand Down Expand Up @@ -517,8 +464,8 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_IdempotentRequest_Param

// ===== Phase 1: Transaction checks if task exists - finds task with different params =====
suite.mockMetaDomain.On("AttachedFunctionDb", mock.Anything).Return(suite.mockAttachedFunctionDb).Once()
suite.mockAttachedFunctionDb.On("GetAnyByName", inputCollectionID, attachedFunctionName).
Return(existingAttachedFunction, nil).Once()
suite.mockAttachedFunctionDb.On("GetAttachedFunctions", (*uuid.UUID)(nil), &attachedFunctionName, &inputCollectionID, false).
Return([]*dbmodel.AttachedFunction{existingAttachedFunction}, nil).Once()

// Validate function - returns DIFFERENT function name
suite.mockMetaDomain.On("FunctionDb", mock.Anything).Return(suite.mockFunctionDb).Once()
Expand Down
12 changes: 7 additions & 5 deletions go/pkg/sysdb/coordinator/heap_client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,14 +370,16 @@ func (suite *HeapClientIntegrationTestSuite) TestPartialTaskCleanup_ThenRecreate
suite.T().Logf("Cleanup completed, removed %d tasks", cleanupResp.CleanedUpCount)

// STEP 3: Verify task still exists and can be retrieved
getResp, err := suite.sysdbClient.GetAttachedFunctionByName(ctx, &coordinatorpb.GetAttachedFunctionByNameRequest{
InputCollectionId: collectionID,
Name: taskName,
getResp, err := suite.sysdbClient.GetAttachedFunctions(ctx, &coordinatorpb.GetAttachedFunctionsRequest{
InputCollectionId: &collectionID,
Name: &taskName,
OnlyReady: func() *bool { b := true; return &b }(),
})
suite.NoError(err, "Task should still exist after cleanup")
suite.NotNil(getResp)
suite.Equal(taskResp.Id, getResp.AttachedFunction.Id)
suite.T().Logf("Task still exists after cleanup: %s", getResp.AttachedFunction.Id)
suite.Require().Len(getResp.AttachedFunctions, 1)
suite.Equal(taskResp.Id, getResp.AttachedFunctions[0].Id)
suite.T().Logf("Task still exists after cleanup: %s", getResp.AttachedFunctions[0].Id)

// STEP 4: Delete the task
_, err = suite.sysdbClient.DetachFunction(ctx, &coordinatorpb.DetachFunctionRequest{
Expand Down
24 changes: 12 additions & 12 deletions go/pkg/sysdb/coordinator/list_attached_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_Success()
}

suite.mockMetaDomain.On("AttachedFunctionDb", ctx).Return(suite.mockAttachedFunctionDb).Once()
suite.mockAttachedFunctionDb.On("GetByCollectionID", collectionID).Return(attachedFunctions, nil).Once()
suite.mockAttachedFunctionDb.On("GetAttachedFunctions", (*uuid.UUID)(nil), (*string)(nil), &collectionID, true).Return(attachedFunctions, nil).Once()

functionOne := &dbmodel.Function{ID: functionID1, Name: "function-one"}
functionTwo := &dbmodel.Function{ID: functionID2, Name: "function-two"}
Expand All @@ -106,8 +106,8 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_Success()
})).
Return([]*dbmodel.Function{functionOne, functionTwo}, nil).Once()

req := &coordinatorpb.ListAttachedFunctionsRequest{InputCollectionId: collectionID}
resp, err := suite.coordinator.ListAttachedFunctions(ctx, req)
req := &coordinatorpb.GetAttachedFunctionsRequest{InputCollectionId: &collectionID, OnlyReady: func() *bool { b := true; return &b }()}
resp, err := suite.coordinator.GetAttachedFunctions(ctx, req)

suite.Require().NoError(err)
suite.Require().NotNil(resp)
Expand All @@ -128,10 +128,10 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_EmptyResu
collectionID := "test-collection"

suite.mockMetaDomain.On("AttachedFunctionDb", ctx).Return(suite.mockAttachedFunctionDb).Once()
suite.mockAttachedFunctionDb.On("GetByCollectionID", collectionID).Return([]*dbmodel.AttachedFunction{}, nil).Once()
suite.mockAttachedFunctionDb.On("GetAttachedFunctions", (*uuid.UUID)(nil), (*string)(nil), &collectionID, true).Return([]*dbmodel.AttachedFunction{}, nil).Once()

req := &coordinatorpb.ListAttachedFunctionsRequest{InputCollectionId: collectionID}
resp, err := suite.coordinator.ListAttachedFunctions(ctx, req)
req := &coordinatorpb.GetAttachedFunctionsRequest{InputCollectionId: &collectionID, OnlyReady: func() *bool { b := true; return &b }()}
resp, err := suite.coordinator.GetAttachedFunctions(ctx, req)

suite.Require().NoError(err)
suite.Require().NotNil(resp)
Expand Down Expand Up @@ -160,13 +160,13 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_FunctionD
}

suite.mockMetaDomain.On("AttachedFunctionDb", ctx).Return(suite.mockAttachedFunctionDb).Once()
suite.mockAttachedFunctionDb.On("GetByCollectionID", collectionID).Return([]*dbmodel.AttachedFunction{attachedFunction}, nil).Once()
suite.mockAttachedFunctionDb.On("GetAttachedFunctions", (*uuid.UUID)(nil), (*string)(nil), &collectionID, true).Return([]*dbmodel.AttachedFunction{attachedFunction}, nil).Once()

suite.mockMetaDomain.On("FunctionDb", ctx).Return(suite.mockFunctionDb).Once()
suite.mockFunctionDb.On("GetByIDs", []uuid.UUID{functionID}).Return(nil, errors.New("db error")).Once()

req := &coordinatorpb.ListAttachedFunctionsRequest{InputCollectionId: collectionID}
resp, err := suite.coordinator.ListAttachedFunctions(ctx, req)
req := &coordinatorpb.GetAttachedFunctionsRequest{InputCollectionId: &collectionID, OnlyReady: func() *bool { b := true; return &b }()}
resp, err := suite.coordinator.GetAttachedFunctions(ctx, req)

suite.Require().Error(err)
suite.Nil(resp)
Expand Down Expand Up @@ -194,15 +194,15 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_InvalidPa
}

suite.mockMetaDomain.On("AttachedFunctionDb", ctx).Return(suite.mockAttachedFunctionDb).Once()
suite.mockAttachedFunctionDb.On("GetByCollectionID", collectionID).Return([]*dbmodel.AttachedFunction{attachedFunction}, nil).Once()
suite.mockAttachedFunctionDb.On("GetAttachedFunctions", (*uuid.UUID)(nil), (*string)(nil), &collectionID, true).Return([]*dbmodel.AttachedFunction{attachedFunction}, nil).Once()

functionModel := &dbmodel.Function{ID: functionID, Name: "function"}

suite.mockMetaDomain.On("FunctionDb", ctx).Return(suite.mockFunctionDb).Once()
suite.mockFunctionDb.On("GetByIDs", []uuid.UUID{functionID}).Return([]*dbmodel.Function{functionModel}, nil).Once()

req := &coordinatorpb.ListAttachedFunctionsRequest{InputCollectionId: collectionID}
resp, err := suite.coordinator.ListAttachedFunctions(ctx, req)
req := &coordinatorpb.GetAttachedFunctionsRequest{InputCollectionId: &collectionID, OnlyReady: func() *bool { b := true; return &b }()}
resp, err := suite.coordinator.GetAttachedFunctions(ctx, req)

suite.Require().Error(err)
suite.Nil(resp)
Expand Down
Loading
Loading