diff --git a/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/DocumentPipeline.java b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/DocumentPipeline.java index 89f32ad7..05528602 100644 --- a/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/DocumentPipeline.java +++ b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/DocumentPipeline.java @@ -11,9 +11,10 @@ import com.marklogic.spark.core.embedding.EmbeddingProducer; import com.marklogic.spark.core.extraction.ExtractionResult; import com.marklogic.spark.core.extraction.TextExtractor; -import com.marklogic.spark.core.nuclia.NucliaClient; +import com.marklogic.spark.core.nuclia.NuaClient; import com.marklogic.spark.core.nuclia.NucliaDocumentProcessor; import com.marklogic.spark.core.splitter.TextSplitter; +import org.apache.commons.io.IOUtils; import java.io.Closeable; import java.io.IOException; @@ -32,7 +33,7 @@ public class DocumentPipeline implements Closeable { private final TextClassifier textClassifier; private final EmbeddingProducer embeddingProducer; private final ChunkSelector chunkSelector; - private final NucliaClient nucliaClient; + private final NuaClient nuaClient; private final NucliaDocumentProcessor nucliaProcessor; public DocumentPipeline(TextExtractor textExtractor, TextSplitter textSplitter, TextClassifier textClassifier, EmbeddingProducer embeddingProducer, ChunkSelector chunkSelector) { @@ -41,20 +42,20 @@ public DocumentPipeline(TextExtractor textExtractor, TextSplitter textSplitter, this.textClassifier = textClassifier; this.embeddingProducer = embeddingProducer; this.chunkSelector = chunkSelector; - this.nucliaClient = null; + this.nuaClient = null; this.nucliaProcessor = null; } /** * Constructor for Nuclia-based pipeline. Nuclia handles extraction, splitting, and embedding generation. 
* - * @param nucliaClient the Nuclia client for processing + * @param nuaClient the Nuclia Understanding API client for processing * @param textClassifier optional text classifier (can be null) * @since 3.1.0 */ - public DocumentPipeline(NucliaClient nucliaClient, TextClassifier textClassifier) { - this.nucliaClient = nucliaClient; - this.nucliaProcessor = new NucliaDocumentProcessor(nucliaClient); + public DocumentPipeline(NuaClient nuaClient, TextClassifier textClassifier) { + this.nuaClient = nuaClient; + this.nucliaProcessor = new NucliaDocumentProcessor(nuaClient); this.textClassifier = textClassifier; this.textExtractor = null; this.textSplitter = null; @@ -64,17 +65,13 @@ public DocumentPipeline(NucliaClient nucliaClient, TextClassifier textClassifier @Override public void close() throws IOException { - if (textClassifier != null) { - textClassifier.close(); - } - if (nucliaClient != null) { - nucliaClient.close(); - } + IOUtils.closeQuietly(textClassifier); + IOUtils.closeQuietly(nuaClient); } // Package-private getters for testing - NucliaClient getNucliaClient() { - return nucliaClient; + NuaClient getNuaClient() { + return nuaClient; } TextClassifier getTextClassifier() { diff --git a/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/DocumentPipelineFactory.java b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/DocumentPipelineFactory.java index 2487bc49..c8c00b7b 100644 --- a/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/DocumentPipelineFactory.java +++ b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/DocumentPipelineFactory.java @@ -15,21 +15,25 @@ import com.marklogic.spark.core.embedding.EmbeddingProducerFactory; import com.marklogic.spark.core.extraction.TextExtractor; import com.marklogic.spark.core.extraction.TikaTextExtractor; -import com.marklogic.spark.core.nuclia.NucliaClient; +import com.marklogic.spark.core.nuclia.DefaultNuaClient; +import 
com.marklogic.spark.core.nuclia.MockNuaClient; +import com.marklogic.spark.core.nuclia.NuaClient; import com.marklogic.spark.core.splitter.TextSplitter; import com.marklogic.spark.core.splitter.TextSplitterFactory; public abstract class DocumentPipelineFactory { + private static final String MOCK_NUA_CLIENT_OPTION = "spark.marklogic.testing.mockNuaClientResponse"; + // For some reason, Sonar thinks the check for four nulls always resolves to false, even though it's definitely // possible. So ignoring that warning. @SuppressWarnings("java:S2589") public static DocumentPipeline newDocumentPipeline(Context context) { // Check for Nuclia configuration first - NucliaClient nucliaClient = newNucliaClient(context); - if (nucliaClient != null) { + NuaClient nuaClient = newNuaClient(context); + if (nuaClient != null) { TextClassifier textClassifier = TextClassifierFactory.newTextClassifier(context); - return new DocumentPipeline(nucliaClient, textClassifier); + return new DocumentPipeline(nuaClient, textClassifier); } // Standard pipeline with separate components @@ -57,13 +61,19 @@ public static DocumentPipeline newDocumentPipeline(Context context) { new DocumentPipeline(textExtractor, textSplitter, textClassifier, embeddingProducer, chunkSelector); } - private static NucliaClient newNucliaClient(Context context) { + private static NuaClient newNuaClient(Context context) { + // Check for mock option first (for testing) + if (context.hasOption(MOCK_NUA_CLIENT_OPTION)) { + String mockResponseJson = context.getStringOption(MOCK_NUA_CLIENT_OPTION); + return new MockNuaClient(mockResponseJson); + } + String nuaKey = context.getProperties().get(Options.WRITE_NUCLIA_NUA_KEY); if (nuaKey == null || nuaKey.trim().isEmpty()) { return null; } - NucliaClient.Builder builder = NucliaClient.builder(nuaKey); + DefaultNuaClient.Builder builder = DefaultNuaClient.builder(nuaKey); int timeout = context.getIntOption(Options.WRITE_NUCLIA_TIMEOUT, 120, 1); builder.withTimeout(timeout); diff 
--git a/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/NucliaClient.java b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/DefaultNuaClient.java similarity index 94% rename from marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/NucliaClient.java rename to marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/DefaultNuaClient.java index b3d92f3d..06dbe998 100644 --- a/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/NucliaClient.java +++ b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/DefaultNuaClient.java @@ -17,11 +17,13 @@ import java.util.stream.Stream; /** - * Client for interacting with the Nuclia RAG API. + * Default implementation of {@link NuaClient} for interacting with the Nuclia Understanding API (NUA). * Handles text ingestion, processing, and retrieval of embeddings and chunks. * Implements AutoCloseable to properly shut down HTTP client resources. + * + * @since 3.1.0 */ -public class NucliaClient implements AutoCloseable { +public class DefaultNuaClient implements NuaClient { private final String nuaKey; private final String baseUrl; @@ -29,7 +31,7 @@ public class NucliaClient implements AutoCloseable { private final ObjectMapper objectMapper; private final int timeoutSeconds; - private NucliaClient(Builder builder) { + private DefaultNuaClient(Builder builder) { this.nuaKey = builder.nuaKey; this.baseUrl = builder.apiUrl; this.timeoutSeconds = builder.timeoutSeconds; @@ -38,12 +40,12 @@ private NucliaClient(Builder builder) { this.objectMapper = new ObjectMapper(); if (Util.MAIN_LOGGER.isDebugEnabled()) { - Util.MAIN_LOGGER.debug("Initialized NucliaClient: baseUrl={}, timeoutSeconds={}", baseUrl, timeoutSeconds); + Util.MAIN_LOGGER.debug("Initialized DefaultNuaClient: baseUrl={}, timeoutSeconds={}", baseUrl, timeoutSeconds); } } /** - * Creates a new Builder for constructing a NucliaClient. 
+ * Creates a new Builder for constructing a DefaultNuaClient. * * @param nuaKey the Nuclia NUA key for authentication (required) * @return a new Builder instance @@ -53,7 +55,7 @@ public static Builder builder(String nuaKey) { } /** - * Builder for creating NucliaClient instances. + * Builder for creating DefaultNuaClient instances. */ public static class Builder { private static final String DEFAULT_API_URL = "https://aws-us-east-2-1.rag.progress.cloud/api/v1"; @@ -96,12 +98,12 @@ public Builder withTimeout(int timeoutSeconds) { } /** - * Builds the NucliaClient instance. + * Builds the DefaultNuaClient instance. * - * @return a new NucliaClient + * @return a new DefaultNuaClient */ - public NucliaClient build() { - return new NucliaClient(this); + public DefaultNuaClient build() { + return new DefaultNuaClient(this); } } @@ -115,6 +117,7 @@ public NucliaClient build() { * @throws IOException if any request fails * @throws InterruptedException if the thread is interrupted while waiting */ + @Override public Stream<ObjectNode> processData(String filename, byte[] content) throws IOException, InterruptedException { if (Util.MAIN_LOGGER.isDebugEnabled()) { Util.MAIN_LOGGER.debug("Starting processData; file: {}, size: {} bytes", filename, content.length); @@ -342,7 +345,7 @@ private String escapeJson(String input) { @Override public void close() { if (Util.MAIN_LOGGER.isDebugEnabled()) { - Util.MAIN_LOGGER.debug("Closing NucliaClient and releasing HTTP client resources"); + Util.MAIN_LOGGER.debug("Closing DefaultNuaClient and releasing HTTP client resources"); } try { httpClient.dispatcher().executorService().shutdownNow(); @@ -357,14 +360,16 @@ public void close() { } } if (Util.MAIN_LOGGER.isDebugEnabled()) { - Util.MAIN_LOGGER.debug("NucliaClient closed successfully"); + Util.MAIN_LOGGER.debug("DefaultNuaClient closed successfully"); } } + @Override public String getBaseUrl() { return baseUrl; } + @Override public int getTimeoutSeconds() { return timeoutSeconds; } diff --git 
a/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/MockNuaClient.java b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/MockNuaClient.java new file mode 100644 index 00000000..52f45c30 --- /dev/null +++ b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/MockNuaClient.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2023-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. + */ +package com.marklogic.spark.core.nuclia; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import java.io.IOException; +import java.util.stream.Stream; + +/** + * Mock implementation of {@link NuaClient} for testing purposes. + * Returns canned JSON responses without making actual API calls to Nuclia. + * + * @since 3.1.0 + */ +// Sonar doesn't like static assignments in this class, but this class is only used as a mock for testing. +@SuppressWarnings("java:S2696") +public record MockNuaClient(String mockResponseJson) implements NuaClient { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Override + public Stream<ObjectNode> processData(String filename, byte[] content) throws IOException { + ObjectNode node = (ObjectNode) OBJECT_MAPPER.readTree(mockResponseJson); + // Simulate the SSE stream by returning the same node twice. 
+ return Stream.of(node, node); + } + + @Override + public String getBaseUrl() { + return "https://example.org/doesnt-matter"; + } + + @Override + public int getTimeoutSeconds() { + return 0; + } + + @Override + public void close() { + } +} diff --git a/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/NuaClient.java b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/NuaClient.java new file mode 100644 index 00000000..71b36cbe --- /dev/null +++ b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/NuaClient.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2023-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. + */ +package com.marklogic.spark.core.nuclia; + +import com.fasterxml.jackson.databind.node.ObjectNode; + +import java.io.Closeable; +import java.io.IOException; +import java.util.stream.Stream; + +/** + * Client interface for interacting with the Nuclia Understanding API (NUA). + * Handles text ingestion, processing, and retrieval of embeddings and chunks. + * + * @since 3.1.0 + */ +public interface NuaClient extends Closeable { + + /** + * Processes a file through Nuclia's pipeline synchronously. + * Uploads the file, submits for processing, waits for completion, and retrieves the results. + * + * @param filename the name of the file + * @param content the binary content of the file + * @return a Stream of ObjectNode events containing chunks and embeddings + * @throws IOException if any request fails + * @throws InterruptedException if the thread is interrupted while waiting + */ + Stream<ObjectNode> processData(String filename, byte[] content) throws IOException, InterruptedException; + + /** + * Returns the base URL of the Nuclia API endpoint. + * + * @return the base URL + */ + String getBaseUrl(); + + /** + * Returns the configured timeout in seconds. 
+ * + * @return the timeout in seconds + */ + int getTimeoutSeconds(); +} diff --git a/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/NucliaDocumentProcessor.java b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/NucliaDocumentProcessor.java index cc1d53cc..c1c5d329 100644 --- a/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/NucliaDocumentProcessor.java +++ b/marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/NucliaDocumentProcessor.java @@ -19,10 +19,10 @@ */ public class NucliaDocumentProcessor { - private final NucliaClient nucliaClient; + private final NuaClient nuaClient; - public NucliaDocumentProcessor(NucliaClient nucliaClient) { - this.nucliaClient = nucliaClient; + public NucliaDocumentProcessor(NuaClient nuaClient) { + this.nuaClient = nuaClient; } /** @@ -45,7 +45,7 @@ public void processDocuments(List inputs) { // Process through Nuclia using binary file upload workflow // Collect to list since we need all the data anyway - List<ObjectNode> results = nucliaClient.processData(filename, content) + List<ObjectNode> results = nuaClient.processData(filename, content) .collect(Collectors.toList()); if (results.isEmpty()) { diff --git a/marklogic-spark-connector/src/test/java/com/marklogic/spark/UpdateCopyrightsTest.java b/marklogic-spark-connector/src/test/java/com/marklogic/spark/UpdateCopyrightsTest.java deleted file mode 100644 index eb31e38f..00000000 --- a/marklogic-spark-connector/src/test/java/com/marklogic/spark/UpdateCopyrightsTest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2023-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. 
- */ -package com.marklogic.spark; - -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import org.springframework.util.FileCopyUtils; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.FileWriter; - -import static org.junit.jupiter.api.Assertions.assertTrue; - -class UpdateCopyrightsTest { - - @Disabled("Intended to be run manually when needed.") - @Test - void test() throws Exception { - final String oldCopyright = "2024 MarkLogic Corporation. All Rights Reserved."; - final String newCopyright = "2025 MarkLogic Corporation. All Rights Reserved."; - - // To produce output.txt, run the following in the root of the repository. - // git log --name-only --after="2024-12-31T23:59:59-00:00" --oneline > output.txt - assertTrue(new File("../output.txt").exists()); - try (BufferedReader reader = new BufferedReader(new FileReader("../output.txt"))) { - String line = reader.readLine(); - while (line != null) { - if (line.endsWith(".java")) { - final String filePath = "../" + line; - if (new File(filePath).exists()) { - String text = FileCopyUtils.copyToString(new FileReader(filePath)); - if (text.contains(oldCopyright)) { - text = text.replaceAll(oldCopyright, newCopyright); - System.out.println("Updating: " + filePath); - FileCopyUtils.copy(text, new FileWriter(filePath)); - } - } - } - line = reader.readLine(); - } - } - } -} diff --git a/marklogic-spark-connector/src/test/java/com/marklogic/spark/core/DocumentPipelineFactoryTest.java b/marklogic-spark-connector/src/test/java/com/marklogic/spark/core/DocumentPipelineFactoryTest.java index 1a8befee..adbf3d82 100644 --- a/marklogic-spark-connector/src/test/java/com/marklogic/spark/core/DocumentPipelineFactoryTest.java +++ b/marklogic-spark-connector/src/test/java/com/marklogic/spark/core/DocumentPipelineFactoryTest.java @@ -25,9 +25,9 @@ void nucliaWithAllRequiredOptions() { DocumentPipeline pipeline = 
DocumentPipelineFactory.newDocumentPipeline(context); - assertNotNull(pipeline.getNucliaClient(), "NucliaClient should be present"); - assertEquals("https://aws-us-east-2-1.rag.progress.cloud/api/v1", pipeline.getNucliaClient().getBaseUrl()); - assertEquals(120, pipeline.getNucliaClient().getTimeoutSeconds(), "Default timeout should be 120 seconds"); + assertNotNull(pipeline.getNuaClient(), "NuaClient should be present"); + assertEquals("https://aws-us-east-2-1.rag.progress.cloud/api/v1", pipeline.getNuaClient().getBaseUrl()); + assertEquals(120, pipeline.getNuaClient().getTimeoutSeconds(), "Default timeout should be 120 seconds"); assertNull(pipeline.getTextExtractor(), "TextExtractor should not be present in Nuclia pipeline"); assertNull(pipeline.getTextSplitter(), "TextSplitter should not be present in Nuclia pipeline"); @@ -44,8 +44,8 @@ void nucliaWithCustomTimeout() { DocumentPipeline pipeline = DocumentPipelineFactory.newDocumentPipeline(context); - assertNotNull(pipeline.getNucliaClient()); - assertEquals(300, pipeline.getNucliaClient().getTimeoutSeconds()); + assertNotNull(pipeline.getNuaClient()); + assertEquals(300, pipeline.getNuaClient().getTimeoutSeconds()); } @Test @@ -79,7 +79,7 @@ void textExtractorOnly() { assertNotNull(pipeline, "Pipeline should be created with text extractor"); assertNotNull(pipeline.getTextExtractor(), "TextExtractor should be present"); - assertNull(pipeline.getNucliaClient(), "NucliaClient should not be present"); + assertNull(pipeline.getNuaClient(), "NuaClient should not be present"); assertNull(pipeline.getTextSplitter(), "TextSplitter should not be present"); assertNull(pipeline.getEmbeddingProducer(), "EmbeddingProducer should not be present"); } @@ -96,7 +96,7 @@ void nucliaWithClassifier() { DocumentPipeline pipeline = DocumentPipelineFactory.newDocumentPipeline(context); assertNotNull(pipeline, "Pipeline should be created with Nuclia and classifier"); - assertNotNull(pipeline.getNucliaClient(), "NucliaClient should be 
present"); + assertNotNull(pipeline.getNuaClient(), "NuaClient should be present"); assertNotNull(pipeline.getTextClassifier(), "TextClassifier should be present even with Nuclia"); } @@ -113,7 +113,7 @@ void nucliaHasPriorityOverStandardPipeline() { DocumentPipeline pipeline = DocumentPipelineFactory.newDocumentPipeline(context); assertNotNull(pipeline, "Pipeline should be created"); - assertNotNull(pipeline.getNucliaClient(), "NucliaClient should be present"); + assertNotNull(pipeline.getNuaClient(), "NuaClient should be present"); assertNull(pipeline.getTextExtractor(), "TextExtractor should be ignored when Nuclia is configured"); assertNull(pipeline.getTextSplitter(), "TextSplitter should be ignored when Nuclia is configured"); } diff --git a/marklogic-spark-connector/src/test/java/com/marklogic/spark/writer/nuclia/ProcessWithNucliaTest.java b/marklogic-spark-connector/src/test/java/com/marklogic/spark/writer/nuclia/ProcessWithNucliaTest.java new file mode 100644 index 00000000..e8907297 --- /dev/null +++ b/marklogic-spark-connector/src/test/java/com/marklogic/spark/writer/nuclia/ProcessWithNucliaTest.java @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2023-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. 
+ */ +package com.marklogic.spark.writer.nuclia; + +import com.fasterxml.jackson.databind.JsonNode; +import com.marklogic.junit5.PermissionsTester; +import com.marklogic.junit5.XmlNode; +import com.marklogic.spark.AbstractIntegrationTest; +import com.marklogic.spark.Options; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class ProcessWithNucliaTest extends AbstractIntegrationTest { + + private static final String MOCK_RESPONSE = """ + { + "type": "Chunk", + "text": "The chunk text", + "metadata": { + "meta1": "value1", + "meta2": "value2" + }, + "embeddings": [{ + "id": "multilingual-2024-05-06", + "embedding": [0.123, -0.456] + }] + } + """; + + @Test + void jsonChunk() { + readDocument("/marklogic-docs/java-client-intro.json") + .write().format(CONNECTOR_IDENTIFIER) + .option(Options.CLIENT_URI, makeClientUri()) + .option(Options.WRITE_PERMISSIONS, DEFAULT_PERMISSIONS) + .option(Options.WRITE_URI_TEMPLATE, "/split-test.json") + .option("spark.marklogic.testing.mockNuaClientResponse", MOCK_RESPONSE) + .option(Options.WRITE_SPLITTER_SIDECAR_COLLECTIONS, "nuclia-chunks") + .option(Options.WRITE_SPLITTER_SIDECAR_PERMISSIONS, DEFAULT_PERMISSIONS) + .mode(SaveMode.Append) + .save(); + + assertCollectionSize("nuclia-chunks", 1); + + final String expectedChunkUri = "/split-test.json-extracted-text.json-chunks-1.json"; + JsonNode doc = readJsonDocument(expectedChunkUri); + assertEquals("/split-test.json-extracted-text.json", doc.get("source-uri").asText()); + assertEquals(1, doc.get("chunks").size()); + + JsonNode chunk = doc.get("chunks").get(0); + assertEquals("The chunk text", chunk.get("text").asText()); + assertEquals("value1", chunk.get("chunk-metadata").get("meta1").asText()); + assertEquals("value2", chunk.get("chunk-metadata").get("meta2").asText()); + assertEquals("multilingual-2024-05-06", 
chunk.get("model-name").asText()); + assertEquals(2, chunk.get("_vector").size()); + assertEquals(0.123, chunk.get("_vector").get(0).asDouble()); + assertEquals(-0.456, chunk.get("_vector").get(1).asDouble()); + + PermissionsTester perms = readDocumentPermissions(expectedChunkUri); + perms.assertReadPermissionExists("spark-user-role"); + perms.assertUpdatePermissionExists("spark-user-role"); + } + + @Test + void xmlChunk() { + readDocument("/marklogic-docs/java-client-intro.json") + .write().format(CONNECTOR_IDENTIFIER) + .option(Options.CLIENT_URI, makeClientUri()) + .option(Options.WRITE_PERMISSIONS, DEFAULT_PERMISSIONS) + .option(Options.WRITE_URI_TEMPLATE, "/split-test.json") + .option("spark.marklogic.testing.mockNuaClientResponse", MOCK_RESPONSE) + .option(Options.WRITE_SPLITTER_SIDECAR_COLLECTIONS, "nuclia-chunks") + .option(Options.WRITE_SPLITTER_SIDECAR_DOCUMENT_TYPE, "xml") + .mode(SaveMode.Append) + .save(); + + assertCollectionSize("nuclia-chunks", 1); + + final String expectedChunkUri = "/split-test.json-extracted-text.json-chunks-1.xml"; + XmlNode doc = readXmlDocument(expectedChunkUri); + doc.assertElementValue("/model:root/model:source-uri", "/split-test.json-extracted-text.json"); + doc.assertElementCount("/model:root/model:chunks/model:chunk", 1); + doc.assertElementValue("/model:root/model:chunks/model:chunk[1]/model:text", "The chunk text"); + doc.assertElementValue("/model:root/model:chunks/model:chunk[1]/vec:model-name", "multilingual-2024-05-06"); + doc.assertElementValue("/model:root/model:chunks/model:chunk[1]/vec:vector[@xml:lang='zxx']", "[0.123, -0.456]"); + + doc.assertElementValue( + "The Nuclia metadata JSON is stored as a serialized string as there's not a standard approach for " + + "serializing JSON into XML. 
+ If the user wants it to be XML, they can use a REST transform to get the " + "desired structure for the XML.", + "/model:root/model:chunks/model:chunk[1]/model:chunk-metadata", + "{\"meta1\":\"value1\",\"meta2\":\"value2\"}" + ); + } + + private Dataset<Row> readDocument(String uri) { + return newSparkSession().read().format(CONNECTOR_IDENTIFIER) + .option(Options.CLIENT_URI, makeClientUri()) + .option(Options.READ_DOCUMENTS_CATEGORIES, "content,metadata") + .option(Options.READ_DOCUMENTS_URIS, uri) + .load(); + } +}