@@ -17,7 +17,7 @@ import software.amazon.awssdk.http.async.SdkAsyncHttpClient
1717import software .amazon .kinesis .lifecycle .events .{ProcessRecordsInput , ShardEndedInput }
1818import software .amazon .kinesis .retrieval .kpl .ExtendedSequenceNumber
1919
20- import java .util .concurrent .{CountDownLatch , SynchronousQueue }
20+ import java .util .concurrent .{CountDownLatch , LinkedBlockingQueue }
2121import scala .concurrent .duration .{DurationLong , FiniteDuration }
2222import scala .jdk .CollectionConverters ._
2323
@@ -46,34 +46,37 @@ private[kinesis] object KinesisSource {
4646 }
4747 }
4848
49- // We enable fairness on the `SynchronousQueue` to ensure all Kinesis shards are sourced at an equal rate.
50- private val synchronousQueueFairness : Boolean = true
51-
5249 private def kinesisStream [F [_]: Async ](
5350 config : KinesisSourceConfig ,
5451 client : SdkAsyncHttpClient ,
5552 awsUserAgent : Option [String ]
5653 ): Stream [F , Stream [F , Option [LowLevelEvents [Map [String , Checkpointable ]]]]] = {
57- val actionQueue = new SynchronousQueue [KCLAction ](synchronousQueueFairness )
54+ val actionQueue = new LinkedBlockingQueue [KCLAction ]()
5855 for {
5956 _ <- Stream .resource(KCLScheduler .populateQueue[F ](config, actionQueue, client, awsUserAgent))
6057 events <- Stream .emit(pullFromQueueAndEmit(actionQueue).stream).repeat
6158 } yield events
6259 }
6360
6461 private def pullFromQueueAndEmit [F [_]: Sync ](
65- queue : SynchronousQueue [KCLAction ]
62+ queue : LinkedBlockingQueue [KCLAction ]
6663 ): Pull [F , Option [LowLevelEvents [Map [String , Checkpointable ]]], Unit ] =
67- Pull .eval(pullFromQueue(queue)).flatMap { case PullFromQueueResult ( actions, hasShardEnd) =>
64+ Pull .eval(pullFromQueue(queue)).flatMap { actions =>
6865 val toEmit = actions.traverse {
69- case KCLAction .ProcessRecords (_, processRecordsInput) if processRecordsInput.records.asScala.isEmpty =>
70- Pull .output1(None )
71- case KCLAction .ProcessRecords (shardId, processRecordsInput) =>
72- Pull .output1(Some (provideNextChunk(shardId, processRecordsInput))).covary[F ]
66+ case KCLAction .ProcessRecords (_, await, processRecordsInput) if processRecordsInput.records.asScala.isEmpty =>
67+ Pull .eval( Sync [ F ].delay(await.countDown())) >> Pull . output1(None )
68+ case KCLAction .ProcessRecords (shardId, await, processRecordsInput) =>
69+ Pull .eval( Sync [ F ].delay(await.countDown())) >> Pull . output1(Some (provideNextChunk(shardId, processRecordsInput))).covary[F ]
7370 case KCLAction .ShardEnd (shardId, await, shardEndedInput) =>
71+ // Do not call `await.countDown()` yet. It must be released later by the checkpointer.
7472 handleShardEnd[F ](shardId, await, shardEndedInput)
75- case KCLAction .KCLError (t) =>
76- Pull .eval(Logger [F ].error(t)(" Exception from Kinesis source" )) *> Pull .raiseError[F ](t)
73+ case KCLAction .KCLError (t, await) =>
74+ Pull .eval(Sync [F ].delay(await.countDown())) >> Pull .eval(Logger [F ].error(t)(" Exception from Kinesis source" )) >> Pull
75+ .raiseError[F ](t)
76+ }
77+ val hasShardEnd = actions.exists {
78+ case _ : KCLAction .ShardEnd => true
79+ case _ : KCLAction => false
7780 }
7881 if (hasShardEnd) {
7982 val log = Logger [F ].info {
@@ -88,31 +91,21 @@ private[kinesis] object KinesisSource {
8891 toEmit *> pullFromQueueAndEmit(queue)
8992 }
9093
91- private case class PullFromQueueResult (actions : NonEmptyList [KCLAction ], hasShardEnd : Boolean )
92-
93- private def pullFromQueue [F [_]: Sync ](queue : SynchronousQueue [KCLAction ]): F [PullFromQueueResult ] =
94- resolveNextAction(queue)
95- .flatMap {
96- case shardEnd : KCLAction .ShardEnd =>
97- // If we reached the end of one shard, it is likely we reached the end of other shards too.
98- // Therefore pull more actions from the queue, to minimize the number of times we need to do
99- // an early close of the inner stream.
100- resolveAllActions(queue).map { more =>
101- PullFromQueueResult (NonEmptyList (shardEnd, more), hasShardEnd = true )
102- }
103- case other =>
104- PullFromQueueResult (NonEmptyList .one(other), hasShardEnd = false ).pure[F ]
105- }
94+ private def pullFromQueue [F [_]: Sync ](queue : LinkedBlockingQueue [KCLAction ]): F [NonEmptyList [KCLAction ]] =
95+ for {
96+ head <- resolveNextAction(queue)
97+ tail <- resolveAllActions(queue)
98+ } yield NonEmptyList (head, tail)
10699
107100 /** Always returns a `KCLAction`, possibly waiting until one is available */
108- private def resolveNextAction [F [_]: Sync ](queue : SynchronousQueue [KCLAction ]): F [KCLAction ] =
101+ private def resolveNextAction [F [_]: Sync ](queue : LinkedBlockingQueue [KCLAction ]): F [KCLAction ] =
109102 Sync [F ].delay(Option [KCLAction ](queue.poll)).flatMap {
110103 case Some (action) => Sync [F ].pure(action)
111104 case None => Sync [F ].interruptible(queue.take)
112105 }
113106
114107 /** Returns immediately, but the `List[KCLAction]` might be empty */
115- private def resolveAllActions [F [_]: Sync ](queue : SynchronousQueue [KCLAction ]): F [List [KCLAction ]] =
108+ private def resolveAllActions [F [_]: Sync ](queue : LinkedBlockingQueue [KCLAction ]): F [List [KCLAction ]] =
116109 for {
117110 ret <- Sync [F ].delay(new java.util.ArrayList [KCLAction ]())
118111 _ <- Sync [F ].delay(queue.drainTo(ret))
0 commit comments