Skip to content

Commit 1ded356

Browse files
committed
Merge pull request #65 from paxtonhare/62_spring_batch
Fixed #62 - added spring batch to run jobs
2 parents 5e11447 + 1506f3f commit 1ded356

File tree

7 files changed

+227
-63
lines changed

7 files changed

+227
-63
lines changed

data-hub/build.gradle

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ plugins {
99

1010
// Artifact repositories. All remote repositories are accessed over HTTPS so
// that downloaded build dependencies cannot be tampered with in transit
// (recent Gradle versions refuse plain-HTTP repositories by default).
repositories {
    jcenter()
    maven { url 'https://repo.spring.io/milestone' }
    maven { url 'https://developer.marklogic.com/maven2/' }
}
1415

@@ -20,9 +21,10 @@ targetCompatibility = 1.8
2021

2122

2223
dependencies {
24+
compile 'org.springframework.batch:spring-batch-core:3.0.6.RELEASE'
2325
compile 'com.marklogic:marklogic-xcc:8.0.4.2'
2426
compile 'com.marklogic:java-client-api:3.0.4'
25-
compile 'com.marklogic:ml-javaclient-util:2.6'
27+
compile 'com.marklogic:ml-javaclient-util:2.6.1'
2628
compile 'com.marklogic:ml-app-deployer:2.0'
2729
compile 'commons-io:commons-io:2.4'
2830
compile 'com.google.code.gson:gson:2.6.1'
@@ -31,6 +33,7 @@ dependencies {
3133
exclude group: 'com.google.guava', module: 'guava'
3234
}
3335
compile 'com.google.guava:guava:11.0.2'
36+
testCompile 'org.springframework.batch:spring-batch-test:3.0.6.RELEASE'
3437
testCompile 'junit:junit:4.12'
3538
testCompile 'xmlunit:xmlunit:1.3'
3639
testCompile 'org.hamcrest:hamcrest-junit:2.0.0.0'

data-hub/src/main/java/com/marklogic/hub/FlowManager.java

Lines changed: 79 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,21 @@
2020

2121
import javax.xml.stream.XMLStreamException;
2222

23+
import org.springframework.batch.core.Job;
24+
import org.springframework.batch.core.JobExecutionListener;
25+
import org.springframework.batch.core.JobParameters;
26+
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
27+
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
28+
import org.springframework.batch.core.job.builder.SimpleJobBuilder;
29+
import org.springframework.batch.core.launch.JobLauncher;
30+
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
31+
import org.springframework.batch.core.repository.JobRepository;
32+
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
33+
import org.springframework.batch.core.step.tasklet.TaskletStep;
34+
import org.springframework.batch.item.ItemReader;
35+
import org.springframework.batch.item.ItemWriter;
36+
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
37+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
2338
import org.w3c.dom.Document;
2439
import org.w3c.dom.Element;
2540
import org.w3c.dom.Node;
@@ -38,20 +53,58 @@
3853
import com.marklogic.hub.collector.ServerCollector;
3954
import com.marklogic.hub.flow.Flow;
4055
import com.marklogic.hub.flow.SimpleFlow;
41-
import com.marklogic.hub.plugin.Plugin;
42-
import com.marklogic.hub.plugin.ServerPlugin;
43-
import com.marklogic.hub.writer.ServerWriter;
44-
import com.marklogic.hub.writer.Writer;
56+
import com.marklogic.spring.batch.hub.CollectorReader;
57+
import com.marklogic.spring.batch.hub.FlowWriter;
4558

4659
public class FlowManager extends ResourceManager {
4760
private static final String HUB_NS = "http://marklogic.com/hub-in-a-box";
48-
static final public String NAME = "flow";
61+
private static final String NAME = "flow";
62+
private static final int DEFAULT_THREAD_COUNT = 8;
63+
4964
private DatabaseClient client;
5065

66+
private JobBuilderFactory jobBuilderFactory;
67+
private StepBuilderFactory stepBuilderFactory;
68+
private JobLauncher jobLauncher;
69+
private ThreadPoolTaskExecutor executor;
70+
private int threadCount = DEFAULT_THREAD_COUNT;
71+
5172
public FlowManager(DatabaseClient client) {
5273
super();
5374
this.client = client;
5475
this.client.init(NAME, this);
76+
initializeDefaultSpringBatchComponents();
77+
}
78+
79+
public void setThreadCount(int count) {
80+
threadCount = count;
81+
executor.setCorePoolSize(threadCount);
82+
executor.setMaxPoolSize(threadCount);
83+
}
84+
85+
protected void initializeDefaultSpringBatchComponents() {
86+
ResourcelessTransactionManager transactionManager = new ResourcelessTransactionManager();
87+
MapJobRepositoryFactoryBean f = new MapJobRepositoryFactoryBean(transactionManager);
88+
try {
89+
f.afterPropertiesSet();
90+
JobRepository jobRepository = f.getObject();
91+
this.jobBuilderFactory = new JobBuilderFactory(jobRepository);
92+
this.stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager);
93+
SimpleJobLauncher jbl = new SimpleJobLauncher();
94+
jbl.setJobRepository(jobRepository);
95+
96+
this.executor = new ThreadPoolTaskExecutor();
97+
executor.setCorePoolSize(threadCount);
98+
executor.setMaxPoolSize(threadCount);
99+
executor.initialize();
100+
executor.setWaitForTasksToCompleteOnShutdown(true);
101+
jbl.setTaskExecutor(executor);
102+
103+
jbl.afterPropertiesSet();
104+
this.jobLauncher = jbl;
105+
} catch (Exception ex) {
106+
throw new RuntimeException("Unable to initialize SqlMigrator, cause: " + ex.getMessage(), ex);
107+
}
55108
}
56109

57110
/**
@@ -121,74 +174,43 @@ public void uninstallFlow(String flowName) {
121174

122175
}
123176

177+
public void runFlow(Flow flow, int batchSize) {
178+
runFlow(flow, batchSize, null);
179+
}
180+
124181
// might want to add Job tracking support
125182
// by returning a Job or some such.
126183
// Depends a lot on if we go full in with spring batch or not
127184
/**
128185
* Runs a given flow
129186
* @param flow - the flow to run
130187
* @param batchSize - the size to use for batching transactions
188+
* @param listener - the JobExecutionListener to receive status updates about the job
131189
*/
132-
public void runFlow(Flow flow, int batchSize) {
190+
public void runFlow(Flow flow, int batchSize, JobExecutionListener listener) {
133191
Collector c = flow.getCollector();
134192
if (c instanceof ServerCollector) {
135193
((ServerCollector)c).setClient(client);
136194
}
195+
ItemReader<String> reader = new CollectorReader(c);
196+
ItemWriter<String> writer = new FlowWriter(client, flow);
137197

138-
boolean allServer = true;
139-
for (Plugin t : flow.getPlugins()) {
140-
if (t instanceof ServerPlugin) {
141-
((ServerPlugin)t).setClient(client);
142-
}
143-
else if (t != null) {
144-
allServer = false;
145-
}
146-
}
198+
TaskletStep step = stepBuilderFactory.get("testStep")
199+
.<String, String> chunk(batchSize)
200+
.reader(reader).writer(writer).build();
147201

148-
Writer w = flow.getWriter();
149-
if (w instanceof ServerWriter) {
150-
((ServerWriter)w).setClient(client);
202+
String jobName = flow.getDomainName() + ":" + flow.getType() + ":" + flow.getName() + "-" + System.currentTimeMillis();
203+
SimpleJobBuilder builder = jobBuilderFactory.get(jobName).start(step);
204+
if (listener != null) {
205+
builder = builder.listener(listener);
151206
}
207+
Job job = builder.build();
152208

153-
FlowRunner runner = new FlowRunner(client);
154-
List<String> ids = c.run();
155-
156-
Transaction transaction = null;
157209
try {
158-
for (int i = 0; i < ids.size(); i++) {
159-
if (i % batchSize == 0) {
160-
if (transaction != null) {
161-
transaction.commit();
162-
}
163-
transaction = client.openTransaction();
164-
}
165-
String id = ids.get(i);
166-
167-
// all of the Plugins need to be run in MarkLogic
168-
if (allServer) {
169-
runner.run(flow, id, transaction);
170-
}
171-
172-
// this path is for when java is in the mix
173-
// TODO: make this work
174-
else {
175-
// for (Plugin t : flow.getPlugins()) {
176-
// if (t != null) {
177-
// t.run(id);
178-
// }
179-
// }
180-
w.write(id);
181-
}
182-
183-
}
184-
if (transaction != null) {
185-
transaction.commit();
186-
}
210+
jobLauncher.run(job, new JobParameters());
187211
}
188-
catch(Exception e) {
189-
if (transaction != null) {
190-
transaction.rollback();
191-
}
212+
catch (Exception e) {
213+
throw new RuntimeException(e);
192214
}
193215
}
194216

@@ -222,9 +244,9 @@ public static Flow flowFromXml(Element doc) {
222244
* A class to run a flow
223245
*/
224246
class FlowRunner extends ResourceManager {
225-
static final public String NAME = "flow";
247+
private static final String NAME = "flow";
226248

227-
public FlowRunner(DatabaseClient client) {
249+
// Creates a FlowRunner and initializes it against the given client under NAME.
// NOTE(review): flow, identifier and transaction are accepted but not used
// anywhere in this constructor body — confirm whether they should be stored
// in fields or whether the extra parameters can be dropped.
public FlowRunner(DatabaseClient client, Flow flow, String identifier, Transaction transaction) {
228250
super();
229251
client.init(NAME, this);
230252
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package com.marklogic.spring.batch.hub;
2+
3+
import java.util.List;
4+
5+
import org.springframework.batch.item.ExecutionContext;
6+
import org.springframework.batch.item.ItemStreamException;
7+
import org.springframework.batch.item.ItemStreamReader;
8+
import org.springframework.batch.item.NonTransientResourceException;
9+
import org.springframework.batch.item.ParseException;
10+
import org.springframework.batch.item.UnexpectedInputException;
11+
12+
import com.marklogic.client.helper.LoggingObject;
13+
import com.marklogic.hub.collector.Collector;
14+
15+
public class CollectorReader extends LoggingObject implements ItemStreamReader<String> {
16+
17+
private Collector collector;
18+
19+
private List<String> results;
20+
21+
private int index = 0;
22+
23+
public CollectorReader(Collector collector) {
24+
this.collector = collector;
25+
}
26+
27+
@Override
28+
public void open(ExecutionContext executionContext) throws ItemStreamException {
29+
this.results = collector.run();
30+
}
31+
32+
@Override
33+
public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
34+
String result = null;
35+
36+
if (results.size() > this.index) {
37+
result = this.results.get(this.index);
38+
index++;
39+
}
40+
41+
return result;
42+
}
43+
44+
@Override
45+
public void update(ExecutionContext executionContext) throws ItemStreamException {
46+
}
47+
48+
@Override
49+
public void close() throws ItemStreamException {
50+
}
51+
52+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.marklogic.spring.batch.hub;
2+
3+
import java.util.List;
4+
5+
import org.springframework.batch.item.ItemWriter;
6+
7+
import com.marklogic.client.DatabaseClient;
8+
import com.marklogic.client.Transaction;
9+
import com.marklogic.client.extensions.ResourceManager;
10+
import com.marklogic.client.io.Format;
11+
import com.marklogic.client.io.StringHandle;
12+
import com.marklogic.client.util.RequestParameters;
13+
import com.marklogic.hub.flow.Flow;
14+
15+
public class FlowWriter extends ResourceManager implements ItemWriter<String> {
16+
17+
static final public String NAME = "flow";
18+
19+
private DatabaseClient client;
20+
private Flow flow;
21+
22+
public FlowWriter(DatabaseClient client, Flow flow) {
23+
super();
24+
this.flow = flow;
25+
this.client = client;
26+
this.client.init(NAME, this);
27+
}
28+
29+
@Override
30+
public void write(List<? extends String> items) throws Exception {
31+
32+
Transaction transaction = null;
33+
try {
34+
transaction = client.openTransaction();
35+
for (String id: items) {
36+
RequestParameters params = new RequestParameters();
37+
params.add("identifier", id);
38+
39+
StringHandle handle = new StringHandle(flow.serialize());
40+
handle.setFormat(Format.XML);
41+
this.getServices().post(params, handle, transaction);
42+
}
43+
transaction.commit();
44+
}
45+
catch(Exception e) {
46+
if (transaction != null) {
47+
transaction.rollback();
48+
}
49+
}
50+
}
51+
}

data-hub/src/test/java/com/marklogic/hub/FlowManagerTest.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@
4747
import com.marklogic.hub.plugin.ContentPlugin;
4848
import com.marklogic.hub.plugin.HeadersPlugin;
4949
import com.marklogic.hub.plugin.PluginType;
50-
import com.marklogic.hub.plugin.TriplesPlugin;
5150
import com.marklogic.hub.plugin.ServerPlugin;
51+
import com.marklogic.hub.plugin.TriplesPlugin;
5252
import com.marklogic.hub.writer.DefaultWriter;
5353

5454
public class FlowManagerTest extends HubTestBase {
@@ -214,9 +214,11 @@ public void getTestFlow() {
214214
public void testRunFlow() throws SAXException, IOException, ParserConfigurationException, XMLStreamException {
215215
installModule("/ext/domains/test/conformance/my-test-flow1/my-test-flow1.xml", "flow-manager-test/my-test-flow1/my-test-flow1.xml");
216216
assertEquals(2, getDocCount());
217+
JobFinishedListener listener = new JobFinishedListener();
217218
FlowManager fm = new FlowManager(client);
218219
SimpleFlow flow1 = (SimpleFlow)fm.getFlow("test", "my-test-flow1");
219-
fm.runFlow(flow1, 10);
220+
fm.runFlow(flow1, 10, listener);
221+
listener.waitForFinish();
220222
assertEquals(4, getDocCount());
221223
assertXMLEqual(getXmlFromResource("flow-manager-test/conformed/conformed1.xml"), docMgr.read("/conformed/incoming/employee1.xml").next().getContent(new DOMHandle()).get() );
222224
assertXMLEqual(getXmlFromResource("flow-manager-test/conformed/conformed2.xml"), docMgr.read("/conformed/incoming/employee2.xml").next().getContent(new DOMHandle()).get());
@@ -228,9 +230,11 @@ public void testRunFlowWithHeader() throws SAXException, IOException, ParserConf
228230
installModule("/ext/domains/test/conformance/my-test-flow-with-header/headers/headers.xqy", "flow-manager-test/my-test-flow-with-header/headers/headers.xqy");
229231

230232
assertEquals(2, getDocCount());
233+
JobFinishedListener listener = new JobFinishedListener();
231234
FlowManager fm = new FlowManager(client);
232235
SimpleFlow flow1 = (SimpleFlow)fm.getFlow("test", "my-test-flow-with-header");
233-
fm.runFlow(flow1, 10);
236+
fm.runFlow(flow1, 10, listener);
237+
listener.waitForFinish();
234238
assertEquals(4, getDocCount());
235239
assertXMLEqual(getXmlFromResource("flow-manager-test/conformed-with-header/conformed1.xml"), docMgr.read("/conformed/incoming/employee1.xml").next().getContent(new DOMHandle()).get() );
236240
assertXMLEqual(getXmlFromResource("flow-manager-test/conformed-with-header/conformed2.xml"), docMgr.read("/conformed/incoming/employee2.xml").next().getContent(new DOMHandle()).get());
@@ -246,10 +250,12 @@ public void testRunFlowWithAll() throws SAXException, IOException, ParserConfigu
246250
installModule("/ext/domains/test/conformance/my-test-flow-with-all/content/content.xqy", "flow-manager-test/my-test-flow-with-all/content/content.xqy");
247251
installModule("/ext/domains/test/conformance/my-test-flow-with-all/triples/triples.xqy", "flow-manager-test/my-test-flow-with-all/triples/triples.xqy");
248252

253+
JobFinishedListener listener = new JobFinishedListener();
249254
assertEquals(2, getDocCount());
250255
FlowManager fm = new FlowManager(client);
251256
SimpleFlow flow1 = (SimpleFlow)fm.getFlow("test", "my-test-flow-with-all");
252-
fm.runFlow(flow1, 10);
257+
fm.runFlow(flow1, 10, listener);
258+
listener.waitForFinish();
253259
assertEquals(4, getDocCount());
254260
assertXMLEqual(getXmlFromResource("flow-manager-test/conformed-with-all/conformed1.xml"), docMgr.read("/conformed/incoming/employee1.xml").next().getContent(new DOMHandle()).get() );
255261
assertXMLEqual(getXmlFromResource("flow-manager-test/conformed-with-all/conformed2.xml"), docMgr.read("/conformed/incoming/employee2.xml").next().getContent(new DOMHandle()).get());

0 commit comments

Comments
 (0)