import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.UUID;
import javax.sql.DataSource;

+/** BulkLoadFromJdbcRaw shows one way to load rows as-is from JDBC (the source
+ * system) into MarkLogic (the target system), then transform (or harmonize)
+ * after everything has been ingested. We pull each row from three tables:
+ * employees, salaries, and titles. We convert employee rows to the Employee
+ * POJO and the salaries and titles rows to Jackson ObjectNode (JSON). Then we
+ * write each row to MarkLogic Server (the target) as flat JSON. Finally, we
+ * run a transform on each employee record to pull in the salaries and titles
+ * for that employee. We call this "denormalization", and it is a common way
+ * to translate relational data into documents.
+ *
+ * This example assumes there are no employee, salary, or title records in the
+ * target. If this is run multiple times to capture changes, a step should be
+ * added to first delete all employees, salaries, and titles in the target
+ * system (see the deleteStaleDocuments sketch below). Otherwise old data
+ * (data updated or deleted from the source system) might get mixed with new
+ * data.
+ */
public class BulkLoadFromJdbcRaw {
-  private static Logger logger = LoggerFactory.getLogger(BulkLoadFromJdbcRaw.class);
-
+  // during testing we run with a small data set and few threads, but production
+  // systems would use many more threads and much larger batch sizes
  private static int threadCount = 3;
  private static int batchSize = 3;

+  // the DataMovementManager helps orchestrate optimized writes across the
+  // MarkLogic cluster
  public static final DataMovementManager moveMgr =
    DatabaseClientSingleton.get().newDataMovementManager();
+
+  // the ObjectMapper serializes objects to JSON for writing
  public static final ObjectMapper mapper = new ObjectMapper();
+
+  // the ISO 8601 format is expected for dates in MarkLogic Server
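+  // (this pattern produces values like 2017-05-10T09:30:00.000-07:00)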
  public static final String ISO_8601_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
  public static final SimpleDateFormat dateFormat = new SimpleDateFormat(ISO_8601_FORMAT);
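+  // note: SimpleDateFormat is not thread-safe; in this example it is only ever
+  // used from the single thread running the JDBC queries, so sharing one
+  // instance is safe here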

@@ -66,11 +85,15 @@ public static void main(String[] args) throws IOException, SQLException {
  }

  public void run() throws IOException, SQLException {
+    // first use a REST admin user to install our transform
    installTransform(DatabaseClientSingleton.getRestAdmin());
+    // then load all the data from the source system to the target system
    load();
+    // then pull the salaries and titles into the employees
    transform();
  }
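+
+  // A minimal sketch of the cleanup step mentioned in the class comment above;
+  // it is not called by run() in this example. The method name is hypothetical,
+  // and it assumes com.marklogic.client.datamovement.DeleteListener is
+  // imported. It matches every document under the three directories and
+  // deletes each batch of URIs so a re-run starts from a clean target.
+  public void deleteStaleDocuments() {
+    StructuredQueryDefinition allDocs = new StructuredQueryBuilder()
+      .directory(1, "/employees/", "/salaries/", "/titles/");
+    QueryBatcher deleteBatcher = moveMgr.newQueryBatcher(allDocs)
+      .withThreadCount(threadCount)
+      .withBatchSize(batchSize)
+      .withConsistentSnapshot()
+      .onUrisReady(new DeleteListener())
+      .onQueryFailure(throwable -> throwable.printStackTrace());
+    moveMgr.startJob(deleteBatcher);
+    deleteBatcher.awaitCompletion();
+    moveMgr.stopJob(deleteBatcher);
+  }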

+  // take data from a JDBC ResultSet (row) and populate an Employee POJO
  public void populateEmployee(ResultSet row, Employee e) throws SQLException {
    e.setEmployeeId(row.getInt("emp_no"));
    e.setBirthDate(Calendar.getInstance());
@@ -86,6 +109,7 @@ public void populateEmployee(ResultSet row, Employee e) throws SQLException {
    e.getHireDate().setTime(row.getDate("hire_date"));
  }

+  // take data from a JDBC ResultSet (row) and populate an ObjectNode (JSON) object
  public void populateSalary(ResultSet row, ObjectNode s) throws SQLException {
    s.put("employeeId", row.getInt("emp_no"));
    s.put("salary", row.getInt("salary"));
@@ -97,6 +121,7 @@ public void populateSalary(ResultSet row, ObjectNode s) throws SQLException {
    s.put("toDate", dateFormat.format(toDate.getTime()));
  }

+  // take data from a JDBC ResultSet (row) and populate an ObjectNode (JSON) object
  public void populateTitle(ResultSet row, ObjectNode t) throws SQLException {
    t.put("employeeId", row.getInt("emp_no"));
    t.put("title", row.getString("title"));
@@ -109,36 +134,57 @@ public void populateTitle(ResultSet row, ObjectNode t) throws SQLException {
  }

  public void load() throws IOException, SQLException {
+    // the JdbcTemplate is an easy way to run JDBC queries
    JdbcTemplate jdbcTemplate = new JdbcTemplate(getDataSource());
+
+    // the WriteBatcher is used to queue writes, batch them, and distribute
+    // them to all appropriate nodes in the MarkLogic cluster
    WriteBatcher wb = moveMgr.newWriteBatcher()
      .withBatchSize(batchSize)
      .withThreadCount(threadCount)
      .onBatchSuccess(batch -> System.out.println("Written: " + batch.getJobWritesSoFar()))
      .onBatchFailure((batch, exception) -> exception.printStackTrace());
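+    // starting the job returns a JobTicket we use later to fetch the JobReport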
    JobTicket ticket = moveMgr.startJob(wb);
+
+    // run a JDBC query and, for each row returned, populate an Employee object,
+    // then add it to the WriteBatcher with a URI in the /employees/ directory
    jdbcTemplate.query("SELECT * FROM employees",
      (RowCallbackHandler) row -> {
        Employee employee = new Employee();
        populateEmployee(row, employee);
        wb.addAs("/employees/" + employee.getEmployeeId() + ".json", employee);
      }
    );
+
+    // run a JDBC query and, for each salary returned, populate an ObjectNode
+    // (JSON) object, then add it to the WriteBatcher with a URI in the
+    // /salaries/ directory
    jdbcTemplate.query("SELECT * FROM salaries",
      (RowCallbackHandler) row -> {
        ObjectNode salary = mapper.createObjectNode();
        populateSalary(row, salary);
        wb.addAs("/salaries/" + UUID.randomUUID().toString() + ".json", salary);
      }
    );
+
+    // run a JDBC query and, for each title returned, populate an ObjectNode
+    // (JSON) object, then add it to the WriteBatcher with a URI in the
+    // /titles/ directory
    jdbcTemplate.query("SELECT * FROM titles",
      (RowCallbackHandler) row -> {
        ObjectNode title = mapper.createObjectNode();
        populateTitle(row, title);
        wb.addAs("/titles/" + UUID.randomUUID().toString() + ".json", title);
      }
    );
+
+    // finish all writes before proceeding
    wb.flushAndWait();
+
+    // free any resources used by the WriteBatcher
    moveMgr.stopJob(wb);
+
+    // double-check that the WriteBatcher job had no failures
    JobReport report = moveMgr.getJobReport(ticket);
    if ( report.getFailureBatchesCount() > 0 ) {
      throw new IllegalStateException("Encountered " +
@@ -147,21 +193,32 @@ public void load() throws IOException, SQLException {
  }

  public void transform() throws IOException, SQLException {
+    // search for all records in the /employees/ directory
    StructuredQueryDefinition query = new StructuredQueryBuilder().directory(1, "/employees/");
+
+    // the QueryBatcher efficiently paginates through matching batches from all
+    // appropriate nodes in the cluster, then applies the transform on each batch
    QueryBatcher qb = moveMgr.newQueryBatcher(query)
      .withThreadCount(threadCount)
      .withBatchSize(batchSize)
      .onQueryFailure(throwable -> throwable.printStackTrace());

+    // the ApplyTransformListener performs the transform on each batch and
+    // overwrites the employee document with the results of the transform
    ApplyTransformListener transformListener = new ApplyTransformListener()
      .withTransform(new ServerTransform("BulkLoadFromJdbcRaw"))
      .withApplyResult(ApplyTransformListener.ApplyResult.REPLACE)
      .onBatchFailure((batch, throwable) -> throwable.printStackTrace());
+
+    // add the ApplyTransformListener to the QueryBatcher
    qb.onUrisReady(transformListener);

+    // start the job (across threadCount threads) and wait for it to finish
    JobTicket ticket = moveMgr.startJob(qb);
    qb.awaitCompletion();
    moveMgr.stopJob(ticket);
+
+    // double-check that the QueryBatcher job had no failures
    JobReport report = moveMgr.getJobReport(ticket);
    if ( report.getFailureBatchesCount() > 0 ) {
      throw new IllegalStateException("Encountered " +
@@ -175,6 +232,8 @@ private DataSource getDataSource() throws IOException {
  }

  public void installTransform(DatabaseClient client) {
+    // this transform finds the salaries and titles associated with a single
+    // employee record and injects them into the employee record
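+    // a hypothetical shape for a merged employee document after the transform
+    // runs (the values, and any fields beyond those set in this example, are
+    // illustrative only):
+    // {
+    //   "employeeId": 10001,
+    //   "birthDate": "...", "hireDate": "...",
+    //   "salaries": [ { "salary": 60117, "fromDate": "...", "toDate": "..." } ],
+    //   "titles":   [ { "title": "Senior Engineer", "fromDate": "...", "toDate": "..." } ]
+    // }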
    String script =
      "function transform_function(context, params, content) { " +
      "  var uri = context.uri; " +
@@ -188,10 +247,19 @@ public void installTransform(DatabaseClient client) {
      "    for (let salary of salaries) { " +
      "      var employeeSalary = salary.toObject(); " +
      "      delete employeeSalary.employeeId; " +
-      "      employee.salaries = employeeSalary; " +
+      "      employee.salaries.push(employeeSalary); " +
      "    } " +
-      "  for ( var i=1; i <= fn.count(salaries); i++ ) { " +
-      "    xdmp.documentDelete(fn.baseUri(fn.subsequence(salaries, i, 1))) " +
+      "  } " +
+      "  var titles = cts.search(cts.andQuery([" +
+      "    cts.directoryQuery('/titles/'), " +
+      "    cts.jsonPropertyValueQuery('employeeId', employee.employeeId)" +
+      "  ])); " +
+      "  if ( fn.count(titles) > 0 ) { " +
+      "    employee.titles = new Array(); " +
+      "    for (let title of titles) { " +
+      "      var employeeTitle = title.toObject(); " +
+      "      delete employeeTitle.employeeId; " +
+      "      employee.titles.push(employeeTitle); " +
      "    } " +
      "  } " +
      "  return employee; " +