Skip to content

Commit 4de4199

Browse files
committed
feat: add schema auto example.
1 parent 8aef773 commit 4de4199

File tree

9 files changed

+219
-4
lines changed

9 files changed

+219
-4
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<component name="ProjectRunConfigurationManager">
2+
<configuration default="false" name="DynamicSink" type="Application" factoryName="Application" nameIsGenerated="true">
3+
<option name="INCLUDE_PROVIDED_SCOPE" value="true" />
4+
<option name="MAIN_CLASS_NAME" value="io.streamnative.flink.java.dynamic.DynamicSink" />
5+
<module name="flink-example" />
6+
<extension name="coverage">
7+
<pattern>
8+
<option name="PATTERN" value="io.streamnative.flink.java.dynamic.*" />
9+
<option name="ENABLED" value="true" />
10+
</pattern>
11+
</extension>
12+
<method v="2">
13+
<option name="Make" enabled="true" />
14+
</method>
15+
</configuration>
16+
</component>
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<component name="ProjectRunConfigurationManager">
2+
<configuration default="false" name="DynamicSource" type="Application" factoryName="Application" nameIsGenerated="true">
3+
<option name="INCLUDE_PROVIDED_SCOPE" value="true" />
4+
<option name="MAIN_CLASS_NAME" value="io.streamnative.flink.java.dynamic.DynamicSource" />
5+
<module name="flink-example" />
6+
<extension name="coverage">
7+
<pattern>
8+
<option name="PATTERN" value="io.streamnative.flink.java.dynamic.*" />
9+
<option name="ENABLED" value="true" />
10+
</pattern>
11+
</extension>
12+
<method v="2">
13+
<option name="Make" enabled="true" />
14+
</method>
15+
</configuration>
16+
</component>
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<component name="ProjectRunConfigurationManager">
2+
<configuration default="false" name="LoadEventSeDe" type="Application" factoryName="Application" nameIsGenerated="true">
3+
<option name="MAIN_CLASS_NAME" value="io.streamnative.flink.java.polymorphic.LoadEventSeDe" />
4+
<module name="flink-example" />
5+
<extension name="coverage">
6+
<pattern>
7+
<option name="PATTERN" value="io.streamnative.flink.java.polymorphic.*" />
8+
<option name="ENABLED" value="true" />
9+
</pattern>
10+
</extension>
11+
<method v="2">
12+
<option name="Make" enabled="true" />
13+
</method>
14+
</configuration>
15+
</component>

docker/create-topics.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,6 @@ pulsarctl tenants create sample --allowed-clusters="standalone"
2525
pulsarctl namespaces create sample/flink
2626
pulsarctl topics create sample/flink/simple-string 8
2727
pulsarctl topics create sample/flink/load-event 8
28+
pulsarctl topics create sample/flink/dynamic-load-event 8
2829
pulsarctl topics create sample/flink/user 4
2930
pulsarctl topics list sample/flink

src/main/java/io/streamnative/flink/java/SimpleSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public static void main(String[] args) throws Exception {
6060
.setConfig(fromMap(configs.sourceConfigs()))
6161
.build();
6262

63-
// Pulsar Source don't require extra TypeInformation be provided.
63+
// Pulsar Source doesn't require extra TypeInformation be provided.
6464
env.fromSource(pulsarSource, forBoundedOutOfOrderness(ofMinutes(5)), "pulsar-source")
6565
.print();
6666

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,70 @@
11
package io.streamnative.flink.java.dynamic;
22

33
import io.streamnative.flink.java.models.LoadCreatedEvent;
4+
import org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext;
45
import org.apache.flink.api.common.typeinfo.TypeInformation;
56
import org.apache.flink.api.common.typeinfo.Types;
7+
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
8+
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
69
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
710
import org.apache.flink.util.Collector;
811
import org.apache.pulsar.client.api.Message;
12+
import org.apache.pulsar.client.impl.PulsarClientImpl;
13+
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
14+
import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
15+
import org.apache.pulsar.common.naming.TopicName;
16+
import org.apache.pulsar.shade.org.apache.avro.generic.GenericRecord;
17+
18+
import java.util.List;
19+
20+
import static java.util.stream.Collectors.toList;
21+
import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
922

1023
/**
11-
* Query the schema by schema version.
24+
* Query the schema by schema version. This should only be applied to a topic with
25+
* {@code Schema.AUTO_PRODUCE(Schema.AVRO(Class))} producer.
1226
*/
1327
public class DynamicDeserializationSchema implements PulsarDeserializationSchema<LoadCreatedEvent> {
1428
private static final long serialVersionUID = 3320218454364912622L;
1529

30+
private transient PulsarClientImpl client;
31+
32+
// No need to use the concurrent hash map, flink is thread safe here.
33+
private transient AutoConsumeSchema schema;
34+
1635
@Override
17-
public void deserialize(Message<byte[]> message, Collector<LoadCreatedEvent> collector) throws Exception {
36+
@SuppressWarnings("unchecked")
37+
public void deserialize(Message<byte[]> message, Collector<LoadCreatedEvent> collector) {
38+
GenericRecord record = decode(message);
1839

40+
// Convert this GenericRecord to your class instance.
41+
// I only add a quite simple demo here which shouldn't be used in the production code.
42+
if (record.hasField("createdAction")) {
43+
String action = record.get("createdAction").toString();
44+
List<String> messages = ((List<?>) record.get("messages")).stream().map(Object::toString).collect(toList());
45+
collector.collect(new LoadCreatedEvent().setCreatedAction(action).setMessages(messages));
46+
}
47+
}
48+
49+
private GenericRecord decode(Message<byte[]> message) {
50+
if (schema == null) {
51+
this.schema = new AutoConsumeSchema();
52+
53+
// Set schema info provider.
54+
String topicName = TopicNameUtils.topicName(message.getTopicName());
55+
schema.setSchemaInfoProvider(new MultiVersionSchemaInfoProvider(TopicName.get(topicName), client));
56+
}
57+
58+
return (GenericRecord) schema.decode(message.getData(), message.getSchemaVersion()).getNativeObject();
1959
}
2060

2161
@Override
2262
public TypeInformation<LoadCreatedEvent> getProducedType() {
2363
return Types.POJO(LoadCreatedEvent.class);
2464
}
65+
66+
@Override
67+
public void open(InitializationContext context, SourceConfiguration configuration) {
68+
this.client = (PulsarClientImpl) createClient(configuration);
69+
}
2570
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package io.streamnative.flink.java.dynamic;
2+
3+
import io.streamnative.flink.java.config.ApplicationConfigs;
4+
import io.streamnative.flink.java.generator.RandomLoadEventGenerator;
5+
import io.streamnative.flink.java.models.LoadEvent;
6+
import org.apache.pulsar.client.api.Producer;
7+
import org.apache.pulsar.client.api.PulsarClient;
8+
import org.apache.pulsar.client.api.PulsarClientException;
9+
10+
import java.util.HashMap;
11+
import java.util.Map;
12+
13+
import static io.streamnative.flink.java.config.ApplicationConfigs.loadConfig;
14+
import static org.apache.pulsar.client.api.Schema.AVRO;
15+
16+
/**
17+
* This sink will use Pulsar's {@code Schema.AVRO} to serialize the messages into a topic.
18+
* The read serializer is {@code Schema.AUTO}, Pulsar only supports avro and json in such way.
19+
*/
20+
public class DynamicSink {
21+
22+
private static final Map<Class<?>, Producer<?>> producers = new HashMap<>();
23+
24+
public static void main(String[] args) throws Exception {
25+
// Load application configs.
26+
ApplicationConfigs configs = loadConfig(args);
27+
28+
// Create a fake source.
29+
RandomLoadEventGenerator generator = new RandomLoadEventGenerator();
30+
generator.open(null);
31+
32+
// Create Pulsar Producer.
33+
PulsarClient client = PulsarClient.builder().serviceUrl(configs.serviceUrl()).build();
34+
35+
while (true) {
36+
LoadEvent event = generator.generate();
37+
Producer producer = getProducer(client, event);
38+
producer.newMessage().value(event).send();
39+
}
40+
}
41+
42+
private static Producer<?> getProducer(PulsarClient client, LoadEvent event) throws PulsarClientException {
43+
Class<?> clazz = event.getClass();
44+
Producer<?> producer = producers.get(clazz);
45+
46+
if (producer == null) {
47+
producer = client.newProducer(AVRO(clazz))
48+
.topic("persistent://sample/flink/dynamic-load-event")
49+
.create();
50+
producers.put(clazz, producer);
51+
}
52+
53+
return producer;
54+
}
55+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package io.streamnative.flink.java.dynamic;
20+
21+
import io.streamnative.flink.java.config.ApplicationConfigs;
22+
import io.streamnative.flink.java.models.LoadCreatedEvent;
23+
import org.apache.flink.connector.pulsar.source.PulsarSource;
24+
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
25+
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
26+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
27+
28+
import static io.streamnative.flink.java.common.EnvironmentUtils.createEnvironment;
29+
import static io.streamnative.flink.java.config.ApplicationConfigs.loadConfig;
30+
import static java.time.Duration.ofMinutes;
31+
import static org.apache.flink.api.common.eventtime.WatermarkStrategy.forBoundedOutOfOrderness;
32+
import static org.apache.flink.configuration.Configuration.fromMap;
33+
import static org.apache.pulsar.client.api.SubscriptionType.Failover;
34+
35+
/**
36+
* This example is used for consuming load event from Pulsar with different sub event class.
37+
*/
38+
public class DynamicSource {
39+
40+
public static void main(String[] args) throws Exception {
41+
// Load application configs.
42+
ApplicationConfigs configs = loadConfig(args);
43+
44+
// Create execution environment
45+
StreamExecutionEnvironment env = createEnvironment(configs);
46+
47+
// Create a Pulsar source, it would consume messages from Pulsar on "sample/flink/simple-string" topic.
48+
PulsarSource<LoadCreatedEvent> pulsarSource = PulsarSource.builder()
49+
.setServiceUrl(configs.serviceUrl())
50+
.setAdminUrl(configs.adminUrl())
51+
.setStartCursor(StartCursor.earliest())
52+
.setUnboundedStopCursor(StopCursor.never())
53+
.setTopics("persistent://sample/flink/dynamic-load-event")
54+
.setDeserializationSchema(new DynamicDeserializationSchema())
55+
.setSubscriptionName("flink-source")
56+
.setConsumerName("flink-source-%s")
57+
.setSubscriptionType(Failover)
58+
.setConfig(fromMap(configs.sourceConfigs()))
59+
.build();
60+
61+
// Pulsar Source doesn't require extra TypeInformation be provided.
62+
env.fromSource(pulsarSource, forBoundedOutOfOrderness(ofMinutes(5)), "pulsar-source")
63+
.print();
64+
65+
env.execute("Load Event Pulsar Source");
66+
}
67+
}

src/main/java/io/streamnative/flink/java/polymorphic/LoadEventSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public static void main(String[] args) throws Exception {
6060
.setConfig(fromMap(configs.sourceConfigs()))
6161
.build();
6262

63-
// Pulsar Source don't require extra TypeInformation be provided.
63+
// Pulsar Source doesn't require extra TypeInformation be provided.
6464
env.fromSource(pulsarSource, forBoundedOutOfOrderness(ofMinutes(5)), "pulsar-source")
6565
.flatMap(new LoadEventFilterFunction())
6666
.print();

0 commit comments

Comments
 (0)