Skip to content

Commit 2050ee1

Browse files
authored
mock: buffered producer (#196)
1 parent 95a6696 commit 2050ee1

File tree

9 files changed

+350
-74
lines changed

9 files changed

+350
-74
lines changed

client/src/main/java/io/hstream/util/GrpcUtils.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,21 @@ public static Stream streamFromGrpc(io.hstream.internal.Stream stream) {
9898
.build();
9999
}
100100

101+
public static CompressionType compressionTypeFromGrpc(
102+
io.hstream.internal.CompressionType compressionType) {
103+
switch (compressionType) {
104+
case None:
105+
return CompressionType.NONE;
106+
case Gzip:
107+
return CompressionType.GZIP;
108+
case Zstd:
109+
return CompressionType.ZSTD;
110+
case UNRECOGNIZED:
111+
throw new IllegalArgumentException();
112+
}
113+
throw new IllegalArgumentException();
114+
}
115+
101116
public static io.hstream.internal.ShardOffset streamShardOffsetToGrpc(
102117
StreamShardOffset shardOffset) {
103118
if (shardOffset.isSpecialOffset()) {

client/src/main/kotlin/io/hstream/impl/HStreamClientKtImpl.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ class HStreamClientKtImpl(
117117
return HStreamApiGrpcKt.HStreamApiCoroutineStub(channelProvider.get(url)).withDeadlineAfter(timeoutMs, TimeUnit.MILLISECONDS)
118118
}
119119

120+
120121
init {
121122
if (channelProvider == null) {
122123
this.channelProvider = ChannelProviderImpl(credentials)

client/src/test/java/io/hstream/impl/ConsumerKtImplTest.kt

Lines changed: 1 addition & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ import com.google.common.util.concurrent.Service
44
import io.hstream.HStreamClient
55
import io.hstream.Subscription
66
import io.hstream.buildMockedClient
7-
import org.junit.jupiter.api.Disabled
87
import org.junit.jupiter.api.Test
9-
import org.junit.jupiter.api.assertDoesNotThrow
108
import org.junit.runner.RunWith
119
import org.mockito.junit.MockitoJUnitRunner
1210
import java.util.concurrent.CompletableFuture
@@ -15,60 +13,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor
1513
@RunWith(MockitoJUnitRunner::class)
1614
class ConsumerKtImplTest {
1715

18-
@Disabled("FIXME")
19-
@Test
20-
fun testCreateConsumerOnNonExistedSubscriptionIdShouldFailed() {
21-
val client: HStreamClient = buildMockedClient()
22-
val future = CompletableFuture<Unit>()
23-
val consumer = client.newConsumer()
24-
.subscription("some_sub")
25-
.hRecordReceiver { record, ackSender ->
26-
assert(record != null)
27-
ackSender.ack()
28-
}
29-
.build()
30-
31-
val threadPool = ScheduledThreadPoolExecutor(1)
32-
consumer.addListener(
33-
object : Service.Listener() {
34-
35-
override fun starting() {
36-
println("Listener is starting")
37-
}
38-
39-
override fun running() {
40-
println("Listener is running")
41-
}
42-
43-
override fun failed(from: Service.State, failure: Throwable) {
44-
println("failed: $from")
45-
future.completeExceptionally(failure)
46-
}
47-
48-
override fun terminated(from: Service.State) {
49-
println("Listener is terminated")
50-
}
51-
},
52-
threadPool
53-
)
54-
55-
consumer.startAsync().awaitRunning()
56-
Thread.sleep(1000)
57-
consumer.stopAsync().awaitTerminated()
58-
59-
assertDoesNotThrow {
60-
consumer.failureCause()
61-
}
62-
63-
if (!future.isCompletedExceptionally) {
64-
future.complete(Unit)
65-
}
66-
67-
if (future.isCompletedExceptionally) {
68-
future.get()
69-
}
70-
}
71-
7216
@Test
7317
fun testConsumerKtImplTestBasic() {
7418
val future = CompletableFuture<Unit>()
@@ -103,7 +47,7 @@ class ConsumerKtImplTest {
10347
}
10448

10549
if (future.isCompletedExceptionally) {
106-
future.get()
50+
future.join()
10751
}
10852
}
10953
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package io.hstream
2+
3+
import io.grpc.stub.StreamObserver
4+
import io.hstream.internal.AppendRequest
5+
import io.hstream.internal.AppendResponse
6+
import io.hstream.internal.BatchedRecord
7+
import io.hstream.internal.HStreamApiGrpc
8+
import io.hstream.internal.LookupShardRequest
9+
import io.hstream.internal.LookupShardResponse
10+
import io.hstream.internal.RecordId
11+
import kotlinx.coroutines.channels.Channel
12+
import kotlinx.coroutines.channels.trySendBlocking
13+
import java.util.concurrent.atomic.AtomicLong
14+
import kotlin.random.Random
15+
16+
class BlackBoxSinkHServerMock(
17+
hMetaMockCluster: HMetaMock,
18+
private val serverName: String,
19+
) : HServerMock(
20+
hMetaMockCluster,
21+
serverName
22+
) {
23+
val delayTimeMs: AtomicLong = AtomicLong(0)
24+
val streamNameFlushChannelMap: MutableMap<String, Channel<Pair<List<RecordId>, BatchedRecord>>> = mutableMapOf()
25+
26+
override fun append(request: AppendRequest?, responseObserver: StreamObserver<AppendResponse>?) {
27+
val sleepTimeMs = delayTimeMs.get()
28+
if (sleepTimeMs != 0L) {
29+
io.hstream.impl.logger.info("append: sleep for $sleepTimeMs ms")
30+
Thread.sleep(
31+
sleepTimeMs
32+
)
33+
}
34+
35+
val streamName = request!!.streamName
36+
val channel = this.streamNameFlushChannelMap.getOrPut(streamName) {
37+
Channel(1000)
38+
}
39+
40+
val recordIds = (1..request.records.batchSize).map {
41+
RecordId.newBuilder()
42+
.setShardId(Random.nextLong())
43+
.setBatchId(Random.nextLong())
44+
.setBatchIndex(it).build()
45+
}
46+
47+
val sendResult = channel.trySendBlocking(Pair(recordIds, request.records))
48+
if (sendResult.isClosed || sendResult.isFailure) {
49+
io.hstream.impl.logger.error(sendResult.toString())
50+
responseObserver!!.onError(
51+
io.grpc.Status.INTERNAL.asException()
52+
)
53+
}
54+
55+
val shardId = request.shardId
56+
responseObserver!!.onNext(
57+
AppendResponse.newBuilder()
58+
.setStreamName(streamName)
59+
.setShardId(shardId)
60+
.addAllRecordIds(recordIds)
61+
.build()
62+
)
63+
responseObserver.onCompleted()
64+
}
65+
66+
override fun lookupShard(request: LookupShardRequest?, responseObserver: StreamObserver<LookupShardResponse>?) {
67+
val shardId = request!!.shardId
68+
69+
responseObserver!!.onNext(
70+
LookupShardResponse.newBuilder()
71+
.setShardId(shardId)
72+
.setServerNode(serverNameToServerNode(serverName))
73+
.build()
74+
)
75+
76+
responseObserver.onCompleted()
77+
}
78+
}
79+
80+
class BlackBoxSinkHServerMockController(
81+
private val streamNameFlushChannelMap: MutableMap<String, Channel<Pair<List<RecordId>, BatchedRecord>>>,
82+
private val delayTimeMs: AtomicLong,
83+
) {
84+
fun getStreamNameFlushChannel(streamName: String): Channel<Pair<List<RecordId>, BatchedRecord>> {
85+
return this.streamNameFlushChannelMap.getOrPut(streamName) {
86+
Channel(1000)
87+
}
88+
}
89+
90+
fun setDelayTimeMs(newValue: Long) {
91+
this.delayTimeMs.set(newValue)
92+
}
93+
}
94+
95+
fun buildBlackBoxSinkClient_(): Pair<HStreamClient, Pair<BlackBoxSinkHServerMockController, MockedChannelProvider>> {
96+
val xs = buildMockedClient_(
97+
BlackBoxSinkHServerMock::class.java as Class<HStreamApiGrpc.HStreamApiImplBase>
98+
)
99+
val client = xs.first
100+
val server = xs.second.first as BlackBoxSinkHServerMock
101+
val controller = BlackBoxSinkHServerMockController(
102+
server.streamNameFlushChannelMap,
103+
server.delayTimeMs
104+
)
105+
return Pair(client, Pair(controller, xs.second.second))
106+
}
107+
108+
fun buildBlackBoxSinkClient(): HStreamClient {
109+
return buildBlackBoxSinkClient_().first
110+
}

client/src/test/kotlin/io/hstream/BlackBoxSourceHServerMock.kt

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -182,19 +182,22 @@ class BlackBoxSourceHServerMockController(
182182
}
183183
}
184184

185-
fun buildBlackBoxSourceClient_(): Pair<HStreamClient, BlackBoxSourceHServerMockController> {
185+
fun buildBlackBoxSourceClient_(): Pair<HStreamClient, Pair<BlackBoxSourceHServerMockController, MockedChannelProvider >> {
186186
val xs = buildMockedClient_(
187187
BlackBoxSourceHServerMock::class.java as Class<HStreamApiGrpc.HStreamApiImplBase>
188188
)
189-
val serverImpl: BlackBoxSourceHServerMock = (xs.second) as BlackBoxSourceHServerMock
189+
val serverImpl: BlackBoxSourceHServerMock = (xs.second.first) as BlackBoxSourceHServerMock
190190
val channel = serverImpl.getConsumerNameChannelMap()
191191
return Pair(
192192
xs.first,
193-
BlackBoxSourceHServerMockController(
194-
channel,
195-
serverImpl.getShouldCloseAllSubscriptions(),
196-
serverImpl.getSendInterval(),
197-
serverImpl.getSendBatchLen()
193+
Pair(
194+
BlackBoxSourceHServerMockController(
195+
channel,
196+
serverImpl.getShouldCloseAllSubscriptions(),
197+
serverImpl.getSendInterval(),
198+
serverImpl.getSendBatchLen()
199+
),
200+
xs.second.second
198201
)
199202
)
200203
}
@@ -250,9 +253,9 @@ class BlackBoxSourceHServerMockTests {
250253
countDownLatch.await()
251254
consumer.stopAsync().awaitTerminated()
252255

253-
val channel = xs.second.getAckChannel(consumerName)
256+
val channel = xs.second.first.getAckChannel(consumerName)
254257
val channelAcc = mutableListOf<RecordId>()
255-
xs.second.closeAllSubscriptions()
258+
xs.second.first.closeAllSubscriptions()
256259
assert(!channel.isEmpty)
257260
while (!channel.isEmpty
258261
) {

client/src/test/kotlin/io/hstream/HServerMock.kt

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory
3939
import java.net.URI
4040
import java.util.concurrent.CompletionException
4141
import java.util.concurrent.atomic.AtomicBoolean
42+
import java.util.concurrent.atomic.AtomicLong
4243
import kotlin.random.Random
4344

4445
open class HServerMock(
@@ -270,7 +271,8 @@ open class HServerMock(
270271
.addShards(
271272
Shard.newBuilder()
272273
.setStreamName(streamName)
273-
// TODO: add more fields
274+
.setStartHashRangeKey("000000000000000000000000000000000000000")
275+
.setEndHashRangeKey("999999999999999999999999999999999999999")
274276
.build()
275277
)
276278
.build()
@@ -328,7 +330,17 @@ fun trimServerUrlToServerName(serverUrl: String): Pair<UrlSchema, String> {
328330
}
329331

330332
class MockedChannelProvider(private val grpcCleanupRule: GrpcCleanupRule) : ChannelProviderImpl() {
333+
private val delayTimeMs: AtomicLong = AtomicLong(0)
334+
fun setDelayTimeMs(newValue: Long) {
335+
this.delayTimeMs.set(newValue)
336+
}
337+
331338
override fun get(serverUrl: String): ManagedChannel {
339+
val delayTime = delayTimeMs.get()
340+
if (delayTime != 0L) {
341+
Thread.sleep(delayTime)
342+
}
343+
332344
try {
333345
val serverUri = URI(serverUrl)
334346
if (serverUri.host == null && serverUri.port == -1) {
@@ -348,7 +360,7 @@ class HServerMockTests {
348360
val grpcCleanupRule = GrpcCleanupRule()
349361

350362
val hostname = "127.0.0." + randPort()
351-
val port = 6570
363+
val port = 6000 + randPort()
352364
val serverUrl = "hstream://$hostname:$port"
353365
val hMetaMockCluster = HMetaMock()
354366
startMockedHServer(grpcCleanupRule, mockServiceImpl(hMetaMockCluster, serverUrl), hMetaMockCluster, serverUrl)
@@ -362,7 +374,7 @@ class HServerMockTests {
362374
val grpcCleanupRule = GrpcCleanupRule()
363375

364376
val hostname = "127.0.0." + randPort()
365-
val port = 6570
377+
val port = 6000 + randPort()
366378
val serverUrl = "hstream://$hostname:$port"
367379
val hMetaMockCluster = HMetaMock()
368380
startMockedHServer(grpcCleanupRule, mockServiceImpl(hMetaMockCluster, serverUrl), hMetaMockCluster, serverUrl)
@@ -442,7 +454,7 @@ private fun randPort(): Int {
442454
return Random.nextInt(256)
443455
}
444456

445-
fun buildMockedClient_(hServerMock: Class<HStreamApiGrpc.HStreamApiImplBase>): Pair<HStreamClient, HStreamApiGrpc.HStreamApiImplBase> {
457+
fun buildMockedClient_(hServerMock: Class<HStreamApiGrpc.HStreamApiImplBase>): Pair<HStreamClient, Pair< HStreamApiGrpc.HStreamApiImplBase, MockedChannelProvider>> {
446458
val grpcCleanupRule = GrpcCleanupRule()
447459
// TODO: AutoCloseable?
448460

@@ -458,7 +470,7 @@ fun buildMockedClient_(hServerMock: Class<HStreamApiGrpc.HStreamApiImplBase>): P
458470
clientBuilder.serviceUrl(serverUrl)
459471
clientBuilder.channelProvider(channelProvider)
460472

461-
return Pair(clientBuilder.build() as HStreamClientKtImpl, serverImpl)
473+
return Pair(clientBuilder.build() as HStreamClientKtImpl, Pair(serverImpl, channelProvider))
462474
}
463475

464476
fun buildMockedClient(hServerMock: Class<HStreamApiGrpc.HStreamApiImplBase>): HStreamClient {

0 commit comments

Comments
 (0)