Skip to content

Commit a2066b5

Browse files
Merge pull request #552 from permutive-engineering/feature/with-payload
2 parents 06a78ef + bfec3a5 commit a2066b5

File tree

3 files changed

+173
-1
lines changed

3 files changed

+173
-1
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
ThisBuild / scalaVersion := "2.13.17"
22
ThisBuild / crossScalaVersions := Seq("2.13.17", "3.3.7")
33
ThisBuild / organization := "com.permutive"
4-
ThisBuild / versionPolicyIntention := Compatibility.BinaryAndSourceCompatible
4+
ThisBuild / versionPolicyIntention := Compatibility.BinaryCompatible
55

66
addCommandAlias("ci-test", "fix --check; versionPolicyCheck; mdoc; publishLocal; +test")
77
addCommandAlias("ci-docs", "github; mdoc; headerCreateAll")

modules/fs2-pubsub/src/main/scala/fs2/pubsub/PubSubRecord.scala

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,168 @@ object PubSubRecord {
171171

172172
}
173173

174+
/** Represents a message that has been received from a Pub/Sub subscription.
175+
*
176+
* @param value
177+
* the message payload
178+
* @param attributes
179+
* the message attributes
180+
* @param messageId
181+
* the unique identifier for the message
182+
* @param publishTime
183+
* the time at which the message was published
184+
* @param deliveryAttempt
185+
* Optional. The approximate number of times that Pub/Sub has attempted to deliver the associated message to a
186+
* subscriber. More precisely, this is 1 + (number of NACKs) + (number of ack_deadline exceeds) for this message.
187+
* @param ackId
188+
* the unique identifier for the acknowledgment of the message
189+
* @param ack
190+
* the function to acknowledge the message
191+
* @param nack
192+
* the function to negatively acknowledge the message
193+
* @param extendDeadline
194+
* the function to extend the deadline for acknowledging the message
195+
*/
196+
sealed abstract class WithPayload[F[_], +A] private (
197+
val value: A,
198+
val attributes: Map[String, String],
199+
val messageId: Option[MessageId],
200+
val publishTime: Option[Instant],
201+
val deliveryAttempt: Option[Int],
202+
val ackId: AckId,
203+
val ack: F[Unit],
204+
val nack: F[Unit],
205+
val extendDeadline: AckDeadline => F[Unit]
206+
) {
207+
208+
private def copy[B](
209+
value: B = this.value,
210+
attributes: Map[String, String] = this.attributes,
211+
messageId: Option[MessageId] = this.messageId,
212+
publishTime: Option[Instant] = this.publishTime,
213+
deliveryAttempt: Option[Int] = this.deliveryAttempt,
214+
ackId: AckId = this.ackId,
215+
ack: F[Unit] = this.ack,
216+
nack: F[Unit] = this.nack,
217+
extendDeadline: AckDeadline => F[Unit] = this.extendDeadline
218+
): PubSubRecord.Subscriber.WithPayload[F, B] =
219+
PubSubRecord.Subscriber.WithPayload(value, attributes, messageId, publishTime, deliveryAttempt, ackId, ack,
220+
nack, extendDeadline)
221+
222+
@SuppressWarnings(Array("scalafix:DisableSyntax.==", "scalafix:Disable.equals"))
223+
override def equals(obj: Any): Boolean = obj match {
224+
case record: PubSubRecord.Subscriber.WithPayload[_, _] =>
225+
this.value == record.value && this.attributes === record.attributes && this.messageId === record.messageId &&
226+
this.publishTime == record.publishTime && this.ackId === record.ackId
227+
case _ => false
228+
}
229+
230+
@SuppressWarnings(Array("scalafix:Disable.toString"))
231+
override def toString(): String =
232+
s"PubSubRecord.Subscriber.WithPayload($value, $attributes, $messageId, $publishTime, $ackId)"
233+
234+
/** Updates the function to acknowledge the message */
235+
def withAck(f: F[Unit]): PubSubRecord.Subscriber.WithPayload[F, A] = copy(ack = f)
236+
237+
/** Updates the function to negatively acknowledge the message */
238+
def withNack(f: F[Unit]): PubSubRecord.Subscriber.WithPayload[F, A] = copy(nack = f)
239+
240+
/** Contramaps the value of the message from type `B` to type `A` using the provided function.
241+
*
242+
* @param f
243+
* the function to contramap the value of the message from type `B` to type `A`
244+
* @return
245+
* a new `PubSubRecord.Subscriber` instance for the contramapped type `B`
246+
*/
247+
def map[B](f: A => B): PubSubRecord.Subscriber.WithPayload[F, B] = copy(value = f(value))
248+
249+
/** Maps the value of the message from type `A` to type `B` using the provided function that may result in an
250+
* `Either`.
251+
*
252+
* @param f
253+
* the function that may result in an `Either` value for mapping to type `B`
254+
* @return
255+
* a new `PubSubRecord.Subscriber` instance for the mapped type `B`
256+
*/
257+
def emap[B](f: A => Either[Throwable, B]): Either[Throwable, PubSubRecord.Subscriber.WithPayload[F, B]] =
258+
f(value).map(v => copy(value = v))
259+
260+
}
261+
262+
object WithPayload {
263+
264+
/** Creates a new `PubSubRecord.Subscriber.WithPayload` from the provided parameters.
265+
*
266+
* @param value
267+
* the message payload
268+
* @param attributes
269+
* the message attributes
270+
* @param messageId
271+
* the unique identifier for the message
272+
* @param publishTime
273+
* the time at which the message was published
274+
* @param deliveryAttempt
275+
* the approximate number of times that Pub/Sub has attempted to deliver the associated message to a
276+
* subscriber.
277+
* @param ackId
278+
* the unique identifier for the acknowledgment of the message
279+
* @param ack
280+
* the function to acknowledge the message
281+
* @param nack
282+
* the function to negatively acknowledge the message
283+
* @param extendDeadline
284+
* the function to extend the deadline for acknowledging the message
285+
* @return
286+
* a new `PubSubRecord.Subscriber` instance
287+
*/
288+
def apply[F[_], A](
289+
value: A,
290+
attributes: Map[String, String],
291+
messageId: Option[MessageId],
292+
publishTime: Option[Instant],
293+
deliveryAttempt: Option[Int],
294+
ackId: AckId,
295+
ack: F[Unit],
296+
nack: F[Unit],
297+
extendDeadline: AckDeadline => F[Unit]
298+
): PubSubRecord.Subscriber.WithPayload[F, A] =
299+
new PubSubRecord.Subscriber.WithPayload(value, attributes, messageId, publishTime, deliveryAttempt, ackId, ack,
300+
nack, extendDeadline) {}
301+
302+
def fromSubscriber[F[_], A](
303+
subscriber: PubSubRecord.Subscriber[F, A]
304+
): Option[PubSubRecord.Subscriber.WithPayload[F, A]] =
305+
subscriber.value.map { v =>
306+
apply(v, subscriber.attributes, subscriber.messageId, subscriber.publishTime, subscriber.deliveryAttempt,
307+
subscriber.ackId, subscriber.ack, subscriber.nack, subscriber.extendDeadline)
308+
}
309+
310+
// format: off
311+
def unapply[F[_], A](record: PubSubRecord.Subscriber.WithPayload[F, A]): Some[(A, Map[String, String], Option[MessageId], Option[Instant], AckId, F[Unit], F[Unit], AckDeadline => F[Unit])] =
312+
Some((record.value, record.attributes, record.messageId, record.publishTime, record.ackId, record.ack, record.nack, record.extendDeadline))
313+
// format: on
314+
315+
implicit def show[F[_], A: Show]: Show[PubSubRecord.Subscriber.WithPayload[F, A]] = record =>
316+
show"PubSubRecord.Subscriber.WithPayload(${record.value}, ${record.attributes}, ${record.messageId}, ${s"${record.publishTime}"}, ${record.ackId})"
317+
318+
implicit class PubSubRecordSubscriberWithPayloadSyntax[F[_]](
319+
subscriber: PubSubRecord.Subscriber.WithPayload[F, Array[Byte]]
320+
) {
321+
322+
/** Decodes the message payload of the `PubSubRecord.Subscriber.WithPayload` into the specified message type.
323+
*
324+
* @tparam B
325+
* the type of message to be decoded
326+
* @return
327+
* either the decoded message of type `B` or an exception if the decoding fails
328+
*/
329+
def as[B: MessageDecoder]: Either[Throwable, Subscriber.WithPayload[F, B]] =
330+
subscriber.emap(MessageDecoder[B].decode)
331+
332+
}
333+
334+
}
335+
174336
}
175337

176338
/** Represents a message that is to be published to a Pub/Sub topic.

modules/fs2-pubsub/src/main/scala/fs2/pubsub/dsl/subscriber.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,16 @@ object subscriber {
223223
*/
224224
def subscribeAndAck: Stream[F, Option[A]] = subscribe.evalTap(_.ack).map(_.value)
225225

226+
/** Subscribes to the Pub/Sub topic and returns a stream of subscriber records with payload. Records will be
227+
* automatically acknowledged if they don't have a payload. It is up to the user to acknowledge the records.
228+
*/
229+
def subscribeAndEnsurePayload(implicit F: Applicative[F]): Stream[F, PubSubRecord.Subscriber.WithPayload[F, A]] =
230+
subscribe.evalMapFilter { record =>
231+
val result = PubSubRecord.Subscriber.WithPayload.fromSubscriber(record)
232+
233+
record.ack.whenA(result.isEmpty).as(result)
234+
}
235+
226236
}
227237

228238
object SubscribeStep {

0 commit comments

Comments
 (0)