
Commit d7f0c96

craig[bot], rickystewart, stevendanna, and andyyang890 committed
150270: bazelbuilder: add `s390x` support r=rail a=rickystewart

The `s390x` version of the image has the build utilities installed, but NOT some unnecessary bits like the cloud CLIs.

Epic: CRDB-21133
Part of: DEVINF-1495

150281: kvcoord: disallow pipelining while write buffering is enabled r=miraradeva,arulajmani a=stevendanna

The interaction between pipelining and write buffering has revealed a couple of subtle bugs. Further, these two optimisations largely overlap: in the happy case, when write buffering is enabled, we expect almost no write pipelining. Here, we disable pipelining when write buffering is enabled.

The upsides of this are:

1. It avoids other possible bugs in the interaction between these features.
2. It makes it a bit easier to reason about the behaviour of write-buffered transactions.

The downsides include:

1. It has a negative performance impact for users who turn on write buffering but whose transactions often result in buffering later being disabled. (Mitigated by the second commit.)
2. Write pipelining has been the default for many releases, and it is not clear that running with write pipelining disabled is as well tested as the enabled code path.

Fixes #149911
Epic: none
Release note: None

150283: changefeedccl: fix ALTER CHANGEFEED max PTS age bug r=KeithCh,asg0451 a=andyyang890

This patch fixes a bug where modifying a changefeed with ALTER CHANGEFEED that either unset or left the `gc_protect_expires_after` option unset would cause the changefeed's max PTS age to become unbounded instead of being set to the default value configured by the `changefeed.protect_timestamp.max_age` cluster setting. The bug occurred because the ALTER CHANGEFEED code manually queried the option instead of relying on the CREATE CHANGEFEED code path, which also consults the cluster setting. This inconsistency has now been corrected.

Fixes #148029

Release note (bug fix): Fixed a bug where modifying a changefeed with ALTER CHANGEFEED that either unset or left the `gc_protect_expires_after` option unset would cause the changefeed's max PTS age to become unbounded instead of being set to the default value configured by the `changefeed.protect_timestamp.max_age` cluster setting.

Co-authored-by: Ricky Stewart <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Andy Yang <[email protected]>
4 parents: f09c7b8 + 4c54650 + cf7e2ee + b721e45

File tree

10 files changed: +325 −53 lines


build/bazelbuilder/Dockerfile

Lines changed: 80 additions & 25 deletions

@@ -3,6 +3,8 @@
 FROM us-east1-docker.pkg.dev/crl-docker-sync/docker-io/library/ubuntu:focal
 ARG TARGETPLATFORM
 
+SHELL ["/usr/bin/bash", "-c"]
+
 RUN apt-get update \
     && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
     apt-transport-https \
@@ -39,14 +41,27 @@ RUN apt-get update \
 # c-deps/*-rebuild to force recreating the makefiles. This prevents
 # strange build errors caused by those makefiles depending on the
 # installed version of cmake.
-RUN case ${TARGETPLATFORM} in \
+RUN if [[ ${TARGETPLATFORM} == "linux/s390x" ]] ; then \
+    curl -fsSL "https://github.com/Kitware/CMake/archive/refs/tags/v3.20.3.tar.gz" -o cmake.tar.gz \
+    && echo "aa059c7f89b56215301f1baac8f88a70a67a334495c9ab6a728b97e1defab763 cmake.tar.gz" | sha256sum -c - \
+    && tar -xzf cmake.tar.gz \
+    && cd CMake-3.20.3 \
+    && ./bootstrap -- -DCMAKE_USE_OPENSSL=OFF \
+    && make \
+    && make install \
+    && cd .. \
+    && rm -rf CMake-3.20.3 \
+    && rm cmake.tar.gz ; \
+  else \
+    case ${TARGETPLATFORM} in \
     "linux/amd64") ARCH=x86_64; SHASUM=97bf730372f9900b2dfb9206fccbcf92f5c7f3b502148b832e77451aa0f9e0e6 ;; \
     "linux/arm64") ARCH=aarch64; SHASUM=77620f99e9d5f39cf4a49294c6a68c89a978ecef144894618974b9958efe3c2a ;; \
     esac \
-    && curl -fsSL "https://github.com/Kitware/CMake/releases/download/v3.20.3/cmake-3.20.3-linux-$ARCH.tar.gz" -o cmake.tar.gz \
-    && echo "$SHASUM cmake.tar.gz" | sha256sum -c - \
-    && tar --strip-components=1 -C /usr -xzf cmake.tar.gz \
-    && rm cmake.tar.gz
+    && curl -fsSL "https://github.com/Kitware/CMake/releases/download/v3.20.3/cmake-3.20.3-linux-$ARCH.tar.gz" -o cmake.tar.gz \
+    && echo "$SHASUM cmake.tar.gz" | sha256sum -c - \
+    && tar --strip-components=1 -C /usr -xzf cmake.tar.gz \
+    && rm cmake.tar.gz ; \
+  fi
 
 # git - Upgrade to a more modern version
 RUN apt-get update && \
@@ -62,16 +77,39 @@ RUN apt-get update && \
     cd .. && \
     rm -rf git-2.29.2.zip git-2.29.2
 
-RUN curl -fsSL https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - \
-    && echo 'deb https://packages.cloud.google.com/apt cloud-sdk main' | tee /etc/apt/sources.list.d/gcloud.list \
-    && curl -fsLS https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor | apt-key add - \
+# NB: Don't install the azure CLI on s390x which doesn't support it.
+RUN if [[ ${TARGETPLATFORM} != "linux/s390x" ]]; then \
+    curl -fsLS https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor | apt-key add - \
     && echo "deb https://packages.microsoft.com/repos/azure-cli/ focal main" > /etc/apt/sources.list.d/azure-cli.list \
     && apt-get update \
-    && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
-    azure-cli \
-    google-cloud-sdk \
-    google-cloud-cli-gke-gcloud-auth-plugin \
-    && apt-get clean
+    && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends azure-cli \
+    && apt-get clean ; \
+  fi
+
+# NB: As above, this is not available on `s390x`.
+RUN if [[ ${TARGETPLATFORM} != "linux/s390x" ]]; then \
+    case ${TARGETPLATFORM} in \
+    "linux/amd64") ARCH=x86_64; SHASUM= ;; \
+    "linux/arm64") ARCH=arm; SHASUM=e6153461e3154ebce61d35b73005bdd14a0ecacd42e5008f66e25b4ad231e5c9 ;; \
+    esac \
+    && curl -fsSL "https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-cli-linux-$ARCH.tar.gz" -o gcloud.tar.gz \
+    && tar -xzf gcloud.tar.gz \
+    && rm gcloud.tar.gz ; \
+  fi
+
+ENV PATH="$PATH:/google-cloud-sdk/bin"
+
+# NB: We're going to run `dev` builds inside the Docker image on `s390x`,
+# as we can't cross-compile them (there are no cross-toolchains for `s390x`
+# hosts). This means we need these extra dependencies installed specifically
+# on that platform. Don't install them on other platforms to avoid taking
+# unintended dependencies on them.
+RUN if [[ ${TARGETPLATFORM} == "linux/s390x" ]]; then \
+    DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
+    keyutils \
+    libresolv-wrapper \
+    && apt-get clean ; \
+  fi
 
 RUN apt-get purge -y \
     apt-transport-https \
@@ -81,19 +119,23 @@ RUN apt-get purge -y \
 
 # awscli - roachtests
 # NB: we don't use apt-get because we need an up to date version of awscli
-RUN case ${TARGETPLATFORM} in \
+# NB: Don't install these SDKs that are unavailable for s390x.
+RUN if [[ ${TARGETPLATFORM} != "linux/s390x" ]]; then \
+    case ${TARGETPLATFORM} in \
    "linux/amd64") ARCH=x86_64; SHASUM=e679933eec90b0e5a75d485be6c2fae0f89a3f9ccdcb1748be69f8f456e9a85f ;; \
    "linux/arm64") ARCH=aarch64; SHASUM=7d6460f795712ebdac7e3c60d4800dde682d136d909810402aac164f2789b860 ;; \
-    esac && \
-    curl -fsSL "https://awscli.amazonaws.com/awscli-exe-linux-$ARCH-2.13.9.zip" -o "awscliv2.zip" && \
-    echo "$SHASUM awscliv2.zip" | sha256sum -c - && \
-    unzip awscliv2.zip && \
-    ./aws/install && \
-    rm -rf aws awscliv2.zip
+    esac \
+    && curl -fsSL "https://awscli.amazonaws.com/awscli-exe-linux-$ARCH-2.13.9.zip" -o "awscliv2.zip" \
+    && echo "$SHASUM awscliv2.zip" | sha256sum -c - \
+    && unzip awscliv2.zip \
+    && ./aws/install \
+    && rm -rf aws awscliv2.zip ; \
+  fi
 
 RUN case ${TARGETPLATFORM} in \
    "linux/amd64") ARCH=x86_64; SHASUM=a3fb9c1de3512bc91f27cc47297d6d6cf208adee9b64ed719130da59ac13e26b ;; \
    "linux/arm64") ARCH=aarch64; SHASUM=e5165eb592a317e1f6da0ac7fcbccf60d7fb8e5ac1f0d7336a9be51c23308b06 ;; \
+    "linux/s390x") ARCH=s390x; SHASUM=4969ae702488cb79afd14bf91c10b496996852b82a364907c1ebfa5f1667a139 ;; \
    esac && \
    curl -fsSL "https://github.com/NixOS/patchelf/releases/download/0.17.2/patchelf-0.17.2-$ARCH.tar.gz" -o "patchelf.tar.gz" && \
    echo "$SHASUM patchelf.tar.gz" | sha256sum -c - && \
@@ -105,10 +147,11 @@ RUN case ${TARGETPLATFORM} in \
 # build/bootstrap/bootstrap-debian.sh -- if an update is necessary here, it's probably
 # necessary in the agent as well.
 RUN case ${TARGETPLATFORM} in \
-    "linux/amd64") ARCH=amd64; SHASUM=4cb534c52cdd47a6223d4596d530e7c9c785438ab3b0a49ff347e991c210b2cd ;; \
-    "linux/arm64") ARCH=arm64; SHASUM=c1de6860dd4f8d5e2ec270097bd46d6a211b971a0b8b38559784bd051ea950a1 ;; \
+    "linux/amd64") ARCH=amd64; SHASUM=84916c44c8d81cb64f6c9a9f8fd8fa059342e872bfc1ce185f5dcbf70c6aadea ;; \
+    "linux/arm64") ARCH=arm64; SHASUM=7937c941e5140a6a22f6b84919e561b9b77ec49e307852ed0b3cc2a45beace9e ;; \
+    "linux/s390x") ARCH=s390x; SHASUM=df7a5cfe632da022bb2cdc51824e8b04634d86d3ad4a24610c4da758c2e5708f ;; \
    esac \
-    && curl -fsSL "https://github.com/bazelbuild/bazelisk/releases/download/v1.10.1/bazelisk-linux-$ARCH" > /tmp/bazelisk \
+    && curl -fsSL "https://github.com/cockroachdb/bazelisk/releases/download/2025-07-14/bazelisk-linux-$ARCH" > /tmp/bazelisk \
    && echo "$SHASUM /tmp/bazelisk" | sha256sum -c - \
    && chmod +x /tmp/bazelisk \
    && mv /tmp/bazelisk /usr/bin/bazel
@@ -119,14 +162,26 @@ RUN ln -sf /usr/bin/llvm-nm /usr/bin/nm
 
 RUN rm -rf /tmp/* /var/lib/apt/lists/*
 
-RUN case ${TARGETPLATFORM} in \
+RUN if [[ ${TARGETPLATFORM} == "linux/s390x" ]]; then \
+    curl -fsSL "https://github.com/benesch/autouseradd/archive/refs/tags/1.3.0.tar.gz" -o autouseradd.tar.gz \
+    && echo "da70cbb00878ab395276b0f6191815a763bc8aa2fc120fb36580f6313de4c41f autouseradd.tar.gz" | sha256sum -c - \
+    && tar -xzf autouseradd.tar.gz \
+    && cd autouseradd-1.3.0 \
+    && make \
+    && make install \
+    && cd .. \
+    && rm -rf autouseradd-1.3.0 \
+    && rm autouseradd.tar.gz ; \
+  else \
+    case ${TARGETPLATFORM} in \
    "linux/amd64") ARCH=amd64; SHASUM=442dae58b727a79f81368127fac141d7f95501ffa05f8c48943d27c4e807deb7 ;; \
    "linux/arm64") ARCH=arm64; SHASUM=b216bebfbe30c3c156144cff07233654e23025e26ab5827058c9b284e130599e ;; \
    esac \
    && curl -fsSL "https://github.com/benesch/autouseradd/releases/download/1.3.0/autouseradd-1.3.0-$ARCH.tar.gz" -o autouseradd.tar.gz \
    && echo "$SHASUM autouseradd.tar.gz" | sha256sum -c - \
    && tar xzf autouseradd.tar.gz --strip-components 1 \
-    && rm autouseradd.tar.gz
+    && rm autouseradd.tar.gz ; \
+  fi
 
 ENTRYPOINT ["autouseradd", "--user", "roach", "--no-create-home"]
 CMD ["/usr/bin/bash"]
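Every download in this Dockerfile is pinned to a SHA-256 digest and verified with `echo "$SHASUM file" | sha256sum -c -` before being unpacked. The same verify-before-install pattern can be sketched in Go; `verifyChecksum` is an illustrative name, not part of any library:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// verifyChecksum mirrors the Dockerfile's `sha256sum -c` step: hash the
// downloaded bytes and compare against a pinned hex digest before use.
func verifyChecksum(data []byte, wantHex string) error {
	sum := sha256.Sum256(data)
	if got := hex.EncodeToString(sum[:]); got != wantHex {
		return fmt.Errorf("checksum mismatch: got %s, want %s", got, wantHex)
	}
	return nil
}

func main() {
	artifact := []byte("abc") // stand-in for a downloaded tarball
	// Pinned digest: sha256("abc").
	const pinned = "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
	fmt.Println(verifyChecksum(artifact, pinned)) // <nil>
}
```

Pinning the digest in the build file means a tampered or truncated download fails the build instead of silently installing.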

build/teamcity/internal/cockroach/build/ci/build-and-push-bazel-builder-image.sh

Lines changed: 1 addition & 1 deletion

@@ -15,7 +15,7 @@ docker_login_gcr "$gar_repository" "$IMAGE_BUILDER_GOOGLE_CREDENTIALS"
 
 TAG=$(date +%Y%m%d-%H%M%S)
 docker buildx create --name "builder-$TAG" --use
-docker buildx build --push --platform linux/amd64,linux/arm64 -t "$gar_repository:$TAG" -t "$gar_repository:latest-do-not-use" build/bazelbuilder
+docker buildx build --push --platform linux/amd64,linux/arm64,linux/s390x -t "$gar_repository:$TAG" -t "$gar_repository:latest-do-not-use" build/bazelbuilder
 
 if [[ "$open_pr_on_success" == "true" ]]; then
   # Trigger "Open New Bazel Builder Image PR".

pkg/ccl/changefeedccl/alter_changefeed_stmt.go

Lines changed: 5 additions & 5 deletions

@@ -227,11 +227,11 @@ func alterChangefeedPlanHook(
 	newPayload.Details = jobspb.WrapPayloadDetails(newDetails)
 	newPayload.Description = jobRecord.Description
 	newPayload.DescriptorIDs = jobRecord.DescriptorIDs
-	newExpiration, err := newOptions.GetPTSExpiration()
-	if err != nil {
-		return err
-	}
-	newPayload.MaximumPTSAge = newExpiration
+
+	// The maximum PTS age on jobRecord will be set correctly (based on either
+	// the option or cluster setting) by createChangefeedJobRecord.
+	newPayload.MaximumPTSAge = jobRecord.MaximumPTSAge
+
 	j, err := p.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, jobID, p.InternalSQLTxn())
 	if err != nil {
 		return err

pkg/ccl/changefeedccl/protected_timestamps_test.go

Lines changed: 104 additions & 17 deletions

@@ -424,27 +424,114 @@ func TestChangefeedCanceledWhenPTSIsOld(t *testing.T) {
 	// single row with multiple versions.
 	sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b INT)`)
 
-	feed, err := f.Feed("CREATE CHANGEFEED FOR TABLE foo WITH protect_data_from_gc_on_pause, gc_protect_expires_after='24h'")
-	require.NoError(t, err)
-	defer func() {
-		closeFeed(t, feed)
-	}()
+	t.Run("canceled due to gc_protect_expires_after option", func(t *testing.T) {
+		testutils.RunValues(t, "initially-protected-with", []string{"none", "option", "setting"},
+			func(t *testing.T, initialProtect string) {
+				defer func() {
+					sqlDB.Exec(t, `RESET CLUSTER SETTING changefeed.protect_timestamp.max_age`)
+				}()
+
+				if initialProtect == "option" {
+					// We set the cluster setting to something small to make sure that
+					// the option alone is able to protect the PTS record.
+					sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.protect_timestamp.max_age = '1us'`)
+				} else {
+					sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.protect_timestamp.max_age = '24h'`)
+				}
 
-	jobFeed := feed.(cdctest.EnterpriseTestFeed)
-	require.NoError(t, jobFeed.Pause())
+				feedStmt := `CREATE CHANGEFEED FOR TABLE foo`
+				switch initialProtect {
+				case "none":
+					feedStmt += ` WITH gc_protect_expires_after='1us'`
+				case "option":
+					feedStmt += ` WITH gc_protect_expires_after='24h'`
+				}
+
+				feed, err := f.Feed(feedStmt)
+				require.NoError(t, err)
+				defer func() {
+					closeFeed(t, feed)
+				}()
 
-	// While the job is paused, take opportunity to test that alter changefeed
-	// works when setting gc_protect_expires_after option.
+				jobFeed := feed.(cdctest.EnterpriseTestFeed)
 
-	// Verify we can set it to 0 -- i.e. disable.
-	sqlDB.Exec(t, fmt.Sprintf("ALTER CHANGEFEED %d SET gc_protect_expires_after = '0s'", jobFeed.JobID()))
-	// Now, set it to something very small.
-	sqlDB.Exec(t, fmt.Sprintf("ALTER CHANGEFEED %d SET gc_protect_expires_after = '250ms'", jobFeed.JobID()))
+				if initialProtect != "none" {
+					require.NoError(t, jobFeed.Pause())
+
+					// Wait a little bit and make sure the job ISN'T canceled.
+					require.ErrorContains(t, jobFeed.WaitDurationForState(10*time.Second, func(s jobs.State) bool {
+						return s == jobs.StateCanceled
+					}), `still waiting for job status; current status is "paused"`)
+
+					if initialProtect == "option" {
+						// Set the cluster setting back to something high to make sure the
+						// option alone can cause the changefeed to be canceled.
+						sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.protect_timestamp.max_age = '24h'`)
+					}
+
+					// Set option to something small so that job will be canceled.
+					sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d SET gc_protect_expires_after = '1us'`, jobFeed.JobID()))
+				}
 
-	// Stale PTS record should trigger job cancellation.
-	require.NoError(t, jobFeed.WaitForState(func(s jobs.State) bool {
-		return s == jobs.StateCanceled
-	}))
+				// Stale PTS record should trigger job cancellation.
+				require.NoError(t, jobFeed.WaitForState(func(s jobs.State) bool {
+					return s == jobs.StateCanceled
+				}))
+			})
+	})
+
+	t.Run("canceled due to changefeed.protect_timestamp.max_age setting", func(t *testing.T) {
+		testutils.RunValues(t, "initially-protected-with", []string{"none", "option", "setting"},
+			func(t *testing.T, initialProtect string) {
+				defer func() {
+					sqlDB.Exec(t, `RESET CLUSTER SETTING changefeed.protect_timestamp.max_age`)
+				}()
+
+				if initialProtect == "setting" {
+					sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.protect_timestamp.max_age = '24h'`)
+				} else {
+					sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.protect_timestamp.max_age = '1us'`)
+				}
+
+				// Set the max age cluster setting to something small.
+				feedStmt := `CREATE CHANGEFEED FOR TABLE foo`
+				if initialProtect == "option" {
+					feedStmt += ` WITH gc_protect_expires_after='24h'`
+				}
+				feed, err := f.Feed(feedStmt)
+				require.NoError(t, err)
+				defer func() {
+					closeFeed(t, feed)
+				}()
+
+				jobFeed := feed.(cdctest.EnterpriseTestFeed)
+
+				if initialProtect != "none" {
+					require.NoError(t, jobFeed.Pause())
+
+					// Wait a little bit and make sure the job ISN'T canceled.
+					require.ErrorContains(t, jobFeed.WaitDurationForState(10*time.Second, func(s jobs.State) bool {
+						return s == jobs.StateCanceled
+					}), `still waiting for job status; current status is "paused"`)
+
+					switch initialProtect {
+					case "option":
+						// Reset the option so that it defaults to the cluster setting.
+						sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d SET gc_protect_expires_after = '0s'`, jobFeed.JobID()))
+					case "setting":
+						// Modify the cluster setting and do an ALTER CHANGEFEED so that
+						// the new value is picked up.
+						sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.protect_timestamp.max_age = '1us'`)
+						sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d SET diff`, jobFeed.JobID()))
+					}
+				}
+
+				// Stale PTS record should trigger job cancellation.
+				require.NoError(t, jobFeed.WaitForState(func(s jobs.State) bool {
+					return s == jobs.StateCanceled
+				}))
+			})
+	})
 }
 
 cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks)

pkg/ccl/changefeedccl/testfeed_test.go

Lines changed: 1 addition & 1 deletion

@@ -491,7 +491,7 @@ func (f *jobFeed) WaitDurationForState(
 		if statusPred(jobs.State(status)) {
 			return nil
 		}
-		return errors.Newf("still waiting for job status; current %s", status)
+		return errors.Newf("still waiting for job status; current status is %q", status)
 	}, dur)
 }

pkg/kv/kvclient/kvcoord/BUILD.bazel

Lines changed: 1 addition & 0 deletions

@@ -159,6 +159,7 @@ go_test(
         "txn_interceptor_pipeliner_test.go",
         "txn_interceptor_seq_num_allocator_test.go",
         "txn_interceptor_span_refresher_test.go",
+        "txn_interceptor_write_buffer_client_test.go",
         "txn_interceptor_write_buffer_test.go",
         "txn_test.go",
         ":bufferedwrite_interval_btree_test.go",  # keep

pkg/kv/kvclient/kvcoord/txn_coord_sender.go

Lines changed: 8 additions & 0 deletions

@@ -281,6 +281,10 @@ func newRootTxnCoordSender(
 		timeSource: timeutil.DefaultTimeSource{},
 		txn:        &tcs.mu.txn,
 	}
+	tcs.interceptorAlloc.txnWriteBuffer.init(
+		&tcs.interceptorAlloc.txnPipeliner,
+	)
+
 	tcs.initCommonInterceptors(tcf, txn, kv.RootTxn)
 
 	// Once the interceptors are initialized, piece them all together in the
@@ -446,6 +450,7 @@ func (tc *TxnCoordSender) DisablePipelining() error {
 	if tc.mu.active {
 		return errors.Errorf("cannot disable pipelining on a running transaction")
 	}
+	tc.interceptorAlloc.txnPipeliner.disabledExplicitly = true
 	tc.interceptorAlloc.txnPipeliner.disabled = true
 	return nil
 }
@@ -1192,6 +1197,9 @@ func (tc *TxnCoordSender) SetBufferedWritesEnabled(enabled bool) {
 		panic("cannot enable buffered writes on a running transaction")
 	}
 	tc.interceptorAlloc.txnWriteBuffer.setEnabled(enabled)
+	if enabled {
+		tc.interceptorAlloc.txnPipeliner.disabled = true
+	}
 }
 
 // BufferedWritesEnabled is part of the kv.TxnSender interface.

pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go

Lines changed: 14 additions & 3 deletions

@@ -226,9 +226,14 @@ var rejectTxnMaxCount = settings.RegisterIntSetting(
 // attached to any end transaction request that is passed through the pipeliner
 // to ensure that the locks within them are released.
 type txnPipeliner struct {
-	st      *cluster.Settings
-	riGen   rangeIteratorFactory // used to condense lock spans, if provided
-	wrapped lockedSender
+	st      *cluster.Settings
+	riGen   rangeIteratorFactory // used to condense lock spans, if provided
+	wrapped lockedSender
+	// disabledExplicitly tracks whether the user called txn.DisablePipelining().
+	// This is kept separate from disabled so that the txnWriteBuffer can
+	// re-enable pipelining after flushing its buffer iff pipelining wasn't
+	// previously explicitly disabled.
+	disabledExplicitly bool
 	disabled               bool
 	txnMetrics             *TxnMetrics
 	condensedIntentsEveryN *log.EveryN
@@ -587,6 +592,12 @@ func (tp *txnPipeliner) canUseAsyncConsensus(ctx context.Context, ba *kvpb.Batch
 	return true
 }
 
+// enableImplicitPipelining enables pipelining unless pipelining was explicitly
+// disabled previously.
+func (tp *txnPipeliner) enableImplicitPipelining() {
+	tp.disabled = tp.disabledExplicitly
+}
+
 // chainToInFlightWrites ensures that we "chain" on to any in-flight writes that
 // overlap the keys we're trying to read/write. We do this by prepending
 // QueryIntent requests with the ErrorIfMissing option before each request that
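The `disabled`/`disabledExplicitly` pair forms a small state machine: write buffering turns pipelining off implicitly, a buffer flush turns it back on, but an explicit `DisablePipelining()` sticks. The toy model below illustrates that interaction; it mirrors the names in the diff but is not the actual kvcoord type (for instance, it omits all locking and interceptor wiring):

```go
package main

import "fmt"

// pipelinerState is an illustrative model of the two flags on txnPipeliner.
type pipelinerState struct {
	disabled           bool // pipelining currently off
	disabledExplicitly bool // user called txn.DisablePipelining()
}

// disableExplicitly models TxnCoordSender.DisablePipelining: both flags set.
func (p *pipelinerState) disableExplicitly() {
	p.disabledExplicitly = true
	p.disabled = true
}

// disableForWriteBuffering models SetBufferedWritesEnabled(true): pipelining
// is turned off, but only implicitly.
func (p *pipelinerState) disableForWriteBuffering() {
	p.disabled = true
}

// enableImplicitPipelining mirrors the new helper: after the write buffer
// flushes, pipelining comes back unless it was disabled explicitly.
func (p *pipelinerState) enableImplicitPipelining() {
	p.disabled = p.disabledExplicitly
}

func main() {
	// Buffering disables pipelining; a flush re-enables it.
	var a pipelinerState
	a.disableForWriteBuffering()
	a.enableImplicitPipelining()
	fmt.Println(a.disabled) // false

	// An explicit DisablePipelining sticks across a flush.
	var b pipelinerState
	b.disableExplicitly()
	b.disableForWriteBuffering()
	b.enableImplicitPipelining()
	fmt.Println(b.disabled) // true
}
```

Keeping the explicit flag separate is what lets `enableImplicitPipelining` be a single assignment rather than a conditional.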
