Skip to content

Commit 132de6e

Browse files
feat(metrics): tables synced per batch (#3524)
1 parent a7a266d commit 132de6e

File tree

2 files changed

+15
-0
lines changed

2 files changed

+15
-0
lines changed

flow/connectors/utils/monitoring/monitoring.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,11 +136,15 @@ func AddCDCBatchTablesForFlow(
136136
if enabled, err := internal.PeerDBMetricsRecordAggregatesEnabled(ctx, nil); err == nil && enabled {
137137
recordAggregateMetrics = true
138138
}
139+
var syncedTablesCount int64
139140
tableNameOperations := make(map[string][3]opWithValue, len(tableNameRowsMapping))
140141
for destinationTableName, rowCounts := range tableNameRowsMapping {
141142
inserts := rowCounts.InsertCount.Load()
142143
updates := rowCounts.UpdateCount.Load()
143144
deletes := rowCounts.DeleteCount.Load()
145+
if inserts+updates+deletes > 0 {
146+
syncedTablesCount++
147+
}
144148
if recordAggregateMetrics {
145149
tableNameOperations[destinationTableName] = [3]opWithValue{
146150
{op: otel_metrics.RecordOperationTypeInsert, count: inserts},
@@ -199,6 +203,9 @@ func AddCDCBatchTablesForFlow(
199203
return fmt.Errorf("error while committing transaction for inserting and updating statistics: %w", err)
200204
}
201205

206+
otelManager.Metrics.SyncedTablesPerBatchGauge.Record(ctx, syncedTablesCount, metric.WithAttributeSet(attribute.NewSet(
207+
attribute.Int64(otel_metrics.BatchIdKey, batchID))))
208+
202209
for destinationTableName, operations := range tableNameOperations {
203210
for _, opAndCount := range operations {
204211
otelManager.Metrics.RecordsSyncedPerTableCounter.Add(ctx, int64(opAndCount.count), metric.WithAttributeSet(attribute.NewSet(

flow/otel_metrics/otel_manager.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ const (
4848
RecordsSyncedPerTableGaugeName = "records_synced_per_table"
4949
RecordsSyncedPerTableCounterName = "records_synced_per_table_counter"
5050
SyncedTablesGaugeName = "synced_tables"
51+
SyncedTablesPerBatchGaugeName = "synced_tables_per_batch"
5152
InstanceStatusGaugeName = "instance_status"
5253
MaintenanceStatusGaugeName = "maintenance_status"
5354
FlowStatusGaugeName = "flow_status"
@@ -85,6 +86,7 @@ type Metrics struct {
8586
RecordsSyncedPerTableGauge metric.Int64Gauge
8687
RecordsSyncedPerTableCounter metric.Int64Counter
8788
SyncedTablesGauge metric.Int64Gauge
89+
SyncedTablesPerBatchGauge metric.Int64Gauge
8890
InstanceStatusGauge metric.Int64Gauge
8991
MaintenanceStatusGauge metric.Int64Gauge
9092
FlowStatusGauge metric.Int64Gauge
@@ -335,6 +337,12 @@ func (om *OtelManager) setupMetrics(ctx context.Context) error {
335337
return err
336338
}
337339

340+
if om.Metrics.SyncedTablesPerBatchGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(SyncedTablesPerBatchGaugeName),
341+
metric.WithDescription("Number of tables synced for every Sync batch"),
342+
); err != nil {
343+
return err
344+
}
345+
338346
if om.Metrics.InstanceStatusGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(InstanceStatusGaugeName),
339347
metric.WithDescription("Status of the instance, always emits a 1 metric with different attributes for different statuses"),
340348
); err != nil {

0 commit comments

Comments
 (0)