This repository was archived by the owner on Apr 18, 2024. It is now read-only.

akka-sample-kafka-to-sharding-scala doesn't work as expected #219

@evgkarasev

Description

Versions used

val AkkaVersion = "2.6.6"
val AlpakkaKafkaVersion = "2.0.3"
val AkkaManagementVersion = "1.0.5"
val AkkaHttpVersion = "10.1.11"
val KafkaVersion = "2.4.0"
val LogbackVersion = "1.2.3"

Expected Behavior

[info] [2020-01-16 09:48:51,040] [INFO] [akka://KafkaToSharding/user/kafka-event-processor/rebalancerRef] - Partition [1] assigned to current node. Updating shard allocation
[info] [2020-01-16 09:48:51,040] [INFO] [akka://KafkaToSharding/user/kafka-event-processor/rebalancerRef] - Partition [25] assigned to current node. Updating shard allocation
[info] [2020-01-16 09:48:51,043] [INFO] [akka://KafkaToSharding/user/kafka-event-processor/rebalancerRef] - Partition [116] assigned to current node. Updating shard allocation

After the producer is started, the single processor node should log messages like these:

[info] [2020-01-16 09:51:38,672] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-26] [akka://KafkaToSharding/user/kafka-event-processor] - entityId->partition 29->45
[info] [2020-01-16 09:51:38,672] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-26] [akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for entity 29 to cluster sharding
[info] [2020-01-16 09:51:38,673] [INFO] [sample.sharding.kafka.UserEvents$] [KafkaToSharding-akka.actor.default-dispatcher-26] [akka://KafkaToSharding/system/sharding/user-processing/75/29] - user 29 purchase cat t-shirt, quantity 0, price 8874
[info] [2020-01-16 09:51:39,702] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/user/kafka-event-processor] - entityId->partition 60->111
[info] [2020-01-16 09:51:39,703] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for entity 60 to cluster sharding
[info] [2020-01-16 09:51:39,706] [INFO] [sample.sharding.kafka.UserEvents$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/system/sharding/user-processing/2/60] - user 60 purchase cat t-shirt, quantity 2, price 9375
[info] [2020-01-16 09:51:40,732] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/user/kafka-event-processor] - entityId->partition 75->1
[info] [2020-01-16 09:51:40,732] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for entity 75 to cluster sharding

Actual Behavior

[info] [2020-07-08 18:56:40,587] [INFO] [akka.actor.RepointableActorRef] [KafkaToSharding-akka.actor.default-dispatcher-5] [akka://KafkaToSharding/system/kafka-consumer-1] - Message [akka.kafka.internal.KafkaConsumerActor$Internal$StopFromStage] from Actor[akka://KafkaToSharding/system/Materializers/StreamSupervisor-0/$$a#1729317594] to Actor[akka://KafkaToSharding/system/kafka-consumer-1#1483160912] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://KafkaToSharding/system/kafka-consumer-1#1483160912] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Relevant logs

~/ScalaProjects/akka-sample-kafka-to-sharding-scala $ sbt "processor / run 2551 8551 8081"
[info] Loading global plugins from /home/eugene/.sbt/1.0/plugins
[info] Loading settings for project akka-sample-kafka-to-sharding-scala-build from plugins.sbt ...
[info] Loading project definition from /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/project
[info] Loading settings for project akka-sample-kafka-to-sharding from build.sbt ...
[info] Set current project to akka-sample-kafka-to-sharding (in build file:/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/)
[info] Compiling 2 protobuf files to /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main,/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main,/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main,/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main
[info] Compiling schema /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf/users.proto
[info] Compiling schema /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf/user-events.proto
protoc-jar: protoc version: 3.7.1, detected platform: linux-x86_64 (linux/amd64)
protoc-jar: embedded: bin/3.7.1/protoc-3.7.1-linux-x86_64.exe
protoc-jar: executing: [/tmp/protocjar5741868647544472609/bin/protoc.exe, --plugin=protoc-gen-scala_0=/tmp/protocbridge4871978644069516449, --plugin=protoc-gen-akka-grpc-scaladsl-trait_1=/tmp/protocbridge4453544094864461486, --plugin=protoc-gen-akka-grpc-scaladsl-client_2=/tmp/protocbridge2444707610667080398, --plugin=protoc-gen-akka-grpc-scaladsl-server_3=/tmp/protocbridge1023303769398452281, --scala_0_out=flat_package:/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main, --akka-grpc-scaladsl-trait_1_out=flat_package:/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main, --akka-grpc-scaladsl-client_2_out=flat_package:/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main, --akka-grpc-scaladsl-server_3_out=flat_package:/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/proto, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/protobuf_external, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/protobuf_external, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/proto, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/protobuf_external, /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf/users.proto, /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf/user-events.proto]le / protocGenerate 0s
/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/proto: warning: directory does not exist.
/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/protobuf_external: warning: directory does not exist.
/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/protobuf_external: warning: directory does not exist.
/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/proto: warning: directory does not exist.
/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/protobuf_external: warning: directory does not exist.
[info] Generating Akka gRPC service interface for sample.sharding.kafka.UserService
[info] Generating Akka gRPC client for sample.sharding.kafka.UserService
[info] Generating Akka gRPC service handler for sample.sharding.kafka.UserService
[info] Compiling protobuf
[info] Protoc target directory: /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main
[info] Protoc target directory: /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main
[info] Protoc target directory: /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main
[info] Protoc target directory: /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main
[info] Compiling 15 Scala sources to /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/classes ...
[warn] /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main/sample/sharding/kafka/UserServiceClient.scala:36:91: class ActorMaterializer in package stream is deprecated (since 2.6.0): The Materializer now has all methods the ActorMaterializer used to have
[warn] private val clientState = new ClientState(settings, akka.event.Logging(mat.asInstanceOf[ActorMaterializer].system, this.getClass))
[warn] ^
[warn] one warning found
[info] running (fork) sample.sharding.kafka.Main 2551 8551 8081
[info] [2020-07-08 18:55:32,397] [INFO] [akka.event.slf4j.Slf4jLogger] [KafkaToSharding-akka.actor.default-dispatcher-3] [] - Slf4jLogger started
[error] SLF4J: A number (1) of logging calls during the initialization phase have been intercepted and are
[error] SLF4J: now being replayed. These are subject to the filtering rules of the underlying logging system.
[error] SLF4J: See also http://www.slf4j.org/codes.html#replay
[info] [2020-07-08 18:55:32,610] [INFO] [akka.remote.artery.tcp.ArteryTcpTransport] [KafkaToSharding-akka.actor.default-dispatcher-3] [ArteryTcpTransport(akka://KafkaToSharding)] - Remoting started with transport [Artery tcp]; listening on address [akka://KafkaToSharding@127.0.0.1:2551] with UID [1857486658713966714]
[info] [2020-07-08 18:55:32,629] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-3] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Starting up, Akka version [2.6.6] ...
[info] [2020-07-08 18:55:32,739] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-3] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Registered cluster JMX MBean [akka:type=Cluster]
[info] [2020-07-08 18:55:32,739] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-3] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Started up successfully
[info] [2020-07-08 18:55:32,786] [INFO] [akka.cluster.sbr.SplitBrainResolver] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka://KafkaToSharding/system/cluster/core/daemon/downingProvider] - SBR started. Config: stableAfter: 20000 ms, strategy: KeepMajority, selfUniqueAddress: UniqueAddress(akka://KafkaToSharding@127.0.0.1:2551,1857486658713966714), selfDc: default
[info] [2020-07-08 18:55:33,314] [INFO] [akka.management.internal.HealthChecksImpl] [KafkaToSharding-akka.actor.default-dispatcher-14] [HealthChecksImpl(akka://KafkaToSharding)] - Loading readiness checks List(NamedHealthCheck(cluster-membership,akka.management.cluster.scaladsl.ClusterMembershipCheck))
[info] [2020-07-08 18:55:33,315] [INFO] [akka.management.internal.HealthChecksImpl] [KafkaToSharding-akka.actor.default-dispatcher-14] [HealthChecksImpl(akka://KafkaToSharding)] - Loading liveness checks List()
[info] [2020-07-08 18:55:33,401] [WARN] [akka.stream.Materializer] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka.stream.Log(akka://KafkaToSharding/system/Materializers/StreamSupervisor-1)] - [outbound connection to [akka://KafkaToSharding@127.0.0.1:2552], control stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(127.0.0.1:2552,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
[info] [2020-07-08 18:55:33,401] [WARN] [akka.stream.Materializer] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka.stream.Log(akka://KafkaToSharding/system/Materializers/StreamSupervisor-1)] - [outbound connection to [akka://KafkaToSharding@127.0.0.1:2552], message stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(127.0.0.1:2552,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
[info] [2020-07-08 18:55:33,416] [INFO] [akka.management.scaladsl.AkkaManagement] [KafkaToSharding-akka.actor.default-dispatcher-14] [AkkaManagement(akka://KafkaToSharding)] - Binding Akka Management (HTTP) endpoint to: 127.0.0.1:8551
[info] [2020-07-08 18:55:33,479] [INFO] [akka.management.scaladsl.AkkaManagement] [KafkaToSharding-akka.actor.default-dispatcher-14] [AkkaManagement(akka://KafkaToSharding)] - Including HTTP management routes for ClusterHttpManagementRouteProvider
[info] [2020-07-08 18:55:33,521] [INFO] [akka.management.scaladsl.AkkaManagement] [KafkaToSharding-akka.actor.default-dispatcher-14] [AkkaManagement(akka://KafkaToSharding)] - Including HTTP management routes for HealthCheckRoutes
[info] [2020-07-08 18:55:34,053] [INFO] [akka.management.scaladsl.AkkaManagement] [KafkaToSharding-akka.actor.default-dispatcher-21] [AkkaManagement(akka://KafkaToSharding)] - Bound Akka Management (HTTP) endpoint to: 127.0.0.1:8551
[info] [2020-07-08 18:55:34,498] [INFO] [akka.actor.ActorSystemImpl] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka.actor.ActorSystemImpl(KafkaToSharding)] - Retrieved 128 partitions for topic 'user-events'
[info] [2020-07-08 18:55:34,498] [INFO] [akka.actor.typed.ActorSystem] [KafkaToSharding-akka.actor.default-dispatcher-5] [] - Message extractor created. Initializing sharding
[info] [2020-07-08 18:55:34,520] [INFO] [akka.cluster.sharding.typed.scaladsl.ClusterSharding] [KafkaToSharding-akka.actor.default-dispatcher-14] [ClusterSharding(akka://KafkaToSharding)] - Starting Shard Region [user-processing]...
[info] [2020-07-08 18:55:34,549] [INFO] [sample.sharding.kafka.Main$] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka://KafkaToSharding/user] - Sharding has started
[info] [2020-07-08 18:55:34,556] [INFO] [akka.cluster.sharding.ShardRegion] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka://KafkaToSharding@127.0.0.1:2551/system/sharding/user-processing] - user-processing: Idle entities will be passivated after [2.000 min]
[info] [2020-07-08 18:55:37,891] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-5] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Node [akka://KafkaToSharding@127.0.0.1:2551] is JOINING itself (with roles [dc-default]) and forming new cluster
[info] [2020-07-08 18:55:37,892] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-5] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - is the new leader among reachable nodes (more leaders may exist)
[info] [2020-07-08 18:55:37,898] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-5] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Leader is moving node [akka://KafkaToSharding@127.0.0.1:2551] to [Up]
[info] [2020-07-08 18:55:37,903] [INFO] [sample.sharding.kafka.Main$] [KafkaToSharding-akka.actor.default-dispatcher-5] [akka://KafkaToSharding/user] - Member has joined the cluster
[info] [2020-07-08 18:55:37,904] [INFO] [sample.sharding.kafka.Main$] [KafkaToSharding-akka.actor.default-dispatcher-5] [akka://KafkaToSharding/user] - Sharding started and joined cluster. Starting event processor
[info] [2020-07-08 18:55:37,908] [INFO] [akka.cluster.sbr.SplitBrainResolver] [KafkaToSharding-akka.actor.default-dispatcher-3] [akka://KafkaToSharding/system/cluster/core/daemon/downingProvider] - This node is now the leader responsible for taking SBR decisions among the reachable nodes (more leaders may exist).
[info] [2020-07-08 18:55:37,917] [INFO] [akka.cluster.singleton.ClusterSingletonManager] [KafkaToSharding-akka.actor.default-dispatcher-3] [akka://KafkaToSharding@127.0.0.1:2551/system/sharding/user-processingCoordinator] - Singleton manager starting singleton actor [akka://KafkaToSharding/system/sharding/user-processingCoordinator/singleton]
[info] [2020-07-08 18:55:37,920] [INFO] [akka.cluster.singleton.ClusterSingletonManager] [KafkaToSharding-akka.actor.default-dispatcher-3] [akka://KafkaToSharding@127.0.0.1:2551/system/sharding/user-processingCoordinator] - ClusterSingletonManager state change [Start -> Oldest]
[info] [2020-07-08 18:55:37,936] [INFO] [akka.cluster.sharding.DDataShardCoordinator] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka://KafkaToSharding@127.0.0.1:2551/system/sharding/user-processingCoordinator/singleton/coordinator] - ShardCoordinator was moved to the active state State(Map())
[info] [2020-07-08 18:55:37,942] [INFO] [akka.kafka.internal.SingleSourceLogic] [KafkaToSharding-akka.actor.default-dispatcher-21] [SingleSourceLogic(akka://KafkaToSharding)] - [117ff] Starting. StageActor Actor[akka://KafkaToSharding/system/Materializers/StreamSupervisor-0/$$a#1729317594]
[info] [2020-07-08 18:55:42,312] [INFO] [akka.kafka.cluster.sharding.KafkaClusterSharding$RebalanceListener$] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka://KafkaToSharding/system/kafka-cluster-sharding-rebalance-listener-user-processing] - Consumer group 'user-processing' assigned topic partitions to cluster member 'akka://KafkaToSharding@127.0.0.1:2551': [user-events-16,user-events-29,user-events-44,user-events-101,user-events-112,user-events-94,user-events-114,user-events-83,user-events-0,user-events-13,user-events-75,user-events-8,user-events-72,user-events-37,user-events-42,user-events-108,user-events-34,user-events-64,user-events-58,user-events-61,user-events-113,user-events-87,user-events-119,user-events-77,user-events-71,user-events-95,user-events-50,user-events-24,user-events-124,user-events-53,user-events-47,user-events-20,user-events-106,user-events-122,user-events-27,user-events-104,user-events-7,user-events-115,user-events-3,user-events-15,user-events-4,user-events-23,user-events-19,user-events-88,user-events-84,user-events-11,user-events-68,user-events-38,user-events-105,user-events-118,user-events-33,user-events-49,user-events-67,user-events-100,user-events-62,user-events-111,user-events-121,user-events-127,user-events-41,user-events-54,user-events-57,user-events-78,user-events-10,user-events-14,user-events-2,user-events-6,user-events-117,user-events-123,user-events-107,user-events-98,user-events-126,user-events-81,user-events-21,user-events-5,user-events-32,user-events-97,user-events-96,user-events-74,user-events-66,user-events-103,user-events-40,user-events-110,user-events-26,user-events-85,user-events-79,user-events-69,user-events-48,user-events-93,user-events-63,user-events-92,user-events-56,user-events-55,user-events-45,user-events-18,user-events-82,user-events-120,user-events-31,user-events-35,user-events-39,user-events-99,user-events-90,user-events-109,user-events-125,user-events-76,user-events-80,user-events-43,user-events-12,user-events-1,user-events-25,user-events-116,user-events-17,user-events-28,user-events-70,user-events-36,user-events-89,user-events-73,user-events-51,user-events-60,user-events-102,user-events-9,user-events-65,user-events-91,user-events-30,user-events-59,user-events-86,user-events-46,user-events-52,user-events-22]
[info] [2020-07-08 18:55:42,443] [ERROR] [akka.dispatch.Dispatcher] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka.dispatch.Dispatcher] - Unsupported access to ActorContext from the outside of Actor[akka://KafkaToSharding/system/kafka-cluster-sharding-rebalance-listener-user-processing#-748606125]. No message is currently processed by the actor, but ActorContext was called from Thread[KafkaToSharding-akka.actor.default-dispatcher-14,5,main].
[info] java.lang.UnsupportedOperationException: Unsupported access to ActorContext from the outside of Actor[akka://KafkaToSharding/system/kafka-cluster-sharding-rebalance-listener-user-processing#-748606125]. No message is currently processed by the actor, but ActorContext was called from Thread[KafkaToSharding-akka.actor.default-dispatcher-14,5,main].
[info] at akka.actor.typed.internal.ActorContextImpl.checkCurrentActorThread(ActorContextImpl.scala:317)
[info] at akka.actor.typed.internal.ActorContextImpl.checkCurrentActorThread$(ActorContextImpl.scala:305)
[info] at akka.actor.typed.internal.adapter.ActorContextAdapter.checkCurrentActorThread(ActorContextAdapter.scala:49)
[info] at akka.actor.typed.internal.ActorContextImpl.log(ActorContextImpl.scala:161)
[info] at akka.actor.typed.internal.ActorContextImpl.log$(ActorContextImpl.scala:160)
[info] at akka.actor.typed.internal.adapter.ActorContextAdapter.log(ActorContextAdapter.scala:49)
[info] at akka.kafka.cluster.sharding.KafkaClusterSharding$RebalanceListener$.$anonfun$apply$4(KafkaClusterSharding.scala:314)
[info] at akka.kafka.cluster.sharding.KafkaClusterSharding$RebalanceListener$.$anonfun$apply$4$adapted(KafkaClusterSharding.scala:312)
[info] at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:447)
[info] at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:56)
[info] at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:93)
[info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info] at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
[info] at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:93)
[info] at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:48)
[info] at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
[info] at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
[info] at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
[info] at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
[info] at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
[info] [2020-07-08 18:55:52,896] [WARN] [akka.remote.artery.Association] [KafkaToSharding-akka.actor.default-dispatcher-14] [Association(akka://KafkaToSharding)] - Outbound control stream to [akka://KafkaToSharding@127.0.0.1:2552] failed. Restarting it. akka.remote.artery.OutboundHandshake$HandshakeTimeoutException: Handshake with [akka://KafkaToSharding@127.0.0.1:2552] did not complete within 20000 ms
[info] [2020-07-08 18:56:40,565] [INFO] [akka.kafka.internal.SingleSourceLogic] [KafkaToSharding-akka.actor.default-dispatcher-5] [SingleSourceLogic(akka://KafkaToSharding)] - [117ff] Completing
[info] [2020-07-08 18:56:40,566] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka://KafkaToSharding/user/kafka-event-processor] - Consumer stopped Failure(java.lang.UnsupportedOperationException: Unsupported access to ActorContext from the outside of Actor[akka://KafkaToSharding/user/kafka-event-processor#1200911144]. No message is currently processed by the actor, but ActorContext was called from Thread[KafkaToSharding-akka.actor.default-dispatcher-14,5,main].)
[info] [2020-07-08 18:56:40,569] [INFO] [akka.kafka.cluster.sharding.KafkaClusterSharding$RebalanceListener$] [KafkaToSharding-akka.actor.default-dispatcher-5] [akka://KafkaToSharding/system/kafka-cluster-sharding-rebalance-listener-user-processing] - Consumer group 'user-processing' revoked topic partitions from cluster member 'akka://KafkaToSharding@127.0.0.1:2551': [user-events-16,user-events-29,user-events-44,user-events-101,user-events-112,user-events-94,user-events-114,user-events-83,user-events-0,user-events-13,user-events-75,user-events-8,user-events-72,user-events-37,user-events-42,user-events-108,user-events-34,user-events-64,user-events-58,user-events-61,user-events-113,user-events-87,user-events-119,user-events-77,user-events-71,user-events-95,user-events-50,user-events-24,user-events-124,user-events-53,user-events-47,user-events-20,user-events-106,user-events-122,user-events-27,user-events-104,user-events-7,user-events-115,user-events-3,user-events-15,user-events-4,user-events-23,user-events-19,user-events-88,user-events-84,user-events-11,user-events-68,user-events-38,user-events-105,user-events-118,user-events-33,user-events-49,user-events-67,user-events-100,user-events-62,user-events-111,user-events-121,user-events-127,user-events-41,user-events-54,user-events-57,user-events-78,user-events-10,user-events-14,user-events-2,user-events-6,user-events-117,user-events-123,user-events-107,user-events-98,user-events-126,user-events-81,user-events-21,user-events-5,user-events-32,user-events-97,user-events-96,user-events-74,user-events-66,user-events-103,user-events-40,user-events-110,user-events-26,user-events-85,user-events-79,user-events-69,user-events-48,user-events-93,user-events-63,user-events-92,user-events-56,user-events-55,user-events-45,user-events-18,user-events-82,user-events-120,user-events-31,user-events-35,user-events-39,user-events-99,user-events-90,user-events-109,user-events-125,user-events-76,user-events-80,user-events-43,user-events-12,user-events-1,user-events-25,user-events-116,user-events-17,user-events-28,user-events-70,user-events-36,user-events-89,user-events-73,user-events-51,user-events-60,user-events-102,user-events-9,user-events-65,user-events-91,user-events-30,user-events-59,user-events-86,user-events-46,user-events-52,user-events-22]
[info] [2020-07-08 18:56:40,587] [INFO] [akka.actor.RepointableActorRef] [KafkaToSharding-akka.actor.default-dispatcher-5] [akka://KafkaToSharding/system/kafka-consumer-1] - Message [akka.kafka.internal.KafkaConsumerActor$Internal$StopFromStage] from Actor[akka://KafkaToSharding/system/Materializers/StreamSupervisor-0/$$a#1729317594] to Actor[akka://KafkaToSharding/system/kafka-consumer-1#1483160912] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://KafkaToSharding/system/kafka-consumer-1#1483160912] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
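
For anyone reading the trace above: `ActorContextImpl.log` is reached from `Promise$Transformation.run`, which suggests `context.log` was called inside a `Future` callback rather than while the actor was processing a message, and Akka 2.6.6 throws `UnsupportedOperationException` in that case. Below is a minimal sketch of the usual way to avoid this in Akka Typed, assuming the goal is just to log a future's result: pipe the result back to the actor with `pipeToSelf` and log from the message handler. The names `SafeLogging`, `Lookup`, `LookupDone` and the `lookup` parameter are hypothetical and are not taken from the sample or from Alpakka Kafka.

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

import scala.concurrent.Future
import scala.util.{Failure, Success}

// Hypothetical actor, for illustration only (not part of the sample project).
object SafeLogging {
  sealed trait Command
  final case class Lookup(id: String) extends Command
  // Internal message carrying the outcome of the future back into the actor.
  private final case class LookupDone(id: String, result: Either[Throwable, Int]) extends Command

  // `lookup` stands in for any asynchronous call that returns a Future.
  def apply(lookup: String => Future[Int]): Behavior[Command] =
    Behaviors.setup { context =>
      Behaviors.receiveMessage {
        case Lookup(id) =>
          // Do NOT touch `context` inside future callbacks such as
          // onComplete/map/foreach: they run on another thread.
          // pipeToSelf only converts the result into a message for this actor.
          context.pipeToSelf(lookup(id)) {
            case Success(n) => LookupDone(id, Right(n))
            case Failure(e) => LookupDone(id, Left(e))
          }
          Behaviors.same

        case LookupDone(id, Right(n)) =>
          // Safe: we are now inside the actor's own message processing.
          context.log.info(s"Lookup for $id returned $n")
          Behaviors.same

        case LookupDone(id, Left(e)) =>
          context.log.error(s"Lookup for $id failed", e)
          Behaviors.same
      }
    }
}
```

The function passed to `pipeToSelf` runs outside the actor, so it only builds a message and does not use `context`; all logging happens in the `LookupDone` cases.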

Reproducible Test Case

Please provide a PR with a failing test.

If the issue is more complex or requires configuration, please provide a link to a project that reproduces the issue.
