Skip to content

Commit 65ac80f

Browse files
committed
[CHORE]: Remove next_run from attached_functions
1 parent d730c23 commit 65ac80f

File tree

11 files changed

+3
-37
lines changed

11 files changed

+3
-37
lines changed

go/pkg/sysdb/coordinator/create_task_test.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,6 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_IdempotentRequest_Alrea
333333
OutputCollectionName: outputCollectionName,
334334
FunctionID: functionID,
335335
MinRecordsForInvocation: int64(MinRecordsForInvocation),
336-
NextRun: now,
337336
CreatedAt: now,
338337
UpdatedAt: now,
339338
}
@@ -480,7 +479,6 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_RecoveryFlow_HeapFailur
480479
OutputCollectionName: outputCollectionName,
481480
FunctionID: functionID,
482481
MinRecordsForInvocation: int64(MinRecordsForInvocation),
483-
NextRun: now,
484482
CreatedAt: now,
485483
UpdatedAt: now,
486484
}
@@ -585,7 +583,6 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_IdempotentRequest_Param
585583
OutputCollectionName: outputCollectionName,
586584
FunctionID: existingOperatorID,
587585
MinRecordsForInvocation: int64(MinRecordsForInvocation),
588-
NextRun: now,
589586
CreatedAt: now,
590587
UpdatedAt: now,
591588
}
@@ -665,7 +662,6 @@ func TestGetSoftDeletedAttachedFunctions_TimestampConsistency(t *testing.T) {
665662
MinRecordsForInvocation: 10,
666663
CreatedAt: testTime,
667664
UpdatedAt: testTime,
668-
NextRun: testTime,
669665
},
670666
}
671667

@@ -702,9 +698,6 @@ func TestGetSoftDeletedAttachedFunctions_TimestampConsistency(t *testing.T) {
702698
if af.UpdatedAt != expectedMicros {
703699
t.Errorf("UpdatedAt timestamp mismatch: expected %d microseconds, got %d", expectedMicros, af.UpdatedAt)
704700
}
705-
if af.NextRunAt != expectedMicros {
706-
t.Errorf("NextRunAt timestamp mismatch: expected %d microseconds, got %d", expectedMicros, af.NextRunAt)
707-
}
708701

709702
// Verify these are NOT in seconds (would be ~1000x smaller)
710703
expectedSeconds := uint64(testTime.Unix())
@@ -714,9 +707,6 @@ func TestGetSoftDeletedAttachedFunctions_TimestampConsistency(t *testing.T) {
714707
if af.UpdatedAt == expectedSeconds {
715708
t.Error("UpdatedAt appears to be in seconds instead of microseconds")
716709
}
717-
if af.NextRunAt == expectedSeconds {
718-
t.Error("NextRunAt appears to be in seconds instead of microseconds")
719-
}
720710

721711
mockMetaDomain.AssertExpectations(t)
722712
mockAttachedFunctionDb.AssertExpectations(t)

go/pkg/sysdb/coordinator/list_attached_functions_test.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_Success()
6464
DatabaseID: "db",
6565
CompletionOffset: 10,
6666
MinRecordsForInvocation: 5,
67-
NextRun: now,
6867
CreatedAt: now,
6968
UpdatedAt: now,
7069
},
@@ -79,7 +78,6 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_Success()
7978
DatabaseID: "db",
8079
CompletionOffset: 20,
8180
MinRecordsForInvocation: 15,
82-
NextRun: now,
8381
CreatedAt: now,
8482
UpdatedAt: now,
8583
},
@@ -157,7 +155,6 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_FunctionD
157155
DatabaseID: "db",
158156
CompletionOffset: 0,
159157
MinRecordsForInvocation: 1,
160-
NextRun: now,
161158
CreatedAt: now,
162159
UpdatedAt: now,
163160
}
@@ -192,7 +189,6 @@ func (suite *ListAttachedFunctionsTestSuite) TestListAttachedFunctions_InvalidPa
192189
DatabaseID: "db",
193190
CompletionOffset: 0,
194191
MinRecordsForInvocation: 1,
195-
NextRun: now,
196192
CreatedAt: now,
197193
UpdatedAt: now,
198194
}

go/pkg/sysdb/coordinator/model/collection.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ type FlushCollectionInfo struct {
9999
CollectionVersion int32
100100
TenantLastCompactionTime int64
101101
// Optional attached function fields (only populated for attached-function-based compactions)
102-
AttachedFunctionNextRun *time.Time
103102
AttachedFunctionCompletionOffset *int64
104103
}
105104

go/pkg/sysdb/coordinator/task.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,6 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
111111

112112
// Validation passed, reuse the concurrent attached function's data
113113
attachedFunctionID = concurrentAttachedFunction.ID
114-
nextRun = concurrentAttachedFunction.NextRun
115114
// Already created, skip Phase 2
116115
skipPhase2 = true
117116
return nil
@@ -187,16 +186,13 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
187186
FunctionParams: paramsJSON,
188187
CompletionOffset: 0,
189188
LastRun: nil,
190-
NextRun: now,
191189
MinRecordsForInvocation: int64(req.MinRecordsForInvocation),
192190
CurrentAttempts: 0,
193191
CreatedAt: now,
194192
UpdatedAt: now,
195193
OldestWrittenNonce: nil,
196194
}
197195

198-
nextRun = attachedFunction.NextRun
199-
200196
err = s.catalog.metaDomain.AttachedFunctionDb(txCtx).Insert(attachedFunction)
201197
if err != nil {
202198
log.Error("AttachFunction: failed to insert attached function", zap.Error(err))
@@ -289,7 +285,6 @@ func attachedFunctionToProto(attachedFunction *dbmodel.AttachedFunction, functio
289285
MinRecordsForInvocation: uint64(attachedFunction.MinRecordsForInvocation),
290286
TenantId: attachedFunction.TenantID,
291287
DatabaseId: attachedFunction.DatabaseID,
292-
NextRunAt: uint64(attachedFunction.NextRun.UnixMicro()),
293288
CreatedAt: uint64(attachedFunction.CreatedAt.UnixMicro()),
294289
UpdatedAt: uint64(attachedFunction.UpdatedAt.UnixMicro()),
295290
}
@@ -729,7 +724,6 @@ func (s *Coordinator) GetSoftDeletedAttachedFunctions(ctx context.Context, req *
729724
UpdatedAt: uint64(af.UpdatedAt.UnixMicro()),
730725
}
731726

732-
protoAttachedFunctions[i].NextRunAt = uint64(af.NextRun.UnixMicro())
733727
if af.OutputCollectionID != nil {
734728
protoAttachedFunctions[i].OutputCollectionId = proto.String(*af.OutputCollectionID)
735729
}

go/pkg/sysdb/metastore/db/dbmodel/task.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ type AttachedFunction struct {
1818
FunctionParams string `gorm:"column:function_params;type:jsonb;not null"`
1919
CompletionOffset int64 `gorm:"column:completion_offset;type:bigint;not null;default:0"`
2020
LastRun *time.Time `gorm:"column:last_run;type:timestamp"`
21-
NextRun time.Time `gorm:"column:next_run;type:timestamp;not null"`
2221
MinRecordsForInvocation int64 `gorm:"column:min_records_for_invocation;type:bigint;not null;default:100"`
2322
CurrentAttempts int32 `gorm:"column:current_attempts;type:integer;not null;default:0"`
2423
IsAlive bool `gorm:"column:is_alive;type:boolean;not null;default:true"`
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- Remove next_run column from attached_functions table as it's no longer needed
2+
ALTER TABLE "public"."attached_functions" DROP COLUMN "next_run";

idl/chromadb/proto/coordinator.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,6 @@ message AttachedFunction {
575575
uint64 min_records_for_invocation = 9;
576576
string tenant_id = 10;
577577
string database_id = 11;
578-
uint64 next_run_at = 12;
579578
uint64 created_at = 15;
580579
uint64 updated_at = 16;
581580
string function_id = 17;

rust/sysdb/src/sysdb.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1743,10 +1743,6 @@ impl GrpcSysDb {
17431743
})?,
17441744
);
17451745

1746-
// Parse next_run timestamp from microseconds
1747-
let next_run = std::time::SystemTime::UNIX_EPOCH
1748-
+ std::time::Duration::from_micros(attached_function.next_run_at);
1749-
17501746
// Convert params from Struct to JSON string
17511747
let params_str = attached_function.params.map(|s| {
17521748
let json_value = prost_struct_to_json(s);
@@ -1795,7 +1791,6 @@ impl GrpcSysDb {
17951791
tenant_id: attached_function.tenant_id,
17961792
database_id: attached_function.database_id,
17971793
last_run: None,
1798-
next_run,
17991794
completion_offset: attached_function.completion_offset,
18001795
min_records_for_invocation: attached_function.min_records_for_invocation,
18011796
is_deleted: false,

rust/sysdb/src/test_sysdb.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -702,7 +702,6 @@ fn attached_function_to_proto(
702702
id: attached_function.id.0.to_string(),
703703
name: attached_function.name.clone(),
704704
function_name: attached_function.function_id.to_string(),
705-
function_id: attached_function.function_id.to_string(),
706705
input_collection_id: attached_function.input_collection_id.0.to_string(),
707706
output_collection_name: attached_function.output_collection_name.clone(),
708707
output_collection_id: attached_function
@@ -714,9 +713,9 @@ fn attached_function_to_proto(
714713
min_records_for_invocation: attached_function.min_records_for_invocation,
715714
tenant_id: attached_function.tenant_id.clone(),
716715
database_id: attached_function.database_id.clone(),
717-
next_run_at: system_time_to_micros(attached_function.next_run),
718716
created_at: system_time_to_micros(attached_function.created_at),
719717
updated_at: system_time_to_micros(attached_function.updated_at),
718+
function_id: attached_function.function_id.to_string(),
720719
}
721720
}
722721

rust/types/src/flush.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ impl ChromaError for AdvanceAttachedFunctionError {
6969

7070
#[derive(Debug, Clone)]
7171
pub struct AdvanceAttachedFunctionResponse {
72-
pub next_run: std::time::SystemTime,
7372
pub completion_offset: u64,
7473
}
7574

@@ -126,8 +125,6 @@ pub enum FlushCompactionResponseConversionError {
126125
InvalidUuid,
127126
#[error("Invalid attached function nonce, valid UUID required")]
128127
InvalidAttachedFunctionNonce,
129-
#[error("Missing next_run timestamp")]
130-
MissingNextRun,
131128
#[error("Invalid timestamp format")]
132129
InvalidTimestamp,
133130
}
@@ -139,7 +136,6 @@ impl ChromaError for FlushCompactionResponseConversionError {
139136
FlushCompactionResponseConversionError::InvalidAttachedFunctionNonce => {
140137
ErrorCodes::InvalidArgument
141138
}
142-
FlushCompactionResponseConversionError::MissingNextRun => ErrorCodes::InvalidArgument,
143139
FlushCompactionResponseConversionError::InvalidTimestamp => ErrorCodes::InvalidArgument,
144140
FlushCompactionResponseConversionError::DecodeError(e) => e.code(),
145141
}

0 commit comments

Comments
 (0)