Commit cf24f28

Support time and bytes size based flush strategy (#61)
1 parent 65c131a commit cf24f28

10 files changed (+362 -152 lines)

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+package io.hstream;
+
+import java.io.Closeable;
+
+/** The interface for the HStream BufferedProducer. */
+public interface BufferedProducer extends Producer, Closeable {
+
+  /** Explicitly flushes buffered records. */
+  void flush();
+
+  /** Closes the buffered producer; flush() is called first. */
+  void close();
+}
Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
+package io.hstream;
+
+/** A builder for {@link BufferedProducer}s. */
+public interface BufferedProducerBuilder {
+
+  BufferedProducerBuilder stream(String streamName);
+
+  BufferedProducerBuilder recordCountLimit(int recordCountLimit);
+
+  BufferedProducerBuilder flushIntervalMs(long flushIntervalMs);
+
+  BufferedProducerBuilder maxBytesSize(int maxBytesSize);
+
+  BufferedProducerBuilder throwExceptionIfFull(boolean throwExceptionIfFull);
+
+  BufferedProducer build();
+}
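
Taken together with the BufferedProducer interface above, the builder exposes three flush triggers: record count, elapsed time, and buffered bytes. Below is a minimal usage sketch, not part of this commit; it assumes an already-connected HStreamClient named client, a stream named "test_stream", and that the inherited Producer interface exposes write(byte[]) returning a CompletableFuture<RecordId>, which this diff does not show.

import io.hstream.BufferedProducer;
import io.hstream.HStreamClient;
import io.hstream.RecordId;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

class BufferedProducerUsageSketch {
  static void run(HStreamClient client) {
    // Configure all three flush strategies introduced by this commit.
    BufferedProducer producer =
        client
            .newBufferedProducer()
            .stream("test_stream")        // assumed stream name
            .recordCountLimit(100)        // flush after 100 buffered records
            .flushIntervalMs(100)         // flush every 100 ms
            .maxBytesSize(4096)           // flush once 4 KB of payload is buffered
            .throwExceptionIfFull(false)  // do not fail fast when the buffer is full
            .build();

    // write(byte[]) is assumed from the pre-existing Producer interface.
    CompletableFuture<RecordId> id =
        producer.write("hello".getBytes(StandardCharsets.UTF_8));

    producer.close(); // flushes any remaining buffered records before closing
  }
}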

client/src/main/java/io/hstream/HStreamClient.java

Lines changed: 2 additions & 0 deletions
@@ -14,6 +14,8 @@ static HStreamClientBuilder builder() {
   /** @return a {@link ProducerBuilder} */
   ProducerBuilder newProducer();
 
+  BufferedProducerBuilder newBufferedProducer();
+
   /** @return a {@link ConsumerBuilder} */
   ConsumerBuilder newConsumer();
 
client/src/main/java/io/hstream/ProducerBuilder.java

Lines changed: 0 additions & 4 deletions
@@ -5,9 +5,5 @@ public interface ProducerBuilder {
 
   ProducerBuilder stream(String streamName);
 
-  ProducerBuilder enableBatch();
-
-  ProducerBuilder recordCountLimit(int recordCountLimit);
-
   Producer build();
 }
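
Since enableBatch() and recordCountLimit() are removed from ProducerBuilder, existing batching callers move to the new builder. A hedged migration sketch follows; the old calls are taken from the deleted lines above, while client and the stream name are assumptions.

// Before this commit, batching was configured on the plain producer builder:
//   Producer producer =
//       client.newProducer().stream("test_stream").enableBatch().recordCountLimit(100).build();
//
// After this commit, batching is only available through the dedicated BufferedProducer:
BufferedProducer producer =
    client
        .newBufferedProducer()
        .stream("test_stream")  // assumed stream name
        .recordCountLimit(100)  // same count-based trigger as the old recordCountLimit
        .build();               // time-based (100 ms) and size-based (4096 bytes) defaults also apply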
Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
+package io.hstream.impl;
+
+import io.hstream.BufferedProducer;
+import io.hstream.BufferedProducerBuilder;
+import io.hstream.HStreamDBClientException;
+
+public class BufferedProducerBuilderImpl implements BufferedProducerBuilder {
+
+  private String streamName;
+  private int recordCountLimit = 100;
+  private long flushIntervalMs = 100;
+  private int maxBytesSize = 4096;
+  private boolean throwExceptionIfFull = false;
+
+  @Override
+  public BufferedProducerBuilder stream(String streamName) {
+    this.streamName = streamName;
+    return this;
+  }
+
+  /** @param recordCountLimit default value is 100; it must NOT be less than 1 */
+  @Override
+  public BufferedProducerBuilder recordCountLimit(int recordCountLimit) {
+    this.recordCountLimit = recordCountLimit;
+    return this;
+  }
+
+  /**
+   * @param flushIntervalMs default value is 100 ms; if flushIntervalMs <= 0, the time-based
+   *     flush strategy is disabled.
+   */
+  @Override
+  public BufferedProducerBuilder flushIntervalMs(long flushIntervalMs) {
+    this.flushIntervalMs = flushIntervalMs;
+    return this;
+  }
+
+  /**
+   * @param maxBytesSize default value is 4K (4096); if maxBytesSize <= 0, the buffered bytes size
+   *     is not limited.
+   */
+  @Override
+  public BufferedProducerBuilder maxBytesSize(int maxBytesSize) {
+    this.maxBytesSize = maxBytesSize;
+    return this;
+  }
+
+  @Override
+  public BufferedProducerBuilder throwExceptionIfFull(boolean throwExceptionIfFull) {
+    this.throwExceptionIfFull = throwExceptionIfFull;
+    return this;
+  }
+
+  @Override
+  public BufferedProducer build() {
+    if (recordCountLimit < 1) {
+      throw new HStreamDBClientException(
+          String.format(
+              "build bufferedProducer failed, recordCountLimit(%d) can NOT be less than 1",
+              recordCountLimit));
+    }
+    return new BufferedProducerKtImpl(
+        streamName, recordCountLimit, flushIntervalMs, maxBytesSize, throwExceptionIfFull);
+  }
+}
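
The defaults above (recordCountLimit = 100, flushIntervalMs = 100, maxBytesSize = 4096) apply when a setter is not called, and the javadoc's <= 0 values switch individual strategies off. A short sketch of a count-only configuration, again assuming a connected client and an existing stream:

// Only the record-count trigger stays active; <= 0 disables the other two strategies.
BufferedProducer countOnly =
    client
        .newBufferedProducer()
        .stream("test_stream")  // assumed stream name
        .recordCountLimit(50)   // flush every 50 records
        .flushIntervalMs(-1)    // <= 0: no timer-driven flush
        .maxBytesSize(-1)       // <= 0: no byte-size-driven flush
        .build();

// A recordCountLimit below 1 is rejected at build() with an HStreamDBClientException.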

client/src/main/java/io/hstream/impl/ProducerBuilderImpl.java

Lines changed: 1 addition & 28 deletions
@@ -4,47 +4,20 @@
 
 import io.hstream.Producer;
 import io.hstream.ProducerBuilder;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class ProducerBuilderImpl implements ProducerBuilder {
 
   private String streamName;
 
-  private boolean enableBatch = false;
-
-  private int recordCountLimit = 1;
-
-  private final AtomicReference<List<String>> serverUrls;
-  private final ChannelProvider channelProvider;
-
-  public ProducerBuilderImpl(
-      AtomicReference<List<String>> serverUrls, ChannelProvider channelProvider) {
-    this.serverUrls = serverUrls;
-    this.channelProvider = channelProvider;
-  }
-
   @Override
   public ProducerBuilder stream(String streamName) {
     this.streamName = streamName;
     return this;
   }
 
-  @Override
-  public ProducerBuilder enableBatch() {
-    this.enableBatch = true;
-    return this;
-  }
-
-  @Override
-  public ProducerBuilder recordCountLimit(int recordCountLimit) {
-    this.recordCountLimit = recordCountLimit;
-    return this;
-  }
-
   @Override
   public Producer build() {
     checkNotNull(streamName);
-    return new ProducerKtImpl(streamName, enableBatch, recordCountLimit);
+    return new ProducerKtImpl(streamName);
   }
 }
Lines changed: 119 additions & 0 deletions
@@ -0,0 +1,119 @@
+package io.hstream.impl
+
+import io.hstream.BufferedProducer
+import io.hstream.HStreamDBClientException
+import io.hstream.RecordId
+import io.hstream.internal.HStreamRecord
+import org.slf4j.LoggerFactory
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledFuture
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+import kotlin.collections.ArrayList
+import kotlin.concurrent.withLock
+
+class BufferedProducerKtImpl(
+    stream: String,
+    private val recordCountLimit: Int,
+    private val flushIntervalMs: Long,
+    private val maxBytesSize: Int,
+    private val throwExceptionIfFull: Boolean
+) : ProducerKtImpl(stream), BufferedProducer {
+    private var lock = ReentrantLock()
+    private var recordBuffer: MutableList<HStreamRecord> = ArrayList(recordCountLimit)
+    private var futures: MutableList<CompletableFuture<RecordId>> = ArrayList(recordCountLimit)
+    private var timerService: ScheduledFuture<*>? = null
+
+    @Volatile
+    private var closed: Boolean = false
+    private var bufferedBytesSize: Int = 0
+
+    @Volatile
+    private var isFull = false
+
+    init {
+        if (flushIntervalMs > 0) {
+            runTimer()
+        }
+    }
+
+    private fun runTimer() {
+        timerService = scheduler.scheduleAtFixedRate(
+            { flush() },
+            flushIntervalMs,
+            flushIntervalMs,
+            TimeUnit.MILLISECONDS
+        )
+    }
+
+    override fun writeInternal(hStreamRecord: HStreamRecord): CompletableFuture<RecordId> {
+        return addToBuffer(hStreamRecord)
+    }
+
+    private fun addToBuffer(hStreamRecord: HStreamRecord): CompletableFuture<RecordId> {
+        // fuzzy check: the full flag is read without holding the lock
+        val recordFuture = CompletableFuture<RecordId>()
+        if (throwExceptionIfFull && isFull) {
+            recordFuture.completeExceptionally(HStreamDBClientException("buffer is full"))
+            return recordFuture
+        }
+        lock.withLock {
+            if (closed) {
+                throw HStreamDBClientException("BufferedProducer is closed")
+            }
+            // the buffer cannot be full once we hold the lock: if it were full,
+            // another thread would still be holding the lock and flushing the buffer.
+            recordBuffer.add(hStreamRecord)
+            futures.add(recordFuture)
+            bufferedBytesSize += hStreamRecord.payload.size()
+            if (isFull()) {
+                isFull = true
+                flush()
+            }
+            return recordFuture
+        }
+    }
+
+    private fun isFull(): Boolean {
+        return (recordBuffer.size == recordCountLimit) || (maxBytesSize > 0 && bufferedBytesSize >= maxBytesSize)
+    }
+
+    override fun flush() {
+        lock.withLock {
+            if (recordBuffer.isEmpty()) {
+                return
+            }
+            try {
+                val recordBufferCount = recordBuffer.size
+                logger.info("ready to flush recordBuffer, current buffer size is [{}]", recordBufferCount)
+                val ids = futureForIO { super.writeHStreamRecords(recordBuffer) }.join()
+                for (i in ids.indices) {
+                    futures[i].complete(ids[i])
+                }
+                logger.info("flush the record buffer successfully")
+            } catch (e: Throwable) {
+                for (i in futures.indices) {
+                    futures[i].completeExceptionally(e)
+                }
+            }
+            recordBuffer.clear()
+            futures.clear()
+            bufferedBytesSize = 0
+            isFull = false
+        }
+    }
+
+    override fun close() {
+        if (!closed) {
+            timerService?.cancel(false)
+            closed = true
+            flush()
+        }
+    }
+
+    companion object {
+        private val logger = LoggerFactory.getLogger(BufferedProducerKtImpl::class.java)
+        private val scheduler = Executors.newScheduledThreadPool(4)
+    }
+}
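
In the implementation above, a full buffer only rejects writes when throwExceptionIfFull is enabled; the returned future then completes exceptionally with HStreamDBClientException("buffer is full"), and close() cancels the flush timer before a final flush. A hedged Java sketch of observing this from the caller side, with the same assumptions as earlier (connected client, existing stream, and a write(byte[]) method on Producer returning CompletableFuture<RecordId>):

import io.hstream.BufferedProducer;
import io.hstream.HStreamClient;
import java.nio.charset.StandardCharsets;

class ThrowIfFullSketch {
  static void run(HStreamClient client) {
    BufferedProducer producer =
        client
            .newBufferedProducer()
            .stream("test_stream")       // assumed stream name
            .recordCountLimit(100)
            .throwExceptionIfFull(true)  // fail fast instead of buffering past the limit
            .build();

    producer
        .write("payload".getBytes(StandardCharsets.UTF_8)) // assumed Producer API
        .whenComplete(
            (recordId, err) -> {
              if (err != null) {
                // Surfaces as HStreamDBClientException("buffer is full") when the
                // buffer was still full at write time.
                System.err.println("write rejected: " + err.getMessage());
              }
            });

    producer.close(); // cancels the timer, marks the producer closed, then flushes
  }
}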

client/src/main/kotlin/io/hstream/impl/HStreamClientKtImpl.kt

Lines changed: 6 additions & 1 deletion
@@ -1,6 +1,7 @@
 package io.hstream.impl
 
 import com.google.protobuf.Empty
+import io.hstream.BufferedProducerBuilder
 import io.hstream.ConsumerBuilder
 import io.hstream.HStreamClient
 import io.hstream.ProducerBuilder
@@ -57,7 +58,11 @@ class HStreamClientKtImpl(bootstrapServerUrls: List<String>) : HStreamClient {
     }
 
     override fun newProducer(): ProducerBuilder {
-        return ProducerBuilderImpl(clusterServerUrls, channelProvider)
+        return ProducerBuilderImpl()
+    }
+
+    override fun newBufferedProducer(): BufferedProducerBuilder {
+        return BufferedProducerBuilderImpl()
     }
 
     override fun newConsumer(): ConsumerBuilder {