Commit 97a597b

Add data record count metrics to reverse replication (#2499)
* Add data record count metrics to reverse replication
* spotless apply
* addressing comment
1 parent 1f4783a commit 97a597b

File tree

2 files changed: +31 −18 lines


v2/spanner-to-sourcedb/README.md

Lines changed: 26 additions & 18 deletions
```diff
@@ -203,21 +203,23 @@ The progress of the Dataflow jobs can be tracked via the Dataflow UI. Refer the
 In addition, there are following application metrics exposed by the job:
 
-| Metric Name | Description |
-|---|---|
-| custom_shard_id_impl_latency_ms | Time taken for the execution of custom shard identifier logic. |
-| data_record_count | The number of change stream records read. |
-| element_requeued_for_retry_count | Relevant for retryDLQ run mode, when the record gets enqueued back to the severe folder for retry. |
-| elementsReconsumedFromDeadLetterQueue | The number of records read from the retry folder of the DLQ directory. |
-| records_written_to_source_\<logical shard name\> | Number of records successfully written for the shard. |
-| replication_lag_in_seconds_\<logical shard name\> | Replication lag min, max and count value for the shard. |
-| retryable_record_count | The number of records that are up for retry. |
-| severe_error_count | The number of permanent errors. |
-| skipped_record_count | The count of records that were skipped from reverse replication. |
-| success_record_count | The number of successfully processed records. This also accounts for records that were not written to the source because the source already had updated data. |
-| custom_transformation_exception | Number of exceptions encountered in the custom transformation JAR. |
-| filtered_events_\<logical shard name\> | Number of events filtered via custom transformation, per shard. |
-| apply_custom_transformation_impl_latency_ms | Time taken for the execution of custom transformation logic. |
+| Metric Name | Description |
+|---|---|
+| custom_shard_id_impl_latency_ms | Time taken for the execution of custom shard identifier logic. |
+| data_record_count | The number of change stream transactions read. |
+| total_spanner_writes | The number of change stream records read. |
+| element_requeued_for_retry_count | Relevant for retryDLQ run mode, when the record gets enqueued back to the severe folder for retry. |
+| elementsReconsumedFromDeadLetterQueue | The number of records read from the retry folder of the DLQ directory. |
+| records_written_to_source_\<logical shard name\> | Number of records successfully written for the shard. |
+| replication_lag_in_seconds_\<logical shard name\> | Replication lag min, max and count value for the shard. |
+| fwd_migration_filtered_record_count | The number of records filtered during reverse replication due to the forward migration transaction tag. |
+| retryable_record_count | The number of records that are up for retry. |
+| severe_error_count | The number of permanent errors. |
+| skipped_record_count | The count of records that were skipped from reverse replication. |
+| success_record_count | The number of successfully processed records. This also accounts for records that were not written to the source because the source already had updated data. |
+| custom_transformation_exception | Number of exceptions encountered in the custom transformation JAR. |
+| filtered_events_\<logical shard name\> | Number of events filtered via custom transformation, per shard. |
+| apply_custom_transformation_impl_latency_ms | Time taken for the execution of custom transformation logic. |
 
 These can be used to track the pipeline progress.
@@ -361,9 +363,15 @@ In the event that cut-back is needed to start serving from the original database
 1. Ensure that there is a validation solution in place to validate the Spanner and source database records.
 2. There should be no severe errors.
 3. There should be no retryable errors.
-4. The success_record_count, which reflects the total successful records, should match the data_record_count metric, which reflects the count of data records read by SpannerIO. If these match, it is an indication that all records have been successfully reverse replicated.
-Note that for these metrics to be reliable, there should be no Dataflow worker restarts. If there are worker restarts, there is a possibility that the same record was re-processed by a certain stage.
-To check if there are worker restarts, in the Dataflow UI, navigate to Job metrics -> CPU utilization.
+4. The success_record_count should ideally match the value of total_spanner_writes minus fwd_migration_filtered_record_count. This indicates that the total number of successful records aligns with the data records read by SpannerIO, less any records filtered due to a forward migration transaction tag.
+
+It's important to note that these metrics are approximate and should be treated as indicators, not absolute guarantees of 100% accuracy.
+
+**Important Considerations for Metric Reliability**
+
+For these metrics to be reliable, ensure there are no Dataflow worker restarts. If workers restart, there's a chance the same record might be re-processed by a specific stage, skewing your counts. You can check for worker restarts in the Dataflow UI by navigating to Job metrics -> CPU utilization.
+
+Additionally, to determine an approximate downtime during cutover, rely on the replication_lag_in_seconds metric.
 
 ### What to do when there are worker restarts or the metrics do not match
```
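The reconciliation check in step 4 of the cut-back checklist can be sketched as a small script. This is a minimal illustration only; the counter values below are hypothetical and would normally be read off the Dataflow UI or metrics API for the running job:

```python
def reconcile(success_record_count: int,
              total_spanner_writes: int,
              fwd_migration_filtered_record_count: int) -> bool:
    """Return True when every record read from Spanner, minus records
    filtered for carrying the forward-migration transaction tag, was
    successfully reverse replicated."""
    expected = total_spanner_writes - fwd_migration_filtered_record_count
    return success_record_count == expected


# Hypothetical counter values copied from the Dataflow job metrics page:
print(reconcile(success_record_count=980,
                total_spanner_writes=1000,
                fwd_migration_filtered_record_count=20))
```

Remember that, as noted above, these counters are approximate; a mismatch after worker restarts does not necessarily mean data was lost.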

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/FilterRecordsFn.java

Lines changed: 5 additions & 0 deletions
```diff
@@ -17,6 +17,7 @@
 
 import com.google.cloud.teleport.v2.templates.constants.Constants;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
@@ -33,6 +34,7 @@ public FilterRecordsFn(String filtrationMode) {
   @ProcessElement
   public void processElement(ProcessContext c) {
     DataChangeRecord record = c.element();
+    Metrics.counter(FilterRecordsFn.class, "total_spanner_writes").inc(record.getMods().size());
 
     // In this mode, filter no records.
     if (filtrationMode.equals(Constants.FILTRATION_MODE_NONE)) {
@@ -43,6 +45,9 @@ public void processElement(ProcessContext c) {
     // TODO: Fetch forward migration Dataflow job id and do full string match for the tag.
     if (!record.getTransactionTag().startsWith(Constants.FWD_MIGRATION_TRANSACTION_TAG_PREFIX)) {
       c.output(record);
+      return;
     }
+    Metrics.counter(FilterRecordsFn.class, "fwd_migration_filtered_record_count")
+        .inc(record.getMods().size());
   }
 }
```
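The counting logic this Java change introduces can be traced with a short, self-contained Python sketch of the tag-filtering path. The record class, the tag prefix value, and the dict-based counters here are simplified stand-ins for Beam's `DataChangeRecord`, `Constants.FWD_MIGRATION_TRANSACTION_TAG_PREFIX`, and `Metrics` counters respectively, and the `FILTRATION_MODE_NONE` branch is omitted:

```python
from dataclasses import dataclass

# Hypothetical stand-in; the real prefix lives in Constants.FWD_MIGRATION_TRANSACTION_TAG_PREFIX.
FWD_TAG_PREFIX = "fwd-migration"


@dataclass
class DataChangeRecord:
    """Simplified stand-in for Beam's DataChangeRecord: a transaction
    tag plus a list of mods (row-level changes)."""
    transaction_tag: str
    mods: list


def filter_records(records, counters):
    """Mirror FilterRecordsFn: count every mod read, emit records that
    do not carry the forward-migration tag, and count the mods of the
    records that were filtered out."""
    out = []
    for record in records:
        counters["total_spanner_writes"] += len(record.mods)
        if not record.transaction_tag.startswith(FWD_TAG_PREFIX):
            out.append(record)
            continue
        counters["fwd_migration_filtered_record_count"] += len(record.mods)
    return out


counters = {"total_spanner_writes": 0, "fwd_migration_filtered_record_count": 0}
kept = filter_records(
    [DataChangeRecord("app-txn", ["m1", "m2"]),
     DataChangeRecord("fwd-migration-job-1", ["m3"])],
    counters)
print(counters)  # {'total_spanner_writes': 3, 'fwd_migration_filtered_record_count': 1}
```

Note that both counters are incremented per mod, not per record, which is why the README reconciliation compares them against per-record success counts only approximately.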
