2020import java .util .function .Function ;
2121import java .util .function .Supplier ;
2222
23+ import org .apache .pulsar .client .api .Schema ;
24+ import org .apache .pulsar .common .schema .SchemaType ;
2325import org .slf4j .Logger ;
2426import org .slf4j .LoggerFactory ;
2527
28+ import org .springframework .boot .ApplicationRunner ;
2629import org .springframework .boot .SpringApplication ;
2730import org .springframework .boot .autoconfigure .SpringBootApplication ;
2831import org .springframework .context .annotation .Bean ;
32+ import org .springframework .pulsar .annotation .PulsarListener ;
33+ import org .springframework .pulsar .core .PulsarTemplate ;
2934
35+ /**
36+ * This sample binder app has an extra consumer that is equipped with Pulsar's DLT feature - timeLoggerToDlt.
37+ * However, this consumer is not part of the spring.cloud.function.definition.
38+ * In order to enable this, add the function timeLoggerToDlt to the definition in the application.yml file.
39+ * When doing this, in order to minimize verbose output and just to focus on the DLT feature, comment out the
40+ * regular supplier below (timeSupplier) and then un-comment the ApplicationRunner below.
41+ * The runner only sends a single message whereas the supplier sends a message every second.
42+ */
3043@ SpringBootApplication
3144public class SpringPulsarBinderSampleApp {
3245
@@ -41,6 +54,18 @@ public Supplier<Time> timeSupplier() {
4154 return () -> new Time (String .valueOf (System .currentTimeMillis ()));
4255 }
4356
57+ // @Bean
58+ // ApplicationRunner runner(PulsarTemplate<Time> pulsarTemplate) {
59+ //
60+ // String topic = "timeSupplier-out-0";
61+ //
62+ // return args -> {
63+ // for (int i = 0; i < 1; i++) {
64+ // pulsarTemplate.send(topic, new Time(String.valueOf(System.currentTimeMillis())), Schema.JSON(Time.class));
65+ // }
66+ // };
67+ // }
68+
4469 @ Bean
4570 public Function <Time , EnhancedTime > timeProcessor () {
4671 return (time ) -> {
@@ -55,6 +80,19 @@ public Consumer<EnhancedTime> timeLogger() {
5580 return (time ) -> this .logger .info ("SINK: {}" , time );
5681 }
5782
83+ @ Bean
84+ public Consumer <EnhancedTime > timeLoggerToDlt () {
85+ return (time ) -> {
86+ this .logger .info ("SINK (TO DLT EVENTUALLY): {}" , time );
87+ throw new RuntimeException ("fail " + time );
88+ };
89+ }
90+
91+ @ PulsarListener (id = "dlqListener" , topics = "notification-dlq" , schemaType = SchemaType .JSON )
92+ void listenDlq (EnhancedTime msg ) {
93+ System .out .println ("From DLQ: " + msg );
94+ }
95+
5896 record Time (String time ) {
5997 }
6098
0 commit comments