
Commit 975a342

MLE-26420 Can now perform incremental writes
Added DocumentWriteSetFilter as a generic interface for modifying a DocumentWriteSet before it is written. IncrementalWriteFilter is the entry point for the incremental write feature, with a Builder for customizing its behavior. Also started moving some tests into "com.marklogic.client.datamovement" so we can have unit tests that verify protected methods.
1 parent: b452860

File tree

15 files changed: +690 -90 lines

marklogic-client-api/build.gradle

Lines changed: 5 additions & 0 deletions
@@ -37,6 +37,11 @@ dependencies {
 	implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
 	implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-csv:${jacksonVersion}"

+	// Dependencies for hash generation. Can be safely omitted if not using the incremental write feature. But neither
+	// has any transitive dependencies, and thus their impact on the dependency tree is minimal.
+	implementation "io.github.erdtman:java-json-canonicalization:1.1"
+	implementation "net.openhft:zero-allocation-hashing:0.27ea1"

 	// Only used by extras (which some examples then depend on)
 	compileOnly 'org.jdom:jdom2:2.0.6.1'
 	compileOnly 'org.dom4j:dom4j:2.2.0'
marklogic-client-api/src/main/java/com/marklogic/client/datamovement/DocumentWriteSetFilter.java

Lines changed: 39 additions & 0 deletions

/*
 * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
 */
package com.marklogic.client.datamovement;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.document.DocumentWriteSet;

import java.util.function.Function;

/**
 * A filter that can modify a DocumentWriteSet before it is written to the database.
 *
 * @since 8.1.0
 */
public interface DocumentWriteSetFilter extends Function<DocumentWriteSetFilter.Context, DocumentWriteSet> {

	interface Context {
		/**
		 * @return the DocumentWriteSet to be written
		 */
		DocumentWriteSet getDocumentWriteSet();

		/**
		 * @return the batch number
		 */
		long getBatchNumber();

		/**
		 * @return the DatabaseClient being used for this batch
		 */
		DatabaseClient getDatabaseClient();

		/**
		 * @return the temporal collection name, or null if not writing to a temporal collection
		 */
		String getTemporalCollection();
	}
}
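
To illustrate the interface, here is a minimal sketch of a custom filter; the class and package names are hypothetical, and the filter simply logs each batch before passing the write set through unchanged:

package com.marklogic.client.example; // hypothetical package for this sketch

import com.marklogic.client.datamovement.DocumentWriteSetFilter;
import com.marklogic.client.document.DocumentWriteSet;

public class LoggingWriteSetFilter implements DocumentWriteSetFilter {

	@Override
	public DocumentWriteSet apply(Context context) {
		DocumentWriteSet writeSet = context.getDocumentWriteSet();
		// DocumentWriteSet extends Set, so size() is the number of operations in the batch.
		System.out.println("Batch " + context.getBatchNumber() + " has " + writeSet.size() + " operations.");
		return writeSet;
	}
}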

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/WriteBatcher.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,4 +357,17 @@ WriteBatcher addAs(String uri, DocumentMetadataWriteHandle metadataHandle,
357357
* @param writeBatch the information about the batch that failed
358358
*/
359359
void retryWithFailureListeners(WriteBatch writeBatch);
360+
361+
/**
362+
* Sets a filter to modify or replace the DocumentWriteSet before it is written.
363+
* The filter can return either the modified DocumentWriteSet or a new one.
364+
* If the filter returns null or an empty DocumentWriteSet, no write will occur.
365+
*
366+
* @param filter the function to apply before writing
367+
* @return this instance for method chaining
368+
* @since 8.1.0
369+
*/
370+
default WriteBatcher withDocumentWriteSetFilter(DocumentWriteSetFilter filter) {
371+
return this;
372+
}
360373
}
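
A sketch of wiring a filter into a WriteBatcher; it assumes an already-configured DatabaseClient named "client" and reuses the hypothetical LoggingWriteSetFilter from above:

// Sketch only; "client" is an assumed, already-configured DatabaseClient.
DataMovementManager dataMovementManager = client.newDataMovementManager();
WriteBatcher batcher = dataMovementManager.newWriteBatcher()
	.withBatchSize(100)
	.withDocumentWriteSetFilter(new LoggingWriteSetFilter());
dataMovementManager.startJob(batcher);
// ... queue documents via batcher.add(uri, contentHandle) ...
batcher.flushAndWait();
dataMovementManager.stopJob(batcher);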
marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java

Lines changed: 57 additions & 0 deletions

/*
 * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
 */
package com.marklogic.client.datamovement.filter;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.marklogic.client.datamovement.DocumentWriteSetFilter;
import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.document.DocumentWriteSet;
import com.marklogic.client.io.JacksonHandle;

import java.util.function.Consumer;

/**
 * Uses server-side JavaScript code to get the existing hash values for a set of URIs.
 *
 * @since 8.1.0
 */
class IncrementalWriteEvalFilter extends IncrementalWriteFilter {

	private static final String EVAL_SCRIPT = """
		const tuples = cts.valueTuples([cts.uriReference(), cts.fieldReference(fieldName)], null, cts.documentQuery(uris));
		const response = {};
		for (var tuple of tuples) {
			response[tuple[0]] = tuple[1];
		}
		response
		""";

	IncrementalWriteEvalFilter(String fieldName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
		super(fieldName, canonicalizeJson, skippedDocumentsConsumer);
	}

	@Override
	public DocumentWriteSet apply(DocumentWriteSetFilter.Context context) {
		ArrayNode uris = new ObjectMapper().createArrayNode();
		context.getDocumentWriteSet().stream().forEach(op -> {
			if (DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(op.getOperationType())) {
				uris.add(op.getUri());
			}
		});

		JsonNode response = context.getDatabaseClient().newServerEval().javascript(EVAL_SCRIPT)
			.addVariable("fieldName", fieldName)
			.addVariable("uris", new JacksonHandle(uris))
			.evalAs(JsonNode.class);

		return filterDocuments(context, uri -> {
			if (response.has(uri)) {
				return response.get(uri).asText();
			}
			return null;
		});
	}
}
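
For reference, the eval script builds a JSON object keyed by URI, with each stored hash as the value; a response for two documents might look like this (URIs and hash values are hypothetical):

{
  "/example/doc1.json": "a1b2c3d4e5f6a7b8",
  "/example/doc2.json": "9f8e7d6c5b4a3210"
}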
marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java

Lines changed: 177 additions & 0 deletions

/*
 * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
 */
package com.marklogic.client.datamovement.filter;

import com.marklogic.client.datamovement.DocumentWriteSetFilter;
import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.document.DocumentWriteSet;
import com.marklogic.client.impl.DocumentWriteOperationImpl;
import com.marklogic.client.impl.HandleAccessor;
import com.marklogic.client.io.BaseHandle;
import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.marker.AbstractWriteHandle;
import net.openhft.hashing.LongHashFunction;
import org.erdtman.jcs.JsonCanonicalizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * A DocumentWriteSetFilter that skips writing documents whose content has not changed since the last write,
 * based on a hash value stored in a MarkLogic field.
 *
 * @since 8.1.0
 */
public abstract class IncrementalWriteFilter implements DocumentWriteSetFilter {

	protected final Logger logger = LoggerFactory.getLogger(this.getClass());

	public static Builder newBuilder() {
		return new Builder();
	}

	public static class Builder {

		private String fieldName = "incrementalWriteHash";
		private boolean canonicalizeJson = true;
		private boolean useEvalQuery = false;
		private Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;

		/**
		 * @param fieldName the name of the MarkLogic field that will hold the hash value; defaults to "incrementalWriteHash".
		 */
		public Builder fieldName(String fieldName) {
			this.fieldName = fieldName;
			return this;
		}

		/**
		 * @param canonicalizeJson whether to canonicalize JSON content before hashing; defaults to true.
		 *                         Delegates to https://github.com/erdtman/java-json-canonicalization for canonicalization.
		 */
		public Builder canonicalizeJson(boolean canonicalizeJson) {
			this.canonicalizeJson = canonicalizeJson;
			return this;
		}

		/**
		 * @param useEvalQuery if true, evaluate server-side JavaScript instead of an Optic query for retrieving hash values; defaults to false.
		 */
		public Builder useEvalQuery(boolean useEvalQuery) {
			this.useEvalQuery = useEvalQuery;
			return this;
		}

		/**
		 * @param skippedDocumentsConsumer a consumer that will be called with any documents in a batch that were skipped because their content had not changed.
		 */
		public Builder onDocumentSkipped(Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
			this.skippedDocumentsConsumer = skippedDocumentsConsumer;
			return this;
		}

		public IncrementalWriteFilter build() {
			if (useEvalQuery) {
				return new IncrementalWriteEvalFilter(fieldName, canonicalizeJson, skippedDocumentsConsumer);
			}
			return new IncrementalWriteOpticFilter(fieldName, canonicalizeJson, skippedDocumentsConsumer);
		}
	}

	protected final String fieldName;
	private final boolean canonicalizeJson;
	private final Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;

	// Hardcoding this for now, with a good general purpose hashing function.
	// See https://xxhash.com for benchmarks.
	private final LongHashFunction hashFunction = LongHashFunction.xx3();

	public IncrementalWriteFilter(String fieldName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
		this.fieldName = fieldName;
		this.canonicalizeJson = canonicalizeJson;
		this.skippedDocumentsConsumer = skippedDocumentsConsumer;
	}

	protected final DocumentWriteSet filterDocuments(Context context, Function<String, String> hashRetriever) {
		final DocumentWriteSet newWriteSet = context.getDatabaseClient().newDocumentManager().newWriteSet();
		final List<DocumentWriteOperation> skippedDocuments = new ArrayList<>();

		for (DocumentWriteOperation doc : context.getDocumentWriteSet()) {
			if (!DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(doc.getOperationType())) {
				// Pass non-write operations through unmodified; they have no content to hash.
				newWriteSet.add(doc);
				continue;
			}

			final String content = serializeContent(doc.getContent());
			final String contentHash = computeHash(content);
			final String existingHash = hashRetriever.apply(doc.getUri());
			if (logger.isTraceEnabled()) {
				logger.trace("URI: {}, existing Hash: {}, new Hash: {}", doc.getUri(), existingHash, contentHash);
			}

			if (existingHash != null) {
				if (!existingHash.equals(contentHash)) {
					newWriteSet.add(addHashToMetadata(doc, fieldName, contentHash));
				} else if (skippedDocumentsConsumer != null) {
					skippedDocuments.add(doc);
				}
			} else {
				newWriteSet.add(addHashToMetadata(doc, fieldName, contentHash));
			}
		}

		if (!skippedDocuments.isEmpty()) {
			skippedDocumentsConsumer.accept(skippedDocuments.toArray(new DocumentWriteOperation[0]));
		}

		return newWriteSet;
	}

	private String serializeContent(AbstractWriteHandle contentHandle) {
		String content = HandleAccessor.contentAsString(contentHandle);

		Format format = null;
		if (contentHandle instanceof BaseHandle<?, ?> baseHandle) {
			format = baseHandle.getFormat();
		}

		if (canonicalizeJson && (Format.JSON.equals(format) || content.startsWith("{"))) {
			JsonCanonicalizer jc;
			try {
				jc = new JsonCanonicalizer(content);
			} catch (IOException e) {
				throw new RuntimeException("Unable to parse JSON content, cause: " + e.getMessage(), e);
			}
			return jc.getEncodedString();
		}

		return content;
	}

	private String computeHash(String content) {
		byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
		long hash = hashFunction.hashBytes(bytes);
		return Long.toHexString(hash);
	}

	protected static DocumentWriteOperation addHashToMetadata(DocumentWriteOperation op, String fieldName, String hash) {
		DocumentMetadataHandle newMetadata = new DocumentMetadataHandle();
		if (op.getMetadata() != null) {
			DocumentMetadataHandle originalMetadata = (DocumentMetadataHandle) op.getMetadata();
			newMetadata.setPermissions(originalMetadata.getPermissions());
			newMetadata.setCollections(originalMetadata.getCollections());
			newMetadata.setQuality(originalMetadata.getQuality());
			newMetadata.setProperties(originalMetadata.getProperties());
			newMetadata.getMetadataValues().putAll(originalMetadata.getMetadataValues());
		}
		newMetadata.getMetadataValues().put(fieldName, hash);
		return new DocumentWriteOperationImpl(op.getUri(), newMetadata, op.getContent(), op.getTemporalDocumentURI());
	}
}
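
A sketch of configuring the incremental write filter via the Builder; it assumes an existing WriteBatcher named "batcher", and that the target database defines a field named "incrementalWriteHash" over the corresponding metadata key (the field configuration is an assumption, as it is not part of this commit):

// Sketch only; "batcher" is an assumed, already-created WriteBatcher.
IncrementalWriteFilter filter = IncrementalWriteFilter.newBuilder()
	.fieldName("incrementalWriteHash") // matches the default field name
	.canonicalizeJson(true)            // hash canonicalized JSON so key ordering does not affect the hash
	.useEvalQuery(false)               // false = Optic query; true = server-side JavaScript eval
	.onDocumentSkipped(skipped ->
		System.out.println("Skipped " + skipped.length + " unchanged documents"))
	.build();
batcher.withDocumentWriteSetFilter(filter);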
marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java

Lines changed: 55 additions & 0 deletions

/*
 * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
 */
package com.marklogic.client.datamovement.filter;

import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.document.DocumentWriteSet;
import com.marklogic.client.row.RowTemplate;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Uses an Optic query to get the existing hash values for a set of URIs.
 *
 * @since 8.1.0
 */
class IncrementalWriteOpticFilter extends IncrementalWriteFilter {

	IncrementalWriteOpticFilter(String fieldName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
		super(fieldName, canonicalizeJson, skippedDocumentsConsumer);
	}

	@Override
	public DocumentWriteSet apply(Context context) {
		final String[] uris = context.getDocumentWriteSet().stream()
			.filter(op -> DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(op.getOperationType()))
			.map(DocumentWriteOperation::getUri)
			.toArray(String[]::new);

		// It doesn't seem possible yet to use a DSL query and bind an array of strings to a "uris" param, so using
		// a serialized query instead. That doesn't allow a user to override the query though.
		Map<String, String> existingHashes = new RowTemplate(context.getDatabaseClient()).query(op ->
			op.fromLexicons(Map.of(
				"uri", op.cts.uriReference(),
				"hash", op.cts.fieldReference(super.fieldName)
			)).where(
				op.cts.documentQuery(op.xs.stringSeq(uris))
			),
			rows -> {
				Map<String, String> map = new HashMap<>();
				rows.forEach(row -> {
					String uri = row.getString("uri");
					String existingHash = row.getString("hash");
					map.put(uri, existingHash);
				});
				return map;
			}
		);

		return filterDocuments(context, uri -> existingHashes.get(uri));
	}
}
