Skip to content

Commit c2ce44c

Browse files
committed
c2c: fix producer job proto
This patch fixes a papercut where we were storing pointers to spans in the producer job proto. This isn't a correctness bug, as protocol buffers will dereference pointers to objects during unmarshalling, but rather, it seems dangerous to work with slices of pointers to spans. Release note: None
1 parent 45cd1c9 commit c2ce44c

File tree

4 files changed

+7
-12
lines changed

4 files changed

+7
-12
lines changed

pkg/ccl/streamingccl/streamproducer/producer_job.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ import (
2929
"github.com/cockroachdb/errors"
3030
)
3131

32-
func makeTenantSpan(tenantID uint64) *roachpb.Span {
32+
func makeTenantSpan(tenantID uint64) roachpb.Span {
3333
prefix := keys.MakeTenantPrefix(roachpb.MustMakeTenantID(tenantID))
34-
return &roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}
34+
return roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}
3535
}
3636

3737
func makeProducerJobRecord(
@@ -47,7 +47,7 @@ func makeProducerJobRecord(
4747
Username: user,
4848
Details: jobspb.StreamReplicationDetails{
4949
ProtectedTimestampRecordID: ptsID,
50-
Spans: []*roachpb.Span{makeTenantSpan(tenantID)},
50+
Spans: []roachpb.Span{makeTenantSpan(tenantID)},
5151
TenantID: roachpb.MustMakeTenantID(tenantID),
5252
},
5353
Progress: jobspb.StreamReplicationProgress{

pkg/ccl/streamingccl/streamproducer/producer_job_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ func TestStreamReplicationProducerJob(t *testing.T) {
160160
return insqlDB.Txn(ctx, func(
161161
ctx context.Context, txn isql.Txn,
162162
) error {
163-
deprecatedTenantSpan := roachpb.Spans{*makeTenantSpan(30)}
163+
deprecatedTenantSpan := roachpb.Spans{makeTenantSpan(30)}
164164
tenantTarget := ptpb.MakeTenantsTarget([]roachpb.TenantID{roachpb.MustMakeTenantID(30)})
165165
record := jobsprotectedts.MakeRecord(
166166
ptsID, int64(jr.JobID), ts, deprecatedTenantSpan,

pkg/ccl/streamingccl/streamproducer/stream_lifetime.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func startReplicationProducerJob(
6464
statementTime := hlc.Timestamp{
6565
WallTime: evalCtx.GetStmtTimestamp().UnixNano(),
6666
}
67-
deprecatedSpansToProtect := roachpb.Spans{*makeTenantSpan(tenantID)}
67+
deprecatedSpansToProtect := roachpb.Spans{makeTenantSpan(tenantID)}
6868
targetToProtect := ptpb.MakeTenantsTarget([]roachpb.TenantID{roachpb.MustMakeTenantID(tenantID)})
6969
pts := jobsprotectedts.MakeRecord(ptsID, int64(jr.JobID), statementTime,
7070
deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect)
@@ -236,12 +236,7 @@ func getReplicationStreamSpec(
236236
if !ok {
237237
return nil, errors.Errorf("job with id %d is not a replication stream job", streamID)
238238
}
239-
replicatedSpans := details.Spans
240-
spans := make([]roachpb.Span, 0, len(replicatedSpans))
241-
for _, span := range replicatedSpans {
242-
spans = append(spans, *span)
243-
}
244-
spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, spans)
239+
spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, details.Spans)
245240
if err != nil {
246241
return nil, err
247242
}

pkg/jobs/jobspb/jobs.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ message StreamIngestionProgress {
184184

185185
message StreamReplicationDetails {
186186
// Key spans we are replicating
187-
repeated roachpb.Span spans = 1;
187+
repeated roachpb.Span spans = 1 [(gogoproto.nullable) = false];
188188

189189
// ID of the protected timestamp record that protects the above spans
190190
bytes protected_timestamp_record_id = 2 [

0 commit comments

Comments
 (0)