Commit 1149c4e
[SPARK-25005][SS] Support non-consecutive offsets for Kafka
## What changes were proposed in this pull request?

When the user uses Kafka transactions to write data, the offsets in Kafka will be non-consecutive: the log will contain transaction (commit or abort) markers, which occupy offsets of their own. In addition, if the consumer's `isolation.level` is `read_committed`, `poll` will not return aborted messages either. Hence, we will see non-consecutive offsets in the data returned by `poll`. However, as `seekToEnd` may move the offset to one of these missing offsets, there are 4 possible corner cases we need to support:

- The whole batch contains no data messages
- The first offset in a batch is not a committed data message
- The last offset in a batch is not a committed data message
- There is a gap in the middle of a batch

They are all covered by the new unit tests.

## How was this patch tested?

The new unit tests.

Closes apache#22042 from zsxwing/kafka-transaction-read.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
1 parent 592e3a4 commit 1149c4e
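To make the gap behavior concrete, here is a small standalone sketch (not part of this commit) that reads a transactionally written topic with `isolation.level=read_committed` and prints the offsets `poll` returns. The broker address and topic name (`localhost:9092`, `txn-topic`) are illustrative assumptions:

```scala
import java.{util => ju}

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

object ReadCommittedGapsDemo {
  def main(args: Array[String]): Unit = {
    val props = new ju.Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "gap-demo")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    // Filter out aborted messages; commit/abort markers still occupy offsets.
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      val tp = new TopicPartition("txn-topic", 0) // assumed transactional topic
      consumer.assign(ju.Collections.singletonList(tp))
      consumer.seekToBeginning(ju.Collections.singletonList(tp))
      // With transactional input the printed offsets are typically non-consecutive,
      // e.g. 0, 1, 3, 4, 6: the gaps are transaction markers or aborted messages.
      for (r <- consumer.poll(10000L).records(tp).asScala) {
        println(s"offset=${r.offset} value=${new String(r.value)}")
      }
    } finally {
      consumer.close()
    }
  }
}
```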

File tree

6 files changed: +720 −74 lines changed


external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala

Lines changed: 1 addition & 1 deletion

@@ -227,7 +227,7 @@ class KafkaContinuousPartitionReader(
 
         // This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range,
         // or if it's the endpoint of the data range (i.e. the "true" next offset).
-        case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
+        case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
          val range = consumer.getAvailableOffsetRange()
          if (range.latest >= nextKafkaOffset && range.earliest <= nextKafkaOffset) {
            // retry

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala

Lines changed: 204 additions & 69 deletions
@@ -33,9 +33,19 @@ import org.apache.spark.util.UninterruptibleThread
 
 private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available. Otherwise it will either throw error
-   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
-   * or null.
+   * Get the record for the given offset if available.
+   *
+   * If the record is invisible (either a transaction message, or an aborted message when the
+   * consumer's `isolation.level` is `read_committed`), it will be skipped and this method will
+   * try to fetch the next available record within [offset, untilOffset).
+   *
+   * This method also will try its best to detect data loss. If `failOnDataLoss` is `true`, it
+   * will throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`,
+   * this method will try to fetch the next available record within [offset, untilOffset).
+   *
+   * When this method tries to skip offsets due to either invisible messages or data loss and
+   * reaches `untilOffset`, it will return `null`.
    *
    * @param offset the offset to fetch.
    * @param untilOffset the max offset to fetch. Exclusive.
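As a reading aid (not part of the diff), here is a sketch of how a caller could drive this contract. `SimpleKafkaDataConsumer` is a stand-in for the private trait above, and the timeout value is arbitrary:

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord

object GetContractSketch {
  // Stand-in for the private[kafka010] trait above.
  trait SimpleKafkaDataConsumer {
    def get(offset: Long, untilOffset: Long, pollTimeoutMs: Long,
        failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]]
  }

  def readBatch(consumer: SimpleKafkaDataConsumer, start: Long, end: Long): Unit = {
    var offset = start
    var exhausted = false
    while (!exhausted && offset < end) {
      val r = consumer.get(offset, end, pollTimeoutMs = 1000, failOnDataLoss = false)
      if (r == null) {
        exhausted = true // every remaining offset in [offset, end) was skipped
      } else {
        println(s"got offset ${r.offset}") // process the record
        offset = r.offset + 1 // follow the record's offset: offsets may be non-consecutive
      }
    }
  }
}
```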
@@ -80,6 +90,83 @@ private[kafka010] case class InternalKafkaConsumer(
     kafkaParams: ju.Map[String, Object]) extends Logging {
   import InternalKafkaConsumer._
 
+  /**
+   * The internal object to store the fetched data from the Kafka consumer and the next offset to
+   * poll.
+   *
+   * @param _records the pre-fetched Kafka records.
+   * @param _nextOffsetInFetchedData the next offset in `records`. We use this to check whether
+   *                                 the pre-fetched data is still valid.
+   * @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to
+   *                         poll when `records` is drained.
+   */
+  private case class FetchedData(
+      private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
+      private var _nextOffsetInFetchedData: Long,
+      private var _offsetAfterPoll: Long) {
+
+    def withNewPoll(
+        records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
+        offsetAfterPoll: Long): FetchedData = {
+      this._records = records
+      this._nextOffsetInFetchedData = UNKNOWN_OFFSET
+      this._offsetAfterPoll = offsetAfterPoll
+      this
+    }
+
+    /** Whether there are more elements */
+    def hasNext: Boolean = _records.hasNext
+
+    /** Move `records` forward and return the next record. */
+    def next(): ConsumerRecord[Array[Byte], Array[Byte]] = {
+      val record = _records.next()
+      _nextOffsetInFetchedData = record.offset + 1
+      record
+    }
+
+    /** Move `records` backward and return the previous record. */
+    def previous(): ConsumerRecord[Array[Byte], Array[Byte]] = {
+      assert(_records.hasPrevious, "fetchedData cannot move back")
+      val record = _records.previous()
+      _nextOffsetInFetchedData = record.offset
+      record
+    }
+
+    /** Reset the internal pre-fetched data. */
+    def reset(): Unit = {
+      _records = ju.Collections.emptyListIterator()
+    }
+
+    /**
+     * Returns the next offset in `records`. We use this to check whether the pre-fetched data is
+     * still valid.
+     */
+    def nextOffsetInFetchedData: Long = _nextOffsetInFetchedData
+
+    /**
+     * Returns the next offset to poll after draining the pre-fetched records.
+     */
+    def offsetAfterPoll: Long = _offsetAfterPoll
+  }
+
+  /**
+   * The internal object returned by the `fetchRecord` method. If `record` is empty, it means it is
+   * invisible (either a transaction message, or an aborted message when the consumer's
+   * `isolation.level` is `read_committed`), and the caller should use `nextOffsetToFetch` to fetch
+   * instead.
+   */
+  private case class FetchedRecord(
+      var record: ConsumerRecord[Array[Byte], Array[Byte]],
+      var nextOffsetToFetch: Long) {
+
+    def withRecord(
+        record: ConsumerRecord[Array[Byte], Array[Byte]],
+        nextOffsetToFetch: Long): FetchedRecord = {
+      this.record = record
+      this.nextOffsetToFetch = nextOffsetToFetch
+      this
+    }
+  }
+
   private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
   @volatile private var consumer = createConsumer
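The `next()`/`previous()` pair above relies on `ju.ListIterator` being a bidirectional cursor. A toy illustration (offsets and record type invented here), showing how stepping back lets the following call re-read a peeked record:

```scala
import java.{util => ju}

import scala.collection.JavaConverters._

object CursorSketch {
  final case class Rec(offset: Long, value: String)

  def main(args: Array[String]): Unit = {
    // Offsets 2 and 5 are "missing", as if they were transaction markers.
    val recs = List(Rec(0, "a"), Rec(1, "b"), Rec(3, "c"), Rec(4, "d"), Rec(6, "e"))
    val it: ju.ListIterator[Rec] = recs.asJava.listIterator()

    assert(it.next().offset == 0) // consume offset 0
    val peeked = it.next()        // look at the record that follows
    it.previous()                 // step back; the next call returns `peeked` again
    assert(it.next() == peeked)
    println(s"peeked $peeked twice via previous()/next()")
  }
}
```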
@@ -90,10 +177,21 @@ private[kafka010] case class InternalKafkaConsumer(
   /** indicate whether this consumer is going to be stopped in the next release */
   @volatile var markedForClose = false
 
-  /** Iterator to the already fetch data */
-  @volatile private var fetchedData =
-    ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  @volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET
+  /**
+   * The fetched data returned from the Kafka consumer. This is a reusable private object to avoid
+   * memory allocation.
+   */
+  private val fetchedData = FetchedData(
+    ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
+    UNKNOWN_OFFSET,
+    UNKNOWN_OFFSET)
+
+  /**
+   * The fetched record returned from the `fetchRecord` method. This is a reusable private object
+   * to avoid memory allocation.
+   */
+  private val fetchedRecord: FetchedRecord = FetchedRecord(null, UNKNOWN_OFFSET)
 
   /** Create a KafkaConsumer to fetch records for `topicPartition` */
   private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
@@ -125,20 +223,7 @@ private[kafka010] case class InternalKafkaConsumer(
     AvailableOffsetRange(earliestOffset, latestOffset)
   }
 
-  /**
-   * Get the record for the given offset if available. Otherwise it will either throw error
-   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
-   * or null.
-   *
-   * @param offset the offset to fetch.
-   * @param untilOffset the max offset to fetch. Exclusive.
-   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
-   *                       offset if available, or throw exception.when `failOnDataLoss` is `false`,
-   *                       this method will either return record at offset if available, or return
-   *                       the next earliest available record less than untilOffset, or null. It
-   *                       will not throw any exception.
-   */
+  /** @see [[KafkaDataConsumer.get]] */
   def get(
       offset: Long,
       untilOffset: Long,
@@ -147,21 +232,32 @@ private[kafka010] case class InternalKafkaConsumer(
       ConsumerRecord[Array[Byte], Array[Byte]] = runUninterruptiblyIfPossible {
     require(offset < untilOffset,
       s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
-    logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
+    logDebug(s"Get $groupId $topicPartition nextOffset ${fetchedData.nextOffsetInFetchedData} " +
+      s"requested $offset")
     // The following loop is basically for `failOnDataLoss = false`. When `failOnDataLoss` is
     // `false`, first, we will try to fetch the record at `offset`. If no such record exists, then
     // we will move to the next available offset within `[offset, untilOffset)` and retry.
     // If `failOnDataLoss` is `true`, the loop body will be executed only once.
     var toFetchOffset = offset
-    var consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]] = null
+    var fetchedRecord: FetchedRecord = null
     // We want to break out of the while loop on a successful fetch to avoid using "return"
     // which may cause a NonLocalReturnControl exception when this method is used as a function.
     var isFetchComplete = false
 
     while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) {
       try {
-        consumerRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
-        isFetchComplete = true
+        fetchedRecord = fetchRecord(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
+        if (fetchedRecord.record != null) {
+          isFetchComplete = true
+        } else {
+          toFetchOffset = fetchedRecord.nextOffsetToFetch
+          if (toFetchOffset >= untilOffset) {
+            fetchedData.reset()
+            toFetchOffset = UNKNOWN_OFFSET
+          } else {
+            logDebug(s"Skipped offsets [$offset, $toFetchOffset]")
+          }
+        }
       } catch {
         case e: OffsetOutOfRangeException =>
           // When there is some error thrown, it's better to use a new consumer to drop all cached
@@ -174,9 +270,9 @@ private[kafka010] case class InternalKafkaConsumer(
     }
 
     if (isFetchComplete) {
-      consumerRecord
+      fetchedRecord.record
     } else {
-      resetFetchedData()
+      fetchedData.reset()
       null
     }
   }
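The loop above can be hard to follow in diff form. Below is a self-contained simulation of its control flow under assumed gap positions (offsets 2 and 5 stand in for transaction markers); `Fetched` mirrors the role of `FetchedRecord` and `-1L` stands in for `UNKNOWN_OFFSET`:

```scala
object SkipLoopSketch {
  // Mirrors FetchedRecord: either a visible record or the next offset to try.
  final case class Fetched(record: Option[Long], nextOffsetToFetch: Long)

  // Offsets 2 and 5 stand in for transaction markers: invisible, so skip ahead.
  def fetchRecord(offset: Long): Fetched =
    if (offset == 2 || offset == 5) Fetched(None, offset + 1)
    else Fetched(Some(offset), offset + 1)

  // The structure mirrors the loop in `get`, including the completion flag that
  // avoids an early `return`.
  def get(offset: Long, untilOffset: Long): Option[Long] = {
    var toFetchOffset = offset
    var result: Option[Long] = None
    var isFetchComplete = false
    while (toFetchOffset != -1L && !isFetchComplete) {
      val fetched = fetchRecord(toFetchOffset)
      if (fetched.record.isDefined) {
        result = fetched.record
        isFetchComplete = true
      } else {
        toFetchOffset = fetched.nextOffsetToFetch
        if (toFetchOffset >= untilOffset) toFetchOffset = -1L // batch exhausted
      }
    }
    result
  }

  def main(args: Array[String]): Unit = {
    assert(get(2, 10).contains(3)) // the requested offset is a marker: skip to 3
    assert(get(5, 6).isEmpty)      // the whole remaining range is invisible: null-like
    println("skip-loop sketch ok")
  }
}
```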
@@ -239,65 +335,81 @@ private[kafka010] case class InternalKafkaConsumer(
   }
 
   /**
-   * Get the record for the given offset if available. Otherwise it will either throw error
-   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
-   * or null.
+   * Get the fetched record for the given offset if available.
+   *
+   * If the record is invisible (either a transaction message, or an aborted message when the
+   * consumer's `isolation.level` is `read_committed`), it will return a `FetchedRecord` with the
+   * next offset to fetch.
+   *
+   * This method also will try its best to detect data loss. If `failOnDataLoss` is `true`, it
+   * will throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`,
+   * this method will return `null` if the next available record is within [offset, untilOffset).
    *
    * @throws OffsetOutOfRangeException if `offset` is out of range
    * @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds.
    */
-  private def fetchData(
+  private def fetchRecord(
       offset: Long,
       untilOffset: Long,
       pollTimeoutMs: Long,
-      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
-    if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
-      // This is the first fetch, or the last pre-fetched data has been drained.
-      // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
-      seek(offset)
-      poll(pollTimeoutMs)
-    }
-
-    if (!fetchedData.hasNext()) {
-      // We cannot fetch anything after `poll`. Two possible cases:
-      // - `offset` is out of range so that Kafka returns nothing. Just throw
-      //   `OffsetOutOfRangeException` to let the caller handle it.
-      // - Cannot fetch any data before timeout. TimeoutException will be thrown.
-      val range = getAvailableOffsetRange()
-      if (offset < range.earliest || offset >= range.latest) {
-        throw new OffsetOutOfRangeException(
-          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
+      failOnDataLoss: Boolean): FetchedRecord = {
+    if (offset != fetchedData.nextOffsetInFetchedData) {
+      // This is the first fetch, or the fetched data has been reset.
+      // Fetch records from Kafka and update `fetchedData`.
+      fetchData(offset, pollTimeoutMs)
+    } else if (!fetchedData.hasNext) { // The last pre-fetched data has been drained.
+      if (offset < fetchedData.offsetAfterPoll) {
+        // Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. Return a record to ask
+        // the next call to start from `fetchedData.offsetAfterPoll`.
+        fetchedData.reset()
+        return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
       } else {
-        throw new TimeoutException(
-          s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
+        // Fetch records from Kafka and update `fetchedData`.
+        fetchData(offset, pollTimeoutMs)
       }
+    }
+
+    if (!fetchedData.hasNext) {
+      // When we reach here, we have already tried to poll from Kafka. As `fetchedData` is still
+      // empty, all messages in [offset, fetchedData.offsetAfterPoll) are invisible. Return a
+      // record to ask the next call to start from `fetchedData.offsetAfterPoll`.
+      assert(offset <= fetchedData.offsetAfterPoll,
+        s"seek to $offset and poll but the offset was reset to ${fetchedData.offsetAfterPoll}")
+      fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
     } else {
       val record = fetchedData.next()
-      nextOffsetInFetchedData = record.offset + 1
       // In general, Kafka uses the specified offset as the start point, and tries to fetch the
       // next available offset. Hence we need to handle offset mismatch.
       if (record.offset > offset) {
+        val range = getAvailableOffsetRange()
+        if (range.earliest <= offset) {
+          // `offset` is still valid but the corresponding message is invisible. We should skip it
+          // and jump to `record.offset`. Here we move `fetchedData` back so that the next call of
+          // `fetchRecord` can just return `record` directly.
+          fetchedData.previous()
+          return fetchedRecord.withRecord(null, record.offset)
+        }
         // This may happen when some records aged out but their offsets already got verified
         if (failOnDataLoss) {
           reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})")
           // Never happen as "reportDataLoss" will throw an exception
-          null
+          throw new IllegalStateException(
+            "reportDataLoss didn't throw an exception when 'failOnDataLoss' is true")
+        } else if (record.offset >= untilOffset) {
+          reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
+          // Set `nextOffsetToFetch` to `untilOffset` to finish the current batch.
+          fetchedRecord.withRecord(null, untilOffset)
         } else {
-          if (record.offset >= untilOffset) {
-            reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
-            null
-          } else {
-            reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
-            record
-          }
+          reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
+          fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
         }
       } else if (record.offset < offset) {
         // This should not happen. If it does happen, then we probably misunderstand Kafka internal
         // mechanism.
         throw new IllegalStateException(
           s"Tried to fetch $offset but the returned record offset was ${record.offset}")
       } else {
-        record
+        fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
       }
     }
   }
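The key decision in the `record.offset > offset` branch above is whether the gap is an invisible-but-available region or genuine data loss. A compact sketch of that rule (names hypothetical):

```scala
object OffsetMismatchSketch {
  sealed trait Action
  final case class SkipTo(nextOffset: Long) extends Action // invisible data: jump forward
  case object DataLoss extends Action                      // records aged out of retention

  // If the requested offset is still within the available range, the gap can only
  // be invisible messages; otherwise the data was lost.
  def onMismatch(requested: Long, recordOffset: Long, earliestAvailable: Long): Action =
    if (earliestAvailable <= requested) SkipTo(recordOffset)
    else DataLoss

  def main(args: Array[String]): Unit = {
    assert(onMismatch(requested = 5, recordOffset = 7, earliestAvailable = 0) == SkipTo(7))
    assert(onMismatch(requested = 5, recordOffset = 7, earliestAvailable = 6) == DataLoss)
    println("offset-mismatch sketch ok")
  }
}
```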
@@ -306,13 +418,7 @@ private[kafka010] case class InternalKafkaConsumer(
   private def resetConsumer(): Unit = {
     consumer.close()
     consumer = createConsumer
-    resetFetchedData()
-  }
-
-  /** Reset the internal pre-fetched data. */
-  private def resetFetchedData(): Unit = {
-    nextOffsetInFetchedData = UNKNOWN_OFFSET
-    fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
+    fetchedData.reset()
   }
 
   /**
@@ -346,11 +452,40 @@ private[kafka010] case class InternalKafkaConsumer(
     consumer.seek(topicPartition, offset)
   }
 
-  private def poll(pollTimeoutMs: Long): Unit = {
+  /**
+   * Poll messages from Kafka starting from `offset` and update `fetchedData`. `fetchedData` may
+   * be empty if the Kafka consumer fetches some messages but none of them are visible messages
+   * (either transaction messages, or aborted messages when `isolation.level` is
+   * `read_committed`).
+   *
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed after polling. It means the
+   *                          consumer polls nothing before timeout.
+   */
+  private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = {
+    // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
+    seek(offset)
     val p = consumer.poll(pollTimeoutMs)
     val r = p.records(topicPartition)
     logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
-    fetchedData = r.iterator
+    val offsetAfterPoll = consumer.position(topicPartition)
+    logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling")
+    fetchedData.withNewPoll(r.listIterator, offsetAfterPoll)
+    if (!fetchedData.hasNext) {
+      // We cannot fetch anything after `poll`. Three possible cases:
+      // - `offset` is out of range, so Kafka returns nothing. `OffsetOutOfRangeException` will
+      //   be thrown.
+      // - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
+      // - Fetched something, but none of it is visible. This is a valid case and we let the
+      //   caller handle it.
+      val range = getAvailableOffsetRange()
+      if (offset < range.earliest || offset >= range.latest) {
+        throw new OffsetOutOfRangeException(
+          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
+      } else if (offset == offsetAfterPoll) {
+        throw new TimeoutException(
+          s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
+      }
+    }
   }
 }
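After an empty poll, `fetchData` distinguishes three situations using the available offset range and the post-poll position. A condensed sketch of that classification (simplified types, invented values in the checks):

```scala
object EmptyPollSketch {
  sealed trait Outcome
  case object OutOfRange extends Outcome   // throw OffsetOutOfRangeException
  case object Timeout extends Outcome      // throw TimeoutException
  case object AllInvisible extends Outcome // valid: caller skips to offsetAfterPoll

  def classify(offset: Long, earliest: Long, latest: Long, offsetAfterPoll: Long): Outcome =
    if (offset < earliest || offset >= latest) OutOfRange
    else if (offset == offsetAfterPoll) Timeout // position did not move: nothing polled
    else AllInvisible                           // position moved past invisible messages

  def main(args: Array[String]): Unit = {
    assert(classify(offset = 99, earliest = 0, latest = 50, offsetAfterPoll = 99) == OutOfRange)
    assert(classify(offset = 10, earliest = 0, latest = 50, offsetAfterPoll = 10) == Timeout)
    assert(classify(offset = 10, earliest = 0, latest = 50, offsetAfterPoll = 12) == AllInvisible)
    println("empty-poll sketch ok")
  }
}
```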
