|
66 | 66 | * .withBatchSize(1000) |
67 | 67 | * .withThreadCount(20) |
68 | 68 | .withConsistentSnapshot() |
69 | | - * .onUrisReady(batch -> { |
| 69 | + * .onUrisReady(batch -> { |
70 | 70 | * for ( String uri : batch.getItems() ) { |
71 | 71 | * if ( uri.endsWith(".txt") ) { |
72 | 72 | * client.newDocumentManager().delete(uri); |
73 | 73 | * } |
74 | 74 | * } |
75 | 75 | * }) |
76 | | - * .onQueryFailure(queryBatchException -> queryBatchException.printStackTrace()); |
| 76 | + * .onQueryFailure(queryBatchException -> queryBatchException.printStackTrace()); |
77 | 77 | * JobTicket ticket = dataMovementManager.startJob(qhb); |
78 | 78 | * qhb.awaitCompletion(); |
79 | 79 | * dataMovementManager.stopJob(ticket); |
80 | 80 | * ``` |
81 | 81 | * |
82 | 82 | * |
83 | | - * |
84 | | - * |
85 | | - * |
86 | 83 | * # Using WriteBatcher |
87 | 84 | * |
88 | 85 | * When you need to write a very large volume of documents and |
|
95 | 92 | * WriteBatcher whb = dataMovementManager.newWriteBatcher() |
96 | 93 | * .withBatchSize(100) |
97 | 94 | * .withThreadCount(20) |
98 | | - * .onBatchSuccess(batch -> { |
| 95 | + * .onBatchSuccess(batch -> { |
99 | 96 | * logger.debug("batch # {}, so far: {}", batch.getJobBatchNumber(), batch.getJobResultsSoFar()); |
100 | 97 | * }) |
101 | | - * .onBatchFailure((batch,throwable) -> throwable.printStackTrace() ); |
| 98 | + * .onBatchFailure((batch,throwable) -> throwable.printStackTrace() ); |
102 | 99 | * JobTicket ticket = dataMovementManager.startJob(whb); |
103 | 100 | * // the add or addAs methods could be called in separate threads on the |
104 | 101 | * // single whb instance |
|
109 | 106 | * dataMovementManager.stopJob(ticket); |
110 | 107 | * ``` |
111 | 108 | * [mlcp]: https://developer.marklogic.com/products/mlcp |
| 109 | + * |
| 110 | + * |
| 111 | + * <a name="lsnrs"></a> |
| 112 | + * # Writing Custom Listeners |
| 113 | + * |
| 114 | + * As demonstrated above, listeners should be added to each instance of |
| 115 | + * QueryBatcher or WriteBatcher. Ad-hoc listeners can be written as Java 8 |
| 116 | + * lambda expressions. More sophisticated custom listeners can implement the |
| 117 | + * appropriate listener interface or extend one of the [provided listeners |
| 118 | + * listed above](#provided). |
| 119 | + * |
| 120 | + * QueryBatchListener (onUrisReady) instances are necessary to do something |
| 121 | + * with the uris fetched by QueryBatcher. What a custom QueryBatchListener |
| 122 | + * does is completely up to it, but any operation which operates on uris |
| 123 | + * offered by any part of the Java Client API could be used, as could any read |
| 124 | + * or write to an external system. QueryFailureListener (onQueryFailure) |
| 125 | + * instances handle any exceptions encoutnered fetching the uris. |
| 126 | + * WriteBatchListener (onBatchSuccess) instances handle any custom tracking |
| 127 | + * requirements during a WriteBatcher job. WriteFailureListener |
| 128 | + * (onBatchFailure) instances handle any exceptions encountered writing the |
| 129 | + * batches formed from docs send to the WriteBatcher instance. See the |
| 130 | + * javadocs for each [provided listener](#provided) for an explantion of the |
| 131 | + * various listeners that can be registered for it to call. See javadocs, the |
| 132 | + * [Java Application Developer's Guide][], [source code for provided |
| 133 | + * listeners][], [cookbook examples][], and [unit tests][] for more examples of |
| 134 | + * listener implementation ideas. |
| 135 | + * |
| 136 | + * [Java Application Developer's Guide]: http://docs.marklogic.com/guide/java |
| 137 | + * [source code for provided listeners]: https://github.com/marklogic/java-client-api |
| 138 | + * [cookbook examples]: https://github.com/marklogic/java-client-api/tree/develop/src/main/java/com/marklogic/client/example/cookbook/datamovement |
| 139 | + * [unit tests]: https://github.com/marklogic/java-client-api/tree/develop/src/test/java/com/marklogic/client/test/datamovement |
| 140 | + * |
| 141 | + * # Listners Must Be Thread-Safe |
| 142 | + * |
| 143 | + * Since listeners are called asynchronously by all threads in the pool inside |
| 144 | + * the QueryBatcher or WriteBatcher instance, they must only perform |
| 145 | + * thread-safe operations. For example, accumulating to a collection should |
| 146 | + * only be done with collections wrapped as |
| 147 | + * {@link java.util.Collections#synchronizedCollection synchronized Collections} |
| 148 | + * rather than directly using un-synchronized collections such as HashMap or |
| 149 | + * ArrayList which are not thread-safe. Similarly, accumulating to a string |
| 150 | + * should use StringBuffer insted of StringBuilder since StringBuffer is |
| 151 | + * synchronized (and thus thread-safe). We also recommend {@link |
| 152 | + * java.util.concurrent.atomic java.util.concurrent.atomic classes}. |
| 153 | + * |
| 154 | + * Listeners should handle their own exceptions as described below in |
| 155 | + * [Handling Exceptions in Listeners](#errs). |
| 156 | + * |
| 157 | + * |
| 158 | + * <a name="errs"></a> |
| 159 | + * # Handling Exceptions in Listeners |
| 160 | + * |
| 161 | + * Since listeners are called asynchrounously, external exception handling |
| 162 | + * cannot wrap the call in a try-catch block. Instead, a listener can and |
| 163 | + * should handle its own exceptions by wrapping the calls in its body in a |
| 164 | + * try-catch block. When any listener does not handle its own exceptions and |
| 165 | + * throws any exception (Throwable), the exception is logged at error level |
| 166 | + * with a call like: |
| 167 | + * |
| 168 | + * logger.error("Exception thrown by an onBatchSuccess listener", throwable); |
| 169 | + * |
| 170 | + * This achieves logging of exceptions without allowing them to prevent the job |
| 171 | + * from continuing. |
| 172 | + * |
| 173 | + * A QueryFailureListener or WriteFailureListener will not be notified of |
| 174 | + * exceptions thrown by other listeners. Instead, these failure listeners are |
| 175 | + * notified exclusively of exceptions in the operation of QueryBatcher or |
| 176 | + * WriteBatcher. |
| 177 | + * |
| 178 | + * If you wish a custom QueryBatchListener or WriteBatchListener to trap its |
| 179 | + * own exceptions and pass them along to callbacks registered with it for |
| 180 | + * exception handling, it can of course do that in a custom way. Examples of |
| 181 | + * this pattern can be seen in the interface of |
| 182 | + * {@link com.marklogic.client.datamovement.ApplyTransformListener}. |
| 183 | + * |
| 184 | + * # Pre-installed Listeners |
| 185 | + * |
| 186 | + * Every time you create a new QueryBatcher or WriteBatcher it comes with some |
| 187 | + * pre-installed listeners such as |
| 188 | + * {@link com.marklogic.client.datamovement.HostAvailabilityListener} and a |
| 189 | + * listener to track counts for JobReport. If you wish to remove these |
| 190 | + * listeners and their associated functionality call one of the following: |
| 191 | + * {@link com.marklogic.client.datamovement.QueryBatcher#setUrisReadyListeners |
| 192 | + * setUrisReadyListeners}, {@link |
| 193 | + * com.marklogic.client.datamovement.QueryBatcher#setQueryFailureListeners |
| 194 | + * setQueryFailureListeners}, {@link |
| 195 | + * com.marklogic.client.datamovement.WriteBatcher#setBatchSuccessListeners |
| 196 | + * setBatchSuccessListeners}, or {@link |
| 197 | + * com.marklogic.client.datamovement.WriteBatcher#setBatchFailureListeners |
| 198 | + * setBatchFailureListeners}. Obviously, removing the functionality of |
| 199 | + * HostAvailabilityListener means it won't do its job of handling black-listing |
| 200 | + * hosts or retrying batches that occur when a host is unavailable. And |
| 201 | + * removing the functionality of the listeners that track counts for JobReport |
| 202 | + * means JobReport should no longer be used. If you would just like to change |
| 203 | + * the settings on HostAvailabilityListener, you can do something like the |
| 204 | + * following: |
| 205 | + * |
| 206 | + * for (WriteFailureListener listener : batcher.getBatchFailureListeners()) { |
| 207 | + * if ( listener instanceof HostAvailabilityListener ) { |
| 208 | + * ((HostAvailabilityListener) listener) |
| 209 | + * .withSuspendTimeForHostUnavailable(Duration.ofMinutes(60)) |
| 210 | + * .withMinHosts(2); |
| 211 | + * } |
| 212 | + * } |
| 213 | + * |
| 214 | + * |
| 215 | + * <h2>Enable Logging</h2> |
| 216 | + * |
| 217 | + * We have made efforts to provide helpful logging as you use QueryBatcher and |
| 218 | + * WriteBatcher. Please make sure to enable your slf4j-compliant [logging |
| 219 | + * framework](../../../../overview-summary.html#logging). |
| 220 | + * |
112 | 221 | */ |
113 | 222 | /* |
114 | 223 | * Copyright 2015-2016 MarkLogic Corporation |
|
0 commit comments