55import io .hstream .internal .AppendRequest ;
66import io .hstream .internal .AppendResponse ;
77import io .hstream .internal .HStreamApiGrpc ;
8+ import io .hstream .internal .HStreamRecord ;
89import io .hstream .util .GrpcUtils ;
910import io .hstream .util .RecordUtils ;
1011import java .util .ArrayList ;
1415import java .util .concurrent .locks .Lock ;
1516import java .util .concurrent .locks .ReentrantLock ;
1617import java .util .stream .Collectors ;
17- import java .util .stream .IntStream ;
18- import org .apache .commons .lang3 .tuple .ImmutablePair ;
1918import org .slf4j .Logger ;
2019import org .slf4j .LoggerFactory ;
2120
@@ -30,7 +29,7 @@ public class ProducerImpl implements Producer {
3029
3130 private final Semaphore semaphore ;
3231 private final Lock lock ;
33- private final List <Object > recordBuffer ;
32+ private final List <HStreamRecord > recordBuffer ;
3433 private final List <CompletableFuture <RecordId >> futures ;
3534
3635 public ProducerImpl (
@@ -57,126 +56,62 @@ public ProducerImpl(
5756 }
5857
5958 @ Override
60- public RecordId write (byte [] rawRecord ) {
61- CompletableFuture <List <RecordId >> future = writeRawRecordsAsync (List .of (rawRecord ));
62- logger .info ("wait for write future" );
63- return future .join ().get (0 );
59+ public CompletableFuture <RecordId > write (byte [] rawRecord ) {
60+ HStreamRecord hStreamRecord = RecordUtils .buildHStreamRecordFromRawRecord (rawRecord );
61+ return writeInternal (hStreamRecord );
6462 }
6563
6664 @ Override
67- public RecordId write (HRecord hRecord ) {
68- CompletableFuture < List < RecordId >> future = writeHRecordsAsync ( List . of (hRecord ) );
69- return future . join (). get ( 0 );
65+ public CompletableFuture < RecordId > write (HRecord hRecord ) {
66+ HStreamRecord hStreamRecord = RecordUtils . buildHStreamRecordFromHRecord (hRecord );
67+ return writeInternal ( hStreamRecord );
7068 }
7169
72- @ Override
73- public CompletableFuture <RecordId > writeAsync (byte [] rawRecord ) {
70+ private CompletableFuture <RecordId > writeInternal (HStreamRecord hStreamRecord ) {
7471 if (!enableBatch ) {
75- return writeRawRecordsAsync (List .of (rawRecord )).thenApply (list -> list .get (0 ));
72+ return writeHStreamRecords (List .of (hStreamRecord )).thenApply (recordIds -> recordIds .get (0 ));
7673 } else {
77- try {
78- semaphore .acquire ();
79- } catch (InterruptedException e ) {
80- throw new HStreamDBClientException (e );
81- }
82-
83- lock .lock ();
84- try {
85- CompletableFuture <RecordId > completableFuture = new CompletableFuture <>();
86- recordBuffer .add (rawRecord );
87- futures .add (completableFuture );
88-
89- if (recordBuffer .size () == recordCountLimit ) {
90- flush ();
91- }
92- return completableFuture ;
93- } finally {
94- lock .unlock ();
95- }
74+ return addToBuffer (hStreamRecord );
9675 }
9776 }
9877
99- @ Override
100- public CompletableFuture <RecordId > writeAsync (HRecord hRecord ) {
101- if (!enableBatch ) {
102- return writeHRecordsAsync (List .of (hRecord )).thenApply (list -> list .get (0 ));
103- } else {
104- try {
105- semaphore .acquire ();
106- } catch (InterruptedException e ) {
107- throw new HStreamDBClientException (e );
108- }
109-
110- lock .lock ();
111- try {
112- CompletableFuture <RecordId > completableFuture = new CompletableFuture <>();
113- recordBuffer .add (hRecord );
114- futures .add (completableFuture );
115-
116- if (recordBuffer .size () == recordCountLimit ) {
117- flush ();
118- }
119- return completableFuture ;
120- } finally {
121- lock .unlock ();
122- }
123- }
124- }
125-
126- @ Override
127- public void flush () {
128- flushSync ();
129- }
130-
131- private CompletableFuture <List <RecordId >> writeRawRecordsAsync (List <byte []> rawRecords ) {
132-
133- CompletableFuture <List <RecordId >> completableFuture = new CompletableFuture <>();
134-
135- AppendRequest appendRequest =
136- AppendRequest .newBuilder ()
137- .setStreamName (this .stream )
138- .addAllRecords (
139- rawRecords .stream ()
140- .map (rawRecord -> RecordUtils .buildHStreamRecordFromRawRecord (rawRecord ))
141- .collect (Collectors .toList ()))
142- .build ();
78+ private void flush () {
79+ lock .lock ();
80+ try {
81+ if (recordBuffer .isEmpty ()) {
82+ return ;
83+ } else {
84+ final int recordBufferCount = recordBuffer .size ();
14385
144- StreamObserver <AppendResponse > streamObserver =
145- new StreamObserver <>() {
146- @ Override
147- public void onNext (AppendResponse appendResponse ) {
148- completableFuture .complete (
149- appendResponse .getRecordIdsList ().stream ()
150- .map (GrpcUtils ::recordIdFromGrpc )
151- .collect (Collectors .toList ()));
152- }
86+ logger .info ("start flush recordBuffer, current buffer size is: {}" , recordBufferCount );
15387
154- @ Override
155- public void onError (Throwable t ) {
156- logger .error ("write rawRecord error" , t );
157- completableFuture .completeExceptionally (t );
158- }
88+ writeHStreamRecords (recordBuffer )
89+ .thenAccept (
90+ recordIds -> {
91+ for (int i = 0 ; i < recordIds .size (); ++i ) {
92+ futures .get (i ).complete (recordIds .get (i ));
93+ }
94+ })
95+ .join ();
15996
160- @ Override
161- public void onCompleted () {}
162- };
97+ recordBuffer .clear ();
98+ futures .clear ();
16399
164- grpcStub . append ( appendRequest , streamObserver );
100+ logger . info ( "finish clearing record buffer" );
165101
166- return completableFuture ;
102+ semaphore .release (recordBufferCount );
103+ }
104+ } finally {
105+ lock .unlock ();
106+ }
167107 }
168108
169- private CompletableFuture <List <RecordId >> writeHRecordsAsync (List <HRecord > hRecords ) {
109+ private CompletableFuture <List <RecordId >> writeHStreamRecords (
110+ List <HStreamRecord > hStreamRecords ) {
170111 CompletableFuture <List <RecordId >> completableFuture = new CompletableFuture <>();
171112
172113 AppendRequest appendRequest =
173- AppendRequest .newBuilder ()
174- .setStreamName (this .stream )
175- .addAllRecords (
176- hRecords .stream ()
177- .map (hRecord -> RecordUtils .buildHStreamRecordFromHRecord (hRecord ))
178- .collect (Collectors .toList ()))
179- .build ();
114+ AppendRequest .newBuilder ().setStreamName (stream ).addAllRecords (hStreamRecords ).build ();
180115
181116 StreamObserver <AppendResponse > streamObserver =
182117 new StreamObserver <>() {
@@ -202,52 +137,23 @@ public void onCompleted() {}
202137 return completableFuture ;
203138 }
204139
205- private void flushSync () {
206- lock .lock ();
140+ private CompletableFuture <RecordId > addToBuffer (HStreamRecord hStreamRecord ) {
207141 try {
208- if (recordBuffer .isEmpty ()) {
209- return ;
210- } else {
211- final int recordBufferCount = recordBuffer .size ();
212-
213- logger .info ("start flush recordBuffer, current buffer size is: {}" , recordBufferCount );
214-
215- List <ImmutablePair <Integer , byte []>> rawRecords =
216- IntStream .range (0 , recordBufferCount )
217- .filter (index -> recordBuffer .get (index ) instanceof byte [])
218- .mapToObj (index -> ImmutablePair .of (index , (byte []) (recordBuffer .get (index ))))
219- .collect (Collectors .toList ());
220-
221- List <ImmutablePair <Integer , HRecord >> hRecords =
222- IntStream .range (0 , recordBufferCount )
223- .filter (index -> recordBuffer .get (index ) instanceof HRecord )
224- .mapToObj (index -> ImmutablePair .of (index , (HRecord ) (recordBuffer .get (index ))))
225- .collect (Collectors .toList ());
226-
227- List <RecordId > rawRecordIds =
228- writeRawRecordsAsync (
229- rawRecords .stream ().map (pair -> pair .getRight ()).collect (Collectors .toList ()))
230- .join ();
231- List <RecordId > hRecordIds =
232- writeHRecordsAsync (
233- hRecords .stream ().map (ImmutablePair ::getRight ).collect (Collectors .toList ()))
234- .join ();
235-
236- IntStream .range (0 , rawRecords .size ())
237- .mapToObj (i -> ImmutablePair .of (i , rawRecords .get (i ).getLeft ()))
238- .forEach (p -> futures .get (p .getRight ()).complete (rawRecordIds .get (p .getLeft ())));
239-
240- IntStream .range (0 , hRecords .size ())
241- .mapToObj (i -> ImmutablePair .of (i , hRecords .get (i ).getLeft ()))
242- .forEach (p -> futures .get (p .getRight ()).complete (hRecordIds .get (p .getLeft ())));
243-
244- recordBuffer .clear ();
245- futures .clear ();
142+ semaphore .acquire ();
143+ } catch (InterruptedException e ) {
144+ throw new HStreamDBClientException (e );
145+ }
246146
247- logger .info ("finish clearing record buffer" );
147+ lock .lock ();
148+ try {
149+ CompletableFuture <RecordId > completableFuture = new CompletableFuture <>();
150+ recordBuffer .add (hStreamRecord );
151+ futures .add (completableFuture );
248152
249- semaphore .release (recordBufferCount );
153+ if (recordBuffer .size () == recordCountLimit ) {
154+ flush ();
250155 }
156+ return completableFuture ;
251157 } finally {
252158 lock .unlock ();
253159 }
0 commit comments