Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
33af059
add support for full record (key+value) access
hpgrahsl Jun 11, 2025
e213dd9
add relocation to shadowJar task of example project
hpgrahsl Jun 12, 2025
50cf0c3
apply same changes to flink 1.18
hpgrahsl Jun 12, 2025
703be8d
apply same changes to flink 1.16
hpgrahsl Jun 12, 2025
0bd8270
apply same changes to flink 1.20
hpgrahsl Jun 12, 2025
98e53e8
adapt sql to work around limitations and make calcite parser in Flink…
hpgrahsl Jun 13, 2025
fff19e4
add necessary relocations for maven build of the example project
hpgrahsl Jun 13, 2025
f4306dd
remove some unused imports
hpgrahsl Jun 13, 2025
9dc2bac
introduce serialization key/value fields constraint checking
hpgrahsl Jun 16, 2025
8479124
adapt example jobs to reflect recent sdk changes
hpgrahsl Jun 16, 2025
d0d6773
apply recent sdk changes for Flink 1.18
hpgrahsl Jun 16, 2025
fc2480e
apply recent sdk changes for Flink 1.20
hpgrahsl Jun 16, 2025
84a14ab
apply recent sdk changes for Flink 1.16
hpgrahsl Jun 16, 2025
6146461
minor constructor refactoring
hpgrahsl Jun 16, 2025
cfd64f6
add test to example project showing how to validate POJO types for se…
hpgrahsl Jun 16, 2025
b60a864
follow method name convention for parameterized test
hpgrahsl Jun 16, 2025
5000f87
refactor and add building blocks for handling append vs. change streams
hpgrahsl Jun 18, 2025
93fb46d
refactor and add tests
hpgrahsl Jun 18, 2025
bba1f5b
adapt base types for example project
hpgrahsl Jun 18, 2025
643221b
consistent naming of things in tests
hpgrahsl Jun 18, 2025
5b386eb
refactor tests and use consistent namings
hpgrahsl Jun 18, 2025
bf897eb
add example job including a test for change stream processing
hpgrahsl Jun 18, 2025
73ffe08
add serializability related tests for used POJO types
hpgrahsl Jun 19, 2025
ded6dc5
clean up example jobs and use more consistent naming
hpgrahsl Jun 26, 2025
77f3d87
make sure to serialize record key as JSON only when key Class is set …
hpgrahsl Jun 26, 2025
2258364
add example jobs with tests to show how to work with mixed append str…
hpgrahsl Jun 26, 2025
03420d6
add explicit key checks for keyed append and change stream tests of e…
hpgrahsl Jun 26, 2025
2c2025c
use fixed xmx settings for both builds (maven+gradle)
hpgrahsl Jun 26, 2025
914ed1f
add explicit key checks for keyed append and change stream tests in t…
hpgrahsl Jun 26, 2025
0ab5aa5
apply latest code / test changes to flink version 1.20
hpgrahsl Jun 26, 2025
6c26814
apply latest code / test changes to flink version 1.18
hpgrahsl Jun 26, 2025
9f582c3
apply latest code / test changes to flink version 1.16
hpgrahsl Jun 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions examples/custom-pipelines-hello-world/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ plugins {
}

group = 'co.decodable.examples'
version = '0.2'
version = '0.3'

ext {
flinkVersion = '1.19.2'
kafkaConnectorVersion = '3.3.0-1.19'
log4jVersion = '2.17.1'
sdkVersion = '1.19.2-1.0.0.Beta9'
sdkVersion = '1.19.2-1.0.0-SNAPSHOT'
}

repositories {
Expand Down Expand Up @@ -116,6 +116,8 @@ shadowJar {
}

mergeServiceFiles()

relocate('org.apache.kafka.clients','org.apache.flink.kafka.shaded.org.apache.kafka.clients')
}

java {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,14 @@ spec:
type: JAVA

# When building with Maven
# job_file_path: target/custom-pipelines-hello-world-0.2.jar
# job_file_path: target/custom-pipelines-hello-world-0.3.jar
# When building with Gradle
job_file_path: build/libs/custom-pipelines-hello-world-0.2-all.jar
job_file_path: build/libs/custom-pipelines-hello-world-0.3-all.jar

# When running the DataStream API Job
entry_class: co.decodable.examples.cpdemo.DataStreamJob
entry_class: co.decodable.examples.cpdemo.KeylessDataStreamJob
# When running the Table API Job
# entry_class: co.decodable.examples.cpdemo.TableAPIJob
# entry_class: co.decodable.examples.cpdemo.KeylessTableAPIJob

properties:
flink_version: 1.19-java11
Expand Down
6 changes: 3 additions & 3 deletions examples/custom-pipelines-hello-world/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<groupId>co.decodable.examples</groupId>
<artifactId>custom-pipelines-hello-world</artifactId>
<version>0.2</version>
<version>0.3</version>
<packaging>jar</packaging>

<name>Decodable Pipeline SDK Example</name>
Expand All @@ -25,7 +25,7 @@
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<decodable.pipeline.sdk.version>1.19.2-1.0.0.Beta9</decodable.pipeline.sdk.version>
<decodable.pipeline.sdk.version>1.19.2-1.0.0-SNAPSHOT</decodable.pipeline.sdk.version>
<flink.kafka.connector.version>3.3.0-1.19</flink.kafka.connector.version>
<aws.msk.iam.auth.version>1.1.6</aws.msk.iam.auth.version>
<flink.version>1.19.2</flink.version>
Expand Down Expand Up @@ -182,7 +182,7 @@
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>co.decodable.examples.cpdemo.DataStreamJob</mainClass>
<mainClass>co.decodable.examples.cpdemo.KeylessDataStreamJob</mainClass>
</transformer>
</transformers>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright Decodable, Inc.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package co.decodable.examples.cpdemo;

import co.decodable.examples.cpdemo.model.KeyedPurchaseOrder;
import co.decodable.examples.cpdemo.model.PurchaseOrder;
import co.decodable.sdk.pipeline.DecodableStreamSink;
import co.decodable.sdk.pipeline.DecodableStreamSource;
import co.decodable.sdk.pipeline.metadata.SinkStreams;
import co.decodable.sdk.pipeline.metadata.SourceStreams;

import co.decodable.sdk.pipeline.serde.DecodableRecordDeserializationSchema;
import co.decodable.sdk.pipeline.serde.DecodableRecordSerializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import static co.decodable.examples.cpdemo.KeylessDataStreamJob.PURCHASE_ORDERS_PROCESSED_STREAM;
import static co.decodable.examples.cpdemo.KeylessDataStreamJob.PURCHASE_ORDERS_STREAM;

/**
 * Flink DataStream job that processes full (key + value) purchase order records:
 * it reads {@code KeyedPurchaseOrder} records from the "purchase-orders" stream,
 * upper-cases the customer name in the record value, and writes the result —
 * with the original key preserved — to "purchase-orders-processed".
 *
 * <p>The stream-name annotations are qualified with this class explicitly:
 * constants with the same simple names are also statically imported from
 * {@code KeylessDataStreamJob}, and an unqualified reference in a class-level
 * annotation would silently bind to the imported constant rather than the one
 * declared below. Qualifying removes the ambiguity and matches the convention
 * used by {@code KeyedTableAPIJob}.
 */
@SourceStreams(KeyedDataStreamJob.PURCHASE_ORDERS_STREAM)
@SinkStreams(KeyedDataStreamJob.PURCHASE_ORDERS_PROCESSED_STREAM)
public class KeyedDataStreamJob {

    static final String PURCHASE_ORDERS_PROCESSED_STREAM = "purchase-orders-processed";
    static final String PURCHASE_ORDERS_STREAM = "purchase-orders";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Deserialize both the record key and the record value into KeyedPurchaseOrder.
        DecodableStreamSource<KeyedPurchaseOrder> source =
            DecodableStreamSource.<KeyedPurchaseOrder>builder()
                .withStreamName(PURCHASE_ORDERS_STREAM)
                .withRecordDeserializationSchema(
                    new DecodableRecordDeserializationSchema<>(KeyedPurchaseOrder.class))
                .build();

        DecodableStreamSink<KeyedPurchaseOrder> sink =
            DecodableStreamSink.<KeyedPurchaseOrder>builder()
                .withStreamName(PURCHASE_ORDERS_PROCESSED_STREAM)
                .withRecordSerializationSchema(new DecodableRecordSerializationSchema<>())
                .build();

        DataStream<KeyedPurchaseOrder> stream =
            env.fromSource(source, WatermarkStrategy.noWatermarks(),
                    "[stream-purchase-orders] Purchase Orders Source")
                .map(new NameConverter());

        stream.sinkTo(sink)
            .name("[stream-purchase-orders-processed] Purchase Orders Sink");

        env.execute("Purchase Order Processor");
    }

    /**
     * Upper-cases the customer name of each purchase order while keeping the
     * record key unchanged; counts processed records via a counter registered
     * in the "DecodableMetrics" metric group.
     */
    public static class NameConverter extends RichMapFunction<KeyedPurchaseOrder, KeyedPurchaseOrder> {

        private static final long serialVersionUID = 1L;

        private Counter recordsProcessed;

        @Override
        public void open(Configuration parameters) throws Exception {
            recordsProcessed = getRuntimeContext()
                .getMetricGroup()
                .addGroup("DecodableMetrics")
                .counter("recordsProcessed", new SimpleCounter());
        }

        @Override
        public KeyedPurchaseOrder map(KeyedPurchaseOrder order) throws Exception {
            // Hoist the value access instead of re-reading it for every field.
            var orderValue = order.getValue();
            var newOrderValue = new PurchaseOrder(
                orderValue.orderId,
                orderValue.orderDate,
                orderValue.customerName.toUpperCase(),
                orderValue.price,
                orderValue.productId,
                orderValue.orderStatus);
            recordsProcessed.inc();
            // Re-attach the original key; only the value was transformed.
            return new KeyedPurchaseOrder(order.getKey(), newOrderValue);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright Decodable, Inc.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package co.decodable.examples.cpdemo;

import co.decodable.examples.cpdemo.model.KeyedPurchaseOrder;
import co.decodable.examples.cpdemo.model.OrderKey;
import co.decodable.examples.cpdemo.model.PurchaseOrder;
import co.decodable.sdk.pipeline.DecodableStreamSink;
import co.decodable.sdk.pipeline.DecodableStreamSource;
import co.decodable.sdk.pipeline.metadata.SinkStreams;
import co.decodable.sdk.pipeline.metadata.SourceStreams;
import co.decodable.sdk.pipeline.serde.DecodableRecordDeserializationSchema;
import co.decodable.sdk.pipeline.serde.DecodableRecordSerializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

/**
 * Table API variant of the keyed purchase order processing job: bridges the
 * keyed "purchase-orders" stream into a table, upper-cases the customer name
 * via a SQL UDF, and writes the keyed result back to
 * "purchase-orders-processed".
 */
@SourceStreams(KeyedTableAPIJob.PURCHASE_ORDERS_STREAM)
@SinkStreams(KeyedTableAPIJob.PURCHASE_ORDERS_PROCESSED_STREAM)
public class KeyedTableAPIJob {
    static final String PURCHASE_ORDERS_STREAM = "purchase-orders";
    static final String PURCHASE_ORDERS_PROCESSED_STREAM = "purchase-orders-processed";

    public static void main(String[] strings) throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();
        var tableEnv = StreamTableEnvironment.create(env);

        // Structured types for the record key and value; shared between the
        // input schema and the output conversion so both stay in sync.
        var keyType = DataTypes.STRUCTURED(
            OrderKey.class,
            DataTypes.FIELD("orderId", DataTypes.BIGINT()));
        var valueType = DataTypes.STRUCTURED(
            PurchaseOrder.class,
            DataTypes.FIELD("orderId", DataTypes.BIGINT()),
            DataTypes.FIELD("orderDate", DataTypes.STRING()),
            DataTypes.FIELD("customerName", DataTypes.STRING()),
            DataTypes.FIELD("price", DataTypes.DOUBLE()),
            DataTypes.FIELD("productId", DataTypes.BIGINT()),
            DataTypes.FIELD("orderStatus", DataTypes.BOOLEAN()));

        DecodableStreamSource<KeyedPurchaseOrder> source =
            DecodableStreamSource.<KeyedPurchaseOrder>builder()
                .withStreamName(PURCHASE_ORDERS_STREAM)
                .withRecordDeserializationSchema(
                    new DecodableRecordDeserializationSchema<>(KeyedPurchaseOrder.class))
                .build();

        DecodableStreamSink<KeyedPurchaseOrder> sink =
            DecodableStreamSink.<KeyedPurchaseOrder>builder()
                .withStreamName(PURCHASE_ORDERS_PROCESSED_STREAM)
                .withRecordSerializationSchema(new DecodableRecordSerializationSchema<>())
                .build();

        DataStream<KeyedPurchaseOrder> orders = env.fromSource(
            source,
            WatermarkStrategy.noWatermarks(),
            "[stream-purchase-orders] Purchase Orders Source");

        // Expose the keyed stream as a table with explicit key/value columns.
        Table inputTable = tableEnv.fromDataStream(
            orders,
            Schema.newBuilder()
                .column("key", keyType)
                .column("value", valueType)
                .build());
        tableEnv.createTemporaryView("purchase_orders", inputTable);

        // Register the upper-casing UDF and apply it in SQL.
        tableEnv.createTemporarySystemFunction("upper_case", UpperCase.class);
        Table resultTable = tableEnv.sqlQuery(
            "SELECT `key`, ROW(`value`.orderId, orderDate, upper_case(customerName), price, productId, orderStatus) as `value` FROM purchase_orders");

        // Convert back to a keyed POJO stream using the same structured types.
        DataStream<KeyedPurchaseOrder> resultStream = tableEnv.toDataStream(
            resultTable,
            DataTypes.STRUCTURED(
                KeyedPurchaseOrder.class,
                DataTypes.FIELD("key", keyType),
                DataTypes.FIELD("value", valueType)));

        resultStream.sinkTo(sink).name("[stream-purchase-orders-processed] Purchase Orders Sink");

        env.execute("Purchase Order Processor");
    }

    /** Scalar UDF returning the upper-cased input, or {@code null} for {@code null} input. */
    @FunctionHint(output = @DataTypeHint("STRING"))
    public static class UpperCase extends ScalarFunction {
        public UpperCase() {}

        @SuppressWarnings("unused")
        public String eval(String input) {
            return input == null ? null : input.toUpperCase();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@
*/
package co.decodable.examples.cpdemo;

import static co.decodable.examples.cpdemo.DataStreamJob.PURCHASE_ORDERS_PROCESSED_STREAM;
import static co.decodable.examples.cpdemo.DataStreamJob.PURCHASE_ORDERS_STREAM;
import static co.decodable.examples.cpdemo.KeylessDataStreamJob.PURCHASE_ORDERS_PROCESSED_STREAM;
import static co.decodable.examples.cpdemo.KeylessDataStreamJob.PURCHASE_ORDERS_STREAM;

import co.decodable.examples.cpdemo.model.KeylessPurchaseOrder;
import co.decodable.examples.cpdemo.model.PurchaseOrder;
import co.decodable.sdk.pipeline.serde.DecodableRecordDeserializationSchema;
import co.decodable.sdk.pipeline.serde.DecodableRecordSerializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
Expand All @@ -21,39 +25,34 @@
import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.formats.json.JsonSerializationSchema;

import co.decodable.sdk.pipeline.DecodableStreamSink;
import co.decodable.sdk.pipeline.DecodableStreamSource;
import co.decodable.sdk.pipeline.metadata.SinkStreams;
import co.decodable.sdk.pipeline.metadata.SourceStreams;

@SourceStreams(PURCHASE_ORDERS_STREAM)
@SinkStreams(PURCHASE_ORDERS_PROCESSED_STREAM)
public class DataStreamJob {
public class KeylessDataStreamJob {

static final String PURCHASE_ORDERS_PROCESSED_STREAM = "purchase-orders-processed";
static final String PURCHASE_ORDERS_STREAM = "purchase-orders";

static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DecodableStreamSource<PurchaseOrder> source =
DecodableStreamSource.<PurchaseOrder>builder()
DecodableStreamSource<KeylessPurchaseOrder> source =
DecodableStreamSource.<KeylessPurchaseOrder>builder()
.withStreamName(PURCHASE_ORDERS_STREAM)
.withDeserializationSchema(new JsonDeserializationSchema<>(PurchaseOrder.class, () -> OBJECT_MAPPER))
.withRecordDeserializationSchema(new DecodableRecordDeserializationSchema<>(KeylessPurchaseOrder.class))
.build();

DecodableStreamSink<PurchaseOrder> sink =
DecodableStreamSink.<PurchaseOrder>builder()
DecodableStreamSink<KeylessPurchaseOrder> sink =
DecodableStreamSink.<KeylessPurchaseOrder>builder()
.withStreamName(PURCHASE_ORDERS_PROCESSED_STREAM)
.withSerializationSchema(new JsonSerializationSchema<>(() -> OBJECT_MAPPER))
.withRecordSerializationSchema(new DecodableRecordSerializationSchema<>())
.build();

DataStream<PurchaseOrder> stream =
DataStream<KeylessPurchaseOrder> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(),
"[stream-purchase-orders] Purchase Orders Source")
.map(new NameConverter());
Expand All @@ -64,7 +63,7 @@ public static void main(String[] args) throws Exception {
env.execute("Purchase Order Processor");
}

public static class NameConverter extends RichMapFunction<PurchaseOrder, PurchaseOrder> {
public static class NameConverter extends RichMapFunction<KeylessPurchaseOrder, KeylessPurchaseOrder> {

private static final long serialVersionUID = 1L;

Expand All @@ -79,17 +78,17 @@ public void open(Configuration parameters) throws Exception {
}

@Override
public PurchaseOrder map(PurchaseOrder order) throws Exception {
public KeylessPurchaseOrder map(KeylessPurchaseOrder order) throws Exception {
var newOrder = new PurchaseOrder(
order.orderId,
order.orderDate,
order.customerName.toUpperCase(),
order.price,
order.productId,
order.orderStatus
order.getValue().orderId,
order.getValue().orderDate,
order.getValue().customerName.toUpperCase(),
order.getValue().price,
order.getValue().productId,
order.getValue().orderStatus
);
recordsProcessed.inc();
return newOrder;
return new KeylessPurchaseOrder(newOrder);
}
}
}
Loading
Loading