Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static void main(String[] args) throws Exception {
.param("datasetSourceName", argMap.getOrDefault("dataset", "simple"))
.include(QueryClient.class.getSimpleName())
.include(InsertClient.class.getSimpleName())
.forks(1) // must be a fork. No fork only for debugging
.forks(0) // must be a fork. No fork only for debugging
.mode(Mode.AverageTime)
.timeUnit(TimeUnit.MILLISECONDS)
.threads(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,20 @@
import com.clickhouse.client.ClickHouseCredentials;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.client.ClickHouseServerForTest;
import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
import com.clickhouse.client.api.enums.Protocol;
import com.clickhouse.client.api.insert.InsertResponse;
import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.data.ClickHouseDataProcessor;
import com.clickhouse.client.api.query.GenericRecord;
import com.clickhouse.client.api.query.QueryResponse;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHouseOutputStream;
import com.clickhouse.data.ClickHouseRecord;
import com.clickhouse.data.format.ClickHouseRowBinaryProcessor;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
Expand All @@ -26,7 +32,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.math.BigInteger;
import java.util.List;

Expand Down Expand Up @@ -75,7 +84,6 @@ public void setup(DataState dataState, boolean insertData) throws Exception {
dataState.datasetSourceName = "simple";
dataState.dataSet = new SimpleDataSet();
} else if (dataState.datasetSourceName.startsWith("file://")) {

dataState.dataSet = new FileDataSet(dataState.datasetSourceName.substring("file://".length()));
dataState.datasetSourceName = dataState.dataSet.getName();
}
Expand Down Expand Up @@ -157,4 +165,30 @@ protected static Client getClientV2(boolean useDatabase) {
.setDefaultDatabase(useDatabase ? DB_NAME : "default")
.build();
}

/**
 * Loads every row of {@code tableName} (in {@code DB_NAME}) into the given {@link DataSet}
 * using the V1 client, so insert benchmarks can re-serialize the same records without
 * re-reading the server. Also builds a {@link ClickHouseDataProcessor} configured with the
 * result columns and stores it on the data set; the benchmarks use it to obtain serializers.
 *
 * @param tableName table to read from
 * @param dataSet   target data set that receives the records and the data processor
 * @throws RuntimeException wrapping any failure while reading the table
 */
public static void loadClickHouseRecords(String tableName, DataSet dataSet) {
    ClickHouseNode node = getServer();

    try (ClickHouseClient clientV1 = ClickHouseClient
            .newInstance(ClickHouseCredentials.fromUserAndPassword(getUsername(), getPassword()), ClickHouseProtocol.HTTP);
         ClickHouseResponse response = clientV1.read(node).query("SELECT * FROM " + DB_NAME + "." + tableName)
                 .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
                 .executeAndWait()) {

        // Data processor used later to serialize these records in client-V1 tests.
        // The ByteArrayOutputStream is a throwaway sink; only the columns/serializers matter.
        ClickHouseDataProcessor dataProcessor = new ClickHouseRowBinaryProcessor(clientV1.getConfig(), null,
                ClickHouseOutputStream.of(new ByteArrayOutputStream()), response.getColumns(), Collections.emptyMap());
        assert dataProcessor.getColumns() != null;
        dataSet.setClickHouseDataProcessor(dataProcessor);

        List<ClickHouseRecord> records = new ArrayList<>();
        for (ClickHouseRecord record : response.records()) {
            records.add(record);
        }
        dataSet.setClickHouseRecords(records);
    } catch (Exception e) {
        // This path reads data; the original message claimed "inserting", which misleads debugging.
        LOGGER.error("Error loading records: ", e);
        throw new RuntimeException("Error loading records from " + tableName, e);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,19 @@

import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.client.ClickHouseResponseSummary;
import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
import com.clickhouse.client.api.data_formats.RowBinaryFormatWriter;
import com.clickhouse.client.api.enums.Protocol;
import com.clickhouse.client.api.insert.InsertResponse;
import com.clickhouse.client.api.insert.InsertSettings;
import com.clickhouse.client.api.query.QueryResponse;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseDataProcessor;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHouseRecord;
import com.clickhouse.data.ClickHouseSerializer;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Scope;
Expand All @@ -15,6 +24,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigInteger;
import java.util.List;
import java.util.Map;

import static com.clickhouse.client.ClickHouseServerForTest.isCloud;

@State(Scope.Benchmark)
public class InsertClient extends BenchmarkBase {
private static final Logger LOGGER = LoggerFactory.getLogger(InsertClient.class);
Expand All @@ -30,7 +45,6 @@ public void tearDownIteration(DataState dataState) throws InterruptedException {
verifyRowsInsertedAndCleanup(dataState.dataSet);
}


@Benchmark
public void insertV1(DataState dataState) {
try {
Expand Down Expand Up @@ -75,4 +89,90 @@ public void insertV2(DataState dataState) {
LOGGER.error("Error: ", e);
}
}

/**
 * Benchmark: insert with client V1 by serializing the pre-loaded
 * {@link ClickHouseRecord}s in {@code RowBinary} format via the data
 * processor's serializers.
 *
 * @param dataState shared state holding the data set, its records and processor
 */
@Benchmark
public void insertV1RowBinary(DataState dataState) {
    try {
        ClickHouseFormat format = ClickHouseFormat.RowBinary;
        try (ClickHouseResponse response = clientV1.read(getServer())
                .write()
                .option(ClickHouseClientOption.ASYNC, false) // keep the measured call synchronous
                .format(format)
                .query("INSERT INTO `" + DB_NAME + "`.`" + dataState.dataSet.getTableName() + "`")
                .data(out -> {
                    ClickHouseDataProcessor p = dataState.dataSet.getClickHouseDataProcessor();
                    ClickHouseSerializer[] serializers = p.getSerializers(clientV1.getConfig(), p.getColumns());
                    // Serialize each record column-by-column in RowBinary order.
                    for (ClickHouseRecord record : dataState.dataSet.getClickHouseRecords()) {
                        for (int i = 0; i < serializers.length; i++) {
                            serializers[i].serialize(record.getValue(i), out);
                        }
                    }
                })
                .executeAndWait()) {
            ClickHouseResponseSummary summary = response.getSummary();
            if (summary.getWrittenRows() <= 0) {
                throw new RuntimeException("Rows written: " + summary.getWrittenRows());
            }
        }
    } catch (Exception e) {
        // Rethrow so JMH records the failure. The original only logged here, which
        // swallowed even the row-count verification exception thrown above and let a
        // failed insert be measured as a successful run.
        LOGGER.error("Error: ", e);
        throw new RuntimeException(e);
    }
}

/**
 * Benchmark: insert with client V2 using {@link RowBinaryFormatWriter}, writing each
 * row from the data set's name-to-value row maps.
 *
 * @param dataState shared state holding the data set, its schema and rows
 */
@Benchmark
public void insertV2RowBinary(DataState dataState) {
    try {
        try (InsertResponse response = clientV2.insert(dataState.dataSet.getTableName(), out -> {
            // NOTE(review): the writer is created for RowBinary while the insert declares
            // RowBinaryWithDefaults below — presumably intentional, but confirm they agree.
            RowBinaryFormatWriter w = new RowBinaryFormatWriter(out, dataState.dataSet.getSchema(), ClickHouseFormat.RowBinary);
            List<ClickHouseColumn> columns = dataState.dataSet.getSchema().getColumns();
            for (Map<String, Object> row : dataState.dataSet.getRows()) {
                for (ClickHouseColumn column : columns) {
                    w.setValue(column.getColumnName(), row.get(column.getColumnName()));
                }
                w.commitRow();
            }
            out.flush();
        }, ClickHouseFormat.RowBinaryWithDefaults, new InsertSettings()).get()) {
            if (response.getWrittenRows() <= 0) {
                throw new RuntimeException("Rows written: " + response.getWrittenRows());
            }
        }
    } catch (Exception e) {
        // Rethrow so JMH records the failure; logging alone hid failed inserts
        // (including the row-count verification above) behind a "successful" timing.
        LOGGER.error("Error: ", e);
        throw new RuntimeException(e);
    }
}

// @Benchmark
// public void insertV1WithV2RowBinaryWriter(DataState dataState) {
// try {
// ClickHouseFormat format = ClickHouseFormat.RowBinary;
// try (ClickHouseResponse response = clientV1.read(getServer())
// .write()
// .option(ClickHouseClientOption.ASYNC, false)
// .format(format)
// .query("INSERT INTO `" + DB_NAME + "`.`" + dataState.dataSet.getTableName() + "`")
// .data(out -> {
// RowBinaryFormatWriter w = new RowBinaryFormatWriter(out, dataState.dataSet.getSchema(), ClickHouseFormat.RowBinary);
// List<ClickHouseColumn> columns = dataState.dataSet.getSchema().getColumns();
// for (Map<String, Object> row : dataState.dataSet.getRows()) {
// for (ClickHouseColumn column : columns) {
// w.setValue(column.getColumnName(),row.get(column.getColumnName()));
// }
// w.commitRow();
// }
// out.close();
// })
// .executeAndWait()) {
// ClickHouseResponseSummary summary = response.getSummary();
// if (summary.getWrittenRows() <= 0) {
// throw new RuntimeException("Rows written: " + summary.getWrittenRows());
// }
// }
// } catch ( Exception e) {
// LOGGER.error("Error: ", e);
// }
// }
}
10 changes: 10 additions & 0 deletions performance/src/test/com/clickhouse/benchmark/data/DataSet.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.clickhouse.benchmark.data;

import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.data.ClickHouseDataProcessor;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHouseRecord;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -41,4 +43,12 @@ default InputStream getInputStream(int rowId, ClickHouseFormat format) {
List<byte[]> getBytesList(ClickHouseFormat format);

List<Map<String, Object>> getRows();

List<ClickHouseRecord> getClickHouseRecords();

void setClickHouseRecords(List<ClickHouseRecord> records);

void setClickHouseDataProcessor(ClickHouseDataProcessor dataProcessor);

ClickHouseDataProcessor getClickHouseDataProcessor();
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ public static TableSchema parseSchema(String createTableStatement) {//TODO: Cons
public static void initializeTables(DataSet set, boolean insertData) {
BenchmarkBase.runQuery(set.getCreateTableString(), true);
ClickHouseFormat format = set.getFormat();
if (insertData) {
BenchmarkBase.insertData(set.getTableName(), set.getInputStream(format), format);

BenchmarkBase.insertData(set.getTableName(), set.getInputStream(format), format);
if (!insertData) {
BenchmarkBase.loadClickHouseRecords(set.getTableName(), set);
BenchmarkBase.runQuery("TRUNCATE TABLE " + set.getTableName(), true);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package com.clickhouse.benchmark.data;

import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseDataProcessor;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHouseRecord;
import com.clickhouse.data.ClickHouseValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.com.google.common.collect.Table;

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

Expand All @@ -26,6 +31,8 @@ public class FileDataSet implements DataSet{

private List<byte[]> lines =null;

private List<Map<String, Object>> data;

public FileDataSet(String filePath) {
File srcFile = new File(filePath);

Expand Down Expand Up @@ -99,9 +106,10 @@ public String getTrucateTableString() {
return "TRUNCATE TABLE " + getTableName();
}

// Schema is assembled incrementally from the data processor's columns
// (populated in setClickHouseDataProcessor); empty until then.
private TableSchema tableSchema = new TableSchema();

/** @return the table schema built from the loaded columns. */
@Override
public TableSchema getSchema() {
    // The stale "return schema;" line (old diff side) was unreachable/uncompilable; removed.
    return tableSchema;
}

@Override
Expand All @@ -116,14 +124,53 @@ public List<byte[]> getBytesList(ClickHouseFormat format) {

/**
 * Returns the loaded rows as name-to-value maps.
 * Populated by {@link #setClickHouseRecords}; null until records have been loaded.
 */
@Override
public List<Map<String, Object>> getRows() {
    // The stale "return Collections.emptyList();" line (old diff side) made the
    // real return unreachable; only the new behavior is kept.
    return data;
}

/** File-backed data sets are always read/parsed as CSV. */
@Override
public ClickHouseFormat getFormat() {
return ClickHouseFormat.CSV;
}

// Raw records read back from the server; set via setClickHouseRecords, null before that.
private List<ClickHouseRecord> clickHouseRecords;

/** @return the records previously loaded with {@link #setClickHouseRecords}, or null. */
@Override
public List<ClickHouseRecord> getClickHouseRecords() {
return clickHouseRecords;
}

/**
 * Stores the raw records and also materializes them into name-to-value row maps
 * so {@link #getRows()} can feed the V2 insert benchmarks.
 *
 * @param records records read back from the server, one per table row
 */
@Override
public void setClickHouseRecords(List<ClickHouseRecord> records) {
    this.clickHouseRecords = records;
    List<ClickHouseColumn> schemaColumns = tableSchema.getColumns();
    data = new ArrayList<>(records.size());
    for (ClickHouseRecord rec : records) {
        Map<String, Object> rowMap = new HashMap<>();
        int colIdx = 0;
        // NOTE(review): assumes the schema columns (filled by setClickHouseDataProcessor)
        // match the record's value order and count — confirm the processor is set first.
        for (Iterator<ClickHouseValue> it = rec.iterator(); it.hasNext(); colIdx++) {
            rowMap.put(schemaColumns.get(colIdx).getColumnName(), it.next().asObject());
        }
        data.add(rowMap);
    }
}

// Data processor created while loading records; supplies serializers for V1 benchmarks.
private ClickHouseDataProcessor dataProcessor;

/** @return the processor set via {@link #setClickHouseDataProcessor}, or null. */
@Override
public ClickHouseDataProcessor getClickHouseDataProcessor() {
return dataProcessor;
}

/**
 * Stores the data processor and copies its columns into this data set's table schema.
 *
 * NOTE(review): each call APPENDS the processor's columns to {@code tableSchema};
 * calling this twice would duplicate columns — confirm it is only invoked once per data set.
 */
@Override
public void setClickHouseDataProcessor(ClickHouseDataProcessor dataProcessor) {
this.dataProcessor = dataProcessor;
for (ClickHouseColumn column : dataProcessor.getColumns()) {
tableSchema.addColumn(column.getColumnName(), column.getOriginalTypeName());
}
}

@Override
public String toString() {
return "FileDataSet{" +
Expand Down
Loading
Loading