Skip to content

Commit aa6e04d

Browse files
authored
fix: csv data source supports data caching (#30)
* feat: csv data source supports data caching * fix: adjust the precision of the timestamp * feat: optimize CSV data generation
1 parent 20533ed commit aa6e04d

File tree

9 files changed

+105
-80
lines changed

9 files changed

+105
-80
lines changed

conf/tdengine-csv.yaml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@ schema:
2222
columns:
2323
- name: ts
2424
type: timestamp
25-
start: now
2625
precision : us
27-
step: 1
2826
- name: current
2927
type: float
3028
- name: voltage
@@ -40,6 +38,7 @@ schema:
4038
interlace: 1
4139
rows_per_table: 100
4240
rows_per_batch: 10000
41+
tables_reuse_data: false
4342

4443
jobs:
4544
# TDengine insert job

src/actions/components/memory_pool/test/TestMemoryPool.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,8 @@ void test_memory_pool_cache_mode_basic() {
233233
std::vector<RowData> cache_data_0, cache_data_1;
234234
for (size_t i = 0; i < num_rows; ++i) {
235235
RowData row0, row1;
236+
row0.timestamp = 0;
237+
row1.timestamp = 0;
236238
row0.columns = {int32_t(i * 10), std::string("cache0_t0_" + std::to_string(i))};
237239
row1.columns = {int32_t(i * 100), std::string("cache0_t1_" + std::to_string(i))};
238240
cache_data_0.push_back(row0);

src/actions/components/reader/csv/inc/ColumnsCSVReader.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class ColumnsCSVReader {
2323

2424
~ColumnsCSVReader() = default;
2525

26-
std::vector<TableData> generate() const;
26+
std::unordered_map<std::string, TableData> generate() const;
2727

2828
private:
2929
ColumnsCSV config_;

src/actions/components/reader/csv/src/ColumnsCSVReader.cpp

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ ColumnType ColumnsCSVReader::convert_to_type(const std::string& value, ColumnTyp
9090
return CSVUtils::convert_to_type(value, target_type);
9191
}
9292

93-
std::vector<TableData> ColumnsCSVReader::generate() const {
93+
std::unordered_map<std::string, TableData> ColumnsCSVReader::generate() const {
9494
try {
9595
// Create CSV reader
9696
CSVReader reader(
@@ -242,12 +242,7 @@ std::vector<TableData> ColumnsCSVReader::generate() const {
242242
data.rows.push_back(std::move(data_row));
243243
}
244244

245-
// Convert to std::vector
246-
table_data.reserve(table_map.size());
247-
for (auto& [_, data] : table_map) {
248-
table_data.push_back(std::move(data));
249-
}
250-
return table_data;
245+
return table_map;
251246

252247
} catch (const std::exception& e) {
253248
std::stringstream ss;

src/actions/components/reader/csv/test/TestColumnsCSVReader.cpp

Lines changed: 47 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,15 @@ void test_generate_table_data_with_default_timestamp() {
7171
auto table_data = columns_csv.generate();
7272

7373
assert(table_data.size() == 1 && "Expected 1 table");
74-
assert(table_data[0].timestamps.size() == 2 && "Expected 2 timestamps");
75-
assert(table_data[0].timestamps[0] == 1622505600000 && "Expected first timestamp to match");
76-
assert(table_data[0].timestamps[1] == 1622592000000 && "Expected second timestamp to match");
77-
assert(table_data[0].rows.size() == 2 && "Expected 2 rows of data");
78-
assert(std::get<std::string>(table_data[0].rows[0][0]) == "Alice" && "Expected first column to be 'Alice'");
79-
assert(std::get<std::string>(table_data[0].rows[0][1]) == "New York" && "Expected second column to be 'New York'");
74+
const auto& table_pair = *table_data.begin();
75+
const auto& table = table_pair.second;
76+
77+
assert(table.timestamps.size() == 2 && "Expected 2 timestamps");
78+
assert(table.timestamps[0] == 1622505600000 && "Expected first timestamp to match");
79+
assert(table.timestamps[1] == 1622592000000 && "Expected second timestamp to match");
80+
assert(table.rows.size() == 2 && "Expected 2 rows of data");
81+
assert(std::get<std::string>(table.rows[0][0]) == "Alice" && "Expected first column to be 'Alice'");
82+
assert(std::get<std::string>(table.rows[0][1]) == "New York" && "Expected second column to be 'New York'");
8083
std::cout << "test_generate_table_data_with_default_timestamp passed\n";
8184
}
8285

@@ -106,12 +109,15 @@ void test_generate_table_data_with_timestamp() {
106109
auto table_data = columns_csv.generate();
107110

108111
assert(table_data.size() == 1 && "Expected 1 table");
109-
assert(table_data[0].timestamps.size() == 2 && "Expected 2 timestamps");
110-
assert(table_data[0].timestamps[0] == 1622505600000 && "Expected first timestamp to match");
111-
assert(table_data[0].timestamps[1] == 1622592000000 && "Expected second timestamp to match");
112-
assert(table_data[0].rows.size() == 2 && "Expected 2 rows of data");
113-
assert(std::get<std::string>(table_data[0].rows[0][0]) == "Alice" && "Expected first column to be 'Alice'");
114-
assert(std::get<std::string>(table_data[0].rows[0][1]) == "New York" && "Expected second column to be 'New York'");
112+
const auto& table_pair = *table_data.begin();
113+
const auto& table = table_pair.second;
114+
115+
assert(table.timestamps.size() == 2 && "Expected 2 timestamps");
116+
assert(table.timestamps[0] == 1622505600000 && "Expected first timestamp to match");
117+
assert(table.timestamps[1] == 1622592000000 && "Expected second timestamp to match");
118+
assert(table.rows.size() == 2 && "Expected 2 rows of data");
119+
assert(std::get<std::string>(table.rows[0][0]) == "Alice" && "Expected first column to be 'Alice'");
120+
assert(std::get<std::string>(table.rows[0][1]) == "New York" && "Expected second column to be 'New York'");
115121
std::cout << "test_generate_table_data_with_timestamp passed\n";
116122
}
117123

@@ -140,15 +146,17 @@ void test_generate_table_data_with_generated_timestamp() {
140146
auto table_data = columns_csv.generate();
141147

142148
assert(table_data.size() == 1 && "Expected 1 table");
143-
assert(table_data[0].timestamps.size() == 2 && "Expected 2 timestamps");
144-
assert(table_data[0].rows.size() == 2 && "Expected 2 rows of data");
145-
assert(std::get<std::string>(table_data[0].rows[0][0]) == "Alice" && "Expected first column to be 'Alice'");
146-
assert(std::get<int32_t>(table_data[0].rows[0][1]) == 30 && "Expected second column to be 30");
147-
assert(std::get<std::string>(table_data[0].rows[0][2]) == "New York" && "Expected third column to be 'New York'");
149+
const auto& table_pair = *table_data.begin();
150+
const auto& table = table_pair.second;
151+
152+
assert(table.timestamps.size() == 2 && "Expected 2 timestamps");
153+
assert(table.rows.size() == 2 && "Expected 2 rows of data");
154+
assert(std::get<std::string>(table.rows[0][0]) == "Alice" && "Expected first column to be 'Alice'");
155+
assert(std::get<int32_t>(table.rows[0][1]) == 30 && "Expected second column to be 30");
156+
assert(std::get<std::string>(table.rows[0][2]) == "New York" && "Expected third column to be 'New York'");
148157
std::cout << "test_generate_table_data_with_generated_timestamp passed\n";
149158
}
150159

151-
152160
void test_generate_table_data_include_tbname() {
153161
ColumnsCSV config;
154162
config.file_path = "include_tbname.csv";
@@ -175,30 +183,20 @@ void test_generate_table_data_include_tbname() {
175183
assert(table_data.size() == 2 && "Expected 2 tables");
176184

177185
// Check table names and data
178-
bool table1_found = false;
179-
bool table2_found = false;
180-
181-
for (const auto& table : table_data) {
182-
if (table.table_name == "table1") {
183-
table1_found = true;
184-
assert(table.timestamps.size() == 1 && "Expected 1 timestamp for table1");
185-
assert(table.rows.size() == 1 && "Expected 1 row of data for table1");
186-
assert(std::get<int32_t>(table.rows[0][0]) == 30 && "Expected first column to be 30 for table1");
187-
assert(std::get<std::string>(table.rows[0][1]) == "New York" && "Expected second column to be 'New York' for table1");
188-
} else if (table.table_name == "table2") {
189-
table2_found = true;
190-
assert(table.timestamps.size() == 1 && "Expected 1 timestamp for table2");
191-
assert(table.rows.size() == 1 && "Expected 1 row of data for table2");
192-
assert(std::get<int32_t>(table.rows[0][0]) == 25 && "Expected first column to be 25 for table2");
193-
assert(std::get<std::string>(table.rows[0][1]) == "Los Angeles" && "Expected second column to be 'Los Angeles' for table2");
194-
}
195-
}
186+
assert(table_data.find("table1") != table_data.end() && "Expected table1 to be found");
187+
assert(table_data.find("table2") != table_data.end() && "Expected table2 to be found");
188+
189+
const auto& table1 = table_data.at("table1");
190+
assert(table1.timestamps.size() == 1 && "Expected 1 timestamp for table1");
191+
assert(table1.rows.size() == 1 && "Expected 1 row of data for table1");
192+
assert(std::get<int32_t>(table1.rows[0][0]) == 30 && "Expected first column to be 30 for table1");
193+
assert(std::get<std::string>(table1.rows[0][1]) == "New York" && "Expected second column to be 'New York' for table1");
196194

197-
// Verify both tables are found
198-
(void)table1_found;
199-
(void)table2_found;
200-
assert(table1_found && "Expected table1 to be found");
201-
assert(table2_found && "Expected table2 to be found");
195+
const auto& table2 = table_data.at("table2");
196+
assert(table2.timestamps.size() == 1 && "Expected 1 timestamp for table2");
197+
assert(table2.rows.size() == 1 && "Expected 1 row of data for table2");
198+
assert(std::get<int32_t>(table2.rows[0][0]) == 25 && "Expected first column to be 25 for table2");
199+
assert(std::get<std::string>(table2.rows[0][1]) == "Los Angeles" && "Expected second column to be 'Los Angeles' for table2");
202200

203201
std::cout << "test_generate_table_data_include_tbname passed\n";
204202
}
@@ -223,11 +221,15 @@ void test_generate_table_data_default_column_types() {
223221
auto table_data = columns_csv.generate();
224222

225223
assert(table_data.size() == 1 && "Expected 1 table");
226-
assert(table_data[0].rows.size() == 2 && "Expected 2 rows of data");
227-
assert(std::get<std::string>(table_data[0].rows[0][0]) == "Alice" && "Expected first column to be 'Alice'");
228-
assert(std::get<std::string>(table_data[0].rows[0][1]) == "30" && "Expected second column to be '30'");
229-
assert(std::get<std::string>(table_data[0].rows[0][2]) == "New York" && "Expected third column to be 'New York'");
224+
const auto& table_pair = *table_data.begin();
225+
const auto& table = table_pair.second;
226+
227+
assert(table.rows.size() == 2 && "Expected 2 rows of data");
228+
assert(std::get<std::string>(table.rows[0][0]) == "Alice" && "Expected first column to be 'Alice'");
229+
assert(std::get<std::string>(table.rows[0][1]) == "30" && "Expected second column to be '30'");
230+
assert(std::get<std::string>(table.rows[0][2]) == "New York" && "Expected third column to be 'New York'");
230231
std::cout << "test_generate_table_data_default_column_types passed\n";
232+
231233
}
232234

233235
int main() {

src/actions/config/inc/SchemaConfig.hpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,5 +61,19 @@ struct SchemaConfig {
6161
columns_cfg.generator.schema = ColumnConfigVector(columns.begin() + 1, columns.end());
6262
columns_cfg.generator.timestamp_strategy.timestamp_config = columns[0].ts.generator;
6363
}
64+
65+
if (generation.data_cache.enabled) {
66+
if (from_csv.columns.enabled) {
67+
if (from_csv.columns.tbname_index >= 0) {
68+
std::cerr << "[Config Warning] data_cache.enabled is set to false because from_csv.columns.tbname_index in effect." << std::endl;
69+
generation.data_cache.enabled = false;
70+
}
71+
72+
if (from_csv.columns.enabled && !generation.tables_reuse_data) {
73+
std::cerr << "[Config Warning] data_cache.enabled is set to false because tables_reuse_data is false." << std::endl;
74+
generation.data_cache.enabled = false;
75+
}
76+
}
77+
}
6478
}
6579
};

src/actions/core/insert/src/InsertDataAction.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ void InsertDataAction::init_cache_units_data(
457457
size_t max_rows_per_table
458458
) {
459459
for (size_t table_idx = 0; table_idx < max_tables_per_block; ++table_idx) {
460-
std::string table_name = "cache_table_" + std::to_string(table_idx);
460+
std::string table_name = "default_table";
461461
RowDataGenerator generator(table_name, config_, col_instances_);
462462

463463
for (size_t cache_idx = 0; cache_idx < num_cached_batches; ++cache_idx) {

src/actions/core/insert/src/generator/src/RowDataGenerator.cpp

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -137,37 +137,38 @@ void RowDataGenerator::init_generator() {
137137
void RowDataGenerator::init_csv_reader() {
138138
use_generator_ = false;
139139

140+
if (use_cache_ && timestamp_generator_) {
141+
return;
142+
}
143+
140144
csv_precision_ = columns_config_.csv.timestamp_strategy.get_precision();
141145

142-
// Create ColumnsCSV Reader
143-
columns_csv_ = std::make_unique<ColumnsCSVReader>(columns_config_.csv, instances_);
146+
static std::once_flag csv_once_flag;
147+
static std::unordered_map<std::string, TableData> all_tables;
144148

145-
// TODO: ColumnsCSV Reader needs to support table name index interface
146-
// Get all table data
147-
std::vector<TableData> all_tables = columns_csv_->generate();
149+
std::call_once(csv_once_flag, [this]() {
150+
// Create ColumnsCSV Reader
151+
auto columns_csv = std::make_unique<ColumnsCSVReader>(columns_config_.csv, instances_);
152+
all_tables = columns_csv->generate();
153+
});
148154

149155
// Find current table data
150-
bool found = false;
151-
for (const auto& table_data : all_tables) {
152-
if (table_data.table_name == table_name_ || table_data.table_name == "default_table") {
153-
found = true;
154-
155-
for (size_t i = 0; i < table_data.rows.size(); i++) {
156-
RowData row;
157-
// row.table_name = table_name_;
158-
row.timestamp = TimestampUtils::convert_timestamp_precision(table_data.timestamps[i], csv_precision_, target_precision_);
159-
row.columns = table_data.rows[i];
160-
csv_rows_.push_back(row);
161-
}
162-
break;
163-
}
156+
auto it = all_tables.find(table_name_);
157+
if (it == all_tables.end()) {
158+
it = all_tables.find("default_table");
164159
}
165-
166-
if (!found) {
160+
if (it == all_tables.end()) {
167161
throw std::runtime_error("Table '" + table_name_ + "' not found in CSV file");
168162
}
169-
}
170163

164+
const auto& table_data = it->second;
165+
for (size_t i = 0; i < table_data.rows.size(); i++) {
166+
RowData row;
167+
row.timestamp = TimestampUtils::convert_timestamp_precision(table_data.timestamps[i], csv_precision_, target_precision_);
168+
row.columns = table_data.rows[i];
169+
csv_rows_.push_back(row);
170+
}
171+
}
171172

172173
std::optional<RowData> RowDataGenerator::next_row() {
173174
if (generated_rows_ >= total_rows_) {
@@ -318,10 +319,19 @@ void RowDataGenerator::generate_from_generator() {
318319
}
319320

320321
bool RowDataGenerator::generate_from_csv() {
321-
cached_row_.timestamp = csv_rows_[csv_row_index_].timestamp;
322+
if (timestamp_generator_) {
323+
cached_row_.timestamp = TimestampUtils::convert_timestamp_precision(timestamp_generator_->generate(),
324+
timestamp_generator_->timestamp_precision(), target_precision_);
325+
} else {
326+
cached_row_.timestamp = csv_rows_[csv_row_index_].timestamp;
327+
}
328+
322329
if (!use_cache_) {
323330
cached_row_.columns = csv_rows_[csv_row_index_].columns;
324331
}
325-
csv_row_index_ = (csv_row_index_ + 1) % csv_rows_.size();
332+
333+
if (!use_cache_ || !timestamp_generator_) {
334+
csv_row_index_ = (csv_row_index_ + 1) % csv_rows_.size();
335+
}
326336
return true;
327337
}

src/parameter/src/ParameterContext.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -732,6 +732,9 @@ void ParameterContext::parse_commandline(int argc, char* argv[]) {
732732

733733
cli_params[key] = value;
734734
}
735+
else {
736+
throw std::runtime_error("Unknown argument: " + arg);
737+
}
735738
}
736739
}
737740

0 commit comments

Comments
 (0)