Skip to content

Commit 8aef773

Browse files
committed
feat: convert the load event into interface.
1 parent fb60357 commit 8aef773

File tree

8 files changed

+69
-47
lines changed

8 files changed

+69
-47
lines changed

docker-compose.yml

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,3 @@ services:
3232
- ./docker/bootstrap.sh:/pulsar/bin/bootstrap.sh
3333
restart: always
3434
command: ["/pulsar/bin/bootstrap.sh"]
35-
36-
flink:
37-
image: streamnative/pulsar-flink:1.16.0.0
38-
container_name: Flink
39-
command: /bin/bash
40-
depends_on:
41-
- pulsar
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package io.streamnative.flink.java.dynamic;
2+
3+
import io.streamnative.flink.java.models.LoadCreatedEvent;
4+
import org.apache.flink.api.common.typeinfo.TypeInformation;
5+
import org.apache.flink.api.common.typeinfo.Types;
6+
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
7+
import org.apache.flink.util.Collector;
8+
import org.apache.pulsar.client.api.Message;
9+
10+
/**
11+
* Query the schema by schema version.
12+
*/
13+
public class DynamicDeserializationSchema implements PulsarDeserializationSchema<LoadCreatedEvent> {
14+
private static final long serialVersionUID = 3320218454364912622L;
15+
16+
@Override
17+
public void deserialize(Message<byte[]> message, Collector<LoadCreatedEvent> collector) throws Exception {
18+
19+
}
20+
21+
@Override
22+
public TypeInformation<LoadCreatedEvent> getProducedType() {
23+
return Types.POJO(LoadCreatedEvent.class);
24+
}
25+
}

src/main/java/io/streamnative/flink/java/generator/RandomLoadEventGenerator.java

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import net.datafaker.Faker;
2727
import org.apache.flink.api.common.functions.RuntimeContext;
2828
import org.apache.flink.api.common.typeinfo.TypeInformation;
29-
import org.apache.flink.api.common.typeinfo.Types;
29+
import org.apache.flink.api.java.typeutils.TypeExtractor;
3030

3131
import java.io.Serializable;
3232
import java.util.List;
@@ -65,11 +65,7 @@ private LoadEvent randomLoadCreatedEvent() {
6565

6666
return new LoadCreatedEvent()
6767
.setCreatedAction(faker.bigBangTheory().character())
68-
.setMessages(messages)
69-
.setUuid(UUID.randomUUID().toString())
70-
.setId(faker.number().positive())
71-
.setName(faker.name().name())
72-
.setContent(faker.commerce().productName());
68+
.setMessages(messages);
7369
}
7470

7571
private LoadEvent randomLoadDeletedEvent() {
@@ -79,21 +75,13 @@ private LoadEvent randomLoadDeletedEvent() {
7975

8076
return new LoadDeletedEvent()
8177
.setDeletedIds(ids)
82-
.setErrorMsg(faker.appliance().brand())
83-
.setUuid(UUID.randomUUID().toString())
84-
.setId(faker.number().positive())
85-
.setName(faker.name().name())
86-
.setContent(faker.commerce().productName());
78+
.setErrorMsg(faker.appliance().brand());
8779
}
8880

8981
private LoadEvent randomLoadUpdatedEvent() {
9082
return new LoadUpdatedEvent()
9183
.setNewAction(faker.pokemon().name())
92-
.setErrorMsg(faker.appliance().brand())
93-
.setUuid(UUID.randomUUID().toString())
94-
.setId(faker.number().positive())
95-
.setName(faker.name().name())
96-
.setContent(faker.commerce().productName());
84+
.setErrorMsg(faker.appliance().brand());
9785
}
9886

9987
@Override
@@ -103,6 +91,7 @@ public void open(RuntimeContext runtimeContext) {
10391

10492
@Override
10593
public TypeInformation<LoadEvent> getType() {
106-
return Types.POJO(LoadEvent.class);
94+
// This is only used for demo, you can't pass an event with such type information in production environments.
95+
return TypeExtractor.getForClass(LoadEvent.class);
10796
}
10897
}

src/main/java/io/streamnative/flink/java/models/LoadCreatedEvent.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package io.streamnative.flink.java.models;
2020

2121
import lombok.Data;
22-
import lombok.EqualsAndHashCode;
2322
import lombok.experimental.Accessors;
2423

2524
import java.util.List;
@@ -29,8 +28,7 @@
2928
*/
3029
@Data
3130
@Accessors(chain = true)
32-
@EqualsAndHashCode(callSuper = true)
33-
public class LoadCreatedEvent extends LoadEvent {
31+
public class LoadCreatedEvent implements LoadEvent {
3432
private static final long serialVersionUID = 8378154813741037238L;
3533

3634
private String createdAction;

src/main/java/io/streamnative/flink/java/models/LoadDeletedEvent.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package io.streamnative.flink.java.models;
2020

2121
import lombok.Data;
22-
import lombok.EqualsAndHashCode;
2322
import lombok.experimental.Accessors;
2423

2524
import java.util.List;
@@ -29,8 +28,7 @@
2928
*/
3029
@Data
3130
@Accessors(chain = true)
32-
@EqualsAndHashCode(callSuper = true)
33-
public class LoadDeletedEvent extends LoadEvent {
31+
public class LoadDeletedEvent implements LoadEvent {
3432
private static final long serialVersionUID = -3637355499307958595L;
3533

3634
private List<String> deletedIds;

src/main/java/io/streamnative/flink/java/models/LoadEvent.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
package io.streamnative.flink.java.models;
2020

21-
import lombok.Data;
22-
import lombok.experimental.Accessors;
2321
import org.apache.pulsar.shade.com.fasterxml.jackson.annotation.JsonSubTypes;
2422
import org.apache.pulsar.shade.com.fasterxml.jackson.annotation.JsonTypeInfo;
2523

@@ -29,8 +27,6 @@
2927
* Common class for events. We should add the Jackson annotation here.
3028
* You should use the Pulsar-shaded Jackson annotations.
3129
*/
32-
@Data
33-
@Accessors(chain = true)
3430
@JsonTypeInfo(
3531
use = JsonTypeInfo.Id.CLASS,
3632
include = JsonTypeInfo.As.PROPERTY,
@@ -41,14 +37,6 @@
4137
@JsonSubTypes.Type(value = LoadDeletedEvent.class, name = "LoadDeletedEvent"),
4238
@JsonSubTypes.Type(value = LoadUpdatedEvent.class, name = "LoadUpdatedEvent"),
4339
})
44-
public class LoadEvent implements Serializable {
45-
private static final long serialVersionUID = 2895533085240328403L;
46-
47-
private String uuid;
48-
49-
private Integer id;
50-
51-
private String name;
52-
53-
private String content;
40+
public interface LoadEvent extends Serializable {
41+
// This is an empty interface.
5442
}

src/main/java/io/streamnative/flink/java/models/LoadUpdatedEvent.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,14 @@
1919
package io.streamnative.flink.java.models;
2020

2121
import lombok.Data;
22-
import lombok.EqualsAndHashCode;
2322
import lombok.experimental.Accessors;
2423

2524
/**
2625
* The event for deleting elements.
2726
*/
2827
@Data
2928
@Accessors(chain = true)
30-
@EqualsAndHashCode(callSuper = true)
31-
public class LoadUpdatedEvent extends LoadEvent {
29+
public class LoadUpdatedEvent implements LoadEvent {
3230
private static final long serialVersionUID = -6271238097681840919L;
3331

3432
private String newAction;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package io.streamnative.flink.java.polymorphic;
2+
3+
import io.streamnative.flink.java.generator.RandomLoadEventGenerator;
4+
import io.streamnative.flink.java.models.LoadEvent;
5+
import org.apache.pulsar.common.util.ObjectMapperFactory;
6+
import org.apache.pulsar.shade.com.fasterxml.jackson.core.JsonProcessingException;
7+
import org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper;
8+
9+
/**
10+
* This is an example on how to serialize/deserializer the load event interface with jackson.
11+
*/
12+
public class LoadEventSeDe {
13+
14+
private static final ObjectMapper mapper = ObjectMapperFactory.create();
15+
16+
public static void main(String[] args) throws JsonProcessingException {
17+
RandomLoadEventGenerator generator = new RandomLoadEventGenerator();
18+
generator.open(null);
19+
20+
while (true) {
21+
LoadEvent event = generator.generate();
22+
System.out.println("The event type is: " + event.getClass().getName());
23+
24+
// Serialize it into JSON.
25+
String json = mapper.writeValueAsString(event);
26+
System.out.println("Serialize into JSON: "+ json);
27+
28+
// Deserializer the json into interface.
29+
LoadEvent value = mapper.readValue(json, LoadEvent.class);
30+
System.out.println("Deserailize the json, class type: " + value.getClass().getName() + ", content: " + value);
31+
}
32+
}
33+
}

0 commit comments

Comments
 (0)