MLE-26966 Now supports NUA for processing #607
Conversation
Copyright Validation Results: ✅ All files have valid copyright headers!
Force-pushed from a4741bf to c1ee3d5.
Pull request overview
This PR adds Nuclia Understanding API (NUA) support for document processing in the MarkLogic Spark connector. Nuclia provides an external service for text extraction, chunking, and embedding generation, offering an alternative to the existing local processing pipeline.
Changes:
- Added Nuclia client implementation with SSE event streaming for processing results
- Integrated Nuclia as a processing option in DocumentPipeline with priority over standard pipeline
- Added configuration options for Nuclia API key, KB ID, region, and timeout (a usage sketch follows this list)
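For illustration, a hedged sketch of supplying these options when writing through the connector. The apikey, kbid, and region keys appear in this PR's properties excerpt later in this review; the format name and connection option follow the connector's documented usage, while the values, the binary-file read, and any other required write options are placeholders:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Sketch only: values are placeholders; additional connector options may be required.
public class NucliaWriteExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("nuclia-example").getOrCreate();
        Dataset<Row> files = spark.read().format("binaryFile").load("/path/to/docs");
        files.write()
            .format("marklogic")
            .option("spark.marklogic.client.uri", "user:password@host:8000")
            .option("spark.marklogic.write.nuclia.apikey", "YOUR_NUA_API_KEY")
            .option("spark.marklogic.write.nuclia.kbid", "YOUR_KB_ID")
            .option("spark.marklogic.write.nuclia.region", "YOUR_REGION")
            .mode("append")
            .save();
    }
}
```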
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| NucliaAdHocTest.java | Manual test for Nuclia integration requiring environment variables |
| AddEmbeddingsToJsonTest.java | Removed unused import |
| DocumentPipelineFactoryTest.java | Comprehensive tests for Nuclia client creation and pipeline configuration |
| marklogic-spark-messages.properties | Added property keys for Nuclia configuration options |
| NucliaEventCollector.java | SSE event listener for collecting Nuclia processing results |
| NucliaDocumentProcessor.java | Processes documents through Nuclia API and extracts chunks/embeddings |
| NucliaClient.java | HTTP client for Nuclia API with file upload and processing workflow |
| DocumentPipelineFactory.java | Factory method for creating Nuclia client and prioritizing Nuclia pipeline |
| DocumentPipeline.java | Added Nuclia processing path with client lifecycle management |
| DocumentInputs.java | New method for adding chunks with embeddings and model names |
| Options.java | Defined configuration constants for Nuclia options |
| build.gradle | Added okhttp-sse dependency for SSE streaming |
```java
String resourceId = response.body().string();
return resourceId;
```

The variable assignment and return can be simplified by directly returning the value without the intermediate variable.

Suggested change:

```java
return response.body().string();
```
```java
// Process each embedding in the array
if (node.has("embeddings") && node.get("embeddings").isArray()) {
    var embeddingsArray = node.get("embeddings");
```

Using 'var' reduces code clarity when the type isn't immediately obvious from the right side. Consider using the explicit type 'JsonNode' for better readability and maintainability.
```java
for (int i = 0; i < embeddingsArray.size(); i++) {
    var embeddingObj = embeddingsArray.get(i);
```

Using 'var' reduces code clarity when the type isn't immediately obvious from the right side. Consider using the explicit type 'JsonNode' for better readability and maintainability.
```java
String modelName = null;

if (embeddingObj.has("embedding") && embeddingObj.get("embedding").isArray()) {
    var embeddingArray = embeddingObj.get("embedding");
```

Using 'var' reduces code clarity when the type isn't immediately obvious from the right side. Consider using the explicit type 'JsonNode' for better readability and maintainability.
```diff
@@ -73,24 +103,7 @@ private static EmbeddingProducer newEmbeddingProducer(Context context) {
     }
```
The removal of comprehensive error handling and logging for Langchain4j instantiation may impact debuggability. Consider documenting why direct instantiation is now safe and what changed to make the previous exception handling unnecessary.
Suggested change:

```java
/**
 * Creates a new Langchain4jFactory instance.
 * <p>
 * Earlier versions of this method wrapped factory construction in additional
 * error handling and logging. That logic was removed once Langchain4jFactory
 * construction became side-effect free and stopped performing configuration
 * work that could fail at instantiation time. Any errors related to
 * misconfiguration or downstream processing are now expected to surface when
 * the factory is actually used (for example, when creating a TextSplitter or
 * EmbeddingProducer), where more contextual information is available.
 * <p>
 * As a result, direct instantiation here is considered safe and avoids
 * redundant, low-value exception handling that would only rethrow generic
 * runtime failures.
 */
```
```properties
spark.marklogic.write.nuclia.apikey=
spark.marklogic.write.nuclia.kbid=
spark.marklogic.write.nuclia.region=
```
Properties file entries are empty without any default values or documentation. Consider adding comments explaining the purpose and expected values for each Nuclia property, or default placeholder values to guide users.
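One way to act on this is sketched below; the descriptions are invented here to show the idea, and whether comments or filled-in values fit best depends on how this messages file is consumed:

```properties
# Nuclia Understanding API key used to authenticate processing requests.
spark.marklogic.write.nuclia.apikey=
# Identifier of the Nuclia Knowledge Box that documents are submitted to.
spark.marklogic.write.nuclia.kbid=
# Nuclia region hosting the Knowledge Box; used to construct the API base URL.
spark.marklogic.write.nuclia.region=
```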
```diff
@@ -60,6 +60,9 @@ dependencies {
     // Only needs compileOnly, as the Java Client brings this as an implementation dependency.
     compileOnly 'com.squareup.okhttp3:okhttp:5.2.0'
```
The okhttp-sse dependency version (5.2.1) differs from the okhttp version (5.2.0) specified earlier in the file. Consider aligning these versions to avoid potential compatibility issues.
Suggested change:

```diff
- compileOnly 'com.squareup.okhttp3:okhttp:5.2.0'
+ compileOnly 'com.squareup.okhttp3:okhttp:5.2.1'
```
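For completeness, a sketch with both OkHttp artifacts on one version; the okhttp-sse coordinates are the standard Square artifact, but its dependency scope in this PR is an assumption:

```groovy
// Keep both OkHttp artifacts on the same release to avoid mixing versions.
compileOnly 'com.squareup.okhttp3:okhttp:5.2.1'
compileOnly 'com.squareup.okhttp3:okhttp-sse:5.2.1'
```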
Force-pushed from 2cae573 to ba77977.
vangheem left a comment:
I wonder about all the manual serialization/deserialization, but otherwise looks fine.
```java
 */
private Stream<ObjectNode> getProcessingResults(String processingId) throws IOException {
    final String endpoint = baseUrl + "/processing/requests/" + processingId + "/results";
    if (Util.MAIN_LOGGER.isDebugEnabled()) {
```
Are these types of checks necessary? Isn't it going to only output the log if the DEBUG log level is configured anyways?
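SLF4J will indeed suppress the message when DEBUG is off, and the parameterized form already defers string formatting; the guard mainly pays off when computing an argument is itself expensive. A minimal sketch of that distinction (the class and methods here are illustrative, not from the PR):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebugGuardExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(DebugGuardExample.class);

    void logSubmission(String filename, String endpoint) {
        // Without a guard: SLF4J skips output when DEBUG is off, and the {} placeholders
        // defer formatting, so the cost is essentially just a varargs call.
        LOGGER.debug("Submitting file: filename={}, endpoint={}", filename, endpoint);

        // With a guard: worthwhile only when building an argument is expensive.
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Expensive detail: {}", buildExpensiveDescription());
        }
    }

    private String buildExpensiveDescription() {
        return "..."; // stand-in for something costly to compute
    }
}
```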
```java
    .newEventSource(request, collector);

try {
    Stream<ObjectNode> results = collector.awaitCompletion(120).stream();
```
120 should maybe be configurable? In reality it should be fast, but I guess for insanely complex and big docs with files it could be huge.
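The PR summary does list a timeout among the Nuclia options, though its key isn't shown in this excerpt. A sketch of resolving a configurable timeout with a 120-second default; the option key and helper below are hypothetical:

```java
import java.util.Map;

// Hypothetical sketch: the real constant would live in Options.java per this PR.
final class NucliaTimeoutExample {
    static int resolveTimeoutSeconds(Map<String, String> options) {
        // "spark.marklogic.write.nuclia.timeout" is an assumed key name, not confirmed by the diff.
        String value = options.get("spark.marklogic.write.nuclia.timeout");
        if (value == null || value.trim().isEmpty()) {
            return 120;
        }
        return Integer.parseInt(value.trim());
    }
}
```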
```java
    Util.MAIN_LOGGER.debug("Submitting file for processing: filename={}, endpoint={}", filename, endpoint);
}

final String requestBody = String.format("""
```
Is it normal to do string interpolation like this for json structures? Maybe use a map or record? (I'm not a java person)
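Building the body with Jackson is one common alternative to format strings, since it escapes values automatically, and Jackson is already used elsewhere in this PR for JsonNode handling. A sketch with placeholder field names rather than Nuclia's actual payload:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Sketch: build the JSON programmatically so values are escaped for us.
// The field names below are placeholders, not the real Nuclia request fields.
final class RequestBodyExample {
    static String buildRequestBody(ObjectMapper mapper, String filename, String resourceId) {
        ObjectNode body = mapper.createObjectNode();
        body.put("filename", filename);
        body.put("resource", resourceId);
        return body.toString();
    }
}
```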
Force-pushed from ba77977 to ece681b.
Not a lot can be tested here without a valid Nuclia connection, which we may end up doing in Jenkins. Will largely depend on manual testing for now.