
Commit 093f7ea

MLE-26420 Can now perform incremental writes
IncrementalWriteFilter is the entry point, with a Builder for customizing its behavior.
1 parent 1c8dd85 commit 093f7ea
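
The new Builder is the public entry point for constructing the filter. A minimal sketch of building one, using the defaults documented in the new class (the example class name and the skipped-documents callback are illustrative, not part of this commit); how the resulting filter is attached to a write process is not shown in this commit:

import com.marklogic.client.datamovement.filter.IncrementalWriteFilter;

public class IncrementalWriteExample {
    public static void main(String[] args) {
        // Values shown are the Builder defaults; the callback simply reports how many documents were skipped.
        IncrementalWriteFilter filter = IncrementalWriteFilter.newBuilder()
            .fieldName("incrementalWriteHash")
            .canonicalizeJson(true)
            .useEvalQuery(false)
            .onDocumentsSkipped(skipped ->
                System.out.println("Skipped " + skipped.length + " unchanged documents"))
            .build();
        System.out.println("Built filter: " + filter);
    }
}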

7 files changed: +585 -0 lines changed


marklogic-client-api/build.gradle

Lines changed: 5 additions & 0 deletions
@@ -37,6 +37,11 @@ dependencies {
     implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
     implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-csv:${jacksonVersion}"
 
+    // Dependencies for hash generation. Can be safely omitted if not using the incremental write feature. But neither
+    // has any transitive dependencies, and thus their impact on the dependency tree is minimal.
+    implementation "io.github.erdtman:java-json-canonicalization:1.1"
+    implementation "net.openhft:zero-allocation-hashing:0.27ea1"
+
     // Only used by extras (which some examples then depend on)
     compileOnly 'org.jdom:jdom2:2.0.6.1'
     compileOnly 'org.dom4j:dom4j:2.2.0'
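
As a sketch of why these two libraries are paired: canonicalizing JSON first makes the hash insensitive to key order, so logically identical payloads produce the same xx3 hash. The class name below is illustrative; the library calls match those used by the new filter.

import net.openhft.hashing.LongHashFunction;
import org.erdtman.jcs.JsonCanonicalizer;

import java.nio.charset.StandardCharsets;

public class HashingSketch {
    public static void main(String[] args) throws Exception {
        // Same JSON object, different key order.
        String first = "{\"b\":2,\"a\":1}";
        String second = "{\"a\":1,\"b\":2}";

        // JCS canonicalization (RFC 8785) sorts keys, so both canonical forms are identical.
        String canonicalFirst = new JsonCanonicalizer(first).getEncodedString();
        String canonicalSecond = new JsonCanonicalizer(second).getEncodedString();

        // xx3 is the hash function the filter hardcodes; identical input bytes yield identical hashes.
        LongHashFunction xx3 = LongHashFunction.xx3();
        String hashFirst = Long.toHexString(xx3.hashBytes(canonicalFirst.getBytes(StandardCharsets.UTF_8)));
        String hashSecond = Long.toHexString(xx3.hashBytes(canonicalSecond.getBytes(StandardCharsets.UTF_8)));

        System.out.println(hashFirst.equals(hashSecond)); // true
    }
}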
Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
/*
 * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
 */
package com.marklogic.client.datamovement.filter;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.marklogic.client.datamovement.DocumentWriteSetFilter;
import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.document.DocumentWriteSet;
import com.marklogic.client.io.JacksonHandle;

import java.util.function.Consumer;

/**
 * Uses server-side JavaScript code to get the existing hash values for a set of URIs.
 *
 * @since 8.1.0
 */
class IncrementalWriteEvalFilter extends IncrementalWriteFilter {

    private static final String EVAL_SCRIPT = """
        const tuples = cts.valueTuples([cts.uriReference(), cts.fieldReference(fieldName)], null, cts.documentQuery(uris));
        const response = {};
        for (var tuple of tuples) {
            response[tuple[0]] = tuple[1];
        }
        response
        """;

    IncrementalWriteEvalFilter(String fieldName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
        super(fieldName, canonicalizeJson, skippedDocumentsConsumer);
    }

    @Override
    public DocumentWriteSet apply(DocumentWriteSetFilter.Context context) {
        ArrayNode uris = new ObjectMapper().createArrayNode();
        context.getDocumentWriteSet().stream().forEach(op -> {
            if (DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(op.getOperationType())) {
                uris.add(op.getUri());
            }
        });

        JsonNode response = context.getDatabaseClient().newServerEval().javascript(EVAL_SCRIPT)
            .addVariable("fieldName", fieldName)
            .addVariable("uris", new JacksonHandle(uris))
            .evalAs(JsonNode.class);

        return filterDocuments(context, uri -> {
            if (response.has(uri)) {
                return response.get(uri).asText();
            }
            return null;
        });
    }
}
Lines changed: 178 additions & 0 deletions
@@ -0,0 +1,178 @@
/*
 * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
 */
package com.marklogic.client.datamovement.filter;

import com.marklogic.client.datamovement.DocumentWriteSetFilter;
import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.document.DocumentWriteSet;
import com.marklogic.client.impl.DocumentWriteOperationImpl;
import com.marklogic.client.impl.HandleAccessor;
import com.marklogic.client.io.BaseHandle;
import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.marker.AbstractWriteHandle;
import net.openhft.hashing.LongHashFunction;
import org.erdtman.jcs.JsonCanonicalizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * A DocumentWriteSetFilter that skips writing documents whose content has not changed since the last write,
 * based on a hash value stored in a MarkLogic field.
 *
 * @since 8.1.0
 */
public abstract class IncrementalWriteFilter implements DocumentWriteSetFilter {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    public static Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {

        private String fieldName = "incrementalWriteHash";
        private boolean canonicalizeJson = true;
        private boolean useEvalQuery = false;
        private Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;

        /**
         * @param fieldName the name of the MarkLogic field that will hold the hash value; defaults to "incrementalWriteHash".
         */
        public Builder fieldName(String fieldName) {
            this.fieldName = fieldName;
            return this;
        }

        /**
         * @param canonicalizeJson whether to canonicalize JSON content before hashing; defaults to true.
         *                         Delegates to https://github.com/erdtman/java-json-canonicalization for canonicalization.
         */
        public Builder canonicalizeJson(boolean canonicalizeJson) {
            this.canonicalizeJson = canonicalizeJson;
            return this;
        }

        /**
         * @param useEvalQuery if true, evaluate server-side JavaScript instead of an Optic query for retrieving hash values; defaults to false.
         */
        public Builder useEvalQuery(boolean useEvalQuery) {
            this.useEvalQuery = useEvalQuery;
            return this;
        }

        /**
         * @param skippedDocumentsConsumer a consumer that will be called with any documents in a batch that were skipped because their content had not changed.
         */
        public Builder onDocumentsSkipped(Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
            this.skippedDocumentsConsumer = skippedDocumentsConsumer;
            return this;
        }

        public IncrementalWriteFilter build() {
            if (useEvalQuery) {
                return new IncrementalWriteEvalFilter(fieldName, canonicalizeJson, skippedDocumentsConsumer);
            }
            return new IncrementalWriteOpticFilter(fieldName, canonicalizeJson, skippedDocumentsConsumer);
        }
    }

    protected final String fieldName;
    private final boolean canonicalizeJson;
    private final Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;

    // Hardcoding this for now, with a good general purpose hashing function.
    // See https://xxhash.com for benchmarks.
    private final LongHashFunction hashFunction = LongHashFunction.xx3();

    public IncrementalWriteFilter(String fieldName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
        this.fieldName = fieldName;
        this.canonicalizeJson = canonicalizeJson;
        this.skippedDocumentsConsumer = skippedDocumentsConsumer;
    }

    protected final DocumentWriteSet filterDocuments(Context context, Function<String, String> hashRetriever) {
        final DocumentWriteSet newWriteSet = context.getDatabaseClient().newDocumentManager().newWriteSet();
        final List<DocumentWriteOperation> skippedDocuments = new ArrayList<>();

        for (DocumentWriteOperation doc : context.getDocumentWriteSet()) {
            if (!DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(doc.getOperationType())) {
                newWriteSet.add(doc);
                continue;
            }

            final String content = serializeContent(doc.getContent());
            final String contentHash = computeHash(content);
            final String existingHash = hashRetriever.apply(doc.getUri());
            if (logger.isTraceEnabled()) {
                logger.trace("URI: {}, existing Hash: {}, new Hash: {}", doc.getUri(), existingHash, contentHash);
            }

            if (existingHash != null) {
                if (!existingHash.equals(contentHash)) {
                    newWriteSet.add(addHashToMetadata(doc, fieldName, contentHash));
                } else if (skippedDocumentsConsumer != null) {
                    skippedDocuments.add(doc);
                }
            } else {
                newWriteSet.add(addHashToMetadata(doc, fieldName, contentHash));
            }
        }

        if (!skippedDocuments.isEmpty()) {
            skippedDocumentsConsumer.accept(skippedDocuments.toArray(new DocumentWriteOperation[0]));
        }

        return newWriteSet;
    }

    private String serializeContent(AbstractWriteHandle contentHandle) {
        String content = HandleAccessor.contentAsString(contentHandle);

        Format format = null;
        if (contentHandle instanceof BaseHandle<?, ?> baseHandle) {
            format = baseHandle.getFormat();
        }

        if (canonicalizeJson && (Format.JSON.equals(format) || content.startsWith("{"))) {
            JsonCanonicalizer jc;
            try {
                jc = new JsonCanonicalizer(content);
            } catch (IOException e) {
                throw new RuntimeException("Unable to parse JSON content, cause: " + e.getMessage(), e);
            }
            return jc.getEncodedString();
        }

        return content;
    }

    private String computeHash(String content) {
        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
        long hash = hashFunction.hashBytes(bytes);
        return Long.toHexString(hash);
    }

    protected static DocumentWriteOperation addHashToMetadata(DocumentWriteOperation op, String fieldName, String hash) {
        DocumentMetadataHandle newMetadata = new DocumentMetadataHandle();
        if (op.getMetadata() != null) {
            DocumentMetadataHandle originalMetadata = (DocumentMetadataHandle) op.getMetadata();
            newMetadata.setPermissions(originalMetadata.getPermissions());
            newMetadata.setCollections(originalMetadata.getCollections());
            newMetadata.setQuality(originalMetadata.getQuality());
            newMetadata.setProperties(originalMetadata.getProperties());
            newMetadata.getMetadataValues().putAll(originalMetadata.getMetadataValues());
        }
        newMetadata.getMetadataValues().put(fieldName, hash);
        return new DocumentWriteOperationImpl(op.getUri(), newMetadata, op.getContent(), op.getTemporalDocumentURI());
    }
}
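
The protected filterDocuments method is the extension point: each concrete filter only supplies a way to look up an existing hash by URI. A hypothetical subclass (not part of this commit) that resolves hashes from an in-memory map, for example to exercise the skip-versus-write decision without a MarkLogic connection, might look like this:

import com.marklogic.client.datamovement.DocumentWriteSetFilter;
import com.marklogic.client.datamovement.filter.IncrementalWriteFilter;
import com.marklogic.client.document.DocumentWriteSet;

import java.util.Map;

// Hypothetical subclass for illustration only; the class name and constructor are not part of this commit.
public class InMemoryIncrementalWriteFilter extends IncrementalWriteFilter {

    private final Map<String, String> knownHashes;

    public InMemoryIncrementalWriteFilter(Map<String, String> knownHashes) {
        // Matches the Builder defaults for field name and JSON canonicalization; no skipped-documents callback.
        super("incrementalWriteHash", true, null);
        this.knownHashes = knownHashes;
    }

    @Override
    public DocumentWriteSet apply(DocumentWriteSetFilter.Context context) {
        // Only the hash lookup differs between strategies; hashing, comparison, and metadata handling are inherited.
        return filterDocuments(context, knownHashes::get);
    }
}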
Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
/*
 * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
 */
package com.marklogic.client.datamovement.filter;

import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.document.DocumentWriteSet;
import com.marklogic.client.row.RowTemplate;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Uses an Optic query to get the existing hash values for a set of URIs.
 *
 * @since 8.1.0
 */
class IncrementalWriteOpticFilter extends IncrementalWriteFilter {

    IncrementalWriteOpticFilter(String fieldName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
        super(fieldName, canonicalizeJson, skippedDocumentsConsumer);
    }

    @Override
    public DocumentWriteSet apply(Context context) {
        final String[] uris = context.getDocumentWriteSet().stream()
            .filter(op -> DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(op.getOperationType()))
            .map(DocumentWriteOperation::getUri)
            .toArray(String[]::new);

        // It doesn't seem possible yet to use a DSL query and bind an array of strings to a "uris" param, so using
        // a serialized query instead. That doesn't allow a user to override the query though.
        Map<String, String> existingHashes = new RowTemplate(context.getDatabaseClient()).query(op ->
            op.fromLexicons(Map.of(
                "uri", op.cts.uriReference(),
                "hash", op.cts.fieldReference(super.fieldName)
            )).where(
                op.cts.documentQuery(op.xs.stringSeq(uris))
            ),

            rows -> {
                Map<String, String> map = new HashMap<>();
                rows.forEach(row -> {
                    String uri = row.getString("uri");
                    String existingHash = row.getString("hash");
                    map.put(uri, existingHash);
                });
                return map;
            }
        );

        return filterDocuments(context, uri -> existingHashes.get(uri));
    }
}
Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
/*
 * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
 */
package com.marklogic.client.datamovement.filter;

import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.impl.DocumentWriteOperationImpl;
import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.io.StringHandle;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * Unit tests that make no connection to MarkLogic.
 */
class IncrementalWriteFilterTest {

    /**
     * Verifies that when a hash is added, a new metadata object is created so that a doc-specific hash field can be
     * added without affecting any other document that might be sharing the same metadata object.
     */
    @Test
    void addHashToMetadata() {
        DocumentMetadataHandle metadata = new DocumentMetadataHandle()
            .withCollections("c1")
            .withPermission("rest-reader", DocumentMetadataHandle.Capability.READ)
            .withQuality(2)
            .withProperty("prop1", "value1")
            .withMetadataValue("meta1", "value1");

        DocumentWriteOperation doc1 = new DocumentWriteOperationImpl("/1.xml", metadata, new StringHandle("<doc1/>"));
        DocumentWriteOperation doc2 = new DocumentWriteOperationImpl("/2.xml", metadata, new StringHandle("<doc2/>"));

        doc2 = IncrementalWriteFilter.addHashToMetadata(doc2, "theField", "abc123");

        assertEquals(metadata, doc1.getMetadata(), "doc1 should still have the original metadata object");

        DocumentMetadataHandle metadata2 = (DocumentMetadataHandle) doc2.getMetadata();
        assertEquals("c1", metadata2.getCollections().iterator().next(), "collection should be preserved");
        assertEquals(DocumentMetadataHandle.Capability.READ, metadata2.getPermissions().get("rest-reader").iterator().next(), "permission should be preserved");
        assertEquals(2, metadata2.getQuality(), "quality should be preserved");
        assertEquals("value1", metadata2.getProperties().get("prop1"), "property should be preserved");

        assertEquals("value1", metadata2.getMetadataValues().get("meta1"), "metadata value should be preserved");
        assertEquals("abc123", metadata2.getMetadataValues().get("theField"), "hash field should be added");
    }
}
