diff --git a/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/ChunkInputs.java b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/ChunkInputs.java
index 665b0d6d..caf94b2a 100644
--- a/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/ChunkInputs.java
+++ b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/ChunkInputs.java
@@ -3,6 +3,8 @@
  */
 package com.marklogic.spark.core;
 
+import com.fasterxml.jackson.databind.JsonNode;
+
 /**
  * Encapsulates the data associated with a chunk of text, including its embedding and classification. Note there's
  * some naming issues to work out with this class and the Chunk interface.
@@ -13,6 +15,7 @@ public class ChunkInputs {
     private float[] embedding;
     private byte[] classification;
     private String modelName;
+    private JsonNode metadata;
 
     public ChunkInputs(String text) {
         this.text = text;
@@ -45,4 +48,12 @@ public String getModelName() {
     public void setModelName(String modelName) {
         this.modelName = modelName;
     }
+
+    public JsonNode getMetadata() {
+        return metadata;
+    }
+
+    public void setMetadata(JsonNode metadata) {
+        this.metadata = metadata;
+    }
 }
diff --git a/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/DocumentInputs.java b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/DocumentInputs.java
index eda9f0a6..ac06cecc 100644
--- a/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/DocumentInputs.java
+++ b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/DocumentInputs.java
@@ -157,14 +157,15 @@ public void setChunks(List<String> chunks) {
     }
 
     /**
-     * Adds a chunk with its embedding and model name. This is useful for workflows like Nuclia where
+     * Adds a chunk with its embedding, model name, and metadata. This is useful for workflows like Nuclia where
      * chunks and embeddings are received together.
      *
-     * @param text the chunk text
+     * @param text      the chunk text
      * @param embedding the embedding vector (can be null)
      * @param modelName the model name (can be null)
+     * @param metadata  the metadata as a JsonNode (can be null)
      */
-    public void addChunk(String text, float[] embedding, String modelName) {
+    public void addChunk(String text, float[] embedding, String modelName, JsonNode metadata) {
         if (chunkInputsList == null) {
             chunkInputsList = new ArrayList<>();
         }
@@ -173,6 +174,7 @@ public void addChunk(String text, float[] embedding, String modelName) {
             chunkInputs.setEmbedding(embedding);
             chunkInputs.setModelName(modelName);
         }
+        chunkInputs.setMetadata(metadata);
         chunkInputsList.add(chunkInputs);
     }
 
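For context, a minimal sketch of how a caller might use the widened `addChunk` signature. The `ObjectMapper` usage, the sample metadata values, and the `addChunksWithMetadata` wrapper are illustrative assumptions, not code from this changeset:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.marklogic.spark.core.DocumentInputs;

public class AddChunkSketch {

    // Sketch only: assumes 'inputs' was constructed elsewhere; the metadata
    // payload below is invented for illustration.
    static void addChunksWithMetadata(DocumentInputs inputs) throws Exception {
        JsonNode metadata = new ObjectMapper().readTree("{\"page\": 3, \"labels\": [\"intro\"]}");

        // Embedding, model name, and metadata may each be null.
        inputs.addChunk("chunk text content...", new float[]{0.1f, 0.2f}, "multilingual-2024-05-06", metadata);
        inputs.addChunk("a chunk without an embedding", null, null, metadata);
    }
}
```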
diff --git a/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/NucliaDocumentProcessor.java b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/NucliaDocumentProcessor.java
index c3255c84..cc1d53cc 100644
--- a/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/NucliaDocumentProcessor.java
+++ b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/NucliaDocumentProcessor.java
@@ -3,6 +3,7 @@
  */
 package com.marklogic.spark.core.nuclia;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.marklogic.spark.Util;
 import com.marklogic.spark.core.DocumentInputs;
@@ -94,6 +95,7 @@ private String extractTextFromNucliaNode(ObjectNode node) {
      * {
      *   "type": "Chunk",
      *   "text": "chunk text content...",
+     *   "metadata": { ... },
      *   "embeddings": [
      *     {
      *       "id": "multilingual-2024-05-06",
@@ -110,6 +112,9 @@ private void addChunkFromNucliaNode(ObjectNode node, DocumentInputs input) {
             return;
         }
 
+        // Extract metadata as JsonNode if present
+        JsonNode metadata = node.has("metadata") ? node.get("metadata") : null;
+
         // Process each embedding in the array
         if (node.has("embeddings") && node.get("embeddings").isArray()) {
             var embeddingsArray = node.get("embeddings");
@@ -132,11 +137,11 @@ private void addChunkFromNucliaNode(ObjectNode node, DocumentInputs input) {
                     modelName = embeddingObj.get("id").asText();
                 }
 
-                input.addChunk(text, embedding, modelName);
+                input.addChunk(text, embedding, modelName, metadata);
             }
         } else {
-            // No embeddings, still add the chunk with just text
-            input.addChunk(text, null, null);
+            // No embeddings, still add the chunk with just text and metadata
+            input.addChunk(text, null, null, metadata);
         }
     }
 
diff --git a/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/splitter/JsonChunkDocumentProducer.java b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/splitter/JsonChunkDocumentProducer.java
index 83cfc833..4d99e85d 100644
--- a/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/splitter/JsonChunkDocumentProducer.java
+++ b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/splitter/JsonChunkDocumentProducer.java
@@ -54,6 +54,9 @@ protected DocumentWriteOperation addChunksToSourceDocument() {
                     throw new ConnectorException(String.format("Unable to classify data from document with URI: %s; cause: %s", sourceDocument.getUri(), e.getMessage()), e);
                 }
             }
+            if (chunkInputs.getMetadata() != null) {
+                chunk.set("chunk-metadata", chunkInputs.getMetadata());
+            }
             var jsonChunk = new JsonChunk(chunk, null, chunkConfig.getEmbeddingName(), chunkConfig.isBase64EncodeVectors());
             if (chunkInputs.getEmbedding() != null) {
                 jsonChunk.addEmbedding(chunkInputs.getEmbedding(), chunkInputs.getModelName());
@@ -91,6 +94,9 @@ protected DocumentWriteOperation makeChunkDocument() {
                     throw new ConnectorException(String.format("Unable to classify data from document with URI: %s; cause: %s", uri, e.getMessage()), e);
                 }
             }
+            if (chunkInputs.getMetadata() != null) {
+                chunk.set("chunk-metadata", chunkInputs.getMetadata());
+            }
             var jsonChunk = new JsonChunk(chunk, null, chunkConfig.getEmbeddingName(), chunkConfig.isBase64EncodeVectors());
             if (chunkInputs.getEmbedding() != null) {
                 jsonChunk.addEmbedding(chunkInputs.getEmbedding(), chunkInputs.getModelName());
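To illustrate the JSON side, a runnable sketch of the shape the new `chunk.set("chunk-metadata", ...)` call produces. Only the `chunk-metadata` field name comes from this diff; the `text` field name and metadata values are assumptions for illustration:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class ChunkMetadataShapeSketch {

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode chunk = mapper.createObjectNode();
        chunk.put("text", "chunk text content...");
        // Mirrors JsonChunkDocumentProducer: the metadata JsonNode is attached as-is.
        chunk.set("chunk-metadata", mapper.readTree("{\"page\": 3, \"labels\": [\"intro\"]}"));
        System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(chunk));
        // Prints roughly:
        // {
        //   "text" : "chunk text content...",
        //   "chunk-metadata" : {
        //     "page" : 3,
        //     "labels" : [ "intro" ]
        //   }
        // }
    }
}
```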
diff --git a/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/splitter/XmlChunkDocumentProducer.java b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/splitter/XmlChunkDocumentProducer.java
index 1246f96e..6e21b5ca 100644
--- a/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/splitter/XmlChunkDocumentProducer.java
+++ b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/splitter/XmlChunkDocumentProducer.java
@@ -65,19 +65,18 @@ protected DocumentWriteOperation makeChunkDocument() {
         Element chunksElement = doc.createElementNS(chunkConfig.getXmlNamespace(), DEFAULT_CHUNKS_ELEMENT_NAME);
         root.appendChild(chunksElement);
 
-        List<DOMChunk> chunks = new ArrayList<>();
+        List<DOMChunk> addedChunks = new ArrayList<>();
         for (int i = 0; i < this.maxChunksPerDocument && hasNext(); i++) {
             ChunkInputs chunkInputs = chunkInputsList.get(listIndex);
-            Element classificationResponseNode = chunkInputs.getClassification() != null ?
-                getClassificationResponseElement(chunkInputs.getClassification()) : null;
-            addChunk(doc, chunkInputs.getText(), chunksElement, chunks, classificationResponseNode, chunkInputs.getEmbedding(), chunkInputs.getModelName());
+            DOMChunk chunk = addChunk(doc, chunkInputs, chunksElement);
+            addedChunks.add(chunk);
             listIndex++;
         }
 
         final String chunkDocumentUri = makeChunkDocumentUri(sourceDocument, "xml");
         return new DocumentAndChunks(
             new DocumentWriteOperationImpl(chunkDocumentUri, chunkConfig.getMetadata(), new DOMHandle(doc)),
-            chunks
+            addedChunks
         );
     }
 
@@ -87,16 +86,15 @@ protected DocumentWriteOperation addChunksToSourceDocument() {
         Element chunksElement = doc.createElementNS(chunkConfig.getXmlNamespace(), determineChunksElementName(doc));
         doc.getDocumentElement().appendChild(chunksElement);
 
-        List<DOMChunk> chunks = new ArrayList<>();
+        List<DOMChunk> addedChunks = new ArrayList<>();
         for (ChunkInputs chunkInputs : chunkInputsList) {
-            Element classificationResponseNode = chunkInputs.getClassification() != null ?
-                getClassificationResponseElement(chunkInputs.getClassification()) : null;
-            addChunk(doc, chunkInputs.getText(), chunksElement, chunks, classificationResponseNode, chunkInputs.getEmbedding(), chunkInputs.getModelName());
+            DOMChunk chunk = addChunk(doc, chunkInputs, chunksElement);
+            addedChunks.add(chunk);
         }
 
         return new DocumentAndChunks(
             new DocumentWriteOperationImpl(sourceDocument.getUri(), sourceDocument.getMetadata(), new DOMHandle(doc)),
-            chunks
+            addedChunks
         );
     }
 
@@ -110,15 +108,16 @@ private Element getClassificationResponseElement(byte[] classificationBytes) {
         }
     }
 
-    private void addChunk(Document doc, String textSegment, Element chunksElement, List<DOMChunk> chunks, Element classificationResponse, float[] embedding, String modelName) {
+    private DOMChunk addChunk(Document doc, ChunkInputs chunkInputs, Element chunksElement) {
         Element chunk = doc.createElementNS(chunkConfig.getXmlNamespace(), "chunk");
         chunksElement.appendChild(chunk);
 
         Element text = doc.createElementNS(chunkConfig.getXmlNamespace(), "text");
-        text.setTextContent(textSegment);
+        text.setTextContent(chunkInputs.getText());
         chunk.appendChild(text);
 
-        if (classificationResponse != null) {
+        if (chunkInputs.getClassification() != null) {
+            Element classificationResponse = getClassificationResponseElement(chunkInputs.getClassification());
             Node classificationNode = doc.createElement("classification");
             chunk.appendChild(classificationNode);
             for (int i = 0; i < classificationResponse.getChildNodes().getLength(); i++) {
@@ -127,11 +126,21 @@ private void addChunk(Document doc, String textSegment, Element chunksElement, L
             }
         }
 
+        if (chunkInputs.getMetadata() != null) {
+            Element metadataElement = doc.createElementNS(chunkConfig.getXmlNamespace(), "chunk-metadata");
+            // Re: possibly converting JSON to XML - Copilot recommends using the serialized string, as there's no
+            // "correct" way for converting JSON to XML, particularly in regard to arrays. If the user wants XML
+            // documents, they can always e.g. use a REST transform to determine how they want to represent the JSON
+            // as XML.
+            metadataElement.setTextContent(chunkInputs.getMetadata().toString());
+            chunk.appendChild(metadataElement);
+        }
+
         var domChunk = new DOMChunk(doc, chunk, this.xmlChunkConfig, this.xPathFactory);
-        if (embedding != null) {
-            domChunk.addEmbedding(embedding, modelName);
+        if (chunkInputs.getEmbedding() != null) {
+            domChunk.addEmbedding(chunkInputs.getEmbedding(), chunkInputs.getModelName());
         }
-        chunks.add(domChunk);
+        return domChunk;
     }
 
     private String determineChunksElementName(Document doc) {
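And for the XML side, a small sketch of the serialize-as-string decision the comment above describes: `JsonNode.toString()` yields compact JSON, so the `<chunk-metadata>` element carries the JSON verbatim rather than a lossy JSON-to-XML conversion. The surrounding `chunk`/`text` element structure shown in the comments is inferred from the producer code, and the metadata values are invented:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class MetadataAsTextSketch {

    public static void main(String[] args) throws Exception {
        JsonNode metadata = new ObjectMapper().readTree("{\"page\": 3, \"labels\": [\"intro\"]}");
        // toString() produces compact JSON, so arrays round-trip without needing
        // an invented XML representation.
        System.out.println(metadata.toString()); // {"page":3,"labels":["intro"]}

        // A resulting chunk element would look roughly like (namespace omitted):
        // <chunk>
        //   <text>chunk text content...</text>
        //   <chunk-metadata>{"page":3,"labels":["intro"]}</chunk-metadata>
        // </chunk>
    }
}
```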
diff --git a/marklogic-spark-connector/src/main/resources/marklogic-spark-messages.properties b/marklogic-spark-connector/src/main/resources/marklogic-spark-messages.properties
index 2731c9c5..39928646 100644
--- a/marklogic-spark-connector/src/main/resources/marklogic-spark-messages.properties
+++ b/marklogic-spark-connector/src/main/resources/marklogic-spark-messages.properties
@@ -22,6 +22,3 @@ spark.marklogic.write.splitter.sidecar.maxChunks=
 spark.marklogic.write.embedder.chunks.jsonPointer=
 spark.marklogic.write.embedder.chunks.xpath=
 spark.marklogic.write.embedder.batchSize=
-spark.marklogic.write.nuclia.apikey=
-spark.marklogic.write.nuclia.kbid=
-spark.marklogic.write.nuclia.region=