Skip to content

Commit 3b6dcb4

Browse files
authored
[ENH]: Garbage collection for soft deleted attached functions (#5774)
## Description of changes This change introduces garbage collection for soft-deleted attached functions. - Improvements & Bug fixes - ... - New functionality - ... ## Test plan _How are these changes tested?_ - [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Migration plan _Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?_ ## Observability plan _What is the plan to instrument and monitor this change?_ ## Documentation Changes _Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_
1 parent 35d7f5b commit 3b6dcb4

File tree

14 files changed

+894
-5
lines changed

14 files changed

+894
-5
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/pkg/sysdb/coordinator/create_task_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"google.golang.org/grpc/codes"
1717
"google.golang.org/grpc/status"
1818
"google.golang.org/protobuf/types/known/structpb"
19+
"google.golang.org/protobuf/types/known/timestamppb"
1920
)
2021

2122
// testMinimalUUIDv7 is the test's copy of minimalUUIDv7 from task.go
@@ -663,3 +664,85 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_IdempotentRequest_Param
663664
func TestAttachFunctionTestSuite(t *testing.T) {
664665
suite.Run(t, new(AttachFunctionTestSuite))
665666
}
667+
668+
// TestGetSoftDeletedAttachedFunctions_TimestampConsistency verifies that timestamps
669+
// are returned in microseconds (UnixMicro) to match other API methods
// The IMetaDomain and IAttachedFunctionDb dependencies are mocks, so only the
// coordinator's request validation and model-to-proto conversion are exercised.
670+
func TestGetSoftDeletedAttachedFunctions_TimestampConsistency(t *testing.T) {
671+
ctx := context.Background()
672+
673+
// Create test timestamps with known values
// The non-zero sub-second part means UnixMicro() differs from Unix()*1e6, so
// a conversion that truncates to whole seconds would be detected.
674+
testTime := time.Date(2025, 10, 30, 12, 0, 0, 123456000, time.UTC) // 123.456 milliseconds
675+
expectedMicros := uint64(testTime.UnixMicro())
676+
677+
// Create mock coordinator with minimal setup
678+
mockMetaDomain := &dbmodel_mocks.IMetaDomain{}
679+
mockAttachedFunctionDb := &dbmodel_mocks.IAttachedFunctionDb{}
680+
mockMetaDomain.On("AttachedFunctionDb", mock.Anything).Return(mockAttachedFunctionDb)
681+
682+
// Mock the database response with our test timestamps
// All three time fields share testTime so one expected value covers them.
683+
attachedFunctions := []*dbmodel.AttachedFunction{
684+
{
685+
ID: uuid.New(),
686+
Name: "test_function",
687+
InputCollectionID: "collection_123",
688+
OutputCollectionName: "output_collection",
689+
CompletionOffset: 100,
690+
MinRecordsForInvocation: 10,
691+
CreatedAt: testTime,
692+
UpdatedAt: testTime,
693+
NextRun: testTime,
694+
},
695+
}
696+
// Both arguments are matched with mock.Anything, so the cutoff and limit
// passed below do not influence which rows the mock returns.
697+
mockAttachedFunctionDb.On("GetSoftDeletedAttachedFunctions", mock.Anything, mock.Anything).
698+
Return(attachedFunctions, nil)
699+
700+
coordinator := &Coordinator{
701+
catalog: Catalog{
702+
metaDomain: mockMetaDomain,
703+
},
704+
}
705+
706+
// Call GetSoftDeletedAttachedFunctions
// A non-nil CutoffTime and a positive Limit are supplied so the request
// passes the coordinator's argument checks.
707+
cutoffTime := timestamppb.New(testTime.Add(-24 * time.Hour))
708+
resp, err := coordinator.GetSoftDeletedAttachedFunctions(ctx, &coordinatorpb.GetSoftDeletedAttachedFunctionsRequest{
709+
CutoffTime: cutoffTime,
710+
Limit: 100,
711+
})
712+
713+
// Verify response
714+
if err != nil {
715+
t.Fatalf("GetSoftDeletedAttachedFunctions failed: %v", err)
716+
}
717+
if len(resp.AttachedFunctions) != 1 {
718+
t.Fatalf("Expected 1 attached function, got %d", len(resp.AttachedFunctions))
719+
}
720+
721+
af := resp.AttachedFunctions[0]
722+
723+
// Verify timestamps are in microseconds (not seconds)
724+
if af.CreatedAt != expectedMicros {
725+
t.Errorf("CreatedAt timestamp mismatch: expected %d microseconds, got %d", expectedMicros, af.CreatedAt)
726+
}
727+
if af.UpdatedAt != expectedMicros {
728+
t.Errorf("UpdatedAt timestamp mismatch: expected %d microseconds, got %d", expectedMicros, af.UpdatedAt)
729+
}
730+
if af.NextRunAt != expectedMicros {
731+
t.Errorf("NextRunAt timestamp mismatch: expected %d microseconds, got %d", expectedMicros, af.NextRunAt)
732+
}
733+
734+
// Verify these are NOT in seconds (would be ~1000x smaller)
// These checks can only fire when the equality checks above have already
// failed; they exist to give a more specific diagnostic message.
735+
expectedSeconds := uint64(testTime.Unix())
736+
if af.CreatedAt == expectedSeconds {
737+
t.Error("CreatedAt appears to be in seconds instead of microseconds")
738+
}
739+
if af.UpdatedAt == expectedSeconds {
740+
t.Error("UpdatedAt appears to be in seconds instead of microseconds")
741+
}
742+
if af.NextRunAt == expectedSeconds {
743+
t.Error("NextRunAt appears to be in seconds instead of microseconds")
744+
}
745+
// Confirm that every expectation registered with On(...) was actually called.
746+
mockMetaDomain.AssertExpectations(t)
747+
mockAttachedFunctionDb.AssertExpectations(t)
748+
}

go/pkg/sysdb/coordinator/task.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -852,3 +852,75 @@ func (s *Coordinator) CleanupExpiredPartialAttachedFunctions(ctx context.Context
852852
CleanedUpIds: cleanedAttachedFunctionIDStrings,
853853
}, nil
854854
}
855+
856+
// GetSoftDeletedAttachedFunctions retrieves attached functions that are soft deleted and were updated before the cutoff time
// It returns a gRPC InvalidArgument status when req.CutoffTime is nil or
// req.Limit is not positive. Errors from the database layer are logged and
// returned unchanged. CreatedAt, UpdatedAt and NextRunAt in the response are
// encoded as Unix microseconds.
857+
func (s *Coordinator) GetSoftDeletedAttachedFunctions(ctx context.Context, req *coordinatorpb.GetSoftDeletedAttachedFunctionsRequest) (*coordinatorpb.GetSoftDeletedAttachedFunctionsResponse, error) {
858+
log := log.With(zap.String("method", "GetSoftDeletedAttachedFunctions"))
859+
860+
if req.CutoffTime == nil {
861+
log.Error("GetSoftDeletedAttachedFunctions: cutoff_time is required")
862+
return nil, status.Errorf(codes.InvalidArgument, "cutoff_time is required")
863+
}
864+
865+
if req.Limit <= 0 {
866+
log.Error("GetSoftDeletedAttachedFunctions: limit must be greater than 0")
867+
return nil, status.Errorf(codes.InvalidArgument, "limit must be greater than 0")
868+
}
869+
870+
cutoffTime := req.CutoffTime.AsTime()
871+
attachedFunctions, err := s.catalog.metaDomain.AttachedFunctionDb(ctx).GetSoftDeletedAttachedFunctions(cutoffTime, req.Limit)
872+
if err != nil {
873+
log.Error("GetSoftDeletedAttachedFunctions: failed to get soft deleted attached functions", zap.Error(err))
874+
return nil, err
875+
}
876+
877+
// Convert to proto response
// NOTE(review): the uint64(...) conversions below wrap around for negative
// inputs (for example UnixMicro() of a zero time.Time); this presumably relies
// on stored offsets and timestamps never being negative — confirm.
878+
protoAttachedFunctions := make([]*coordinatorpb.AttachedFunction, len(attachedFunctions))
879+
for i, af := range attachedFunctions {
880+
protoAttachedFunctions[i] = &coordinatorpb.AttachedFunction{
881+
Id: af.ID.String(),
882+
Name: af.Name,
883+
InputCollectionId: af.InputCollectionID,
884+
OutputCollectionName: af.OutputCollectionName,
885+
CompletionOffset: uint64(af.CompletionOffset),
886+
MinRecordsForInvocation: uint64(af.MinRecordsForInvocation),
887+
CreatedAt: uint64(af.CreatedAt.UnixMicro()),
888+
UpdatedAt: uint64(af.UpdatedAt.UnixMicro()),
889+
}
890+
891+
protoAttachedFunctions[i].NextRunAt = uint64(af.NextRun.UnixMicro())
// OutputCollectionId is left unset when the model has no output collection ID.
892+
if af.OutputCollectionID != nil {
893+
protoAttachedFunctions[i].OutputCollectionId = proto.String(*af.OutputCollectionID)
894+
}
895+
}
896+
897+
log.Info("GetSoftDeletedAttachedFunctions: completed successfully",
898+
zap.Int("count", len(attachedFunctions)))
899+
900+
return &coordinatorpb.GetSoftDeletedAttachedFunctionsResponse{
901+
AttachedFunctions: protoAttachedFunctions,
902+
}, nil
903+
}
904+
905+
// FinishAttachedFunctionDeletion permanently deletes an attached function from the database (hard delete)
906+
// This should only be called after the soft delete grace period has passed
// It returns a gRPC InvalidArgument status when req.AttachedFunctionId is not
// a valid UUID. Errors from HardDeleteAttachedFunction are logged and returned
// unchanged. On success the response is empty.
// NOTE(review): the grace period is not checked here; enforcing it is
// presumably the caller's responsibility — confirm.
907+
func (s *Coordinator) FinishAttachedFunctionDeletion(ctx context.Context, req *coordinatorpb.FinishAttachedFunctionDeletionRequest) (*coordinatorpb.FinishAttachedFunctionDeletionResponse, error) {
908+
log := log.With(zap.String("method", "FinishAttachedFunctionDeletion"))
909+
910+
attachedFunctionID, err := uuid.Parse(req.AttachedFunctionId)
911+
if err != nil {
912+
log.Error("FinishAttachedFunctionDeletion: invalid attached_function_id", zap.Error(err))
913+
return nil, status.Errorf(codes.InvalidArgument, "invalid attached_function_id: %v", err)
914+
}
915+
916+
err = s.catalog.metaDomain.AttachedFunctionDb(ctx).HardDeleteAttachedFunction(attachedFunctionID)
917+
if err != nil {
918+
log.Error("FinishAttachedFunctionDeletion: failed to hard delete attached function", zap.Error(err))
919+
return nil, err
920+
}
921+
922+
log.Info("FinishAttachedFunctionDeletion: completed successfully",
923+
zap.String("attached_function_id", attachedFunctionID.String()))
924+
925+
return &coordinatorpb.FinishAttachedFunctionDeletionResponse{}, nil
926+
}

go/pkg/sysdb/grpc/task_service.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,3 +151,29 @@ func (s *Server) CleanupExpiredPartialAttachedFunctions(ctx context.Context, req
151151
log.Info("CleanupExpiredPartialAttachedFunctions succeeded", zap.Uint64("cleaned_up_count", res.CleanedUpCount))
152152
return res, nil
153153
}
154+
155+
// GetSoftDeletedAttachedFunctions is the gRPC handler that logs the request,
// forwards it to the coordinator and returns the coordinator's response.
// NOTE(review): every coordinator error is passed through
// grpcutils.BuildInternalGrpcError, which presumably produces an Internal
// status; if so, the InvalidArgument codes the coordinator reports for bad
// input are not visible to clients — confirm whether that is intended.
func (s *Server) GetSoftDeletedAttachedFunctions(ctx context.Context, req *coordinatorpb.GetSoftDeletedAttachedFunctionsRequest) (*coordinatorpb.GetSoftDeletedAttachedFunctionsResponse, error) {
156+
// NOTE(review): this runs before the coordinator validates the request, so
// req.CutoffTime may still be nil here; AsTime presumably tolerates a nil
// receiver and yields the Unix epoch — confirm.
log.Info("GetSoftDeletedAttachedFunctions", zap.Time("cutoff_time", req.CutoffTime.AsTime()), zap.Int32("limit", req.Limit))
157+
158+
res, err := s.coordinator.GetSoftDeletedAttachedFunctions(ctx, req)
159+
if err != nil {
160+
log.Error("GetSoftDeletedAttachedFunctions failed", zap.Error(err))
161+
return nil, grpcutils.BuildInternalGrpcError(err.Error())
162+
}
163+
164+
log.Info("GetSoftDeletedAttachedFunctions succeeded", zap.Int("count", len(res.AttachedFunctions)))
165+
return res, nil
166+
}
167+
168+
// FinishAttachedFunctionDeletion is the gRPC handler that logs the request,
// forwards it to the coordinator and returns the coordinator's response.
// NOTE(review): every coordinator error is passed through
// grpcutils.BuildInternalGrpcError, which presumably produces an Internal
// status; if so, the InvalidArgument code the coordinator reports for a
// malformed ID is not visible to clients — confirm whether that is intended.
func (s *Server) FinishAttachedFunctionDeletion(ctx context.Context, req *coordinatorpb.FinishAttachedFunctionDeletionRequest) (*coordinatorpb.FinishAttachedFunctionDeletionResponse, error) {
169+
log.Info("FinishAttachedFunctionDeletion", zap.String("id", req.AttachedFunctionId))
170+
171+
res, err := s.coordinator.FinishAttachedFunctionDeletion(ctx, req)
172+
if err != nil {
173+
log.Error("FinishAttachedFunctionDeletion failed", zap.Error(err))
174+
return nil, grpcutils.BuildInternalGrpcError(err.Error())
175+
}
176+
177+
log.Info("FinishAttachedFunctionDeletion succeeded", zap.String("id", req.AttachedFunctionId))
178+
return res, nil
179+
}

go/pkg/sysdb/metastore/db/dao/task.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,3 +419,51 @@ func (s *attachedFunctionDb) CleanupExpiredPartial(maxAgeSeconds uint64) ([]uuid
419419

420420
return ids, nil
421421
}
422+
423+
// GetSoftDeletedAttachedFunctions returns attached functions that are soft deleted
424+
// and were updated before the cutoff time (eligible for hard deletion)
// At most limit rows are returned. Query errors are logged and returned
// unchanged.
// NOTE(review): the query has no ORDER BY, so when more than limit rows match,
// which rows come back is left to the database — confirm that callers do not
// rely on oldest-first batches.
425+
func (s *attachedFunctionDb) GetSoftDeletedAttachedFunctions(cutoffTime time.Time, limit int32) ([]*dbmodel.AttachedFunction, error) {
426+
var attachedFunctions []*dbmodel.AttachedFunction
427+
err := s.db.
428+
Where("is_deleted = ?", true).
429+
Where("updated_at < ?", cutoffTime).
430+
Limit(int(limit)).
431+
Find(&attachedFunctions).Error
432+
433+
if err != nil {
434+
log.Error("GetSoftDeletedAttachedFunctions failed",
435+
zap.Error(err),
436+
zap.Time("cutoff_time", cutoffTime))
437+
return nil, err
438+
}
439+
440+
log.Debug("GetSoftDeletedAttachedFunctions found attached functions",
441+
zap.Int("count", len(attachedFunctions)),
442+
zap.Time("cutoff_time", cutoffTime))
443+
444+
return attachedFunctions, nil
445+
}
446+
447+
// HardDeleteAttachedFunction permanently deletes an attached function from the database
448+
// This should only be called after the soft delete grace period has passed
// Only rows with the given id that are already marked is_deleted = true are
// removed. When no row matches, either because the id does not exist or
// because the row is not soft deleted, a warning is logged and nil is
// returned. Database errors are logged and returned unchanged.
449+
func (s *attachedFunctionDb) HardDeleteAttachedFunction(id uuid.UUID) error {
450+
result := s.db.Unscoped().Delete(&dbmodel.AttachedFunction{}, "id = ? AND is_deleted = true", id)
451+
452+
if result.Error != nil {
453+
log.Error("HardDeleteAttachedFunction failed",
454+
zap.Error(result.Error),
455+
zap.String("id", id.String()))
456+
return result.Error
457+
}
458+
459+
if result.RowsAffected == 0 {
// Zero rows can mean the id is unknown or that the row exists but is not
// soft deleted; the predicate above cannot tell these cases apart.
460+
log.Warn("HardDeleteAttachedFunction: no rows affected (attached function not found or not soft deleted)",
461+
zap.String("id", id.String()))
462+
return nil // Idempotent - no error if nothing matched
463+
}
464+
465+
log.Info("HardDeleteAttachedFunction succeeded",
466+
zap.String("id", id.String()))
467+
468+
return nil
469+
}

go/pkg/sysdb/metastore/db/dbmodel/mocks/IAttachedFunctionDb.go

Lines changed: 50 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/pkg/sysdb/metastore/db/dbmodel/task.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,6 @@ type IAttachedFunctionDb interface {
5959
PeekScheduleByCollectionId(collectionIDs []string) ([]*AttachedFunction, error)
6060
GetMinCompletionOffsetForCollection(inputCollectionID string) (*int64, error)
6161
CleanupExpiredPartial(maxAgeSeconds uint64) ([]uuid.UUID, error)
62+
GetSoftDeletedAttachedFunctions(cutoffTime time.Time, limit int32) ([]*AttachedFunction, error)
63+
HardDeleteAttachedFunction(id uuid.UUID) error
6264
}

idl/chromadb/proto/coordinator.proto

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,21 @@ message PeekScheduleByCollectionIdResponse {
691691
repeated ScheduleEntry schedule = 1;
692692
}
693693

694+
// Request for listing soft-deleted attached functions that are eligible for
// hard deletion.
message GetSoftDeletedAttachedFunctionsRequest {
695+
// Required. Only soft-deleted attached functions whose updated_at is earlier
// than this time are returned; the Go coordinator rejects a missing value
// with InvalidArgument.
google.protobuf.Timestamp cutoff_time = 1;
696+
// Maximum number of attached functions to return. Must be greater than 0;
// the Go coordinator rejects other values with InvalidArgument.
int32 limit = 2;
697+
}
698+
699+
// Response carrying the soft-deleted attached functions that matched the
// request.
message GetSoftDeletedAttachedFunctionsResponse {
700+
// Matching attached functions. The Go coordinator fills the created_at,
// updated_at and next_run_at values as Unix microseconds.
repeated AttachedFunction attached_functions = 1;
701+
}
702+
703+
// Request to permanently remove (hard delete) one attached function.
message FinishAttachedFunctionDeletionRequest {
704+
// ID of the attached function to remove. The Go coordinator parses it as a
// UUID and rejects malformed values with InvalidArgument.
string attached_function_id = 1;
705+
}
706+
707+
// Empty response; FinishAttachedFunctionDeletion returns no data.
message FinishAttachedFunctionDeletionResponse {}
708+
694709
service SysDB {
695710
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse) {}
696711
rpc GetDatabase(GetDatabaseRequest) returns (GetDatabaseResponse) {}
@@ -740,4 +755,6 @@ service SysDB {
740755
rpc CleanupExpiredPartialAttachedFunctions(CleanupExpiredPartialAttachedFunctionsRequest) returns (CleanupExpiredPartialAttachedFunctionsResponse) {}
741756
rpc GetFunctions(GetFunctionsRequest) returns (GetFunctionsResponse) {}
742757
rpc PeekScheduleByCollectionId(PeekScheduleByCollectionIdRequest) returns (PeekScheduleByCollectionIdResponse) {}
758+
rpc GetSoftDeletedAttachedFunctions(GetSoftDeletedAttachedFunctionsRequest) returns (GetSoftDeletedAttachedFunctionsResponse) {}
759+
rpc FinishAttachedFunctionDeletion(FinishAttachedFunctionDeletionRequest) returns (FinishAttachedFunctionDeletionResponse) {}
743760
}

rust/frontend/src/impls/service_based_frontend.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1974,6 +1974,11 @@ impl ServiceBasedFrontend {
19741974
chroma_sysdb::DeleteAttachedFunctionError::FailedToDeleteAttachedFunction(s) => {
19751975
DetachFunctionError::Internal(Box::new(chroma_error::TonicError(s)))
19761976
}
1977+
chroma_sysdb::DeleteAttachedFunctionError::NotImplemented => {
1978+
DetachFunctionError::Internal(Box::new(chroma_error::TonicError(
1979+
tonic::Status::unimplemented("Not implemented"),
1980+
)))
1981+
}
19771982
})?;
19781983

19791984
Ok(DetachFunctionResponse { success: true })

rust/garbage_collector/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ chroma-index = { workspace = true }
5555
chroma-memberlist = { workspace = true }
5656
chroma-tracing = { workspace = true }
5757
chroma-jemalloc-pprof-server = { workspace = true }
58+
s3heap = { workspace = true }
59+
s3heap-service = { workspace = true }
5860
wal3 = { workspace = true }
5961

6062
[target.'cfg(not(target_env = "msvc"))'.dependencies]

0 commit comments

Comments
 (0)