Skip to content

Commit d3a9ea0

Browse files
authored
test: add unit test for read and ack in source, add ci (#24)
Signed-off-by: jacque1ine <js3fung@uwaterloo.ca>
1 parent 2c06f41 commit d3a9ea0

File tree

5 files changed

+524
-0
lines changed

5 files changed

+524
-0
lines changed

.github/workflows/ci.yml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
2+
name: Java CI with Maven
3+
4+
on:
5+
push:
6+
branches: [ "main" ]
7+
pull_request:
8+
branches: [ "main" ]
9+
10+
jobs:
11+
build:
12+
13+
runs-on: ubuntu-latest
14+
15+
steps:
16+
- uses: actions/checkout@v4
17+
- name: Set up JDK 17
18+
uses: actions/setup-java@v4
19+
with:
20+
java-version: '17'
21+
distribution: 'temurin'
22+
cache: maven
23+
- name: Build with Maven, run unit tests
24+
run: mvn clean install

pom.xml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,23 @@
5151
</excludes>
5252
</configuration>
5353
</plugin>
54+
<plugin>
55+
<groupId>org.apache.maven.plugins</groupId>
56+
<artifactId>maven-surefire-plugin</artifactId>
57+
<version>2.22.0</version>
58+
<dependencies>
59+
<dependency>
60+
<groupId>org.apache.maven.surefire</groupId>
61+
<artifactId>surefire-junit4</artifactId>
62+
<version>2.22.0</version>
63+
</dependency>
64+
</dependencies>
65+
<configuration>
66+
<includes>
67+
<include>**/*.java</include>
68+
</includes>
69+
</configuration>
70+
</plugin>
5471
<plugin>
5572
<groupId>com.google.cloud.tools</groupId>
5673
<artifactId>jib-maven-plugin</artifactId>
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package io.numaproj.pulsar.config;
2+
3+
import org.junit.Test;
4+
import static org.junit.Assert.*;
5+
6+
import java.util.HashMap;
7+
import java.util.Map;
8+
import java.util.Set;
9+
10+
public class PulsarConsumerPropertiesTest {
11+
12+
@Test
13+
public void testInitTransformsTopicNamesProperty() {
14+
// Prepare a properties instance with a string value for topicNames
15+
PulsarConsumerProperties properties = new PulsarConsumerProperties();
16+
Map<String, Object> config = new HashMap<>();
17+
String topic = "my-test-topic";
18+
config.put("topicNames", topic);
19+
// Also add another unrelated property
20+
config.put("otherKey", "otherValue");
21+
properties.setConsumerConfig(config);
22+
23+
// When init is called, it should convert the topicNames value to a Set
24+
properties.init();
25+
26+
// Verify that topicNames is now a Set containing the original string
27+
Object topicNamesObj = properties.getConsumerConfig().get("topicNames");
28+
assertNotNull("topicNames should not be null after init", topicNamesObj);
29+
assertTrue("topicNames should be of type Set", topicNamesObj instanceof Set);
30+
31+
Set<?> topicNamesSet = (Set<?>) topicNamesObj;
32+
assertEquals("The topicNames set should contain one item", 1, topicNamesSet.size());
33+
assertTrue("The topicNames set should contain the initial topic", topicNamesSet.contains(topic));
34+
35+
// Ensure that unrelated properties remain unchanged
36+
assertEquals("otherKey property should remain unchanged", "otherValue",
37+
properties.getConsumerConfig().get("otherKey"));
38+
}
39+
}
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
package io.numaproj.pulsar.consumer;
2+
3+
import io.numaproj.pulsar.config.PulsarConsumerProperties;
4+
import org.apache.pulsar.client.api.BatchReceivePolicy;
5+
import org.apache.pulsar.client.api.Consumer;
6+
import org.apache.pulsar.client.api.ConsumerBuilder;
7+
import org.apache.pulsar.client.api.PulsarClient;
8+
import org.apache.pulsar.client.api.PulsarClientException;
9+
import org.apache.pulsar.client.api.Schema;
10+
import org.apache.pulsar.client.api.SubscriptionType;
11+
import org.junit.After;
12+
import org.junit.Before;
13+
import org.junit.Test;
14+
import org.mockito.ArgumentCaptor;
15+
import org.springframework.test.util.ReflectionTestUtils;
16+
17+
import java.util.HashMap;
18+
import java.util.Map;
19+
20+
import static org.junit.Assert.*;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.ArgumentMatchers.anyMap;
23+
import static org.mockito.Mockito.*;
24+
25+
public class PulsarConsumerManagerTest {
26+
27+
private PulsarConsumerManager manager;
28+
private PulsarConsumerProperties consumerProperties;
29+
private PulsarClient mockPulsarClient;
30+
private ConsumerBuilder<byte[]> mockConsumerBuilder;
31+
private Consumer<byte[]> mockConsumer;
32+
33+
@Before
34+
public void setUp() {
35+
// Create a simple consumer properties object with a dummy config
36+
consumerProperties = new PulsarConsumerProperties();
37+
Map<String, Object> config = new HashMap<>();
38+
config.put("dummyKey", "dummyValue");
39+
consumerProperties.setConsumerConfig(config);
40+
41+
// Instantiate the manager and inject dependencies using ReflectionTestUtils
42+
manager = new PulsarConsumerManager();
43+
ReflectionTestUtils.setField(manager, "pulsarConsumerProperties", consumerProperties);
44+
45+
// Create mocks for PulsarClient and the ConsumerBuilder chain
46+
mockPulsarClient = mock(PulsarClient.class);
47+
mockConsumerBuilder = mock(ConsumerBuilder.class);
48+
mockConsumer = mock(Consumer.class);
49+
ReflectionTestUtils.setField(manager, "pulsarClient", mockPulsarClient);
50+
}
51+
52+
@After
53+
public void tearDown() {
54+
manager = null;
55+
consumerProperties = null;
56+
mockPulsarClient = null;
57+
mockConsumerBuilder = null;
58+
mockConsumer = null;
59+
}
60+
61+
@Test
62+
public void getOrCreateConsumer_createsNewConsumer() {
63+
try {
64+
// Set up the chaining calls on the ConsumerBuilder mock
65+
when(mockPulsarClient.newConsumer(Schema.BYTES)).thenReturn(mockConsumerBuilder);
66+
when(mockConsumerBuilder.loadConf(anyMap())).thenReturn(mockConsumerBuilder);
67+
when(mockConsumerBuilder.batchReceivePolicy(any(BatchReceivePolicy.class))).thenReturn(mockConsumerBuilder);
68+
when(mockConsumerBuilder.subscriptionType(SubscriptionType.Shared)).thenReturn(mockConsumerBuilder);
69+
when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
70+
71+
// Call getOrCreateConsumer for the first time so it creates a new consumer
72+
Consumer<byte[]> firstConsumer = manager.getOrCreateConsumer(10L, 1000L);
73+
assertNotNull("A consumer should be created", firstConsumer);
74+
assertEquals("The returned consumer should be the mock consumer", mockConsumer, firstConsumer);
75+
76+
// Call again and verify that it returns the same instance (i.e.,
77+
// builder.subscribe() is not called again)
78+
Consumer<byte[]> secondConsumer = manager.getOrCreateConsumer(10L, 1000L);
79+
assertEquals("Should return the same consumer instance", firstConsumer, secondConsumer);
80+
81+
// Verify that newConsumer(...) and subscribe() are invoked only once
82+
verify(mockPulsarClient, times(1)).newConsumer(Schema.BYTES);
83+
verify(mockConsumerBuilder, times(1)).subscribe();
84+
85+
// Capture loaded configuration to verify that consumerProperties configuration
86+
// is passed
87+
ArgumentCaptor<Map> configCaptor = ArgumentCaptor.forClass(Map.class);
88+
verify(mockConsumerBuilder).loadConf(configCaptor.capture());
89+
Map loadedConfig = configCaptor.getValue();
90+
assertEquals("dummyValue", loadedConfig.get("dummyKey"));
91+
92+
ArgumentCaptor<BatchReceivePolicy> batchPolicyCaptor = ArgumentCaptor.forClass(BatchReceivePolicy.class);
93+
verify(mockConsumerBuilder).batchReceivePolicy(batchPolicyCaptor.capture());
94+
BatchReceivePolicy builtPolicy = batchPolicyCaptor.getValue();
95+
assertNotNull("BatchReceivePolicy should be set", builtPolicy);
96+
97+
// Validate maxNumMessages and timeoutMillis configurations
98+
assertEquals("BatchReceivePolicy should have maxNumMessages set to 10", 10,
99+
builtPolicy.getMaxNumMessages());
100+
assertEquals("BatchReceivePolicy should have timeout set to 1000ms", 1000, builtPolicy.getTimeoutMs());
101+
} catch (PulsarClientException e) {
102+
fail("Unexpected PulsarClientException thrown: " + e.getMessage());
103+
}
104+
}
105+
106+
@Test
107+
public void cleanup_closesConsumerAndClient() {
108+
try {
109+
// Set up the Consumer to be non-null so that cleanup closes it
110+
when(mockPulsarClient.newConsumer(Schema.BYTES)).thenReturn(mockConsumerBuilder);
111+
when(mockConsumerBuilder.loadConf(anyMap())).thenReturn(mockConsumerBuilder);
112+
when(mockConsumerBuilder.batchReceivePolicy(any(BatchReceivePolicy.class))).thenReturn(mockConsumerBuilder);
113+
when(mockConsumerBuilder.subscriptionType(SubscriptionType.Shared)).thenReturn(mockConsumerBuilder);
114+
when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
115+
116+
// Create the consumer via getOrCreateConsumer
117+
Consumer<byte[]> createdConsumer = manager.getOrCreateConsumer(5L, 500L);
118+
assertNotNull(createdConsumer);
119+
120+
// Call cleanup and verify that close() is called on both consumer and client
121+
manager.cleanup();
122+
verify(createdConsumer, times(1)).close();
123+
verify(mockPulsarClient, times(1)).close();
124+
} catch (PulsarClientException e) {
125+
fail("Unexpected PulsarClientException thrown during test cleanup_closesConsumerAndClient: "
126+
+ e.getMessage());
127+
}
128+
}
129+
130+
@Test
131+
public void cleanup_whenConsumerIsNull() {
132+
try {
133+
// Set currentConsumer to null explicitly
134+
ReflectionTestUtils.setField(manager, "currentConsumer", null);
135+
136+
// Call cleanup, expecting that the client is closed even if consumer is null
137+
manager.cleanup();
138+
verify(mockPulsarClient, times(1)).close();
139+
} catch (PulsarClientException e) {
140+
fail("Unexpected PulsarClientException thrown during test cleanup_whenConsumerIsNull: " + e.getMessage());
141+
}
142+
}
143+
144+
@Test
145+
public void cleanup_consumerCloseThrowsException() {
146+
try {
147+
// Setup: create a consumer and simulate an exception on closing consumer
148+
when(mockPulsarClient.newConsumer(Schema.BYTES)).thenReturn(mockConsumerBuilder);
149+
when(mockConsumerBuilder.loadConf(anyMap())).thenReturn(mockConsumerBuilder);
150+
when(mockConsumerBuilder.batchReceivePolicy(any(BatchReceivePolicy.class))).thenReturn(mockConsumerBuilder);
151+
when(mockConsumerBuilder.subscriptionType(SubscriptionType.Shared)).thenReturn(mockConsumerBuilder);
152+
when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
153+
154+
Consumer<byte[]> createdConsumer = manager.getOrCreateConsumer(3L, 300L);
155+
assertNotNull(createdConsumer);
156+
157+
// Simulate exception when consumer.close() is invoked
158+
doThrow(new PulsarClientException("Consumer close failed")).when(createdConsumer).close();
159+
160+
// Call cleanup; should catch the exception and still proceed to close the
161+
// client
162+
manager.cleanup();
163+
verify(createdConsumer, times(1)).close();
164+
verify(mockPulsarClient, times(1)).close();
165+
} catch (PulsarClientException e) {
166+
fail("Unexpected PulsarClientException thrown during test cleanup_consumerCloseThrowsException: "
167+
+ e.getMessage());
168+
}
169+
}
170+
171+
@Test
172+
public void cleanup_clientCloseThrowsException() {
173+
try {
174+
// Set up consumer as null so that only client.close() is invoked during cleanup
175+
ReflectionTestUtils.setField(manager, "currentConsumer", null);
176+
177+
// Simulate exception when pulsarClient.close() is invoked
178+
doThrow(new PulsarClientException("Client close failed")).when(mockPulsarClient).close();
179+
180+
// Call cleanup; should catch the exception
181+
manager.cleanup();
182+
verify(mockPulsarClient, times(1)).close();
183+
} catch (PulsarClientException e) {
184+
fail("Unexpected PulsarClientException thrown during test cleanup_clientCloseThrowsException: "
185+
+ e.getMessage());
186+
}
187+
}
188+
189+
}

0 commit comments

Comments
 (0)