@@ -13,17 +13,15 @@ import upickle.default.*
1313import java .time .temporal .ChronoUnit
1414import java .time .{Instant , LocalDate }
1515
16- /**
17- * This code is available in article version with more details of the
18- * implementation:
19- * https://dev.to/geazi_anc/data-engineering-with-scala-mastering-real-time-data-processing-with-apache-flink-and-google-pubsub-3b39
20- */
16+ /** This code is available in article version with more details of the implementation:
17+ * https://dev.to/geazi_anc/data-engineering-with-scala-mastering-real-time-data-processing-with-apache-flink-and-google-pubsub-3b39
18+ */
2119
2220// model definitions
2321final case class CreatedCustomer (fullName : String , birthDate : String ) derives ReadWriter :
24- def firstName : String = fullName.split(" " ).head
25- def lastName : String = fullName.split(" " ).last
26- def age : Int = ChronoUnit .YEARS .between(LocalDate .parse(birthDate), LocalDate .now()).toInt
22+ val firstName = fullName.split(" " ).head
23+ val lastName = fullName.split(" " ).last
24+ val age = ChronoUnit .YEARS .between(LocalDate .parse(birthDate), LocalDate .now()).toInt
2725
2826final case class RegisteredCustomer (firstName : String , lastName : String , age : Int , isActive : Boolean , createdAt : String )
2927 derives ReadWriter
@@ -52,7 +50,7 @@ val pubsubSource = PubSubSource
5250 .withProjectName(projectName)
5351 .withSubscriptionName(subscriptionName)
5452 .build()
55- val pubsubSink = PubSubSink
53+ val pubsubSink = PubSubSink
5654 .newBuilder()
5755 .withSerializationSchema(new RegisteredCustomerSerializer ())
5856 .withProjectName(projectName)
0 commit comments