|
| 1 | +//> using toolkit default |
| 2 | +//> using dep "org.flinkextended::flink-scala-api:1.18.1_1.1.6" |
| 3 | +//> using dep "org.apache.flink:flink-clients:1.18.1" |
| 4 | +//> using dep org.apache.flink:flink-connector-gcp-pubsub:3.1.0-1.18 |
| 5 | + |
| 6 | +import org.apache.flink.api.common.serialization.{AbstractDeserializationSchema, SerializationSchema} |
| 7 | +import org.apache.flink.api.java.utils.ParameterTool |
| 8 | +import org.apache.flink.streaming.connectors.gcp.pubsub.{PubSubSink, PubSubSource} |
| 9 | +import org.apache.flinkx.api.* |
| 10 | +import org.apache.flinkx.api.serializers.* |
| 11 | +import upickle.default.* |
| 12 | + |
| 13 | +import java.time.temporal.ChronoUnit |
| 14 | +import java.time.{Instant, LocalDate} |
| 15 | + |
| 16 | +/** |
| 17 | + * This code is available in article version with more details of the |
| 18 | + * implementation: |
| 19 | + * https://dev.to/geazi_anc/data-engineering-with-scala-mastering-real-time-data-processing-with-apache-flink-and-google-pubsub-3b39 |
| 20 | + */ |
| 21 | + |
| 22 | +// model definitions |
| 23 | +final case class CreatedCustomer(fullName: String, birthDate: String) derives ReadWriter: |
| 24 | + def firstName: String = fullName.split(" ").head |
| 25 | + def lastName: String = fullName.split(" ").last |
| 26 | + def age: Int = ChronoUnit.YEARS.between(LocalDate.parse(birthDate), LocalDate.now()).toInt |
| 27 | + |
| 28 | +final case class RegisteredCustomer(firstName: String, lastName: String, age: Int, isActive: Boolean, createdAt: String) |
| 29 | + derives ReadWriter |
| 30 | + |
| 31 | +object RegisteredCustomer: |
| 32 | + def apply(firstName: String, lastName: String, age: Int): RegisteredCustomer = |
| 33 | + RegisteredCustomer(firstName, lastName, age, true, Instant.now().toString) |
| 34 | + |
| 35 | +// JSON serializers and deserializers |
| 36 | +class CreatedCustomerDeserializer extends AbstractDeserializationSchema[CreatedCustomer]: |
| 37 | + override def deserialize(message: Array[Byte]): CreatedCustomer = read[CreatedCustomer](new String(message, "UTF-8")) |
| 38 | + |
| 39 | +class RegisteredCustomerSerializer extends SerializationSchema[RegisteredCustomer]: |
| 40 | + override def serialize(element: RegisteredCustomer): Array[Byte] = |
| 41 | + write[RegisteredCustomer](element).getBytes("UTF-8") |
| 42 | + |
| 43 | +// main |
| 44 | +val parameters = ParameterTool.fromArgs(args) |
| 45 | +val projectName = parameters.get("project") |
| 46 | +val subscriptionName = parameters.get("subscription-name") |
| 47 | +val topicName = parameters.get("topic-name") |
| 48 | + |
| 49 | +val pubsubSource = PubSubSource |
| 50 | + .newBuilder() |
| 51 | + .withDeserializationSchema(new CreatedCustomerDeserializer()) |
| 52 | + .withProjectName(projectName) |
| 53 | + .withSubscriptionName(subscriptionName) |
| 54 | + .build() |
| 55 | +val pubsubSink = PubSubSink |
| 56 | + .newBuilder() |
| 57 | + .withSerializationSchema(new RegisteredCustomerSerializer()) |
| 58 | + .withProjectName(projectName) |
| 59 | + .withTopicName(topicName) |
| 60 | + .build() |
| 61 | + |
| 62 | +val env = StreamExecutionEnvironment.getExecutionEnvironment |
| 63 | +env.enableCheckpointing(1000L) |
| 64 | + |
| 65 | +env |
| 66 | + .addSource(pubsubSource) |
| 67 | + .map(cc => RegisteredCustomer(cc.firstName, cc.lastName, cc.age)) |
| 68 | + .map(rc => if rc.age >= 30 then rc.copy(isActive = false) else rc) |
| 69 | + .addSink(pubsubSink) |
| 70 | + |
| 71 | +env.execute("customerRegistering") |
0 commit comments