
Commit 2a72895

craig[bot] committed (authors: rafiss, msbutler, yuzefovich)
150178: kvpb: avoid over redaction of prev_err field r=rafiss a=rafiss

When logging a kvpb.TxnAlreadyEncounteredErrorError, the previous error was getting redacted too aggressively. This patch makes the error use a redactable string instead.

fixes #146588

Release note: None

150643: crosscluster/physical: send recompute stats requests on range key flush r=jeffswenson a=msbutler

This patch ensures that destination cluster MVCC stats are up to date after a range key flush. Since range key flushes are assumed to be rare, we recompute stats over each affected range.

Fixes #142481

Release note: none

150876: sqlccl: bump engflow worker size r=yuzefovich a=yuzefovich

We just saw a failure that looks like an OOM, so let's bump the worker size.

Fixes: #150648.

Release note: None

Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
4 parents: dc790f0 + 22ed1bc + 0f13ee0 + bd82415 · commit 2a72895
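The diff for the kvpb change (150178) is not among the files shown below. As a rough illustration of the approach the commit message describes (keeping the previous error as a redactable string so that log redaction strips only the unsafe parts rather than the whole message), here is a minimal sketch assuming the github.com/cockroachdb/redact package. The prevErrHolder type and its fields are hypothetical stand-ins, not the actual kvpb definitions.

package example

import (
	"fmt"

	"github.com/cockroachdb/redact"
)

// prevErrHolder is an illustrative stand-in for a proto-backed error such as
// kvpb.TxnAlreadyEncounteredErrorError; the real field layout may differ.
type prevErrHolder struct {
	// PrevErr keeps the redaction markers produced by the original error's
	// safe-formatting, so only the unsafe parts are stripped when logs are
	// redacted, instead of the entire message.
	PrevErr redact.RedactableString
}

// newPrevErrHolder records the cause as a redactable string rather than as
// cause.Error(), which redaction would otherwise treat as fully unsafe.
func newPrevErrHolder(cause error) *prevErrHolder {
	return &prevErrHolder{PrevErr: redact.Sprint(cause)}
}

func (e *prevErrHolder) Error() string {
	return fmt.Sprintf("txn already encountered an error (previous err: %s)", e.PrevErr)
}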

File tree

7 files changed: +337 -23 lines


pkg/ccl/testccl/sqlccl/BUILD.bazel

Lines changed: 3 additions & 0 deletions
@@ -19,6 +19,9 @@ go_test(
     data = [
         "//c-deps:libgeos",  # keep
     ],
+    exec_properties = select({
+        "//conditions:default": {"test.Pool": "large"},
+    }),
     shard_count = 16,
     deps = [
         "//pkg/base",

pkg/crosscluster/physical/BUILD.bazel

Lines changed: 3 additions & 0 deletions
@@ -84,6 +84,7 @@ go_library(
         "//pkg/util/log",
         "//pkg/util/metric",
         "//pkg/util/protoutil",
+        "//pkg/util/rangedesc",
         "//pkg/util/retry",
         "//pkg/util/span",
         "//pkg/util/syncutil",
@@ -144,6 +145,7 @@ go_test(
         "//pkg/jobs",
         "//pkg/jobs/jobspb",
         "//pkg/keys",
+        "//pkg/kv",
         "//pkg/kv/kvclient/rangefeed/rangefeedcache",
         "//pkg/kv/kvpb",
         "//pkg/kv/kvserver",
@@ -185,6 +187,7 @@ go_test(
         "//pkg/util",
         "//pkg/util/ctxgroup",
         "//pkg/util/duration",
+        "//pkg/util/encoding",
         "//pkg/util/hlc",
         "//pkg/util/httputil",
         "//pkg/util/leaktest",

pkg/crosscluster/physical/replication_stream_e2e_test.go

Lines changed: 96 additions & 6 deletions
@@ -24,6 +24,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/jobs"
     "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
     "github.com/cockroachdb/cockroach/pkg/keys"
+    "github.com/cockroachdb/cockroach/pkg/kv"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
     "github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -44,6 +45,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
     "github.com/cockroachdb/cockroach/pkg/testutils/storageutils"
     "github.com/cockroachdb/cockroach/pkg/util/duration"
+    "github.com/cockroachdb/cockroach/pkg/util/encoding"
     "github.com/cockroachdb/cockroach/pkg/util/hlc"
     "github.com/cockroachdb/cockroach/pkg/util/leaktest"
     "github.com/cockroachdb/cockroach/pkg/util/log"
@@ -1674,7 +1676,7 @@ func TestComputeStatsDiff(t *testing.T) {
     var tableID int
     c.SrcTenantSQL.QueryRow(t, "SELECT id FROM system.namespace WHERE name = 'x'").Scan(&tableID)
     tenantID := c.Args.DestTenantID
-    liveCountOverPKQuery := fmt.Sprintf(`SELECT stats->'approximate_total_stats'->'live_count' FROM crdb_internal.tenant_span_stats(ARRAY(SELECT(crdb_internal.index_span(%d,%d,1)[1],crdb_internal.index_span(%d,%d,1)[2])))`, tenantID.ToUint64(), tableID, tenantID.ToUint64(), tableID)
+    liveCountOverPKQuery := makeLiveCountOverPKQuery(tenantID, tableID)

     var liveCount int64
     c.DestSysSQL.QueryRow(t, liveCountOverPKQuery).Scan(&liveCount)
@@ -1690,11 +1692,7 @@ func TestComputeStatsDiff(t *testing.T) {
     // Split out the index span we will gather stats so that
     // crdb_internal.tenant_span_stats() uses the stats that hang off of the
     // descriptor, instead of computing the stats manually.
-    codec := keys.MakeSQLCodec(tenantID)
-    pkStartKey := codec.IndexPrefix(uint32(tableID), 1)
-    pkEndKey := pkStartKey.PrefixEnd()
-    require.NoError(t, c.DestSysServer.DB().AdminSplit(ctx, pkStartKey, hlc.MaxTimestamp))
-    require.NoError(t, c.DestSysServer.DB().AdminSplit(ctx, pkEndKey, hlc.MaxTimestamp))
+    splitPrimaryKeyIndexSpan(ctx, t, c.DestSysServer.DB(), tenantID, tableID)

     c.DestSysSQL.QueryRow(t, liveCountOverPKQuery).Scan(&liveCount)
     require.Equal(t, int64(2), liveCount)
@@ -1710,3 +1708,95 @@ func TestComputeStatsDiff(t *testing.T) {
     c.DestSysSQL.QueryRow(t, liveCountOverPKQuery).Scan(&liveCount)
     require.Equal(t, int64(2), liveCount)
 }
+
+// TestDelRangeStatsUpdates is an end to end test that ensures that mvcc stats are
+// computed correctly after replicating range key deletes.
+func TestDelRangeStatsUpdates(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    ctx := context.Background()
+    args := replicationtestutils.DefaultTenantStreamingClustersArgs
+    c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
+    defer cleanup()
+
+    c.DestSysSQL.Exec(t, "SET CLUSTER SETTING server.debug.default_vmodule = 'stream_ingestion_processor=2'; ")
+
+    c.SrcTenantSQL.Exec(t, "CREATE DATABASE test")
+    c.SrcTenantSQL.Exec(t, "CREATE TABLE test.x (id INT PRIMARY KEY, n INT)")
+
+    var tableID int
+    c.SrcTenantSQL.QueryRow(t, "SELECT id FROM system.namespace WHERE name = 'x'").Scan(&tableID)
+    tenantID := c.Args.DestTenantID
+    liveCountOverPKQuery := makeLiveCountOverPKQuery(tenantID, tableID)
+
+    var liveCount int64
+    c.DestSysSQL.QueryRow(t, liveCountOverPKQuery).Scan(&liveCount)
+    require.Equal(t, int64(0), liveCount)
+
+    producerJobID, ingestionJobID := c.StartStreamReplication(ctx)
+
+    jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
+    jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
+
+    c.WaitUntilStartTimeReached(jobspb.JobID(ingestionJobID))
+
+    // Split out the index span we will gather stats so that
+    // crdb_internal.tenant_span_stats() uses the stats that hang off of the
+    // descriptor, instead of computing the stats manually.
+    splitPrimaryKeyIndexSpan(ctx, t, c.DestSysServer.DB(), tenantID, tableID)
+
+    for i := 1; i <= 50; i++ {
+        c.SrcTenantSQL.Exec(t, fmt.Sprintf("INSERT INTO test.x VALUES (%d, %d)", i, i))
+    }
+
+    srcTime := c.SrcCluster.Server(0).Clock().Now()
+    c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID))
+
+    c.DestSysSQL.QueryRow(t, liveCountOverPKQuery).Scan(&liveCount)
+    require.Equal(t, int64(50), liveCount)
+
+    // Add 5 split points to the replicated table on destination cluster, to test
+    // that the recompute stats logic works on multiple ranges.
+    destCodec := keys.MakeSQLCodec(c.Args.DestTenantID)
+    for i := 1; i <= 5; i++ {
+        splitValue := i * 5
+        splitKey := destCodec.IndexPrefix(uint32(tableID), 1)
+        splitKey = encoding.EncodeVarintAscending(splitKey, int64(splitValue))
+        require.NoError(t, c.DestSysServer.DB().AdminSplit(ctx, splitKey, hlc.MaxTimestamp))
+    }
+
+    // Write a range key over table test.x's key span instead of dropping the
+    // table to guarantee that the srcTime below is after the range key write.
+    srcCodec := keys.MakeSQLCodec(c.Args.SrcTenantID)
+    tableSpan := roachpb.Span{
+        Key:    srcCodec.TablePrefix(uint32(tableID)),
+        EndKey: srcCodec.TablePrefix(uint32(tableID)).PrefixEnd(),
+    }
+    require.NoError(t, c.SrcSysServer.DB().DelRangeUsingTombstone(ctx, tableSpan.Key, tableSpan.EndKey))
+
+    srcTime = c.SrcCluster.Server(0).Clock().Now()
+    c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID))
+    c.RequireFingerprintMatchAtTimestamp(srcTime.AsOfSystemTime())
+
+    c.DestSysSQL.QueryRow(t, liveCountOverPKQuery).Scan(&liveCount)
+    require.Equal(t, int64(0), liveCount)
+}
+
+// makeLiveCountOverPKQuery constructs a query to get live count stats for a table's primary key.
+func makeLiveCountOverPKQuery(tenantID roachpb.TenantID, tableID int) string {
+    return fmt.Sprintf(`SELECT stats->'approximate_total_stats'->'live_count' FROM crdb_internal.tenant_span_stats(ARRAY(SELECT(crdb_internal.index_span(%d,%d,1)[1],crdb_internal.index_span(%d,%d,1)[2])))`,
+        tenantID.ToUint64(), tableID, tenantID.ToUint64(), tableID)
+}
+
+// splitPrimaryKeyIndexSpan splits the primary key index span for a table to ensure
+// crdb_internal.tenant_span_stats() uses the stats that hang off of the descriptor.
+func splitPrimaryKeyIndexSpan(
+    ctx context.Context, t *testing.T, db *kv.DB, tenantID roachpb.TenantID, tableID int,
+) {
+    codec := keys.MakeSQLCodec(tenantID)
+    pkStartKey := codec.IndexPrefix(uint32(tableID), 1)
+    pkEndKey := pkStartKey.PrefixEnd()
+    require.NoError(t, db.AdminSplit(ctx, pkStartKey, hlc.MaxTimestamp))
+    require.NoError(t, db.AdminSplit(ctx, pkEndKey, hlc.MaxTimestamp))
+}

pkg/crosscluster/physical/stream_ingestion_processor.go

Lines changed: 64 additions & 7 deletions
@@ -40,6 +40,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/util/hlc"
     "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/cockroach/pkg/util/protoutil"
+    "github.com/cockroachdb/cockroach/pkg/util/rangedesc"
    "github.com/cockroachdb/cockroach/pkg/util/span"
    "github.com/cockroachdb/cockroach/pkg/util/timeutil"
    "github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -199,21 +200,27 @@ func releaseBuffer(b *streamIngestionBuffer) {

 // Specialized SST batcher that is responsible for ingesting range tombstones.
 type rangeKeyBatcher struct {
-    db       *kv.DB
-    settings *cluster.Settings
+    db                   *kv.DB
+    settings             *cluster.Settings
+    rangeDescIterFactory rangedesc.IteratorFactory

     // onFlush is the callback called after the current batch has been
     // successfully ingested.
     onFlush func(kvpb.BulkOpSummary)
 }

 func newRangeKeyBatcher(
-    ctx context.Context, cs *cluster.Settings, db *kv.DB, onFlush func(summary kvpb.BulkOpSummary),
+    ctx context.Context,
+    cs *cluster.Settings,
+    db *kv.DB,
+    ranges rangedesc.IteratorFactory,
+    onFlush func(summary kvpb.BulkOpSummary),
 ) *rangeKeyBatcher {
     batcher := &rangeKeyBatcher{
-        db:       db,
-        settings: cs,
-        onFlush:  onFlush,
+        db:                   db,
+        rangeDescIterFactory: ranges,
+        settings:             cs,
+        onFlush:              onFlush,
     }
     return batcher
 }
@@ -414,7 +421,8 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
         return
     }

-    sip.rangeBatcher = newRangeKeyBatcher(ctx, st, db.KV(), sip.onFlushUpdateMetricUpdate)
+    execCfg := sip.FlowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig)
+    sip.rangeBatcher = newRangeKeyBatcher(ctx, st, db.KV(), execCfg.RangeDescIteratorFactory, sip.onFlushUpdateMetricUpdate)

     var subscriptionCtx context.Context
     subscriptionCtx, sip.subscriptionCancel = context.WithCancel(sip.Ctx())
@@ -979,6 +987,7 @@ func (r *rangeKeyBatcher) flush(ctx context.Context, toFlush mvccRangeKeyValues)

     batchSummary := kvpb.BulkOpSummary{}
     start, end := keys.MaxKey, keys.MinKey
+    var spanGroup roachpb.SpanGroup
     for _, rangeKeyVal := range toFlush {
         if err := sstWriter.PutRawMVCCRangeKey(rangeKeyVal.RangeKey, rangeKeyVal.Value); err != nil {
             return err
@@ -991,6 +1000,7 @@ func (r *rangeKeyBatcher) flush(ctx context.Context, toFlush mvccRangeKeyValues)
             end = rangeKeyVal.RangeKey.EndKey
         }
         batchSummary.DataSize += int64(rangeKeyVal.RangeKey.EncodedSize() + len(rangeKeyVal.Value))
+        spanGroup.Add(roachpb.Span{Key: rangeKeyVal.RangeKey.StartKey, EndKey: rangeKeyVal.RangeKey.EndKey})
     }

     // Finish the current batch.
@@ -1056,6 +1066,7 @@ func (r *rangeKeyBatcher) flush(ctx context.Context, toFlush mvccRangeKeyValues)
             batchSummary.SSTDataSize += int64(len(data))
         }
     }
+    r.recomputeStats(ctx, spanGroup.Slice())

     if r.onFlush != nil {
         r.onFlush(batchSummary)
@@ -1064,6 +1075,52 @@ func (r *rangeKeyBatcher) flush(ctx context.Context, toFlush mvccRangeKeyValues)
     return nil
 }

+func (r *rangeKeyBatcher) recomputeStats(ctx context.Context, rangekeySpans roachpb.Spans) {
+    rangeStartKeys := getRangeStartkeys(ctx, r.rangeDescIterFactory, rangekeySpans)
+    var b kv.Batch
+    // Sending RecomputeStatsRequests in one batch, allowing DistSender to
+    // parallelize the requests across ranges.
+    //
+    // TODO(msbutler): send this request asynchronously to prevent checkpoint
+    // delay.
+    for i := range rangeStartKeys {
+        b.AddRawRequest(&kvpb.RecomputeStatsRequest{
+            RequestHeader: kvpb.RequestHeader{Key: roachpb.Key(rangeStartKeys[i])},
+        })
+    }
+    if err := r.db.Run(ctx, &b); err != nil {
+        log.Warningf(ctx, "recompute stats batch failed with error: %v", err)
+    }
+}
+
+func getRangeStartkeys(
+    ctx context.Context, rangeDescIterFactory rangedesc.IteratorFactory, rangeKeySpans roachpb.Spans,
+) []roachpb.RKey {
+    rangeStartKeysFound := make(map[string]struct{})
+    rangeStartKeys := make([]roachpb.RKey, 0)
+
+    for i := range rangeKeySpans {
+        iter, err := rangeDescIterFactory.NewLazyIterator(ctx, rangeKeySpans[i], 100)
+        if err != nil {
+            log.Warningf(ctx, "could not create range descriptor iterator for %s: %v", rangeKeySpans[i], err)
+            continue
+        }
+        for ; iter.Valid(); iter.Next() {
+            desc := iter.CurRangeDescriptor()
+            startKeyStr := desc.StartKey.String()
+            if _, ok := rangeStartKeysFound[startKeyStr]; !ok {
+                rangeStartKeysFound[startKeyStr] = struct{}{}
+                rangeStartKeys = append(rangeStartKeys, desc.StartKey)
+            }
+        }
+        if err := iter.Error(); err != nil {
+            log.Warningf(ctx, "error iterating range descriptors for %s: %v", rangeKeySpans[i], err)
+        }
+    }
+    return rangeStartKeys
+}
+
 // splitRangeKeySSTAtKey splits the given SST (passed as bytes) at the
 // given split key.
 //