1616package com .marklogic .hub ;
1717
1818import com .marklogic .client .DatabaseClient ;
19- import com .marklogic .client .DatabaseClientFactory ;
19+ import com .marklogic .client .datamovement .DataMovementManager ;
20+ import com .marklogic .client .datamovement .JobTicket ;
21+ import com .marklogic .client .datamovement .QueryBatcher ;
2022import com .marklogic .client .extensions .ResourceManager ;
2123import com .marklogic .client .extensions .ResourceServices .ServiceResult ;
2224import com .marklogic .client .extensions .ResourceServices .ServiceResultIterator ;
2325import com .marklogic .client .io .DOMHandle ;
2426import com .marklogic .client .util .RequestParameters ;
25- import com .marklogic .hub .flow .Flow ;
26- import com .marklogic .hub .flow .FlowComplexity ;
27- import com .marklogic .hub .flow .FlowType ;
28- import com .marklogic .hub .flow .SimpleFlow ;
29- import com .marklogic .spring .batch .hub .FlowConfig ;
30- import com .marklogic .spring .batch .hub .RunHarmonizeFlowConfig ;
31- import com .marklogic .spring .batch .hub .StagingConfig ;
32- import org .springframework .batch .core .Job ;
33- import org .springframework .batch .core .JobExecution ;
34- import org .springframework .batch .core .JobParameters ;
35- import org .springframework .batch .core .JobParametersBuilder ;
36- import org .springframework .batch .core .launch .JobLauncher ;
37- import org .springframework .context .ConfigurableApplicationContext ;
38- import org .springframework .context .annotation .AnnotationConfigApplicationContext ;
27+ import com .marklogic .hub .collector .Collector ;
28+ import com .marklogic .hub .collector .ServerCollector ;
29+ import com .marklogic .hub .flow .*;
30+ import com .marklogic .hub .job .Job ;
31+ import com .marklogic .hub .job .JobManager ;
3932import org .w3c .dom .Document ;
4033import org .w3c .dom .Element ;
4134import org .w3c .dom .Node ;
4235import org .w3c .dom .NodeList ;
4336
44- import java .util .ArrayList ;
45- import java .util .List ;
46- import java .util .UUID ;
37+ import java .util .* ;
38+ import java .util .concurrent . atomic . AtomicInteger ;
39+ import java .util .concurrent . atomic . AtomicLong ;
4740
4841public class FlowManager extends ResourceManager {
4942 private static final String HUB_NS = "http://marklogic.com/data-hub" ;
5043 private static final String NAME = "flow" ;
5144
52- private DatabaseClient client ;
45+ private DatabaseClient stagingClient ;
46+ private DatabaseClient finalClient ;
47+ private DatabaseClient jobClient ;
5348 private HubConfig hubConfig ;
49+ private JobManager jobManager ;
50+
51+ private DataMovementManager dataMovementManager ;
5452
5553 public FlowManager (HubConfig hubConfig ) {
5654 super ();
5755 this .hubConfig = hubConfig ;
58- this .client = hubConfig .newStagingClient ();
59- this .client .init (NAME , this );
56+ this .stagingClient = hubConfig .newStagingClient ();
57+ this .finalClient = hubConfig .newFinalClient ();
58+ this .jobClient = hubConfig .newJobDbClient ();
59+ this .jobManager = new JobManager (this .jobClient );
60+ this .dataMovementManager = this .stagingClient .newDataMovementManager ();
61+ this .stagingClient .init (NAME , this );
6062 }
6163
6264 /**
@@ -103,8 +105,7 @@ public List<Flow> getFlows(String entityName) {
103105 * @return the flow
104106 */
105107 public Flow getFlow (String entityName , String flowName ) {
106- Flow flow = getFlow (entityName , flowName , null );
107- return flow ;
108+ return getFlow (entityName , flowName , null );
108109 }
109110
110111 public Flow getFlow (String entityName , String flowName , FlowType flowType ) {
@@ -124,27 +125,8 @@ public Flow getFlow(String entityName, String flowName, FlowType flowType) {
124125 return flowFromXml (parent .getDocumentElement ());
125126 }
126127
127- private ConfigurableApplicationContext buildApplicationContext (Flow flow , JobStatusListener statusListener ) {
128- AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext ();
129- ctx .register (StagingConfig .class );
130- ctx .register (FlowConfig .class );
131- ctx .register (RunHarmonizeFlowConfig .class );
132- ctx .getBeanFactory ().registerSingleton ("hubConfig" , hubConfig );
133- ctx .getBeanFactory ().registerSingleton ("flow" , flow );
134- ctx .getBeanFactory ().registerSingleton ("statusListener" , statusListener );
135- ctx .refresh ();
136- return ctx ;
137- }
138-
139- private JobParameters buildJobParameters (Flow flow , int batchSize , int threadCount ) {
140- JobParametersBuilder jpb = new JobParametersBuilder ();
141- jpb .addLong ("batchSize" , Integer .toUnsignedLong (batchSize ));
142- jpb .addLong ("threadCount" , Integer .toUnsignedLong (threadCount ));
143- jpb .addString ("uid" , UUID .randomUUID ().toString ());
144- jpb .addString ("flowType" , flow .getType ().toString ());
145- jpb .addString ("entity" , flow .getEntityName ());
146- jpb .addString ("flow" , flow .getName ());
147- return jpb .toJobParameters ();
128+ public JobTicket runFlow (Flow flow , int batchSize , int threadCount , JobStatusListener statusListener ) {
129+ return runFlow (flow , batchSize , threadCount , HubDatabase .STAGING , HubDatabase .FINAL , null , statusListener );
148130 }
149131
150132 /**
@@ -155,20 +137,83 @@ private JobParameters buildJobParameters(Flow flow, int batchSize, int threadCou
155137 * @param statusListener - the callback to receive job status updates
156138 * @return a JobExecution instance
157139 */
158- public JobExecution runFlow (Flow flow , int batchSize , int threadCount , JobStatusListener statusListener ) {
159- JobExecution result = null ;
160- try {
161- ConfigurableApplicationContext ctx = buildApplicationContext (flow , statusListener );
162-
163- JobParameters params = buildJobParameters (flow , batchSize , threadCount );
164- JobLauncher launcher = ctx .getBean (JobLauncher .class );
165- Job job = ctx .getBean (Job .class );
166- result = launcher .run (job , params );
167- } catch (Exception e ) {
168- e .printStackTrace ();
140+ public JobTicket runFlow (Flow flow , int batchSize , int threadCount , HubDatabase srcDb , HubDatabase destDb , Map <String , Object > options , JobStatusListener statusListener ) {
141+
142+ Collector c = flow .getCollector ();
143+ if (c instanceof ServerCollector ) {
144+ ((ServerCollector )c ).setClient (stagingClient );
145+ }
146+
147+ AtomicLong successfulEvents = new AtomicLong (0 );
148+ AtomicLong failedEvents = new AtomicLong (0 );
149+ AtomicLong successfulBatches = new AtomicLong (0 );
150+ AtomicLong failedBatches = new AtomicLong (0 );
151+
152+ Vector <String > uris = c .run (options );
153+
154+ DatabaseClient srcClient ;
155+ if (srcDb .equals (HubDatabase .STAGING )) {
156+ srcClient = this .stagingClient ;
169157 }
158+ else {
159+ srcClient = this .finalClient ;
160+ }
161+ String targetDatabase ;
162+ if (destDb .equals (HubDatabase .STAGING )) {
163+ targetDatabase = hubConfig .stagingDbName ;
164+ }
165+ else {
166+ targetDatabase = hubConfig .finalDbName ;
167+ }
168+
169+ FlowRunner flowRunner = new FlowRunner (srcClient , targetDatabase , flow );
170+ AtomicInteger count = new AtomicInteger (0 );
171+ ArrayList <String > errorMessages = new ArrayList <>();
172+ QueryBatcher queryBatcher = dataMovementManager .newQueryBatcher (uris .iterator ())
173+ .withBatchSize (batchSize )
174+ .withThreadCount (threadCount )
175+ .onUrisReady (batch -> {
176+ try {
177+ RunFlowResponse response = flowRunner .run (batch .getJobTicket ().getJobId (), batch .getItems (), options );
178+ failedEvents .addAndGet (response .errorCount );
179+ successfulEvents .addAndGet (response .totalCount - response .errorCount );
180+ successfulBatches .addAndGet (1 );
181+ count .addAndGet (1 );
182+ }
183+ catch (Exception e ) {
184+ errorMessages .add (e .toString ());
185+ }
186+ })
187+ .onQueryFailure (failure -> {
188+ failedBatches .addAndGet (1 );
189+ failedEvents .addAndGet (batchSize );
190+ });
191+ JobTicket jobTicket = dataMovementManager .startJob (queryBatcher );
192+ jobManager .saveJob (Job .withFlow (flow )
193+ .withJobId (jobTicket .getJobId ())
194+ );
195+
196+ new Thread (() -> {
197+ queryBatcher .awaitCompletion ();
198+
199+ if (statusListener != null ) {
200+ statusListener .onJobFinished ();
201+ }
202+ dataMovementManager .stopJob (queryBatcher );
203+
204+ // store the thing in MarkLogic
205+ Job job = Job .withFlow (flow )
206+ .withJobId (jobTicket .getJobId ())
207+ .setCounts (successfulEvents .get (), failedEvents .get (), successfulBatches .get (), failedBatches .get ())
208+ .withEndTime (new Date ());
209+
210+ if (errorMessages .size () > 0 ) {
211+ job .withJobOutput (String .join ("\n " , errorMessages ));
212+ }
213+ jobManager .saveJob (job );
214+ }).start ();
170215
171- return result ;
216+ return jobTicket ;
172217 }
173218
174219 /**
@@ -185,7 +230,7 @@ public static Flow flowFromXml(Element doc) {
185230 complexity = elements .item (0 ).getTextContent ();
186231 }
187232
188- if (complexity .equals (FlowComplexity .SIMPLE .toString ())) {
233+ if (complexity != null && complexity .equals (FlowComplexity .SIMPLE .toString ())) {
189234 f = new SimpleFlow (doc );
190235 }
191236
0 commit comments