DocumentPipeline.java
@@ -11,9 +11,10 @@
import com.marklogic.spark.core.embedding.EmbeddingProducer;
import com.marklogic.spark.core.extraction.ExtractionResult;
import com.marklogic.spark.core.extraction.TextExtractor;
import com.marklogic.spark.core.nuclia.NucliaClient;
import com.marklogic.spark.core.nuclia.NuaClient;
import com.marklogic.spark.core.nuclia.NucliaDocumentProcessor;
import com.marklogic.spark.core.splitter.TextSplitter;
import org.apache.commons.io.IOUtils;

import java.io.Closeable;
import java.io.IOException;
@@ -32,7 +33,7 @@ public class DocumentPipeline implements Closeable {
private final TextClassifier textClassifier;
private final EmbeddingProducer embeddingProducer;
private final ChunkSelector chunkSelector;
private final NucliaClient nucliaClient;
private final NuaClient nuaClient;
private final NucliaDocumentProcessor nucliaProcessor;

public DocumentPipeline(TextExtractor textExtractor, TextSplitter textSplitter, TextClassifier textClassifier, EmbeddingProducer embeddingProducer, ChunkSelector chunkSelector) {
@@ -41,20 +42,20 @@ public DocumentPipeline(TextExtractor textExtractor, TextSplitter textSplitter,
this.textClassifier = textClassifier;
this.embeddingProducer = embeddingProducer;
this.chunkSelector = chunkSelector;
this.nucliaClient = null;
this.nuaClient = null;
this.nucliaProcessor = null;
}

/**
* Constructor for Nuclia-based pipeline. Nuclia handles extraction, splitting, and embedding generation.
*
* @param nucliaClient the Nuclia client for processing
* @param nuaClient the Nuclia Understanding API client for processing
* @param textClassifier optional text classifier (can be null)
* @since 3.1.0
*/
public DocumentPipeline(NucliaClient nucliaClient, TextClassifier textClassifier) {
this.nucliaClient = nucliaClient;
this.nucliaProcessor = new NucliaDocumentProcessor(nucliaClient);
public DocumentPipeline(NuaClient nuaClient, TextClassifier textClassifier) {
this.nuaClient = nuaClient;
this.nucliaProcessor = new NucliaDocumentProcessor(nuaClient);
this.textClassifier = textClassifier;
this.textExtractor = null;
this.textSplitter = null;
@@ -64,17 +65,13 @@ public DocumentPipeline(NucliaClient nucliaClient, TextClassifier textClassifier

@Override
public void close() throws IOException {
if (textClassifier != null) {
textClassifier.close();
}
if (nucliaClient != null) {
nucliaClient.close();
}
IOUtils.closeQuietly(textClassifier);
IOUtils.closeQuietly(nuaClient);
}

// Package-private getters for testing
NucliaClient getNucliaClient() {
return nucliaClient;
NuaClient getNuaClient() {
return nuaClient;
}

TextClassifier getTextClassifier() {
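For context on the close() change above, here is a minimal sketch (not part of this PR) of the commons-io behavior it relies on: IOUtils.closeQuietly is null-safe and swallows any IOException thrown by close(), which is why the explicit null checks in the previous implementation could be dropped.

import java.io.Closeable;
import java.io.IOException;
import org.apache.commons.io.IOUtils;

public class CloseQuietlyDemo {
    public static void main(String[] args) {
        Closeable nothing = null;
        IOUtils.closeQuietly(nothing); // null argument: a no-op, no NullPointerException
        Closeable failing = () -> { throw new IOException("boom"); };
        IOUtils.closeQuietly(failing); // IOException from close() is caught and ignored
    }
}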
DocumentPipelineFactory.java
@@ -15,21 +15,25 @@
import com.marklogic.spark.core.embedding.EmbeddingProducerFactory;
import com.marklogic.spark.core.extraction.TextExtractor;
import com.marklogic.spark.core.extraction.TikaTextExtractor;
import com.marklogic.spark.core.nuclia.NucliaClient;
import com.marklogic.spark.core.nuclia.DefaultNuaClient;
import com.marklogic.spark.core.nuclia.MockNuaClient;
import com.marklogic.spark.core.nuclia.NuaClient;
import com.marklogic.spark.core.splitter.TextSplitter;
import com.marklogic.spark.core.splitter.TextSplitterFactory;

public abstract class DocumentPipelineFactory {

private static final String MOCK_NUA_CLIENT_OPTION = "spark.marklogic.testing.mockNuaClientResponse";

// For some reason, Sonar thinks the check for four nulls always resolves to false, even though it's definitely
// possible. So ignoring that warning.
@SuppressWarnings("java:S2589")
public static DocumentPipeline newDocumentPipeline(Context context) {
// Check for Nuclia configuration first
NucliaClient nucliaClient = newNucliaClient(context);
if (nucliaClient != null) {
NuaClient nuaClient = newNuaClient(context);
if (nuaClient != null) {
TextClassifier textClassifier = TextClassifierFactory.newTextClassifier(context);
return new DocumentPipeline(nucliaClient, textClassifier);
return new DocumentPipeline(nuaClient, textClassifier);
}

// Standard pipeline with separate components
@@ -57,13 +61,19 @@ public static DocumentPipeline newDocumentPipeline(Context context) {
new DocumentPipeline(textExtractor, textSplitter, textClassifier, embeddingProducer, chunkSelector);
}

private static NucliaClient newNucliaClient(Context context) {
private static NuaClient newNuaClient(Context context) {
// Check for mock option first (for testing)
if (context.hasOption(MOCK_NUA_CLIENT_OPTION)) {
String mockResponseJson = context.getStringOption(MOCK_NUA_CLIENT_OPTION);
return new MockNuaClient(mockResponseJson);
}

String nuaKey = context.getProperties().get(Options.WRITE_NUCLIA_NUA_KEY);
if (nuaKey == null || nuaKey.trim().isEmpty()) {
return null;
}

NucliaClient.Builder builder = NucliaClient.builder(nuaKey);
DefaultNuaClient.Builder builder = DefaultNuaClient.builder(nuaKey);

int timeout = context.getIntOption(Options.WRITE_NUCLIA_TIMEOUT, 120, 1);
builder.withTimeout(timeout);
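To illustrate the new testing hook, a hedged sketch of how a Spark write might supply the mock response. Only the option key spark.marklogic.testing.mockNuaClientResponse comes from this diff; the "marklogic" format name, the connection option, and the DataFrame are assumptions for illustration.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class MockNuaOptionSketch {

    // Hypothetical usage: routes a write through the connector with a canned NUA
    // response so no real Nuclia calls are made during tests.
    static void writeWithMockedNua(Dataset<Row> files, String cannedResponseJson) {
        files.write()
            .format("marklogic") // assumed connector short name
            .option("spark.marklogic.client.uri", "user:password@localhost:8000") // assumed connection option
            .option("spark.marklogic.testing.mockNuaClientResponse", cannedResponseJson)
            .mode("append")
            .save();
    }
}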
DefaultNuaClient.java
@@ -17,19 +17,21 @@
import java.util.stream.Stream;

/**
* Client for interacting with the Nuclia RAG API.
* Default implementation of {@link NuaClient} for interacting with the Nuclia Understanding API (NUA).
* Handles text ingestion, processing, and retrieval of embeddings and chunks.
* Implements AutoCloseable to properly shut down HTTP client resources.
*
* @since 3.1.0
*/
public class NucliaClient implements AutoCloseable {
public class DefaultNuaClient implements NuaClient {

private final String nuaKey;
private final String baseUrl;
private final OkHttpClient httpClient;
private final ObjectMapper objectMapper;
private final int timeoutSeconds;

private NucliaClient(Builder builder) {
private DefaultNuaClient(Builder builder) {
this.nuaKey = builder.nuaKey;
this.baseUrl = builder.apiUrl;
this.timeoutSeconds = builder.timeoutSeconds;
@@ -38,12 +40,12 @@ private NucliaClient(Builder builder) {
this.objectMapper = new ObjectMapper();

if (Util.MAIN_LOGGER.isDebugEnabled()) {
Util.MAIN_LOGGER.debug("Initialized NucliaClient: baseUrl={}, timeoutSeconds={}", baseUrl, timeoutSeconds);
Util.MAIN_LOGGER.debug("Initialized DefaultNuaClient: baseUrl={}, timeoutSeconds={}", baseUrl, timeoutSeconds);
}
}

/**
* Creates a new Builder for constructing a NucliaClient.
* Creates a new Builder for constructing a DefaultNuaClient.
*
* @param nuaKey the Nuclia NUA key for authentication (required)
* @return a new Builder instance
@@ -53,7 +55,7 @@ public static Builder builder(String nuaKey) {
}

/**
* Builder for creating NucliaClient instances.
* Builder for creating DefaultNuaClient instances.
*/
public static class Builder {
private static final String DEFAULT_API_URL = "https://aws-us-east-2-1.rag.progress.cloud/api/v1";
@@ -96,12 +98,12 @@ public Builder withTimeout(int timeoutSeconds) {
}

/**
* Builds the NucliaClient instance.
* Builds the DefaultNuaClient instance.
*
* @return a new NucliaClient
* @return a new DefaultNuaClient
*/
public NucliaClient build() {
return new NucliaClient(this);
public DefaultNuaClient build() {
return new DefaultNuaClient(this);
}
}

@@ -115,6 +117,7 @@ public NucliaClient build() {
* @throws IOException if any request fails
* @throws InterruptedException if the thread is interrupted while waiting
*/
@Override
public Stream<ObjectNode> processData(String filename, byte[] content) throws IOException, InterruptedException {
if (Util.MAIN_LOGGER.isDebugEnabled()) {
Util.MAIN_LOGGER.debug("Starting processData; file: {}, size: {} bytes", filename, content.length);
@@ -342,7 +345,7 @@ private String escapeJson(String input) {
@Override
public void close() {
if (Util.MAIN_LOGGER.isDebugEnabled()) {
Util.MAIN_LOGGER.debug("Closing NucliaClient and releasing HTTP client resources");
Util.MAIN_LOGGER.debug("Closing DefaultNuaClient and releasing HTTP client resources");
}
try {
httpClient.dispatcher().executorService().shutdownNow();
@@ -357,14 +360,16 @@ public void close() {
}
}
if (Util.MAIN_LOGGER.isDebugEnabled()) {
Util.MAIN_LOGGER.debug("NucliaClient closed successfully");
Util.MAIN_LOGGER.debug("DefaultNuaClient closed successfully");
}
}

@Override
public String getBaseUrl() {
return baseUrl;
}

@Override
public int getTimeoutSeconds() {
return timeoutSeconds;
}
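A sketch of how the renamed client might be built and used directly. builder(), withTimeout(), build(), processData(), and close() all appear in the diff above; the key lookup, file, and timeout value are placeholders.

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.marklogic.spark.core.nuclia.DefaultNuaClient;
import com.marklogic.spark.core.nuclia.NuaClient;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class DefaultNuaClientSketch {

    // Illustrative only: processes one file and prints each event returned by NUA.
    static void process(Path file) throws IOException, InterruptedException {
        try (NuaClient client = DefaultNuaClient.builder(System.getenv("NUA_KEY"))
                .withTimeout(120) // seconds; the factory defaults to 120 with a minimum of 1
                .build()) {
            byte[] content = Files.readAllBytes(file);
            try (Stream<ObjectNode> events = client.processData(file.getFileName().toString(), content)) {
                events.forEach(event -> System.out.println(event.toPrettyString()));
            }
        }
    }
}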
MockNuaClient.java
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2023-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
*/
package com.marklogic.spark.core.nuclia;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.io.IOException;
import java.util.stream.Stream;

/**
* Mock implementation of {@link NuaClient} for testing purposes.
* Returns canned JSON responses without making actual API calls to Nuclia.
*
* @since 3.1.0
*/
// Sonar doesn't like static assignments in this class, but this class is only used as a mock for testing.
@SuppressWarnings("java:S2696")
public record MockNuaClient(String mockResponseJson) implements NuaClient {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@Override
public Stream<ObjectNode> processData(String filename, byte[] content) throws IOException {
ObjectNode node = (ObjectNode) OBJECT_MAPPER.readTree(mockResponseJson);
// Simulate the SSE stream by returning the same node twice.
return Stream.of(node, node);
}

@Override
public String getBaseUrl() {
return "https://example.org/doesnt-matter";
}

@Override
public int getTimeoutSeconds() {
return 0;
}

@Override
public void close() {
}
}
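A hypothetical unit test (JUnit 5 assumed, not part of this PR) showing how the mock can be exercised; it checks the documented behavior of returning the canned node twice to simulate the SSE stream.

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.marklogic.spark.core.nuclia.MockNuaClient;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

class MockNuaClientTest {

    @Test
    void returnsCannedNodeTwice() throws IOException {
        MockNuaClient client = new MockNuaClient("{\"seqid\": 1}");
        List<ObjectNode> events = client.processData("test.txt", "hello".getBytes(StandardCharsets.UTF_8))
            .collect(Collectors.toList());
        assertEquals(2, events.size());
        assertEquals(1, events.get(0).get("seqid").asInt());
        client.close();
    }
}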
NuaClient.java
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2023-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
*/
package com.marklogic.spark.core.nuclia;

import com.fasterxml.jackson.databind.node.ObjectNode;

import java.io.Closeable;
import java.io.IOException;
import java.util.stream.Stream;

/**
* Client interface for interacting with the Nuclia Understanding API (NUA).
* Handles text ingestion, processing, and retrieval of embeddings and chunks.
*
* @since 3.1.0
*/
public interface NuaClient extends Closeable {

/**
* Processes a file through Nuclia's pipeline synchronously.
* Uploads the file, submits for processing, waits for completion, and retrieves the results.
*
* @param filename the name of the file
* @param content the binary content of the file
* @return a Stream of ObjectNode events containing chunks and embeddings
* @throws IOException if any request fails
* @throws InterruptedException if the thread is interrupted while waiting
*/
Stream<ObjectNode> processData(String filename, byte[] content) throws IOException, InterruptedException;

/**
* Returns the base URL of the Nuclia API endpoint.
*
* @return the base URL
*/
String getBaseUrl();

/**
* Returns the configured timeout in seconds.
*
* @return the timeout in seconds
*/
int getTimeoutSeconds();
}
NucliaDocumentProcessor.java
@@ -19,10 +19,10 @@
*/
public class NucliaDocumentProcessor {

private final NucliaClient nucliaClient;
private final NuaClient nuaClient;

public NucliaDocumentProcessor(NucliaClient nucliaClient) {
this.nucliaClient = nucliaClient;
public NucliaDocumentProcessor(NuaClient nuaClient) {
this.nuaClient = nuaClient;
}

/**
@@ -45,7 +45,7 @@ public void processDocuments(List<DocumentInputs> inputs) {

// Process through Nuclia using binary file upload workflow
// Collect to list since we need all the data anyway
List<ObjectNode> results = nucliaClient.processData(filename, content)
List<ObjectNode> results = nuaClient.processData(filename, content)
.collect(Collectors.toList());

if (results.isEmpty()) {

This file was deleted.
