Skip to content

Commit ad35056

Browse files
Merge pull request #550 from permutive-engineering/feature/delivery-attempt
2 parents da0b653 + cdda38e commit ad35056

File tree

2 files changed

+21
-8
lines changed

2 files changed

+21
-8
lines changed

modules/fs2-pubsub/src/main/scala/fs2/pubsub/PubSubClient.scala

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -250,17 +250,19 @@ object PubSubClient {
250250

251251
implicit val decoder: Decoder[PubSubRecord.Subscriber[F, Array[Byte]]] = cursor =>
252252
for {
253-
ackId <- cursor.get[AckId]("ackId")
254-
message = cursor.downField("message")
255-
data <- message.get[Option[String]]("data")
256-
attributes <- message.get[Option[Map[String, String]]]("attributes")
257-
messageId <- message.get[Option[MessageId]]("messageId")
258-
publishTime <- message.get[Option[Instant]]("publishTime")
253+
ackId <- cursor.get[AckId]("ackId")
254+
message = cursor.downField("message")
255+
data <- message.get[Option[String]]("data")
256+
attributes <- message.get[Option[Map[String, String]]]("attributes")
257+
messageId <- message.get[Option[MessageId]]("messageId")
258+
publishTime <- message.get[Option[Instant]]("publishTime")
259+
deliveryAttempt <- message.get[Option[Int]]("deliveryAttempt")
259260
} yield PubSubRecord.Subscriber(
260261
data.map(Base64.getDecoder().decode),
261262
attributes.orEmpty,
262263
messageId,
263264
publishTime,
265+
deliveryAttempt,
264266
ackId,
265267
ack(subscription, ackId),
266268
nack(subscription, ackId),
@@ -370,6 +372,7 @@ object PubSubClient {
370372
message.message.map(_.attributes).orEmpty,
371373
message.message.map(_.messageId).map(MessageId(_)),
372374
message.message.flatMap(_.publishTime.map(_.asJavaInstant)),
375+
message.deliveryAttempt.some,
373376
AckId(message.ackId),
374377
ack(subscription, AckId(message.ackId)),
375378
nack(subscription, AckId(message.ackId)),

modules/fs2-pubsub/src/main/scala/fs2/pubsub/PubSubRecord.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ object PubSubRecord {
3333
* the unique identifier for the message
3434
* @param publishTime
3535
* the time at which the message was published
36+
* @param deliveryAttempt
37+
* Optional. The approximate number of times that Pub/Sub has attempted to deliver the associated message to a
38+
* subscriber. More precisely, this is 1 + (number of NACKs) + (number of ack_deadline exceeds) for this message.
3639
* @param ackId
3740
* the unique identifier for the acknowledgment of the message
3841
* @param ack
@@ -47,6 +50,7 @@ object PubSubRecord {
4750
val attributes: Map[String, String],
4851
val messageId: Option[MessageId],
4952
val publishTime: Option[Instant],
53+
val deliveryAttempt: Option[Int],
5054
val ackId: AckId,
5155
val ack: F[Unit],
5256
val nack: F[Unit],
@@ -58,11 +62,13 @@ object PubSubRecord {
5862
attributes: Map[String, String] = this.attributes,
5963
messageId: Option[MessageId] = this.messageId,
6064
publishTime: Option[Instant] = this.publishTime,
65+
deliveryAttempt: Option[Int] = this.deliveryAttempt,
6166
ackId: AckId = this.ackId,
6267
ack: F[Unit] = this.ack,
6368
nack: F[Unit] = this.nack,
6469
extendDeadline: AckDeadline => F[Unit] = this.extendDeadline
65-
): Subscriber[F, B] = Subscriber(value, attributes, messageId, publishTime, ackId, ack, nack, extendDeadline)
70+
): Subscriber[F, B] =
71+
Subscriber(value, attributes, messageId, publishTime, deliveryAttempt, ackId, ack, nack, extendDeadline)
6672

6773
@SuppressWarnings(Array("scalafix:DisableSyntax.==", "scalafix:Disable.equals"))
6874
override def equals(obj: Any): Boolean = obj match {
@@ -117,6 +123,8 @@ object PubSubRecord {
117123
* the unique identifier for the message
118124
* @param publishTime
119125
* the time at which the message was published
126+
* @param deliveryAttempt
127+
* the approximate number of times that Pub/Sub has attempted to deliver the associated message to a subscriber.
120128
* @param ackId
121129
* the unique identifier for the acknowledgment of the message
122130
* @param ack
@@ -133,12 +141,14 @@ object PubSubRecord {
133141
attributes: Map[String, String],
134142
messageId: Option[MessageId],
135143
publishTime: Option[Instant],
144+
deliveryAttempt: Option[Int],
136145
ackId: AckId,
137146
ack: F[Unit],
138147
nack: F[Unit],
139148
extendDeadline: AckDeadline => F[Unit]
140149
): PubSubRecord.Subscriber[F, A] =
141-
new PubSubRecord.Subscriber(value, attributes, messageId, publishTime, ackId, ack, nack, extendDeadline) {}
150+
new PubSubRecord.Subscriber(value, attributes, messageId, publishTime, deliveryAttempt, ackId, ack, nack,
151+
extendDeadline) {}
142152

143153
// format: off
144154
def unapply[F[_], A](record: PubSubRecord.Subscriber[F, A]): Some[(Option[A], Map[String, String], Option[MessageId], Option[Instant], AckId, F[Unit], F[Unit], AckDeadline => F[Unit])] =

0 commit comments

Comments
 (0)