Skip to content

Commit eddecfa

Browse files
author
Neelab Chaudhuri
committed
new sqs dispatcher
1 parent 606f17e commit eddecfa

File tree

3 files changed

+108
-0
lines changed

3 files changed

+108
-0
lines changed

core/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@
8484
<artifactId>snakeyaml</artifactId>
8585
<version>1.33</version>
8686
</dependency>
87+
<dependency>
88+
<groupId>software.amazon.awssdk</groupId>
89+
<artifactId>sqs</artifactId>
90+
<version>2.20.18</version>
91+
</dependency>
8792

8893
<!-- dependencies shaded by us-->
8994

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package za.co.absa.spline.harvester.dispatcher.sqsdispatcher
2+
3+
import org.apache.commons.configuration.Configuration
4+
import za.co.absa.commons.config.ConfigurationImplicits._
5+
import za.co.absa.commons.version.Version
6+
import za.co.absa.spline.harvester.dispatcher.sqsdispatcher.SqsLineageDispatcherConfig._
7+
8+
import java.time.Duration
9+
import java.time.temporal.ChronoUnit
10+
11+
object SqsLineageDispatcherConfig {
12+
val QueueUrl = "queue.url"
13+
val ApiVersion = "apiVersion"
14+
15+
def apply(c: Configuration) = new SqsLineageDispatcherConfig(c)
16+
}
17+
18+
class SqsLineageDispatcherConfig(config: Configuration) {
19+
val queueUrl: String = config.getRequiredString(QueueUrl)
20+
val apiVersion: Version = Version.asSimple(config.getString(ApiVersion, "1.2"))
21+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package za.co.absa.spline.harvester.dispatcher.sqsdispatcher
2+
3+
import org.apache.commons.configuration.Configuration
4+
import org.apache.spark.internal.Logging
5+
import software.amazon.awssdk.services.sqs.SqsClient
6+
import software.amazon.awssdk.services.sqs.model.SendMessageRequest
7+
import za.co.absa.commons.version.Version
8+
import za.co.absa.spline.harvester.dispatcher.LineageDispatcher
9+
import za.co.absa.spline.harvester.dispatcher.modelmapper.ModelMapper
10+
import za.co.absa.spline.producer.model.{ExecutionEvent, ExecutionPlan}
11+
12+
class SqsLineageDispatcherImpl(sqsClient: SqsClient,
13+
sqsUrl: String,
14+
apiVersion: Version) extends LineageDispatcher with Logging {
15+
import za.co.absa.spline.harvester.json.HarvesterJsonSerDe.impl._
16+
def this(dispatcherConfig: SqsLineageDispatcherConfig) = this(
17+
SqsLineageDispatcherImpl.createSqsClient(dispatcherConfig),
18+
dispatcherConfig.queueUrl,
19+
dispatcherConfig.apiVersion
20+
)
21+
22+
def this(configuration: Configuration) = this(new SqsLineageDispatcherConfig(configuration))
23+
24+
override def name = "Sqs"
25+
26+
logInfo(s"Using Producer API version: ${apiVersion.asString}")
27+
logInfo(s"Sqs url: $sqsUrl")
28+
29+
private val modelMapper = ModelMapper.forApiVersion(apiVersion)
30+
31+
private var cachedPlan: ExecutionPlan = _
32+
33+
override def send(plan: ExecutionPlan): Unit = {
34+
cachedPlan = plan
35+
}
36+
37+
override def send(event: ExecutionEvent): Unit = {
38+
assert(cachedPlan != null)
39+
val plan = cachedPlan
40+
for {
41+
execPlanDTO <- modelMapper.toDTO(plan)
42+
eventDTO <- modelMapper.toDTO(event)
43+
} {
44+
val jsonPlan = execPlanDTO.toJson
45+
val jsonEvent = eventDTO.toJson
46+
val json =
47+
s"""
48+
| {
49+
| "plan": $jsonPlan,
50+
| "event": $jsonEvent
51+
| }
52+
|""".stripMargin
53+
sendToSqs(json)
54+
}
55+
}
56+
57+
private def sendToSqs(json: String,
58+
objectType: String = "Spline"): Unit = {
59+
val body =
60+
s"""
61+
| { "requestType": "SparkJobRunInfo",
62+
| "objectType": "$objectType",
63+
| "body": $json
64+
| }
65+
|""".stripMargin
66+
val sendMsgRequest = SendMessageRequest.builder()
67+
.queueUrl(sqsUrl)
68+
.messageBody(body)
69+
.build()
70+
sqsClient.sendMessage(sendMsgRequest)
71+
}
72+
}
73+
74+
75+
object SqsLineageDispatcherImpl extends Logging {
76+
77+
private def createSqsClient(config: SqsLineageDispatcherConfig): SqsClient = {
78+
SqsClient
79+
.builder()
80+
.build()
81+
}
82+
}

0 commit comments

Comments
 (0)