Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,35 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.marklogic.client.impl.XmlFactories;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

import javax.xml.namespace.QName;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathExpressionException;
import java.io.ByteArrayInputStream;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;

/**
* Utility class for applying content exclusions to documents before hash calculation.
* Supports removing specific paths from JSON and XML documents using JSON Pointer and XPath expressions.
*
* @since 8.1.0
*/
public class ContentExclusionUtil {
class ContentExclusionUtil {

private static final Logger logger = LoggerFactory.getLogger(ContentExclusionUtil.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
Expand All @@ -31,7 +50,7 @@ public class ContentExclusionUtil {
* @return the modified JSON content with specified paths removed
* @throws JsonProcessingException if the JSON content cannot be parsed or serialized
*/
public static String applyJsonExclusions(String uri, String jsonContent, String[] jsonPointers) throws JsonProcessingException {
static String applyJsonExclusions(String uri, String jsonContent, String[] jsonPointers) throws JsonProcessingException {
if (jsonPointers == null || jsonPointers.length == 0) {
return jsonContent;
}
Expand Down Expand Up @@ -72,6 +91,59 @@ private static void removeNodeAtPointer(String uri, JsonNode rootNode, String js
}
}

// Future method for XML exclusions
// public static String applyXmlExclusions(String xmlContent, String[] xpaths) { ... }
/**
* Applies XPath exclusions to XML content by removing the specified elements.
*
* @param uri the document URI (used for logging purposes)
* @param xmlContent the XML content as a string
* @param xpathExpressions array of XPath expressions identifying elements to exclude
* @return the modified XML content with specified elements removed
* @throws Exception if the XML content cannot be parsed or serialized
*/
static String applyXmlExclusions(String uri, String xmlContent, String... xpathExpressions) throws Exception {
Copy link

Copilot AI Jan 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throwing generic Exception reduces clarity for API consumers. Consider creating a specific exception type or throwing more specific exceptions like XPathExpressionException or TransformerException to help callers handle errors appropriately.

Copilot uses AI. Check for mistakes.
if (xpathExpressions == null || xpathExpressions.length == 0) {
return xmlContent;
}

DocumentBuilder builder = XmlFactories.getDocumentBuilderFactory().newDocumentBuilder();
Document document = builder.parse(new ByteArrayInputStream(xmlContent.getBytes(StandardCharsets.UTF_8)));
applyXmlExclusions(uri, document, xpathExpressions);
return serializeDocument(document);
}

/**
 * Removes every node matched by each XPath expression from the given document,
 * mutating the document in place. Expressions that fail to compile or evaluate
 * are logged at WARN and skipped; expressions matching nothing are logged at DEBUG.
 */
private static void applyXmlExclusions(String uri, Document document, String[] xpathExpressions) {
    final XPath xpath = XmlFactories.getXPathFactory().newXPath();
    for (String expression : xpathExpressions) {
        NodeList matches;
        try {
            XPathExpression compiled = xpath.compile(expression);
            matches = (NodeList) compiled.evaluate(document, XPathConstants.NODESET);
        } catch (XPathExpressionException e) {
            logger.warn("Invalid XPath expression '{}' for document {}: {}", expression, uri, e.getMessage());
            continue;
        }

        if (matches.getLength() == 0) {
            logger.debug("XPath '{}' does not match any nodes in document {}, skipping", expression, uri);
            continue;
        }

        // Walk backwards so removals never shift the indices of nodes not yet visited.
        for (int index = matches.getLength() - 1; index >= 0; index--) {
            Node candidate = matches.item(index);
            Node parent = candidate.getParentNode();
            if (parent != null) {
                parent.removeChild(candidate);
            }
        }
    }
}

/**
 * Serializes the given DOM document to a string, suppressing the XML declaration
 * and any added indentation so the output stays as close to the input as possible.
 */
private static String serializeDocument(Document document) throws TransformerException {
    final StringWriter output = new StringWriter();
    final Transformer transformer = XmlFactories.newTransformer();
    transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
    transformer.setOutputProperty(OutputKeys.INDENT, "no");
    transformer.transform(new DOMSource(document), new StreamResult(output));
    return output.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class IncrementalWriteEvalFilter extends IncrementalWriteFilter {
""";

// Delegates all configuration to the base filter; this subclass only supplies the
// eval-based hash retrieval strategy.
IncrementalWriteEvalFilter(String hashKeyName, String timestampKeyName, boolean canonicalizeJson,
                           Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer, String[] jsonExclusions, String[] xmlExclusions) {
    super(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer, jsonExclusions, xmlExclusions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public static class Builder {
private boolean useEvalQuery = false;
private Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;
private String[] jsonExclusions;
private String[] xmlExclusions;

/**
* @param keyName the name of the MarkLogic metadata key that will hold the hash value; defaults to "incrementalWriteHash".
Expand Down Expand Up @@ -103,11 +104,20 @@ public Builder jsonExclusions(String... jsonPointers) {
return this;
}

/**
 * @param xpathExpressions XPath expressions identifying XML elements to exclude from hash calculation.
 *                         For example, "//timestamp" or "//metadata/lastModified".
 * @return this builder, for method chaining
 */
public Builder xmlExclusions(String... xpathExpressions) {
this.xmlExclusions = xpathExpressions;
return this;
}

/**
 * @return a new {@link IncrementalWriteFilter} configured from this builder; an
 * eval-based implementation when {@code useEvalQuery} is true, otherwise an
 * Optic-based implementation.
 */
public IncrementalWriteFilter build() {
    if (useEvalQuery) {
        return new IncrementalWriteEvalFilter(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer, jsonExclusions, xmlExclusions);
    }
    return new IncrementalWriteOpticFilter(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer, jsonExclusions, xmlExclusions);
}
}

Expand All @@ -116,17 +126,19 @@ public IncrementalWriteFilter build() {
private final boolean canonicalizeJson;
private final Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;
private final String[] jsonExclusions;
private final String[] xmlExclusions;

// Hardcoding this for now, with a good general purpose hashing function.
// See https://xxhash.com for benchmarks.
private final LongHashFunction hashFunction = LongHashFunction.xx3();

/**
 * @param hashKeyName metadata key that holds the content hash
 * @param timestampKeyName metadata key that holds the write timestamp
 * @param canonicalizeJson whether JSON content is canonicalized before hashing
 * @param skippedDocumentsConsumer optional callback invoked with documents skipped as unchanged
 * @param jsonExclusions optional JSON Pointer expressions removed from JSON content before hashing
 * @param xmlExclusions optional XPath expressions removed from XML content before hashing
 */
public IncrementalWriteFilter(String hashKeyName, String timestampKeyName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer, String[] jsonExclusions, String[] xmlExclusions) {
    this.hashKeyName = hashKeyName;
    this.timestampKeyName = timestampKeyName;
    this.canonicalizeJson = canonicalizeJson;
    this.skippedDocumentsConsumer = skippedDocumentsConsumer;
    this.jsonExclusions = jsonExclusions;
    this.xmlExclusions = xmlExclusions;
}

protected final DocumentWriteSet filterDocuments(Context context, Function<String, String> hashRetriever) {
Expand Down Expand Up @@ -178,7 +190,6 @@ private String serializeContent(DocumentWriteOperation doc) {
JsonCanonicalizer jc;
try {
if (jsonExclusions != null && jsonExclusions.length > 0) {
// TBD on error handling here, want to get XML supported first.
content = ContentExclusionUtil.applyJsonExclusions(doc.getUri(), content, jsonExclusions);
}
jc = new JsonCanonicalizer(content);
Expand All @@ -190,6 +201,13 @@ private String serializeContent(DocumentWriteOperation doc) {
logger.warn("Unable to canonicalize JSON content for URI {}, using original content for hashing; cause: {}",
doc.getUri(), e.getMessage());
}
} else if (xmlExclusions != null && xmlExclusions.length > 0) {
try {
content = ContentExclusionUtil.applyXmlExclusions(doc.getUri(), content, xmlExclusions);
} catch (Exception e) {
logger.warn("Unable to apply XML exclusions for URI {}, using original content for hashing; cause: {}",
doc.getUri(), e.getMessage());
}
}

return content;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
class IncrementalWriteOpticFilter extends IncrementalWriteFilter {

// Delegates all configuration to the base filter; this subclass only supplies the
// Optic-based hash retrieval strategy.
IncrementalWriteOpticFilter(String hashKeyName, String timestampKeyName, boolean canonicalizeJson,
                            Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer, String[] jsonExclusions, String[] xmlExclusions) {
    super(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer, jsonExclusions, xmlExclusions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import javax.xml.stream.FactoryConfigurationError;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerFactory;
import javax.xml.xpath.XPathFactory;
import java.lang.ref.SoftReference;
import java.util.function.Supplier;

Expand All @@ -27,6 +29,12 @@ public final class XmlFactories {
private static final CachedInstancePerThreadSupplier<DocumentBuilderFactory> cachedDocumentBuilderFactory =
new CachedInstancePerThreadSupplier<>(XmlFactories::makeNewDocumentBuilderFactory);

private static final CachedInstancePerThreadSupplier<XPathFactory> cachedXPathFactory =
new CachedInstancePerThreadSupplier<>(XPathFactory::newInstance);

private static final CachedInstancePerThreadSupplier<TransformerFactory> cachedTransformerFactory =
new CachedInstancePerThreadSupplier<>(XmlFactories::makeNewTransformerFactory);

private XmlFactories() {} // preventing instances of utility class

/**
Expand Down Expand Up @@ -152,6 +160,47 @@ public static XMLOutputFactory getOutputFactory() {
return cachedOutputFactory.get();
}

/**
 * Returns a shared {@link XPathFactory}.
 * <p>
 * Creating XML factories is potentially a pretty expensive operation. Using a shared instance helps to amortize
 * this initialization cost via reuse. The instance is cached per thread (see
 * {@code CachedInstancePerThreadSupplier}), since {@code XPathFactory} instances are not
 * specified to be safe for concurrent use.
 *
 * @return a {@link XPathFactory}
 *
 * @since 8.1.0
 */
public static XPathFactory getXPathFactory() {
return cachedXPathFactory.get();
}

/**
 * Returns a shared {@link TransformerFactory} configured with secure defaults.
 * <p>
 * Creating XML factories is potentially a pretty expensive operation. Using a shared instance helps to amortize
 * this initialization cost via reuse. The instance is cached per thread (see
 * {@code CachedInstancePerThreadSupplier}), since {@code TransformerFactory} instances are not
 * specified to be safe for concurrent use.
 *
 * @return a securely configured {@link TransformerFactory}
 *
 * @since 8.1.0
 */
public static TransformerFactory getTransformerFactory() {
return cachedTransformerFactory.get();
}

/**
 * Creates a new {@link Transformer} from the shared {@link TransformerFactory}.
 *
 * @return a new {@link Transformer} instance
 * @throws IllegalStateException if the shared factory cannot create a transformer
 *
 * @since 8.1.0
 */
public static Transformer newTransformer() {
    try {
        return getTransformerFactory().newTransformer();
    } catch (TransformerConfigurationException e) {
        // IllegalStateException is more descriptive than a bare RuntimeException and
        // remains catchable by any caller that previously caught RuntimeException.
        throw new IllegalStateException("Unable to create new Transformer from TransformerFactory", e);
    }
}

/**
* A supplier that caches results per thread.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import javax.xml.namespace.QName;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.xpath.*;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathExpressionException;
import java.io.*;
import java.nio.charset.StandardCharsets;

Expand Down Expand Up @@ -205,7 +208,7 @@ public void setFactory(DocumentBuilderFactory factory) {
*/
public XPath getXPathProcessor() {
    // Lazily initialize from the shared per-thread factory.
    // NOTE(review): this lazy init is not synchronized — appears to assume
    // single-threaded use of this accessor; confirm against callers.
    if (xpathProcessor == null) {
        xpathProcessor = XmlFactories.getXPathFactory().newXPath();
    }
    return xpathProcessor;
}
/**
Expand All @@ -216,9 +219,6 @@ public XPath getXPathProcessor() {
public void setXPathProcessor(XPath xpathProcessor) {
this.xpathProcessor = xpathProcessor;
}
protected XPathFactory makeXPathProcessorFactory() {
return XPathFactory.newInstance();
}

/**
* Evaluate a string XPath expression against the retrieved document.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
*/
package com.marklogic.client.datamovement.filter;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.test.AbstractClientTest;
import com.marklogic.client.test.Common;
import org.junit.jupiter.api.BeforeEach;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Base class for incremental-write tests, providing shared counters, a default
 * filter, and a helper for running a write job against the test client.
 */
abstract class AbstractIncrementalWriteTest extends AbstractClientTest {

    // Shared metadata so every test document lands in a known collection and is
    // readable/updatable by the rest-reader role.
    static final DocumentMetadataHandle METADATA = new DocumentMetadataHandle()
        .withCollections("incremental-test")
        .withPermission("rest-reader", DocumentMetadataHandle.Capability.READ, DocumentMetadataHandle.Capability.UPDATE);

    AtomicInteger writtenCount = new AtomicInteger();
    AtomicInteger skippedCount = new AtomicInteger();
    AtomicReference<Throwable> batchFailure = new AtomicReference<>();
    ObjectMapper objectMapper = new ObjectMapper();

    List<DocumentWriteOperation> docs = new ArrayList<>();
    IncrementalWriteFilter filter;

    @BeforeEach
    void setup() {
        // Need a user with eval privileges so that the eval filter can be tested.
        Common.client = Common.newEvalClient();

        // Default filter implementation, should be suitable for most tests.
        // Lambda parameter named 'skippedDocs' so it does not shadow the 'docs' field.
        filter = IncrementalWriteFilter.newBuilder()
            .onDocumentsSkipped(skippedDocs -> skippedCount.addAndGet(skippedDocs.length))
            .build();
    }

    final void writeDocs(List<DocumentWriteOperation> docs) {
        new WriteBatcherTemplate(Common.client).runWriteJob(
            writeBatcher -> writeBatcher
                .withDocumentWriteSetFilter(filter)
                .onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length))
                .onBatchFailure((batch, failure) -> batchFailure.set(failure)),

            writeBatcher -> docs.forEach(writeBatcher::add)
        );
    }
}
Loading