Skip to content

Commit dcad761

Browse files
craig[bot]aerfreirickystewart
committed
153177: changefeedccl: remove additional job info write from pts management r=andyyang890,KeithCh a=aerfrei In per table PTS management, if a new table started lagging while another table stopped lagging, we would do two separate job info table writes. We would do one to create the new PTS record and one to remove the old one. With this change, we do only one job info table write. Epic: CRDB-1421 Release note: None 153367: build: fix some small errors r=rail a=rickystewart These were found by an LLM. None are severe, but to minimize the chances of unexpected errors or avoid further confusion we address them. Epic: none Release note: None Co-authored-by: Aerin Freilich <[email protected]> Co-authored-by: Ricky Stewart <[email protected]>
3 parents c92f38d + a64757f + 18a0b78 commit dcad761

File tree

5 files changed

+60
-52
lines changed

5 files changed

+60
-52
lines changed

build/bazelbuilder/Dockerfile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,11 @@ RUN if [[ ${TARGETPLATFORM} != "linux/s390x" ]]; then \
8989
# NB: As above, this is not available on `s390x`.
9090
RUN if [[ ${TARGETPLATFORM} != "linux/s390x" ]]; then \
9191
case ${TARGETPLATFORM} in \
92-
"linux/amd64") ARCH=x86_64; SHASUM= ;; \
92+
"linux/amd64") ARCH=x86_64; SHASUM=8ba7e746ca05f225e5a73952bbc03f4086a5f65fd94f3717df6f75f212587159 ;; \
9393
"linux/arm64") ARCH=arm; SHASUM=e6153461e3154ebce61d35b73005bdd14a0ecacd42e5008f66e25b4ad231e5c9 ;; \
9494
esac \
9595
&& curl -fsSL "https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-cli-linux-$ARCH.tar.gz" -o gcloud.tar.gz \
96+
&& echo "$SHASUM gcloud.tar.gz" | sha256sum -c - \
9697
&& tar -xzf gcloud.tar.gz \
9798
&& rm gcloud.tar.gz ; \
9899
fi

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2022,14 +2022,19 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
20222022
}
20232023

20242024
if len(tableIDsToRelease) > 0 {
2025-
if err := cf.releasePerTableProtectedTimestampRecords(ctx, txn, ptsEntries, tableIDsToRelease, pts); err != nil {
2025+
if err := cf.releasePerTableProtectedTimestampRecords(ctx, ptsEntries, tableIDsToRelease, pts); err != nil {
20262026
return hlc.Timestamp{}, false, err
20272027
}
2028-
updatedPerTablePTS = true
20292028
}
20302029

20312030
if len(tableIDsToCreate) > 0 {
2032-
if err := cf.createPerTableProtectedTimestampRecords(ctx, txn, ptsEntries, tableIDsToCreate, pts); err != nil {
2031+
if err := cf.createPerTableProtectedTimestampRecords(ctx, ptsEntries, tableIDsToCreate, pts); err != nil {
2032+
return hlc.Timestamp{}, false, err
2033+
}
2034+
}
2035+
2036+
if len(tableIDsToRelease) > 0 || len(tableIDsToCreate) > 0 {
2037+
if err := writeChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID); err != nil {
20332038
return hlc.Timestamp{}, false, err
20342039
}
20352040
updatedPerTablePTS = true
@@ -2040,7 +2045,6 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
20402045

20412046
func (cf *changeFrontier) releasePerTableProtectedTimestampRecords(
20422047
ctx context.Context,
2043-
txn isql.Txn,
20442048
ptsEntries *cdcprogresspb.ProtectedTimestampRecords,
20452049
tableIDs []descpb.ID,
20462050
pts protectedts.Storage,
@@ -2051,7 +2055,7 @@ func (cf *changeFrontier) releasePerTableProtectedTimestampRecords(
20512055
}
20522056
delete(ptsEntries.ProtectedTimestampRecords, tableID)
20532057
}
2054-
return writeChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID)
2058+
return nil
20552059
}
20562060

20572061
func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
@@ -2079,7 +2083,6 @@ func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
20792083

20802084
func (cf *changeFrontier) createPerTableProtectedTimestampRecords(
20812085
ctx context.Context,
2082-
txn isql.Txn,
20832086
ptsEntries *cdcprogresspb.ProtectedTimestampRecords,
20842087
tableIDsToCreate map[descpb.ID]hlc.Timestamp,
20852088
pts protectedts.Storage,
@@ -2088,7 +2091,7 @@ func (cf *changeFrontier) createPerTableProtectedTimestampRecords(
20882091
ptsEntries.ProtectedTimestampRecords = make(map[descpb.ID]*uuid.UUID)
20892092
}
20902093
for tableID, tableHighWater := range tableIDsToCreate {
2091-
targets, err := cf.createPerTablePTSTarget(tableID)
2094+
targets, err := cf.createPerTablePTSTargets(tableID)
20922095
if err != nil {
20932096
return err
20942097
}
@@ -2101,22 +2104,23 @@ func (cf *changeFrontier) createPerTableProtectedTimestampRecords(
21012104
return err
21022105
}
21032106
}
2104-
return writeChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID)
2107+
return nil
21052108
}
21062109

2107-
func (cf *changeFrontier) createPerTablePTSTarget(
2110+
func (cf *changeFrontier) createPerTablePTSTargets(
21082111
tableID descpb.ID,
21092112
) (changefeedbase.Targets, error) {
21102113
targets := changefeedbase.Targets{}
2111-
if cf.targets.Size > 0 {
2112-
if found, err := cf.targets.EachHavingTableID(tableID, func(target changefeedbase.Target) error {
2113-
targets.Add(target)
2114-
return nil
2115-
}); err != nil {
2116-
return changefeedbase.Targets{}, err
2117-
} else if !found {
2118-
return changefeedbase.Targets{}, errors.AssertionFailedf("attempted to create a per-table PTS record for table %d, but no target was found", tableID)
2119-
}
2114+
if found, err := cf.targets.EachHavingTableID(tableID, func(target changefeedbase.Target) error {
2115+
targets.Add(target)
2116+
return nil
2117+
}); err != nil {
2118+
return changefeedbase.Targets{}, err
2119+
} else if !found {
2120+
return changefeedbase.Targets{}, errors.AssertionFailedf(
2121+
"attempted to create a per-table PTS record for table %d, but no target was found",
2122+
tableID,
2123+
)
21202124
}
21212125
if targets.Size != 1 {
21222126
return changefeedbase.Targets{}, errors.AssertionFailedf("expected 1 target, got %d", targets.Size)

pkg/cmd/bazci/bazci.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,9 @@ func sendBepDataToBeaverHub(bepFilepath string) error {
320320
}
321321

322322
func bazciImpl(cmd *cobra.Command, args []string) error {
323+
if len(args) == 0 {
324+
return errors.Newf("must provide some subcommand (`build`, `run`, `test`, `coverage`, `merge-test-xmls`, or `munge-test-xml`)")
325+
}
323326
if args[0] != buildSubcmd && args[0] != runSubcmd && args[0] != coverageSubcmd &&
324327
args[0] != testSubcmd && args[0] != mungeTestXMLSubcmd && args[0] != mergeTestXMLsSubcmd {
325328
return errors.Newf("First argument must be `build`, `run`, `test`, `coverage`, `merge-test-xmls`, or `munge-test-xml`; got %v", args[0])

0 commit comments

Comments
 (0)