Skip to content

Commit c2aa609

Browse files
author
Sergii Kozlov
committed
Pass sourceQueueName via System Message Attribute for DeadLetterQueueSourceArn
1 parent a0b8c8f commit c2aa609

File tree

15 files changed

+43
-21
lines changed

15 files changed

+43
-21
lines changed

core/src/main/scala/org/elasticmq/MessageData.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ case class MessageData(
77
deliveryReceipt: Option[DeliveryReceipt],
88
content: String,
99
messageAttributes: Map[String, MessageAttribute],
10+
messageSystemAttributes: Map[String, MessageAttribute],
1011
nextDelivery: MillisNextDelivery,
1112
created: OffsetDateTime,
1213
statistics: MessageStatistics,

core/src/main/scala/org/elasticmq/NewMessageData.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.elasticmq
22

3+
import scala.collection.mutable
4+
35
case class NewMessageData(
46
id: Option[MessageId],
57
content: String,

core/src/main/scala/org/elasticmq/actor/queue/InternalMessage.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ case class InternalMessage(
1313
var nextDelivery: Long,
1414
content: String,
1515
messageAttributes: Map[String, MessageAttribute],
16-
messageSystemAttributes: Map[String, MessageAttribute],
16+
messageSystemAttributes: mutable.HashMap[String, MessageAttribute],
1717
created: OffsetDateTime,
1818
orderIndex: Int,
1919
var firstReceive: Received,
@@ -61,6 +61,7 @@ case class InternalMessage(
6161
deliveryReceipts.lastOption.map(DeliveryReceipt(_)),
6262
content,
6363
messageAttributes,
64+
messageSystemAttributes.to(Map),
6465
MillisNextDelivery(nextDelivery),
6566
created,
6667
MessageStatistics(firstReceive, receiveCount),
@@ -75,7 +76,7 @@ case class InternalMessage(
7576
Some(MessageId(id)),
7677
content,
7778
messageAttributes,
78-
messageSystemAttributes,
79+
messageSystemAttributes.to(Map),
7980
MillisNextDelivery(nextDelivery),
8081
messageGroupId,
8182
messageDeduplicationId,
@@ -97,7 +98,7 @@ object InternalMessage {
9798
newMessageData.nextDelivery.toMillis(now, queueData.delay.toMillis).millis,
9899
newMessageData.content,
99100
newMessageData.messageAttributes,
100-
newMessageData.messageSystemAttributes,
101+
newMessageData.messageSystemAttributes.to(mutable.HashMap),
101102
OffsetDateTime.now(),
102103
newMessageData.orderIndex,
103104
NeverReceived,

core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ trait QueueActorMessageOps
3434
case DeleteMessage(deliveryReceipt) =>
3535
deleteMessage(deliveryReceipt).send()
3636
case LookupMessage(messageId) => messageQueue.getById(messageId.id).map(_.toMessageData)
37-
case MoveMessage(message, destination) => moveMessage(message, destination).send()
37+
case MoveMessage(message, destination, sourceQueueName) => moveMessage(message, destination, sourceQueueName).send()
3838
case DeduplicationIdsCleanup =>
3939
fifoMessagesHistory = fifoMessagesHistory.cleanOutdatedMessages(nowProvider)
4040
DoNotReply()

core/src/main/scala/org/elasticmq/actor/queue/operations/MoveMessageOps.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@ package org.elasticmq.actor.queue.operations
33
import org.elasticmq.actor.queue.{InternalMessage, QueueActorStorage, QueueEvent}
44
import org.elasticmq.msg.SendMessage
55
import org.elasticmq.util.Logging
6-
import org.elasticmq.{DeduplicationId, MoveDestination, MoveToDLQ}
6+
import org.elasticmq.{DeduplicationId, MoveDestination, MoveToDLQ, StringMessageAttribute}
77

88
trait MoveMessageOps extends Logging {
99
this: QueueActorStorage =>
1010

11-
def moveMessage(message: InternalMessage, destination: MoveDestination): ResultWithEvents[Unit] = {
11+
def moveMessage(message: InternalMessage, destination: MoveDestination, sourceQueueName: String): ResultWithEvents[Unit] = {
1212

13+
message.messageSystemAttributes.put("sourceQueueName", StringMessageAttribute(sourceQueueName))
1314
copyMessagesToActorRef.foreach { _ ! SendMessage(message.toNewMessageData) }
1415

1516
destination match {

core/src/main/scala/org/elasticmq/actor/queue/operations/ReceiveMessageOps.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ trait ReceiveMessageOps extends Logging {
8585
messageQueue.dequeue(count, deliveryTime).map { internalMessage =>
8686
if (queueData.deadLettersQueue.map(_.maxReceiveCount).exists(_ <= internalMessage.receiveCount)) {
8787
logger.debug(s"${queueData.name}: send message $internalMessage to dead letters actor $deadLettersActorRef")
88-
deadLettersActorRef.foreach(_ ! MoveMessage(internalMessage, MoveToDLQ))
88+
deadLettersActorRef.foreach(_ ! MoveMessage(internalMessage, MoveToDLQ, queueData.name))
8989
MessageToDelete(internalMessage)
9090
} else {
9191
MessageToReturn(internalMessage)

core/src/main/scala/org/elasticmq/msg/QueueMsg.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ case class GetQueueStatistics(deliveryTime: Long) extends QueueQueueMsg[QueueSta
4242
case class ClearQueue() extends QueueQueueMsg[Unit]
4343

4444
case class SendMessage(message: NewMessageData) extends QueueMessageMsg[MessageData]
45-
case class MoveMessage(message: InternalMessage, moveDestination: MoveDestination) extends QueueMessageMsg[Unit]
45+
case class MoveMessage(message: InternalMessage, moveDestination: MoveDestination, sourceQueueName: String) extends QueueMessageMsg[Unit]
4646
case class UpdateVisibilityTimeout(deliveryReceipt: DeliveryReceipt, visibilityTimeout: VisibilityTimeout)
4747
extends QueueMessageMsg[Either[InvalidReceiptHandle, Unit]]
4848
case class ReceiveMessages(

core/src/test/scala/org/elasticmq/FifoDeduplicationIdsHistoryTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ class FifoDeduplicationIdsHistoryTest extends AnyFunSuite with Matchers {
107107
nextDelivery = 100L,
108108
content = "",
109109
messageAttributes = Map.empty,
110-
messageSystemAttributes = Map.empty,
110+
messageSystemAttributes = mutable.HashMap.empty,
111111
created = created,
112112
orderIndex = 0,
113113
firstReceive = NeverReceived,

core/src/test/scala/org/elasticmq/actor/queue/InternalMessageSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class InternalMessageSpec extends AnyFunSuite with Matchers {
1818
nextDelivery = 0L,
1919
content = "content",
2020
messageAttributes = Map.empty,
21-
messageSystemAttributes = Map.empty,
21+
messageSystemAttributes = mutable.HashMap.empty,
2222
created = freezedDateTime,
2323
orderIndex = 100,
2424
firstReceive = NeverReceived,
@@ -46,7 +46,7 @@ class InternalMessageSpec extends AnyFunSuite with Matchers {
4646
nextDelivery = 0L,
4747
content = "content",
4848
messageAttributes = Map.empty,
49-
messageSystemAttributes = Map.empty,
49+
messageSystemAttributes = mutable.HashMap.empty,
5050
created = freezedDateTime,
5151
orderIndex = 100,
5252
firstReceive = NeverReceived,

core/src/test/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCacheTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers {
2323
1L,
2424
"content",
2525
Map.empty,
26-
Map.empty,
26+
mutable.HashMap.empty,
2727
nowProvider.now,
2828
orderIndex = 0,
2929
NeverReceived,
@@ -74,7 +74,7 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers {
7474
1L,
7575
"content",
7676
Map.empty,
77-
Map.empty,
77+
mutable.HashMap.empty,
7878
nowProvider.now,
7979
orderIndex = 0,
8080
NeverReceived,

0 commit comments

Comments
 (0)