Skip to content
This repository was archived by the owner on Sep 16, 2024. It is now read-only.

Commit e5aa843

Browse files
committed
#85 New job for exporting batches to files in a directory
Also moved LoadModulesFailureListener, which was in the wrong package
1 parent 983bb3a commit e5aa843

File tree

11 files changed

+327
-29
lines changed

11 files changed

+327
-29
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ repositories {
1717
}
1818

1919
dependencies {
20-
compile 'com.marklogic:marklogic-client-api:4.0.2'
21-
compile 'com.marklogic:marklogic-xcc:9.0.2'
20+
compile 'com.marklogic:marklogic-client-api:4.0.3'
21+
compile 'com.marklogic:marklogic-xcc:9.0.3'
2222
compile 'org.jdom:jdom2:2.0.6'
2323
compile 'org.springframework:spring-context:4.3.7.RELEASE'
2424

src/main/java/com/marklogic/client/ext/datamovement/job/AbstractQueryBatcherJob.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public abstract class AbstractQueryBatcherJob extends BatcherConfig implements Q
1818

1919
private boolean applyConsistentSnapshot = false;
2020
private boolean awaitCompletion = true;
21-
private boolean stopAfterCompletion = true;
21+
private boolean stopJobAfterCompletion = true;
2222

2323
// A client can provide its own DataMovementManager to be reused
2424
private DataMovementManager dataMovementManager;
@@ -54,7 +54,7 @@ public QueryBatcherJobTicket run(DatabaseClient databaseClient) {
5454

5555
if (awaitCompletion) {
5656
queryBatcher.awaitCompletion();
57-
if (stopAfterCompletion) {
57+
if (stopJobAfterCompletion) {
5858
dmm.stopJob(queryBatcher);
5959
}
6060
if (jobDescription != null && logger.isInfoEnabled()) {
@@ -235,12 +235,12 @@ public AbstractQueryBatcherJob setAwaitCompletion(boolean awaitCompletion) {
235235
return this;
236236
}
237237

238-
public boolean isStopAfterCompletion() {
239-
return stopAfterCompletion;
238+
public boolean isStopJobAfterCompletion() {
239+
return stopJobAfterCompletion;
240240
}
241241

242-
public AbstractQueryBatcherJob setStopAfterCompletion(boolean stopAfterCompletion) {
243-
this.stopAfterCompletion = stopAfterCompletion;
242+
public AbstractQueryBatcherJob setStopJobAfterCompletion(boolean stopJobAfterCompletion) {
243+
this.stopJobAfterCompletion = stopJobAfterCompletion;
244244
return this;
245245
}
246246

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.marklogic.client.ext.datamovement.job;
2+
3+
import com.marklogic.client.ext.datamovement.listener.ExportBatchesToDirectoryListener;
4+
5+
import java.io.File;
6+
7+
public class ExportBatchesToDirectoryJob extends AbstractQueryBatcherJob {
8+
9+
private ExportBatchesToDirectoryListener exportBatchesToDirectoryListener;
10+
private File exportDir;
11+
12+
public ExportBatchesToDirectoryJob(File exportDir) {
13+
exportBatchesToDirectoryListener = new ExportBatchesToDirectoryListener(exportDir);
14+
addUrisReadyListener(exportBatchesToDirectoryListener);
15+
this.exportDir = exportDir;
16+
}
17+
18+
@Override
19+
protected String getJobDescription() {
20+
return "Exporting batches of documents " + getQueryDescription() + " to files at: " + exportDir.getAbsolutePath();
21+
}
22+
23+
public ExportBatchesToDirectoryListener getExportBatchesToDirectoryListener() {
24+
return exportBatchesToDirectoryListener;
25+
}
26+
}

src/main/java/com/marklogic/client/ext/datamovement/job/ExportToFileJob.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
public class ExportToFileJob extends AbstractQueryBatcherJob {
1414

1515
private File exportFile;
16-
private String filePrefix;
17-
private String fileSuffix;
16+
private String fileHeader;
17+
private String fileFooter;
1818
private FileWriter fileWriter;
1919
private ExportToWriterListener exportToWriterListener;
2020
private boolean includeXmlOutputListener = true;
@@ -41,8 +41,8 @@ public QueryBatcherJobTicket run(DatabaseClient databaseClient) {
4141

4242
if (ticket.getQueryBatcher().isStopped()) {
4343
try {
44-
if (fileSuffix != null) {
45-
fileWriter.write(fileSuffix);
44+
if (fileFooter != null) {
45+
fileWriter.write(fileFooter);
4646
}
4747
} catch (IOException ie) {
4848
throw new RuntimeException(ie);
@@ -67,8 +67,8 @@ protected void prepareQueryBatcher(QueryBatcher queryBatcher) {
6767
}
6868

6969
try {
70-
if (filePrefix != null) {
71-
fileWriter.write(filePrefix);
70+
if (fileHeader != null) {
71+
fileWriter.write(fileHeader);
7272
fileWriter.write("\n");
7373
}
7474
} catch (IOException e) {
@@ -99,12 +99,12 @@ public ExportToWriterListener getExportToWriterListener() {
9999
return exportToWriterListener;
100100
}
101101

102-
public void setFilePrefix(String filePrefix) {
103-
this.filePrefix = filePrefix;
102+
public void setFileHeader(String fileHeader) {
103+
this.fileHeader = fileHeader;
104104
}
105105

106-
public void setFileSuffix(String fileSuffix) {
107-
this.fileSuffix = fileSuffix;
106+
public void setFileFooter(String fileFooter) {
107+
this.fileFooter = fileFooter;
108108
}
109109

110110
public void setIncludeXmlOutputListener(boolean includeXmlOutputListener) {
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
package com.marklogic.client.ext.datamovement.listener;
2+
3+
import com.marklogic.client.datamovement.BatchFailureListener;
4+
import com.marklogic.client.datamovement.ExportListener;
5+
import com.marklogic.client.datamovement.ExportToWriterListener;
6+
import com.marklogic.client.datamovement.QueryBatch;
7+
import com.marklogic.client.document.DocumentManager;
8+
import com.marklogic.client.document.ServerTransform;
9+
import com.marklogic.client.io.Format;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
13+
import java.io.File;
14+
import java.io.FileWriter;
15+
import java.io.IOException;
16+
import java.util.HashSet;
17+
import java.util.Set;
18+
19+
/**
20+
* Supports exporting each QueryBatch to a separate File in a given directory.
21+
* <p>
22+
* Extends ExportListener so that it can reuse the error handling in that class.
23+
* <p>
24+
* Reuses ExportToWriterListener. Many of the properties that can be set on this class are copied over to each
25+
* ExportToWriterListener, which handles writing each QueryBatch to a File via a FileWriter.
26+
*/
27+
public class ExportBatchesToDirectoryListener extends ExportListener {
28+
29+
private static Logger logger = LoggerFactory.getLogger(ExportBatchesToDirectoryListener.class);
30+
31+
private File exportDir;
32+
33+
// Specific to this class - controls the name of each file that is written to
34+
private String fileExtension = "xml";
35+
private String filenamePrefix = "batch-";
36+
37+
// Copied over to the ExportToWriterListener instances created by this class
38+
private boolean includeXmlOutputListener = true;
39+
private ServerTransform transform;
40+
private Format nonDocumentFormat;
41+
private boolean consistentSnapshot;
42+
private Set<DocumentManager.Metadata> categories = new HashSet();
43+
private String recordPrefix;
44+
private String recordSuffix;
45+
private String fileHeader;
46+
private String fileFooter;
47+
48+
public ExportBatchesToDirectoryListener(File exportDir) {
49+
this.exportDir = exportDir;
50+
this.exportDir.mkdirs();
51+
}
52+
53+
@Override
54+
public void processEvent(QueryBatch queryBatch) {
55+
try {
56+
writeDocuments(queryBatch);
57+
} catch (Throwable t) {
58+
for (BatchFailureListener<QueryBatch> queryBatchFailureListener : getBatchFailureListeners()) {
59+
try {
60+
queryBatchFailureListener.processFailure(queryBatch, t);
61+
} catch (Throwable t2) {
62+
logger.error("Exception thrown by an onFailure listener", t2);
63+
}
64+
}
65+
}
66+
}
67+
68+
/**
69+
* @param queryBatch
70+
* @throws IOException
71+
*/
72+
protected void writeDocuments(QueryBatch queryBatch) throws IOException {
73+
File file = getFileForBatch(queryBatch, exportDir);
74+
75+
FileWriter fileWriter = new FileWriter(file);
76+
try {
77+
if (fileHeader != null) {
78+
fileWriter.write(fileHeader);
79+
}
80+
81+
ExportToWriterListener listener = new ExportToWriterListener(fileWriter);
82+
prepareExportToWriterListener(listener);
83+
listener.processEvent(queryBatch);
84+
85+
if (fileFooter != null) {
86+
fileWriter.write(fileFooter);
87+
}
88+
} finally {
89+
fileWriter.close();
90+
}
91+
}
92+
93+
/**
94+
* Determine the File to write to for the given query batch.
95+
*
96+
* @param queryBatch
97+
* @param exportDir
98+
* @return
99+
*/
100+
protected File getFileForBatch(QueryBatch queryBatch, File exportDir) {
101+
String filename = queryBatch.getJobBatchNumber() + "." + fileExtension;
102+
if (filenamePrefix != null) {
103+
filename = filenamePrefix + filename;
104+
}
105+
return new File(exportDir, filename);
106+
}
107+
108+
/**
109+
* Copies all of the applicable properties to the listener that have been set on this class.
110+
*
111+
* @param listener
112+
*/
113+
protected void prepareExportToWriterListener(ExportToWriterListener listener) {
114+
if (includeXmlOutputListener) {
115+
listener.onGenerateOutput(new XmlOutputListener());
116+
}
117+
if (consistentSnapshot) {
118+
listener.withConsistentSnapshot();
119+
}
120+
if (categories != null) {
121+
for (DocumentManager.Metadata category : categories) {
122+
listener.withMetadataCategory(category);
123+
}
124+
}
125+
if (nonDocumentFormat != null) {
126+
listener.withNonDocumentFormat(nonDocumentFormat);
127+
}
128+
if (transform != null) {
129+
listener.withTransform(transform);
130+
}
131+
if (recordPrefix != null) {
132+
listener.withRecordPrefix(recordPrefix);
133+
}
134+
if (recordSuffix != null) {
135+
listener.withRecordSuffix(recordSuffix);
136+
}
137+
}
138+
139+
public ExportBatchesToDirectoryListener withFileExtension(String fileExtension) {
140+
this.fileExtension = fileExtension;
141+
return this;
142+
}
143+
144+
public ExportBatchesToDirectoryListener withConsistentSnapshot() {
145+
this.consistentSnapshot = true;
146+
return this;
147+
}
148+
149+
public ExportBatchesToDirectoryListener withMetadataCategory(DocumentManager.Metadata category) {
150+
this.categories.add(category);
151+
return this;
152+
}
153+
154+
public ExportBatchesToDirectoryListener withNonDocumentFormat(Format nonDocumentFormat) {
155+
this.nonDocumentFormat = nonDocumentFormat;
156+
return this;
157+
}
158+
159+
public ExportBatchesToDirectoryListener withTransform(ServerTransform transform) {
160+
this.transform = transform;
161+
return this;
162+
}
163+
164+
public ExportBatchesToDirectoryListener withRecordPrefix(String recordPrefix) {
165+
this.recordPrefix = recordPrefix;
166+
return this;
167+
}
168+
169+
public ExportBatchesToDirectoryListener withRecordSuffix(String recordSuffix) {
170+
this.recordSuffix = recordSuffix;
171+
return this;
172+
}
173+
174+
public ExportBatchesToDirectoryListener withFileHeader(String fileHeader) {
175+
this.fileHeader = fileHeader;
176+
return this;
177+
}
178+
179+
public ExportBatchesToDirectoryListener withFileFooter(String fileFooter) {
180+
this.fileFooter = fileFooter;
181+
return this;
182+
}
183+
184+
public void setIncludeXmlOutputListener(boolean includeXmlOutputListener) {
185+
this.includeXmlOutputListener = includeXmlOutputListener;
186+
}
187+
188+
public ExportBatchesToDirectoryListener withFilenamePrefix(String filenamePrefix) {
189+
this.filenamePrefix = filenamePrefix;
190+
return this;
191+
}
192+
}

src/main/java/com/marklogic/client/ext/modulesloader/impl/DefaultModulesLoader.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import com.marklogic.client.admin.*;
77
import com.marklogic.client.admin.ResourceExtensionsManager.MethodParameters;
88
import com.marklogic.client.admin.ServerConfigurationManager.UpdatePolicy;
9-
import com.marklogic.client.ext.datamovement.listener.LoadModulesFailureListener;
109
import com.marklogic.client.ext.file.DocumentFile;
1110
import com.marklogic.client.ext.helper.FilenameUtil;
1211
import com.marklogic.client.ext.helper.LoggingObject;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.marklogic.client.ext.datamovement.listener;
1+
package com.marklogic.client.ext.modulesloader.impl;
22

33
public interface LoadModulesFailureListener {
44
void processFailure(Throwable throwable);

src/main/java/com/marklogic/client/ext/modulesloader/impl/SimpleLoadModulesFailureListener.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.marklogic.client.ext.modulesloader.impl;
22

3-
import com.marklogic.client.ext.datamovement.listener.LoadModulesFailureListener;
43
import com.marklogic.client.ext.helper.LoggingObject;
54

65
public class SimpleLoadModulesFailureListener extends LoggingObject implements LoadModulesFailureListener {

src/test/java/com/marklogic/client/ext/datamovement/AbstractDataMovementTest.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package com.marklogic.client.ext.datamovement;
22

33
import com.marklogic.client.datamovement.DeleteListener;
4+
import com.marklogic.client.document.DocumentWriteOperation;
45
import com.marklogic.client.ext.AbstractIntegrationTest;
56
import com.marklogic.client.ext.batch.RestBatchWriter;
67
import com.marklogic.client.ext.batch.SimpleDocumentWriteOperation;
78
import com.marklogic.client.ext.datamovement.job.DeleteCollectionsJob;
89
import org.junit.Before;
910

10-
import java.util.Arrays;
11+
import java.util.ArrayList;
12+
import java.util.List;
1113

1214
/**
1315
* Abstract class for making it easier to test DMSDK listeners and consumers.
@@ -31,12 +33,17 @@ public void setup() {
3133
queryBatcherTemplate.applyOnDocumentUris(new DeleteListener(), FIRST_URI, SECOND_URI);
3234
new DeleteCollectionsJob(COLLECTION).run(client);
3335

34-
// Insert documents
36+
writeDocuments(FIRST_URI, SECOND_URI);
37+
}
38+
39+
protected void writeDocuments(String... uris) {
3540
RestBatchWriter writer = new RestBatchWriter(client, false);
36-
writer.write(Arrays.asList(
37-
new SimpleDocumentWriteOperation(FIRST_URI, "<one/>", COLLECTION),
38-
new SimpleDocumentWriteOperation(SECOND_URI, "<two/>", COLLECTION)
39-
));
41+
List<DocumentWriteOperation> list = new ArrayList<>();
42+
for (String uri : uris) {
43+
list.add(new SimpleDocumentWriteOperation(uri, "<test>" + uri + "</test>", COLLECTION));
44+
}
45+
writer.write(list);
4046
writer.waitForCompletion();
4147
}
48+
4249
}

0 commit comments

Comments
 (0)