Skip to content

Commit d4bc91a

Browse files
authored
Merge pull request #3 from majusko/feature/mock-client-feature
Added mock classes for pulsar client, builders, producers and consumers.
2 parents 71377c7 + f655b07 commit d4bc91a

File tree

11 files changed

+625
-9
lines changed

11 files changed

+625
-9
lines changed

src/main/java/io/github/majusko/pulsar/PulsarAutoConfiguration.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import org.apache.pulsar.client.api.PulsarClient;
44
import org.apache.pulsar.client.api.PulsarClientException;
5+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
56
import org.springframework.boot.context.properties.EnableConfigurationProperties;
67
import org.springframework.context.annotation.Bean;
78
import org.springframework.context.annotation.ComponentScan;
@@ -12,6 +13,7 @@
1213
@Configuration
1314
@ComponentScan
1415
@EnableConfigurationProperties(PulsarProperties.class)
16+
@ConditionalOnProperty(value = "pulsar.mock", havingValue = "false", matchIfMissing = true)
1517
public class PulsarAutoConfiguration {
1618

1719
private final PulsarProperties pulsarProperties;
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package io.github.majusko.pulsar;
2+
3+
import io.github.majusko.pulsar.mock.MockPulsarClient;
4+
import org.apache.pulsar.client.api.PulsarClient;
5+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
6+
import org.springframework.context.annotation.Bean;
7+
import org.springframework.context.annotation.ComponentScan;
8+
import org.springframework.context.annotation.Configuration;
9+
10+
@Configuration
11+
@ComponentScan
12+
@ConditionalOnProperty(value = "pulsar.mock")
13+
public class PulsarMockAutoConfiguration {
14+
15+
@Bean
16+
public PulsarClient pulsarClient() {
17+
return new MockPulsarClient();
18+
}
19+
}

src/main/java/io/github/majusko/pulsar/consumer/ConsumerBuilder.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.apache.pulsar.client.api.PulsarClient;
77
import org.apache.pulsar.client.api.PulsarClientException;
88
import org.apache.pulsar.client.api.Schema;
9+
import org.springframework.context.annotation.DependsOn;
910
import org.springframework.stereotype.Component;
1011

1112
import javax.annotation.PostConstruct;
@@ -14,6 +15,7 @@
1415
import java.util.stream.Collectors;
1516

1617
@Component
18+
@DependsOn({"pulsarClient", "consumerCollector"})
1719
public class ConsumerBuilder {
1820

1921
private final ConsumerCollector consumerCollector;
@@ -48,12 +50,12 @@ private Consumer<?> subscribe(String name, ConsumerHolder holder) {
4850
method.invoke(holder.getBean(), msg);
4951

5052
consumer.acknowledge(msg);
51-
} catch(Exception e) {
53+
} catch (Exception e) {
5254
consumer.negativeAcknowledge(msg);
5355
throw new RuntimeException("TODO Custom Exception!", e);
5456
}
5557
}).subscribe();
56-
} catch(PulsarClientException e) {
58+
} catch (PulsarClientException e) {
5759
throw new RuntimeException("TODO Custom Exception!", e);
5860
}
5961
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package io.github.majusko.pulsar.mock;
2+
3+
import org.apache.pulsar.client.api.*;
4+
5+
import java.util.concurrent.CompletableFuture;
6+
import java.util.concurrent.TimeUnit;
7+
8+
public class MockConsumer<T> implements Consumer<T> {
9+
@Override
10+
public String getTopic() {
11+
return "mock-topic";
12+
}
13+
14+
@Override
15+
public String getSubscription() {
16+
return "mock-subscription";
17+
}
18+
19+
@Override
20+
public void unsubscribe() throws PulsarClientException {
21+
22+
}
23+
24+
@Override
25+
public CompletableFuture<Void> unsubscribeAsync() {
26+
return null;
27+
}
28+
29+
@Override
30+
public Message<T> receive() throws PulsarClientException {
31+
return null;
32+
}
33+
34+
@Override
35+
public CompletableFuture<Message<T>> receiveAsync() {
36+
return null;
37+
}
38+
39+
@Override
40+
public Message<T> receive(int timeout, TimeUnit unit) throws PulsarClientException {
41+
return null;
42+
}
43+
44+
@Override
45+
public void acknowledge(MessageId messageId) throws PulsarClientException {
46+
47+
}
48+
49+
@Override
50+
public void negativeAcknowledge(MessageId messageId) {
51+
52+
}
53+
54+
@Override
55+
public void acknowledgeCumulative(MessageId messageId) throws PulsarClientException {
56+
57+
}
58+
59+
@Override
60+
public CompletableFuture<Void> acknowledgeAsync(MessageId messageId) {
61+
return null;
62+
}
63+
64+
@Override
65+
public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId) {
66+
return null;
67+
}
68+
69+
@Override
70+
public ConsumerStats getStats() {
71+
return null;
72+
}
73+
74+
@Override
75+
public void close() throws PulsarClientException {
76+
77+
}
78+
79+
@Override
80+
public CompletableFuture<Void> closeAsync() {
81+
return null;
82+
}
83+
84+
@Override
85+
public boolean hasReachedEndOfTopic() {
86+
return false;
87+
}
88+
89+
@Override
90+
public void redeliverUnacknowledgedMessages() {
91+
92+
}
93+
94+
@Override
95+
public void seek(MessageId messageId) throws PulsarClientException {
96+
97+
}
98+
99+
@Override
100+
public void seek(long timestamp) throws PulsarClientException {
101+
102+
}
103+
104+
@Override
105+
public CompletableFuture<Void> seekAsync(MessageId messageId) {
106+
return null;
107+
}
108+
109+
@Override
110+
public CompletableFuture<Void> seekAsync(long timestamp) {
111+
return null;
112+
}
113+
114+
@Override
115+
public boolean isConnected() {
116+
return false;
117+
}
118+
119+
@Override
120+
public String getConsumerName() {
121+
return null;
122+
}
123+
124+
@Override
125+
public void pause() {
126+
127+
}
128+
129+
@Override
130+
public void resume() {
131+
132+
}
133+
134+
@Override
135+
public CompletableFuture<Void> acknowledgeCumulativeAsync(Message message) {
136+
return null;
137+
}
138+
139+
@Override
140+
public CompletableFuture<Void> acknowledgeAsync(Message message) {
141+
return null;
142+
}
143+
144+
@Override
145+
public void acknowledgeCumulative(Message message) throws PulsarClientException {
146+
147+
}
148+
149+
@Override
150+
public void negativeAcknowledge(Message message) {
151+
152+
}
153+
154+
@Override
155+
public void acknowledge(Message message) throws PulsarClientException {
156+
157+
}
158+
}
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package io.github.majusko.pulsar.mock;
2+
3+
import org.apache.pulsar.client.api.*;
4+
5+
import java.util.List;
6+
import java.util.Map;
7+
import java.util.concurrent.CompletableFuture;
8+
import java.util.concurrent.TimeUnit;
9+
import java.util.regex.Pattern;
10+
11+
public class MockConsumerBuilder<T> implements ConsumerBuilder<T> {
12+
@Override
13+
public ConsumerBuilder<T> clone() {
14+
return this;
15+
}
16+
17+
@Override
18+
public Consumer<T> subscribe() throws PulsarClientException {
19+
return new MockConsumer<>();
20+
}
21+
22+
@Override
23+
public CompletableFuture<Consumer<T>> subscribeAsync() {
24+
return CompletableFuture.supplyAsync(MockConsumer::new);
25+
}
26+
27+
@Override
28+
public ConsumerBuilder<T> topic(String... topicNames) {
29+
return this;
30+
}
31+
32+
@Override
33+
public ConsumerBuilder<T> topicsPattern(Pattern topicsPattern) {
34+
return this;
35+
}
36+
37+
@Override
38+
public ConsumerBuilder<T> topicsPattern(String topicsPattern) {
39+
return this;
40+
}
41+
42+
@Override
43+
public ConsumerBuilder<T> subscriptionName(String subscriptionName) {
44+
return this;
45+
}
46+
47+
@Override
48+
public ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit) {
49+
return this;
50+
}
51+
52+
@Override
53+
public ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit) {
54+
return this;
55+
}
56+
57+
@Override
58+
public ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay, TimeUnit timeUnit) {
59+
return this;
60+
}
61+
62+
@Override
63+
public ConsumerBuilder<T> subscriptionType(SubscriptionType subscriptionType) {
64+
return this;
65+
}
66+
67+
@Override
68+
public ConsumerBuilder<T> messageListener(MessageListener messageListener) {
69+
return this;
70+
}
71+
72+
@Override
73+
public ConsumerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
74+
return this;
75+
}
76+
77+
@Override
78+
public ConsumerBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action) {
79+
return this;
80+
}
81+
82+
@Override
83+
public ConsumerBuilder<T> receiverQueueSize(int receiverQueueSize) {
84+
return this;
85+
}
86+
87+
@Override
88+
public ConsumerBuilder<T> acknowledgmentGroupTime(long delay, TimeUnit unit) {
89+
return this;
90+
}
91+
92+
@Override
93+
public ConsumerBuilder<T> replicateSubscriptionState(boolean replicateSubscriptionState) {
94+
return this;
95+
}
96+
97+
@Override
98+
public ConsumerBuilder<T> maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions) {
99+
return this;
100+
}
101+
102+
@Override
103+
public ConsumerBuilder<T> consumerName(String consumerName) {
104+
return this;
105+
}
106+
107+
@Override
108+
public ConsumerBuilder<T> consumerEventListener(ConsumerEventListener consumerEventListener) {
109+
return this;
110+
}
111+
112+
@Override
113+
public ConsumerBuilder<T> readCompacted(boolean readCompacted) {
114+
return this;
115+
}
116+
117+
@Override
118+
public ConsumerBuilder<T> patternAutoDiscoveryPeriod(int periodInMinutes) {
119+
return this;
120+
}
121+
122+
@Override
123+
public ConsumerBuilder<T> priorityLevel(int priorityLevel) {
124+
return this;
125+
}
126+
127+
@Override
128+
public ConsumerBuilder<T> property(String key, String value) {
129+
return this;
130+
}
131+
132+
@Override
133+
public ConsumerBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition) {
134+
return this;
135+
}
136+
137+
@Override
138+
public ConsumerBuilder<T> subscriptionTopicsMode(RegexSubscriptionMode regexSubscriptionMode) {
139+
return this;
140+
}
141+
142+
@Override
143+
public ConsumerBuilder<T> intercept(ConsumerInterceptor[] interceptors) {
144+
return this;
145+
}
146+
147+
@Override
148+
public ConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
149+
return this;
150+
}
151+
152+
@Override
153+
public ConsumerBuilder<T> autoUpdatePartitions(boolean autoUpdate) {
154+
return this;
155+
}
156+
157+
@Override
158+
public ConsumerBuilder<T> properties(Map properties) {
159+
return this;
160+
}
161+
162+
@Override
163+
public ConsumerBuilder<T> topics(List topicNames) {
164+
return this;
165+
}
166+
167+
@Override
168+
public ConsumerBuilder<T> loadConf(Map config) {
169+
return this;
170+
}
171+
}

0 commit comments

Comments
 (0)