File tree Expand file tree Collapse file tree 1 file changed +4
-4
lines changed
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka Expand file tree Collapse file tree 1 file changed +4
-4
lines changed Original file line number Diff line number Diff line change @@ -151,8 +151,8 @@ public boolean start() throws IOException {
151151 @ Override
152152 public boolean advance () throws IOException {
153153 /* Read first record (if any). we need to loop here because :
154- * - (b ) if curBatch is empty, we want to fetch next batch and then advance.
155- * - (c ) curBatch is an iterator of iterators. we interleave the records from each.
154+ * - (a ) if curBatch is empty, we want to fetch next batch and then advance.
155+ * - (b ) curBatch is an iterator of iterators. we interleave the records from each.
156156 * curBatch.next() might return an empty iterator.
157157 */
158158 while (true ) {
@@ -162,7 +162,7 @@ public boolean advance() throws IOException {
162162
163163 PartitionState <K , V > pState = curBatch .next ();
164164
165- if (!pState .recordIter .hasNext ()) { // -- (c )
165+ if (!pState .recordIter .hasNext ()) { // -- (b )
166166 pState .recordIter = Collections .emptyIterator (); // drop ref
167167 curBatch .remove ();
168168 continue ;
@@ -217,7 +217,7 @@ public boolean advance() throws IOException {
217217
218218 kafkaResults .flushBufferedMetrics ();
219219 return true ;
220- } else { // -- (b )
220+ } else { // -- (a )
221221 kafkaResults = KafkaSinkMetrics .kafkaMetrics ();
222222 nextBatch ();
223223 kafkaResults .flushBufferedMetrics ();
You can’t perform that action at this time.
0 commit comments