Skip to content

Commit 41ca799

Browse files
authored
make settings versioning more robust (#3614)
- Make sure `type_json_skip_duplicated_paths` is also conditionally set only if clickhouse version supports it. - Create a setting_util.go file to centralize settings definition and minimum versioning, to make setting generation more robust. This has been an issue in the past, and we'll likely need to continue to add more settings later. - Separately, added some comments for how PeerDB Internal Version System works We also have a number of settings that we set at the client level [here](https://github.com/PeerDB-io/peerdb/blob/a6d0527bb3443dac36097e0a47affc286c8eab23/flow/connectors/clickhouse/clickhouse.go#L243-L260). Not sure if client automatically set settings based on server version, but since we don't check ch version there explicitly in code, that seems to be the case. The caveats of these settings is that they are not backwards compatible, since this is on the peer creation flow and not mirror creation flow, so internal version is not currently available if needed. The change in this PR only affects query-level settings. It may be possible that some of these query-level SETTINGs could instead be client-level (i.e. `type_json_skip_duplicated_paths`), but query-level settings seem to be more flexible with backwards compatibility support.
1 parent a6d0527 commit 41ca799

File tree

9 files changed

+224
-86
lines changed

9 files changed

+224
-86
lines changed

flow/connectors/clickhouse/avro_sync.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -282,12 +282,11 @@ func (s *ClickHouseAvroSyncMethod) pushS3DataToClickHouse(
282282
}
283283
numParts = max(numParts, 1)
284284

285-
settings := map[string]string{
286-
"throw_on_max_partitions_per_insert_block": "0",
287-
"type_json_skip_duplicated_paths": "1",
288-
}
289-
if config.Version >= shared.InternalVersion_JsonEscapeDotsInKeys && ShouldUseJSONEscapeDotsInKeys(ctx, s.chVersion) {
290-
settings["json_type_escape_dots_in_keys"] = "1"
285+
chSettings := NewCHSettings(s.chVersion)
286+
chSettings.Add(SettingThrowOnMaxPartitionsPerInsertBlock, "0")
287+
chSettings.Add(SettingTypeJsonSkipDuplicatedPaths, "1")
288+
if config.Version >= shared.InternalVersion_JsonEscapeDotsInKeys {
289+
chSettings.Add(SettingJsonTypeEscapeDotsInKeys, "1")
291290
}
292291

293292
// Process each chunk file individually
@@ -313,9 +312,10 @@ func (s *ClickHouseAvroSyncMethod) pushS3DataToClickHouse(
313312

314313
var query string
315314
if numParts > 1 {
316-
query, err = buildInsertFromTableFunctionQueryWithPartitioning(ctx, insertConfig, s3TableFunction, i, numParts, settings)
315+
query, err = buildInsertFromTableFunctionQueryWithPartitioning(
316+
ctx, insertConfig, s3TableFunction, i, numParts, chSettings)
317317
} else {
318-
query, err = buildInsertFromTableFunctionQuery(ctx, insertConfig, s3TableFunction, settings)
318+
query, err = buildInsertFromTableFunctionQuery(ctx, insertConfig, s3TableFunction, chSettings)
319319
}
320320
if err != nil {
321321
s.logger.Error("failed to build insert query",

flow/connectors/clickhouse/normalize.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
264264
}
265265

266266
if allowNullableKey {
267-
stmtBuilder.WriteString(" SETTINGS allow_nullable_key = 1")
267+
stmtBuilder.WriteString(NewCHSettingsString(chVersion, CHSettingEntry{SettingAllowNullableKey, "1"}))
268268
}
269269

270270
if c.Config.Cluster != "" {

flow/connectors/clickhouse/normalize_query.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -309,20 +309,18 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error
309309
}
310310
}
311311

312-
settings := map[string]string{
313-
"throw_on_max_partitions_per_insert_block": "0",
314-
"type_json_skip_duplicated_paths": "1",
315-
}
312+
chSettings := NewCHSettings(t.chVersion)
313+
chSettings.Add(SettingThrowOnMaxPartitionsPerInsertBlock, "0")
314+
chSettings.Add(SettingTypeJsonSkipDuplicatedPaths, "1")
316315
if t.cluster {
317-
settings["parallel_distributed_insert_select"] = "0"
316+
chSettings.Add(SettingParallelDistributedInsertSelect, "0")
318317
}
319-
if t.version >= shared.InternalVersion_JsonEscapeDotsInKeys && ShouldUseJSONEscapeDotsInKeys(ctx, t.chVersion) {
320-
settings["json_type_escape_dots_in_keys"] = "1"
318+
if t.version >= shared.InternalVersion_JsonEscapeDotsInKeys {
319+
chSettings.Add(SettingJsonTypeEscapeDotsInKeys, "1")
321320
}
322-
settingsStr := buildSettingsStr(settings)
323321

324322
insertIntoSelectQuery := fmt.Sprintf("INSERT INTO %s %s %s%s",
325-
peerdb_clickhouse.QuoteIdentifier(t.TableName), colSelector.String(), selectQuery.String(), settingsStr)
323+
peerdb_clickhouse.QuoteIdentifier(t.TableName), colSelector.String(), selectQuery.String(), chSettings.String())
326324

327325
t.Query = insertIntoSelectQuery
328326

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package connclickhouse
2+
3+
import (
4+
"maps"
5+
"slices"
6+
"strings"
7+
8+
chproto "github.com/ClickHouse/clickhouse-go/v2/lib/proto"
9+
)
10+
11+
// When adding a new clickhouse setting to this list, check when the setting is introduced to ClickHouse
12+
// and if applicable, add a corresponding minimum supported version below to ensure queries on older versions
13+
// of ClickHouse servers are not impacted.
14+
// Important: if the setting causes breaking changes to existing PeerDB flows (not just ClickHouse compatibility),
15+
// it must also be gated by PeerDB's internal version.
16+
const (
17+
SettingAllowNullableKey CHSetting = "allow_nullable_key"
18+
SettingJsonTypeEscapeDotsInKeys CHSetting = "json_type_escape_dots_in_keys"
19+
SettingTypeJsonSkipDuplicatedPaths CHSetting = "type_json_skip_duplicated_paths"
20+
SettingThrowOnMaxPartitionsPerInsertBlock CHSetting = "throw_on_max_partitions_per_insert_block"
21+
SettingParallelDistributedInsertSelect CHSetting = "parallel_distributed_insert_select"
22+
)
23+
24+
// CHSettingMinVersions maps setting names to their minimum required ClickHouse versions
25+
// If minimum version is not specified, we assume the setting is available to all ClickHouse versions
26+
var CHSettingMinVersions = map[CHSetting]chproto.Version{
27+
SettingJsonTypeEscapeDotsInKeys: {Major: 25, Minor: 8, Patch: 0},
28+
SettingTypeJsonSkipDuplicatedPaths: {Major: 24, Minor: 8, Patch: 0},
29+
}
30+
31+
type CHSetting string
32+
33+
type CHSettings struct {
34+
settings map[CHSetting]string
35+
chVersion *chproto.Version
36+
}
37+
38+
type CHSettingEntry struct {
39+
key CHSetting
40+
val string
41+
}
42+
43+
func GetMinVersion(name CHSetting) (chproto.Version, bool) {
44+
if minVersion, exists := CHSettingMinVersions[name]; exists {
45+
return minVersion, true
46+
}
47+
return chproto.Version{}, false
48+
}
49+
50+
// NewCHSettingsString is a one-liner method to generate an immutable settings string
51+
func NewCHSettingsString(version *chproto.Version, settings ...CHSettingEntry) string {
52+
sg := NewCHSettings(version)
53+
for _, setting := range settings {
54+
sg.Add(setting.key, setting.val)
55+
}
56+
return sg.String()
57+
}
58+
59+
func NewCHSettings(version *chproto.Version, settings ...CHSettingEntry) *CHSettings {
60+
newSettings := make(map[CHSetting]string)
61+
for _, s := range settings {
62+
newSettings[s.key] = s.val
63+
}
64+
chSettings := &CHSettings{
65+
settings: newSettings,
66+
chVersion: version,
67+
}
68+
return chSettings
69+
}
70+
71+
func (sg *CHSettings) Add(key CHSetting, val string) *CHSettings {
72+
sg.settings[key] = val
73+
return sg
74+
}
75+
76+
// String generates settings string ' SETTINGS <key1> = <val1>, <key2> = <val2>, ...';
77+
// If ClickHouse version is set in the CHSettings, settings that do not meet CH version
78+
// requirement will be filtered out. Otherwise, all settings are included.
79+
func (sg *CHSettings) String() string {
80+
if len(sg.settings) == 0 {
81+
return ""
82+
}
83+
84+
// sort keys alphabetically for consistent output
85+
names := slices.Collect(maps.Keys(sg.settings))
86+
slices.SortFunc(names, func(a, b CHSetting) int {
87+
return strings.Compare(string(a), string(b))
88+
})
89+
90+
first := true
91+
var sb strings.Builder
92+
for _, name := range names {
93+
if minVersion, exists := GetMinVersion(name); exists && sg.chVersion != nil {
94+
if !chproto.CheckMinVersion(minVersion, *sg.chVersion) {
95+
continue
96+
}
97+
}
98+
99+
if first {
100+
sb.WriteString(" SETTINGS ")
101+
first = false
102+
} else {
103+
sb.WriteString(", ")
104+
}
105+
sb.WriteString(string(name))
106+
sb.WriteString("=")
107+
sb.WriteString(sg.settings[name])
108+
}
109+
return sb.String()
110+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package connclickhouse
2+
3+
import (
4+
"testing"
5+
6+
chproto "github.com/ClickHouse/clickhouse-go/v2/lib/proto"
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestCHSettings(t *testing.T) {
11+
// Test empty settings
12+
version := chproto.Version{Major: 25, Minor: 8, Patch: 0}
13+
chSettings := NewCHSettings(&version)
14+
result := chSettings.String()
15+
require.Empty(t, result)
16+
17+
// Test single setting
18+
version = chproto.Version{Major: 25, Minor: 8, Patch: 0}
19+
chSettings = NewCHSettings(&version)
20+
chSettings.Add(SettingThrowOnMaxPartitionsPerInsertBlock, "0")
21+
result = chSettings.String()
22+
require.Equal(t, " SETTINGS throw_on_max_partitions_per_insert_block=0", result)
23+
24+
// Test multiple settings
25+
version = chproto.Version{Major: 25, Minor: 8, Patch: 0}
26+
chSettings = NewCHSettings(&version)
27+
chSettings.Add(SettingTypeJsonSkipDuplicatedPaths, "1")
28+
chSettings.Add(SettingThrowOnMaxPartitionsPerInsertBlock, "0")
29+
result = chSettings.String()
30+
require.Equal(t, " SETTINGS throw_on_max_partitions_per_insert_block=0, type_json_skip_duplicated_paths=1", result)
31+
}
32+
33+
func TestCHSettingsVersionFiltering(t *testing.T) {
34+
// settings meeting minimum version should be included
35+
minVersion, _ := GetMinVersion(SettingJsonTypeEscapeDotsInKeys)
36+
require.Equal(t, chproto.Version{Major: 25, Minor: 8, Patch: 0}, minVersion)
37+
chSettings := NewCHSettings(&chproto.Version{Major: 25, Minor: 8, Patch: 0})
38+
chSettings.Add(SettingJsonTypeEscapeDotsInKeys, "1")
39+
result := chSettings.String()
40+
require.Equal(t, " SETTINGS json_type_escape_dots_in_keys=1", result)
41+
42+
// setting not meeting minimum version should be filter out
43+
chSettings = NewCHSettings(&chproto.Version{Major: 25, Minor: 7, Patch: 0})
44+
chSettings.Add(SettingJsonTypeEscapeDotsInKeys, "1")
45+
result = chSettings.String()
46+
require.Empty(t, result)
47+
48+
// settings not having minimum version should be included
49+
_, minVersionExist := GetMinVersion(SettingThrowOnMaxPartitionsPerInsertBlock)
50+
require.False(t, minVersionExist)
51+
result = NewCHSettingsString(&chproto.Version{Major: 25, Minor: 8, Patch: 0},
52+
CHSettingEntry{SettingThrowOnMaxPartitionsPerInsertBlock, "0"})
53+
require.Equal(t, " SETTINGS throw_on_max_partitions_per_insert_block=0", result)
54+
55+
// setting should be included when ch version is not specified
56+
result = NewCHSettingsString(nil,
57+
CHSettingEntry{SettingJsonTypeEscapeDotsInKeys, "1"},
58+
CHSettingEntry{SettingThrowOnMaxPartitionsPerInsertBlock, "0"},
59+
)
60+
require.Equal(t, " SETTINGS json_type_escape_dots_in_keys=1, throw_on_max_partitions_per_insert_block=0", result)
61+
}

flow/connectors/clickhouse/table_function.go

Lines changed: 9 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,6 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"maps"
8-
"slices"
9-
"sort"
107
"strings"
118

129
"go.temporal.io/sdk/log"
@@ -65,7 +62,7 @@ func buildInsertFromTableFunctionQuery(
6562
ctx context.Context,
6663
config *insertFromTableFunctionConfig,
6764
tableFunctionExpr string,
68-
settings map[string]string,
65+
chSettings *CHSettings,
6966
) (string, error) {
7067
fieldExpressionConverters := defaultFieldExpressionConverters
7168
fieldExpressionConverters = append(fieldExpressionConverters, config.fieldExpressionConverters...)
@@ -129,7 +126,10 @@ func buildInsertFromTableFunctionQuery(
129126

130127
selectorStr := strings.Join(selectedColumnNames, ",")
131128
insertedStr := strings.Join(insertedColumnNames, ",")
132-
settingsStr := buildSettingsStr(settings)
129+
settingsStr := ""
130+
if chSettings != nil {
131+
settingsStr = chSettings.String()
132+
}
133133

134134
return fmt.Sprintf("INSERT INTO %s(%s) SELECT %s FROM %s%s",
135135
peerdb_clickhouse.QuoteIdentifier(config.destinationTable), insertedStr, selectorStr, tableFunctionExpr, settingsStr), nil
@@ -142,7 +142,7 @@ func buildInsertFromTableFunctionQueryWithPartitioning(
142142
tableFunctionExpr string,
143143
partitionIndex uint64,
144144
totalPartitions uint64,
145-
settings map[string]string,
145+
chSettings *CHSettings,
146146
) (string, error) {
147147
var query strings.Builder
148148

@@ -171,33 +171,9 @@ func buildInsertFromTableFunctionQueryWithPartitioning(
171171
query.WriteString(whereClause)
172172
}
173173

174-
query.WriteString(buildSettingsStr(settings))
175-
176-
return query.String(), nil
177-
}
178-
179-
// helper function to generate settings string ' SETTINGS <key1> = <val1>, <key2> = <val2>, ...'
180-
func buildSettingsStr(settings map[string]string) string {
181-
if len(settings) == 0 {
182-
return ""
174+
if chSettings != nil {
175+
query.WriteString(chSettings.String())
183176
}
184177

185-
// sort keys alphabetically for consistent output
186-
keys := slices.Collect(maps.Keys(settings))
187-
sort.Strings(keys)
188-
189-
first := true
190-
var builder strings.Builder
191-
for _, k := range keys {
192-
if first {
193-
builder.WriteString(" SETTINGS ")
194-
first = false
195-
} else {
196-
builder.WriteString(", ")
197-
}
198-
builder.WriteString(k)
199-
builder.WriteString("=")
200-
builder.WriteString(settings[k])
201-
}
202-
return builder.String()
178+
return query.String(), nil
203179
}

flow/connectors/clickhouse/table_function_test.go

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"testing"
77

8+
chproto "github.com/ClickHouse/clickhouse-go/v2/lib/proto"
89
"github.com/stretchr/testify/require"
910

1011
"github.com/PeerDB-io/peerdb/flow/generated/protos"
@@ -32,39 +33,22 @@ func TestBuildInsertFromTableFunctionQuery(t *testing.T) {
3233
}
3334

3435
tableFunctionExpr := "s3('s3://bucket/key', 'format')"
35-
settings := map[string]string{"key": "val"}
36+
chSettings := NewCHSettings(&chproto.Version{Major: 25, Minor: 8})
37+
chSettings.Add(SettingTypeJsonSkipDuplicatedPaths, "1")
3638

3739
// without partitioning
38-
query, err := buildInsertFromTableFunctionQuery(ctx, config, tableFunctionExpr, settings)
40+
query, err := buildInsertFromTableFunctionQuery(ctx, config, tableFunctionExpr, chSettings)
3941
require.NoError(t, err)
40-
require.Equal(t, "INSERT INTO `t1`(`id`,`name`) SELECT `id`,`name` FROM s3('s3://bucket/key', 'format') SETTINGS key=val", query)
42+
require.Equal(t, fmt.Sprintf("INSERT INTO `t1`(`id`,`name`) SELECT `id`,`name` FROM s3('s3://bucket/key', 'format') SETTINGS %s=%s",
43+
string(SettingTypeJsonSkipDuplicatedPaths), "1"), query)
4144

4245
// with partitioning
4346
totalPartitions := uint64(8)
4447
for idx := range totalPartitions {
45-
query, err := buildInsertFromTableFunctionQueryWithPartitioning(ctx, config, tableFunctionExpr, idx, totalPartitions, settings)
48+
query, err := buildInsertFromTableFunctionQueryWithPartitioning(ctx, config, tableFunctionExpr, idx, totalPartitions, chSettings)
4649
require.NoError(t, err)
4750
require.Equal(t, query,
4851
"INSERT INTO `t1`(`id`,`name`) SELECT `id`,`name` FROM s3('s3://bucket/key', 'format')"+
49-
fmt.Sprintf(" WHERE cityHash64(`id`) %% 8 = %d SETTINGS key=val", idx))
52+
fmt.Sprintf(" WHERE cityHash64(`id`) %% 8 = %d SETTINGS %s=%s", idx, string(SettingTypeJsonSkipDuplicatedPaths), "1"))
5053
}
5154
}
52-
53-
func TestBuildSettingsStr(t *testing.T) {
54-
settings := map[string]string{}
55-
result := buildSettingsStr(settings)
56-
require.Empty(t, result)
57-
58-
settings = map[string]string{
59-
"max_threads": "8",
60-
}
61-
result = buildSettingsStr(settings)
62-
require.Equal(t, " SETTINGS max_threads=8", result)
63-
64-
settings = map[string]string{
65-
"max_threads": "8",
66-
"max_block_size": "65536",
67-
}
68-
result = buildSettingsStr(settings)
69-
require.Equal(t, " SETTINGS max_block_size=65536, max_threads=8", result)
70-
}

flow/connectors/clickhouse/validate.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import (
77
"slices"
88
"strings"
99

10-
chproto "github.com/ClickHouse/clickhouse-go/v2/lib/proto"
11-
1210
"github.com/PeerDB-io/peerdb/flow/generated/protos"
1311
"github.com/PeerDB-io/peerdb/flow/internal"
1412
chvalidate "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse"
@@ -87,12 +85,3 @@ func (c *ClickHouseConnector) ValidateMirrorDestination(
8785
}
8886
return nil
8987
}
90-
91-
func ShouldUseJSONEscapeDotsInKeys(ctx context.Context, chVersion *chproto.Version) bool {
92-
if chVersion == nil {
93-
return false
94-
}
95-
// https://clickhouse.com/docs/operations/settings/formats#json_type_escape_dots_in_keys
96-
// Available in ClickHouse 25.8 and later
97-
return chproto.CheckMinVersion(chproto.Version{Major: 25, Minor: 8, Patch: 0}, *chVersion)
98-
}

0 commit comments

Comments
 (0)