Skip to content

Commit 2cae573

Browse files
committed
MLE-26966 Now supports NUA for processing
Not much can be tested here without a valid Nuclia connection; we may eventually run such tests in Jenkins. For now, verification will largely depend on manual testing.
1 parent 5463b41 commit 2cae573

File tree

14 files changed

+903
-29
lines changed

14 files changed

+903
-29
lines changed

.copyrightconfig

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@ startyear: 2023
1111
# - Dotfiles already skipped automatically
1212
# Enable by removing the leading '# ' from the next line and editing values.
1313
# filesexcluded: third_party/*, docs/generated/*.md, assets/*.png, scripts/temp_*.py, vendor/lib.js
14-
filesexcluded: .github/*, README.md, CONTRIBUTING.md, Jenkinsfile, gradle/*, docker-compose.yml, *.gradle, gradle.properties, gradlew, gradlew.bat, **/test/resources/**, docs/**, *.json, *.txt, CODEOWNERS
14+
filesexcluded: .github/*, README.md, CONTRIBUTING.md, Jenkinsfile, gradle/*, docker-compose.yml, *.gradle, gradle.properties, gradlew, gradlew.bat, **/test/resources/**, docs/**, *.json, *.txt, CODEOWNERS, *.properties

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version=3.0-SNAPSHOT
1+
version=3.1-SNAPSHOT
22
sparkVersion=4.1.1
33
tikaVersion=3.2.3
44
semaphoreVersion=5.10.0

marklogic-spark-connector/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ dependencies {
6060
// Only needs compileOnly, as the Java Client brings this as an implementation dependency.
6161
compileOnly 'com.squareup.okhttp3:okhttp:5.2.0'
6262

63+
// For Nuclia support
64+
implementation 'com.squareup.okhttp3:okhttp-sse:5.2.1'
65+
6366
// Automatic loading of test framework implementation dependencies is deprecated.
6467
// https://docs.gradle.org/current/userguide/upgrading_version_8.html#test_framework_implementation_dependencies
6568
// Without this, once using JUnit 5.12 or higher, Gradle will not find any tests and report an error of:

marklogic-spark-connector/src/main/java/com/marklogic/spark/Options.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2023-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
2+
* Copyright (c) 2023-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
33
*/
44
package com.marklogic.spark;
55

@@ -496,6 +496,36 @@ public abstract class Options {
496496
*/
497497
public static final String WRITE_EMBEDDER_BASE64_ENCODE = WRITE_EMBEDDER_PREFIX + "base64Encode";
498498

499+
private static final String WRITE_NUCLIA_PREFIX = "spark.marklogic.write.nuclia.";
500+
501+
/**
502+
* Nuclia API key for authentication. Required if any Nuclia options are used.
503+
*
504+
* @since 3.1.0
505+
*/
506+
public static final String WRITE_NUCLIA_API_KEY = WRITE_NUCLIA_PREFIX + "apikey";
507+
508+
/**
509+
* Nuclia Knowledge Base ID. Required if any Nuclia options are used.
510+
*
511+
* @since 3.1.0
512+
*/
513+
public static final String WRITE_NUCLIA_KB_ID = WRITE_NUCLIA_PREFIX + "kbid";
514+
515+
/**
516+
* Nuclia region (e.g., "aws-us-east-2-1"). Required if any Nuclia options are used.
517+
*
518+
* @since 3.1.0
519+
*/
520+
public static final String WRITE_NUCLIA_REGION = WRITE_NUCLIA_PREFIX + "region";
521+
522+
/**
523+
* Maximum number of seconds to wait for Nuclia processing to complete. Defaults to 120 seconds.
524+
*
525+
* @since 3.1.0
526+
*/
527+
public static final String WRITE_NUCLIA_TIMEOUT = WRITE_NUCLIA_PREFIX + "timeout";
528+
499529
/**
500530
* Defines the host for classification requests
501531
*

marklogic-spark-connector/src/main/java/com/marklogic/spark/core/DocumentInputs.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,26 @@ public void setChunks(List<String> chunks) {
156156
}
157157
}
158158

159+
/**
160+
* Adds a chunk with its embedding and model name. This is useful for workflows like Nuclia where
161+
* chunks and embeddings are received together.
162+
*
163+
* @param text the chunk text
164+
* @param embedding the embedding vector (can be null)
165+
* @param modelName the model name (can be null)
166+
*/
167+
public void addChunk(String text, float[] embedding, String modelName) {
168+
if (chunkInputsList == null) {
169+
chunkInputsList = new ArrayList<>();
170+
}
171+
ChunkInputs chunkInputs = new ChunkInputs(text);
172+
if (embedding != null) {
173+
chunkInputs.setEmbedding(embedding);
174+
chunkInputs.setModelName(modelName);
175+
}
176+
chunkInputsList.add(chunkInputs);
177+
}
178+
159179
public byte[] getDocumentClassification() {
160180
return documentClassification;
161181
}

marklogic-spark-connector/src/main/java/com/marklogic/spark/core/DocumentPipeline.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import com.marklogic.spark.core.embedding.EmbeddingProducer;
1212
import com.marklogic.spark.core.extraction.ExtractionResult;
1313
import com.marklogic.spark.core.extraction.TextExtractor;
14+
import com.marklogic.spark.core.nuclia.NucliaClient;
15+
import com.marklogic.spark.core.nuclia.NucliaDocumentProcessor;
1416
import com.marklogic.spark.core.splitter.TextSplitter;
1517

1618
import java.io.Closeable;
@@ -30,27 +32,81 @@ public class DocumentPipeline implements Closeable {
3032
private final TextClassifier textClassifier;
3133
private final EmbeddingProducer embeddingProducer;
3234
private final ChunkSelector chunkSelector;
35+
private final NucliaClient nucliaClient;
36+
private final NucliaDocumentProcessor nucliaProcessor;
3337

3438
public DocumentPipeline(TextExtractor textExtractor, TextSplitter textSplitter, TextClassifier textClassifier, EmbeddingProducer embeddingProducer, ChunkSelector chunkSelector) {
3539
this.textExtractor = textExtractor;
3640
this.textSplitter = textSplitter;
3741
this.textClassifier = textClassifier;
3842
this.embeddingProducer = embeddingProducer;
3943
this.chunkSelector = chunkSelector;
44+
this.nucliaClient = null;
45+
this.nucliaProcessor = null;
46+
}
47+
48+
/**
49+
* Constructor for Nuclia-based pipeline. Nuclia handles extraction, splitting, and embedding generation.
50+
*
51+
* @param nucliaClient the Nuclia client for processing
52+
* @param textClassifier optional text classifier (can be null)
53+
* @since 3.1.0
54+
*/
55+
public DocumentPipeline(NucliaClient nucliaClient, TextClassifier textClassifier) {
56+
this.nucliaClient = nucliaClient;
57+
this.nucliaProcessor = new NucliaDocumentProcessor(nucliaClient);
58+
this.textClassifier = textClassifier;
59+
this.textExtractor = null;
60+
this.textSplitter = null;
61+
this.embeddingProducer = null;
62+
this.chunkSelector = null;
4063
}
4164

4265
@Override
4366
public void close() throws IOException {
4467
if (textClassifier != null) {
4568
textClassifier.close();
4669
}
70+
if (nucliaClient != null) {
71+
nucliaClient.close();
72+
}
73+
}
74+
75+
// Package-private getters for testing
76+
NucliaClient getNucliaClient() {
77+
return nucliaClient;
78+
}
79+
80+
TextClassifier getTextClassifier() {
81+
return textClassifier;
82+
}
83+
84+
TextExtractor getTextExtractor() {
85+
return textExtractor;
86+
}
87+
88+
TextSplitter getTextSplitter() {
89+
return textSplitter;
90+
}
91+
92+
EmbeddingProducer getEmbeddingProducer() {
93+
return embeddingProducer;
94+
}
95+
96+
ChunkSelector getChunkSelector() {
97+
return chunkSelector;
4798
}
4899

49100
/**
50101
* Implements the pipeline for processing documents via text extraction, text splitting, text classification, and
51102
* embedding generation.
52103
*/
53104
public void processDocuments(List<DocumentInputs> inputs) {
105+
if (nucliaProcessor != null) {
106+
processWithNuclia(inputs);
107+
return;
108+
}
109+
54110
if (textExtractor != null) {
55111
inputs.stream().forEach(this::extractText);
56112
}
@@ -68,6 +124,15 @@ public void processDocuments(List<DocumentInputs> inputs) {
68124
}
69125
}
70126

127+
private void processWithNuclia(List<DocumentInputs> inputs) {
128+
nucliaProcessor.processDocuments(inputs);
129+
130+
// Optionally classify after Nuclia processing
131+
if (textClassifier != null) {
132+
classifyText(inputs);
133+
}
134+
}
135+
71136
private void classifyText(List<DocumentInputs> inputs) {
72137
List<TextClassifier.ClassifiableContent> contents = new ArrayList<>();
73138
for (DocumentInputs input : inputs) {

marklogic-spark-connector/src/main/java/com/marklogic/spark/core/DocumentPipelineFactory.java

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/*
2-
* Copyright (c) 2023-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
2+
* Copyright (c) 2023-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
33
*/
44
package com.marklogic.spark.core;
55

6+
import com.marklogic.langchain4j.Langchain4jFactory;
67
import com.marklogic.spark.ConnectorException;
78
import com.marklogic.spark.Context;
89
import com.marklogic.spark.Options;
9-
import com.marklogic.spark.Util;
1010
import com.marklogic.spark.core.classifier.TextClassifier;
1111
import com.marklogic.spark.core.classifier.TextClassifierFactory;
1212
import com.marklogic.spark.core.embedding.ChunkSelector;
@@ -15,19 +15,24 @@
1515
import com.marklogic.spark.core.embedding.EmbeddingProducerFactory;
1616
import com.marklogic.spark.core.extraction.TextExtractor;
1717
import com.marklogic.spark.core.extraction.TikaTextExtractor;
18+
import com.marklogic.spark.core.nuclia.NucliaClient;
1819
import com.marklogic.spark.core.splitter.TextSplitter;
1920
import com.marklogic.spark.core.splitter.TextSplitterFactory;
2021

21-
import java.lang.reflect.InvocationTargetException;
22-
2322
public abstract class DocumentPipelineFactory {
2423

25-
private static final String FACTORY_CLASS_NAME = "com.marklogic.langchain4j.Langchain4jFactory";
26-
2724
// For some reason, Sonar thinks the check for four nulls always resolves to false, even though it's definitely
2825
// possible. So ignoring that warning.
2926
@SuppressWarnings("java:S2589")
3027
public static DocumentPipeline newDocumentPipeline(Context context) {
28+
// Check for Nuclia configuration first
29+
NucliaClient nucliaClient = newNucliaClient(context);
30+
if (nucliaClient != null) {
31+
TextClassifier textClassifier = TextClassifierFactory.newTextClassifier(context);
32+
return new DocumentPipeline(nucliaClient, textClassifier);
33+
}
34+
35+
// Standard pipeline with separate components
3136
final TextExtractor textExtractor = context.getBooleanOption(Options.WRITE_EXTRACTED_TEXT, false) ?
3237
new TikaTextExtractor() : null;
3338
final TextSplitter textSplitter = newTextSplitter(context);
@@ -52,6 +57,31 @@ public static DocumentPipeline newDocumentPipeline(Context context) {
5257
new DocumentPipeline(textExtractor, textSplitter, textClassifier, embeddingProducer, chunkSelector);
5358
}
5459

60+
private static NucliaClient newNucliaClient(Context context) {
61+
String apiKey = context.getProperties().get(Options.WRITE_NUCLIA_API_KEY);
62+
if (apiKey == null || apiKey.trim().isEmpty()) {
63+
return null;
64+
}
65+
66+
final String kbId = context.getProperties().get(Options.WRITE_NUCLIA_KB_ID);
67+
final String region = context.getProperties().get(Options.WRITE_NUCLIA_REGION);
68+
69+
if (kbId == null || kbId.trim().isEmpty()) {
70+
throw new ConnectorException(String.format("When %s is specified, %s must also be specified.",
71+
context.getOptionNameForMessage(Options.WRITE_NUCLIA_API_KEY),
72+
context.getOptionNameForMessage(Options.WRITE_NUCLIA_KB_ID)));
73+
}
74+
if (region == null || region.trim().isEmpty()) {
75+
throw new ConnectorException(String.format("When %s is specified, %s must also be specified.",
76+
context.getOptionNameForMessage(Options.WRITE_NUCLIA_API_KEY),
77+
context.getOptionNameForMessage(Options.WRITE_NUCLIA_REGION)));
78+
}
79+
80+
int timeout = context.getIntOption(Options.WRITE_NUCLIA_TIMEOUT, 120, 1);
81+
82+
return new NucliaClient(apiKey, kbId, region, timeout);
83+
}
84+
5585
private static TextSplitter newTextSplitter(Context context) {
5686
boolean shouldSplit = context.getProperties().keySet().stream().anyMatch(key -> key.startsWith(Options.WRITE_SPLITTER_PREFIX));
5787
if (!shouldSplit) {
@@ -73,24 +103,7 @@ private static EmbeddingProducer newEmbeddingProducer(Context context) {
73103
}
74104

75105
private static Object newLangchain4jProcessorFactory() {
76-
try {
77-
return Class.forName(FACTORY_CLASS_NAME).getDeclaredConstructor().newInstance();
78-
} catch (UnsupportedClassVersionError e) {
79-
throw new ConnectorException("Unable to configure support for splitting documents and/or generating embeddings. " +
80-
"Please ensure you are using Java 17 or higher for these operations.", e);
81-
}
82-
// Catch every checked exception from trying to instantiate the class. Any exception from the factory class
83-
// itself is expected to be a RuntimeException that should bubble up.
84-
catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException |
85-
InvocationTargetException ex) {
86-
if (Util.MAIN_LOGGER.isDebugEnabled()) {
87-
Util.MAIN_LOGGER.debug(
88-
"Unable to instantiate factory class {}; this is expected when the marklogic-langchain4j module is not on the classpath. Cause: {}",
89-
FACTORY_CLASS_NAME, ex.getMessage()
90-
);
91-
}
92-
return null;
93-
}
106+
return new Langchain4jFactory();
94107
}
95108

96109
private DocumentPipelineFactory() {

0 commit comments

Comments (0)