
Commit d7f4b7a

MLE-26428 Added timestamp to metadata for incremental write
1 parent 57a023c commit d7f4b7a

5 files changed (+49 additions, -25 deletions)
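For orientation before the per-file diffs, a minimal usage sketch of the renamed builder options; it relies only on the Builder methods visible in this commit, the key names shown are the documented defaults, and attaching the filter to a Data Movement write operation is outside the scope of this diff:

    // Minimal sketch, assuming only the Builder methods shown in this commit.
    IncrementalWriteFilter filter = IncrementalWriteFilter.newBuilder()
        // Metadata key holding the content hash; defaults to "incrementalWriteHash".
        .hashKeyName("incrementalWriteHash")
        // New in this commit: metadata key holding the write timestamp;
        // defaults to "incrementalWriteTimestamp".
        .timestampKeyName("incrementalWriteTimestamp")
        // Optional callback invoked with documents skipped because their hash is unchanged.
        .onDocumentsSkipped(skipped -> System.out.println(skipped.length + " documents skipped"))
        .build();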

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java

Lines changed: 5 additions & 4 deletions

@@ -22,16 +22,17 @@
 class IncrementalWriteEvalFilter extends IncrementalWriteFilter {

     private static final String EVAL_SCRIPT = """
-        const tuples = cts.valueTuples([cts.uriReference(), cts.fieldReference(fieldName)], null, cts.documentQuery(uris));
+        const tuples = cts.valueTuples([cts.uriReference(), cts.fieldReference(hashKeyName)], null, cts.documentQuery(uris));
         const response = {};
         for (var tuple of tuples) {
             response[tuple[0]] = tuple[1];
         }
         response
         """;

-    IncrementalWriteEvalFilter(String fieldName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
-        super(fieldName, canonicalizeJson, skippedDocumentsConsumer);
+    IncrementalWriteEvalFilter(String hashKeyName, String timestampKeyName, boolean canonicalizeJson,
+                               Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
+        super(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer);
     }

     @Override

@@ -45,7 +46,7 @@ public DocumentWriteSet apply(DocumentWriteSetFilter.Context context) {

         try {
             JsonNode response = context.getDatabaseClient().newServerEval().javascript(EVAL_SCRIPT)
-                .addVariable("fieldName", fieldName)
+                .addVariable("hashKeyName", hashKeyName)
                 .addVariable("uris", new JacksonHandle(uris))
                 .evalAs(JsonNode.class);
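The eval script builds a JSON object keyed by document URI, where each value is that document's stored hash. This hunk does not show how the response is consumed; a rough sketch of turning the JsonNode into the hashRetriever function that filterDocuments expects (the actual wiring in IncrementalWriteEvalFilter may differ) could look like:

    // Hypothetical sketch only: the eval response maps URI -> existing hash, and
    // filterDocuments takes a Function<String, String> to look hashes up by URI.
    Function<String, String> hashRetriever = uri ->
        response.has(uri) ? response.get(uri).asText() : null;
    return filterDocuments(context, hashRetriever);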

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java

Lines changed: 30 additions & 13 deletions

@@ -18,6 +18,7 @@

 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Consumer;

@@ -39,16 +40,25 @@ public static Builder newBuilder() {

     public static class Builder {

-        private String fieldName = "incrementalWriteHash";
+        private String hashKeyName = "incrementalWriteHash";
+        private String timestampKeyName = "incrementalWriteTimestamp";
         private boolean canonicalizeJson = true;
         private boolean useEvalQuery = false;
         private Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;

         /**
-         * @param fieldName the name of the MarkLogic field that will hold the hash value; defaults to "incrementalWriteHash".
+         * @param keyName the name of the MarkLogic metadata key that will hold the hash value; defaults to "incrementalWriteHash".
          */
-        public Builder fieldName(String fieldName) {
-            this.fieldName = fieldName;
+        public Builder hashKeyName(String keyName) {
+            this.hashKeyName = keyName;
+            return this;
+        }
+
+        /**
+         * @param keyName the name of the MarkLogic metadata key that will hold the timestamp value; defaults to "incrementalWriteTimestamp".
+         */
+        public Builder timestampKeyName(String keyName) {
+            this.timestampKeyName = keyName;
             return this;
         }

@@ -79,29 +89,32 @@ public Builder onDocumentsSkipped(Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {

         public IncrementalWriteFilter build() {
             if (useEvalQuery) {
-                return new IncrementalWriteEvalFilter(fieldName, canonicalizeJson, skippedDocumentsConsumer);
+                return new IncrementalWriteEvalFilter(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer);
             }
-            return new IncrementalWriteOpticFilter(fieldName, canonicalizeJson, skippedDocumentsConsumer);
+            return new IncrementalWriteOpticFilter(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer);
         }
     }

-    protected final String fieldName;
+    protected final String hashKeyName;
+    private final String timestampKeyName;
     private final boolean canonicalizeJson;
     private final Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;

     // Hardcoding this for now, with a good general purpose hashing function.
     // See https://xxhash.com for benchmarks.
     private final LongHashFunction hashFunction = LongHashFunction.xx3();

-    public IncrementalWriteFilter(String fieldName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
-        this.fieldName = fieldName;
+    public IncrementalWriteFilter(String hashKeyName, String timestampKeyName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
+        this.hashKeyName = hashKeyName;
+        this.timestampKeyName = timestampKeyName;
         this.canonicalizeJson = canonicalizeJson;
         this.skippedDocumentsConsumer = skippedDocumentsConsumer;
     }

     protected final DocumentWriteSet filterDocuments(Context context, Function<String, String> hashRetriever) {
         final DocumentWriteSet newWriteSet = context.getDatabaseClient().newDocumentManager().newWriteSet();
         final List<DocumentWriteOperation> skippedDocuments = new ArrayList<>();
+        final String timestamp = Instant.now().toString();

         for (DocumentWriteOperation doc : context.getDocumentWriteSet()) {
             if (!DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(doc.getOperationType())) {

@@ -117,14 +130,14 @@ protected final DocumentWriteSet filterDocuments(Context context, Function<String, String> hashRetriever) {

             if (existingHash != null) {
                 if (!existingHash.equals(contentHash)) {
-                    newWriteSet.add(addHashToMetadata(doc, fieldName, contentHash));
+                    newWriteSet.add(addHashToMetadata(doc, hashKeyName, contentHash, timestampKeyName, timestamp));
                 } else if (skippedDocumentsConsumer != null) {
                     skippedDocuments.add(doc);
                 } else {
                     // No consumer, so skip the document silently.
                 }
             } else {
-                newWriteSet.add(addHashToMetadata(doc, fieldName, contentHash));
+                newWriteSet.add(addHashToMetadata(doc, hashKeyName, contentHash, timestampKeyName, timestamp));
             }
         }

@@ -173,7 +186,8 @@ private String computeHash(String content) {
         return Long.toHexString(hash);
     }

-    protected static DocumentWriteOperation addHashToMetadata(DocumentWriteOperation op, String fieldName, String hash) {
+    protected static DocumentWriteOperation addHashToMetadata(DocumentWriteOperation op, String hashKeyName, String hash,
+                                                               String timestampKeyName, String timestamp) {
         DocumentMetadataHandle newMetadata = new DocumentMetadataHandle();
         if (op.getMetadata() != null) {
             DocumentMetadataHandle originalMetadata = (DocumentMetadataHandle) op.getMetadata();

@@ -183,7 +197,10 @@ protected static DocumentWriteOperation addHashToMetadata(DocumentWriteOperation op, String hashKeyName, String hash, String timestampKeyName, String timestamp) {
             newMetadata.setProperties(originalMetadata.getProperties());
             newMetadata.getMetadataValues().putAll(originalMetadata.getMetadataValues());
         }
-        newMetadata.getMetadataValues().put(fieldName, hash);
+
+        newMetadata.getMetadataValues().put(hashKeyName, hash);
+        newMetadata.getMetadataValues().put(timestampKeyName, timestamp);
+
         return new DocumentWriteOperationImpl(op.getUri(), newMetadata, op.getContent(), op.getTemporalDocumentURI());
     }
 }
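The net effect of addHashToMetadata after this commit is that every written document carries two metadata values: the content hash as a hex string (Long.toHexString of the xxHash3 value) and the write timestamp as the ISO-8601 string produced by Instant.now().toString(). A small sketch of reading them back from a write operation, using the default key names and the same cast the class itself performs:

    // Sketch: inspecting the metadata values added by this commit. Assumes "op" is
    // the DocumentWriteOperation returned by addHashToMetadata with the default key names.
    DocumentMetadataHandle metadata = (DocumentMetadataHandle) op.getMetadata();
    String hash = metadata.getMetadataValues().get("incrementalWriteHash");
    String timestamp = metadata.getMetadataValues().get("incrementalWriteTimestamp");
    // The timestamp round-trips to java.time.Instant because it was written via Instant.now().toString().
    Instant writtenAt = Instant.parse(timestamp);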

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java

Lines changed: 4 additions & 3 deletions

@@ -19,8 +19,9 @@
 */
class IncrementalWriteOpticFilter extends IncrementalWriteFilter {

-    IncrementalWriteOpticFilter(String fieldName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
-        super(fieldName, canonicalizeJson, skippedDocumentsConsumer);
+    IncrementalWriteOpticFilter(String hashKeyName, String timestampKeyName, boolean canonicalizeJson,
+                                Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
+        super(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer);
     }

     @Override

@@ -38,7 +39,7 @@ public DocumentWriteSet apply(Context context) {
         Map<String, String> existingHashes = rowTemplate.query(op ->
             op.fromLexicons(Map.of(
                 "uri", op.cts.uriReference(),
-                "hash", op.cts.fieldReference(super.fieldName)
+                "hash", op.cts.fieldReference(super.hashKeyName)
             )).where(
                 op.cts.documentQuery(op.xs.stringSeq(uris))
             ),
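Both implementations resolve existing hashes under the same key name: this Optic filter reads them through op.cts.fieldReference(hashKeyName), which points at a MarkLogic field of that name, while the eval filter above runs a server-side script. The builder's useEvalQuery flag, exercised by the tests later in this commit, is what selects the eval-based implementation; a brief sketch of opting into it:

    // Sketch: building the filter so that build() returns the eval-based implementation,
    // mirroring the useEvalQuery(true) usage in the tests below.
    IncrementalWriteFilter filter = IncrementalWriteFilter.newBuilder()
        .hashKeyName("incrementalWriteHash")
        .useEvalQuery(true)
        .build();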

marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilterTest.java

Lines changed: 5 additions & 1 deletion

@@ -9,6 +9,8 @@
 import com.marklogic.client.io.StringHandle;
 import org.junit.jupiter.api.Test;

+import java.time.Instant;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;

 /**

@@ -32,7 +34,8 @@ void addHashToMetadata() {
         DocumentWriteOperation doc1 = new DocumentWriteOperationImpl("/1.xml", metadata, new StringHandle("<doc1/>"));
         DocumentWriteOperation doc2 = new DocumentWriteOperationImpl("/2.xml", metadata, new StringHandle("<doc2/>"));

-        doc2 = IncrementalWriteFilter.addHashToMetadata(doc2, "theField", "abc123");
+        final String timestamp = Instant.now().toString();
+        doc2 = IncrementalWriteFilter.addHashToMetadata(doc2, "theField", "abc123", "theTimestamp", timestamp);

         assertEquals(metadata, doc1.getMetadata(), "doc1 should still have the original metadata object");

@@ -44,5 +47,6 @@ void addHashToMetadata() {

         assertEquals("value1", metadata2.getMetadataValues().get("meta1"), "metadata value should be preserved");
         assertEquals("abc123", metadata2.getMetadataValues().get("theField"), "hash field should be added");
+        assertEquals(timestamp, metadata2.getMetadataValues().get("theTimestamp"), "timestamp should be added");
     }
 }

marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java

Lines changed: 5 additions & 4 deletions

@@ -153,7 +153,7 @@ void invalidJsonWithFormat() {
     @Test
     void noRangeIndexForField() {
         filter = IncrementalWriteFilter.newBuilder()
-            .fieldName("non-existent-field")
+            .hashKeyName("non-existent-field")
             .build();

         writeTenDocuments();

@@ -168,7 +168,7 @@ void noRangeIndexForField() {
     @Test
     void noRangeIndexForFieldWithEval() {
         filter = IncrementalWriteFilter.newBuilder()
-            .fieldName("non-existent-field")
+            .hashKeyName("non-existent-field")
             .useEvalQuery(true)
             .build();

@@ -218,8 +218,6 @@ private void verifyDocumentsHasHashInMetadataKey() {
         while (page.hasNext()) {
             DocumentRecord doc = page.next();
             DocumentMetadataHandle metadata = doc.getMetadata(new DocumentMetadataHandle());
-            assertTrue(metadata.getMetadataValues().containsKey("incrementalWriteHash"),
-                "Document " + doc.getUri() + " should have an incrementalWriteHash in its metadata values.");

             String hash = metadata.getMetadataValues().get("incrementalWriteHash");
             try {

@@ -228,6 +226,9 @@ private void verifyDocumentsHasHashInMetadataKey() {
             } catch (NumberFormatException e) {
                 fail("Document " + doc.getUri() + " has an invalid incrementalWriteHash value: " + hash);
             }
+
+            String timestamp = metadata.getMetadataValues().get("incrementalWriteTimestamp");
+            assertNotNull(timestamp, "Document " + doc.getUri() + " should have an incrementalWriteTimestamp value.");
         }
     }
