Commit ce42571

add support for CTID bucketing with snapshotNumPartitionsOverride (#3624)
PeerDB supports parallel snapshotting to optimize initial load time. Today there are two approaches:

1) Compute the total row count of the table and bucket the data evenly by the watermark column. This is the default behavior and gives an even distribution of rows across partitions for the parallel initial load, but computing the total row count can be slow on large tables.

2) With `SnapshotNumPartitionsOverride` enabled, skip the row count: fetch the min/max values of the watermark column and step through that range evenly to get the partition ranges. This cannot guarantee an even distribution of rows across partitions, but it speeds up the initial snapshot on large tables by bypassing the row-count calculation.

This change handles approach 2) when the watermark column is not explicitly passed in and defaults to `ctid`, which was previously a no-op: partition ranges are now formed by dividing the table's heap blocks uniformly (see the sketch after this message). For append-only tables we expect an even distribution of rows, so this should be a pure performance win. For tables that receive updates and deletes, rows may be distributed unevenly across partitions, but that is already the case with approach 2).

Thank you @alon-zeltser-cyera for the contribution.

Separate note: `SnapshotNumPartitionsOverride` was introduced to support the use case where the number of partitions is provided explicitly. There is no reason it has to be tied to the two initial-snapshot bucketing approaches, so we may want to evaluate decoupling the two concepts later on if we want to offer this feature more widely.

TODO:
- [x] Add e2e test
- [x] Run test against a large table

---------

Co-authored-by: Alon Zeltser <[email protected]>
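For illustration, here is a minimal runnable sketch of the block-division arithmetic described above (the values are hypothetical; the actual implementation is `getCTIDBlockPartitions` in the diff below, which additionally handles resume offsets and invalid block counts):

	// Sketch: split a table's heap blocks into numPartitions inclusive ctid ranges.
	// Partition i covers blocks [i*totalBlocks/numPartitions, (i+1)*totalBlocks/numPartitions).
	package main

	import "fmt"

	func main() {
		totalBlocks, numPartitions := int64(10), int64(3) // hypothetical example values
		for i := int64(0); i < numPartitions; i++ {
			startBlock := i * totalBlocks / numPartitions
			endBlock := (i+1)*totalBlocks/numPartitions - 1 // inclusive last block of this bucket
			// A range spans offset 0 of its first block through the maximum tuple
			// offset (65535) of its last block, so consecutive ranges tile the heap.
			fmt.Printf("partition %d: ctid (%d,0) .. (%d,65535)\n", i, startBlock, endBlock)
		}
	}

Running this prints block ranges 0..2, 3..5, and 6..9: the block boundaries are uniform even though the number of live rows per block can vary.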

File tree

flow/connectors/postgres/qrep.go
flow/e2e/clickhouse_test.go

2 files changed: +210 -2

flow/connectors/postgres/qrep.go

Lines changed: 99 additions & 2 deletions

@@ -2,9 +2,12 @@ package connpostgres
 
 import (
 	"bytes"
+	"cmp"
 	"context"
+	"errors"
 	"fmt"
 	"log/slog"
+	"math"
 	"strconv"
 	"strings"
 	"text/template"
@@ -26,7 +29,10 @@ import (
 	"github.com/PeerDB-io/peerdb/flow/shared/exceptions"
 )
 
-const qRepMetadataTableName = "_peerdb_query_replication_metadata"
+const (
+	qRepMetadataTableName = "_peerdb_query_replication_metadata"
+	ctidColumnName        = "ctid"
+)
 
 type QRepPullSink interface {
 	Close(error)
@@ -89,7 +95,7 @@ func (c *PostgresConnector) GetDefaultPartitionKeyForTables(
 
 	if supportsTidScans {
 		for _, tm := range input.TableMappings {
-			output.TableDefaultPartitionKeyMapping[tm.SourceTableIdentifier] = "ctid"
+			output.TableDefaultPartitionKeyMapping[tm.SourceTableIdentifier] = ctidColumnName
 		}
 	}
 
@@ -264,6 +270,18 @@ func (c *PostgresConnector) getNumRowsPartitions(
 
 		return partitionHelper.GetPartitions(), nil
 	} else {
+		// Special handling for CTID watermark column when a fixed number of partitions is specified:
+		// Partitions are created by dividing table blocks uniformly.
+		// Note: partition boundaries (block ranges) are uniform, but actual row distribution may be skewed
+		// due to table bloat, deleted tuples, or uneven data distribution across blocks.
+		if config.WatermarkColumn == ctidColumnName {
+			return c.getCTIDBlockPartitions(ctx, tx, *parsedWatermarkTable, numPartitions, last)
+		}
+
+		// Default path for non-CTID watermark column when a fixed number of partitions is specified:
+		// Partitions are created by uniformly splitting the min/max value range.
+		// Note: partition boundaries are uniform, but actual row distribution may be skewed
+		// due to non-uniform data distribution, gaps in the value range, or deleted rows.
 		minmaxQuery := fmt.Sprintf("SELECT MIN(%[2]s),MAX(%[2]s) FROM %[1]s %[3]s",
 			parsedWatermarkTable.String(), quotedWatermarkColumn, whereClause)
 		var row pgx.Row
@@ -295,6 +313,85 @@ func (c *PostgresConnector) getNumRowsPartitions(
 	}
 }
 
+func (c *PostgresConnector) getCTIDBlockPartitions(
+	ctx context.Context,
+	tx pgx.Tx,
+	parsedWatermarkTable utils.SchemaTable,
+	numPartitions int64,
+	last *protos.QRepPartition,
+) ([]*protos.QRepPartition, error) {
+	if numPartitions <= 1 {
+		return nil, errors.New("expect numPartitions to be greater than 1")
+	}
+
+	blocksQuery := "SELECT (pg_relation_size(to_regclass($1)) / current_setting('block_size')::int)::bigint"
+	var totalBlocks pgtype.Int8
+	if err := tx.QueryRow(ctx, blocksQuery, parsedWatermarkTable.String()).Scan(&totalBlocks); err != nil {
+		return nil, fmt.Errorf("failed to get relation blocks: %w", err)
+	}
+	if !totalBlocks.Valid || totalBlocks.Int64 <= 0 {
+		return nil, fmt.Errorf("total blocks: %d, valid: %t", totalBlocks.Int64, totalBlocks.Valid)
+	}
+
+	tidCmp := func(a pgtype.TID, b pgtype.TID) int {
+		if blockCmp := cmp.Compare(a.BlockNumber, b.BlockNumber); blockCmp != 0 {
+			return blockCmp
+		}
+		return cmp.Compare(a.OffsetNumber, b.OffsetNumber)
+	}
+
+	tidInc := func(t pgtype.TID) pgtype.TID {
+		if t.OffsetNumber < math.MaxUint16 {
+			return pgtype.TID{BlockNumber: t.BlockNumber, OffsetNumber: t.OffsetNumber + 1, Valid: true}
+		}
+		return pgtype.TID{BlockNumber: t.BlockNumber + 1, OffsetNumber: 0, Valid: true}
+	}
+
+	tidRangeForPartition := func(partitionIndex int64) (pgtype.TID, pgtype.TID, bool) {
+		blockStart := uint32((partitionIndex * totalBlocks.Int64) / numPartitions)
+		nextPartitionBlockStart := uint32(((partitionIndex + 1) * totalBlocks.Int64) / numPartitions)
+		if nextPartitionBlockStart <= blockStart {
+			return pgtype.TID{}, pgtype.TID{}, false
+		}
+		tidStartInclusive := pgtype.TID{BlockNumber: blockStart, OffsetNumber: 0, Valid: true}
+		tidEndInclusive := pgtype.TID{BlockNumber: nextPartitionBlockStart - 1, OffsetNumber: math.MaxUint16, Valid: true}
+		return tidStartInclusive, tidEndInclusive, true
+	}
+
+	var resumeFrom pgtype.TID
+	if last != nil && last.Range != nil {
+		if lr, ok := last.Range.Range.(*protos.PartitionRange_TidRange); ok {
+			resume := pgtype.TID{BlockNumber: lr.TidRange.End.BlockNumber, OffsetNumber: uint16(lr.TidRange.End.OffsetNumber), Valid: true}
+			resumeFrom = tidInc(resume)
+		} else {
+			c.logger.Warn("Ignoring resume offset because it's not TidRange")
+		}
+	}
+
+	partitionHelper := utils.NewPartitionHelper(c.logger)
+	for i := range numPartitions {
+		start, end, valid := tidRangeForPartition(i)
+		if !valid {
+			continue
+		}
+		if resumeFrom.Valid {
+			if tidCmp(end, resumeFrom) < 0 {
+				continue
+			}
+			if tidCmp(start, resumeFrom) < 0 {
+				start = resumeFrom
+			}
+		}
+		if err := partitionHelper.AddPartition(
+			pgtype.TID{BlockNumber: start.BlockNumber, OffsetNumber: start.OffsetNumber, Valid: true},
+			pgtype.TID{BlockNumber: end.BlockNumber, OffsetNumber: end.OffsetNumber, Valid: true},
+		); err != nil {
+			return nil, fmt.Errorf("failed to add TID partition: %w", err)
+		}
+	}
+	return partitionHelper.GetPartitions(), nil
+}
+
 func (c *PostgresConnector) getMinMaxValues(
 	ctx context.Context,
 	tx pgx.Tx,
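As a worked example of `tidRangeForPartition` (hypothetical numbers, not taken from the commit): with `totalBlocks = 10` and `numPartitions = 3`, the inclusive TID ranges come out as (0,0)..(2,65535), (3,0)..(5,65535), and (6,0)..(9,65535). Integer division guarantees the buckets tile the block range without gaps or overlap, and on resume `tidInc` turns the last completed partition's inclusive end into the next start; this is the contiguity property the e2e test below asserts.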

flow/e2e/clickhouse_test.go

Lines changed: 111 additions & 0 deletions

@@ -3,6 +3,7 @@ package e2e
 import (
 	"embed"
 	"fmt"
+	"math"
 	"math/big"
 	"reflect"
 	"regexp"
@@ -12,6 +13,7 @@ import (
 	"time"
 
 	"github.com/jackc/pgerrcode"
+	"github.com/jackc/pgx/v5/pgtype"
 	"github.com/shopspring/decimal"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -2830,3 +2832,112 @@ func (s ClickHouseSuite) Test_PartitionByExpr() {
 	env.Cancel(s.t.Context())
 	RequireEnvCanceled(s.t, env)
 }
+
+func (s ClickHouseSuite) Test_Partition_By_CTID_With_Num_Partitions_Override() {
+	if _, ok := s.source.(*PostgresSource); !ok {
+		s.t.Skip("only applies to postgres")
+	}
+
+	srcTableName := "test_ctid_block_partitions"
+	srcFullName := s.attachSchemaSuffix(srcTableName)
+	dstTableName := "test_ctid_block_partitions_dst"
+
+	require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`
+	CREATE TABLE IF NOT EXISTS %s (
+		id SERIAL PRIMARY KEY,
+		name TEXT,
+		age INT,
+		email TEXT,
+		created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+	)
+	`, srcFullName)))
+	numRows := 1000
+	deletedRows := 10
+	for i := 1; i <= numRows; i++ {
+		require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`
+		INSERT INTO %s (name, age, email) VALUES ('user_%d', %d, 'user_%[email protected]')
+		`, srcFullName, i, 20+(i%50), i)))
+	}
+	for i := 1; i <= numRows; i++ {
+		require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`
+		UPDATE %s SET age = %d WHERE id = %d
+		`, srcFullName, 30+(i%50), i)))
+	}
+	for i := 1; i <= deletedRows; i++ {
+		require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`
+		DELETE FROM %s WHERE id = %d
+		`, srcFullName, i)))
+	}
+
+	connectionGen := FlowConnectionGenerationConfig{
+		FlowJobName:      s.attachSuffix("clickhouse_partition_by_ctid"),
+		TableNameMapping: map[string]string{srcFullName: dstTableName},
+		Destination:      s.Peer().Name,
+	}
+	flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
+	flowConnConfig.DoInitialSnapshot = true
+	flowConnConfig.SnapshotNumPartitionsOverride = 3
+
+	tc := NewTemporalClient(s.t)
+	env := ExecutePeerflow(s.t, tc, flowConnConfig)
+
+	SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
+	EnvWaitForCount(env, s, "wait on initial", dstTableName, "id", numRows-deletedRows)
+
+	require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`
+	INSERT INTO %s (name, age, email) VALUES ('user_%d', %d, 'user_%[email protected]')
+	`, srcFullName, numRows+1, 25, numRows+1)))
+	EnvWaitForCount(env, s, "wait on cdc", dstTableName, "id", numRows-deletedRows+1)
+
+	rows, err := s.Conn().Query(s.t.Context(),
+		`SELECT partition_start, partition_end FROM peerdb_stats.qrep_partitions WHERE parent_mirror_name = $1
+		ORDER BY
+			CAST(split_part(trim(both '()' from partition_start), ',', 1) AS bigint),
+			CAST(split_part(trim(both '()' from partition_start), ',', 2) AS bigint)`,
+		flowConnConfig.FlowJobName)
+	require.NoError(s.t, err, "failed to query partition ranges")
+	defer rows.Close()
+
+	var partitionRanges []struct{ start, end string }
+	for rows.Next() {
+		var start, end string
+		require.NoError(s.t, rows.Scan(&start, &end), "failed to scan partition range")
+		partitionRanges = append(partitionRanges, struct{ start, end string }{start, end})
+	}
+	require.NoError(s.t, rows.Err())
+	// Verify partition count matches override
+	require.Len(s.t, partitionRanges, 3, "expected exactly 3 partitions to be created with SnapshotNumPartitionsOverride=3")
+
+	// Verify partition ranges are contiguous (intentionally ignoring `TID.Valid` field for tests)
+	tidParse := func(tidStr string) pgtype.TID {
+		blockStr, offsetStr, found := strings.Cut(tidStr[1:len(tidStr)-1], ",")
+		require.True(s.t, found, "failed to parse block number")
+		block, err := strconv.ParseUint(blockStr, 10, 32)
+		require.NoError(s.t, err, "failed to parse block number")
+		offset, err := strconv.ParseUint(offsetStr, 10, 16)
+		require.NoError(s.t, err, "failed to parse offset number")
+		return pgtype.TID{BlockNumber: uint32(block), OffsetNumber: uint16(offset)}
+	}
+	tidInc := func(t pgtype.TID) pgtype.TID {
+		if t.OffsetNumber < math.MaxUint16 {
+			return pgtype.TID{BlockNumber: t.BlockNumber, OffsetNumber: t.OffsetNumber + 1}
+		}
+		return pgtype.TID{BlockNumber: t.BlockNumber + 1, OffsetNumber: 0}
+	}
+	tidEq := func(t1, t2 pgtype.TID) bool {
+		return t1.BlockNumber == t2.BlockNumber && t1.OffsetNumber == t2.OffsetNumber
+	}
+	for i, pr := range partitionRanges {
+		startTID := tidParse(pr.start)
+		if i > 0 {
+			prevEndTID := tidParse(partitionRanges[i-1].end)
+			require.True(s.t, tidEq(tidInc(prevEndTID), startTID),
+				"partitions not contiguous; partition ranges are %v", partitionRanges)
+		} else {
+			require.True(s.t, tidEq(pgtype.TID{}, startTID))
+		}
+	}
+
+	env.Cancel(s.t.Context())
+	RequireEnvCanceled(s.t, env)
+}
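A note on what this test exercises: the table is deliberately churned (1000 inserts, 1000 updates, 10 deletes) so block-based partitioning runs against dead tuples and bloat. The assertions then verify that exactly 3 partitions were recorded (matching `SnapshotNumPartitionsOverride = 3`) and that their TID ranges tile the space contiguously starting from (0,0), comparing only block and offset numbers while intentionally ignoring the `TID.Valid` flag.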
