Skip to content

Commit 697529c

Browse files
authored
feat: pending with hardcoded service URL (#26)
Signed-off-by: jacque1ine <js3fung@uwaterloo.ca> Signed-off-by: Jacqueline Fung <fungjsl@gmail.com>
1 parent d3a9ea0 commit 697529c

File tree

5 files changed

+61
-12
lines changed

5 files changed

+61
-12
lines changed

pipeline.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ metadata:
44
name: simple-pipeline
55
spec:
66
limits:
7-
readBatchSize: 6
7+
readBatchSize: 1
88
vertices:
99
- name: in
1010
scale:
11-
min: 2
11+
min: 1
1212
volumes: # Shared between containers that are part of the same pod, useful for sharing configurations
1313
- name: pulsar-config-volume
1414
configMap:

pulsar-config-map.yaml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ data:
1515
consumer:
1616
enabled: true
1717
consumerConfig:
18-
topicNames: "marchten"
19-
subscriptionName: "sub"
18+
topicNames: "marchtwelve"
2019
2120
2221

src/main/java/io/numaproj/pulsar/config/PulsarConfig.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
import org.apache.pulsar.client.api.PulsarClient;
44
import org.apache.pulsar.client.api.PulsarClientException;
55
import org.apache.pulsar.client.api.Schema;
6+
import org.apache.pulsar.client.admin.PulsarAdmin;
7+
import org.apache.pulsar.client.admin.PulsarAdminException;
8+
import java.net.MalformedURLException;
69

710
import java.util.Map;
811
import java.util.UUID;
@@ -57,4 +60,12 @@ public Producer<byte[]> pulsarProducer(PulsarClient pulsarClient, PulsarProducer
5760
.loadConf(producerConfig)
5861
.create();
5962
}
60-
}
63+
64+
@Bean
65+
public PulsarAdmin pulsarAdmin(PulsarClientProperties pulsarClientProperties) throws PulsarClientException {
66+
return PulsarAdmin.builder()
67+
.serviceHttpUrl("http://host.docker.internal:8080/")
68+
// .authentication(...) if needed
69+
.build();
70+
}
71+
}

src/main/java/io/numaproj/pulsar/config/PulsarConsumerProperties.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,31 @@
2020
@ConfigurationProperties(prefix = "spring.pulsar.consumer")
2121
@Slf4j
2222
public class PulsarConsumerProperties {
23+
24+
private static final String DEFAULT_SUBSCRIPTION_NAME = "sub";
25+
2326
private Map<String, Object> consumerConfig = new HashMap<>(); // Default to an empty map
2427

2528
@PostConstruct
2629
public void init() {
2730
// Pulsar expects topicNames to be type set, but the configMap accepts a string
28-
if (consumerConfig.containsKey("topicNames")) {
29-
String topicName = (String) consumerConfig.remove("topicNames");
31+
String topicNameKey = "topicNames";
32+
if (consumerConfig.containsKey(topicNameKey)) {
33+
String topicName = (String) consumerConfig.remove(topicNameKey);
3034
Set<String> topicNames = new HashSet<>();
3135
topicNames.add(topicName);
32-
consumerConfig.put("topicNames", topicNames);
36+
consumerConfig.put(topicNameKey, topicNames);
37+
}
38+
39+
// If 'subscriptionName' not present, provide a default
40+
String subscriptionNameKey = "subscriptionName";
41+
if (!consumerConfig.containsKey(subscriptionNameKey)) {
42+
consumerConfig.put(subscriptionNameKey, DEFAULT_SUBSCRIPTION_NAME);
43+
log.info("No subscriptionName provided. Setting default: '{}'", DEFAULT_SUBSCRIPTION_NAME);
44+
} else {
45+
log.info("subscriptionName was already set, leaving as-is.");
3346
}
34-
log.info("Consumer Config: " + consumerConfig);
47+
48+
log.info("Consumer Config: {}", consumerConfig);
3549
}
3650
}

src/main/java/io/numaproj/pulsar/consumer/PulsarSource.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,16 @@
77
import io.numaproj.numaflow.sourcer.ReadRequest;
88
import io.numaproj.numaflow.sourcer.Server;
99
import io.numaproj.numaflow.sourcer.Sourcer;
10+
import io.numaproj.pulsar.config.PulsarConsumerProperties;
1011
import lombok.extern.slf4j.Slf4j;
1112

1213
import org.apache.pulsar.client.api.Consumer;
1314
import org.apache.pulsar.client.api.Messages;
1415
import org.apache.pulsar.client.api.PulsarClientException;
16+
import org.apache.pulsar.client.admin.PulsarAdmin;
17+
import org.apache.pulsar.client.admin.PulsarAdminException;
18+
import org.apache.pulsar.common.policies.data.TopicStats;
19+
import org.apache.pulsar.common.policies.data.SubscriptionStats;
1520
import org.springframework.beans.factory.annotation.Autowired;
1621
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
1722
import org.springframework.stereotype.Component;
@@ -22,6 +27,7 @@
2227
import java.util.HashMap;
2328
import java.util.List;
2429
import java.util.Map;
30+
import java.util.Set;
2531

2632
@Slf4j
2733
@Component
@@ -36,6 +42,12 @@ public class PulsarSource extends Sourcer {
3642
@Autowired
3743
private PulsarConsumerManager pulsarConsumerManager;
3844

45+
@Autowired
46+
private PulsarAdmin pulsarAdmin;
47+
48+
@Autowired
49+
PulsarConsumerProperties pulsarConsumerProperties;
50+
3951
@PostConstruct
4052
public void startServer() throws Exception {
4153
server = new Server(this);
@@ -124,9 +136,22 @@ public void ack(AckRequest request) {
124136

125137
@Override
126138
public long getPending() {
127-
// TO DO: Currently this is received but not acked. Should be num messages in
128-
// backlog
129-
return messagesToAck.size();
139+
try {
140+
// If changing to support multiple topics, need to update this
141+
Set<String> topicNames = (Set<String>) pulsarConsumerProperties.getConsumerConfig().get("topicNames");
142+
String topicName = (String) topicNames.iterator().next(); // Assumes there is only one topic name in the set
143+
String subscriptionName = (String) pulsarConsumerProperties.getConsumerConfig().get("subscriptionName");
144+
145+
TopicStats topicStats = pulsarAdmin.topics().getStats(topicName);
146+
SubscriptionStats subscriptionStats = topicStats.getSubscriptions().get(subscriptionName);
147+
// will remove later - used for testing
148+
log.info("Number of messages in the backlog: {}", subscriptionStats.getMsgBacklog());
149+
return subscriptionStats.getMsgBacklog();
150+
} catch (PulsarAdminException e) {
151+
log.error("Error while fetching admin stats for pending messages", e);
152+
// Return a negative value to indicate no pending information
153+
return -1;
154+
}
130155
}
131156

132157
@Override

0 commit comments

Comments
 (0)