File tree Expand file tree Collapse file tree 1 file changed +1
-3
lines changed
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka Expand file tree Collapse file tree 1 file changed +1
-3
lines changed Original file line number Diff line number Diff line change @@ -172,8 +172,6 @@ public boolean advance() throws IOException {
172172 elementsReadBySplit .inc ();
173173
174174 ConsumerRecord <byte [], byte []> rawRecord = pState .recordIter .next ();
175- long expected = pState .nextOffset ;
176- long offset = rawRecord .offset ();
177175
178176 // Apply user deserializers. User deserializers might throw, which will be propagated up
179177 // and 'curRecord' remains unchanged. The runner should close this reader.
@@ -200,7 +198,7 @@ public boolean advance() throws IOException {
200198 int recordSize =
201199 (rawRecord .key () == null ? 0 : rawRecord .key ().length )
202200 + (rawRecord .value () == null ? 0 : rawRecord .value ().length );
203- pState .recordConsumed (offset , recordSize );
201+ pState .recordConsumed (rawRecord . offset () , recordSize );
204202 bytesRead .inc (recordSize );
205203 bytesReadBySplit .inc (recordSize );
206204
You can’t perform that action at this time.
0 commit comments