Skip to content

Commit ea34459

Browse files
authored
Disable the replication_latency metric for backfilled items (#146)
1 parent 56f5745 commit ea34459

File tree

3 files changed

+13
-9
lines changed

3 files changed

+13
-9
lines changed

agent/src/main/java/com/datastax/oss/cdc/agent/AbstractPulsarMutationSender.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -251,14 +251,17 @@ public CompletableFuture<MessageId> sendMutationAsync(final AbstractMutation<T>
251251
Producer<KeyValue<byte[], MutationValue>> producer = getProducer(mutation);
252252
SchemaAndWriter schemaAndWriter = getAvroKeySchema(mutation);
253253
TypedMessageBuilder<KeyValue<byte[], MutationValue>> messageBuilder = producer.newMessage();
254-
return messageBuilder
254+
messageBuilder = messageBuilder
255255
.value(new KeyValue(
256256
serializeAvroGenericRecord(buildAvroKey(schemaAndWriter.schema, mutation), schemaAndWriter.writer),
257257
mutation.mutationValue()))
258-
.property(Constants.WRITETIME, mutation.getTs() + "")
259258
.property(Constants.SEGMENT_AND_POSITION, mutation.getSegment() + ":" + mutation.getPosition())
260-
.property(Constants.TOKEN, mutation.getToken().toString())
261-
.sendAsync();
259+
.property(Constants.TOKEN, mutation.getToken().toString());
260+
// a WRITETIME property is only used by the connector to emit e2e latency metric, skip if the mutation is not timestamped
261+
if (mutation.getTs() != -1) {
262+
messageBuilder = messageBuilder.property(Constants.WRITETIME, mutation.getTs() + "");
263+
}
264+
return messageBuilder.sendAsync();
262265
} catch(Exception e) {
263266
CompletableFuture future = new CompletableFuture<>();
264267
future.completeExceptionally(e);

backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/importer/PulsarImporter.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,9 @@ public ExitStatus importTable() {
172172
}
173173
return newVal;
174174
}).collect(Collectors.toList());
175-
// tsMicro is used to emit e2e metrics by the connectors, if you carry over the C* WRITETIME
176-
// of the source records, the metric will be greatly skewed because those records are historical.
177-
// For now, will mimic the metric by using now()
178-
// TODO: Disable the e2e latency metric if the records are emitted from cdc back-filling CLI
179-
final long tsMicro = Instant.now().toEpochMilli() * 1000;
175+
// Disables the e2e latency metric because the {@link com.datastax.oss.cdc.Constants.WRITETIME}
176+
// property won't be set
177+
final long tsMicro = -1;
180178
final AbstractMutation<TableMetadata> mutation =
181179
createMutation(pkValues.toArray(), this.exportedTable.getCassandraTable(), tsMicro);
182180
sendMutationAsync(mutation);

backfill-cli/src/test/java/com/datastax/oss/cdc/backfill/PulsarImporterTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import java.util.ArrayList;
6666
import java.util.Arrays;
6767
import java.util.List;
68+
import java.util.Optional;
6869
import java.util.UUID;
6970
import java.util.concurrent.CompletableFuture;
7071
import java.util.concurrent.ExecutionException;
@@ -131,6 +132,8 @@ public void testImportPartitionKeyOnly() {
131132
Mockito.verify(sender, Mockito.times(1)).close();
132133
List<AbstractMutation<TableMetadata>> pkValues = abstractMutationCaptor.getAllValues();
133134
assertEquals(2, pkValues.size());
135+
assertEquals(-1L, pkValues.get(0).getTs());
136+
assertEquals(-1L, pkValues.get(1).getTs());
134137
List<Object> allPkValues = pkValues.stream().flatMap(v-> Arrays.stream(v.getPkValues())).collect(Collectors.toList());
135138
assertThat(allPkValues, containsInAnyOrder("id3", "id8"));
136139
}

0 commit comments

Comments
 (0)