Skip to content

Commit 8dfd9c2

Browse files
committed
added method to subscribe topic with start and end time
Signed-off-by: Manish Dait <[email protected]>
1 parent 8bbc6ae commit 8dfd9c2

File tree

4 files changed

+139
-0
lines changed

4 files changed

+139
-0
lines changed

hiero-enterprise-base/src/main/java/com/openelements/hiero/base/TopicClient.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.hedera.hashgraph.sdk.TopicMessage;
77
import org.jspecify.annotations.NonNull;
88

9+
import java.time.Instant;
910
import java.util.Objects;
1011
import java.util.function.Consumer;
1112

@@ -366,4 +367,60 @@ default SubscriptionHandle subscribeTopic(@NonNull String topicId, @NonNull Cons
366367
long limit) throws HieroException {
367368
return subscribeTopic(TopicId.fromString(topicId), subscription, limit);
368369
}
370+
371+
/**
372+
* Subscribe to a Topic
373+
*
374+
* @param topicId the topicId of topic
375+
* @param startTime time to start subscribing to a topic
376+
* @param endTime time to stop subscribing to a topic
377+
* @return SubscriptionHandle for the Topic
378+
* @throws HieroException if Topic could not be subscribed
379+
*/
380+
SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer<TopicMessage> subscription,
381+
@NonNull Instant startTime, @NonNull Instant endTime) throws HieroException;
382+
383+
/**
384+
* Subscribe to a Topic
385+
*
386+
* @param topicId the topicId of topic
387+
* @param startTime time to start subscribing to a topic
388+
* @param endTime time to stop subscribing to a topic
389+
* @return SubscriptionHandle for the Topic
390+
* @throws HieroException if Topic could not be subscribed
391+
*/
392+
default SubscriptionHandle subscribeTopic(@NonNull String topicId, @NonNull Consumer<TopicMessage> subscription,
393+
@NonNull Instant startTime, @NonNull Instant endTime) throws HieroException {
394+
return subscribeTopic(TopicId.fromString(topicId), subscription, startTime, endTime);
395+
}
396+
397+
/**
398+
* Subscribe to a Topic
399+
*
400+
* @param topicId the topicId of topic
401+
* @param startTime time to start subscribing to a topic
402+
* @param endTime time to stop subscribing to a topic
403+
* @param limit the number of message to return
404+
* @return SubscriptionHandle for the Topic
405+
* @throws HieroException if Topic could not be subscribed
406+
*/
407+
SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer<TopicMessage> subscription,
408+
@NonNull Instant startTime, @NonNull Instant endTime, long limit)
409+
throws HieroException;
410+
411+
/**
412+
* Subscribe to a Topic
413+
*
414+
* @param topicId the topicId of topic
415+
* @param startTime time to start subscribing to a topic
416+
* @param endTime time to stop subscribing to a topic
417+
* @param limit the number of message to return
418+
* @return SubscriptionHandle for the Topic
419+
* @throws HieroException if Topic could not be subscribed
420+
*/
421+
default SubscriptionHandle subscribeTopic(@NonNull String topicId, @NonNull Consumer<TopicMessage> subscription,
422+
@NonNull Instant startTime, @NonNull Instant endTime, long limit)
423+
throws HieroException {
424+
return subscribeTopic(TopicId.fromString(topicId), subscription, startTime, endTime, limit);
425+
}
369426
}

hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.openelements.hiero.base.protocol.data.TopicMessageResult;
1818
import org.jspecify.annotations.NonNull;
1919

20+
import java.time.Instant;
2021
import java.util.Objects;
2122
import java.util.function.Consumer;
2223

@@ -235,4 +236,48 @@ public SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Cons
235236
TopicMessageResult result = client.executeTopicMessageQuery(request);
236237
return result.subscriptionHandle();
237238
}
239+
240+
@Override
241+
public SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer<TopicMessage> subscription,
242+
Instant startTime, Instant endTime) throws HieroException {
243+
Objects.requireNonNull(topicId, "topicId must not be null");
244+
Objects.requireNonNull(subscription, "subscription must not be null");
245+
Objects.requireNonNull(startTime, "startTime must not be null");
246+
Objects.requireNonNull(endTime, "endTime must not be null");
247+
248+
if (startTime.isBefore(Instant.now())) {
249+
throw new IllegalArgumentException("startTime must be greater than currentTime");
250+
}
251+
if (endTime.isBefore(startTime)) {
252+
throw new IllegalArgumentException("endTime must be greater than starTime");
253+
}
254+
255+
TopicMessageRequest request = TopicMessageRequest.of(topicId, subscription, startTime, endTime);
256+
TopicMessageResult result = client.executeTopicMessageQuery(request);
257+
return result.subscriptionHandle();
258+
}
259+
260+
@Override
261+
public SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer<TopicMessage> subscription,
262+
@NonNull Instant startTime, @NonNull Instant endTime, long limit)
263+
throws HieroException {
264+
Objects.requireNonNull(topicId, "topicId must not be null");
265+
Objects.requireNonNull(subscription, "subscription must not be null");
266+
Objects.requireNonNull(startTime, "startTime must not be null");
267+
Objects.requireNonNull(endTime, "endTime must not be null");
268+
269+
if (startTime.isBefore(Instant.now())) {
270+
throw new IllegalArgumentException("startTime must be greater than currentTime");
271+
}
272+
if (endTime.isBefore(startTime)) {
273+
throw new IllegalArgumentException("endTime must be greater than starTime");
274+
}
275+
if (limit == 0) {
276+
throw new IllegalArgumentException("limit must be greater than 0");
277+
}
278+
279+
TopicMessageRequest request = TopicMessageRequest.of(topicId, subscription, startTime, endTime, limit);
280+
TopicMessageResult result = client.executeTopicMessageQuery(request);
281+
return result.subscriptionHandle();
282+
}
238283
}

hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageRequest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,16 @@ public static TopicMessageRequest of(@NonNull TopicId topicId, @NonNull Consumer
3232
@NonNull long limit) {
3333
return new TopicMessageRequest(topicId, subscription, null, null,limit, null, null);
3434
}
35+
36+
@NonNull
37+
public static TopicMessageRequest of(@NonNull TopicId topicId, @NonNull Consumer<TopicMessage> subscription,
38+
@NonNull Instant startTime, @NonNull Instant endTime) {
39+
return new TopicMessageRequest(topicId, subscription, startTime, endTime, NO_LIMIT, null, null);
40+
}
41+
42+
@NonNull
43+
public static TopicMessageRequest of(@NonNull TopicId topicId, @NonNull Consumer<TopicMessage> subscription,
44+
@NonNull Instant startTime, @NonNull Instant endTime, long limit) {
45+
return new TopicMessageRequest(topicId, subscription, startTime, endTime, limit, null, null);
46+
}
3547
}

hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@
77
import com.openelements.hiero.base.TopicClient;
88
import com.openelements.hiero.test.HieroTestUtils;
99
import org.junit.jupiter.api.Assertions;
10+
import org.junit.jupiter.api.Disabled;
1011
import org.junit.jupiter.api.Test;
1112
import org.springframework.beans.factory.annotation.Autowired;
1213
import org.springframework.boot.test.context.SpringBootTest;
1314

15+
import java.time.Instant;
1416
import java.util.ArrayList;
1517
import java.util.List;
1618

@@ -263,4 +265,27 @@ void testSubscribeTopicWithInvalidLimit() throws HieroException {
263265

264266
Assertions.assertEquals(msg, e.getMessage());
265267
}
268+
269+
@Test
270+
@Disabled
271+
// To fix
272+
void testSubscribeTopicWithStartAndEndTime() throws HieroException {
273+
final TopicId topicId = topicClient.createTopic();
274+
hieroTestUtils.waitForMirrorNodeRecords();
275+
276+
final Instant startTime = Instant.now().plusSeconds(60);
277+
final Instant endTime = startTime.plusSeconds(120);
278+
279+
final List<String> messages = new ArrayList<>();
280+
281+
final SubscriptionHandle handler = topicClient.subscribeTopic(topicId, (message) -> {
282+
messages.add(new String(message.contents));
283+
}, startTime, endTime);
284+
285+
topicClient.submitMessage(topicId, "Hello Hiero");
286+
hieroTestUtils.waitForMirrorNodeRecords();
287+
288+
Assertions.assertNotNull(handler);
289+
Assertions.assertEquals(1, messages.size());
290+
}
266291
}

0 commit comments

Comments
 (0)