Skip to content

Commit ffea5af

Browse files
committed
c2c: pass statement transaction when loading producer side job
This patch fixes a small bug where we previously used a new txn to read from the producer job instead of the txn associated with the crdb_internal builtin call. Epic: none Release note: none
1 parent 0e915fb commit ffea5af

File tree

4 files changed

+10
-7
lines changed

4 files changed

+10
-7
lines changed

pkg/ccl/logictestccl/testdata/logic_test/crdb_internal

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,6 @@ subtest replication-builtins
168168

169169
user root
170170

171-
query error pq: crdb_internal\.replication_stream_spec\(\): job.*is not a replication stream job
172-
SELECT crdb_internal.replication_stream_spec(crdb_internal.create_sql_schema_telemetry_job())
173171

174172
query error pq: crdb_internal\.stream_ingestion_stats_json\(\): unimplemented
175173
SELECT crdb_internal.stream_ingestion_stats_json(1);

pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,11 @@ INSERT INTO d.t2 VALUES (2);
160160
_, err = client.Plan(ctx, 999)
161161
require.True(t, testutils.IsError(err, fmt.Sprintf("job with ID %d does not exist", 999)), err)
162162

163+
var telemetryJobID int64
164+
h.SysSQL.QueryRow(t, "SELECT crdb_internal.create_sql_schema_telemetry_job()").Scan(&telemetryJobID)
165+
_, err = client.Plan(ctx, streampb.StreamID(telemetryJobID))
166+
require.True(t, testutils.IsError(err, fmt.Sprintf("job with id %d is not a replication stream job", telemetryJobID)), err)
167+
163168
expectStreamState(streamID, jobs.StatusRunning)
164169
status, err := client.Heartbeat(ctx, streamID, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
165170
require.NoError(t, err)

pkg/ccl/streamingccl/streamproducer/replication_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (r *replicationStreamManagerImpl) StreamPartition(
5555
func (r *replicationStreamManagerImpl) GetReplicationStreamSpec(
5656
ctx context.Context, streamID streampb.StreamID,
5757
) (*streampb.ReplicationStreamSpec, error) {
58-
return getReplicationStreamSpec(ctx, r.evalCtx, streamID)
58+
return getReplicationStreamSpec(ctx, r.evalCtx, r.txn, streamID)
5959
}
6060

6161
// CompleteReplicationStream implements ReplicationStreamManager interface.

pkg/ccl/streamingccl/streamproducer/stream_lifetime.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ func heartbeatReplicationStream(
188188
// job progress.
189189
if frontier == hlc.MaxTimestamp {
190190
var status streampb.StreamReplicationStatus
191-
pj, err := execConfig.JobRegistry.LoadJob(ctx, jobspb.JobID(streamID))
191+
pj, err := execConfig.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(streamID), txn)
192192
if jobs.HasJobNotFoundError(err) || testutils.IsError(err, "not found in system.jobs table") {
193193
status.StreamStatus = streampb.StreamReplicationStatus_STREAM_INACTIVE
194194
return status, nil
@@ -219,13 +219,13 @@ func heartbeatReplicationStream(
219219

220220
// getReplicationStreamSpec gets a replication stream specification for the specified stream.
221221
func getReplicationStreamSpec(
222-
ctx context.Context, evalCtx *eval.Context, streamID streampb.StreamID,
222+
ctx context.Context, evalCtx *eval.Context, txn isql.Txn, streamID streampb.StreamID,
223223
) (*streampb.ReplicationStreamSpec, error) {
224224
jobExecCtx := evalCtx.JobExecContext.(sql.JobExecContext)
225225
// Returns error if the replication stream is not active
226-
j, err := jobExecCtx.ExecCfg().JobRegistry.LoadJob(ctx, jobspb.JobID(streamID))
226+
j, err := jobExecCtx.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(streamID), txn)
227227
if err != nil {
228-
return nil, errors.Wrapf(err, "replication stream %d has error", streamID)
228+
return nil, errors.Wrapf(err, "could not load job for replication stream %d", streamID)
229229
}
230230
if j.Status() != jobs.StatusRunning {
231231
return nil, errors.Errorf("replication stream %d is not running", streamID)

0 commit comments

Comments
 (0)