diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java
index 46aba85c1..aa725eee8 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java
@@ -22,7 +22,7 @@ class IncrementalWriteEvalFilter extends IncrementalWriteFilter {
 
     private static final String EVAL_SCRIPT = """
-        const tuples = cts.valueTuples([cts.uriReference(), cts.fieldReference(fieldName)], null, cts.documentQuery(uris));
+        const tuples = cts.valueTuples([cts.uriReference(), cts.fieldReference(hashKeyName)], null, cts.documentQuery(uris));
         const response = {};
         for (var tuple of tuples) {
             response[tuple[0]] = tuple[1];
@@ -30,8 +30,9 @@ class IncrementalWriteEvalFilter extends IncrementalWriteFilter {
         response
         """;
 
-    IncrementalWriteEvalFilter(String fieldName, boolean canonicalizeJson, Consumer skippedDocumentsConsumer) {
-        super(fieldName, canonicalizeJson, skippedDocumentsConsumer);
+    IncrementalWriteEvalFilter(String hashKeyName, String timestampKeyName, boolean canonicalizeJson,
+                               Consumer skippedDocumentsConsumer) {
+        super(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer);
     }
 
     @Override
@@ -45,7 +46,7 @@ public DocumentWriteSet apply(DocumentWriteSetFilter.Context context) {
         try {
             JsonNode response = context.getDatabaseClient().newServerEval().javascript(EVAL_SCRIPT)
-                .addVariable("fieldName", fieldName)
+                .addVariable("hashKeyName", hashKeyName)
                 .addVariable("uris", new JacksonHandle(uris))
                 .evalAs(JsonNode.class);
diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java
index 8fe27d05d..cb14f469b 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java
@@ -18,6 +18,7 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Consumer;
@@ -39,16 +40,25 @@ public static Builder newBuilder() {
 
     public static class Builder {
 
-        private String fieldName = "incrementalWriteHash";
+        private String hashKeyName = "incrementalWriteHash";
+        private String timestampKeyName = "incrementalWriteTimestamp";
         private boolean canonicalizeJson = true;
         private boolean useEvalQuery = false;
         private Consumer skippedDocumentsConsumer;
 
        /**
-         * @param fieldName the name of the MarkLogic field that will hold the hash value; defaults to "incrementalWriteHash".
+         * @param keyName the name of the MarkLogic metadata key that will hold the hash value; defaults to "incrementalWriteHash".
         */
-        public Builder fieldName(String fieldName) {
-            this.fieldName = fieldName;
+        public Builder hashKeyName(String keyName) {
+            this.hashKeyName = keyName;
+            return this;
+        }
+
+        /**
+         * @param keyName the name of the MarkLogic metadata key that will hold the timestamp value; defaults to "incrementalWriteTimestamp".
+         */
+        public Builder timestampKeyName(String keyName) {
+            this.timestampKeyName = keyName;
             return this;
         }
@@ -79,13 +89,14 @@ public Builder onDocumentsSkipped(Consumer skippedDocu
 
         public IncrementalWriteFilter build() {
             if (useEvalQuery) {
-                return new IncrementalWriteEvalFilter(fieldName, canonicalizeJson, skippedDocumentsConsumer);
+                return new IncrementalWriteEvalFilter(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer);
             }
-            return new IncrementalWriteOpticFilter(fieldName, canonicalizeJson, skippedDocumentsConsumer);
+            return new IncrementalWriteOpticFilter(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer);
         }
     }
 
-    protected final String fieldName;
+    protected final String hashKeyName;
+    private final String timestampKeyName;
     private final boolean canonicalizeJson;
     private final Consumer skippedDocumentsConsumer;
 
@@ -93,8 +104,9 @@ public IncrementalWriteFilter build() {
     // See https://xxhash.com for benchmarks.
     private final LongHashFunction hashFunction = LongHashFunction.xx3();
 
-    public IncrementalWriteFilter(String fieldName, boolean canonicalizeJson, Consumer skippedDocumentsConsumer) {
-        this.fieldName = fieldName;
+    public IncrementalWriteFilter(String hashKeyName, String timestampKeyName, boolean canonicalizeJson, Consumer skippedDocumentsConsumer) {
+        this.hashKeyName = hashKeyName;
+        this.timestampKeyName = timestampKeyName;
         this.canonicalizeJson = canonicalizeJson;
         this.skippedDocumentsConsumer = skippedDocumentsConsumer;
     }
 
@@ -102,6 +114,7 @@ public IncrementalWriteFilter(String fieldName, boolean canonicalizeJson, Consum
     protected final DocumentWriteSet filterDocuments(Context context, Function hashRetriever) {
         final DocumentWriteSet newWriteSet = context.getDatabaseClient().newDocumentManager().newWriteSet();
         final List skippedDocuments = new ArrayList<>();
+        final String timestamp = Instant.now().toString();
 
         for (DocumentWriteOperation doc : context.getDocumentWriteSet()) {
             if (!DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(doc.getOperationType())) {
@@ -117,14 +130,14 @@ protected final DocumentWriteSet filterDocuments(Context context, Function
diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java
-    IncrementalWriteOpticFilter(String fieldName, boolean canonicalizeJson, Consumer skippedDocumentsConsumer) {
-        super(fieldName, canonicalizeJson, skippedDocumentsConsumer);
+    IncrementalWriteOpticFilter(String hashKeyName, String timestampKeyName, boolean canonicalizeJson,
+                                Consumer skippedDocumentsConsumer) {
+        super(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer);
     }
 
     @Override
@@ -38,7 +39,7 @@ public DocumentWriteSet apply(Context context) {
 
         Map existingHashes = rowTemplate.query(op -> op.fromLexicons(Map.of(
             "uri", op.cts.uriReference(),
-            "hash", op.cts.fieldReference(super.fieldName)
+            "hash", op.cts.fieldReference(super.hashKeyName)
         )).where(
             op.cts.documentQuery(op.xs.stringSeq(uris))
         ),
diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilterTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilterTest.java
index f1fe81518..8c0cded96 100644
--- a/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilterTest.java
+++ b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilterTest.java
@@ -9,6 +9,8 @@
 import com.marklogic.client.io.StringHandle;
 import org.junit.jupiter.api.Test;
 
+import java.time.Instant;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
@@ -32,7 +34,8 @@ void addHashToMetadata() {
         DocumentWriteOperation doc1 = new DocumentWriteOperationImpl("/1.xml", metadata, new StringHandle(""));
         DocumentWriteOperation doc2 = new DocumentWriteOperationImpl("/2.xml", metadata, new StringHandle(""));
 
-        doc2 = IncrementalWriteFilter.addHashToMetadata(doc2, "theField", "abc123");
+        final String timestamp = Instant.now().toString();
+        doc2 = IncrementalWriteFilter.addHashToMetadata(doc2, "theField", "abc123", "theTimestamp", timestamp);
 
         assertEquals(metadata, doc1.getMetadata(), "doc1 should still have the original metadata object");
@@ -44,5 +47,6 @@
         assertEquals("value1", metadata2.getMetadataValues().get("meta1"), "metadata value should be preserved");
         assertEquals("abc123", metadata2.getMetadataValues().get("theField"), "hash field should be added");
+        assertEquals(timestamp, metadata2.getMetadataValues().get("theTimestamp"), "timestamp should be added");
     }
 }
diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java
index 388473553..47ea0d28d 100644
--- a/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java
+++ b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java
@@ -153,7 +153,7 @@ void invalidJsonWithFormat() {
     @Test
     void noRangeIndexForField() {
         filter = IncrementalWriteFilter.newBuilder()
-            .fieldName("non-existent-field")
+            .hashKeyName("non-existent-field")
             .build();
 
         writeTenDocuments();
@@ -168,7 +168,7 @@
     @Test
     void noRangeIndexForFieldWithEval() {
         filter = IncrementalWriteFilter.newBuilder()
-            .fieldName("non-existent-field")
+            .hashKeyName("non-existent-field")
             .useEvalQuery(true)
             .build();
 
@@ -218,8 +218,6 @@ private void verifyDocumentsHasHashInMetadataKey() {
         while (page.hasNext()) {
             DocumentRecord doc = page.next();
             DocumentMetadataHandle metadata = doc.getMetadata(new DocumentMetadataHandle());
-            assertTrue(metadata.getMetadataValues().containsKey("incrementalWriteHash"),
-                "Document " + doc.getUri() + " should have an incrementalWriteHash in its metadata values.");
 
             String hash = metadata.getMetadataValues().get("incrementalWriteHash");
             try {
@@ -228,6 +226,9 @@
             } catch (NumberFormatException e) {
                 fail("Document " + doc.getUri() + " has an invalid incrementalWriteHash value: " + hash);
             }
+
+            String timestamp = metadata.getMetadataValues().get("incrementalWriteTimestamp");
+            assertNotNull(timestamp, "Document " + doc.getUri() + " should have an incrementalWriteTimestamp value.");
         }
     }
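
Note for reviewers: a minimal usage sketch of the renamed builder API introduced above. The key names shown are the defaults from this change, and how the resulting filter is attached to a write operation is outside the scope of this diff; the example class name is hypothetical.

    import com.marklogic.client.datamovement.filter.IncrementalWriteFilter;

    public class IncrementalWriteFilterUsageExample {
        public static void main(String[] args) {
            // Build a filter that records each written document's hash and write timestamp
            // under the given metadata keys. Both names below are the defaults, so they only
            // need to be set when overriding them.
            IncrementalWriteFilter filter = IncrementalWriteFilter.newBuilder()
                .hashKeyName("incrementalWriteHash")           // metadata key holding the xxHash value
                .timestampKeyName("incrementalWriteTimestamp") // metadata key holding the write timestamp
                .useEvalQuery(false)                           // false (default) = Optic-based lookup; true = server-side eval
                .build();
        }
    }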