
Commit f6cfa96

MLE-26420 Can now perform incremental writes
IncrementalWriteFilter is the entry point, with a Builder for customizing its behavior.
1 parent 1c8dd85 commit f6cfa96
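
For readers who want to try the feature, the public entry point is the Builder shown below in IncrementalWriteFilter. The following sketch only constructs and configures a filter; how the filter is then registered with a WriteBatcher or other write pipeline is not part of this commit, so that step is omitted, and the consumer logic here is purely illustrative.

import com.marklogic.client.datamovement.filter.IncrementalWriteFilter;

public class IncrementalWriteFilterUsage {
	public static void main(String[] args) {
		IncrementalWriteFilter filter = IncrementalWriteFilter.newBuilder()
			// MarkLogic field that holds each document's hash; "incrementalWriteHash" is the default.
			.fieldName("incrementalWriteHash")
			// Canonicalize JSON before hashing so equivalent serializations count as unchanged (default: true).
			.canonicalizeJson(true)
			// false (the default) retrieves existing hashes via an Optic query; true uses server-side JavaScript.
			.useEvalQuery(false)
			// Optional callback invoked with the documents in a batch that were skipped as unchanged.
			.onDocumentsSkipped(skipped -> System.out.println(skipped.length + " unchanged documents skipped"))
			.build();

		System.out.println("Configured filter: " + filter.getClass().getSimpleName());
	}
}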

File tree

7 files changed: +594 -0 lines changed

marklogic-client-api/build.gradle

Lines changed: 5 additions & 0 deletions
@@ -37,6 +37,11 @@ dependencies {
 	implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
 	implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-csv:${jacksonVersion}"
 
+	// Dependencies for hash generation. Can be safely omitted if not using the incremental write feature. But neither
+	// has any transitive dependencies, and thus their impact on the dependency tree is minimal.
+	implementation "io.github.erdtman:java-json-canonicalization:1.1"
+	implementation "net.openhft:zero-allocation-hashing:0.27ea1"
+
 	// Only used by extras (which some examples then depend on)
 	compileOnly 'org.jdom:jdom2:2.0.6.1'
 	compileOnly 'org.dom4j:dom4j:2.2.0'
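
For context, these two libraries are what the filter combines for change detection: JCS canonicalization so that equivalent JSON serializations hash identically, and xxHash (xx3) for fast hashing. A standalone sketch, using made-up JSON payloads, of how they fit together:

import net.openhft.hashing.LongHashFunction;
import org.erdtman.jcs.JsonCanonicalizer;

import java.nio.charset.StandardCharsets;

public class CanonicalHashDemo {
	public static void main(String[] args) throws Exception {
		// Two logically identical JSON payloads with different key order and whitespace.
		String first = "{\"b\": 2, \"a\": 1}";
		String second = "{\"a\":1,\"b\":2}";

		// Canonicalization normalizes key order and whitespace, so both payloads reduce to the same string...
		String canonicalFirst = new JsonCanonicalizer(first).getEncodedString();
		String canonicalSecond = new JsonCanonicalizer(second).getEncodedString();

		// ...and therefore to the same xx3 hash, which is what lets the filter treat a re-serialized but
		// otherwise unchanged document as a skip rather than a rewrite.
		long firstHash = LongHashFunction.xx3().hashBytes(canonicalFirst.getBytes(StandardCharsets.UTF_8));
		long secondHash = LongHashFunction.xx3().hashBytes(canonicalSecond.getBytes(StandardCharsets.UTF_8));
		System.out.println(Long.toHexString(firstHash) + " equals " + Long.toHexString(secondHash));
	}
}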
marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
/*
 * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
 */
package com.marklogic.client.datamovement.filter;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.marklogic.client.datamovement.DocumentWriteSetFilter;
import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.document.DocumentWriteSet;
import com.marklogic.client.io.JacksonHandle;

import java.util.function.Consumer;

/**
 * Uses server-side JavaScript code to get the existing hash values for a set of URIs.
 *
 * @since 8.1.0
 */
class IncrementalWriteEvalFilter extends IncrementalWriteFilter {

	private static final String EVAL_SCRIPT = """
		const tuples = cts.valueTuples([cts.uriReference(), cts.fieldReference(fieldName)], null, cts.documentQuery(uris));
		const response = {};
		for (var tuple of tuples) {
			response[tuple[0]] = tuple[1];
		}
		response
		""";

	IncrementalWriteEvalFilter(String fieldName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
		super(fieldName, canonicalizeJson, skippedDocumentsConsumer);
	}

	@Override
	public DocumentWriteSet apply(DocumentWriteSetFilter.Context context) {
		ArrayNode uris = new ObjectMapper().createArrayNode();
		for (DocumentWriteOperation doc : context.getDocumentWriteSet()) {
			if (DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(doc.getOperationType())) {
				uris.add(doc.getUri());
			}
		}

		JsonNode response = context.getDatabaseClient().newServerEval().javascript(EVAL_SCRIPT)
			.addVariable("fieldName", fieldName)
			.addVariable("uris", new JacksonHandle(uris))
			.evalAs(JsonNode.class);

		return filterDocuments(context, uri -> {
			if (response.has(uri)) {
				return response.get(uri).asText();
			}
			return null;
		});
	}
}
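
For illustration, the eval script above returns a single JSON object keyed by URI, and the lambda handed to filterDocuments just looks each URI up in that object, returning null for documents with no stored hash. A small standalone sketch of that lookup, with made-up URIs and hash values:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class EvalResponseShape {
	public static void main(String[] args) {
		// Shape of the object EVAL_SCRIPT produces: one property per URI that already has a stored hash.
		ObjectNode response = new ObjectMapper().createObjectNode()
			.put("/example/1.json", "9f2c4d1ab3e5f607")
			.put("/example/2.json", "0b1d2e3f4a5b6c7d");

		// Mirrors the hash retriever: a URI with no stored hash yields null, which forces a write.
		for (String uri : new String[]{"/example/1.json", "/example/3.json"}) {
			JsonNode hash = response.get(uri);
			System.out.println(uri + " -> " + (hash != null ? hash.asText() : "no existing hash, will be written"));
		}
	}
}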
marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java

Lines changed: 187 additions & 0 deletions
@@ -0,0 +1,187 @@
/*
 * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
 */
package com.marklogic.client.datamovement.filter;

import com.marklogic.client.datamovement.DocumentWriteSetFilter;
import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.document.DocumentWriteSet;
import com.marklogic.client.impl.DocumentWriteOperationImpl;
import com.marklogic.client.impl.HandleAccessor;
import com.marklogic.client.io.BaseHandle;
import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.io.Format;
import net.openhft.hashing.LongHashFunction;
import org.erdtman.jcs.JsonCanonicalizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * A DocumentWriteSetFilter that skips writing documents whose content has not changed since the last write,
 * based on a hash value stored in a MarkLogic field.
 *
 * @since 8.1.0
 */
public abstract class IncrementalWriteFilter implements DocumentWriteSetFilter {

	protected final Logger logger = LoggerFactory.getLogger(this.getClass());

	public static Builder newBuilder() {
		return new Builder();
	}

	public static class Builder {

		private String fieldName = "incrementalWriteHash";
		private boolean canonicalizeJson = true;
		private boolean useEvalQuery = false;
		private Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;

		/**
		 * @param fieldName the name of the MarkLogic field that will hold the hash value; defaults to "incrementalWriteHash".
		 */
		public Builder fieldName(String fieldName) {
			this.fieldName = fieldName;
			return this;
		}

		/**
		 * @param canonicalizeJson whether to canonicalize JSON content before hashing; defaults to true.
		 *                         Delegates to https://github.com/erdtman/java-json-canonicalization for canonicalization.
		 */
		public Builder canonicalizeJson(boolean canonicalizeJson) {
			this.canonicalizeJson = canonicalizeJson;
			return this;
		}

		/**
		 * @param useEvalQuery if true, evaluate server-side JavaScript instead of an Optic query for retrieving hash values; defaults to false.
		 */
		public Builder useEvalQuery(boolean useEvalQuery) {
			this.useEvalQuery = useEvalQuery;
			return this;
		}

		/**
		 * @param skippedDocumentsConsumer a consumer that will be called with any documents in a batch that were skipped because their content had not changed.
		 */
		public Builder onDocumentsSkipped(Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
			this.skippedDocumentsConsumer = skippedDocumentsConsumer;
			return this;
		}

		public IncrementalWriteFilter build() {
			if (useEvalQuery) {
				return new IncrementalWriteEvalFilter(fieldName, canonicalizeJson, skippedDocumentsConsumer);
			}
			return new IncrementalWriteOpticFilter(fieldName, canonicalizeJson, skippedDocumentsConsumer);
		}
	}

	protected final String fieldName;
	private final boolean canonicalizeJson;
	private final Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;

	// Hardcoding this for now, with a good general purpose hashing function.
	// See https://xxhash.com for benchmarks.
	private final LongHashFunction hashFunction = LongHashFunction.xx3();

	public IncrementalWriteFilter(String fieldName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
		this.fieldName = fieldName;
		this.canonicalizeJson = canonicalizeJson;
		this.skippedDocumentsConsumer = skippedDocumentsConsumer;
	}

	protected final DocumentWriteSet filterDocuments(Context context, Function<String, String> hashRetriever) {
		final DocumentWriteSet newWriteSet = context.getDatabaseClient().newDocumentManager().newWriteSet();
		final List<DocumentWriteOperation> skippedDocuments = new ArrayList<>();

		for (DocumentWriteOperation doc : context.getDocumentWriteSet()) {
			if (!DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(doc.getOperationType())) {
				newWriteSet.add(doc);
				continue;
			}

			final String contentHash = computeHash(serializeContent(doc));
			final String existingHash = hashRetriever.apply(doc.getUri());
			if (logger.isTraceEnabled()) {
				logger.trace("URI: {}, existing Hash: {}, new Hash: {}", doc.getUri(), existingHash, contentHash);
			}

			if (existingHash != null) {
				if (!existingHash.equals(contentHash)) {
					newWriteSet.add(addHashToMetadata(doc, fieldName, contentHash));
				} else if (skippedDocumentsConsumer != null) {
					skippedDocuments.add(doc);
				} else {
					// No consumer, so skip the document silently.
				}
			} else {
				newWriteSet.add(addHashToMetadata(doc, fieldName, contentHash));
			}
		}

		if (!skippedDocuments.isEmpty()) {
			skippedDocumentsConsumer.accept(skippedDocuments.toArray(new DocumentWriteOperation[0]));
		}

		return newWriteSet;
	}

	private String serializeContent(DocumentWriteOperation doc) {
		String content = HandleAccessor.contentAsString(doc.getContent());

		Format format = null;
		if (doc.getContent() instanceof BaseHandle<?, ?> baseHandle) {
			format = baseHandle.getFormat();
		}

		if (canonicalizeJson && (Format.JSON.equals(format) || isPossiblyJsonContent(content))) {
			JsonCanonicalizer jc;
			try {
				jc = new JsonCanonicalizer(content);
				return jc.getEncodedString();
			} catch (IOException e) {
				// Going to improve this in the next PR, as I think we can throw an exception if Format = JSON.
				logger.warn("Unable to canonicalize JSON content for URI {}, using original content for hashing; cause: {}",
					doc.getUri(), e.getMessage());
			}
		}

		return content;
	}

	private boolean isPossiblyJsonContent(String content) {
		// This isn't 100% reliable, as the content could be text that just happens to start with { or [, and so
		// we'll still need to catch an exception if we try to canonicalize non-JSON content.
		String trimmed = content.trim();
		return trimmed.startsWith("{") || trimmed.startsWith("[");
	}

	private String computeHash(String content) {
		byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
		long hash = hashFunction.hashBytes(bytes);
		return Long.toHexString(hash);
	}

	protected static DocumentWriteOperation addHashToMetadata(DocumentWriteOperation op, String fieldName, String hash) {
		DocumentMetadataHandle newMetadata = new DocumentMetadataHandle();
		if (op.getMetadata() != null) {
			DocumentMetadataHandle originalMetadata = (DocumentMetadataHandle) op.getMetadata();
			newMetadata.setPermissions(originalMetadata.getPermissions());
			newMetadata.setCollections(originalMetadata.getCollections());
			newMetadata.setQuality(originalMetadata.getQuality());
			newMetadata.setProperties(originalMetadata.getProperties());
			newMetadata.getMetadataValues().putAll(originalMetadata.getMetadataValues());
		}
		newMetadata.getMetadataValues().put(fieldName, hash);
		return new DocumentWriteOperationImpl(op.getUri(), newMetadata, op.getContent(), op.getTemporalDocumentURI());
	}
}
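
Since addHashToMetadata stores the hash as a document metadata value keyed by the configured field name, it can be inspected after a write goes through the filter. A sketch using the standard document manager API; the connection details, URI, and field name below are placeholders:

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.document.GenericDocumentManager;
import com.marklogic.client.io.DocumentMetadataHandle;

public class ReadStoredHash {
	public static void main(String[] args) {
		// Placeholder connection details.
		DatabaseClient client = DatabaseClientFactory.newClient("localhost", 8000,
			new DatabaseClientFactory.DigestAuthContext("admin", "admin"));
		try {
			GenericDocumentManager mgr = client.newDocumentManager();
			DocumentMetadataHandle metadata = mgr.readMetadata("/example/1.json", new DocumentMetadataHandle());
			// The filter stores the hash as a metadata value keyed by the configured field name.
			System.out.println("Stored hash: " + metadata.getMetadataValues().get("incrementalWriteHash"));
		} finally {
			client.release();
		}
	}
}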
marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java

Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
/*
 * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
 */
package com.marklogic.client.datamovement.filter;

import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.document.DocumentWriteSet;
import com.marklogic.client.row.RowTemplate;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Uses an Optic query to get the existing hash values for a set of URIs.
 *
 * @since 8.1.0
 */
class IncrementalWriteOpticFilter extends IncrementalWriteFilter {

	IncrementalWriteOpticFilter(String fieldName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
		super(fieldName, canonicalizeJson, skippedDocumentsConsumer);
	}

	@Override
	public DocumentWriteSet apply(Context context) {
		final String[] uris = context.getDocumentWriteSet().stream()
			.filter(op -> DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(op.getOperationType()))
			.map(DocumentWriteOperation::getUri)
			.toArray(String[]::new);

		// It doesn't seem possible yet to use a DSL query and bind an array of strings to a "uris" param, so using
		// a serialized query instead. That doesn't allow a user to override the query though.
		Map<String, String> existingHashes = new RowTemplate(context.getDatabaseClient()).query(op ->
			op.fromLexicons(Map.of(
				"uri", op.cts.uriReference(),
				"hash", op.cts.fieldReference(super.fieldName)
			)).where(
				op.cts.documentQuery(op.xs.stringSeq(uris))
			),

			rows -> {
				Map<String, String> map = new HashMap<>();
				rows.forEach(row -> {
					String uri = row.getString("uri");
					String existingHash = row.getString("hash");
					map.put(uri, existingHash);
				});
				return map;
			}
		);

		return filterDocuments(context, uri -> existingHashes.get(uri));
	}
}
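
RowTemplate appears to be a new convenience introduced alongside this feature; for readers more familiar with the existing RowManager/PlanBuilder API, a roughly equivalent hash lookup might look like the following sketch (connection details, URIs, and the field name are placeholders):

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.expression.PlanBuilder;
import com.marklogic.client.row.RowManager;

import java.util.HashMap;
import java.util.Map;

public class ExistingHashLookup {
	public static void main(String[] args) {
		// Placeholder connection details.
		DatabaseClient client = DatabaseClientFactory.newClient("localhost", 8000,
			new DatabaseClientFactory.DigestAuthContext("admin", "admin"));
		try {
			String[] uris = {"/example/1.json", "/example/2.json"};
			RowManager rowManager = client.newRowManager();
			PlanBuilder op = rowManager.newPlanBuilder();

			// Same lexicon-based query as the filter: one row per URI that already has a stored hash.
			PlanBuilder.ModifyPlan plan = op
				.fromLexicons(Map.of(
					"uri", op.cts.uriReference(),
					"hash", op.cts.fieldReference("incrementalWriteHash")))
				.where(op.cts.documentQuery(op.xs.stringSeq(uris)));

			Map<String, String> existingHashes = new HashMap<>();
			rowManager.resultRows(plan).forEach(row ->
				existingHashes.put(row.getString("uri"), row.getString("hash")));
			System.out.println(existingHashes);
		} finally {
			client.release();
		}
	}
}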
marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilterTest.java

Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
/*
 * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
 */
package com.marklogic.client.datamovement.filter;

import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.impl.DocumentWriteOperationImpl;
import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.io.StringHandle;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * Unit tests that make no connection to MarkLogic.
 */
class IncrementalWriteFilterTest {

	/**
	 * Verifies that when a hash is added, a new metadata object is created so that a doc-specific hash field can be
	 * added without affecting any other document that might be sharing the same metadata object.
	 */
	@Test
	void addHashToMetadata() {
		DocumentMetadataHandle metadata = new DocumentMetadataHandle()
			.withCollections("c1")
			.withPermission("rest-reader", DocumentMetadataHandle.Capability.READ)
			.withQuality(2)
			.withProperty("prop1", "value1")
			.withMetadataValue("meta1", "value1");

		DocumentWriteOperation doc1 = new DocumentWriteOperationImpl("/1.xml", metadata, new StringHandle("<doc1/>"));
		DocumentWriteOperation doc2 = new DocumentWriteOperationImpl("/2.xml", metadata, new StringHandle("<doc2/>"));

		doc2 = IncrementalWriteFilter.addHashToMetadata(doc2, "theField", "abc123");

		assertEquals(metadata, doc1.getMetadata(), "doc1 should still have the original metadata object");

		DocumentMetadataHandle metadata2 = (DocumentMetadataHandle) doc2.getMetadata();
		assertEquals("c1", metadata2.getCollections().iterator().next(), "collection should be preserved");
		assertEquals(DocumentMetadataHandle.Capability.READ, metadata2.getPermissions().get("rest-reader").iterator().next(), "permission should be preserved");
		assertEquals(2, metadata2.getQuality(), "quality should be preserved");
		assertEquals("value1", metadata2.getProperties().get("prop1"), "property should be preserved");

		assertEquals("value1", metadata2.getMetadataValues().get("meta1"), "metadata value should be preserved");
		assertEquals("abc123", metadata2.getMetadataValues().get("theField"), "hash field should be added");
	}
}
