Skip to content

Commit 368c389

Browse files
committed
MLE-26966 Added mock of NuaClient for testing
This does not test any connection to Nuclia, but it makes it possible to test that the model-name and chunk-metadata are handled correctly. NuaClient is now an interface, DefaultNuaClient is the implementation, and MockNuaClient is for testing.
1 parent 9234a41 commit 368c389

File tree

9 files changed

+254
-91
lines changed

9 files changed

+254
-91
lines changed

marklogic-spark-connector/src/main/java/com/marklogic/spark/core/DocumentPipeline.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@
1111
import com.marklogic.spark.core.embedding.EmbeddingProducer;
1212
import com.marklogic.spark.core.extraction.ExtractionResult;
1313
import com.marklogic.spark.core.extraction.TextExtractor;
14-
import com.marklogic.spark.core.nuclia.NucliaClient;
14+
import com.marklogic.spark.core.nuclia.NuaClient;
1515
import com.marklogic.spark.core.nuclia.NucliaDocumentProcessor;
1616
import com.marklogic.spark.core.splitter.TextSplitter;
17+
import org.apache.commons.io.IOUtils;
1718

1819
import java.io.Closeable;
1920
import java.io.IOException;
@@ -32,7 +33,7 @@ public class DocumentPipeline implements Closeable {
3233
private final TextClassifier textClassifier;
3334
private final EmbeddingProducer embeddingProducer;
3435
private final ChunkSelector chunkSelector;
35-
private final NucliaClient nucliaClient;
36+
private final NuaClient nuaClient;
3637
private final NucliaDocumentProcessor nucliaProcessor;
3738

3839
public DocumentPipeline(TextExtractor textExtractor, TextSplitter textSplitter, TextClassifier textClassifier, EmbeddingProducer embeddingProducer, ChunkSelector chunkSelector) {
@@ -41,20 +42,20 @@ public DocumentPipeline(TextExtractor textExtractor, TextSplitter textSplitter,
4142
this.textClassifier = textClassifier;
4243
this.embeddingProducer = embeddingProducer;
4344
this.chunkSelector = chunkSelector;
44-
this.nucliaClient = null;
45+
this.nuaClient = null;
4546
this.nucliaProcessor = null;
4647
}
4748

4849
/**
4950
* Constructor for Nuclia-based pipeline. Nuclia handles extraction, splitting, and embedding generation.
5051
*
51-
* @param nucliaClient the Nuclia client for processing
52+
* @param nuaClient the Nuclia Understanding API client for processing
5253
* @param textClassifier optional text classifier (can be null)
5354
* @since 3.1.0
5455
*/
55-
public DocumentPipeline(NucliaClient nucliaClient, TextClassifier textClassifier) {
56-
this.nucliaClient = nucliaClient;
57-
this.nucliaProcessor = new NucliaDocumentProcessor(nucliaClient);
56+
public DocumentPipeline(NuaClient nuaClient, TextClassifier textClassifier) {
57+
this.nuaClient = nuaClient;
58+
this.nucliaProcessor = new NucliaDocumentProcessor(nuaClient);
5859
this.textClassifier = textClassifier;
5960
this.textExtractor = null;
6061
this.textSplitter = null;
@@ -64,17 +65,13 @@ public DocumentPipeline(NucliaClient nucliaClient, TextClassifier textClassifier
6465

6566
@Override
6667
public void close() throws IOException {
67-
if (textClassifier != null) {
68-
textClassifier.close();
69-
}
70-
if (nucliaClient != null) {
71-
nucliaClient.close();
72-
}
68+
IOUtils.closeQuietly(textClassifier);
69+
IOUtils.closeQuietly(nuaClient);
7370
}
7471

7572
// Package-private getters for testing
76-
NucliaClient getNucliaClient() {
77-
return nucliaClient;
73+
NuaClient getNuaClient() {
74+
return nuaClient;
7875
}
7976

8077
TextClassifier getTextClassifier() {

marklogic-spark-connector/src/main/java/com/marklogic/spark/core/DocumentPipelineFactory.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,25 @@
1515
import com.marklogic.spark.core.embedding.EmbeddingProducerFactory;
1616
import com.marklogic.spark.core.extraction.TextExtractor;
1717
import com.marklogic.spark.core.extraction.TikaTextExtractor;
18-
import com.marklogic.spark.core.nuclia.NucliaClient;
18+
import com.marklogic.spark.core.nuclia.DefaultNuaClient;
19+
import com.marklogic.spark.core.nuclia.MockNuaClient;
20+
import com.marklogic.spark.core.nuclia.NuaClient;
1921
import com.marklogic.spark.core.splitter.TextSplitter;
2022
import com.marklogic.spark.core.splitter.TextSplitterFactory;
2123

2224
public abstract class DocumentPipelineFactory {
2325

26+
private static final String MOCK_NUA_CLIENT_OPTION = "spark.marklogic.testing.mockNuaClientResponse";
27+
2428
// For some reason, Sonar thinks the check for four nulls always resolves to false, even though it's definitely
2529
// possible. So ignoring that warning.
2630
@SuppressWarnings("java:S2589")
2731
public static DocumentPipeline newDocumentPipeline(Context context) {
2832
// Check for Nuclia configuration first
29-
NucliaClient nucliaClient = newNucliaClient(context);
30-
if (nucliaClient != null) {
33+
NuaClient nuaClient = newNuaClient(context);
34+
if (nuaClient != null) {
3135
TextClassifier textClassifier = TextClassifierFactory.newTextClassifier(context);
32-
return new DocumentPipeline(nucliaClient, textClassifier);
36+
return new DocumentPipeline(nuaClient, textClassifier);
3337
}
3438

3539
// Standard pipeline with separate components
@@ -57,13 +61,19 @@ public static DocumentPipeline newDocumentPipeline(Context context) {
5761
new DocumentPipeline(textExtractor, textSplitter, textClassifier, embeddingProducer, chunkSelector);
5862
}
5963

60-
private static NucliaClient newNucliaClient(Context context) {
64+
private static NuaClient newNuaClient(Context context) {
65+
// Check for mock option first (for testing)
66+
if (context.hasOption(MOCK_NUA_CLIENT_OPTION)) {
67+
String mockResponseJson = context.getStringOption(MOCK_NUA_CLIENT_OPTION);
68+
return new MockNuaClient(mockResponseJson);
69+
}
70+
6171
String nuaKey = context.getProperties().get(Options.WRITE_NUCLIA_NUA_KEY);
6272
if (nuaKey == null || nuaKey.trim().isEmpty()) {
6373
return null;
6474
}
6575

66-
NucliaClient.Builder builder = NucliaClient.builder(nuaKey);
76+
DefaultNuaClient.Builder builder = DefaultNuaClient.builder(nuaKey);
6777

6878
int timeout = context.getIntOption(Options.WRITE_NUCLIA_TIMEOUT, 120, 1);
6979
builder.withTimeout(timeout);

marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/NucliaClient.java renamed to marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/DefaultNuaClient.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,21 @@
1717
import java.util.stream.Stream;
1818

1919
/**
20-
* Client for interacting with the Nuclia RAG API.
20+
* Default implementation of {@link NuaClient} for interacting with the Nuclia Understanding API (NUA).
2121
* Handles text ingestion, processing, and retrieval of embeddings and chunks.
2222
* Implements {@link NuaClient}, which extends {@link java.io.Closeable}, to properly shut down HTTP client resources.
23+
*
24+
* @since 3.1.0
2325
*/
24-
public class NucliaClient implements AutoCloseable {
26+
public class DefaultNuaClient implements NuaClient {
2527

2628
private final String nuaKey;
2729
private final String baseUrl;
2830
private final OkHttpClient httpClient;
2931
private final ObjectMapper objectMapper;
3032
private final int timeoutSeconds;
3133

32-
private NucliaClient(Builder builder) {
34+
private DefaultNuaClient(Builder builder) {
3335
this.nuaKey = builder.nuaKey;
3436
this.baseUrl = builder.apiUrl;
3537
this.timeoutSeconds = builder.timeoutSeconds;
@@ -38,12 +40,12 @@ private NucliaClient(Builder builder) {
3840
this.objectMapper = new ObjectMapper();
3941

4042
if (Util.MAIN_LOGGER.isDebugEnabled()) {
41-
Util.MAIN_LOGGER.debug("Initialized NucliaClient: baseUrl={}, timeoutSeconds={}", baseUrl, timeoutSeconds);
43+
Util.MAIN_LOGGER.debug("Initialized DefaultNuaClient: baseUrl={}, timeoutSeconds={}", baseUrl, timeoutSeconds);
4244
}
4345
}
4446

4547
/**
46-
* Creates a new Builder for constructing a NucliaClient.
48+
* Creates a new Builder for constructing a DefaultNuaClient.
4749
*
4850
* @param nuaKey the Nuclia NUA key for authentication (required)
4951
* @return a new Builder instance
@@ -53,7 +55,7 @@ public static Builder builder(String nuaKey) {
5355
}
5456

5557
/**
56-
* Builder for creating NucliaClient instances.
58+
* Builder for creating DefaultNuaClient instances.
5759
*/
5860
public static class Builder {
5961
private static final String DEFAULT_API_URL = "https://aws-us-east-2-1.rag.progress.cloud/api/v1";
@@ -96,12 +98,12 @@ public Builder withTimeout(int timeoutSeconds) {
9698
}
9799

98100
/**
99-
* Builds the NucliaClient instance.
101+
* Builds the DefaultNuaClient instance.
100102
*
101-
* @return a new NucliaClient
103+
* @return a new DefaultNuaClient
102104
*/
103-
public NucliaClient build() {
104-
return new NucliaClient(this);
105+
public DefaultNuaClient build() {
106+
return new DefaultNuaClient(this);
105107
}
106108
}
107109

@@ -115,6 +117,7 @@ public NucliaClient build() {
115117
* @throws IOException if any request fails
116118
* @throws InterruptedException if the thread is interrupted while waiting
117119
*/
120+
@Override
118121
public Stream<ObjectNode> processData(String filename, byte[] content) throws IOException, InterruptedException {
119122
if (Util.MAIN_LOGGER.isDebugEnabled()) {
120123
Util.MAIN_LOGGER.debug("Starting processData; file: {}, size: {} bytes", filename, content.length);
@@ -342,7 +345,7 @@ private String escapeJson(String input) {
342345
@Override
343346
public void close() {
344347
if (Util.MAIN_LOGGER.isDebugEnabled()) {
345-
Util.MAIN_LOGGER.debug("Closing NucliaClient and releasing HTTP client resources");
348+
Util.MAIN_LOGGER.debug("Closing DefaultNuaClient and releasing HTTP client resources");
346349
}
347350
try {
348351
httpClient.dispatcher().executorService().shutdownNow();
@@ -357,14 +360,16 @@ public void close() {
357360
}
358361
}
359362
if (Util.MAIN_LOGGER.isDebugEnabled()) {
360-
Util.MAIN_LOGGER.debug("NucliaClient closed successfully");
363+
Util.MAIN_LOGGER.debug("DefaultNuaClient closed successfully");
361364
}
362365
}
363366

367+
@Override
364368
public String getBaseUrl() {
365369
return baseUrl;
366370
}
367371

372+
@Override
368373
public int getTimeoutSeconds() {
369374
return timeoutSeconds;
370375
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (c) 2023-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
3+
*/
4+
package com.marklogic.spark.core.nuclia;
5+
6+
import com.fasterxml.jackson.databind.ObjectMapper;
7+
import com.fasterxml.jackson.databind.node.ObjectNode;
8+
9+
import java.io.IOException;
10+
import java.util.stream.Stream;
11+
12+
/**
13+
* Mock implementation of {@link NuaClient} for testing purposes.
14+
* Returns canned JSON responses without making actual API calls to Nuclia.
15+
*
16+
* @since 3.1.0
17+
*/
18+
// Sonar doesn't like static assignments in this class, but this class is only used as a mock for testing.
19+
@SuppressWarnings("java:S2696")
20+
public record MockNuaClient(String mockResponseJson) implements NuaClient {
21+
22+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
23+
24+
@Override
25+
public Stream<ObjectNode> processData(String filename, byte[] content) throws IOException {
26+
ObjectNode node = (ObjectNode) OBJECT_MAPPER.readTree(mockResponseJson);
27+
// Simulate the SSE stream by returning the same node twice.
28+
return Stream.of(node, node);
29+
}
30+
31+
@Override
32+
public String getBaseUrl() {
33+
return "https://example.org/doesnt-matter";
34+
}
35+
36+
@Override
37+
public int getTimeoutSeconds() {
38+
return 0;
39+
}
40+
41+
@Override
42+
public void close() {
43+
}
44+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright (c) 2023-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
3+
*/
4+
package com.marklogic.spark.core.nuclia;
5+
6+
import com.fasterxml.jackson.databind.node.ObjectNode;
7+
8+
import java.io.Closeable;
9+
import java.io.IOException;
10+
import java.util.stream.Stream;
11+
12+
/**
13+
* Client interface for interacting with the Nuclia Understanding API (NUA).
14+
* Handles text ingestion, processing, and retrieval of embeddings and chunks.
15+
*
16+
* @since 3.1.0
17+
*/
18+
public interface NuaClient extends Closeable {
19+
20+
/**
21+
* Processes a file through Nuclia's pipeline synchronously.
22+
* Uploads the file, submits for processing, waits for completion, and retrieves the results.
23+
*
24+
* @param filename the name of the file
25+
* @param content the binary content of the file
26+
* @return a Stream of ObjectNode events containing chunks and embeddings
27+
* @throws IOException if any request fails
28+
* @throws InterruptedException if the thread is interrupted while waiting
29+
*/
30+
Stream<ObjectNode> processData(String filename, byte[] content) throws IOException, InterruptedException;
31+
32+
/**
33+
* Returns the base URL of the Nuclia API endpoint.
34+
*
35+
* @return the base URL
36+
*/
37+
String getBaseUrl();
38+
39+
/**
40+
* Returns the configured timeout in seconds.
41+
*
42+
* @return the timeout in seconds
43+
*/
44+
int getTimeoutSeconds();
45+
}

marklogic-spark-connector/src/main/java/com/marklogic/spark/core/nuclia/NucliaDocumentProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
*/
2020
public class NucliaDocumentProcessor {
2121

22-
private final NucliaClient nucliaClient;
22+
private final NuaClient nuaClient;
2323

24-
public NucliaDocumentProcessor(NucliaClient nucliaClient) {
25-
this.nucliaClient = nucliaClient;
24+
public NucliaDocumentProcessor(NuaClient nuaClient) {
25+
this.nuaClient = nuaClient;
2626
}
2727

2828
/**
@@ -45,7 +45,7 @@ public void processDocuments(List<DocumentInputs> inputs) {
4545

4646
// Process through Nuclia using binary file upload workflow
4747
// Collect to list since we need all the data anyway
48-
List<ObjectNode> results = nucliaClient.processData(filename, content)
48+
List<ObjectNode> results = nuaClient.processData(filename, content)
4949
.collect(Collectors.toList());
5050

5151
if (results.isEmpty()) {

marklogic-spark-connector/src/test/java/com/marklogic/spark/UpdateCopyrightsTest.java

Lines changed: 0 additions & 46 deletions
This file was deleted.

0 commit comments

Comments
 (0)