|
1 | 1 | package org.elasticmq.rest.sqs.aws |
2 | 2 |
|
| 3 | +import org.apache.pekko.actor.{ActorRef, ActorSystem, Props} |
| 4 | +import org.apache.pekko.util.Timeout |
| 5 | +import org.elasticmq.actor.QueueManagerActor |
| 6 | +import org.elasticmq.actor.queue.QueueEvent |
3 | 7 | import org.elasticmq.actor.reply._ |
4 | | -import org.elasticmq.persistence.sql.GetAllMessages |
5 | | -import org.elasticmq.rest.sqs.SqlQueuePersistenceServer |
| 8 | +import org.elasticmq.persistence.sql.{GetAllMessages, SqlQueuePersistenceActor, SqlQueuePersistenceConfig} |
6 | 9 | import org.elasticmq.rest.sqs.client._ |
7 | 10 | import org.elasticmq.rest.sqs.model.RedrivePolicy |
8 | 11 | import org.elasticmq.rest.sqs.model.RedrivePolicyJson.format |
9 | | -import org.elasticmq.util.Logging |
| 12 | +import org.elasticmq.rest.sqs.{SQSRestServer, SQSRestServerBuilder} |
| 13 | +import org.elasticmq.util.{Logging, NowProvider} |
| 14 | +import org.elasticmq.{NodeAddress, StrictSQSLimits} |
10 | 15 | import org.scalatest.BeforeAndAfter |
| 16 | +import org.scalatest.concurrent.ScalaFutures |
11 | 17 | import org.scalatest.funsuite.AnyFunSuite |
12 | 18 | import org.scalatest.matchers.should.Matchers |
| 19 | +import org.scalatest.time.{Seconds, Span} |
| 20 | +import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider} |
| 21 | +import software.amazon.awssdk.regions.Region |
| 22 | +import software.amazon.awssdk.services.sqs.{SqsClient => AwsSqsClient} |
13 | 23 | import spray.json.enrichAny |
14 | 24 |
|
| 25 | +import java.net.URI |
| 26 | +import scala.util.Try |
| 27 | + |
15 | 28 | class SqlQueuePersistenceTests |
16 | 29 | extends AnyFunSuite |
17 | | - with SqlQueuePersistenceServer |
| 30 | + with ScalaFutures |
18 | 31 | with BeforeAndAfter |
19 | 32 | with Matchers |
20 | 33 | with Logging { |
21 | 34 |
|
| 35 | + private val awsAccountId = "123456789012" |
| 36 | + private val awsRegion = "elasticmq" |
| 37 | + |
| 38 | + private val actorSystem: ActorSystem = ActorSystem("elasticmq-test-v2") |
| 39 | + private var strictServer: SQSRestServer = _ |
| 40 | + |
| 41 | + var clientV2: AwsSqsClient = _ |
| 42 | + var testClient: AwsSdkV2SqsClient = _ |
| 43 | + var store: ActorRef = _ |
| 44 | + |
| 45 | + implicit val timeout: Timeout = { |
| 46 | + import scala.concurrent.duration._ |
| 47 | + Timeout(5.seconds) |
| 48 | + } |
| 49 | + |
| 50 | + implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = Span(5, Seconds)) |
| 51 | + |
22 | 52 | val testQueueName = "testQueue1" |
23 | 53 | val deadLetterQueueName = "testDLQ1" |
24 | 54 |
|
25 | | - val awsAccountId = "123456789012" |
26 | | - val awsRegion = "elasticmq" |
| 55 | + def startServerAndRun(pruneDataOnInit: Boolean)(body: => Unit): Unit = { |
| 56 | + startServerAndSetupClient(pruneDataOnInit) |
| 57 | + try { |
| 58 | + body |
| 59 | + } finally { |
| 60 | + stopServerAndClient() |
| 61 | + } |
| 62 | + } |
| 63 | + |
| 64 | + private def startServerAndSetupClient(pruneDataOnInit: Boolean): Unit = { |
| 65 | + val persistenceConfig = SqlQueuePersistenceConfig( |
| 66 | + enabled = true, |
| 67 | + driverClass = "org.h2.Driver", |
| 68 | + uri = "jdbc:h2:./elasticmq-h2-v2", |
| 69 | + pruneDataOnInit = pruneDataOnInit |
| 70 | + ) |
| 71 | + |
| 72 | + store = actorSystem.actorOf(Props(new SqlQueuePersistenceActor(persistenceConfig, List.empty))) |
| 73 | + val manager = actorSystem.actorOf(Props(new QueueManagerActor(new NowProvider(), StrictSQSLimits, Some(store)))) |
| 74 | + |
| 75 | + strictServer = SQSRestServerBuilder |
| 76 | + .withActorSystem(actorSystem) |
| 77 | + .withQueueManagerActor(manager) |
| 78 | + .withPort(9323) // different port to avoid conflicts |
| 79 | + .withServerAddress(NodeAddress(port = 9323)) |
| 80 | + .withAWSAccountId(awsAccountId) |
| 81 | + .withAWSRegion(awsRegion) |
| 82 | + .start() |
| 83 | + |
| 84 | + (store ? QueueEvent.Restore(manager)).futureValue |
| 85 | + |
| 86 | + clientV2 = AwsSqsClient |
| 87 | + .builder() |
| 88 | + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))) |
| 89 | + .region(Region.EU_CENTRAL_1) |
| 90 | + .endpointOverride(new URI("http://localhost:9323")) |
| 91 | + .build() |
| 92 | + |
| 93 | + testClient = new AwsSdkV2SqsClient(clientV2) |
| 94 | + } |
| 95 | + |
| 96 | + private def stopServerAndClient(): Unit = { |
| 97 | + if (clientV2 != null) clientV2.close() |
| 98 | + if (strictServer != null) Try(strictServer.stopAndWait()) |
| 99 | + } |
27 | 100 |
|
28 | 101 | test("should persist the messages and after restart read the messages") { |
29 | 102 | startServerAndRun(pruneDataOnInit = true) { |
|
0 commit comments