Skip to content

Commit 88d2e0a

Browse files
committed
chore: improve prepare data perf
1 parent ec5e453 commit 88d2e0a

File tree

8 files changed

+372
-26
lines changed

8 files changed

+372
-26
lines changed

ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public static void main(String[] args) throws Exception {
7575

7676
long start = System.nanoTime();
7777
for (; ; ) {
78-
Table.TableBufferRoot table = writer.tableBufferRoot();
78+
Table.TableBufferRoot table = writer.tableBufferRoot(1024);
7979
for (int i = 0; i < batchSize; i++) {
8080
if (!rows.hasNext()) {
8181
break;

ingester-example/src/main/java/io/greptime/bench/MultiProducerTableDataProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333

3434
@SPI(
3535
name = "multi_producer_table_data_provider",
36-
priority = 10 /* newer implementation can use higher priority to override the old one */)
36+
priority = 1 /* newer implementation can use higher priority to override the old one */)
3737
public class MultiProducerTableDataProvider extends RandomTableDataProvider {
3838

3939
private final int producerCount;

ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
@SPI(
2929
name = "random_table_data_provider",
30-
priority = 1 /* newer implementation can use higher priority to override the old one */)
30+
priority = 10 /* newer implementation can use higher priority to override the old one */)
3131
public class RandomTableDataProvider implements TableDataProvider {
3232

3333
private final TableSchema tableSchema;

ingester-protocol/src/main/java/io/greptime/BulkStreamWriter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,13 @@ public interface BulkStreamWriter extends AutoCloseable {
6666
* The `TableBufferRoot` provides direct access to the underlying memory
6767
* where table data is stored for efficient bulk operations.
6868
*
69+
* @param columnBufferSize the buffer size for each column
70+
*
6971
* @see Table.TableBufferRoot
7072
*
7173
* @return a table buffer root
7274
*/
73-
Table.TableBufferRoot tableBufferRoot();
75+
Table.TableBufferRoot tableBufferRoot(int columnBufferSize);
7476

7577
/**
7678
* Writes current table data to the stream.

ingester-protocol/src/main/java/io/greptime/BulkWriteClient.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.concurrent.CompletableFuture;
4040
import java.util.concurrent.Executor;
4141
import java.util.concurrent.TimeUnit;
42+
import java.util.concurrent.atomic.AtomicReference;
4243
import org.apache.arrow.flight.FlightCallHeaders;
4344
import org.apache.arrow.flight.HeaderCallOption;
4445
import org.apache.arrow.vector.types.pojo.Schema;
@@ -185,6 +186,7 @@ static class DefaultBulkStreamWriter implements BulkStreamWriter {
185186
private final BulkWriteLimiter pipelineWriteLimiter;
186187
private final BulkWriteService writer;
187188
private final TableSchema tableSchema;
189+
private final AtomicReference<Table.TableBufferRoot> current = new AtomicReference<>();
188190

189191
public DefaultBulkStreamWriter(BulkWriteService writer, TableSchema tableSchema, int maxRequestsInFlight) {
190192
this.writer = writer;
@@ -193,12 +195,21 @@ public DefaultBulkStreamWriter(BulkWriteService writer, TableSchema tableSchema,
193195
}
194196

195197
@Override
196-
public Table.TableBufferRoot tableBufferRoot() {
197-
return Table.tableBufferRoot(this.tableSchema, this.writer.getRoot());
198+
public Table.TableBufferRoot tableBufferRoot(int columnBufferSize) {
199+
Table.TableBufferRoot table =
200+
Table.tableBufferRoot(this.tableSchema, this.writer.getRoot(), columnBufferSize);
201+
this.current.set(table);
202+
return table;
198203
}
199204

200205
@Override
201206
public CompletableFuture<Integer> writeNext() throws Exception {
207+
Table.TableBufferRoot table = this.current.getAndSet(null);
208+
if (table != null) {
209+
// make sure the table is completed
210+
table.complete();
211+
}
212+
202213
// Check if the stream is ready
203214
if (!isStreamReady()) {
204215
LOG.debug(

ingester-protocol/src/main/java/io/greptime/models/ArrowHelper.java

Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.greptime.v1.Common;
2323
import java.nio.charset.StandardCharsets;
2424
import java.util.ArrayList;
25+
import java.util.Iterator;
2526
import java.util.List;
2627
import org.apache.arrow.vector.BigIntVector;
2728
import org.apache.arrow.vector.BitVector;
@@ -114,6 +115,285 @@ public static Schema createSchema(TableSchema tableSchema) {
114115
return new Schema(fields);
115116
}
116117

118+
public static void addValues(
119+
FieldVector vector,
120+
int startRowIndex,
121+
Common.ColumnDataType dataType,
122+
Common.ColumnDataTypeExtension dataTypeExtension,
123+
Iterator<Object> values) {
124+
switch (dataType) {
125+
case INT8:
126+
while (values.hasNext()) {
127+
Object value = values.next();
128+
if (value == null) {
129+
vector.setNull(startRowIndex++);
130+
} else {
131+
((TinyIntVector) vector).setSafe(startRowIndex++, (int) value);
132+
}
133+
}
134+
break;
135+
case INT16:
136+
while (values.hasNext()) {
137+
Object value = values.next();
138+
if (value == null) {
139+
vector.setNull(startRowIndex++);
140+
} else {
141+
((SmallIntVector) vector).setSafe(startRowIndex++, (int) value);
142+
}
143+
}
144+
break;
145+
case INT32:
146+
while (values.hasNext()) {
147+
Object value = values.next();
148+
if (value == null) {
149+
vector.setNull(startRowIndex++);
150+
} else {
151+
((IntVector) vector).setSafe(startRowIndex++, (int) value);
152+
}
153+
}
154+
break;
155+
case INT64:
156+
while (values.hasNext()) {
157+
Object value = values.next();
158+
if (value == null) {
159+
vector.setNull(startRowIndex++);
160+
} else {
161+
((BigIntVector) vector).setSafe(startRowIndex++, (long) value);
162+
}
163+
}
164+
break;
165+
case UINT8:
166+
while (values.hasNext()) {
167+
Object value = values.next();
168+
if (value == null) {
169+
vector.setNull(startRowIndex++);
170+
} else {
171+
((UInt1Vector) vector).setSafe(startRowIndex++, (int) value);
172+
}
173+
}
174+
break;
175+
case UINT16:
176+
while (values.hasNext()) {
177+
Object value = values.next();
178+
if (value == null) {
179+
vector.setNull(startRowIndex++);
180+
} else {
181+
((UInt2Vector) vector).setSafe(startRowIndex++, (int) value);
182+
}
183+
}
184+
break;
185+
case UINT32:
186+
while (values.hasNext()) {
187+
Object value = values.next();
188+
if (value == null) {
189+
vector.setNull(startRowIndex++);
190+
} else {
191+
((UInt4Vector) vector).setSafe(startRowIndex++, ((Long) value).intValue());
192+
}
193+
}
194+
break;
195+
case UINT64:
196+
while (values.hasNext()) {
197+
Object value = values.next();
198+
if (value == null) {
199+
vector.setNull(startRowIndex++);
200+
} else {
201+
((UInt8Vector) vector).setSafe(startRowIndex++, (long) value);
202+
}
203+
}
204+
break;
205+
case FLOAT32:
206+
while (values.hasNext()) {
207+
Object value = values.next();
208+
if (value == null) {
209+
vector.setNull(startRowIndex++);
210+
} else {
211+
((Float4Vector) vector).setSafe(startRowIndex++, (float) value);
212+
}
213+
}
214+
break;
215+
case FLOAT64:
216+
while (values.hasNext()) {
217+
Object value = values.next();
218+
if (value == null) {
219+
vector.setNull(startRowIndex++);
220+
} else {
221+
((Float8Vector) vector).setSafe(startRowIndex++, (double) value);
222+
}
223+
}
224+
break;
225+
case BOOLEAN:
226+
while (values.hasNext()) {
227+
Object value = values.next();
228+
if (value == null) {
229+
vector.setNull(startRowIndex++);
230+
} else {
231+
((BitVector) vector).setSafe(startRowIndex++, (boolean) value ? 1 : 0);
232+
}
233+
}
234+
break;
235+
case BINARY:
236+
while (values.hasNext()) {
237+
Object value = values.next();
238+
if (value == null) {
239+
vector.setNull(startRowIndex++);
240+
} else {
241+
((VarBinaryVector) vector).setSafe(startRowIndex++, (byte[]) value);
242+
}
243+
}
244+
break;
245+
case STRING:
246+
while (values.hasNext()) {
247+
Object value = values.next();
248+
if (value == null) {
249+
vector.setNull(startRowIndex++);
250+
} else {
251+
((VarCharVector) vector)
252+
.setSafe(startRowIndex++, ((String) value).getBytes(StandardCharsets.UTF_8));
253+
}
254+
}
255+
break;
256+
case DATE:
257+
while (values.hasNext()) {
258+
Object value = values.next();
259+
if (value == null) {
260+
vector.setNull(startRowIndex++);
261+
} else {
262+
((DateDayVector) vector).setSafe(startRowIndex++, ValueUtil.getDateValue(value));
263+
}
264+
}
265+
break;
266+
case TIMESTAMP_SECOND: {
267+
while (values.hasNext()) {
268+
Object value = values.next();
269+
if (value == null) {
270+
vector.setNull(startRowIndex++);
271+
} else {
272+
TimeStampSecHolder holder = new TimeStampSecHolder();
273+
holder.value = ValueUtil.getTimestamp(value, java.util.concurrent.TimeUnit.SECONDS);
274+
((TimeStampSecVector) vector).setSafe(startRowIndex++, holder);
275+
}
276+
}
277+
break;
278+
}
279+
case TIMESTAMP_MILLISECOND: {
280+
while (values.hasNext()) {
281+
Object value = values.next();
282+
if (value == null) {
283+
vector.setNull(startRowIndex++);
284+
} else {
285+
TimeStampMilliHolder holder = new TimeStampMilliHolder();
286+
holder.value = ValueUtil.getTimestamp(value, java.util.concurrent.TimeUnit.MILLISECONDS);
287+
((TimeStampMilliVector) vector).setSafe(startRowIndex++, holder);
288+
}
289+
}
290+
break;
291+
}
292+
case TIMESTAMP_MICROSECOND: {
293+
while (values.hasNext()) {
294+
Object value = values.next();
295+
if (value == null) {
296+
vector.setNull(startRowIndex++);
297+
} else {
298+
TimeStampMicroHolder holder = new TimeStampMicroHolder();
299+
holder.value = ValueUtil.getTimestamp(value, java.util.concurrent.TimeUnit.MICROSECONDS);
300+
((TimeStampMicroVector) vector).setSafe(startRowIndex++, holder);
301+
}
302+
}
303+
break;
304+
}
305+
case TIMESTAMP_NANOSECOND: {
306+
while (values.hasNext()) {
307+
Object value = values.next();
308+
if (value == null) {
309+
vector.setNull(startRowIndex++);
310+
} else {
311+
TimeStampNanoHolder holder = new TimeStampNanoHolder();
312+
holder.value = ValueUtil.getTimestamp(value, java.util.concurrent.TimeUnit.NANOSECONDS);
313+
((TimeStampNanoVector) vector).setSafe(startRowIndex++, holder);
314+
}
315+
}
316+
break;
317+
}
318+
case TIME_SECOND: {
319+
while (values.hasNext()) {
320+
Object value = values.next();
321+
if (value == null) {
322+
vector.setNull(startRowIndex++);
323+
} else {
324+
TimeSecHolder holder = new TimeSecHolder();
325+
holder.value = (int) ValueUtil.getLongValue(value);
326+
((TimeSecVector) vector).setSafe(startRowIndex++, holder);
327+
}
328+
}
329+
break;
330+
}
331+
case TIME_MILLISECOND: {
332+
while (values.hasNext()) {
333+
Object value = values.next();
334+
if (value == null) {
335+
vector.setNull(startRowIndex++);
336+
} else {
337+
TimeMilliHolder holder = new TimeMilliHolder();
338+
holder.value = (int) ValueUtil.getLongValue(value);
339+
((TimeMilliVector) vector).setSafe(startRowIndex++, holder);
340+
}
341+
}
342+
break;
343+
}
344+
case TIME_MICROSECOND: {
345+
while (values.hasNext()) {
346+
Object value = values.next();
347+
if (value == null) {
348+
vector.setNull(startRowIndex++);
349+
} else {
350+
TimeMicroHolder holder = new TimeMicroHolder();
351+
holder.value = ValueUtil.getLongValue(value);
352+
((TimeMicroVector) vector).setSafe(startRowIndex++, holder);
353+
}
354+
}
355+
break;
356+
}
357+
case TIME_NANOSECOND: {
358+
while (values.hasNext()) {
359+
Object value = values.next();
360+
if (value == null) {
361+
vector.setNull(startRowIndex++);
362+
} else {
363+
TimeNanoHolder holder = new TimeNanoHolder();
364+
holder.value = ValueUtil.getLongValue(value);
365+
((TimeNanoVector) vector).setSafe(startRowIndex++, holder);
366+
}
367+
}
368+
break;
369+
}
370+
case DECIMAL128:
371+
while (values.hasNext()) {
372+
Object value = values.next();
373+
if (value == null) {
374+
vector.setNull(startRowIndex++);
375+
} else {
376+
byte[] bytes = ValueUtil.getDecimal128BigEndianBytes(dataTypeExtension, value);
377+
((DecimalVector) vector).setBigEndianSafe(startRowIndex++, bytes);
378+
}
379+
}
380+
break;
381+
case JSON:
382+
while (values.hasNext()) {
383+
Object value = values.next();
384+
if (value == null) {
385+
vector.setNull(startRowIndex++);
386+
} else {
387+
byte[] jsonBytes = ValueUtil.getJsonString(value).getBytes(StandardCharsets.UTF_8);
388+
((VarCharVector) vector).setSafe(startRowIndex++, jsonBytes);
389+
}
390+
}
391+
break;
392+
default:
393+
throw new IllegalArgumentException("Unsupported data type: " + dataType);
394+
}
395+
}
396+
117397
public static void addValue(
118398
FieldVector vector,
119399
int rowIndex,

0 commit comments

Comments
 (0)