Skip to content

Commit 0c210d6

Browse files
authored
remove PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE (#3354)
This has been default for some time, has never come up to turn it off, plays vital role in PeerDB's prime directive: always read from slot
1 parent 35bdcd0 commit 0c210d6

File tree

3 files changed

+3
-34
lines changed

3 files changed

+3
-34
lines changed

flow/activities/flowable.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ type CheckMetadataTablesResult struct {
4242
}
4343

4444
type NormalizeBatchRequest struct {
45-
Done chan struct{}
4645
BatchID int64
4746
}
4847

flow/activities/flowable_core.go

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -330,27 +330,12 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
330330
}
331331

332332
if recordBatchSync.NeedsNormalize() {
333-
parallel, err := internal.PeerDBEnableParallelSyncNormalize(ctx, config.Env)
334-
if err != nil {
335-
return nil, err
336-
}
337-
var done chan struct{}
338-
if !parallel {
339-
done = make(chan struct{})
340-
}
341333
syncState.Store(shared.Ptr("normalizing"))
342334
select {
343-
case normRequests <- NormalizeBatchRequest{BatchID: res.CurrentSyncBatchID, Done: done}:
335+
case normRequests <- NormalizeBatchRequest{BatchID: res.CurrentSyncBatchID}:
344336
case <-ctx.Done():
345337
return res, nil
346338
}
347-
if done != nil {
348-
select {
349-
case <-done:
350-
case <-ctx.Done():
351-
return res, nil
352-
}
353-
}
354339
}
355340

356341
return res, nil
@@ -722,8 +707,6 @@ func (a *FlowableActivity) normalizeLoop(
722707
continue retryLoop
723708
}
724709
}
725-
} else if req.Done != nil {
726-
close(req.Done)
727710
}
728711
a.OtelManager.Metrics.LastNormalizedBatchIdGauge.Record(ctx, req.BatchID, metric.WithAttributeSet(attribute.NewSet(
729712
attribute.String(otel_metrics.FlowNameKey, config.FlowJobName),

flow/internal/dynamicconf.go

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,8 @@ var DynamicSettings = [...]*protos.DynamicSetting{
3333
TargetForSetting: protos.DynconfTarget_ALL,
3434
},
3535
{
36-
Name: "PEERDB_NORMALIZE_CHANNEL_BUFFER_SIZE",
37-
Description: "Advanced setting: changes buffer size of channel PeerDB uses for queueing normalization, " +
38-
"use with PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE",
36+
Name: "PEERDB_NORMALIZE_CHANNEL_BUFFER_SIZE",
37+
Description: "Advanced setting: changes buffer size of channel PeerDB uses for queueing normalization",
3938
DefaultValue: "128",
4039
ValueType: protos.DynconfValueType_INT,
4140
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
@@ -89,14 +88,6 @@ var DynamicSettings = [...]*protos.DynamicSetting{
8988
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
9089
TargetForSetting: protos.DynconfTarget_ALL,
9190
},
92-
{
93-
Name: "PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE",
94-
Description: "Enables parallel sync (moving rows to target) and normalize (updating rows in target table)",
95-
DefaultValue: "true",
96-
ValueType: protos.DynconfValueType_BOOL,
97-
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
98-
TargetForSetting: protos.DynconfTarget_ALL,
99-
},
10091
{
10192
Name: "PEERDB_RECONNECT_AFTER_BATCHES",
10293
Description: "Force peerdb to reconnect connection to source after N batches",
@@ -557,10 +548,6 @@ func PeerDBWALHeartbeatQuery(ctx context.Context, env map[string]string) (string
557548
return dynLookup(ctx, env, "PEERDB_WAL_HEARTBEAT_QUERY")
558549
}
559550

560-
func PeerDBEnableParallelSyncNormalize(ctx context.Context, env map[string]string) (bool, error) {
561-
return dynamicConfBool(ctx, env, "PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE")
562-
}
563-
564551
func PeerDBReconnectAfterBatches(ctx context.Context, env map[string]string) (int32, error) {
565552
return dynamicConfSigned[int32](ctx, env, "PEERDB_RECONNECT_AFTER_BATCHES")
566553
}

0 commit comments

Comments
 (0)