Skip to content

Commit 9234a41

Browse files
committed
MLE-26966 Added Nuclia metadata
In the next PR, going to turn NucliaClient into an interface so we can mock it in tests. That will allow us to verify how the metadata is handled without having to connect to Nuclia.
1 parent 64204ab commit 9234a41

File tree

6 files changed

+55
-25
lines changed

6 files changed

+55
-25
lines changed

marklogic-spark-connector/src/main/java/com/marklogic/spark/core/ChunkInputs.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
*/
44
package com.marklogic.spark.core;
55

6+
import com.fasterxml.jackson.databind.JsonNode;
7+
68
/**
79
* Encapsulates the data associated with a chunk of text, including its embedding and classification. Note there's
810
* some naming issues to work out with this class and the Chunk interface.
@@ -13,6 +15,7 @@ public class ChunkInputs {
1315
private float[] embedding;
1416
private byte[] classification;
1517
private String modelName;
18+
private JsonNode metadata;
1619

1720
public ChunkInputs(String text) {
1821
this.text = text;
@@ -45,4 +48,12 @@ public String getModelName() {
4548
public void setModelName(String modelName) {
4649
this.modelName = modelName;
4750
}
51+
52+
public JsonNode getMetadata() {
53+
return metadata;
54+
}
55+
56+
public void setMetadata(JsonNode metadata) {
57+
this.metadata = metadata;
58+
}
4859
}

marklogic-spark-connector/src/main/java/com/marklogic/spark/core/DocumentInputs.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,14 +157,15 @@ public void setChunks(List<String> chunks) {
157157
}
158158

159159
/**
160-
* Adds a chunk with its embedding and model name. This is useful for workflows like Nuclia where
160+
* Adds a chunk with its embedding, model name, and metadata. This is useful for workflows like Nuclia where
161161
* chunks and embeddings are received together.
162162
*
163-
* @param text the chunk text
163+
* @param text the chunk text
164164
* @param embedding the embedding vector (can be null)
165165
* @param modelName the model name (can be null)
166+
* @param metadata the metadata as a JsonNode (can be null)
166167
*/
167-
public void addChunk(String text, float[] embedding, String modelName) {
168+
public void addChunk(String text, float[] embedding, String modelName, JsonNode metadata) {
168169
if (chunkInputsList == null) {
169170
chunkInputsList = new ArrayList<>();
170171
}
@@ -173,6 +174,7 @@ public void addChunk(String text, float[] embedding, String modelName) {
173174
chunkInputs.setEmbedding(embedding);
174175
chunkInputs.setModelName(modelName);
175176
}
177+
chunkInputs.setMetadata(metadata);
176178
chunkInputsList.add(chunkInputs);
177179
}
178180

marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/NucliaDocumentProcessor.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
*/
44
package com.marklogic.spark.core.nuclia;
55

6+
import com.fasterxml.jackson.databind.JsonNode;
67
import com.fasterxml.jackson.databind.node.ObjectNode;
78
import com.marklogic.spark.Util;
89
import com.marklogic.spark.core.DocumentInputs;
@@ -94,6 +95,7 @@ private String extractTextFromNucliaNode(ObjectNode node) {
9495
* {
9596
* "type": "Chunk",
9697
* "text": "chunk text content...",
98+
* "metadata": { ... },
9799
* "embeddings": [
98100
* {
99101
* "id": "multilingual-2024-05-06",
@@ -110,6 +112,9 @@ private void addChunkFromNucliaNode(ObjectNode node, DocumentInputs input) {
110112
return;
111113
}
112114

115+
// Extract metadata as JsonNode if present
116+
JsonNode metadata = node.has("metadata") ? node.get("metadata") : null;
117+
113118
// Process each embedding in the array
114119
if (node.has("embeddings") && node.get("embeddings").isArray()) {
115120
var embeddingsArray = node.get("embeddings");
@@ -132,11 +137,11 @@ private void addChunkFromNucliaNode(ObjectNode node, DocumentInputs input) {
132137
modelName = embeddingObj.get("id").asText();
133138
}
134139

135-
input.addChunk(text, embedding, modelName);
140+
input.addChunk(text, embedding, modelName, metadata);
136141
}
137142
} else {
138-
// No embeddings, still add the chunk with just text
139-
input.addChunk(text, null, null);
143+
// No embeddings, still add the chunk with just text and metadata
144+
input.addChunk(text, null, null, metadata);
140145
}
141146
}
142147

marklogic-spark-connector/src/main/java/com/marklogic/spark/core/splitter/JsonChunkDocumentProducer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ protected DocumentWriteOperation addChunksToSourceDocument() {
5454
throw new ConnectorException(String.format("Unable to classify data from document with URI: %s; cause: %s", sourceDocument.getUri(), e.getMessage()), e);
5555
}
5656
}
57+
if (chunkInputs.getMetadata() != null) {
58+
chunk.set("chunk-metadata", chunkInputs.getMetadata());
59+
}
5760
var jsonChunk = new JsonChunk(chunk, null, chunkConfig.getEmbeddingName(), chunkConfig.isBase64EncodeVectors());
5861
if (chunkInputs.getEmbedding() != null) {
5962
jsonChunk.addEmbedding(chunkInputs.getEmbedding(), chunkInputs.getModelName());
@@ -91,6 +94,9 @@ protected DocumentWriteOperation makeChunkDocument() {
9194
throw new ConnectorException(String.format("Unable to classify data from document with URI: %s; cause: %s", uri, e.getMessage()), e);
9295
}
9396
}
97+
if (chunkInputs.getMetadata() != null) {
98+
chunk.set("chunk-metadata", chunkInputs.getMetadata());
99+
}
94100
var jsonChunk = new JsonChunk(chunk, null, chunkConfig.getEmbeddingName(), chunkConfig.isBase64EncodeVectors());
95101
if (chunkInputs.getEmbedding() != null) {
96102
jsonChunk.addEmbedding(chunkInputs.getEmbedding(), chunkInputs.getModelName());

marklogic-spark-connector/src/main/java/com/marklogic/spark/core/splitter/XmlChunkDocumentProducer.java

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -65,19 +65,18 @@ protected DocumentWriteOperation makeChunkDocument() {
6565
Element chunksElement = doc.createElementNS(chunkConfig.getXmlNamespace(), DEFAULT_CHUNKS_ELEMENT_NAME);
6666
root.appendChild(chunksElement);
6767

68-
List<Chunk> chunks = new ArrayList<>();
68+
List<Chunk> addedChunks = new ArrayList<>();
6969
for (int i = 0; i < this.maxChunksPerDocument && hasNext(); i++) {
7070
ChunkInputs chunkInputs = chunkInputsList.get(listIndex);
71-
Element classificationResponseNode = chunkInputs.getClassification() != null ?
72-
getClassificationResponseElement(chunkInputs.getClassification()) : null;
73-
addChunk(doc, chunkInputs.getText(), chunksElement, chunks, classificationResponseNode, chunkInputs.getEmbedding(), chunkInputs.getModelName());
71+
DOMChunk chunk = addChunk(doc, chunkInputs, chunksElement);
72+
addedChunks.add(chunk);
7473
listIndex++;
7574
}
7675

7776
final String chunkDocumentUri = makeChunkDocumentUri(sourceDocument, "xml");
7877
return new DocumentAndChunks(
7978
new DocumentWriteOperationImpl(chunkDocumentUri, chunkConfig.getMetadata(), new DOMHandle(doc)),
80-
chunks
79+
addedChunks
8180
);
8281
}
8382

@@ -87,16 +86,15 @@ protected DocumentWriteOperation addChunksToSourceDocument() {
8786
Element chunksElement = doc.createElementNS(chunkConfig.getXmlNamespace(), determineChunksElementName(doc));
8887
doc.getDocumentElement().appendChild(chunksElement);
8988

90-
List<Chunk> chunks = new ArrayList<>();
89+
List<Chunk> addedChunks = new ArrayList<>();
9190
for (ChunkInputs chunkInputs : chunkInputsList) {
92-
Element classificationResponseNode = chunkInputs.getClassification() != null ?
93-
getClassificationResponseElement(chunkInputs.getClassification()) : null;
94-
addChunk(doc, chunkInputs.getText(), chunksElement, chunks, classificationResponseNode, chunkInputs.getEmbedding(), chunkInputs.getModelName());
91+
DOMChunk chunk = addChunk(doc, chunkInputs, chunksElement);
92+
addedChunks.add(chunk);
9593
}
9694

9795
return new DocumentAndChunks(
9896
new DocumentWriteOperationImpl(sourceDocument.getUri(), sourceDocument.getMetadata(), new DOMHandle(doc)),
99-
chunks
97+
addedChunks
10098
);
10199
}
102100

@@ -110,15 +108,16 @@ private Element getClassificationResponseElement(byte[] classificationBytes) {
110108
}
111109
}
112110

113-
private void addChunk(Document doc, String textSegment, Element chunksElement, List<Chunk> chunks, Element classificationResponse, float[] embedding, String modelName) {
111+
private DOMChunk addChunk(Document doc, ChunkInputs chunkInputs, Element chunksElement) {
114112
Element chunk = doc.createElementNS(chunkConfig.getXmlNamespace(), "chunk");
115113
chunksElement.appendChild(chunk);
116114

117115
Element text = doc.createElementNS(chunkConfig.getXmlNamespace(), "text");
118-
text.setTextContent(textSegment);
116+
text.setTextContent(chunkInputs.getText());
119117
chunk.appendChild(text);
120118

121-
if (classificationResponse != null) {
119+
if (chunkInputs.getClassification() != null) {
120+
Element classificationResponse = getClassificationResponseElement(chunkInputs.getClassification());
122121
Node classificationNode = doc.createElement("classification");
123122
chunk.appendChild(classificationNode);
124123
for (int i = 0; i < classificationResponse.getChildNodes().getLength(); i++) {
@@ -127,11 +126,21 @@ private void addChunk(Document doc, String textSegment, Element chunksElement, L
127126
}
128127
}
129128

129+
if (chunkInputs.getMetadata() != null) {
130+
Element metadataElement = doc.createElementNS(chunkConfig.getXmlNamespace(), "chunk-metadata");
131+
// Re: possibly converting JSON to XML - Copilot recommends using the serialized string, as there's no
132+
// "correct" way for converting JSON to XML, particularly in regard to arrays. If the user wants XML
133+
// documents, they can always e.g. use a REST transform to determine how they want to represent the JSON
134+
// as XML.
135+
metadataElement.setTextContent(chunkInputs.getMetadata().toString());
136+
chunk.appendChild(metadataElement);
137+
}
138+
130139
var domChunk = new DOMChunk(doc, chunk, this.xmlChunkConfig, this.xPathFactory);
131-
if (embedding != null) {
132-
domChunk.addEmbedding(embedding, modelName);
140+
if (chunkInputs.getEmbedding() != null) {
141+
domChunk.addEmbedding(chunkInputs.getEmbedding(), chunkInputs.getModelName());
133142
}
134-
chunks.add(domChunk);
143+
return domChunk;
135144
}
136145

137146
private String determineChunksElementName(Document doc) {

marklogic-spark-connector/src/main/resources/marklogic-spark-messages.properties

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,3 @@ spark.marklogic.write.splitter.sidecar.maxChunks=
2222
spark.marklogic.write.embedder.chunks.jsonPointer=
2323
spark.marklogic.write.embedder.chunks.xpath=
2424
spark.marklogic.write.embedder.batchSize=
25-
spark.marklogic.write.nuclia.apikey=
26-
spark.marklogic.write.nuclia.kbid=
27-
spark.marklogic.write.nuclia.region=

0 commit comments

Comments
 (0)