Skip to content

Commit f9af045

Browse files
authored
fix: move orderingKeyTest to 07 (#74)
1 parent 7b186ec commit f9af045

File tree

2 files changed

+40
-52
lines changed

2 files changed

+40
-52
lines changed

client/src/test/java/io/hstream/HStreamClientTest.java

Lines changed: 0 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -47,21 +47,6 @@ public static ArrayList<RecordId> doProduceAndGatherRid(
4747
return rids;
4848
}
4949

50-
public static ArrayList<RecordId> doProduceWithKey(
51-
Producer producer, int payloadSize, int recordsNums, String key) {
52-
var rids = new ArrayList<RecordId>();
53-
Random rand = new Random();
54-
byte[] rRec = new byte[payloadSize];
55-
var writes = new ArrayList<CompletableFuture<RecordId>>();
56-
for (int i = 0; i < recordsNums; i++) {
57-
rand.nextBytes(rRec);
58-
Record record = Record.newBuilder().rawRecord(rRec).key(key).build();
59-
writes.add(producer.write(record));
60-
}
61-
writes.forEach(w -> rids.add(w.join()));
62-
return rids;
63-
}
64-
6550
// @Test
6651
// public void testReceiverException() throws Exception {
6752
// Consumer consumer =
@@ -547,41 +532,4 @@ public void testWriteBatchRawRecordBasedBytesSize() throws Exception {
547532
latch.await();
548533
consumer.stopAsync().awaitTerminated();
549534
}
550-
551-
@Test
552-
@Order(8)
553-
public void testOrderingKeyBatch() throws Exception {
554-
BufferedProducer producer =
555-
client.newBufferedProducer().stream(testStreamName)
556-
.recordCountLimit(100)
557-
.flushIntervalMs(100)
558-
.build();
559-
final int count = 10;
560-
doProduceWithKey(producer, 100, count / 2, "K1");
561-
doProduceWithKey(producer, 100, count / 2, "K2");
562-
563-
logger.info("producer finish");
564-
producer.close();
565-
566-
CountDownLatch latch = new CountDownLatch(1);
567-
AtomicInteger index = new AtomicInteger();
568-
Consumer consumer =
569-
client
570-
.newConsumer()
571-
.subscription(testSubscriptionId)
572-
.rawRecordReceiver(
573-
(receivedRawRecord, responder) -> {
574-
responder.ack();
575-
index.incrementAndGet();
576-
logger.info("ack for {}, idx:{}", receivedRawRecord.getRecordId(), index.get());
577-
if (index.get() == count) {
578-
latch.countDown();
579-
}
580-
})
581-
.build();
582-
consumer.startAsync().awaitRunning();
583-
584-
latch.await();
585-
consumer.stopAsync().awaitTerminated();
586-
}
587535
}

client/src/test/java/io/hstream/HStreamClientTest07.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.concurrent.CompletableFuture;
1414
import java.util.concurrent.CountDownLatch;
1515
import java.util.concurrent.TimeUnit;
16+
import java.util.concurrent.atomic.AtomicInteger;
1617
import java.util.concurrent.locks.ReentrantLock;
1718
import org.junit.jupiter.api.Assertions;
1819
import org.junit.jupiter.api.Disabled;
@@ -474,4 +475,43 @@ void shardBalance() throws Exception {
474475
}
475476
}
476477
}
478+
479+
@Test
480+
public void testOrderingKeyBatch() throws Exception {
481+
HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
482+
var streamName = randStream(client);
483+
var testSubscriptionId = randSubscription(client, streamName);
484+
BufferedProducer producer =
485+
client.newBufferedProducer().stream(streamName)
486+
.recordCountLimit(100)
487+
.flushIntervalMs(100)
488+
.build();
489+
final int count = 10;
490+
doProduce(producer, 100, count / 2, "K1");
491+
doProduce(producer, 100, count / 2, "K2");
492+
493+
logger.info("producer finish");
494+
producer.close();
495+
496+
CountDownLatch latch = new CountDownLatch(1);
497+
AtomicInteger index = new AtomicInteger();
498+
Consumer consumer =
499+
client
500+
.newConsumer()
501+
.subscription(testSubscriptionId)
502+
.rawRecordReceiver(
503+
(receivedRawRecord, responder) -> {
504+
responder.ack();
505+
index.incrementAndGet();
506+
logger.info("ack for {}, idx:{}", receivedRawRecord.getRecordId(), index.get());
507+
if (index.get() == count) {
508+
latch.countDown();
509+
}
510+
})
511+
.build();
512+
consumer.startAsync().awaitRunning();
513+
514+
latch.await();
515+
consumer.stopAsync().awaitTerminated();
516+
}
477517
}

0 commit comments

Comments
 (0)