Skip to content

Commit 7dee54f

Browse files
authored
clickhouse: PARTITION BY expression (#3341)
I was wrong, making partitioning as inflexible as ORDER BY columns doesn't work Most easily is that you should pretty much never partition by a datetime without rounding off most of its lower bits Previous partitioning should be considered deprecated
1 parent aa53a69 commit 7dee54f

File tree

5 files changed

+96
-6
lines changed

5 files changed

+96
-6
lines changed

flow/connectors/clickhouse/normalize.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
222222

223223
if tmEngine != protos.TableEngine_CH_ENGINE_NULL {
224224
hasNullableKeyFn := buildIsNullableKeyFn(tableMapping, tableSchema.Columns, tableSchema.NullableEnabled)
225-
orderByColumns, hasNullableOrderKey := getOrderedOrderByColumns(
225+
orderByColumns, allowNullableKey := getOrderedOrderByColumns(
226226
tableMapping, colNameMap, tableSchema.PrimaryKeyColumns, hasNullableKeyFn)
227227
if sourceSchemaAsDestinationColumn {
228228
orderByColumns = append([]string{sourceSchemaColName}, orderByColumns...)
@@ -235,13 +235,21 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
235235
stmtBuilder.WriteString(" ORDER BY tuple()")
236236
}
237237

238-
partitionByColumns, hasNullablePartitionKey := getOrderedPartitionByColumns(tableMapping, colNameMap, hasNullableKeyFn)
239-
if len(partitionByColumns) > 0 {
240-
partitionByStr := strings.Join(partitionByColumns, ",")
241-
fmt.Fprintf(&stmtBuilder, " PARTITION BY (%s)", partitionByStr)
238+
if tableMapping != nil && tableMapping.PartitionByExpr != "" {
239+
allowNullableKey = true
240+
fmt.Fprintf(&stmtBuilder, " PARTITION BY (%s)", tableMapping.PartitionByExpr)
241+
} else {
242+
partitionByColumns, hasNullablePartitionKey := getOrderedPartitionByColumns(tableMapping, colNameMap, hasNullableKeyFn)
243+
if hasNullablePartitionKey {
244+
allowNullableKey = true
245+
}
246+
if len(partitionByColumns) > 0 {
247+
partitionByStr := strings.Join(partitionByColumns, ",")
248+
fmt.Fprintf(&stmtBuilder, " PARTITION BY (%s)", partitionByStr)
249+
}
242250
}
243251

244-
if hasNullableOrderKey || hasNullablePartitionKey {
252+
if allowNullableKey {
245253
stmtBuilder.WriteString(" SETTINGS allow_nullable_key = 1")
246254
}
247255

flow/e2e/clickhouse/peer_flow_ch_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2434,3 +2434,52 @@ func (s ClickHouseSuite) Test_PartitionBy() {
24342434
env.Cancel(s.t.Context())
24352435
e2e.RequireEnvCanceled(s.t, env)
24362436
}
2437+
2438+
func (s ClickHouseSuite) Test_PartitionByExpr() {
2439+
srcTableName := "test_partition_by_expr"
2440+
srcFullName := s.attachSchemaSuffix(srcTableName)
2441+
dstTableName := "test_partition_by_expr"
2442+
2443+
require.NoError(s.t, s.source.Exec(s.t.Context(),
2444+
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (id INT PRIMARY KEY, num INT, val TEXT NOT NULL)`, srcFullName)))
2445+
2446+
connectionGen := e2e.FlowConnectionGenerationConfig{
2447+
FlowJobName: s.attachSuffix("clickhouse_partition_by"),
2448+
TableMappings: []*protos.TableMapping{{
2449+
SourceTableIdentifier: srcFullName,
2450+
DestinationTableIdentifier: dstTableName,
2451+
PartitionByExpr: "num%2,val",
2452+
Columns: []*protos.ColumnSetting{
2453+
{SourceName: "id", NullableEnabled: true},
2454+
{SourceName: "num", NullableEnabled: true},
2455+
{SourceName: "val", NullableEnabled: true},
2456+
},
2457+
}},
2458+
Destination: s.Peer().Name,
2459+
}
2460+
2461+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
2462+
tc := e2e.NewTemporalClient(s.t)
2463+
env := e2e.ExecutePeerflow(s.t, tc, flowConnConfig)
2464+
e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
2465+
2466+
e2e.EnvWaitForEqualTablesWithNames(env, s, "table setup", srcTableName, dstTableName, "id")
2467+
2468+
var partitionKey string
2469+
ch, err := connclickhouse.Connect(s.t.Context(), nil, s.Peer().GetClickhouseConfig())
2470+
require.NoError(s.t, err)
2471+
var dstTableSuffix string
2472+
if s.cluster {
2473+
dstTableSuffix = "_shard"
2474+
}
2475+
require.NoError(s.t,
2476+
ch.QueryRow(s.t.Context(),
2477+
"select partition_key from system.tables where name="+clickhouse.QuoteLiteral(dstTableName+dstTableSuffix),
2478+
).Scan(&partitionKey),
2479+
)
2480+
require.NoError(s.t, ch.Close())
2481+
require.Equal(s.t, "(num % 2, val)", partitionKey)
2482+
2483+
env.Cancel(s.t.Context())
2484+
e2e.RequireEnvCanceled(s.t, env)
2485+
}

protos/flow.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ message TableMapping {
3333
TableEngine engine = 6;
3434
string sharding_key = 7;
3535
string policy_name = 8;
36+
string partition_by_expr = 9;
3637
}
3738

3839
message SetupInput {

ui/app/mirrors/create/cdc/schemabox.tsx

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,13 @@ export default function SchemaBox({
137137
setRows(newRows);
138138
};
139139

140+
const updatePartitionByExpr = (source: string, partitionByExpr: string) => {
141+
const newRows = [...rows];
142+
const index = newRows.findIndex((row) => row.source === source);
143+
newRows[index] = { ...newRows[index], partitionByExpr };
144+
setRows(newRows);
145+
};
146+
140147
const addTableColumns = useCallback(
141148
(table: string) => {
142149
const [schemaName, tableName] = table.split('.');
@@ -228,6 +235,7 @@ export default function SchemaBox({
228235
row.partitionKey = existingRow.partitionKey;
229236
row.shardingKey = existingRow.shardingKey;
230237
row.policyName = existingRow.policyName;
238+
row.partitionByExpr = existingRow.partitionByExpr;
231239
row.exclude = new Set(existingRow.exclude ?? []);
232240
row.destination = existingRow.destinationTableIdentifier;
233241
addTableColumns(row.source);
@@ -366,6 +374,7 @@ export default function SchemaBox({
366374
rowGap: '0.5rem',
367375
columnGap: '3rem',
368376
display: row.selected ? 'flex' : 'none',
377+
flexWrap: 'wrap',
369378
}}
370379
key={row.source}
371380
>
@@ -466,6 +475,27 @@ export default function SchemaBox({
466475
}
467476
/>
468477
</div>
478+
<div style={{ width: '30%', fontSize: 12 }}>
479+
Partition By Expr:
480+
<TextField
481+
disabled={row.editingDisabled}
482+
style={{
483+
marginTop: '0.5rem',
484+
cursor: 'pointer',
485+
}}
486+
variant='simple'
487+
placeholder='Partition By expression (optional)'
488+
value={row.partitionByExpr}
489+
onChange={(
490+
e: React.ChangeEvent<HTMLInputElement>
491+
) =>
492+
updatePartitionByExpr(
493+
row.source,
494+
e.target.value
495+
)
496+
}
497+
/>
498+
</div>
469499
</>
470500
)}
471501
</div>

ui/app/mirrors/create/handlers.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ export function reformattedTableMapping(
188188
engine: row.engine,
189189
shardingKey: row.shardingKey,
190190
policyName: row.policyName,
191+
partitionByExpr: row.partitionByExpr,
191192
}));
192193
}
193194

@@ -454,6 +455,7 @@ export async function fetchTables(
454455
engine: TableEngine.CH_ENGINE_REPLACING_MERGE_TREE,
455456
shardingKey: '',
456457
policyName: '',
458+
partitionByExpr: '',
457459
});
458460
}
459461
}

0 commit comments

Comments
 (0)