Skip to content

Commit 6d59759

Browse files
committed
Pubsub source can use streaming pull
The PubsubSourceConfig now takes an option `streamingPull`, which can be set to `true` to use a GRPC streaming pull to fetch from Pubsub. Streaming pull is better for apps with low latency requirements.
1 parent a1b77b6 commit 6d59759

File tree

12 files changed

+437
-116
lines changed

12 files changed

+437
-116
lines changed

modules/pubsub/src/main/resources/reference.conf

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,13 @@ snowplow.defaults: {
66
minRemainingAckDeadline: 0.1
77
maxMessagesPerPull: 1000
88
debounceRequests: "100 millis"
9+
streamingPull: true
10+
retries: {
11+
transientErrors: {
12+
delay: "100 millis"
13+
attempts: 10
14+
}
15+
}
916
}
1017
}
1118

@@ -15,6 +22,12 @@ snowplow.defaults: {
1522
requestByteThreshold: 1000000
1623
# Equal to 9 MB. The Pubsub message size limit is 10 MB; however, we further reduce it to 9 MB to be on the safe side.
1724
maxRecordSize: 9000000
25+
retries: {
26+
transientErrors: {
27+
delay: "100 millis"
28+
attempts: 10
29+
}
30+
}
1831
}
1932
}
2033

modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/streams/pubsub/PubsubRetryOps.scala

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,60 @@
77
*/
88
package com.snowplowanalytics.snowplow.streams.pubsub
99

10+
import cats.MonadError
1011
import cats.implicits._
11-
import cats.effect.Async
12+
import cats.effect.{Async, Poll}
1213
import com.google.api.gax.rpc.{ApiException, StatusCode}
1314
import io.grpc.Status
1415
import org.typelevel.log4cats.Logger
15-
import retry.RetryPolicies
16+
import retry.{RetryPolicies, Sleep}
1617
import retry.implicits._
1718

18-
import scala.concurrent.duration.DurationDouble
19+
import scala.concurrent.duration.FiniteDuration
1920

2021
private[pubsub] object PubsubRetryOps {
2122

2223
object implicits {
2324
implicit class Ops[F[_], A](val f: F[A]) extends AnyVal {
2425

25-
def retryingOnTransientGrpcFailures(implicit F: Async[F], L: Logger[F]): F[A] =
26+
/**
27+
* Retries an effect when an exception matches known retryable GRPC errors
28+
*
29+
* The retry policy is full jitter, with capped number of attempts
30+
*/
31+
def retryingOnTransientGrpcFailures(retryDelay: FiniteDuration, retryAttempts: Int)(implicit F: Async[F], L: Logger[F]): F[A] =
32+
retryingOnTransientGrpcFailuresImpl(retryDelay, retryAttempts)
33+
34+
/**
35+
* Retries an effect when an exception matches known retryable GRPC errors
36+
*
37+
* The retry policy is full jitter, with capped number of attempts
38+
*
39+
* This variant takes a `Poll[F]` as a parameter. This is helpful when the effect is run
40+
* inside `Sync[F].uncancelable` but we don't mind if the effect is cancelled in between retry
41+
* attempts.
42+
*/
43+
def retryingOnTransientGrpcFailuresWithCancelableSleep(
44+
retryDelay: FiniteDuration,
45+
retryAttempts: Int,
46+
poll: Poll[F]
47+
)(implicit F: Async[F],
48+
L: Logger[F]
49+
): F[A] = {
50+
implicit val s: Sleep[F] = (delay: FiniteDuration) => poll(Async[F].sleep(delay))
51+
retryingOnTransientGrpcFailuresImpl(retryDelay, retryAttempts)
52+
}
53+
54+
private def retryingOnTransientGrpcFailuresImpl(
55+
retryDelay: FiniteDuration,
56+
retryAttempts: Int
57+
)(implicit F: MonadError[F, Throwable],
58+
L: Logger[F],
59+
s: Sleep[F]
60+
): F[A] =
2661
f.retryingOnSomeErrors(
2762
isWorthRetrying = { e => isRetryableException(e).pure[F] },
28-
policy = RetryPolicies.fullJitter(1.second),
63+
policy = RetryPolicies.fullJitter[F](retryDelay).join(RetryPolicies.limitRetries(retryAttempts - 1)),
2964
onError = { case (t, _) =>
3065
Logger[F].info(t)(s"Pubsub retryable GRPC error will be retried: ${t.getMessage}")
3166
}

modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/streams/pubsub/PubsubSinkConfig.scala

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,23 @@ package com.snowplowanalytics.snowplow.streams.pubsub
1010
import cats.Id
1111
import io.circe.Decoder
1212
import io.circe.generic.semiauto._
13+
import io.circe.config.syntax._
14+
15+
import scala.concurrent.duration.FiniteDuration
1316

1417
case class PubsubSinkConfigM[M[_]](
1518
topic: M[PubsubSinkConfig.Topic],
1619
batchSize: Int,
17-
requestByteThreshold: Int
20+
requestByteThreshold: Int,
21+
retries: PubsubSinkConfig.Retries
1822
)
1923

2024
object PubsubSinkConfig {
25+
26+
case class TransientErrorRetrying(delay: FiniteDuration, attempts: Int)
27+
28+
case class Retries(transientErrors: TransientErrorRetrying)
29+
2130
case class Topic(projectId: String, topicId: String)
2231
}
2332

@@ -34,13 +43,20 @@ object PubsubSinkConfigM {
3443
Left("Expected format: projects/<project>/topics/<topic>")
3544
}
3645

37-
implicit def decoder: Decoder[PubsubSinkConfig] = deriveDecoder[PubsubSinkConfig]
46+
implicit def decoder: Decoder[PubsubSinkConfig] = {
47+
implicit val transientErrorDecoder = deriveDecoder[TransientErrorRetrying]
48+
implicit val retriesDecoder = deriveDecoder[Retries]
49+
deriveDecoder[PubsubSinkConfig]
50+
}
3851

39-
implicit def optionalDecoder: Decoder[Option[PubsubSinkConfig]] =
52+
implicit def optionalDecoder: Decoder[Option[PubsubSinkConfig]] = {
53+
implicit val transientErrorDecoder = deriveDecoder[TransientErrorRetrying]
54+
implicit val retriesDecoder = deriveDecoder[Retries]
4055
deriveDecoder[PubsubSinkConfigM[Option]].map {
41-
case PubsubSinkConfigM(Some(t), a, b) =>
42-
Some(PubsubSinkConfigM[Id](t, a, b))
56+
case PubsubSinkConfigM(Some(t), a, b, c) =>
57+
Some(PubsubSinkConfigM[Id](t, a, b, c))
4358
case _ =>
4459
None
4560
}
61+
}
4662
}

modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/streams/pubsub/PubsubSourceConfig.scala

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import scala.concurrent.duration.FiniteDuration
2424
* Controls how many RPCs may be opened concurrently from the client to the PubSub server. The
2525
* maximum number of RPCs is equal to this factor multiplied by the number of available cpu cores.
2626
* Increasing this factor can increase the rate of events provided by the Source to the
27-
* application.
27+
* application. Only used when `streamingPull` is false.
2828
* @param durationPerAckExtension
2929
* Ack deadlines are extended for this duration. For common-streams apps this should be set
3030
* slightly larger than the maximum time we expect between app receiving the message and acking
@@ -41,18 +41,32 @@ import scala.concurrent.duration.FiniteDuration
4141
* Adds an artificial delay between consecutive requests to pubsub for more messages. Under some
4242
* circumstances, this was found to slightly alleviate a problem in which pubsub might re-deliver
4343
* the same messages multiple times.
44+
* @param streamingPull
45+
* Controls whether to use "Streaming Pull" (true) or "Unary Pull" (false) GRPC method to fetch
46+
* events. Use "Unary Pull" (false) for apps which have long delays in between acks (e.g. Lake
47+
* Loader), to avoid a problem in which PubSub occasionally re-delivers the same messages, causing
48+
* downstream duplicates. Use "Streaming Pull" (true) for apps that ack often, and which have
49+
* lower latency requirements (e.g. Enrich).
50+
* @param retries
51+
* Configuration for retrying transient errors
4452
*/
4553
case class PubsubSourceConfig(
4654
subscription: PubsubSourceConfig.Subscription,
4755
parallelPullFactor: BigDecimal,
4856
durationPerAckExtension: FiniteDuration,
4957
minRemainingAckDeadline: BigDecimal,
5058
maxMessagesPerPull: Int,
51-
debounceRequests: FiniteDuration
59+
debounceRequests: FiniteDuration,
60+
streamingPull: Boolean,
61+
retries: PubsubSourceConfig.Retries
5262
)
5363

5464
object PubsubSourceConfig {
5565

66+
case class TransientErrorRetrying(delay: FiniteDuration, attempts: Int)
67+
68+
case class Retries(transientErrors: TransientErrorRetrying)
69+
5670
case class Subscription(projectId: String, subscriptionId: String)
5771

5872
object Subscription {
@@ -71,5 +85,9 @@ object PubsubSourceConfig {
7185
Left("Expected format: projects/<project>/subscriptions/<subscription>")
7286
}
7387

74-
implicit def decoder: Decoder[PubsubSourceConfig] = deriveDecoder[PubsubSourceConfig]
88+
implicit def decoder: Decoder[PubsubSourceConfig] = {
89+
implicit val transientErrorDecoder = deriveDecoder[TransientErrorRetrying]
90+
implicit val retriesDecoder = deriveDecoder[Retries]
91+
deriveDecoder[PubsubSourceConfig]
92+
}
7593
}

modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/streams/pubsub/sink/PubsubSink.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ private[pubsub] object PubsubSink {
8080
apiFuture <- Sync[F].delay(stub.publishCallable.futureCall(request))
8181
_ <- FutureInterop.fromFuture_(apiFuture)
8282
} yield ()
83-
io.retryingOnTransientGrpcFailures
83+
io.retryingOnTransientGrpcFailures(config.retries.transientErrors.delay, config.retries.transientErrors.attempts)
8484
}
8585

8686
/**

modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/streams/pubsub/source/PubsubCheckpointer.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import com.snowplowanalytics.snowplow.streams.pubsub.PubsubRetryOps.implicits._
3535
* state.
3636
*/
3737
private class PubsubCheckpointer[F[_]: Async](
38-
subscription: PubsubSourceConfig.Subscription,
38+
config: PubsubSourceConfig,
3939
stub: SubscriberStub,
4040
refAckIds: Ref[F, Map[Unique.Token, PubsubBatchState]]
4141
) extends Checkpointer[F, Vector[Unique.Token]] {
@@ -57,12 +57,13 @@ private class PubsubCheckpointer[F[_]: Async](
5757
for {
5858
ackDatas <- refAckIds.modify(m => (m -- c, c.flatMap(m.get)))
5959
_ <- ackDatas.flatMap(_.ackIds).grouped(1000).toVector.traverse_ { ackIds =>
60-
val request = AcknowledgeRequest.newBuilder.setSubscription(subscription.show).addAllAckIds(ackIds.asJava).build
60+
val request = AcknowledgeRequest.newBuilder.setSubscription(config.subscription.show).addAllAckIds(ackIds.asJava).build
6161
val attempt = for {
6262
apiFuture <- Sync[F].delay(stub.acknowledgeCallable.futureCall(request))
6363
_ <- FutureInterop.fromFuture[F, com.google.protobuf.Empty](apiFuture)
6464
} yield ()
65-
attempt.retryingOnTransientGrpcFailures
65+
attempt
66+
.retryingOnTransientGrpcFailures(config.retries.transientErrors.delay, config.retries.transientErrors.attempts)
6667
.recoveringOnGrpcInvalidArgument { s =>
6768
// This can happen if ack IDs have expired before we acked
6869
Logger[F].info(s"Ignoring error from GRPC when acking: ${s.getDescription}")
@@ -81,6 +82,6 @@ private class PubsubCheckpointer[F[_]: Async](
8182
ackDatas <- refAckIds.modify(m => (m -- c, c.flatMap(m.get)))
8283
ackIds = ackDatas.flatMap(_.ackIds)
8384
// A nack is just a modack with zero duration
84-
_ <- Utils.modAck[F](subscription, stub, ackIds, Duration.Zero)
85+
_ <- Utils.modAck[F](config, stub, ackIds, Duration.Zero)
8586
} yield ()
8687
}

0 commit comments

Comments
 (0)