Skip to content
This repository was archived by the owner on Sep 16, 2024. It is now read-only.

Commit aaac9eb

Browse files
author
Rob Rudin
committed
#44 New BatchWriter interface for batched writes to ML9
1 parent 77e9175 commit aaac9eb

File tree

7 files changed

+417
-0
lines changed

7 files changed

+417
-0
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.marklogic.client.batch;
2+
3+
import com.marklogic.client.document.DocumentWriteOperation;
4+
5+
import java.util.List;
6+
7+
/**
8+
* Interface for writing batches of documents to pre-ML9 clusters (DMSDK can be used with ML9+).
9+
*/
10+
public interface BatchWriter {
11+
12+
/**
13+
* Give the writer a chance to perform any initialization it requires before it starts writing documents.
14+
*/
15+
void initialize();
16+
17+
/**
18+
* Write the given list of documents, as defined by the Java Client DocumentWriteOperation interface.
19+
*
20+
* @param items
21+
*/
22+
void write(List<? extends DocumentWriteOperation> items);
23+
24+
/**
25+
* Assuming that the writer is using a multi-threaded approach, call this to wait for the writer to finish
26+
* performing all of its writes.
27+
*/
28+
void waitForCompletion();
29+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package com.marklogic.client.batch;
2+
3+
import com.marklogic.client.helper.LoggingObject;
4+
import org.springframework.core.task.AsyncTaskExecutor;
5+
import org.springframework.core.task.SyncTaskExecutor;
6+
import org.springframework.core.task.TaskExecutor;
7+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
8+
9+
import java.util.ArrayList;
10+
import java.util.List;
11+
import java.util.concurrent.Future;
12+
import java.util.concurrent.TimeUnit;
13+
14+
/**
15+
* Support class for BatchWriter implementations that uses Spring's TaskExecutor interface for parallelizing writes to
16+
* MarkLogic.
17+
*/
18+
public abstract class BatchWriterSupport extends LoggingObject implements BatchWriter {
19+
20+
private TaskExecutor taskExecutor;
21+
private int threadCount = 16;
22+
23+
/**
24+
* Seems necessary to keep track of each Future instance so that we can properly wait for each one to finish.
25+
* Spring's TaskExecutor library doesn't seem to provide a better way of doing this.
26+
*/
27+
private List<Future<?>> futures = new ArrayList<>();
28+
29+
@Override
30+
public void initialize() {
31+
if (threadCount > 1) {
32+
if (logger.isInfoEnabled()) {
33+
logger.info("Initializing thread pool with a count of " + threadCount);
34+
}
35+
ThreadPoolTaskExecutor tpte = new ThreadPoolTaskExecutor();
36+
tpte.setCorePoolSize(threadCount);
37+
tpte.afterPropertiesSet();
38+
this.taskExecutor = tpte;
39+
} else {
40+
if (logger.isInfoEnabled()) {
41+
logger.info("Thread count is 1, so using a synchronous TaskExecutor");
42+
}
43+
this.taskExecutor = new SyncTaskExecutor();
44+
}
45+
}
46+
47+
@Override
48+
public void waitForCompletion() {
49+
int size = futures.size();
50+
if (logger.isDebugEnabled()) {
51+
logger.debug("Waiting for threads to finish document processing; futures count: " + size);
52+
}
53+
54+
for (int i = 0; i < size; i++) {
55+
Future<?> f = futures.get(i);
56+
if (f.isDone()) {
57+
continue;
58+
}
59+
try {
60+
// Wait up to 1 hour for a write to ML to finish (should never happen)
61+
f.get(1, TimeUnit.HOURS);
62+
} catch (Exception ex) {
63+
logger.warn("Unable to wait for last set of documents to be processed: " + ex.getMessage(), ex);
64+
}
65+
}
66+
}
67+
68+
protected void execute(Runnable runnable) {
69+
if (taskExecutor instanceof AsyncTaskExecutor) {
70+
futures.add(((AsyncTaskExecutor) taskExecutor).submit(runnable));
71+
} else {
72+
taskExecutor.execute(runnable);
73+
}
74+
}
75+
76+
public void setTaskExecutor(TaskExecutor taskExecutor) {
77+
this.taskExecutor = taskExecutor;
78+
}
79+
80+
public void setThreadCount(int threadCount) {
81+
this.threadCount = threadCount;
82+
}
83+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.marklogic.client.batch;
2+
3+
import com.marklogic.client.DatabaseClient;
4+
import com.marklogic.client.document.DocumentWriteOperation;
5+
import com.marklogic.client.document.DocumentWriteSet;
6+
import com.marklogic.client.document.GenericDocumentManager;
7+
8+
import java.util.List;
9+
10+
public class RestBatchWriter extends BatchWriterSupport {
11+
12+
private List<DatabaseClient> databaseClients;
13+
private int clientIndex = 0;
14+
15+
public RestBatchWriter(List<DatabaseClient> databaseClients) {
16+
this.databaseClients = databaseClients;
17+
}
18+
19+
@Override
20+
public void write(final List<? extends DocumentWriteOperation> items) {
21+
if (clientIndex >= databaseClients.size()) {
22+
clientIndex = 0;
23+
}
24+
final DatabaseClient client = databaseClients.get(clientIndex);
25+
clientIndex++;
26+
27+
execute(new Runnable() {
28+
@Override
29+
public void run() {
30+
GenericDocumentManager mgr = client.newDocumentManager();
31+
DocumentWriteSet set = mgr.newWriteSet();
32+
for (DocumentWriteOperation item : items) {
33+
set.add(item);
34+
}
35+
int count = set.size();
36+
if (logger.isDebugEnabled()) {
37+
logger.debug("Writing " + count + " documents to MarkLogic");
38+
}
39+
mgr.write(set);
40+
if (logger.isInfoEnabled()) {
41+
logger.info("Wrote " + count + " documents to MarkLogic");
42+
}
43+
}
44+
});
45+
}
46+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package com.marklogic.client.batch;
2+
3+
import com.marklogic.client.document.DocumentWriteOperation;
4+
import com.marklogic.client.xcc.DefaultDocumentWriteOperationAdapter;
5+
import com.marklogic.client.xcc.DocumentWriteOperationAdapter;
6+
import com.marklogic.xcc.Content;
7+
import com.marklogic.xcc.ContentSource;
8+
import com.marklogic.xcc.Session;
9+
import com.marklogic.xcc.exceptions.RequestException;
10+
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
14+
/**
15+
* XCC implementation for batched writes. Most important thing here is we depend on an instance of
16+
* DocumentWriteOperationAdapter to adapt a DocumentWriteOperation instance into a Content instance.
17+
*/
18+
public class XccBatchWriter extends BatchWriterSupport {
19+
20+
private List<ContentSource> contentSources;
21+
private int contentSourceIndex = 0;
22+
private DocumentWriteOperationAdapter documentWriteOperationAdapter;
23+
24+
public XccBatchWriter(List<ContentSource> contentSources) {
25+
this.contentSources = contentSources;
26+
this.documentWriteOperationAdapter = new DefaultDocumentWriteOperationAdapter();
27+
}
28+
29+
@Override
30+
public void write(final List<? extends DocumentWriteOperation> items) {
31+
if (contentSourceIndex >= contentSources.size()) {
32+
contentSourceIndex = 0;
33+
}
34+
35+
final ContentSource contentSource = contentSources.get(contentSourceIndex);
36+
contentSourceIndex++;
37+
38+
execute(new Runnable() {
39+
@Override
40+
public void run() {
41+
Session session = contentSource.newSession();
42+
int count = items.size();
43+
Content[] array = new Content[count];
44+
for (int i = 0; i < count; i++) {
45+
array[i] = documentWriteOperationAdapter.adapt(items.get(i));
46+
}
47+
if (logger.isDebugEnabled()) {
48+
logger.debug("Writing " + count + " documents to MarkLogic");
49+
}
50+
try {
51+
session.insertContent(array);
52+
if (logger.isInfoEnabled()) {
53+
logger.info("Wrote " + count + " documents to MarkLogic");
54+
}
55+
} catch (RequestException e) {
56+
throw new RuntimeException("Unable to insert content: " + e.getMessage(), e);
57+
} finally {
58+
session.close();
59+
}
60+
}
61+
});
62+
}
63+
64+
public void setDocumentWriteOperationAdapter(DocumentWriteOperationAdapter documentWriteOperationAdapter) {
65+
this.documentWriteOperationAdapter = documentWriteOperationAdapter;
66+
}
67+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package com.marklogic.client.xcc;
2+
3+
import com.marklogic.client.document.DocumentWriteOperation;
4+
import com.marklogic.client.io.*;
5+
import com.marklogic.client.io.marker.AbstractWriteHandle;
6+
import com.marklogic.client.io.marker.DocumentMetadataWriteHandle;
7+
import com.marklogic.xcc.*;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
import java.io.IOException;
12+
import java.util.HashSet;
13+
import java.util.Set;
14+
15+
public class DefaultDocumentWriteOperationAdapter implements DocumentWriteOperationAdapter {
16+
17+
private final static Logger logger = LoggerFactory.getLogger(DefaultDocumentWriteOperationAdapter.class);
18+
19+
@Override
20+
public Content adapt(DocumentWriteOperation operation) {
21+
String uri = operation.getUri();
22+
ContentCreateOptions options = adaptMetadata(operation.getMetadata());
23+
AbstractWriteHandle handle = operation.getContent();
24+
if (handle instanceof StringHandle) {
25+
return ContentFactory.newContent(uri, ((StringHandle) handle).get(), options);
26+
} else if (handle instanceof FileHandle) {
27+
return ContentFactory.newContent(uri, ((FileHandle) handle).get(), options);
28+
} else if (handle instanceof BytesHandle) {
29+
return ContentFactory.newContent(uri, ((BytesHandle) handle).get(), options);
30+
} else if (handle instanceof InputStreamHandle) {
31+
try {
32+
return ContentFactory.newContent(uri, ((InputStreamHandle) handle).get(), options);
33+
} catch (IOException e) {
34+
throw new RuntimeException("Unable to read content input stream: " + e.getMessage(), e);
35+
}
36+
} else if (handle instanceof DOMHandle) {
37+
return ContentFactory.newContent(uri, ((DOMHandle) handle).get(), options);
38+
} else throw new IllegalArgumentException("No support yet for content class: " + handle.getClass().getName());
39+
}
40+
41+
/**
42+
* TODO Only adapts collections, quality, format, and permissions so far.
43+
*
44+
* @param handle
45+
* @return
46+
*/
47+
protected ContentCreateOptions adaptMetadata(DocumentMetadataWriteHandle handle) {
48+
ContentCreateOptions options = new ContentCreateOptions();
49+
if (handle instanceof DocumentMetadataHandle) {
50+
DocumentMetadataHandle metadata = (DocumentMetadataHandle) handle;
51+
options.setQuality(metadata.getQuality());
52+
options.setCollections(metadata.getCollections().toArray(new String[]{}));
53+
adaptPermissions(options, metadata);
54+
adaptFormat(options, metadata);
55+
} else {
56+
logger.warn("Only supports DocumentMetadataHandle; unsupported metadata class: " + handle.getClass().getName());
57+
}
58+
return options;
59+
}
60+
61+
/**
62+
* The REST API Format class has a "getDefaultMimetype" method on it, but there doesn't appear to be anything
63+
* useful to do with that for XCC. So we just do a simple translation from Format to DocumentFormat.
64+
*
65+
* @param options
66+
* @param metadata
67+
*/
68+
protected void adaptFormat(ContentCreateOptions options, DocumentMetadataHandle metadata) {
69+
Format format = metadata.getFormat();
70+
DocumentFormat xccFormat = null;
71+
if (format != null) {
72+
if (Format.BINARY.equals(format)) {
73+
xccFormat = DocumentFormat.BINARY;
74+
} else if (Format.JSON.equals(format)) {
75+
xccFormat = DocumentFormat.JSON;
76+
} else if (Format.TEXT.equals(format)) {
77+
xccFormat = DocumentFormat.TEXT;
78+
} else if (Format.XML.equals(format)) {
79+
xccFormat = DocumentFormat.XML;
80+
} else if (Format.UNKNOWN.equals(format)) {
81+
xccFormat = DocumentFormat.NONE;
82+
} else if (logger.isDebugEnabled()) {
83+
logger.debug("Unsupported format, can't adapt to an XCC DocumentFormat; " + format.toString());
84+
}
85+
}
86+
if (xccFormat != null) {
87+
if (logger.isDebugEnabled()) {
88+
logger.debug("Adapted REST format " + format + " to XCC format: " + xccFormat.toString());
89+
}
90+
options.setFormat(xccFormat);
91+
}
92+
}
93+
94+
protected void adaptPermissions(ContentCreateOptions options, DocumentMetadataHandle metadata) {
95+
Set<ContentPermission> contentPermissions = new HashSet<>();
96+
DocumentMetadataHandle.DocumentPermissions permissions = metadata.getPermissions();
97+
for (String role : permissions.keySet()) {
98+
for (DocumentMetadataHandle.Capability capability : permissions.get(role)) {
99+
ContentCapability contentCapability;
100+
if (DocumentMetadataHandle.Capability.EXECUTE.equals(capability)) {
101+
contentCapability = ContentCapability.EXECUTE;
102+
} else if (DocumentMetadataHandle.Capability.INSERT.equals(capability)) {
103+
contentCapability = ContentCapability.INSERT;
104+
} else if (DocumentMetadataHandle.Capability.READ.equals(capability)) {
105+
contentCapability = ContentCapability.READ;
106+
} else if (DocumentMetadataHandle.Capability.UPDATE.equals(capability)) {
107+
contentCapability = ContentCapability.UPDATE;
108+
} else throw new IllegalArgumentException("Unrecognized permission capability: " + capability);
109+
contentPermissions.add(new ContentPermission(contentCapability, role));
110+
}
111+
}
112+
options.setPermissions(contentPermissions.toArray(new ContentPermission[]{}));
113+
}
114+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.marklogic.client.xcc;
2+
3+
import com.marklogic.client.document.DocumentWriteOperation;
4+
import com.marklogic.xcc.Content;
5+
6+
/**
7+
* Interface for adapting a REST DocumentWriteOperation instance into an XCC Content instance. Intended to support
8+
* objects that prefer using the REST API to insert a document, where the document to insert is defined via a
9+
* DocumentWriteOperation instance, but can then shift to XCC if needed.
10+
*/
11+
public interface DocumentWriteOperationAdapter {
12+
13+
Content adapt(DocumentWriteOperation operation);
14+
15+
}

0 commit comments

Comments
 (0)