
Commit 72fd8ed

georgeajit authored and committed
Merge branch 'develop' of https://github.com/marklogic/java-client-api.git into develop
2 parents 57790fc + 0989af3 commit 72fd8ed

File tree

12 files changed: +4779 -4873 lines changed


src/main/java/com/marklogic/client/datamovement/HostAvailabilityListener.java

Lines changed: 2 additions & 2 deletions
@@ -182,7 +182,7 @@ public void processFailure(WriteBatch batch, Throwable throwable) {
         logger.warn("Retrying failed batch: {}, results so far: {}, uris: {}",
           batch.getJobBatchNumber(), batch.getJobWritesSoFar(),
           Stream.of(batch.getItems()).map(event->event.getTargetUri()).collect(Collectors.toList()));
-        batch.getBatcher().retry(batch);
+        batch.getBatcher().retryWithFailureListeners(batch);
       } catch (RuntimeException e) {
         logger.error("Exception during retry", e);
         processFailure(batch, e);

@@ -202,7 +202,7 @@ public void processFailure(QueryBatchException queryBatch) {
         logger.warn("Retrying failed batch: {}, results so far: {}, forest: {}, forestBatch: {}, forest results so far: {}",
           queryBatch.getJobBatchNumber(), queryBatch.getJobResultsSoFar(), queryBatch.getForest().getForestName(),
           queryBatch.getForestBatchNumber(), queryBatch.getForestResultsSoFar());
-        queryBatch.getBatcher().retry(queryBatch);
+        queryBatch.getBatcher().retryWithFailureListeners(queryBatch);
       } catch (RuntimeException e) {
         logger.error("Exception during retry", e);
         processFailure(new QueryBatchException(queryBatch, e));
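
With this change, when the retry that HostAvailabilityListener issues fails again, the failure is routed back through the batcher's registered failure listeners instead of only being rethrown here. Any onBatchFailure listener you register will therefore also observe failed retries. A minimal sketch of such a listener (not part of this commit), assuming an existing DataMovementManager named moveMgr and an SLF4J logger:

    WriteBatcher writeBatcher = moveMgr.newWriteBatcher()
      .onBatchFailure((batch, throwable) ->
        // with retryWithFailureListeners, this listener now also fires when a
        // retry initiated by HostAvailabilityListener fails again
        logger.error("Batch {} failed", batch.getJobBatchNumber(), throwable));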

src/main/java/com/marklogic/client/datamovement/QueryBatcher.java

Lines changed: 14 additions & 0 deletions
@@ -310,4 +310,18 @@ public interface QueryBatcher extends Batcher {
    * @param queryBatchListener the QueryBatchListener which needs to be applied
    */
   void retryListener(QueryBatch batch, QueryBatchListener queryBatchListener);
+
+  /**
+   * Retry in the same thread to query a batch that failed. If it fails again,
+   * all the failure listeners registered with the batcher via the
+   * onQueryFailure method are invoked.
+   *
+   * Note: Use this method with caution as it can loop indefinitely. If a
+   * batch fails, a failure listener calls this method to retry, and the retry
+   * fails as well, the cycle repeats until the batch succeeds.
+   *
+   * @param queryEvent the information about the batch that failed
+   */
+  void retryWithFailureListeners(QueryEvent queryEvent);
 }
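
To avoid the infinite loop this javadoc warns about, a failure listener that calls retryWithFailureListeners can bound its own retries. A minimal sketch (not from this commit), assuming a configured QueryBatcher named queryBatcher, an SLF4J logger, and java.util.concurrent imports; retrying each batch at most once is one simple policy:

    Set<Long> retriedBatches = ConcurrentHashMap.newKeySet();
    queryBatcher.onQueryFailure(failure -> {
      // QueryBatchException implements QueryEvent, so it can be handed back
      // for retry; retry each batch only once so repeated failures can't loop
      if ( retriedBatches.add(failure.getJobBatchNumber()) ) {
        failure.getBatcher().retryWithFailureListeners(failure);
      } else {
        logger.error("Batch {} failed twice, giving up",
          failure.getJobBatchNumber(), failure);
      }
    });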

src/main/java/com/marklogic/client/datamovement/WriteBatcher.java

Lines changed: 14 additions & 0 deletions
@@ -321,4 +321,18 @@ WriteBatcher addAs(String uri, DocumentMetadataWriteHandle metadataHandle,
    * @throws IllegalStateException if this job has not yet been started
    */
   JobTicket getJobTicket();
+
+  /**
+   * Retry in the same thread to send a batch that failed. If it fails again,
+   * all the failure listeners registered with the batcher via the
+   * onBatchFailure method are invoked.
+   *
+   * Note: Use this method with caution as it can loop indefinitely. If a
+   * batch fails, a failure listener calls this method to retry, and the retry
+   * fails as well, the cycle repeats until the batch succeeds.
+   *
+   * @param writeBatch the information about the batch that failed
+   */
+  void retryWithFailureListeners(WriteBatch writeBatch);
 }
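
As on the query side, a write failure listener that calls this method should cap its attempts. A sketch (not from this commit), assuming a configured WriteBatcher named writeBatcher, an SLF4J logger, java.util.concurrent imports, and a hypothetical MAX_RETRIES constant:

    ConcurrentHashMap<Long, AtomicInteger> attempts = new ConcurrentHashMap<>();
    writeBatcher.onBatchFailure((batch, throwable) -> {
      int tried = attempts
        .computeIfAbsent(batch.getJobBatchNumber(), n -> new AtomicInteger())
        .incrementAndGet();
      if ( tried <= MAX_RETRIES ) {
        // re-send the batch; on another failure this listener runs again
        batch.getBatcher().retryWithFailureListeners(batch);
      } else {
        logger.error("Giving up on batch {} after {} attempts",
          batch.getJobBatchNumber(), tried, throwable);
      }
    });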

src/main/java/com/marklogic/client/datamovement/impl/DataMovementManagerImpl.java

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@
 public class DataMovementManagerImpl implements DataMovementManager {
   private static Logger logger = LoggerFactory.getLogger(DataMovementManager.class);
   private DataMovementServices service = new DataMovementServices();
-  private ConcurrentHashMap<String, JobTicket> activeJobs = new ConcurrentHashMap<>();
+  private static ConcurrentHashMap<String, JobTicket> activeJobs = new ConcurrentHashMap<>();
   private ForestConfiguration forestConfig;
   private DatabaseClient primaryClient;
   // clientMap key is the hostname_database

src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java

Lines changed: 9 additions & 2 deletions
@@ -124,11 +124,19 @@ public QueryBatcherImpl onQueryFailure(QueryFailureListener listener) {
    */
   @Override
   public void retry(QueryEvent queryEvent) {
+    retry(queryEvent, false);
+  }
+
+  @Override
+  public void retryWithFailureListeners(QueryEvent queryEvent) {
+    retry(queryEvent, true);
+  }
+
+  private void retry(QueryEvent queryEvent, boolean callFailListeners) {
     if ( isStopped() == true ) {
       logger.warn("Job is now stopped, aborting the retry");
       return;
     }
-    boolean callFailListeners = false;
     Forest retryForest = null;
     for ( Forest forest : getForestConfig().listForests() ) {
       if ( forest.equals(queryEvent.getForest()) ) {

@@ -151,7 +159,6 @@ public void retry(QueryEvent queryEvent) {
       queryEvent.getForestBatchNumber(), start, queryEvent.getJobBatchNumber(), callFailListeners);
     runnable.run();
   }
-
   /*
    * Accepts a QueryBatch which was successfully retrieved from the server and a
    * QueryBatchListener which was failed to apply and retry that listener on the batch.

src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java

Lines changed: 18 additions & 6 deletions
@@ -461,26 +461,38 @@ public WriteBatcher onBatchFailure(WriteFailureListener listener) {
     return this;
   }

+  @Override
+  public void retryWithFailureListeners(WriteBatch batch) {
+    retry(batch, true);
+  }
+
   @Override
   public void retry(WriteBatch batch) {
+    retry(batch, false);
+  }
+
+  private void retry(WriteBatch batch, boolean callFailListeners) {
     if ( isStopped() == true ) {
       logger.warn("Job is now stopped, aborting the retry");
       return;
     }
     if ( batch == null ) throw new IllegalArgumentException("batch must not be null");
     boolean forceNewTransaction = true;
     BatchWriteSet writeSet = newBatchWriteSet(forceNewTransaction, batch.getJobBatchNumber());
-    writeSet.onFailure(throwable -> {
-      if ( throwable instanceof RuntimeException ) throw (RuntimeException) throwable;
-      else throw new DataMovementException("Failed to retry batch", throwable);
-    });
-    for ( WriteEvent doc : batch.getItems() ) {
+    if ( !callFailListeners ) {
+      writeSet.onFailure(throwable -> {
+        if ( throwable instanceof RuntimeException )
+          throw (RuntimeException) throwable;
+        else
+          throw new DataMovementException("Failed to retry batch", throwable);
+      });
+    }
+    for (WriteEvent doc : batch.getItems()) {
       writeSet.getWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
     }
     BatchWriter runnable = new BatchWriter(writeSet);
     runnable.run();
   }
-
   @Override
   public WriteBatchListener[] getBatchSuccessListeners() {
     return successListeners.toArray(new WriteBatchListener[successListeners.size()]);
src/main/java/com/marklogic/client/example/cookbook/README.md

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
# Using Cookbook Examples

The most important use of cookbook examples is reading the source code. You
can do this on [github](https://github.com/marklogic/java-client-api) or on
your machine once you've cloned the code from github.

To run the examples, first edit the
[Example.properties](../../../../../../resources/Example.properties) file in the
distribution to specify the connection parameters for your server. Most
cookbook examples have a main method, so they can be run from the command line
like so:

    java -cp $CLASSPATH com.marklogic.client.example.cookbook.DocumentWrite

This, of course, requires that you have all necessary dependencies in the
environment variable $CLASSPATH. You can get the classpath for your machine
using the
[maven dependency plugin](http://maven.apache.org/plugins/maven-dependency-plugin/usage.html)
command:

    mvn dependency:build-classpath

# Testing Cookbook Examples

Most cookbook examples pass their unit test if they run without error. First
edit the [Example.properties](../../../../../../resources/Example.properties) file
in the distribution to specify the connection parameters for your server. Then
run `mvn test` while specifying the unit test you want to run, for example:

    mvn test -Dtest=DocumentWriteTest

# Creating a Cookbook Example

We encourage community-contributed cookbook examples! Make sure you follow
the guidelines in [CONTRIBUTING.md](../../../../../../../../CONTRIBUTING.md)
when you submit a pull request. Each cookbook example should be runnable from
the command line, so it should have a static `main` method (see the skeleton
at the end of this README). The code should come as close as possible to
production code (code one would reasonably expect to use in a production
application) while remaining as simple as possible, so that newcomers to the
Java Client API can grok it quickly. It should have helpful comments
throughout, including javadocs, since it will show up in the published
javadocs. It should be added to
[AllCookbookExamples.java](https://github.com/marklogic/java-client-api/blob/develop/src/main/java/com/marklogic/client/example/cookbook/AllCookbookExamples.java)
in the order in which we recommend developers review the examples.

Each example should also have a unit test added to
[this package](https://github.com/marklogic/java-client-api/tree/develop/src/test/java/com/marklogic/client/test/example/cookbook).
The unit test can test whatever is needed; most cookbook unit tests just run
the class and consider it a success if no errors are thrown. Some cookbook
examples, such as SSLClientCreator and KerberosClientCreator, cannot be
included in unit tests because the unit tests require a server configured with
digest authentication and those examples require a different authentication
scheme. Any cookbook example not covered by a unit test runs the risk of
breaking without anyone noticing, so we add unit tests whenever possible.
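
A minimal skeleton following these conventions might look like this (HelloWorldExample is a hypothetical class name, not one of the shipped examples):

    package com.marklogic.client.example.cookbook;

    /** HelloWorldExample illustrates the minimal shape of a cookbook example:
     * a static main method so it runs from the command line, plus javadocs
     * and comments throughout.
     */
    public class HelloWorldExample {
      public static void main(String[] args) throws Exception {
        new HelloWorldExample().run();
      }

      public void run() throws Exception {
        // read connection parameters from Example.properties (the cookbook's
        // Util.loadProperties() helper is the usual way) and do the work here
      }
    }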

src/main/java/com/marklogic/client/example/cookbook/datamovement/BulkExportToJdbc.java

Lines changed: 63 additions & 2 deletions
@@ -53,10 +53,28 @@

 import java.text.SimpleDateFormat;

+/** BulkExportToJdbc shows how simple it is to use Data Movement SDK to move
+ * massive data sets from a source MarkLogic Server to a JDBC target server.
+ * In this example all employees are exported using a query matching all docs
+ * in directory /employees/. Using the
+ * [Shortcut Method](https://docs.marklogic.com/javadoc/client/overview-summary.html#ShortcutMethods)
+ * `getContentAs` and the Employee POJO class (pre-registered with the handle
+ * registry by DatabaseClientSingleton), we can easily deserialize each
+ * document to an Employee object. From there it's straightforward to use
+ * Spring's JdbcTemplate to write the employees, their salaries, and their
+ * titles via JDBC. Of course, Spring's JdbcTemplate is not required; you
+ * could use your favorite JDBC library with Data Movement SDK. And of course
+ * you don't need to deserialize to POJOs; you could use any of the Java
+ * Client API handles to deserialize the matching documents.
+ */
 public class BulkExportToJdbc {
   private static Logger logger = LoggerFactory.getLogger(BulkExportToJdbc.class);
+  // this is the date format required by our relational database tables
   public static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");

+  // we're using a small thread count and batch size because the example
+  // dataset is small, but with a larger dataset you'd use more threads and
+  // larger batches
   private static int threadCount = 3;
   private static int batchSize = 3;

@@ -68,25 +86,49 @@ public static void main(String[] args) throws IOException, SQLException {
   }

   public void run() throws IOException, SQLException {
+    // connect to JDBC and initialize JdbcTemplate
     JdbcTemplate jdbcTemplate = new JdbcTemplate(getDataSource());
-    final boolean isMySQLDB;
-    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+    // query for all employees in directory /employees/
     StructuredQueryDefinition query = new StructuredQueryBuilder().directory(true, "/employees/");
+    // run the query on each forest in the cluster and asynchronously paginate
+    // through matches, sending them to the onUrisReady listener ExportListener
     QueryBatcher qb = moveMgr.newQueryBatcher(query)
       .withBatchSize(batchSize)
       .withThreadCount(threadCount)
+
+      // use withConsistentSnapshot so the set of matches doesn't change while this job
+      // runs even though updates are still occurring concurrently in MarkLogic Server.
+      // Requires a [merge timestamp](https://docs.marklogic.com/guide/app-dev/point_in_time#id_32468)
+      // to be set on MarkLogic Server.
+      .withConsistentSnapshot()
+
       .onUrisReady(
+        // Since ExportListener meets our needs we'll use it instead of a
+        // custom listener
        new ExportListener()
+
+          // since the ExportListener uses a separate request from the QueryBatcher
+          // we must also use withConsistentSnapshot on the ExportListener
           .withConsistentSnapshot()
+
+          // this is our custom onDocumentReady listener
           .onDocumentReady(record -> {
+
+            // Employee class is registered by DatabaseClientSingleton with the
+            // handle registry so we can use the getContentAs shortcut method
             Employee employee = record.getContentAs(Employee.class);
+
+            // using jdbcTemplate (which simplifies using jdbc) we can easily
+            // write the employee to the target relational database server
             jdbcTemplate.update(
               "INSERT INTO employees_export (emp_no, hire_date, first_name, last_name, gender, birth_date) " +
                 "VALUES (?, ?, ?, ?, ?, ?) ",
               employee.getEmployeeId(), dateFormat.format(employee.getHireDate().getTime()), employee.getFirstName(),
               employee.getLastName(), employee.getGender() == Gender.MALE ? "M" : "F",
               dateFormat.format(employee.getBirthDate().getTime()));
             if ( employee.getSalaries() != null ) {
+              // each employee could have many salaries, and we need to write
+              // each of those to its own row
               for ( Salary salary : employee.getSalaries() ) {
                 jdbcTemplate.update(
                   "INSERT INTO salaries_export (emp_no, salary, from_date, to_date) " +

@@ -96,6 +138,8 @@ public void run() throws IOException, SQLException {
               }
             }
             if ( employee.getTitles() != null ) {
+              // each employee could have many titles, and we need to write
+              // each of those to its own row
               for ( Title title : employee.getTitles() ) {
                 jdbcTemplate.update(
                   "INSERT INTO titles_export (emp_no, title, from_date, to_date) " +

@@ -105,23 +149,40 @@ public void run() throws IOException, SQLException {
               }
             }
           })
+
+          // in a production application we could have more elaborate error
+          // handling here
           .onBatchFailure((failedBatch,exception) -> exception.printStackTrace())
       )
+
+      // another onUrisReady listener, this one custom, and just for logging
       .onUrisReady(batch ->
         logger.debug("Batch exported {}, so far {}",
           batch.getJobBatchNumber(), batch.getJobResultsSoFar())
       )
+
+      // in a production application we could have more elaborate error
+      // handling here
       .onQueryFailure(exception -> exception.printStackTrace());
+
+    // now that the job is configured, kick it off
     JobTicket ticket = moveMgr.startJob(qb);
+
+    // wait for the job to fully complete all pagination and all listeners
     qb.awaitCompletion();
+
+    // free up resources by stopping the job
     moveMgr.stopJob(qb);
+
+    // double-check that we didn't have any failed batches
     JobReport report = moveMgr.getJobReport(ticket);
     if ( report.getFailureBatchesCount() > 0 ) {
       throw new IllegalStateException("Encountered " +
         report.getFailureBatchesCount() + " failed batches");
     }
   }

+  // get the jdbcUrl property from Example.properties with our jdbc connection info
   private DataSource getDataSource() throws IOException {
     ExampleProperties properties = Util.loadProperties();
     return new DriverManagerDataSource(properties.jdbcUrl, properties.jdbcUser, properties.jdbcPassword);
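
The javadoc above relies on the Employee POJO being pre-registered with the handle registry so that record.getContentAs(Employee.class) works. A sketch of what that registration might look like (an assumption about what DatabaseClientSingleton does, not code from this commit), using the Jackson databind handle:

    import com.marklogic.client.DatabaseClientFactory;
    import com.marklogic.client.io.JacksonDatabindHandle;

    // register the Employee POJO so shortcut methods like getContentAs can
    // (de)serialize it; this must happen before the DatabaseClient is created
    DatabaseClientFactory.getHandleRegistry().register(
      JacksonDatabindHandle.newFactory(Employee.class));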
