Skip to content

Channel Actor not Stopping #60

@pietervanlillsi

Description

@pietervanlillsi
[ERROR] [12/11/2019 12:08:55.034] [fixture-cluster-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2552/user/mq-connection/{queueName}] close com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue '{queueName}' in vhost '{vhost}', class-id=50, method-id=20)

I have an interesting flow that I am trying to implement.

Our business requires using one connection per VM to connect to RabbitMQ. What I have implemented is the following flow:

Call our API -> GET queueLink/name -> use a single connection actor shared across the actor system -> spawn an actor in the cluster to handle the data events -> spawn an MQSubscriber for the data actor -> connect the subscriber, which exchanges heartbeats with the data-event actor -> on connection failure, suspend data events and wait for reconnect -> on reconnect, obtain a new queueLink/name and use the MQSubscriber to connect.

The issue I am having is that the old channel actor does not die when the connection drops, and I cannot find a way to kill it. I do not need to keep it around, since our queue names are generated dynamically. I need a way to kill the channel actor when the connection drops.

/**
 * Actor that subscribes to a RabbitMQ queue on behalf of `receiver`.
 *
 * On [[MQSubscriber.Start]] it asks the given connection actor to create a channel actor named
 * after the queue, binds the queue to the `amq.fanout` exchange and forwards every delivery to
 * `receiver` as an `MQMessage`. It also polls the connection actor's state on a timer and reports
 * the outcome of each poll to `receiver`.
 *
 * @param receiver    actor that receives `MQMessage`s and `MQEvents` notifications
 * @param actorSystem actor system made available implicitly to this actor
 */
class MQSubscriber(receiver: ActorRef)(implicit val actorSystem: ActorSystem) extends Actor with ActorSystemLogging {
  private val exchange                                    = "amq.fanout"
  implicit val executionContext: ExecutionContextExecutor = context.dispatcher
  import context.become

  // The single outstanding heartbeat timer. Only read and written from the actor's own message
  // handling (and postStop), so a plain var is sufficient.
  private var pendingHeartBeat: Option[Cancellable] = None

  /**
   * Channel setup callback: binds `queueName` to the exchange and registers a manually
   * acknowledging consumer that forwards deliveries to `receiver`.
   *
   * @param channel   RabbitMQ channel to set up
   * @param self      second argument of the setup callback; not used here. The name is kept
   *                  because it shadows the actor's own `self` inside this method.
   * @param queueName queue to bind and consume from
   */
  private def setupSubscriber(channel: Channel, self: ActorRef, queueName: String): Unit = {
    channel.queueBind(queueName, exchange, "")

    // Decode explicitly as UTF-8 rather than with the JVM's platform-default charset.
    def fromBytes(x: Array[Byte]) = new String(x, java.nio.charset.StandardCharsets.UTF_8)

    val consumer = new DefaultConsumer(channel) {
      override def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]): Unit = {
        receiver ! MQMessage(envelope.getDeliveryTag.toString, fromBytes(body))
        channel.basicAck(envelope.getDeliveryTag, false)
        super.handleDelivery(consumerTag, envelope, properties, body)
      }

      override def handleCancel(consumerTag: String): Unit = {
        receiver ! MQEvents.UnexpectedDisconnect
        super.handleCancel(consumerTag)
      }

      override def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException): Unit = {
        receiver ! MQEvents.UnexpectedDisconnect
        super.handleShutdownSignal(consumerTag, sig)
      }
    }
    // autoAck = false: each delivery is acknowledged explicitly in handleDelivery.
    channel.basicConsume(queueName, false, consumer)
    receiver ! MQEvents.Connected
  }

  /**
   * Schedules the next heartbeat and asks the connection actor (if any) for its state.
   *
   * @param connection    connection actor to query, or `None` if no `Start` has been handled yet
   * @param heartbeatTime delay in seconds before the next `CheckHeartBeat`
   * @return `NoConnection` when there is no connection actor; otherwise `HeartBeat` or
   *         `Disconnected` according to the reply, or `Lost` when the ask times out or the actor
   *         is not found. Any other reply fails the future.
   */
  def checkHeartBeat(connection: Option[ActorRef], heartbeatTime: Int)(implicit ex: ExecutionContext): Future[MQResponse] = {
    implicit val timeout: Timeout = Timeout(100.milliseconds)
    scheduleHeartBeat(heartbeatTime)
    connection match {
      case None =>
        log.error(s"No Connection found yet")
        Future.successful(MQEvents.NoConnection)
      case Some(conn) =>
        (conn ? ConnectionActor.GetState)
          .map {
            case ConnectionActor.Disconnected =>
              log.error(s"Connection $conn is Disconnected")
              MQEvents.Disconnected
            case ConnectionActor.Connected => MQEvents.HeartBeat
          }
          .recover {
            case _: AskTimeoutException | _: ActorNotFound =>
              log.error(s"Connection ${conn.path.name} Actor Timeout or not found!!!")
              MQEvents.Lost
          }
    }
  }

  /**
   * Schedules a single `CheckHeartBeat` message to this actor after `heartbeatTime` seconds.
   *
   * Any heartbeat that is still pending is cancelled first, so repeated `Start` messages cannot
   * accumulate several independent heartbeat chains.
   *
   * @param heartbeatTime delay in seconds
   * @return handle of the newly scheduled heartbeat
   */
  def scheduleHeartBeat(heartbeatTime: Int): Cancellable = {
    pendingHeartBeat.foreach(_.cancel())
    val task = context.system.scheduler.scheduleOnce(FiniteDuration(heartbeatTime, SECONDS), self, MQEvents.CheckHeartBeat)
    pendingHeartBeat = Some(task)
    task
  }

  /**
   * Asks the channel actor that was created for `queueName` to stop.
   *
   * The channel actor is requested from the connection actor with the queue name as its actor
   * name, so it is addressed as a child of the connection actor with that name. Nothing is sent
   * when there is no connection yet or the queue name cannot be an actor name.
   */
  private def stopChannel(connection: Option[ActorRef], queueName: String): Unit =
    connection.foreach { conn =>
      if (queueName.nonEmpty && akka.actor.ActorPath.isValidPathElement(queueName))
        context.actorSelection(conn.path / queueName) ! PoisonPill
    }

  /** Cancels the outstanding heartbeat so no `CheckHeartBeat` is sent to a stopped actor. */
  override def postStop(): Unit = {
    pendingHeartBeat.foreach(_.cancel())
    pendingHeartBeat = None
    super.postStop()
  }

  override def receive: Receive = process(None, 1, "")

  /**
   * Message handler parameterised by the current subscription state.
   *
   * @param connection    connection actor currently in use, if any
   * @param heartbeatTime heartbeat interval in seconds (1 normally, 2 after `StartCircuitBreaker`)
   * @param queueName     queue currently subscribed to; empty before the first `Start`
   */
  def process(connection: Option[ActorRef], heartbeatTime: Int, queueName: String): Receive = {
    case MQSubscriber.Start(newQueueName, newConnection) =>
      // Retire the channel actor of the previous subscription. It is left alone when the new
      // subscription targets the same connection and queue name, i.e. the same actor path.
      if (newQueueName != queueName || !connection.contains(newConnection)) stopChannel(connection, queueName)
      newConnection ! CreateChannel(ChannelActor.props((channel, actor) => {
        setupSubscriber(channel, actor, newQueueName)
      }), Some(newQueueName))
      become(process(Option(newConnection), 1, newQueueName))
      scheduleHeartBeat(heartbeatTime)
    case MQSubscriber.Stop =>
      // The channel actor lives under the connection actor and would outlive this subscriber.
      stopChannel(connection, queueName)
      self ! PoisonPill
    case MQEvents.CheckHeartBeat =>
      checkHeartBeat(connection, heartbeatTime).onComplete {
        case scala.util.Success(response) => receiver ! response
        // Report unexpected failures instead of dropping them silently.
        case scala.util.Failure(cause) => log.error(s"Heartbeat check failed: $cause")
      }
    case MQEvents.StartCircuitBreaker =>
      become(process(connection, 2, queueName))
    case MQEvents.CloseCircuitBreaker =>
      become(process(connection, 1, queueName))
    case _ =>
  }
}

/** Control messages accepted by the `MQSubscriber` actor. */
object MQSubscriber {

  /**
   * Asks the subscriber to request a channel actor for `queueName` from `connection` and to
   * start consuming from that queue.
   *
   * @param queueName  queue to consume from; also used as the channel actor's name
   * @param connection connection actor that is asked to create the channel actor
   */
  final case class Start(queueName: String, connection: ActorRef)

  /** Asks the subscriber to stop; it responds by sending itself a `PoisonPill`. */
  case object Stop
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions