@@ -111,37 +111,38 @@ private RecordWithMeta buildRecordResponse(
111111 lastMeta .put (SPLIT_ID , BinlogSplit .BINLOG_SPLIT_ID );
112112 recordResponse .setMeta (lastMeta );
113113 }
114- if (count >= fetchRecord .getFetchSize ()) {
115- return recordResponse ;
116- }
117114 }
118115 }
119- } finally {
120- sourceReader .finishSplitRecords ();
121- }
122116
123- if (readResult .getSplitState () != null ) {
124- // Set meta information for hw
125- if (sourceReader .isSnapshotSplit (split )) {
126- Map <String , String > offsetRes =
127- sourceReader .extractSnapshotStateOffset (readResult .getSplitState ());
128- offsetRes .put (SPLIT_ID , split .splitId ());
129- recordResponse .setMeta (offsetRes );
117+ if (readResult .getSplitState () != null ) {
118+ // Set meta information for hw
119+ if (sourceReader .isSnapshotSplit (split )) {
120+ Map <String , String > offsetRes =
121+ sourceReader .extractSnapshotStateOffset (readResult .getSplitState ());
122+ offsetRes .put (SPLIT_ID , split .splitId ());
123+ recordResponse .setMeta (offsetRes );
124+ }
125+
126+ // set meta for binlog event
127+ if (sourceReader .isBinlogSplit (split )) {
128+ Map <String , String > offsetRes =
129+ sourceReader .extractBinlogStateOffset (readResult .getSplitState ());
130+ offsetRes .put (SPLIT_ID , BinlogSplit .BINLOG_SPLIT_ID );
131+ recordResponse .setMeta (offsetRes );
132+ }
133+ } else {
134+ throw new RuntimeException ("split state is null" );
130135 }
131136
132- // set meta for binlog event
133- if (sourceReader .isBinlogSplit (split )) {
134- Map <String , String > offsetRes =
135- sourceReader .extractBinlogStateOffset (readResult .getSplitState ());
136- offsetRes .put (SPLIT_ID , BinlogSplit .BINLOG_SPLIT_ID );
137- recordResponse .setMeta (offsetRes );
137+ sourceReader .commitSourceOffset (fetchRecord .getJobId (), readResult .getSplit ());
138+ return recordResponse ;
139+ } finally {
140+ // This must be called after commitSourceOffset; otherwise, PG's confirmed lsn will not
141+ // proceed.
142+ if (sourceReader != null ) {
143+ sourceReader .finishSplitRecords ();
138144 }
139- } else {
140- throw new RuntimeException ("split state is null" );
141145 }
142-
143- sourceReader .commitSourceOffset (fetchRecord .getJobId (), readResult .getSplit ());
144- return recordResponse ;
145146 }
146147
147148 public CompletableFuture <Void > writeRecordsAsync (WriteRecordRequest writeRecordRequest ) {
@@ -221,41 +222,47 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
221222 break ;
222223 }
223224 }
224- } finally {
225- sourceReader .finishSplitRecords ();
226- }
227225
228- try {
229- if (readResult .getSplitState () != null ) {
230- // Set meta information for hw
231- if (sourceReader .isSnapshotSplit (readResult .getSplit ())) {
232- Map <String , String > offsetRes =
233- sourceReader .extractSnapshotStateOffset (readResult .getSplitState ());
234- offsetRes .put (SPLIT_ID , readResult .getSplit ().splitId ());
235- metaResponse = offsetRes ;
236- }
226+ // get offset from split state
227+ try {
228+ if (readResult .getSplitState () != null ) {
229+ // Set meta information for hw
230+ if (sourceReader .isSnapshotSplit (readResult .getSplit ())) {
231+ Map <String , String > offsetRes =
232+ sourceReader .extractSnapshotStateOffset (readResult .getSplitState ());
233+ offsetRes .put (SPLIT_ID , readResult .getSplit ().splitId ());
234+ metaResponse = offsetRes ;
235+ }
237236
238- // set meta for binlog event
239- if (sourceReader .isBinlogSplit (readResult .getSplit ())) {
240- Map <String , String > offsetRes =
241- sourceReader .extractBinlogStateOffset (readResult .getSplitState ());
242- offsetRes .put (SPLIT_ID , BinlogSplit .BINLOG_SPLIT_ID );
243- metaResponse = offsetRes ;
237+ // set meta for binlog event
238+ if (sourceReader .isBinlogSplit (readResult .getSplit ())) {
239+ Map <String , String > offsetRes =
240+ sourceReader .extractBinlogStateOffset (readResult .getSplitState ());
241+ offsetRes .put (SPLIT_ID , BinlogSplit .BINLOG_SPLIT_ID );
242+ metaResponse = offsetRes ;
243+ }
244+ } else {
245+ throw new RuntimeException ("split state is null" );
244246 }
245- } else {
246- throw new RuntimeException ("split state is null" );
247- }
248247
249- // wait all stream load finish
250- batchStreamLoad .forceFlush ();
248+ // wait all stream load finish
249+ batchStreamLoad .forceFlush ();
251250
252- // request fe api
253- batchStreamLoad .commitOffset (metaResponse , scannedRows , scannedBytes );
251+ // request fe api
252+ batchStreamLoad .commitOffset (metaResponse , scannedRows , scannedBytes );
254253
255- // commit source offset if need
256- sourceReader .commitSourceOffset (writeRecordRequest .getJobId (), readResult .getSplit ());
254+ // commit source offset if need
255+ sourceReader .commitSourceOffset (
256+ writeRecordRequest .getJobId (), readResult .getSplit ());
257+ } finally {
258+ batchStreamLoad .resetTaskId ();
259+ }
257260 } finally {
258- batchStreamLoad .resetTaskId ();
261+ // This must be called after commitSourceOffset; otherwise, PG's confirmed lsn will not
262+ // proceed.
263+ if (sourceReader != null ) {
264+ sourceReader .finishSplitRecords ();
265+ }
259266 }
260267 }
261268
0 commit comments