Skip to content

Commit 8bbc6ae

Browse files
committed
added method to subscribe topic for limited message
Signed-off-by: Manish Dait <[email protected]>
1 parent ad0ee01 commit 8bbc6ae

File tree

4 files changed

+92
-8
lines changed

4 files changed

+92
-8
lines changed

hiero-enterprise-base/src/main/java/com/openelements/hiero/base/TopicClient.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,8 @@ default void submitMessage(@NonNull String topicId, @NonNull String submitKey, @
328328
* @return SubscriptionHandle for the Topic
329329
* @throws HieroException if Topic could not be subscribed
330330
*/
331-
SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer<TopicMessage> subscription) throws HieroException;
331+
SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer<TopicMessage> subscription)
332+
throws HieroException;
332333

333334
/**
334335
* Subscribe to a Topic
@@ -337,7 +338,32 @@ default void submitMessage(@NonNull String topicId, @NonNull String submitKey, @
337338
* @return SubscriptionHandle for the Topic
338339
* @throws HieroException if Topic could not be subscribed
339340
*/
340-
default SubscriptionHandle subscribeTopic(@NonNull String topicId, @NonNull Consumer<TopicMessage> subscription) throws HieroException {
341+
default SubscriptionHandle subscribeTopic(@NonNull String topicId, @NonNull Consumer<TopicMessage> subscription)
342+
throws HieroException {
341343
return subscribeTopic(TopicId.fromString(topicId), subscription);
342344
}
345+
346+
/**
347+
* Subscribe to a Topic
348+
*
349+
* @param topicId the topicId of topic
350+
* @param limit the number of message to return
351+
* @return SubscriptionHandle for the Topic
352+
* @throws HieroException if Topic could not be subscribed
353+
*/
354+
SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer<TopicMessage> subscription,
355+
long limit) throws HieroException;
356+
357+
/**
358+
* Subscribe to a Topic
359+
*
360+
* @param topicId the topicId of topic
361+
* @param limit the number of message to return
362+
* @return SubscriptionHandle for the Topic
363+
* @throws HieroException if Topic could not be subscribed
364+
*/
365+
default SubscriptionHandle subscribeTopic(@NonNull String topicId, @NonNull Consumer<TopicMessage> subscription,
366+
long limit) throws HieroException {
367+
return subscribeTopic(TopicId.fromString(topicId), subscription, limit);
368+
}
343369
}

hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,4 +221,18 @@ public SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Cons
221221
TopicMessageResult result = client.executeTopicMessageQuery(request);
222222
return result.subscriptionHandle();
223223
}
224+
225+
@Override
226+
public SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer<TopicMessage> subscription,
227+
long limit) throws HieroException {
228+
Objects.requireNonNull(topicId, "topicId must not be null");
229+
Objects.requireNonNull(subscription, "subscription must not be null");
230+
if (limit == 0) {
231+
throw new IllegalArgumentException("limit must be greater than 0");
232+
}
233+
234+
TopicMessageRequest request = TopicMessageRequest.of(topicId, subscription, limit);
235+
TopicMessageResult result = client.executeTopicMessageQuery(request);
236+
return result.subscriptionHandle();
237+
}
224238
}

hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageRequest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,10 @@ public record TopicMessageRequest(@NonNull TopicId topicId, @NonNull Consumer<To
2626
public static TopicMessageRequest of(@NonNull TopicId topicId, @NonNull Consumer<TopicMessage> subscription) {
2727
return new TopicMessageRequest(topicId, subscription, null, null, NO_LIMIT, null, null);
2828
}
29+
30+
@NonNull
31+
public static TopicMessageRequest of(@NonNull TopicId topicId, @NonNull Consumer<TopicMessage> subscription,
32+
@NonNull long limit) {
33+
return new TopicMessageRequest(topicId, subscription, null, null,limit, null, null);
34+
}
2935
}

hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.springframework.boot.test.context.SpringBootTest;
1313

1414
import java.util.ArrayList;
15-
import java.util.Arrays;
1615
import java.util.List;
1716

1817
@SpringBootTest(classes = HieroTestConfig.class)
@@ -210,19 +209,58 @@ void testSubmitMessageThrowExceptionFroInvalidId() {
210209

211210
@Test
212211
void testSubscribeTopic() throws HieroException {
212+
final String msg = "Hello Hiero";
213213
final List<String> messages = new ArrayList<>();
214214
final TopicId topicId = topicClient.createTopic();
215215
hieroTestUtils.waitForMirrorNodeRecords();
216216

217-
final SubscriptionHandle handle = topicClient.subscribeTopic(topicId, (message) -> {
218-
messages.add(message.toString());
219-
System.out.println(Arrays.toString(message.contents));
217+
final SubscriptionHandle handler = topicClient.subscribeTopic(topicId, (message) -> {
218+
messages.add(new String(message.contents));
220219
});
221220

222-
topicClient.submitMessage(topicId, "Hello Hiero");
221+
topicClient.submitMessage(topicId, msg);
223222
hieroTestUtils.waitForMirrorNodeRecords();
224223

225-
Assertions.assertNotNull(handle);
224+
Assertions.assertNotNull(handler);
226225
Assertions.assertEquals(1, messages.size());
226+
Assertions.assertEquals(msg,messages.getFirst());
227+
}
228+
229+
@Test
230+
void testSubscribeTopicWithLimit() throws HieroException {
231+
final String msg = "Hello Hiero";
232+
final long limit = 1;
233+
234+
final List<String> messages = new ArrayList<>();
235+
final TopicId topicId = topicClient.createTopic();
236+
hieroTestUtils.waitForMirrorNodeRecords();
237+
238+
final SubscriptionHandle handler = topicClient.subscribeTopic(topicId, (message) -> {
239+
messages.add(new String(message.contents));
240+
}, limit);
241+
242+
topicClient.submitMessage(topicId, msg);
243+
hieroTestUtils.waitForMirrorNodeRecords();
244+
245+
topicClient.submitMessage(topicId, msg);
246+
hieroTestUtils.waitForMirrorNodeRecords();
247+
248+
Assertions.assertNotNull(handler);
249+
Assertions.assertEquals(limit, messages.size());
250+
}
251+
252+
@Test
253+
void testSubscribeTopicWithInvalidLimit() throws HieroException {
254+
final String msg = "limit must be greater than 0";
255+
final long limit = 0;
256+
257+
final TopicId topicId = topicClient.createTopic();
258+
hieroTestUtils.waitForMirrorNodeRecords();
259+
260+
final IllegalArgumentException e = Assertions.assertThrows(IllegalArgumentException.class, () -> topicClient.subscribeTopic(
261+
topicId, (message) -> {/**/}, limit
262+
));
263+
264+
Assertions.assertEquals(msg, e.getMessage());
227265
}
228266
}

0 commit comments

Comments
 (0)