
Commit a0f400f

ack-after-write pattern for improved throughput and durability
1 parent 18ab1f2 commit a0f400f
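
The core of the change is an ack-after-write ordering in the worker: fetched NATS messages are buffered alongside their decoded CDC events, the whole batch is written to the sink first, and only then are the messages acknowledged, so a failed write leaves them unacked and JetStream redelivers them (MaxDeliver goes from 1 to 20, and each poll now fetches up to batchSize messages instead of 10). Below is a minimal, self-contained sketch of that ordering, assuming a JetStream consumer with explicit acks; Sink, memorySink, flush, and main are illustrative stand-ins, not pg_flo's actual types (the real change is the pkg/worker/worker.go diff further down).

package main

import (
	"fmt"

	"github.com/nats-io/nats.go"
)

// Sink stands in for any destination that accepts a whole batch at once.
type Sink interface {
	WriteBatch(events []string) error
}

// memorySink is a toy in-memory sink used only to make the sketch runnable.
type memorySink struct{ rows []string }

func (s *memorySink) WriteBatch(events []string) error {
	s.rows = append(s.rows, events...)
	return nil
}

// flush writes the batch to the sink first; the pending messages are acked
// only after the write succeeds. If the write fails, nothing is acked, so the
// consumer redelivers the messages (it must allow redelivery, e.g. MaxDeliver > 1).
func flush(sink Sink, events []string, pending []*nats.Msg) error {
	if err := sink.WriteBatch(events); err != nil {
		return err // leave messages unacked -> redelivery
	}
	for _, m := range pending {
		if err := m.Ack(); err != nil {
			// Keep acking the rest; a failed ack only risks a duplicate later.
			fmt.Println("ack failed:", err)
		}
	}
	return nil
}

func main() {
	sink := &memorySink{}
	// No live NATS connection in this sketch, so pending is empty; with a real
	// subscription it would hold the *nats.Msg values fetched for this batch.
	if err := flush(sink, []string{"row-1", "row-2"}, nil); err != nil {
		fmt.Println("flush failed:", err)
	}
	fmt.Println("rows written:", len(sink.rows))
}

Acking only after the write trades a small duplicate risk (an ack can still fail after a successful write, which redelivery semantics have to tolerate) for at-least-once durability, while the larger fetch and batched acks cut per-message round-trips to NATS.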

2 files changed: +289 -5 lines changed

Lines changed: 265 additions & 0 deletions (new end-to-end benchmark script)
@@ -0,0 +1,265 @@
#!/bin/bash
set -euo pipefail

source "$(dirname "$0")/e2e_common.sh"

BENCHMARK_ROWS=${BENCHMARK_ROWS:-10000}
BATCH_SIZE=${BATCH_SIZE:-1000}
FETCH_BATCH=${FETCH_BATCH:-10}
MAX_COPY_WORKERS=${MAX_COPY_WORKERS:-8}
TEST_MODE=${TEST_MODE:-"copy-and-stream"}

RESULTS_FILE="/tmp/pg_flo_benchmark_results.json"
START_TIME=""
END_TIME=""

start_timer() {
  START_TIME=$(date +%s.%N)
}

end_timer() {
  END_TIME=$(date +%s.%N)
}

calculate_duration() {
  echo "$END_TIME - $START_TIME" | bc -l
}

calculate_throughput() {
  local rows=$1
  local duration=$2
  echo "scale=2; $rows / $duration" | bc -l
}

log_performance() {
  local phase=$1
  local rows=$2
  local duration=$3
  local throughput=$4
  log "📊 Performance Results - $phase:"
  log "  Rows processed: $rows"
  log "  Duration: ${duration}s"
  log "  Throughput: ${throughput} rows/sec"
}

create_benchmark_table() {
  log "Creating benchmark table..."
  run_sql "DROP TABLE IF EXISTS public.benchmark_table CASCADE;"
  run_sql "CREATE TABLE public.benchmark_table (
    id bigserial PRIMARY KEY,
    user_id bigint NOT NULL,
    email varchar(255),
    data jsonb,
    balance numeric(15,2),
    created_at timestamp DEFAULT current_timestamp
  );"
  success "Benchmark table created"
}

generate_test_data() {
  local num_rows=$1
  log "Generating $num_rows test records..."
  start_timer

  run_sql "INSERT INTO public.benchmark_table (user_id, email, data, balance)
    SELECT
      generate_series(1, $num_rows),
      'user' || generate_series(1, $num_rows) || '@example.com',
      json_build_object('key', 'value' || generate_series(1, $num_rows), 'number', generate_series(1, $num_rows)),
      random() * 1000
    ;"

  run_sql "ANALYZE public.benchmark_table;"

  end_timer
  local generation_time=$(calculate_duration)
  local generation_throughput=$(calculate_throughput "$num_rows" "$generation_time")
  log_performance "Data Generation" "$num_rows" "$generation_time" "$generation_throughput"
  success "Test data generated"
}

start_replicator() {
  local mode=$1
  log "Starting pg_flo replicator in $mode mode..."

  local mode_flags=""
  case $mode in
    "stream") mode_flags="--stream" ;;
    "copy-and-stream") mode_flags="--copy-and-stream --max-copy-workers-per-table $MAX_COPY_WORKERS" ;;
    "copy") mode_flags="--copy --max-copy-workers-per-table $MAX_COPY_WORKERS" ;;
    *) error "Unknown test mode: $mode"; exit 1 ;;
  esac

  # $mode_flags is intentionally left unquoted so that each flag is passed as a
  # separate argument; quoting it would hand the CLI one combined string.
  # shellcheck disable=SC2086
  $pg_flo_BIN replicator \
    --host "$PG_HOST" \
    --port "$PG_PORT" \
    --dbname "$PG_DB" \
    --user "$PG_USER" \
    --password "$PG_PASSWORD" \
    --group "benchmark_group" \
    --tables "benchmark_table" \
    --schema "public" \
    --nats-url "$NATS_URL" \
    $mode_flags \
    >"$pg_flo_LOG" 2>&1 &
  pg_flo_PID=$!
  success "pg_flo replicator started (PID: $pg_flo_PID)"
}

start_worker() {
  log "Starting pg_flo worker with PostgreSQL sink..."
  $pg_flo_BIN worker postgres \
    --group "benchmark_group" \
    --nats-url "$NATS_URL" \
    --source-host "$PG_HOST" \
    --source-port "$PG_PORT" \
    --source-dbname "$PG_DB" \
    --source-user "$PG_USER" \
    --source-password "$PG_PASSWORD" \
    --target-host "$TARGET_PG_HOST" \
    --target-port "$TARGET_PG_PORT" \
    --target-dbname "$TARGET_PG_DB" \
    --target-user "$TARGET_PG_USER" \
    --target-password "$TARGET_PG_PASSWORD" \
    --batch-size "$BATCH_SIZE" \
    --target-sync-schema \
    >"$pg_flo_WORKER_LOG" 2>&1 &
  pg_flo_WORKER_PID=$!
  success "pg_flo worker started (PID: $pg_flo_WORKER_PID)"
}

simulate_concurrent_operations() {
  local num_operations=${1:-1000}
  log "Simulating $num_operations operations during replication..."
  start_timer

  for i in $(seq 1 "$num_operations"); do
    local new_id=$((BENCHMARK_ROWS + i))
    run_sql "INSERT INTO public.benchmark_table (user_id, email, data, balance)
      VALUES ($new_id, 'user$i@test.com', '{\"test\": $i}', $i * 1.5);"
    if [ $((i % 5)) -eq 0 ]; then
      run_sql "UPDATE public.benchmark_table SET balance = balance + 10 WHERE id = $i;"
    fi
  done

  end_timer
  local ops_duration=$(calculate_duration)
  local ops_throughput=$(calculate_throughput "$num_operations" "$ops_duration")
  log_performance "Operations" "$num_operations" "$ops_duration" "$ops_throughput"
  success "Operations completed"
}

measure_replication_performance() {
  local test_mode=$1
  log "📊 Starting throughput benchmark in $test_mode mode..."
  log "Configuration: ROWS=$BENCHMARK_ROWS, BATCH_SIZE=$BATCH_SIZE, WORKERS=$MAX_COPY_WORKERS"

  setup_postgres
  create_benchmark_table

  if [ "$test_mode" != "stream" ]; then
    generate_test_data "$BENCHMARK_ROWS"
  fi

  start_replicator "$test_mode"
  start_worker

  log "Waiting for replication to initialize..."
  sleep 5

  start_timer

  if [ "$test_mode" = "stream" ]; then
    generate_test_data "$BENCHMARK_ROWS"
  else
    simulate_concurrent_operations 1000
  fi

  log "Waiting for replication to complete..."
  local max_wait=120
  local wait_count=0

  while [ $wait_count -lt $max_wait ]; do
    local source_count=$(run_sql "SELECT COUNT(*) FROM public.benchmark_table")
    local target_count=$(run_sql_target "SELECT COUNT(*) FROM public.benchmark_table" 2>/dev/null || echo "0")

    if [ "$source_count" = "$target_count" ] && [ "$source_count" -gt 0 ]; then
      log "Replication completed: $source_count rows replicated"
      break
    fi

    if [ $((wait_count % 10)) -eq 0 ]; then
      log "Waiting... Source: $source_count, Target: $target_count (${wait_count}s/${max_wait}s)"
    fi

    sleep 1
    wait_count=$((wait_count + 1))
  done

  end_timer

  local total_duration=$(calculate_duration)
  local source_count=$(run_sql "SELECT COUNT(*) FROM public.benchmark_table")
  local target_count=$(run_sql_target "SELECT COUNT(*) FROM public.benchmark_table")
  local replication_throughput=$(calculate_throughput "$target_count" "$total_duration")

  stop_pg_flo_gracefully

  cat <<EOF > "$RESULTS_FILE"
{
  "test_mode": "$test_mode",
  "configuration": {
    "benchmark_rows": $BENCHMARK_ROWS,
    "batch_size": $BATCH_SIZE,
    "max_copy_workers": $MAX_COPY_WORKERS
  },
  "results": {
    "total_duration_seconds": $total_duration,
    "source_row_count": $source_count,
    "target_row_count": $target_count,
    "replication_throughput_rows_per_second": $replication_throughput,
    "data_integrity_check": $([ "$source_count" = "$target_count" ] && echo "\"PASS\"" || echo "\"FAIL\"")
  },
  "timestamp": "$(date -Iseconds)"
}
EOF

  log_performance "Total Replication ($test_mode mode)" "$target_count" "$total_duration" "$replication_throughput"

  if [ "$source_count" = "$target_count" ]; then
    success "✅ Benchmark completed successfully! Results saved to $RESULTS_FILE"
    return 0
  else
    error "❌ Data integrity check failed: Source($source_count) != Target($target_count)"
    return 1
  fi
}

show_results() {
  if [ -f "$RESULTS_FILE" ]; then
    log "📊 Benchmark Results:"
    echo "----------------------------------------"
    cat "$RESULTS_FILE" | jq -r '
      "Mode: " + .test_mode,
      "Duration: " + (.results.total_duration_seconds | tonumber | tostring) + "s",
      "Rows: " + (.results.target_row_count | tostring),
      "Throughput: " + (.results.replication_throughput_rows_per_second | tonumber | tostring) + " rows/sec",
      "Integrity: " + .results.data_integrity_check
    '
    echo "----------------------------------------"
  fi
}

log "🚀 Starting pg_flo Throughput Benchmark"
log "Mode: $TEST_MODE | Rows: $BENCHMARK_ROWS | Batch: $BATCH_SIZE"

if measure_replication_performance "$TEST_MODE"; then
  show_results
  success "🎉 Benchmark completed successfully!"
  exit 0
else
  error "❌ Benchmark failed"
  show_pg_flo_logs
  show_results
  exit 1
fi

pkg/worker/worker.go

Lines changed: 24 additions & 5 deletions
@@ -31,6 +31,7 @@ type Worker struct {
 	logger         utils.Logger
 	batchSize      int
 	buffer         []*utils.CDCMessage
+	ackBuffer      []*nats.Msg
 	lastSavedState uint64
 	flushInterval  time.Duration
 	shutdownCh     chan struct{}
@@ -65,6 +66,7 @@ func NewWorker(natsClient *pgflonats.NATSClient, ruleEngine *rules.RuleEngine, r
 		logger:         logger,
 		batchSize:      1000,
 		buffer:         make([]*utils.CDCMessage, 0, 1000),
+		ackBuffer:      make([]*nats.Msg, 0, 1000),
 		lastSavedState: 0,
 		flushInterval:  500 * time.Millisecond,
 		shutdownCh:     make(chan struct{}),
@@ -74,6 +76,7 @@ func NewWorker(natsClient *pgflonats.NATSClient, ruleEngine *rules.RuleEngine, r
 		opt(w)
 	}
 	w.buffer = make([]*utils.CDCMessage, 0, w.batchSize)
+	w.ackBuffer = make([]*nats.Msg, 0, w.batchSize)

 	return w
 }
@@ -97,7 +100,7 @@ func (w *Worker) Start(ctx context.Context) error {
 		Durable:       consumerName,
 		FilterSubject: subject,
 		AckPolicy:     nats.AckExplicitPolicy,
-		MaxDeliver:    1,
+		MaxDeliver:    20,
 		AckWait:       25 * time.Minute,
 		DeliverPolicy: nats.DeliverAllPolicy,
 	}
@@ -146,7 +149,7 @@ func (w *Worker) processMessages(ctx context.Context, sub *nats.Subscription) er
 				w.logger.Error().Err(err).Msg("Failed to flush buffer on interval")
 			}
 		default:
-			msgs, err := sub.Fetch(10, nats.MaxWait(500*time.Millisecond))
+			msgs, err := sub.Fetch(w.batchSize, nats.MaxWait(1*time.Second))
 			if err != nil && !errors.Is(err, nats.ErrTimeout) {
 				w.logger.Error().Err(err).Msg("Error fetching messages")
 				continue
@@ -155,9 +158,9 @@ func (w *Worker) processMessages(ctx context.Context, sub *nats.Subscription) er
 			for _, msg := range msgs {
 				if err := w.processMessage(msg); err != nil {
 					w.logger.Error().Err(err).Msg("Failed to process message")
-				}
-				if err := msg.Ack(); err != nil {
-					w.logger.Error().Err(err).Msg("Failed to acknowledge message")
+					if ackErr := msg.Ack(); ackErr != nil {
+						w.logger.Error().Err(ackErr).Msg("Failed to acknowledge failed message")
+					}
 				}
 			}
 			if len(w.buffer) >= w.batchSize {
@@ -194,6 +197,9 @@ func (w *Worker) processMessage(msg *nats.Msg) error {
 		}
 		if processedMessage == nil {
 			w.logger.Debug().Msg("Message filtered out by rules")
+			if ackErr := msg.Ack(); ackErr != nil {
+				w.logger.Error().Err(ackErr).Msg("Failed to acknowledge filtered message")
+			}
 			return nil
 		}
 		cdcMessage = *processedMessage
@@ -207,11 +213,15 @@ func (w *Worker) processMessage(msg *nats.Msg) error {
 		}
 		if routedMessage == nil {
 			w.logger.Debug().Msg("Message filtered out by routing")
+			if ackErr := msg.Ack(); ackErr != nil {
+				w.logger.Error().Err(ackErr).Msg("Failed to acknowledge filtered message")
+			}
 			return nil
 		}
 		cdcMessage = *routedMessage
 	}

+	w.ackBuffer = append(w.ackBuffer, msg)
 	w.buffer = append(w.buffer, &cdcMessage)
 	w.lastSavedState = metadata.Sequence.Stream

@@ -229,12 +239,20 @@ func (w *Worker) flushBuffer() error {
 		Int("batch_size", w.batchSize).
 		Msg("Flushing buffer")

+	// Write to sink first - if this fails, messages remain unacked for redelivery
 	err := w.sink.WriteBatch(w.buffer)
 	if err != nil {
 		w.logger.Error().Err(err).Msg("Failed to write batch to sink")
 		return err
 	}

+	for _, msg := range w.ackBuffer {
+		if ackErr := msg.Ack(); ackErr != nil {
+			w.logger.Error().Err(ackErr).Msg("Failed to acknowledge message after successful write")
+			// Continue acking other messages even if one fails
+		}
+	}
+
 	state, err := w.natsClient.GetState()
 	if err != nil {
 		w.logger.Error().Err(err).Msg("Failed to get current state")
@@ -248,5 +266,6 @@ func (w *Worker) flushBuffer() error {
 	}

 	w.buffer = w.buffer[:0]
+	w.ackBuffer = w.ackBuffer[:0]
 	return nil
 }
