Skip to content

Commit 61dc176

Browse files
authored
Merge pull request #1548 from marklogic/feature/363-export
DEVEXP-363 ExportListener now provides access to DocumentPage
2 parents 2ae43f8 + d965b13 commit 61dc176

File tree

3 files changed

+157
-15
lines changed

3 files changed

+157
-15
lines changed

marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/AbstractFunctionalTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
import com.marklogic.client.MarkLogicVersion;
99
import com.marklogic.client.document.DocumentManager;
1010
import com.marklogic.client.document.DocumentWriteSet;
11+
import com.marklogic.client.document.JSONDocumentManager;
1112
import com.marklogic.client.functionaltest.BasicJavaClientREST;
1213
import com.marklogic.client.io.DocumentMetadataHandle;
1314
import com.marklogic.client.io.FileHandle;
15+
import com.marklogic.client.io.JacksonHandle;
1416
import com.marklogic.mgmt.resource.databases.DatabaseManager;
1517
import org.junit.jupiter.api.AfterAll;
1618
import org.junit.jupiter.api.Assertions;
@@ -20,6 +22,8 @@
2022
import javax.xml.parsers.ParserConfigurationException;
2123
import java.io.File;
2224
import java.io.IOException;
25+
import java.util.ArrayList;
26+
import java.util.List;
2327
import java.util.stream.Stream;
2428

2529
/**
@@ -31,6 +35,8 @@ public abstract class AbstractFunctionalTest extends BasicJavaClientREST {
3135

3236
protected final static String DB_NAME = "java-functest";
3337

38+
protected final static ObjectMapper objectMapper = new ObjectMapper();
39+
3440
protected static DatabaseClient client;
3541
protected static DatabaseClient schemasClient;
3642
protected static DatabaseClient adminModulesClient;
@@ -79,6 +85,31 @@ public static void classTearDown() {
7985
schemasClient.release();
8086
}
8187

88+
/**
89+
* Convenience method for easily writing some JSON docs where the content of the docs doesn't matter.
90+
*
91+
* @param count
92+
* @param collections
93+
* @return
94+
*/
95+
protected List<String> writeJsonDocs(int count, String... collections) {
96+
DocumentMetadataHandle metadata = new DocumentMetadataHandle()
97+
.withCollections(collections)
98+
.withPermission("rest-reader", DocumentMetadataHandle.Capability.READ, DocumentMetadataHandle.Capability.UPDATE);
99+
JSONDocumentManager mgr = client.newJSONDocumentManager();
100+
DocumentWriteSet set = mgr.newWriteSet();
101+
List<String> uris = new ArrayList<>();
102+
for (int i = 1; i <= count; i++) {
103+
String uri = "/test/" + i + ".json";
104+
uris.add(uri);
105+
set.add(uri, metadata, new JacksonHandle(
106+
objectMapper.createObjectNode().put("test", i)
107+
));
108+
}
109+
mgr.write(set);
110+
return uris;
111+
}
112+
82113
protected static void loadFileToDB(DatabaseClient client, String filename, String uri, String type, String[] collections) throws IOException, ParserConfigurationException,
83114
SAXException {
84115
// create doc manager
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package com.marklogic.client.fastfunctest.datamovement;
2+
3+
import com.marklogic.client.datamovement.DataMovementManager;
4+
import com.marklogic.client.datamovement.ExportListener;
5+
import com.marklogic.client.datamovement.QueryBatcher;
6+
import com.marklogic.client.fastfunctest.AbstractFunctionalTest;
7+
import org.junit.jupiter.api.BeforeEach;
8+
import org.junit.jupiter.api.Test;
9+
10+
import java.util.List;
11+
import java.util.concurrent.atomic.AtomicInteger;
12+
13+
import static org.junit.jupiter.api.Assertions.assertEquals;
14+
import static org.junit.jupiter.api.Assertions.assertThrows;
15+
16+
public class ExportBatchesTest extends AbstractFunctionalTest {
17+
18+
List<String> testDocUris;
19+
20+
@BeforeEach
21+
void beforeEach() {
22+
deleteDocuments(client);
23+
testDocUris = writeJsonDocs(20, "ExportBatchesTest");
24+
}
25+
26+
@Test
27+
void simplePageConsumer() {
28+
AtomicInteger pageCount = new AtomicInteger();
29+
AtomicInteger docCount = new AtomicInteger();
30+
31+
ExportListener listener = new ExportListener().onDocumentPageReady(documentPage -> {
32+
pageCount.incrementAndGet();
33+
while (documentPage.hasContent()) {
34+
documentPage.next();
35+
docCount.incrementAndGet();
36+
}
37+
});
38+
39+
runJob(listener, 5);
40+
assertEquals(4, pageCount.get(), "Should get 4 pages of 5 docs each");
41+
assertEquals(20, docCount.get());
42+
}
43+
44+
@Test
45+
void consumerThrowsException() {
46+
AtomicInteger failureCount = new AtomicInteger();
47+
48+
ExportListener listener = new ExportListener()
49+
.onDocumentPageReady(documentPage -> {
50+
throw new RuntimeException("Intentional error");
51+
})
52+
.onFailure(((batch, throwable) -> failureCount.incrementAndGet()));
53+
54+
runJob(listener, 5);
55+
assertEquals(4, failureCount.get(), "The failure listener should have been invoked once for each batch, " +
56+
"and batch should have failed.");
57+
}
58+
59+
@Test
60+
void documentListenerAlreadySet() {
61+
ExportListener listener = new ExportListener().onDocumentReady(doc -> doc.getUri());
62+
IllegalStateException ex = assertThrows(IllegalStateException.class,
63+
() -> listener.onDocumentPageReady(page -> page.next()));
64+
assertEquals("Cannot call onDocumentPageReady if a listener has already been added via onDocumentReady",
65+
ex.getMessage(), "Both listeners cannot be set because the DocumentPage can only be iterated through once.");
66+
}
67+
68+
@Test
69+
void documentPageListenerAlreadySet() {
70+
ExportListener listener = new ExportListener().onDocumentPageReady(page -> page.next());
71+
IllegalStateException ex = assertThrows(IllegalStateException.class,
72+
() -> listener.onDocumentReady(doc -> doc.getUri()));
73+
assertEquals("Cannot call onDocumentReady if a listener has already been set via onDocumentPageReady",
74+
ex.getMessage(), "Both listeners cannot be set because the DocumentPage can only be iterated through once.");
75+
}
76+
77+
private void runJob(ExportListener listener, int batchSize) {
78+
DataMovementManager dmm = client.newDataMovementManager();
79+
QueryBatcher qb = dmm.newQueryBatcher(testDocUris.iterator())
80+
.withBatchSize(batchSize)
81+
.onUrisReady(listener);
82+
dmm.startJob(qb);
83+
qb.awaitCompletion();
84+
dmm.stopJob(qb);
85+
}
86+
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/ExportListener.java

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ public class ExportListener implements QueryBatchListener {
7474
private QueryManager.QueryView view;
7575
private Set<DocumentManager.Metadata> categories = new HashSet<>();
7676
private Format nonDocumentFormat;
77-
private List<Consumer<DocumentRecord>> exportListeners = new ArrayList<>();
77+
private List<Consumer<DocumentRecord>> documentListeners = new ArrayList<>();
78+
private Consumer<DocumentPage> documentPageListener;
7879
private boolean consistentSnapshot = false;
7980
private List<BatchFailureListener<Batch<String>>> failureListeners = new ArrayList<>();
8081
private List<BatchFailureListener<QueryBatch>> queryBatchFailureListeners = new ArrayList<>();
@@ -127,15 +128,19 @@ public void initializeListener(QueryBatcher queryBatcher) {
127128
@Override
128129
public void processEvent(QueryBatch batch) {
129130
try ( DocumentPage docs = getDocs(batch) ) {
130-
while ( docs.hasNext() ) {
131-
for ( Consumer<DocumentRecord> listener : exportListeners ) {
132-
try {
133-
listener.accept(docs.next());
134-
} catch (Throwable t) {
135-
logger.error("Exception thrown by an onDocumentReady listener", t);
136-
}
137-
}
138-
}
131+
if (documentPageListener != null) {
132+
documentPageListener.accept(docs);
133+
} else {
134+
while ( docs.hasNext() ) {
135+
for ( Consumer<DocumentRecord> listener : documentListeners) {
136+
try {
137+
listener.accept(docs.next());
138+
} catch (Throwable t) {
139+
logger.error("Exception thrown by an onDocumentReady listener", t);
140+
}
141+
}
142+
}
143+
}
139144
} catch (Throwable t) {
140145
for ( BatchFailureListener<Batch<String>> listener : failureListeners ) {
141146
try {
@@ -232,9 +237,7 @@ public ExportListener withTransform(ServerTransform transform) {
232237
* file system, a REST service, or any target supported by Java. If further
233238
* information is required about the document beyond what DocumentRecord can
234239
* provide, register a listener with {@link QueryBatcher#onUrisReady
235-
* QueryBatcher.onUrisReady} instead. You do not need to call close() on
236-
* each DocumentRecord because the ExportListener will call close for you on
237-
* the entire DocumentPage.
240+
* QueryBatcher.onUrisReady} instead.
238241
*
239242
* @param listener the code which will process each document
240243
* @return this instance for method chaining
@@ -243,10 +246,32 @@ public ExportListener withTransform(ServerTransform transform) {
243246
* @see DocumentRecord
244247
*/
245248
public ExportListener onDocumentReady(Consumer<DocumentRecord> listener) {
246-
exportListeners.add(listener);
247-
return this;
249+
if (this.documentPageListener != null) {
250+
throw new IllegalStateException("Cannot call onDocumentReady if a listener has already been set via onDocumentPageReady");
251+
}
252+
documentListeners.add(listener);
253+
return this;
248254
}
249255

256+
/**
257+
* Sets a listener to process a page of retrieved documents. Useful for when documents should be written to an
258+
* external system where it's more efficient to make batched writes to that system. Note that {@code close()} does
259+
* need to be invoked on the {@code DocumentPage}; this class will handle that.
260+
*
261+
* @param listener the code which will process each page of documents
262+
* @return this instance for method chaining
263+
* @see Consumer
264+
* @see DocumentPage
265+
* @since 6.2.0
266+
*/
267+
public ExportListener onDocumentPageReady(Consumer<DocumentPage> listener) {
268+
if (this.documentListeners != null && !this.documentListeners.isEmpty()) {
269+
throw new IllegalStateException("Cannot call onDocumentPageReady if a listener has already been added via onDocumentReady");
270+
}
271+
this.documentPageListener = listener;
272+
return this;
273+
}
274+
250275
/**
251276
* When a batch fails or a callback throws an Exception, run this listener
252277
* code. Multiple listeners can be registered with this method.

0 commit comments

Comments
 (0)