2 changes: 1 addition & 1 deletion .copyrightconfig
@@ -11,4 +11,4 @@ startyear: 2023
# - Dotfiles already skipped automatically
# Enable by removing the leading '# ' from the next line and editing values.
# filesexcluded: third_party/*, docs/generated/*.md, assets/*.png, scripts/temp_*.py, vendor/lib.js
-filesexcluded: .github/*, README.md, CONTRIBUTING.md, Jenkinsfile, gradle/*, docker-compose.yml, *.gradle, gradle.properties, gradlew, gradlew.bat, **/test/resources/**, docs/**, *.json, *.txt, CODEOWNERS
+filesexcluded: .github/*, README.md, CONTRIBUTING.md, Jenkinsfile, gradle/*, docker-compose.yml, *.gradle, gradle.properties, gradlew, gradlew.bat, **/test/resources/**, docs/**, *.json, *.txt, CODEOWNERS, *.properties
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,4 +1,4 @@
-version=3.0-SNAPSHOT
+version=3.1-SNAPSHOT
sparkVersion=4.1.1
tikaVersion=3.2.3
semaphoreVersion=5.10.0
3 changes: 3 additions & 0 deletions marklogic-spark-connector/build.gradle
@@ -60,6 +60,9 @@ dependencies {
// Only needs compileOnly, as the Java Client brings this as an implementation dependency.
compileOnly 'com.squareup.okhttp3:okhttp:5.2.0'
Copilot AI commented on Feb 3, 2026:

The okhttp-sse dependency version (5.2.1) differs from the okhttp version (5.2.0) specified earlier in the file. Consider aligning these versions to avoid potential compatibility issues.

Suggested change:
-compileOnly 'com.squareup.okhttp3:okhttp:5.2.0'
+compileOnly 'com.squareup.okhttp3:okhttp:5.2.1'

// For Nuclia support
implementation 'com.squareup.okhttp3:okhttp-sse:5.2.1'

// Automatic loading of test framework implementation dependencies is deprecated.
// https://docs.gradle.org/current/userguide/upgrading_version_8.html#test_framework_implementation_dependencies
// Without this, once using JUnit 5.12 or higher, Gradle will not find any tests and report an error of:
Options.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2023-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
+ * Copyright (c) 2023-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
*/
package com.marklogic.spark;

@@ -496,6 +496,29 @@ public abstract class Options {
*/
public static final String WRITE_EMBEDDER_BASE64_ENCODE = WRITE_EMBEDDER_PREFIX + "base64Encode";

private static final String WRITE_NUCLIA_PREFIX = "spark.marklogic.write.nuclia.";

/**
* Nuclia API key for authentication. Required if any Nuclia options are used.
*
* @since 3.1.0
*/
public static final String WRITE_NUCLIA_API_KEY = WRITE_NUCLIA_PREFIX + "apikey";

/**
* Nuclia region (e.g., "aws-us-east-2-1"). Required if any Nuclia options are used.
*
* @since 3.1.0
*/
public static final String WRITE_NUCLIA_REGION = WRITE_NUCLIA_PREFIX + "region";

/**
* Maximum number of seconds to wait for Nuclia processing to complete. Defaults to 120 seconds.
*
* @since 3.1.0
*/
public static final String WRITE_NUCLIA_TIMEOUT = WRITE_NUCLIA_PREFIX + "timeout";

/**
* Defines the host for classification requests
*
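To put the new constants in context, here is a minimal, hypothetical sketch of a Spark write that enables Nuclia processing. Only the three option keys come from the constants above; the data source name, connection URI, and the DataFrame itself are illustrative assumptions.

import com.marklogic.spark.Options;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class NucliaWriteExample {

    // Writes a DataFrame of documents through the connector with Nuclia enabled.
    static void writeWithNuclia(Dataset<Row> documents) {
        documents.write()
            .format("marklogic") // assumed short name for the connector's data source
            .option("spark.marklogic.client.uri", "user:password@localhost:8000") // illustrative connection
            .option(Options.WRITE_NUCLIA_API_KEY, "my-nuclia-api-key") // enables Nuclia processing
            .option(Options.WRITE_NUCLIA_REGION, "aws-us-east-2-1")    // required alongside the API key
            .option(Options.WRITE_NUCLIA_TIMEOUT, "300")               // optional; defaults to 120 seconds
            .mode(SaveMode.Append)
            .save();
    }
}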
@@ -156,6 +156,26 @@ public void setChunks(List<String> chunks) {
}
}

/**
* Adds a chunk with its embedding and model name. This is useful for workflows like Nuclia where
* chunks and embeddings are received together.
*
* @param text the chunk text
* @param embedding the embedding vector (can be null)
* @param modelName the model name (can be null)
*/
public void addChunk(String text, float[] embedding, String modelName) {
if (chunkInputsList == null) {
chunkInputsList = new ArrayList<>();
}
ChunkInputs chunkInputs = new ChunkInputs(text);
if (embedding != null) {
chunkInputs.setEmbedding(embedding);
chunkInputs.setModelName(modelName);
}
chunkInputsList.add(chunkInputs);
}

public byte[] getDocumentClassification() {
return documentClassification;
}
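A brief hypothetical sketch of the new method in use; the enclosing class is assumed here to be DocumentInputs (the type DocumentPipeline processes below), and all values are made up.

import com.marklogic.spark.core.DocumentInputs; // package and class name are assumptions

class AddChunkExample {

    // Mirrors a Nuclia-style flow where each chunk arrives together with its
    // embedding and the name of the model that produced it.
    static void addNucliaChunks(DocumentInputs inputs) {
        inputs.addChunk("First chunk of extracted text",
            new float[]{0.12f, -0.08f, 0.45f}, "nuclia-embedding-model"); // illustrative values
        // Embedding and model name may be null; the chunk text is then stored alone.
        inputs.addChunk("A chunk without an embedding", null, null);
    }
}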
DocumentPipeline.java
@@ -11,6 +11,8 @@
import com.marklogic.spark.core.embedding.EmbeddingProducer;
import com.marklogic.spark.core.extraction.ExtractionResult;
import com.marklogic.spark.core.extraction.TextExtractor;
import com.marklogic.spark.core.nuclia.NucliaClient;
import com.marklogic.spark.core.nuclia.NucliaDocumentProcessor;
import com.marklogic.spark.core.splitter.TextSplitter;

import java.io.Closeable;
@@ -30,27 +32,81 @@ public class DocumentPipeline implements Closeable {
private final TextClassifier textClassifier;
private final EmbeddingProducer embeddingProducer;
private final ChunkSelector chunkSelector;
private final NucliaClient nucliaClient;
private final NucliaDocumentProcessor nucliaProcessor;

public DocumentPipeline(TextExtractor textExtractor, TextSplitter textSplitter, TextClassifier textClassifier, EmbeddingProducer embeddingProducer, ChunkSelector chunkSelector) {
this.textExtractor = textExtractor;
this.textSplitter = textSplitter;
this.textClassifier = textClassifier;
this.embeddingProducer = embeddingProducer;
this.chunkSelector = chunkSelector;
this.nucliaClient = null;
this.nucliaProcessor = null;
}

/**
* Constructor for Nuclia-based pipeline. Nuclia handles extraction, splitting, and embedding generation.
*
* @param nucliaClient the Nuclia client for processing
* @param textClassifier optional text classifier (can be null)
* @since 3.1.0
*/
public DocumentPipeline(NucliaClient nucliaClient, TextClassifier textClassifier) {
this.nucliaClient = nucliaClient;
this.nucliaProcessor = new NucliaDocumentProcessor(nucliaClient);
this.textClassifier = textClassifier;
this.textExtractor = null;
this.textSplitter = null;
this.embeddingProducer = null;
this.chunkSelector = null;
}

@Override
public void close() throws IOException {
if (textClassifier != null) {
textClassifier.close();
}
if (nucliaClient != null) {
nucliaClient.close();
}
}

// Package-private getters for testing
NucliaClient getNucliaClient() {
return nucliaClient;
}

TextClassifier getTextClassifier() {
return textClassifier;
}

TextExtractor getTextExtractor() {
return textExtractor;
}

TextSplitter getTextSplitter() {
return textSplitter;
}

EmbeddingProducer getEmbeddingProducer() {
return embeddingProducer;
}

ChunkSelector getChunkSelector() {
return chunkSelector;
}

/**
* Implements the pipeline for processing documents via text extraction, text splitting, text classification, and
* embedding generation.
*/
public void processDocuments(List<DocumentInputs> inputs) {
if (nucliaProcessor != null) {
processWithNuclia(inputs);
return;
}

if (textExtractor != null) {
inputs.stream().forEach(this::extractText);
}
@@ -68,6 +124,15 @@ public void processDocuments(List<DocumentInputs> inputs) {
}
}

private void processWithNuclia(List<DocumentInputs> inputs) {
nucliaProcessor.processDocuments(inputs);

// Optionally classify after Nuclia processing
if (textClassifier != null) {
classifyText(inputs);
}
}

private void classifyText(List<DocumentInputs> inputs) {
List<TextClassifier.ClassifiableContent> contents = new ArrayList<>();
for (DocumentInputs input : inputs) {
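To make the new construction path concrete, here is a hypothetical sketch of building and running a Nuclia-backed pipeline directly; the argument values are illustrative, and the NucliaClient constructor signature is taken from the factory change below.

import com.marklogic.spark.core.DocumentPipeline;
import com.marklogic.spark.core.nuclia.NucliaClient;

import java.io.IOException;
import java.util.ArrayList;

class NucliaPipelineExample {

    static void run() throws IOException {
        // NucliaClient(apiKey, region, timeoutSeconds), matching the factory call below.
        NucliaClient client = new NucliaClient("my-nuclia-api-key", "aws-us-east-2-1", 120);
        // A null TextClassifier is allowed; classification is simply skipped.
        try (DocumentPipeline pipeline = new DocumentPipeline(client, null)) {
            pipeline.processDocuments(new ArrayList<>()); // an illustrative empty batch
        } // close() also closes the NucliaClient
    }
}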
DocumentPipelineFactory.java
@@ -1,12 +1,12 @@
/*
- * Copyright (c) 2023-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
+ * Copyright (c) 2023-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
*/
package com.marklogic.spark.core;

import com.marklogic.langchain4j.Langchain4jFactory;
import com.marklogic.spark.ConnectorException;
import com.marklogic.spark.Context;
import com.marklogic.spark.Options;
import com.marklogic.spark.Util;
import com.marklogic.spark.core.classifier.TextClassifier;
import com.marklogic.spark.core.classifier.TextClassifierFactory;
import com.marklogic.spark.core.embedding.ChunkSelector;
@@ -15,19 +15,24 @@
import com.marklogic.spark.core.embedding.EmbeddingProducerFactory;
import com.marklogic.spark.core.extraction.TextExtractor;
import com.marklogic.spark.core.extraction.TikaTextExtractor;
import com.marklogic.spark.core.nuclia.NucliaClient;
import com.marklogic.spark.core.splitter.TextSplitter;
import com.marklogic.spark.core.splitter.TextSplitterFactory;

import java.lang.reflect.InvocationTargetException;

public abstract class DocumentPipelineFactory {

private static final String FACTORY_CLASS_NAME = "com.marklogic.langchain4j.Langchain4jFactory";

// For some reason, Sonar thinks the check for four nulls always resolves to false, even though it's definitely
// possible. So ignoring that warning.
@SuppressWarnings("java:S2589")
public static DocumentPipeline newDocumentPipeline(Context context) {
// Check for Nuclia configuration first
NucliaClient nucliaClient = newNucliaClient(context);
if (nucliaClient != null) {
TextClassifier textClassifier = TextClassifierFactory.newTextClassifier(context);
return new DocumentPipeline(nucliaClient, textClassifier);
}

// Standard pipeline with separate components
final TextExtractor textExtractor = context.getBooleanOption(Options.WRITE_EXTRACTED_TEXT, false) ?
new TikaTextExtractor() : null;
final TextSplitter textSplitter = newTextSplitter(context);
@@ -52,6 +57,25 @@ public static DocumentPipeline newDocumentPipeline(Context context) {
new DocumentPipeline(textExtractor, textSplitter, textClassifier, embeddingProducer, chunkSelector);
}

private static NucliaClient newNucliaClient(Context context) {
String apiKey = context.getProperties().get(Options.WRITE_NUCLIA_API_KEY);
if (apiKey == null || apiKey.trim().isEmpty()) {
return null;
}

final String region = context.getProperties().get(Options.WRITE_NUCLIA_REGION);

if (region == null || region.trim().isEmpty()) {
throw new ConnectorException(String.format("When %s is specified, %s must also be specified.",
context.getOptionNameForMessage(Options.WRITE_NUCLIA_API_KEY),
context.getOptionNameForMessage(Options.WRITE_NUCLIA_REGION)));
}

int timeout = context.getIntOption(Options.WRITE_NUCLIA_TIMEOUT, 120, 1);

return new NucliaClient(apiKey, region, timeout);
}

private static TextSplitter newTextSplitter(Context context) {
boolean shouldSplit = context.getProperties().keySet().stream().anyMatch(key -> key.startsWith(Options.WRITE_SPLITTER_PREFIX));
if (!shouldSplit) {
@@ -73,24 +97,7 @@ private static EmbeddingProducer newEmbeddingProducer(Context context) {
}

Copilot AI commented on Feb 3, 2026:

The removal of comprehensive error handling and logging for Langchain4j instantiation may impact debuggability. Consider documenting why direct instantiation is now safe and what changed to make the previous exception handling unnecessary.

Suggested change:
/**
 * Creates a new Langchain4jFactory instance.
 * <p>
 * Earlier versions of this method wrapped factory construction in additional
 * error handling and logging. That logic was removed once Langchain4jFactory
 * construction became side-effect free and stopped performing configuration
 * work that could fail at instantiation time. Any errors related to
 * misconfiguration or downstream processing are now expected to surface when
 * the factory is actually used (for example, when creating a TextSplitter or
 * EmbeddingProducer), where more contextual information is available.
 * <p>
 * As a result, direct instantiation here is considered safe and avoids
 * redundant, low-value exception handling that would only rethrow generic
 * runtime failures.
 */
private static Object newLangchain4jProcessorFactory() {
-    try {
-        return Class.forName(FACTORY_CLASS_NAME).getDeclaredConstructor().newInstance();
-    } catch (UnsupportedClassVersionError e) {
-        throw new ConnectorException("Unable to configure support for splitting documents and/or generating embeddings. " +
-            "Please ensure you are using Java 17 or higher for these operations.", e);
-    }
-    // Catch every checked exception from trying to instantiate the class. Any exception from the factory class
-    // itself is expected to be a RuntimeException that should bubble up.
-    catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException |
-           InvocationTargetException ex) {
-        if (Util.MAIN_LOGGER.isDebugEnabled()) {
-            Util.MAIN_LOGGER.debug(
-                "Unable to instantiate factory class {}; this is expected when the marklogic-langchain4j module is not on the classpath. Cause: {}",
-                FACTORY_CLASS_NAME, ex.getMessage()
-            );
-        }
-        return null;
-    }
+    return new Langchain4jFactory();
}

private DocumentPipelineFactory() {
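As a standalone restatement of the rule newNucliaClient enforces, the sketch below substitutes plain strings for the connector's Context lookups; the class, method names, and exception type are illustrative (the connector itself throws ConnectorException with the option names in the message).

// A self-contained sketch of the Nuclia option validation, under the
// assumption that plain strings stand in for Context lookups.
final class NucliaOptionRule {

    static boolean nucliaEnabled(String apiKey, String region) {
        if (apiKey == null || apiKey.trim().isEmpty()) {
            return false; // no API key: fall through to the standard pipeline
        }
        if (region == null || region.trim().isEmpty()) {
            throw new IllegalStateException(
                "When the Nuclia API key is specified, the Nuclia region must also be specified.");
        }
        return true; // API key and region present: build the Nuclia-backed pipeline
    }

    public static void main(String[] args) {
        System.out.println(nucliaEnabled(null, null));                  // false
        System.out.println(nucliaEnabled("my-key", "aws-us-east-2-1")); // true
        nucliaEnabled("my-key", null);                                  // throws
    }
}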