Commit 293f99e

Attila Tóth committed:

add: parameters to control the number and distribution of messages in a micro-batch
1 parent 820043e commit 293f99e

16 files changed: +1587 −10 lines

README.md

Lines changed: 68 additions & 0 deletions
@@ -305,7 +305,74 @@ You can use `org.apache.spark.sql.pulsar.JsonUtils.topicOffsets(Map[String, Mess
 This may cause a false alarm. You can set it to `false` when it doesn't work as you expected. <br>

 A batch query always fails if it fails to read any data from the provided offsets due to data loss.</td>
+</tr>
+<tr>
+<td>
+`maxEntriesPerTrigger`
+</td>
+<td>
+Number of entries to include in a single micro-batch during streaming.
+</td>
+<td>-1</td>
+<td>Streaming query</td>
+<td>This parameter controls how many Pulsar entries the connector reads
+from the topic backlog in one micro-batch. If the topic backlog is
+considerably large, users can use this parameter to limit the size of
+the micro-batch. If multiple topics are read, this parameter caps the
+total number of entries fetched from all of them.
+
+*Note:* Entries might contain multiple messages. The default value of
+`-1` means that the complete backlog is read at once.</td>
+</tr>

+<tr>
+<td>
+`forwardStrategy`
+</td>
+<td>
+`simple`, `large-first` or `proportional`
+</td>
+<td>`simple`</td>
+<td>Streaming query</td>
+<td>If `maxEntriesPerTrigger` is set, this parameter controls which
+forwarding strategy is used when multiple topics are read.
+<li>
+`simple` divides the allowed number of entries equally between all
+topics, regardless of their backlog sizes
+</li>
+<li>
+`large-first` forwards the topics with the largest backlogs first, as
+far as the maximum number of allowed entries permits
+</li>
+<li>
+`proportional` forwards each topic in proportion to its share of the
+overall backlog (the topic backlog / overall backlog ratio)
+</li>
+</td>
+</tr>
+
+<tr>
+<td>
+`ensureEntriesPerTopic`
+</td>
+<td>Number of entries to forward each topic by during a micro-batch.</td>
+<td>0</td>
+<td>Streaming query</td>
+<td>If multiple topics are read and the maximum number of entries is
+also specified, always forward every topic by at least the number of
+entries given here. This way users can ensure that topics with
+considerably smaller backlogs than others are also forwarded and read.
+Note that:
+<li>If this number is higher than the maximum allowed entries divided
+by the number of topics, this value takes precedence, overriding the
+maximum number of entries per micro-batch.
+</li>
+<li>This parameter has an effect only with the `large-first` and
+`proportional` forwarding strategies.</li>
+</td>
 </tr>
 <tr>
 <td>

@@ -333,6 +400,7 @@ taken into account during operation.
 </td>
 </tr>

+
 </table>

 #### Authentication
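
Taken together, these options let a streaming query bound and shape its micro-batches. A hedged usage sketch (the service/admin URLs and the topics pattern are illustrative; the option names come from the table above):

```scala
val stream = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topicsPattern", "persistent://public/default/ingest-.*")
  // Cap each micro-batch at 5000 entries in total across all matched topics,
  .option("maxEntriesPerTrigger", "5000")
  // drain the largest backlogs first,
  .option("forwardStrategy", "large-first")
  // but still advance every topic by at least 100 entries per batch.
  .option("ensureEntriesPerTopic", "100")
  .load()
```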

src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala

Lines changed: 78 additions & 0 deletions
@@ -22,11 +22,13 @@ import java.util.regex.Pattern
 import org.apache.pulsar.client.admin.{PulsarAdmin, PulsarAdminException}
 import org.apache.pulsar.client.api.{Message, MessageId, PulsarClient}
 import org.apache.pulsar.client.impl.schema.BytesSchema
+import org.apache.pulsar.client.internal.DefaultImplementation
 import org.apache.pulsar.common.naming.TopicName
 import org.apache.pulsar.common.schema.SchemaInfo

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.pulsar.PulsarOptions._
+import org.apache.spark.sql.pulsar.topicinternalstats.forward._
 import org.apache.spark.sql.types.StructType

 /**
@@ -259,6 +261,82 @@ private[pulsar] case class PulsarMetadataReader(
     }.toMap)
   }

+
+  def forwardOffset(actualOffset: Map[String, MessageId],
+                    strategy: String,
+                    numberOfEntriesToForward: Long,
+                    ensureEntriesPerTopic: Long): SpecificPulsarOffset = {
+    getTopicPartitions()
+
+    // Collect internal stats for all topics
+    val topicStats = topicPartitions.map { topic =>
+      val internalStats = admin.topics().getInternalStats(topic)
+      val topicActualMessageId = actualOffset.getOrElse(topic, MessageId.earliest)
+      topic -> TopicState(internalStats,
+        PulsarSourceUtils.getLedgerId(topicActualMessageId),
+        PulsarSourceUtils.getEntryId(topicActualMessageId))
+    }.toMap
+
+    val forwarder = strategy match {
+      case PulsarOptions.ProportionalForwardStrategy =>
+        new ProportionalForwardStrategy(numberOfEntriesToForward, ensureEntriesPerTopic)
+      case PulsarOptions.LargeFirstForwardStrategy =>
+        new LargeFirstForwardStrategy(numberOfEntriesToForward, ensureEntriesPerTopic)
+      case _ =>
+        new LinearForwardStrategy(numberOfEntriesToForward)
+    }
+
+    SpecificPulsarOffset(topicPartitions.map { topic =>
+      topic -> PulsarSourceUtils.seekableLatestMid {
+        // Fetch actual offset for topic
+        val topicActualMessageId = actualOffset.getOrElse(topic, MessageId.earliest)
+        try {
+          // Get the actual ledger
+          val actualLedgerId = PulsarSourceUtils.getLedgerId(topicActualMessageId)
+          // Get the actual entry ID
+          val actualEntryId = PulsarSourceUtils.getEntryId(topicActualMessageId)
+          // Get the partition index
+          val partitionIndex = PulsarSourceUtils.getPartitionIndex(topicActualMessageId)
+          // Cache topic internal stats
+          val internalStats = topicStats(topic).internalStat
+          // Calculate the number of entries to pull in
+          val numberOfEntriesPerTopic = forwarder.forward(topicStats)(topic)
+          // Get a future message ID which corresponds
+          // to the maximum number of entries
+          val (nextLedgerId, nextEntryId) = TopicInternalStatsUtils.forwardMessageId(
+            internalStats,
+            actualLedgerId,
+            actualEntryId,
+            numberOfEntriesPerTopic)
+          // Build a message ID
+          val forwardedMessageId =
+            DefaultImplementation.newMessageId(nextLedgerId, nextEntryId, partitionIndex)
+          // Log state
+          val forwardedEntry = TopicInternalStatsUtils.numOfEntriesUntil(
+            internalStats, nextLedgerId, nextEntryId)
+          val entryCount = internalStats.numberOfEntries
+          val progress = f"${forwardedEntry.toFloat / entryCount.toFloat}%1.3f"
+          val logMessage = s"Pulsar Connector forward on topic. " +
+            s"[$numberOfEntriesPerTopic/$numberOfEntriesToForward]" +
+            s"${topic.reverse.take(30).reverse} $topicActualMessageId -> " +
+            s"$forwardedMessageId ($forwardedEntry/$entryCount) [$progress]"
+          log.debug(logMessage)
+          // Return the message ID
+          forwardedMessageId
+        } catch {
+          case e: PulsarAdminException if e.getStatusCode == 404 =>
+            MessageId.earliest
+          case e: Throwable =>
+            throw new RuntimeException(
+              s"Failed to get forwarded messageId for ${TopicName.get(topic).toString} " +
+              s"(tried to forward ${forwarder.forward(topicStats)(topic)} messages " +
+              s"starting from `$topicActualMessageId` using strategy $strategy)", e)
        }

+      }
+    }.toMap)
+  }
+
   def fetchLatestOffsetForTopic(topic: String): MessageId = {
     val messageId =
       try {
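
`TopicInternalStatsUtils` is added elsewhere in this commit and isn't shown on this page. As an illustration only, a minimal sketch of the ledger walk `forwardMessageId` needs to perform, assuming Pulsar's `PersistentTopicInternalStats.ledgers` metadata and the convention that entry ID `-1` means "before the first entry of a ledger" (names and edge-case handling here are ours, not the committed code):

```scala
import scala.collection.JavaConverters._

import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats

// Advance a (ledgerId, entryId) position by `forwardBy` entries, spilling
// into subsequent ledgers, clamped to the end of the backlog.
def forwardMessageIdSketch(
    stats: PersistentTopicInternalStats,
    startLedgerId: Long,
    startEntryId: Long,
    forwardBy: Long): (Long, Long) = {
  val ledgers = stats.ledgers.asScala.filter(_.entries > 0).sortBy(_.ledgerId).toList
  if (ledgers.isEmpty) return (startLedgerId, startEntryId)
  val total = ledgers.map(_.entries).sum
  // Entries consumed so far, up to and including (startLedgerId, startEntryId)
  val consumed =
    ledgers.takeWhile(_.ledgerId < startLedgerId).map(_.entries).sum + (startEntryId + 1)
  // Target position in absolute entry count, clamped to the backlog size
  val target = math.min(consumed + forwardBy, total)
  // Map the absolute position back to a (ledgerId, entryId) pair
  var remaining = target
  for (ledger <- ledgers) {
    if (remaining <= ledger.entries) return (ledger.ledgerId, remaining - 1)
    remaining -= ledger.entries
  }
  (ledgers.last.ledgerId, ledgers.last.entries - 1) // unreachable after clamping
}
```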

src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala

Lines changed: 6 additions & 0 deletions
@@ -37,6 +37,12 @@ private[pulsar] object PulsarOptions {

   val TopicOptionKeys: Set[String] = Set(TopicSingle, TopicMulti, TopicPattern)

+  val MaxEntriesPerTrigger = "maxentriespertrigger"
+  val EnsureEntriesPerTopic = "ensureentriespertopic"
+  val ForwardStrategy = "forwardstrategy"
+  val ProportionalForwardStrategy = "proportional"
+  val LargeFirstForwardStrategy = "large-first"
+
   val ServiceUrlOptionKey: String = "service.url"
   val AdminUrlOptionKey: String = "admin.url"
   val StartingOffsetsOptionKey: String = "startingOffsets".toLowerCase(Locale.ROOT)

src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala

Lines changed: 14 additions & 1 deletion
@@ -113,7 +113,11 @@ private[pulsar] class PulsarProvider
       pollTimeoutMs(caseInsensitiveParams),
       failOnDataLoss(caseInsensitiveParams),
       subscriptionNamePrefix,
-      jsonOptions)
+      jsonOptions,
+      maxEntriesPerTrigger(caseInsensitiveParams),
+      minEntriesPerTopic(caseInsensitiveParams),
+      forwardStrategy(caseInsensitiveParams)
+    )
   }

   override def createRelation(
@@ -395,6 +399,15 @@ private[pulsar] object PulsarProvider extends Logging {
       (SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000).toString)
       .toInt

+  private def maxEntriesPerTrigger(caseInsensitiveParams: Map[String, String]): Long =
+    caseInsensitiveParams.getOrElse(MaxEntriesPerTrigger, "-1").toLong
+
+  private def minEntriesPerTopic(caseInsensitiveParams: Map[String, String]): Long =
+    caseInsensitiveParams.getOrElse(EnsureEntriesPerTopic, "0").toLong
+
+  private def forwardStrategy(caseInsensitiveParams: Map[String, String]): String =
+    caseInsensitiveParams.getOrElse(ForwardStrategy, "simple")
+
   private def validateGeneralOptions(
       caseInsensitiveParams: Map[String, String]): Map[String, String] = {
     if (!caseInsensitiveParams.contains(ServiceUrlOptionKey)) {

src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala

Lines changed: 19 additions & 9 deletions
@@ -36,7 +36,10 @@ private[pulsar] class PulsarSource(
     pollTimeoutMs: Int,
     failOnDataLoss: Boolean,
     subscriptionNamePrefix: String,
-    jsonOptions: JSONOptionsInRead)
+    jsonOptions: JSONOptionsInRead,
+    maxEntriesPerTrigger: Long,
+    ensureEntriesPerTopic: Long,
+    forwardStrategy: String)
   extends Source
   with Logging {

@@ -59,12 +62,21 @@ private[pulsar] class PulsarSource(
   override def schema(): StructType = SchemaUtils.pulsarSourceSchema(pulsarSchema)

   override def getOffset: Option[Offset] = {
-    // Make sure initialTopicOffsets is initialized
     initialTopicOffsets
-    val latest = metadataReader.fetchLatestOffsets()
-    currentTopicOffsets = Some(latest.topicOffsets)
-    logDebug(s"GetOffset: ${latest.topicOffsets.toSeq.map(_.toString).sorted}")
-    Some(latest.asInstanceOf[Offset])
+    val nextOffsets = if (maxEntriesPerTrigger == -1) {
+      metadataReader.fetchLatestOffsets()
+    } else {
+      currentTopicOffsets match {
+        case Some(value) =>
+          metadataReader.forwardOffset(value,
+            forwardStrategy, maxEntriesPerTrigger, ensureEntriesPerTopic)
+        case _ =>
+          metadataReader.forwardOffset(initialTopicOffsets.topicOffsets,
+            forwardStrategy, maxEntriesPerTrigger, ensureEntriesPerTopic)
+      }
+    }
+    logDebug(s"GetOffset: ${nextOffsets.topicOffsets.toSeq.map(_.toString).sorted}")
+    Some(nextOffsets.asInstanceOf[Offset])
   }

   override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
@@ -74,9 +86,7 @@ private[pulsar] class PulsarSource(
     logInfo(s"getBatch called with start = $start, end = $end")
     val endTopicOffsets = SpecificPulsarOffset.getTopicOffsets(end)

-    if (currentTopicOffsets.isEmpty) {
-      currentTopicOffsets = Some(endTopicOffsets)
-    }
+    currentTopicOffsets = Some(endTopicOffsets)

     if (start.isDefined && start.get == end) {
       return sqlContext.internalCreateDataFrame(
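
The net effect of the `getOffset` change: with the default `-1` the source jumps to the latest offsets in one batch, while a positive cap drains the backlog over several triggers. A back-of-envelope sketch with made-up numbers:

```scala
// Illustrative only: a 10,000-entry backlog capped at 2,000 entries per
// trigger drains in ~5 micro-batches instead of one unbounded batch.
val backlogEntries = 10000L
val maxEntriesPerTrigger = 2000L
val triggersToDrain = math.ceil(backlogEntries.toDouble / maxEntriesPerTrigger).toLong
println(s"backlog drains in ~$triggersToDrain micro-batches") // ~5
```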

src/main/scala/org/apache/spark/sql/pulsar/PulsarSources.scala

Lines changed: 30 additions & 0 deletions
@@ -120,6 +120,36 @@ private[pulsar] object PulsarSourceUtils extends Logging {
     }
   }

+  def getLedgerId(mid: MessageId): Long = {
+    mid match {
+      case bmid: BatchMessageIdImpl =>
+        bmid.getLedgerId
+      case midi: MessageIdImpl => midi.getLedgerId
+      case t: TopicMessageIdImpl => getLedgerId(t.getInnerMessageId)
+      case up: UserProvidedMessageId => up.getLedgerId
+    }
+  }
+
+  def getEntryId(mid: MessageId): Long = {
+    mid match {
+      case bmid: BatchMessageIdImpl =>
+        bmid.getEntryId
+      case midi: MessageIdImpl => midi.getEntryId
+      case t: TopicMessageIdImpl => getEntryId(t.getInnerMessageId)
+      case up: UserProvidedMessageId => up.getEntryId
+    }
+  }
+
+  def getPartitionIndex(mid: MessageId): Int = {
+    mid match {
+      case bmid: BatchMessageIdImpl =>
+        bmid.getPartitionIndex
+      case midi: MessageIdImpl => midi.getPartitionIndex
+      case t: TopicMessageIdImpl => getPartitionIndex(t.getInnerMessageId)
+      case up: UserProvidedMessageId => up.getPartitionIndex
+    }
+  }
+
   def seekableLatestMid(mid: MessageId): MessageId = {
     if (messageExists(mid)) mid else MessageId.earliest
   }
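
A quick sanity-check sketch for the three extractors above; the IDs are made up, and `MessageIdImpl(ledgerId, entryId, partitionIndex)` is Pulsar's public client-side constructor:

```scala
import org.apache.pulsar.client.impl.MessageIdImpl

val mid = new MessageIdImpl(42L, 7L, -1) // ledgerId = 42, entryId = 7, no partition
assert(PulsarSourceUtils.getLedgerId(mid) == 42L)
assert(PulsarSourceUtils.getEntryId(mid) == 7L)
assert(PulsarSourceUtils.getPartitionIndex(mid) == -1)
```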
src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/ForwardStrategy.scala

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.pulsar.topicinternalstats.forward
+
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats
+
+trait ForwardStrategy {
+  def forward(topics: Map[String, TopicState]): Map[String, Long]
+}
+
+case class TopicState(internalStat: PersistentTopicInternalStats,
+                      actualLedgerId: Long,
+                      actualEntryId: Long)
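
The concrete strategy classes (`LinearForwardStrategy`, `LargeFirstForwardStrategy`, `ProportionalForwardStrategy`) live in sibling files not shown on this page. As a flavor of the contract, a minimal sketch of what the `simple` strategy amounts to, with the constructor shape inferred from its call site in `PulsarMetadataReader` (illustrative, not the committed implementation):

```scala
package org.apache.spark.sql.pulsar.topicinternalstats.forward

// Sketch: split the total entry budget evenly across topics,
// ignoring individual backlog sizes.
class LinearForwardStrategySketch(maxEntriesAltogether: Long) extends ForwardStrategy {
  override def forward(topics: Map[String, TopicState]): Map[String, Long] = {
    val perTopic = maxEntriesAltogether / math.max(topics.size, 1)
    topics.map { case (topic, _) => topic -> perTopic }
  }
}
```

`large-first` and `proportional` would weight each topic's share by backlog size instead, which is also why only they honor the `ensureEntriesPerTopic` floor.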
