diff --git a/ingester-example/src/main/java/io/greptime/bench/BenchmarkResultPrinter.java b/ingester-example/src/main/java/io/greptime/bench/BenchmarkResultPrinter.java new file mode 100644 index 0000000..e556770 --- /dev/null +++ b/ingester-example/src/main/java/io/greptime/bench/BenchmarkResultPrinter.java @@ -0,0 +1,148 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.greptime.bench; + +import io.greptime.common.util.Cpus; +import io.greptime.common.util.SystemPropertyUtil; +import io.greptime.models.TableSchema; +import org.slf4j.Logger; + +/** + * Utility class for printing benchmark results in a consistent format. + */ +public class BenchmarkResultPrinter { + + public static void printBenchmarkHeader(Logger log, String apiType) { + log.info("=== GreptimeDB {} API Log Benchmark ===", apiType); + log.info("Synthetic log data generation and {} API ingestion performance test", apiType.toLowerCase()); + log.info(""); + } + + public static void printConfiguration( + Logger log, String apiType, boolean zstdCompression, int batchSize, int parallelismOrConcurrency) { + printConfiguration(log, apiType, zstdCompression, batchSize, parallelismOrConcurrency, null); + } + + public static void printConfiguration( + Logger log, + String apiType, + boolean zstdCompression, + int batchSize, + int parallelismOrConcurrency, + Integer maxPointsPerSecond) { + log.info("=== {} API Benchmark Configuration ===", apiType); + + String endpointsStr = SystemPropertyUtil.get("db.endpoints"); + if (endpointsStr == null) { + endpointsStr = "localhost:4001"; + } + String database = SystemPropertyUtil.get("db.database"); + if (database == null) { + database = "public"; + } + + log.info("Endpoint: {}", endpointsStr); + log.info("Database: {}", database); + log.info("Batch size: {}", batchSize); + + if (maxPointsPerSecond != null) { + log.info( + "Max points per second: {}", + maxPointsPerSecond == Integer.MAX_VALUE ? "unlimited" : maxPointsPerSecond); + } else if (apiType.equals("Bulk")) { + log.info("Parallelism: {}", parallelismOrConcurrency); + } else { + log.info("Concurrency: {}", parallelismOrConcurrency); + } + + log.info("Compression: {}", (zstdCompression ? "zstd" : "none")); + log.info("CPU cores: {}", Cpus.cpus()); + log.info("Build profile: release"); + log.info(""); + } + + public static void printBenchmarkStart( + Logger log, + String apiType, + TableDataProvider provider, + TableSchema schema, + int batchSize, + int parallelismOrConcurrency) { + printBenchmarkStart(log, apiType, provider, schema, batchSize, parallelismOrConcurrency, null); + } + + public static void printBenchmarkStart( + Logger log, + String apiType, + TableDataProvider provider, + TableSchema schema, + int batchSize, + int parallelismOrConcurrency, + Integer maxPointsPerSecond) { + log.info("=== Running {} API Log Data Benchmark ===", apiType); + log.info("Setting up {} writer...", apiType.toLowerCase()); + log.info( + "Starting {} API benchmark: {}", + apiType.toLowerCase(), + provider.getClass().getSimpleName()); + log.info( + "Table: {} ({} columns)", + schema.getTableName(), + schema.getColumnNames().size()); + log.info("Target rows: {}", provider.rowCount()); + log.info("Batch size: {}", batchSize); + + if (maxPointsPerSecond != null) { + log.info( + "Max points per second: {}", + maxPointsPerSecond == Integer.MAX_VALUE ? "unlimited" : maxPointsPerSecond); + } else if (apiType.equals("Bulk")) { + log.info("Parallelism: {}", parallelismOrConcurrency); + } else { + log.info("Concurrency: {}", parallelismOrConcurrency); + } + + log.info(""); + } + + public static void printBatchProgress(Logger log, long batch, long totalRows, long writeRatePerSecond) { + log.info("→ Batch {}: {} rows processed ({} rows/sec)", batch, totalRows, writeRatePerSecond); + + if (batch % 10 == 0) { + log.info("Flushed {} responses (total {} affected rows)", batch, totalRows); + } + } + + public static void printCompletionMessages(Logger log, String apiType) { + log.info("Finishing {} writer and waiting for all responses...", apiType.toLowerCase()); + log.info("All {} writes completed successfully", apiType.toLowerCase()); + log.info("Cleaning up data provider..."); + log.info("{} API benchmark completed successfully!", apiType); + } + + public static void printBenchmarkSummary( + Logger log, TableDataProvider provider, long totalRows, long durationMs, long throughput) { + log.info("=== Benchmark Result ==="); + log.info("Table: {}", provider.tableSchema().getTableName()); + log.info(""); + log.info("Provider Rows Duration(ms) Throughput Status"); + log.info("--------------------------------------------------------------------------"); + log.info(String.format( + "%-30s %8d %12d %12d r/s SUCCESS", + provider.getClass().getSimpleName(), totalRows, durationMs, throughput)); + } +} diff --git a/ingester-example/src/main/java/io/greptime/bench/DBConnector.java b/ingester-example/src/main/java/io/greptime/bench/DBConnector.java index d772bea..4b4a6b5 100644 --- a/ingester-example/src/main/java/io/greptime/bench/DBConnector.java +++ b/ingester-example/src/main/java/io/greptime/bench/DBConnector.java @@ -17,8 +17,10 @@ package io.greptime.bench; import io.greptime.GreptimeDB; +import io.greptime.common.util.SystemPropertyUtil; import io.greptime.options.GreptimeOptions; import io.greptime.quickstart.query.QueryJDBCQuickStart; +import io.greptime.rpc.RpcOptions; import java.io.IOException; import java.util.Properties; import org.slf4j.Logger; @@ -38,11 +40,23 @@ public static GreptimeDB connect() { } catch (IOException e) { throw new RuntimeException(e); } - String database = (String) prop.get("db.database"); - String endpointsStr = prop.getProperty("db.endpoints"); + + String database = SystemPropertyUtil.get("db.database"); + if (database == null) { + database = (String) prop.get("db.database"); + } + + String endpointsStr = SystemPropertyUtil.get("db.endpoints"); + if (endpointsStr == null) { + endpointsStr = prop.getProperty("db.endpoints"); + } + String[] endpoints = endpointsStr.split(","); + RpcOptions rpcOptions = RpcOptions.newDefault(); + rpcOptions.setDefaultRpcTimeout(60 * 1000); GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) .writeMaxRetries(0) + .rpcOptions(rpcOptions) .defaultStreamMaxWritePointsPerSecond(Integer.MAX_VALUE) .maxInFlightWritePoints(Integer.MAX_VALUE) .useZeroCopyWriteInBulkWrite(true) diff --git a/ingester-example/src/main/java/io/greptime/bench/MultiProducerTableDataProvider.java b/ingester-example/src/main/java/io/greptime/bench/MultiProducerTableDataProvider.java index c1ccf9d..827fb2e 100644 --- a/ingester-example/src/main/java/io/greptime/bench/MultiProducerTableDataProvider.java +++ b/ingester-example/src/main/java/io/greptime/bench/MultiProducerTableDataProvider.java @@ -37,14 +37,12 @@ public class MultiProducerTableDataProvider extends RandomTableDataProvider { private final int producerCount; - private final long rowCount; private final ExecutorService executorService; private final BlockingQueue buffer = new ArrayBlockingQueue<>(100000); { this.producerCount = SystemPropertyUtil.getInt("multi_producer_table_data_provider.producer_count", 10); // Total number of rows to generate, configurable via system property - this.rowCount = SystemPropertyUtil.getLong("table_data_provider.row_count", 10_000_000L); this.executorService = ThreadPoolUtil.newBuilder() .poolName("multi-producer-table-data-provider") .enableMetric(true) @@ -62,7 +60,7 @@ public void init() { AtomicLong rowIndex = new AtomicLong(0); for (int i = 0; i < producerCount; i++) { this.executorService.execute(() -> { - while (rowIndex.getAndIncrement() < rowCount) { + while (rowIndex.getAndIncrement() < rowCount()) { Object[] row = nextRow(); try { buffer.put(row); @@ -82,7 +80,7 @@ public Iterator rows() { @Override public boolean hasNext() { - return index < rowCount; + return index < rowCount(); } @Override diff --git a/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java b/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java index 2602883..5ece60e 100644 --- a/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java +++ b/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java @@ -51,7 +51,7 @@ public class RandomTableDataProvider implements TableDataProvider { .addField("pod_name", DataType.String) .build(); // Total number of rows to generate, configurable via system property - rowCount = SystemPropertyUtil.getLong("table_data_provider.row_count", 10_000_000L); + rowCount = SystemPropertyUtil.getLong("table_data_provider.row_count", 5_000_000L); } @Override @@ -133,4 +133,9 @@ public Object[] next() { @Override public void close() throws Exception {} + + @Override + public long rowCount() { + return rowCount; + } } diff --git a/ingester-example/src/main/java/io/greptime/bench/TableDataProvider.java b/ingester-example/src/main/java/io/greptime/bench/TableDataProvider.java index 7362c5d..691d3ce 100644 --- a/ingester-example/src/main/java/io/greptime/bench/TableDataProvider.java +++ b/ingester-example/src/main/java/io/greptime/bench/TableDataProvider.java @@ -38,4 +38,9 @@ public interface TableDataProvider extends AutoCloseable { * Returns the iterator of the rows. */ Iterator rows(); + + /** + * Returns the total number of rows. + */ + long rowCount(); } diff --git a/ingester-example/src/main/java/io/greptime/bench/benchmark/BatchingWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/benchmark/BatchingWriteBenchmark.java index 86b2826..1e78804 100644 --- a/ingester-example/src/main/java/io/greptime/bench/benchmark/BatchingWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/benchmark/BatchingWriteBenchmark.java @@ -18,6 +18,7 @@ import io.greptime.GreptimeDB; import io.greptime.WriteOp; +import io.greptime.bench.BenchmarkResultPrinter; import io.greptime.bench.DBConnector; import io.greptime.bench.TableDataProvider; import io.greptime.common.util.MetricsUtil; @@ -35,6 +36,8 @@ import java.util.Collections; import java.util.Iterator; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,64 +45,96 @@ * BatchingWriteBenchmark is a benchmark for the batching write API of GreptimeDB. * * Env: - * - batch_size_per_request: the batch size per request * - zstd_compression: whether to use zstd compression - * - max_points_per_second: the max number of points that can be written per second, exceeding which may cause blockage + * - batch_size_per_request: the batch size per request + * - concurrency: the number of concurrent writers */ public class BatchingWriteBenchmark { private static final Logger LOG = LoggerFactory.getLogger(BatchingWriteBenchmark.class); public static void main(String[] args) throws Exception { - boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", true); + boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", false); int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 64 * 1024); + int concurrency = SystemPropertyUtil.getInt("concurrency", 4); + + BenchmarkResultPrinter.printBenchmarkHeader(LOG, "Batching"); + BenchmarkResultPrinter.printConfiguration(LOG, "Batching", zstdCompression, batchSize, concurrency); - LOG.info("Using zstd compression: {}", zstdCompression); - LOG.info("Batch size: {}", batchSize); + Compression compression = zstdCompression ? Compression.Zstd : Compression.None; + Context ctx = Context.newDefault().withCompression(compression); // Start a metrics exporter MetricsExporter metricsExporter = new MetricsExporter(MetricsUtil.metricRegistry()); metricsExporter.init(ExporterOptions.newDefault()); - GreptimeDB greptimeDB = DBConnector.connect(); - Compression compression = zstdCompression ? Compression.Zstd : Compression.None; - Context ctx = Context.newDefault().withCompression(compression); + GreptimeDB greptimeDB = DBConnector.connect(); + Semaphore semaphore = new Semaphore(concurrency); TableDataProvider tableDataProvider = ServiceLoader.load(TableDataProvider.class).first(); - LOG.info("Table data provider: {}", tableDataProvider.getClass().getName()); tableDataProvider.init(); TableSchema tableSchema = tableDataProvider.tableSchema(); - Iterator rows = tableDataProvider.rows(); - - LOG.info("Start writing data"); - long start = System.nanoTime(); - do { - Table table = Table.from(tableSchema); - for (int i = 0; i < batchSize; i++) { - if (!rows.hasNext()) { - break; + AtomicLong totalRowsWritten = new AtomicLong(0); + AtomicLong batchCounter = new AtomicLong(0); + + BenchmarkResultPrinter.printBenchmarkStart( + LOG, "Batching", tableDataProvider, tableSchema, batchSize, concurrency); + + try { + Iterator rows = tableDataProvider.rows(); + + long benchmarkStart = System.nanoTime(); + do { + Table table = Table.from(tableSchema); + for (int j = 0; j < batchSize; j++) { + if (!rows.hasNext()) { + break; + } + table.addRow(rows.next()); } - table.addRow(rows.next()); - } - LOG.info("Table bytes used: {}", table.bytesUsed()); - // Complete the table; adding rows is no longer permitted. - table.complete(); - long fStart = System.nanoTime(); - // Write the table data to the server - CompletableFuture> future = - greptimeDB.write(Collections.singletonList(table), WriteOp.Insert, ctx); - // Wait for the writing to complete - int numRows = future.get().mapOr(0, WriteOk::getSuccess); - long costMs = (System.nanoTime() - fStart) / 1000000; - LOG.info("Write rows: {}, time cost: {}ms", numRows, costMs); - - } while (rows.hasNext()); - - LOG.info("Completed writing data, time cost: {}s", (System.nanoTime() - start) / 1000000000); - - greptimeDB.shutdownGracefully(); - tableDataProvider.close(); - metricsExporter.shutdownGracefully(); + + // Complete the table; adding rows is no longer permitted. + table.complete(); + + semaphore.acquire(); + + // Write the table data to the server + CompletableFuture> future = + greptimeDB.write(Collections.singletonList(table), WriteOp.Insert, ctx); + future.whenComplete((result, error) -> { + semaphore.release(); + + if (error != null) { + LOG.error("Error writing data", error); + return; + } + + int numRows = result.mapOr(0, writeOk -> writeOk.getSuccess()); + long totalRows = totalRowsWritten.addAndGet(numRows); + long batch = batchCounter.incrementAndGet(); + long totalElapsedMs = (System.nanoTime() - benchmarkStart) / 1000000; + long writeRatePerSecond = totalElapsedMs > 0 ? (totalRows * 1000) / totalElapsedMs : 0; + BenchmarkResultPrinter.printBatchProgress(LOG, batch, totalRows, writeRatePerSecond); + }); + } while (rows.hasNext()); + + // Wait for all the requests to complete + semaphore.acquire(concurrency); + + BenchmarkResultPrinter.printCompletionMessages(LOG, "Batching"); + + long totalDurationMs = (System.nanoTime() - benchmarkStart) / 1000000; + long finalRowCount = totalRowsWritten.get(); + long finalThroughput = totalDurationMs > 0 ? (finalRowCount * 1000) / totalDurationMs : 0; + + BenchmarkResultPrinter.printBenchmarkSummary( + LOG, tableDataProvider, finalRowCount, totalDurationMs, finalThroughput); + + } finally { + tableDataProvider.close(); + greptimeDB.shutdownGracefully(); + metricsExporter.shutdownGracefully(); + } } } diff --git a/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java index 53e8fbd..2456966 100644 --- a/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java @@ -20,6 +20,7 @@ import io.greptime.BulkWrite; import io.greptime.GreptimeDB; import io.greptime.WriteOp; +import io.greptime.bench.BenchmarkResultPrinter; import io.greptime.bench.DBConnector; import io.greptime.bench.TableDataProvider; import io.greptime.common.util.MetricsUtil; @@ -34,6 +35,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,8 +43,9 @@ * BulkWriteBenchmark is a benchmark for the bulk write API of GreptimeDB. * * Env: - * - batch_size_per_request: the batch size per request * - zstd_compression: whether to use zstd compression + * - batch_size_per_request: the batch size per request + * - max_requests_in_flight: the max number of requests in flight *

* IMPORTANT: Unlike the standard write method, * this bulk writing stream API requires the target table to exist beforehand. It will @@ -54,40 +57,45 @@ public class BulkWriteBenchmark { private static final Logger LOG = LoggerFactory.getLogger(BulkWriteBenchmark.class); public static void main(String[] args) throws Exception { - boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", true); + boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", false); int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 64 * 1024); - LOG.info("Using zstd compression: {}", zstdCompression); - LOG.info("Batch size: {}", batchSize); + int maxRequestsInFlight = SystemPropertyUtil.getInt("max_requests_in_flight", 4); + + BenchmarkResultPrinter.printBenchmarkHeader(LOG, "Bulk"); + BenchmarkResultPrinter.printConfiguration(LOG, "Bulk", zstdCompression, batchSize, maxRequestsInFlight); + + Compression compression = zstdCompression ? Compression.Zstd : Compression.None; + Context ctx = Context.newDefault().withCompression(compression); // Start a metrics exporter MetricsExporter metricsExporter = new MetricsExporter(MetricsUtil.metricRegistry()); metricsExporter.init(ExporterOptions.newDefault()); GreptimeDB greptimeDB = DBConnector.connect(); - BulkWrite.Config cfg = BulkWrite.Config.newBuilder() .allocatorInitReservation(0) .allocatorMaxAllocation(4 * 1024 * 1024 * 1024L) .timeoutMsPerMessage(60000) - .maxRequestsInFlight(4) + .maxRequestsInFlight(maxRequestsInFlight) .build(); - Compression compression = zstdCompression ? Compression.Zstd : Compression.None; - Context ctx = Context.newDefault().withCompression(compression); TableDataProvider tableDataProvider = ServiceLoader.load(TableDataProvider.class).first(); - LOG.info("Table data provider: {}", tableDataProvider.getClass().getName()); tableDataProvider.init(); TableSchema tableSchema = tableDataProvider.tableSchema(); + AtomicLong totalRowsWritten = new AtomicLong(0); + AtomicLong batchCounter = new AtomicLong(0); + + BenchmarkResultPrinter.printBenchmarkStart( + LOG, "Bulk", tableDataProvider, tableSchema, batchSize, maxRequestsInFlight); // Before writing data, ensure the table exists, bulk write API does not create tables. ensureTableExists(greptimeDB, tableSchema, tableDataProvider, ctx); - LOG.info("Start writing data"); + long benchmarkStart = System.nanoTime(); try (BulkStreamWriter writer = greptimeDB.bulkStreamWriter(tableSchema, cfg, ctx)) { Iterator rows = tableDataProvider.rows(); - long start = System.nanoTime(); do { Table.TableBufferRoot table = writer.tableBufferRoot(1024); for (int i = 0; i < batchSize; i++) { @@ -96,7 +104,6 @@ public static void main(String[] args) throws Exception { } table.addRow(rows.next()); } - LOG.info("Table bytes used: {}", table.bytesUsed()); // Complete the table; adding rows is no longer permitted. table.complete(); @@ -107,16 +114,27 @@ public static void main(String[] args) throws Exception { long costMs = (System.nanoTime() - fStart) / 1000000; if (t != null) { LOG.error("Error writing data, time cost: {}ms", costMs, t); - } else { - LOG.info("Wrote rows: {}, time cost: {}ms", r, costMs); + return; } - }); + long totalRows = totalRowsWritten.addAndGet(r); + long batch = batchCounter.incrementAndGet(); + long totalElapsedMs = (System.nanoTime() - benchmarkStart) / 1000000; + long writeRatePerSecond = totalElapsedMs > 0 ? (totalRows * 1000) / totalElapsedMs : 0; + BenchmarkResultPrinter.printBatchProgress(LOG, batch, totalRows, writeRatePerSecond); + }); } while (rows.hasNext()); writer.completed(); - LOG.info("Completed writing data, time cost: {}s", (System.nanoTime() - start) / 1000000000); + BenchmarkResultPrinter.printCompletionMessages(LOG, "Bulk"); + + long totalDurationMs = (System.nanoTime() - benchmarkStart) / 1000000; + long finalRowCount = totalRowsWritten.get(); + long finalThroughput = totalDurationMs > 0 ? (finalRowCount * 1000) / totalDurationMs : 0; + + BenchmarkResultPrinter.printBenchmarkSummary( + LOG, tableDataProvider, finalRowCount, totalDurationMs, finalThroughput); } finally { tableDataProvider.close(); } diff --git a/ingester-example/src/main/java/io/greptime/bench/benchmark/StreamingWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/benchmark/StreamingWriteBenchmark.java index 934d450..aec8b8b 100644 --- a/ingester-example/src/main/java/io/greptime/bench/benchmark/StreamingWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/benchmark/StreamingWriteBenchmark.java @@ -18,6 +18,7 @@ import io.greptime.GreptimeDB; import io.greptime.StreamWriter; +import io.greptime.bench.BenchmarkResultPrinter; import io.greptime.bench.DBConnector; import io.greptime.bench.TableDataProvider; import io.greptime.common.util.MetricsUtil; @@ -48,12 +49,12 @@ public class StreamingWriteBenchmark { private static final Logger LOG = LoggerFactory.getLogger(StreamingWriteBenchmark.class); public static void main(String[] args) throws Exception { - boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", true); + boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", false); int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 64 * 1024); int maxPointsPerSecond = SystemPropertyUtil.getInt("max_points_per_second", Integer.MAX_VALUE); - LOG.info("Using zstd compression: {}", zstdCompression); - LOG.info("Batch size: {}", batchSize); - LOG.info("Max points per second: {}", maxPointsPerSecond); + + BenchmarkResultPrinter.printBenchmarkHeader(LOG, "Streaming"); + BenchmarkResultPrinter.printConfiguration(LOG, "Streaming", zstdCompression, batchSize, 0, maxPointsPerSecond); // Start a metrics exporter MetricsExporter metricsExporter = new MetricsExporter(MetricsUtil.metricRegistry()); @@ -68,13 +69,16 @@ public static void main(String[] args) throws Exception { TableDataProvider tableDataProvider = ServiceLoader.load(TableDataProvider.class).first(); - LOG.info("Table data provider: {}", tableDataProvider.getClass().getName()); tableDataProvider.init(); TableSchema tableSchema = tableDataProvider.tableSchema(); Iterator rows = tableDataProvider.rows(); - LOG.info("Start writing data"); - long start = System.nanoTime(); + BenchmarkResultPrinter.printBenchmarkStart( + LOG, "Streaming", tableDataProvider, tableSchema, batchSize, 0, maxPointsPerSecond); + + long benchmarkStart = System.nanoTime(); + long totalRowsWritten = 0; + int batchCounter = 0; do { Table table = Table.from(tableSchema); for (int i = 0; i < batchSize; i++) { @@ -83,7 +87,14 @@ public static void main(String[] args) throws Exception { } table.addRow(rows.next()); } - LOG.info("Table bytes used: {}", table.bytesUsed()); + int rowsInBatch = table.rowCount(); + totalRowsWritten += rowsInBatch; + batchCounter++; + + long totalElapsedMs = (System.nanoTime() - benchmarkStart) / 1000000; + long writeRatePerSecond = totalElapsedMs > 0 ? (totalRowsWritten * 1000) / totalElapsedMs : 0; + BenchmarkResultPrinter.printBatchProgress(LOG, batchCounter, totalRowsWritten, writeRatePerSecond); + // Complete the table; adding rows is no longer permitted. table.complete(); // Write the table data to the server @@ -95,9 +106,15 @@ public static void main(String[] args) throws Exception { CompletableFuture future = writer.completed(); // Now we can get the writing result. - WriteOk result = future.get(); + future.get(); + + BenchmarkResultPrinter.printCompletionMessages(LOG, "Streaming"); + + long totalDurationMs = (System.nanoTime() - benchmarkStart) / 1000000; + long finalThroughput = totalDurationMs > 0 ? (totalRowsWritten * 1000) / totalDurationMs : 0; - LOG.info("Completed writing data: {}, time cost: {}s", result, (System.nanoTime() - start) / 1000000000); + BenchmarkResultPrinter.printBenchmarkSummary( + LOG, tableDataProvider, totalRowsWritten, totalDurationMs, finalThroughput); greptimeDB.shutdownGracefully(); tableDataProvider.close(); diff --git a/ingester-protocol/src/main/java/io/greptime/Util.java b/ingester-protocol/src/main/java/io/greptime/Util.java index 1a92832..539e73d 100644 --- a/ingester-protocol/src/main/java/io/greptime/Util.java +++ b/ingester-protocol/src/main/java/io/greptime/Util.java @@ -46,7 +46,7 @@ public final class Util { static { WRITE_LOGGING = new AtomicBoolean(SystemPropertyUtil.getBool(Keys.WRITE_LOGGING, false)); - BULK_WRITE_LOGGING = new AtomicBoolean(SystemPropertyUtil.getBool(Keys.BULK_WRITE_LOGGING, true)); + BULK_WRITE_LOGGING = new AtomicBoolean(SystemPropertyUtil.getBool(Keys.BULK_WRITE_LOGGING, false)); REPORT_PERIOD_MIN = SystemPropertyUtil.getInt(Keys.REPORT_PERIOD, 10); DISPLAY = ThreadPoolUtil.newScheduledBuilder() .poolName("display_self")