@@ -3,6 +3,8 @@
  */
 package com.marklogic.spark.core;
 
+import com.fasterxml.jackson.databind.JsonNode;
+
 /**
  * Encapsulates the data associated with a chunk of text, including its embedding and classification. Note there's
  * some naming issues to work out with this class and the Chunk interface.
@@ -13,6 +15,7 @@ public class ChunkInputs {
     private float[] embedding;
     private byte[] classification;
     private String modelName;
+    private JsonNode metadata;
 
     public ChunkInputs(String text) {
         this.text = text;
@@ -45,4 +48,12 @@ public String getModelName() {
     public void setModelName(String modelName) {
         this.modelName = modelName;
     }
+
+    public JsonNode getMetadata() {
+        return metadata;
+    }
+
+    public void setMetadata(JsonNode metadata) {
+        this.metadata = metadata;
+    }
 }
@@ -157,14 +157,15 @@ public void setChunks(List<String> chunks) {
     }
 
     /**
-     * Adds a chunk with its embedding and model name. This is useful for workflows like Nuclia where
+     * Adds a chunk with its embedding, model name, and metadata. This is useful for workflows like Nuclia where
      * chunks and embeddings are received together.
      *
-     * @param text the chunk text
+     * @param text      the chunk text
      * @param embedding the embedding vector (can be null)
      * @param modelName the model name (can be null)
+     * @param metadata  the metadata as a JsonNode (can be null)
      */
-    public void addChunk(String text, float[] embedding, String modelName) {
+    public void addChunk(String text, float[] embedding, String modelName, JsonNode metadata) {
         if (chunkInputsList == null) {
             chunkInputsList = new ArrayList<>();
         }
@@ -173,6 +174,7 @@ public void addChunk(String text, float[] embedding, String modelName) {
             chunkInputs.setEmbedding(embedding);
             chunkInputs.setModelName(modelName);
         }
    [Review comment by Copilot AI, Feb 4, 2026]

    The metadata is set unconditionally outside the null check that applies to embedding and modelName. This inconsistency could be confusing. Consider moving this line inside the if block or adding a comment explaining why metadata is handled differently.

    Suggested change:
    -        }
    +        }
    +        // Metadata is associated with the chunk regardless of whether an embedding/model name is provided.
+        chunkInputs.setMetadata(metadata);
         chunkInputsList.add(chunkInputs);
     }
 
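A minimal usage sketch (editorial, not part of the PR) of the new overload. The helper class, method, and sample metadata below are hypothetical; the addChunk signature and the model id come from the diffs in this PR.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.marklogic.spark.core.DocumentInputs;

public class AddChunkSketch {

    // Embedding, model name, and metadata may each be null, per the javadoc above.
    public static void addNucliaChunk(DocumentInputs input, String text, float[] embedding) throws Exception {
        JsonNode metadata = new ObjectMapper().readTree("{\"page\": 1, \"labels\": [\"intro\"]}");
        input.addChunk(text, embedding, "multilingual-2024-05-06", metadata);
    }
}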
@@ -3,6 +3,7 @@
  */
 package com.marklogic.spark.core.nuclia;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.marklogic.spark.Util;
 import com.marklogic.spark.core.DocumentInputs;
@@ -94,6 +95,7 @@ private String extractTextFromNucliaNode(ObjectNode node) {
      * {
      *   "type": "Chunk",
      *   "text": "chunk text content...",
+     *   "metadata": { ... },
      *   "embeddings": [
      *     {
      *       "id": "multilingual-2024-05-06",
@@ -110,6 +112,9 @@ private void addChunkFromNucliaNode(ObjectNode node, DocumentInputs input) {
             return;
         }
 
+        // Extract metadata as JsonNode if present
+        JsonNode metadata = node.has("metadata") ? node.get("metadata") : null;
+
         // Process each embedding in the array
         if (node.has("embeddings") && node.get("embeddings").isArray()) {
             var embeddingsArray = node.get("embeddings");
@@ -132,11 +137,11 @@ private void addChunkFromNucliaNode(ObjectNode node, DocumentInputs input) {
                     modelName = embeddingObj.get("id").asText();
                 }
 
-                input.addChunk(text, embedding, modelName);
+                input.addChunk(text, embedding, modelName, metadata);
             }
         } else {
-            // No embeddings, still add the chunk with just text
-            input.addChunk(text, null, null);
+            // No embeddings, still add the chunk with just text and metadata
+            input.addChunk(text, null, null, metadata);
         }
     }
 
@@ -54,6 +54,9 @@ protected DocumentWriteOperation addChunksToSourceDocument() {
                 throw new ConnectorException(String.format("Unable to classify data from document with URI: %s; cause: %s", sourceDocument.getUri(), e.getMessage()), e);
             }
         }
+        if (chunkInputs.getMetadata() != null) {
+            chunk.set("chunk-metadata", chunkInputs.getMetadata());
+        }
         var jsonChunk = new JsonChunk(chunk, null, chunkConfig.getEmbeddingName(), chunkConfig.isBase64EncodeVectors());
         if (chunkInputs.getEmbedding() != null) {
             jsonChunk.addEmbedding(chunkInputs.getEmbedding(), chunkInputs.getModelName());
@@ -91,6 +94,9 @@ protected DocumentWriteOperation makeChunkDocument() {
                 throw new ConnectorException(String.format("Unable to classify data from document with URI: %s; cause: %s", uri, e.getMessage()), e);
             }
         }
+        if (chunkInputs.getMetadata() != null) {
+            chunk.set("chunk-metadata", chunkInputs.getMetadata());
+        }
         var jsonChunk = new JsonChunk(chunk, null, chunkConfig.getEmbeddingName(), chunkConfig.isBase64EncodeVectors());
         if (chunkInputs.getEmbedding() != null) {
             jsonChunk.addEmbedding(chunkInputs.getEmbedding(), chunkInputs.getModelName());
@@ -65,19 +65,18 @@ protected DocumentWriteOperation makeChunkDocument() {
         Element chunksElement = doc.createElementNS(chunkConfig.getXmlNamespace(), DEFAULT_CHUNKS_ELEMENT_NAME);
         root.appendChild(chunksElement);
 
-        List<Chunk> chunks = new ArrayList<>();
+        List<Chunk> addedChunks = new ArrayList<>();
         for (int i = 0; i < this.maxChunksPerDocument && hasNext(); i++) {
             ChunkInputs chunkInputs = chunkInputsList.get(listIndex);
-            Element classificationResponseNode = chunkInputs.getClassification() != null ?
-                getClassificationResponseElement(chunkInputs.getClassification()) : null;
-            addChunk(doc, chunkInputs.getText(), chunksElement, chunks, classificationResponseNode, chunkInputs.getEmbedding(), chunkInputs.getModelName());
+            DOMChunk chunk = addChunk(doc, chunkInputs, chunksElement);
+            addedChunks.add(chunk);
             listIndex++;
         }
 
         final String chunkDocumentUri = makeChunkDocumentUri(sourceDocument, "xml");
         return new DocumentAndChunks(
             new DocumentWriteOperationImpl(chunkDocumentUri, chunkConfig.getMetadata(), new DOMHandle(doc)),
-            chunks
+            addedChunks
         );
     }
 
@@ -87,16 +86,15 @@ protected DocumentWriteOperation addChunksToSourceDocument() {
         Element chunksElement = doc.createElementNS(chunkConfig.getXmlNamespace(), determineChunksElementName(doc));
         doc.getDocumentElement().appendChild(chunksElement);
 
-        List<Chunk> chunks = new ArrayList<>();
+        List<Chunk> addedChunks = new ArrayList<>();
         for (ChunkInputs chunkInputs : chunkInputsList) {
-            Element classificationResponseNode = chunkInputs.getClassification() != null ?
-                getClassificationResponseElement(chunkInputs.getClassification()) : null;
-            addChunk(doc, chunkInputs.getText(), chunksElement, chunks, classificationResponseNode, chunkInputs.getEmbedding(), chunkInputs.getModelName());
+            DOMChunk chunk = addChunk(doc, chunkInputs, chunksElement);
+            addedChunks.add(chunk);
         }
 
         return new DocumentAndChunks(
             new DocumentWriteOperationImpl(sourceDocument.getUri(), sourceDocument.getMetadata(), new DOMHandle(doc)),
-            chunks
+            addedChunks
         );
     }
 
@@ -110,15 +108,16 @@ private Element getClassificationResponseElement(byte[] classificationBytes) {
         }
     }
 
-    private void addChunk(Document doc, String textSegment, Element chunksElement, List<Chunk> chunks, Element classificationResponse, float[] embedding, String modelName) {
+    private DOMChunk addChunk(Document doc, ChunkInputs chunkInputs, Element chunksElement) {
         Element chunk = doc.createElementNS(chunkConfig.getXmlNamespace(), "chunk");
         chunksElement.appendChild(chunk);
 
         Element text = doc.createElementNS(chunkConfig.getXmlNamespace(), "text");
-        text.setTextContent(textSegment);
+        text.setTextContent(chunkInputs.getText());
         chunk.appendChild(text);
 
-        if (classificationResponse != null) {
+        if (chunkInputs.getClassification() != null) {
+            Element classificationResponse = getClassificationResponseElement(chunkInputs.getClassification());
             Node classificationNode = doc.createElement("classification");
             chunk.appendChild(classificationNode);
             for (int i = 0; i < classificationResponse.getChildNodes().getLength(); i++) {
@@ -127,11 +126,21 @@ private void addChunk(Document doc, String textSegment, Element chunksElement, List<Chunk> chunks, Element classificationResponse, float[] embedding, String modelName) {
             }
         }
 
+        if (chunkInputs.getMetadata() != null) {
+            Element metadataElement = doc.createElementNS(chunkConfig.getXmlNamespace(), "chunk-metadata");
+            // Re: possibly converting JSON to XML - Copilot recommends using the serialized string, as there's no
+            // "correct" way for converting JSON to XML, particularly in regard to arrays. If the user wants XML
+            // documents, they can always e.g. use a REST transform to determine how they want to represent the JSON
+            // as XML.
+            metadataElement.setTextContent(chunkInputs.getMetadata().toString());
    [Review comment by Copilot AI, Feb 4, 2026]

    Calling toString() on a JsonNode may not produce valid JSON in all cases. Consider using ObjectMapper.writeValueAsString() to ensure proper JSON serialization, especially for complex nested structures.
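    A minimal sketch (editorial, not part of the PR) contrasting the two serialization routes named above. The sample metadata shape is hypothetical; recent Jackson versions document JsonNode.toString() as producing valid JSON, while ObjectMapper.writeValueAsString() routes through a configured mapper and reports failures via a checked JsonProcessingException.

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class MetadataSerializationSketch {

        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            JsonNode metadata = mapper.readTree("{\"page\": 3, \"labels\": [\"intro\", \"summary\"]}");

            // Route taken by the PR: JsonNode.toString() emits compact JSON.
            String viaToString = metadata.toString();

            // Route suggested by the reviewer: serialize through an ObjectMapper, which
            // throws a checked JsonProcessingException if serialization fails.
            String viaMapper = mapper.writeValueAsString(metadata);

            // Both print: {"page":3,"labels":["intro","summary"]}
            System.out.println(viaToString);
            System.out.println(viaMapper);
        }
    }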
+            chunk.appendChild(metadataElement);
+        }
+
         var domChunk = new DOMChunk(doc, chunk, this.xmlChunkConfig, this.xPathFactory);
-        if (embedding != null) {
-            domChunk.addEmbedding(embedding, modelName);
+        if (chunkInputs.getEmbedding() != null) {
+            domChunk.addEmbedding(chunkInputs.getEmbedding(), chunkInputs.getModelName());
         }
-        chunks.add(domChunk);
+        return domChunk;
     }
 
     private String determineChunksElementName(Document doc) {
@@ -22,6 +22,3 @@ spark.marklogic.write.splitter.sidecar.maxChunks=
 spark.marklogic.write.embedder.chunks.jsonPointer=
 spark.marklogic.write.embedder.chunks.xpath=
 spark.marklogic.write.embedder.batchSize=
-spark.marklogic.write.nuclia.apikey=
-spark.marklogic.write.nuclia.kbid=
-spark.marklogic.write.nuclia.region=