File tree Expand file tree Collapse file tree 6 files changed +13
-24
lines changed
ingester-bulk-protocol/src/main/java
ingester-example/src/main/java/io/greptime/bench
ingester-protocol/src/main/java/io/greptime Expand file tree Collapse file tree 6 files changed +13
-24
lines changed Original file line number Diff line number Diff line change @@ -61,7 +61,7 @@ public class BulkWriteService implements AutoCloseable {
6161 private final ClientStreamListener listener ;
6262 private final AsyncPutListener metadataListener ;
6363 private final long timeoutMs ;
64-
64+
6565 /**
6666 * Constructs a new BulkWriteService.
6767 *
Original file line number Diff line number Diff line change @@ -205,10 +205,7 @@ private static class OnStreamReadyHandler implements Runnable {
205205
206206 @ Override
207207 public void run () {
208- int mayReleasePermits = this .maxRequestsInFlight - this .semaphore .availablePermits ();
209- if (mayReleasePermits > 0 ) {
210- this .semaphore .release (mayReleasePermits );
211- }
208+ this .semaphore .release (this .maxRequestsInFlight );
212209 }
213210
214211 /**
Original file line number Diff line number Diff line change @@ -58,7 +58,7 @@ public static void main(String[] args) throws Exception {
5858 .allocatorInitReservation (0 )
5959 .allocatorMaxAllocation (4 * 1024 * 1024 * 1024L )
6060 .timeoutMsPerMessage (60000 )
61- .maxRequestsInFlight (4 )
61+ .maxRequestsInFlight (8 )
6262 .build ();
6363 Compression compression = zstdCompression ? Compression .Zstd : Compression .None ;
6464 Context ctx = Context .newDefault ().withCompression (compression );
Original file line number Diff line number Diff line change 3030import java .util .concurrent .ThreadLocalRandom ;
3131import java .util .concurrent .ThreadPoolExecutor ;
3232import java .util .concurrent .atomic .AtomicLong ;
33- import org .slf4j .Logger ;
34- import org .slf4j .LoggerFactory ;
3533
3634@ SPI (
3735 name = "multi_producer_table_data_provider" ,
3836 priority = 10 /* newer implementation can use higher priority to override the old one */ )
3937public class MultiProducerTableDataProvider extends RandomTableDataProvider {
4038
41- private static final Logger LOG = LoggerFactory .getLogger (MultiProducerTableDataProvider .class );
42-
4339 private final int producerCount ;
4440 private final long rowCount ;
4541 private final ExecutorService executorService ;
@@ -91,17 +87,13 @@ public boolean hasNext() {
9187
9288 @ Override
9389 public Object [] next () {
94- index ++;
95- Object [] row = buffer .poll ();
96- if (row == null ) {
97- try {
98- LOG .info ("Waiting for row from buffer" );
99- row = buffer .take ();
100- } catch (InterruptedException e ) {
101- throw new RuntimeException (e );
102- }
90+ try {
91+ Object [] row = buffer .take ();
92+ index ++;
93+ return row ;
94+ } catch (InterruptedException e ) {
95+ throw new RuntimeException (e );
10396 }
104- return row ;
10597 }
10698 };
10799 }
Original file line number Diff line number Diff line change @@ -36,9 +36,9 @@ public class RandomTableDataProvider implements TableDataProvider {
3636 {
3737 tableSchema = TableSchema .newBuilder ("my_bench_table" )
3838 .addTimestamp ("log_ts" , DataType .TimestampMillisecond )
39- .addTag ("business_name" , DataType .String )
40- .addTag ("app_name" , DataType .String )
41- .addTag ("host_name" , DataType .String )
39+ .addField ("business_name" , DataType .String )
40+ .addField ("app_name" , DataType .String )
41+ .addField ("host_name" , DataType .String )
4242 .addField ("log_message" , DataType .String ) // 2K
4343 .addField ("log_level" , DataType .String )
4444 .addField ("log_name" , DataType .String )
Original file line number Diff line number Diff line change @@ -45,7 +45,7 @@ public interface BulkWrite {
4545 * This value should be determined based on the size of each request packet. A higher value means more in-flight requests,
4646 * which could potentially saturate network bandwidth or exceed the actual processing capacity of the database.
4747 */
48- int DEFAULT_MAX_REQUESTS_IN_FLIGHT = 4 ;
48+ int DEFAULT_MAX_REQUESTS_IN_FLIGHT = 8 ;
4949
5050 static class Config {
5151 private long allocatorInitReservation = DEFAULT_ALLOCATOR_INIT_RESERVATION ;
You can’t perform that action at this time.
0 commit comments