Skip to content

Commit 0cea064

Browse files
authored
mock impl: use blackbox class (#186)
1 parent 81124c1 commit 0cea064

File tree

13 files changed

+974
-48
lines changed

13 files changed

+974
-48
lines changed

client/src/main/java/io/hstream/util/RecordUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,9 @@ private static List<ReceivedHStreamRecord> parseBatchHStreamRecords(
137137
try {
138138
BatchHStreamRecords batchHStreamRecords = BatchHStreamRecords.parseFrom(byteString);
139139
List<HStreamRecord> hStreamRecords = batchHStreamRecords.getRecordsList();
140-
checkArgument(receivedRecord.getRecordIdsCount() == hStreamRecords.size());
140+
checkArgument(
141+
receivedRecord.getRecordIdsCount() == hStreamRecords.size(),
142+
"decode HStreamRecord error: invalid batched records from server, the `RecordIdsCount` should equals to `BatchHStreamRecords.size`");
141143

142144
List<ReceivedHStreamRecord> receivedHStreamRecords = new ArrayList<>(hStreamRecords.size());
143145
for (int i = 0; i < hStreamRecords.size(); ++i) {

client/src/main/kotlin/io/hstream/impl/ConsumerKtImpl.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,15 @@ class ConsumerKtImpl(
7474

7575
if (!isRunning) return
7676

77+
logger.debug("begin lookupSubscription for $subscriptionId")
7778
val server: String = try {
7879
lookupSubscription()
7980
} catch (e: Throwable) {
8081
logger.error("lookupSubscription error: ${e.message}")
81-
notifyFailed(e)
82+
if (isRunning) {
83+
notifyFailed(e)
84+
}
85+
stopAsync()
8286
return
8387
}
8488

client/src/main/kotlin/io/hstream/impl/HStreamClientKtImpl.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ class HStreamClientKtImpl(
413413
}
414414
}
415415

416-
private final suspend fun lookupSubscriptionServerUrl(subscriptionId: String?): String {
416+
private suspend fun lookupSubscriptionServerUrl(subscriptionId: String?): String {
417417
return unaryCallCoroutine {
418418
val req: LookupSubscriptionRequest =
419419
LookupSubscriptionRequest

client/src/main/kotlin/io/hstream/impl/ResponderImpl.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ class AckSender(
4545
throw HStreamDBClientException("ackSender is Closed")
4646
}
4747
if (ackAgeLimit > 0 && buffer.isEmpty() && bufferSize > 1) {
48-
pendingFlushFuture = scheduler!!.schedule({ flush() }, ackAgeLimit, TimeUnit.MILLISECONDS)
48+
pendingFlushFuture = scheduler!!.schedule({
49+
flush()
50+
}, ackAgeLimit, TimeUnit.MILLISECONDS)
4951
}
5052
buffer.add(recordId)
5153
if (buffer.size >= bufferSize) {

client/src/main/kotlin/io/hstream/impl/Utils.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,12 @@ suspend fun <Resp> unaryCallCoroutine(
113113

114114
logger.debug("unary rpc with urls [{}]", urls)
115115

116-
try {
117-
return call(HStreamApiCoroutineStub(channelProvider.get(urls[0])).withDeadlineAfter(timeoutMs, TimeUnit.MILLISECONDS))
116+
return try {
117+
call(HStreamApiCoroutineStub(channelProvider.get(urls[0])).withDeadlineAfter(timeoutMs, TimeUnit.MILLISECONDS))
118118
} catch (e: StatusException) {
119-
return handleGRPCException(urls, e)
119+
handleGRPCException(urls, e)
120120
} catch (e: StatusRuntimeException) {
121-
return handleGRPCException(urls, e)
121+
handleGRPCException(urls, e)
122122
}
123123
}
124124

client/src/test/java/io/hstream/RecordUtilsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ private void testCompressRecords(CompressionType compressionType) {
106106
.build();
107107
var receivedHStreamRecords = RecordUtils.decompress(receivedRecord);
108108
for (var receivedHStreamRecord : receivedHStreamRecords) {
109-
Assertions.assertTrue(receivedHStreamRecord.getRecord().getPayload().equals(recordPayload));
109+
Assertions.assertEquals(receivedHStreamRecord.getRecord().getPayload(), recordPayload);
110110
}
111111
}
112112
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package io.hstream.impl
2+
3+
import com.google.common.util.concurrent.Service
4+
import io.hstream.HStreamClient
5+
import io.hstream.Subscription
6+
import io.hstream.buildMockedClient
7+
import org.junit.jupiter.api.Disabled
8+
import org.junit.jupiter.api.Test
9+
import org.junit.jupiter.api.assertDoesNotThrow
10+
import org.junit.runner.RunWith
11+
import org.mockito.junit.MockitoJUnitRunner
12+
import java.util.concurrent.CompletableFuture
13+
import java.util.concurrent.ScheduledThreadPoolExecutor
14+
15+
@RunWith(MockitoJUnitRunner::class)
16+
class ConsumerKtImplTest {
17+
18+
@Disabled("FIXME")
19+
@Test
20+
fun testCreateConsumerOnNonExistedSubscriptionIdShouldFailed() {
21+
val client: HStreamClient = buildMockedClient()
22+
val future = CompletableFuture<Unit>()
23+
val consumer = client.newConsumer()
24+
.subscription("some_sub")
25+
.hRecordReceiver { record, ackSender ->
26+
assert(record != null)
27+
ackSender.ack()
28+
}
29+
.build()
30+
31+
val threadPool = ScheduledThreadPoolExecutor(1)
32+
consumer.addListener(
33+
object : Service.Listener() {
34+
35+
override fun starting() {
36+
println("Listener is starting")
37+
}
38+
39+
override fun running() {
40+
println("Listener is running")
41+
}
42+
43+
override fun failed(from: Service.State, failure: Throwable) {
44+
println("failed: $from")
45+
future.completeExceptionally(failure)
46+
}
47+
48+
override fun terminated(from: Service.State) {
49+
println("Listener is terminated")
50+
}
51+
},
52+
threadPool
53+
)
54+
55+
consumer.startAsync().awaitRunning()
56+
Thread.sleep(1000)
57+
consumer.stopAsync().awaitTerminated()
58+
59+
assertDoesNotThrow {
60+
consumer.failureCause()
61+
}
62+
63+
if (!future.isCompletedExceptionally) {
64+
future.complete(Unit)
65+
}
66+
67+
if (future.isCompletedExceptionally) {
68+
future.get()
69+
}
70+
}
71+
72+
@Test
73+
fun testConsumerKtImplTestBasic() {
74+
val future = CompletableFuture<Unit>()
75+
val client: HStreamClient = buildMockedClient()
76+
val streamName = "some_stream"
77+
client.createStream(streamName)
78+
val subId = "some_sub"
79+
client.createSubscription(Subscription.newBuilder().offset(Subscription.SubscriptionOffset.EARLIEST).subscription(subId).stream(streamName).build())
80+
val consumer = client.newConsumer()
81+
.subscription(subId)
82+
.hRecordReceiver { record, ackSender ->
83+
assert(record != null)
84+
ackSender.ack()
85+
}
86+
.build()
87+
val threadPool = ScheduledThreadPoolExecutor(1)
88+
consumer.addListener(
89+
object : Service.Listener() {
90+
override fun failed(from: Service.State, failure: Throwable) {
91+
println("failed: $from")
92+
future.completeExceptionally(failure)
93+
}
94+
},
95+
threadPool
96+
)
97+
consumer.startAsync().awaitRunning()
98+
Thread.sleep(1000)
99+
consumer.stopAsync().awaitTerminated()
100+
101+
if (!future.isCompletedExceptionally) {
102+
future.complete(Unit)
103+
}
104+
105+
if (future.isCompletedExceptionally) {
106+
future.get()
107+
}
108+
}
109+
}

client/src/test/java/io/hstream/impl/ProducerBuilderImplTest.java

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import static org.junit.jupiter.api.Assertions.assertThrows;
55
import static org.mockito.Mockito.mock;
66

7-
import io.hstream.HStreamClient;
87
import io.hstream.Producer;
98
import org.junit.jupiter.api.Disabled;
109
import org.junit.jupiter.api.Test;
@@ -17,27 +16,23 @@ public class ProducerBuilderImplTest {
1716

1817
@Test
1918
public void testBuildWithValidInput() {
20-
HStreamClient client = mock(HStreamClientKtImpl.class);
21-
ProducerBuilderImpl builder = new ProducerBuilderImpl((HStreamClientKtImpl) client);
19+
HStreamClientKtImpl client = mock(HStreamClientKtImpl.class);
20+
ProducerBuilderImpl builder = new ProducerBuilderImpl(client);
2221
Producer producer = builder.stream("test-stream").requestTimeoutMs(1000).build();
2322
assertNotNull(producer);
2423
}
2524

2625
@Test
2726
public void testBuildWithMissingStreamName() {
28-
HStreamClient client = mock(HStreamClientKtImpl.class);
29-
ProducerBuilderImpl builder = new ProducerBuilderImpl((HStreamClientKtImpl) client);
30-
assertThrows(
31-
NullPointerException.class,
32-
() -> {
33-
builder.requestTimeoutMs(1000).build();
34-
});
27+
HStreamClientKtImpl client = mock(HStreamClientKtImpl.class);
28+
ProducerBuilderImpl builder = new ProducerBuilderImpl(client);
29+
assertThrows(NullPointerException.class, () -> builder.requestTimeoutMs(1000).build());
3530
}
3631

3732
@Test
3833
public void testBuildWithInvalidRequestTimeoutMs() {
39-
HStreamClient client = mock(HStreamClientKtImpl.class);
40-
ProducerBuilderImpl builder = new ProducerBuilderImpl((HStreamClientKtImpl) client);
34+
HStreamClientKtImpl client = mock(HStreamClientKtImpl.class);
35+
ProducerBuilderImpl builder = new ProducerBuilderImpl(client);
4136
assertThrows(
4237
IllegalArgumentException.class,
4338
() -> {
@@ -50,23 +45,21 @@ public void testBuildWithNullClient() {
5045
ProducerBuilderImpl builder = new ProducerBuilderImpl(null);
5146
assertThrows(
5247
NullPointerException.class,
53-
() -> {
54-
builder.stream("test-stream").requestTimeoutMs(1000).build();
55-
});
48+
() -> builder.stream("test-stream").requestTimeoutMs(1000).build());
5649
}
5750

5851
@Test
5952
public void testBuildWithDefaultRequestTimeoutMs() {
60-
HStreamClient client = mock(HStreamClientKtImpl.class);
61-
ProducerBuilderImpl builder = new ProducerBuilderImpl((HStreamClientKtImpl) client);
53+
HStreamClientKtImpl client = mock(HStreamClientKtImpl.class);
54+
ProducerBuilderImpl builder = new ProducerBuilderImpl(client);
6255
Producer producer = builder.stream("test-stream").build();
6356
assertNotNull(producer);
6457
}
6558

6659
@Test
6760
public void testBuildWithMaxRequestTimeoutMs() {
68-
HStreamClient client = mock(HStreamClientKtImpl.class);
69-
ProducerBuilderImpl builder = new ProducerBuilderImpl((HStreamClientKtImpl) client);
61+
HStreamClientKtImpl client = mock(HStreamClientKtImpl.class);
62+
ProducerBuilderImpl builder = new ProducerBuilderImpl(client);
7063
Producer producer = builder.stream("test-stream").requestTimeoutMs(Long.MAX_VALUE).build();
7164
assertNotNull(producer);
7265
}

0 commit comments

Comments
 (0)