Skip to content

Commit bd33001

Browse files
authored
ch: Distributed Table Engine (#2997)
Useful in conjunction with Replicated table engines. Replication is no longer stored on the table mapping; instead, the peer has replicated/distributed bools. Replicated will use Replicated table engines. Specifying a cluster name will adjust DDL to use `ON CLUSTER` & put raw tables / destination tables behind the Distributed table engine. There seem to be some issues with updates/deletes despite using `parallel_distributed_insert_select=2`, but for the append-only use case this seems to work. Adjusting the raw table's sharding key to match the destination table's sharding key seems to alleviate this issue.
1 parent 53f6529 commit bd33001

File tree

37 files changed

+717
-279
lines changed

37 files changed

+717
-279
lines changed

.github/workflows/flow.yml

Lines changed: 131 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,137 @@ jobs:
201201
202202
- name: Run ClickHouse
203203
run: |
204-
./clickhouse server &
204+
cat > config1.xml <<EOF
205+
<clickhouse>
206+
<profiles><default></default></profiles>
207+
<users>
208+
<default>
209+
<password></password>
210+
<networks>
211+
<ip>::/0</ip>
212+
</networks>
213+
<profile>default</profile>
214+
<quota>default</quota>
215+
<access_management>1</access_management>
216+
<named_collection_control>1</named_collection_control>
217+
</default>
218+
</users>
219+
<logger><level>none</level></logger>
220+
<path>var/lib/clickhouse</path>
221+
<tmp_path>var/lib/clickhouse/tmp</tmp_path>
222+
<user_files_path>var/lib/clickhouse/user_files</user_files_path>
223+
<format_schema_path>var/lib/clickhouse/format_schemas</format_schema_path>
224+
<tcp_port>9000</tcp_port>
225+
<http_port remove="1"/>
226+
<postgresql_port remove="1"/>
227+
<mysql_port remove="1"/>
228+
<macros>
229+
<shard>1</shard>
230+
<replica>1</replica>
231+
</macros>
232+
<zookeeper>
233+
<node>
234+
<host>localhost</host>
235+
<port>2181</port>
236+
</node>
237+
</zookeeper>
238+
<distributed_ddl>
239+
<path>/clickhouse/task_queue/ddl</path>
240+
</distributed_ddl>
241+
<remote_servers>
242+
<cicluster>
243+
<shard>
244+
<replica>
245+
<host>localhost</host>
246+
<port>9000</port>
247+
</replica>
248+
</shard>
249+
<shard>
250+
<replica>
251+
<host>localhost</host>
252+
<port>9001</port>
253+
</replica>
254+
</shard>
255+
</cicluster>
256+
</remote_servers>
257+
</clickhouse>
258+
EOF
259+
cat > config2.xml <<EOF
260+
<clickhouse>
261+
<profiles><default></default></profiles>
262+
<users>
263+
<default>
264+
<password></password>
265+
<networks>
266+
<ip>::/0</ip>
267+
</networks>
268+
<profile>default</profile>
269+
<quota>default</quota>
270+
<access_management>1</access_management>
271+
<named_collection_control>1</named_collection_control>
272+
</default>
273+
</users>
274+
<logger><level>none</level></logger>
275+
<path>var/lib/clickhouse</path>
276+
<tmp_path>var/lib/clickhouse/tmp</tmp_path>
277+
<user_files_path>var/lib/clickhouse/user_files</user_files_path>
278+
<format_schema_path>var/lib/clickhouse/format_schemas</format_schema_path>
279+
<tcp_port>9001</tcp_port>
280+
<http_port remove="1"/>
281+
<postgresql_port remove="1"/>
282+
<mysql_port remove="1"/>
283+
<macros>
284+
<shard>2</shard>
285+
<replica>1</replica>
286+
</macros>
287+
<zookeeper>
288+
<node>
289+
<host>localhost</host>
290+
<port>2181</port>
291+
</node>
292+
</zookeeper>
293+
<distributed_ddl>
294+
<path>/clickhouse/task_queue/ddl</path>
295+
</distributed_ddl>
296+
<remote_servers>
297+
<cicluster>
298+
<shard>
299+
<replica>
300+
<host>localhost</host>
301+
<port>9000</port>
302+
</replica>
303+
</shard>
304+
<shard>
305+
<replica>
306+
<host>localhost</host>
307+
<port>9001</port>
308+
</replica>
309+
</shard>
310+
</cicluster>
311+
</remote_servers>
312+
</clickhouse>
313+
EOF
314+
cat > config-keeper.xml <<EOF
315+
<clickhouse>
316+
<keeper_server>
317+
<tcp_port>2181</tcp_port>
318+
<server_id>1</server_id>
319+
<log_storage_path>var/lib/clickhouse/coordination/log</log_storage_path>
320+
<snapshot_storage_path>var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
321+
<raft_configuration>
322+
<server>
323+
<id>1</id>
324+
<hostname>localhost</hostname>
325+
<port>9234</port>
326+
</server>
327+
</raft_configuration>
328+
</keeper_server>
329+
</clickhouse>
330+
EOF
331+
mkdir ch1 ch2 chkeep
332+
(cd ch1 && ../clickhouse server -C ../config1.xml) &
333+
(cd ch2 && ../clickhouse server -C ../config2.xml) &
334+
(cd chkeep && ../clickhouse keeper -C ../config-keeper.xml) &
205335
206336
- name: Install Temporal CLI
207337
uses: temporalio/setup-temporal@1059a504f87e7fa2f385e3fa40d1aa7e62f1c6ca # v0

flow/activities/flowable_core.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
225225
defer connectors.CloseConnector(ctx, dstConn)
226226

227227
syncState.Store(shared.Ptr("updating schema"))
228-
if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, recordBatchSync.SchemaDeltas); err != nil {
228+
if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas); err != nil {
229229
return nil, fmt.Errorf("failed to sync schema: %w", err)
230230
}
231231

flow/connectors/bigquery/bigquery.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(
204204
ctx context.Context,
205205
env map[string]string,
206206
flowJobName string,
207+
_ []*protos.TableMapping,
207208
schemaDeltas []*protos.TableSchemaDelta,
208209
) error {
209210
for _, schemaDelta := range schemaDeltas {

flow/connectors/bigquery/qrep.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep(
8787
}
8888

8989
if err := c.ReplayTableSchemaDeltas(
90-
ctx, config.Env, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta},
90+
ctx, config.Env, config.FlowJobName, nil, []*protos.TableSchemaDelta{tableSchemaDelta},
9191
); err != nil {
9292
return nil, fmt.Errorf("failed to add columns to destination table: %w", err)
9393
}

flow/connectors/bigquery/qrep_avro_sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
9898
slog.String(string(shared.FlowNameKey), req.FlowJobName),
9999
slog.String("dstTableName", rawTableName))
100100

101-
if err := s.connector.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.Records.SchemaDeltas); err != nil {
101+
if err := s.connector.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas); err != nil {
102102
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
103103
}
104104

0 commit comments

Comments
 (0)