Skip to content

Commit 1017ae2

Browse files
authored
add streaming job example with google pub/sub connector (#165)
add streaming job example with google pub/sub connector and custom JSON serializers and deserializers
1 parent 4ec56de commit 1017ae2

File tree

2 files changed

+70
-0
lines changed

2 files changed

+70
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ We suggest to remove the official `flink-scala` and `flink-streaming-scala` depe
5555
There is a wide range of [code examples](https://github.com/flink-extended/flink-scala-api/tree/master/modules/examples) to introduce you to flink-scala-api, both using Scala scripts and multimodule applications. These examples include:
5656

5757
- Flink jobs built using Scala 3 with Ammonite and Scala CLI;
58+
- Streaming job using Google Pub/Sub and JSON serialiser;
5859
- A complete application for fraud detection;
5960
- Examples using Datastream and Table APIs;
6061
- Simple job developed interactively via Jupyter notebooks;
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
//> using toolkit default
2+
//> using dep "org.flinkextended::flink-scala-api:1.18.1_1.1.6"
3+
//> using dep "org.apache.flink:flink-clients:1.18.1"
4+
//> using dep org.apache.flink:flink-connector-gcp-pubsub:3.1.0-1.18
5+
6+
import org.apache.flink.api.common.serialization.{AbstractDeserializationSchema, SerializationSchema}
7+
import org.apache.flink.api.java.utils.ParameterTool
8+
import org.apache.flink.streaming.connectors.gcp.pubsub.{PubSubSink, PubSubSource}
9+
import org.apache.flinkx.api.*
10+
import org.apache.flinkx.api.serializers.*
11+
import upickle.default.*
12+
13+
import java.time.temporal.ChronoUnit
14+
import java.time.{Instant, LocalDate}
15+
16+
/** This code is available in article version with more details of the implementation:
17+
* https://dev.to/geazi_anc/data-engineering-with-scala-mastering-real-time-data-processing-with-apache-flink-and-google-pubsub-3b39
18+
*/
19+
20+
// model definitions
21+
final case class CreatedCustomer(fullName: String, birthDate: String) derives ReadWriter:
22+
val firstName = fullName.split(" ").head
23+
val lastName = fullName.split(" ").last
24+
val age = ChronoUnit.YEARS.between(LocalDate.parse(birthDate), LocalDate.now()).toInt
25+
26+
final case class RegisteredCustomer(firstName: String, lastName: String, age: Int, isActive: Boolean, createdAt: String)
27+
derives ReadWriter
28+
29+
object RegisteredCustomer:
30+
def apply(firstName: String, lastName: String, age: Int): RegisteredCustomer =
31+
RegisteredCustomer(firstName, lastName, age, true, Instant.now().toString)
32+
33+
// JSON serializers and deserializers
34+
class CreatedCustomerDeserializer extends AbstractDeserializationSchema[CreatedCustomer]:
35+
override def deserialize(message: Array[Byte]): CreatedCustomer = read[CreatedCustomer](new String(message, "UTF-8"))
36+
37+
class RegisteredCustomerSerializer extends SerializationSchema[RegisteredCustomer]:
38+
override def serialize(element: RegisteredCustomer): Array[Byte] =
39+
write[RegisteredCustomer](element).getBytes("UTF-8")
40+
41+
// main
42+
val parameters = ParameterTool.fromArgs(args)
43+
val projectName = parameters.get("project")
44+
val subscriptionName = parameters.get("subscription-name")
45+
val topicName = parameters.get("topic-name")
46+
47+
val pubsubSource = PubSubSource
48+
.newBuilder()
49+
.withDeserializationSchema(new CreatedCustomerDeserializer())
50+
.withProjectName(projectName)
51+
.withSubscriptionName(subscriptionName)
52+
.build()
53+
val pubsubSink = PubSubSink
54+
.newBuilder()
55+
.withSerializationSchema(new RegisteredCustomerSerializer())
56+
.withProjectName(projectName)
57+
.withTopicName(topicName)
58+
.build()
59+
60+
val env = StreamExecutionEnvironment.getExecutionEnvironment
61+
env.enableCheckpointing(1000L)
62+
63+
env
64+
.addSource(pubsubSource)
65+
.map(cc => RegisteredCustomer(cc.firstName, cc.lastName, cc.age))
66+
.map(rc => if rc.age >= 30 then rc.copy(isActive = false) else rc)
67+
.addSink(pubsubSink)
68+
69+
env.execute("customerRegistering")

0 commit comments

Comments
 (0)