Skip to content

Commit 2bffb14

Browse files
authored
feat: support the writer to use a separate queue (#20)
* feat: support the writer to use a separate queue * feat: compensation for uneven write queue completion * feat: by default no thread is bound when writing data * feat: return 0 when showing version
1 parent 3cea8f4 commit 2bffb14

File tree

15 files changed

+612
-140
lines changed

15 files changed

+612
-140
lines changed

src/actions/components/connector/src/TDengineConnector.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,9 @@ bool TDengineConnector::execute(const StmtV2InsertData& data) {
165165
);
166166
if (code != 0) {
167167
std::cerr << display_name_ << " failed to bind parameters: "
168-
<< taos_stmt2_error(stmt_) << std::endl;
168+
<< taos_stmt2_error(stmt_)
169+
<< " [code: 0x" << std::hex << code << std::dec << "]"
170+
<< std::endl;
169171
return false;
170172
}
171173

@@ -174,7 +176,9 @@ bool TDengineConnector::execute(const StmtV2InsertData& data) {
174176
code = taos_stmt2_exec(stmt_, &affected_rows);
175177
if (code != 0) {
176178
std::cerr << display_name_ << " execute failed: "
177-
<< taos_stmt2_error(stmt_) << std::endl;
179+
<< taos_stmt2_error(stmt_)
180+
<< " [code: 0x" << std::hex << code << std::dec << "]"
181+
<< std::endl;
178182
return false;
179183
}
180184

src/actions/components/expression/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ target_include_directories(components_expression
1717
target_link_libraries(components_expression
1818
PUBLIC
1919
luajit::luajit
20+
Threads::Threads
2021
m
2122
dl
2223
)

src/actions/components/formatter/test/TestTopicGenerator.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ void test_basic_pattern() {
1212

1313
MultiBatch batch;
1414
std::vector<RowData> rows;
15-
rows.push_back({1500000000000, {"f01", 101}});
16-
rows.push_back({1500000000001, {"f02", 102}});
15+
rows.push_back({1500000000000, {std::string("f01"), 101}});
16+
rows.push_back({1500000000001, {std::string("f02"), 102}});
1717
batch.table_batches.emplace_back("t1", std::move(rows));
1818
batch.update_metadata();
1919

@@ -63,7 +63,7 @@ void test_col_not_found() {
6363

6464
MultiBatch batch;
6565
std::vector<RowData> rows;
66-
rows.push_back({1500000000000, {"x"}});
66+
rows.push_back({1500000000000, {std::string("x")}});
6767
batch.table_batches.emplace_back("my_table", std::move(rows));
6868
batch.update_metadata();
6969

@@ -86,7 +86,7 @@ void test_pattern_edge_cases() {
8686

8787
MultiBatch batch;
8888
std::vector<RowData> rows;
89-
rows.push_back({1500000000000, {"A"}});
89+
rows.push_back({1500000000000, {std::string("A")}});
9090
batch.table_batches.emplace_back("my_table", std::move(rows));
9191
batch.update_metadata();
9292

src/actions/components/generator/test/TestRandomColumnGenerator.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ void test_generate_multiple_values() {
9797
std::cout << "test_generate_multiple_values passed.\n";
9898
}
9999

100-
101100
void test_generate_int_column_with_values() {
102101
ColumnConfig config;
103102
config.type = "int";

src/actions/config/inc/InsertDataConfig.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ struct InsertDataConfig {
2727
size_t insert_threads = 8;
2828
size_t queue_capacity = 10;
2929
double queue_warmup_ratio = 0.0;
30+
bool shared_queue = false;
3031
std::string thread_allocation = "index_range";
32+
bool thread_affinity = false;
33+
bool thread_realtime = false;
3134

3235
struct FailureHandling {
3336
size_t max_retries = 0;

src/actions/core/insert/src/InsertDataAction.cpp

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ void InsertDataAction::execute() {
114114
const size_t consumer_thread_count = config_.insert_threads;
115115
const size_t queue_capacity = config_.queue_capacity;
116116
const double queue_warmup_ratio = config_.queue_warmup_ratio;
117+
const bool shared_queue = config_.shared_queue;
117118
const size_t per_request_rows = config_.schema.generation.per_batch_rows;
118119
const size_t interlace_rows = config_.schema.generation.interlace_mode.rows;
119120
const int64_t per_table_rows = config_.schema.generation.per_table_rows;
@@ -133,7 +134,7 @@ void InsertDataAction::execute() {
133134
MemoryPool pool(block_count, max_tables_per_block, max_rows_per_table, col_instances_);
134135

135136
// Create data pipeline
136-
DataPipeline<FormatResult> pipeline(block_count);
137+
DataPipeline<FormatResult> pipeline(shared_queue, producer_thread_count, consumer_thread_count, queue_capacity);
137138
Barrier sync_barrier(consumer_thread_count + 1);
138139

139140
// Start consumer threads
@@ -189,8 +190,12 @@ void InsertDataAction::execute() {
189190

190191
producer_threads.emplace_back([this, i, &split_names, &pipeline, data_manager, &active_producers, &producer_finished] {
191192
try {
192-
set_thread_affinity(i, false, "Producer");
193-
set_realtime_priority();
193+
if (config_.thread_affinity) {
194+
set_thread_affinity(i, false, "Producer");
195+
}
196+
if (config_.thread_realtime) {
197+
set_realtime_priority();
198+
}
194199
producer_thread_function(i, split_names[i], pipeline, data_manager);
195200
producer_finished[i].store(true);
196201
} catch (const std::exception& e) {
@@ -201,7 +206,8 @@ void InsertDataAction::execute() {
201206
}
202207

203208
(void)ProcessUtils::get_cpu_usage_percent();
204-
std::this_thread::sleep_for(std::chrono::seconds(2));
209+
int64_t wait_seconds = std::min(static_cast<int64_t>(5), static_cast<int64_t>(producer_thread_count));
210+
std::this_thread::sleep_for(std::chrono::seconds(wait_seconds));
205211

206212
while (true) {
207213
size_t total_queued = pipeline.total_queued();
@@ -270,7 +276,7 @@ void InsertDataAction::execute() {
270276
<< "Queue: " << std::setw(3) << std::setfill(' ') << pipeline.total_queued() << " items | "
271277
<< "CPU Usage: " << std::setw(7) << std::fixed << std::setprecision(2) << ProcessUtils::get_cpu_usage_percent() << "% | "
272278
<< "Memory Usage: " << ProcessUtils::get_memory_usage_human_readable() << " | "
273-
<< "Thread Count: " << std::setw(3) << std::setfill(' ') << ProcessUtils::get_thread_count() << "\n";
279+
<< "Thread Count: " << std::setw(3) << std::setfill(' ') << ProcessUtils::get_thread_count() << std::endl;
274280
}
275281

276282
std::cout << "All producer threads have finished." << std::endl;
@@ -294,7 +300,7 @@ void InsertDataAction::execute() {
294300
<< "Processing Rate: " << std::setw(6) << std::fixed << std::setprecision(2) << process_rate << " items/s | "
295301
<< "CPU Usage: " << std::setw(7) << std::fixed << std::setprecision(2) << ProcessUtils::get_cpu_usage_percent() << "% | "
296302
<< "Memory Usage: " << ProcessUtils::get_memory_usage_human_readable() << " | "
297-
<< "Thread Count: " << std::setw(3) << std::setfill(' ') << ProcessUtils::get_thread_count() << "\n";
303+
<< "Thread Count: " << std::setw(3) << std::setfill(' ') << ProcessUtils::get_thread_count() << std::endl;
298304

299305
last_queue_size = current_queue_size;
300306
last_check_time = current_time;
@@ -360,7 +366,7 @@ void InsertDataAction::execute() {
360366
double avg_wait_time = total_wait_time / consumer_thread_count; // average per thread
361367

362368
// Calculate total duration (seconds)
363-
const auto total_duration = std::chrono::duration<double>(max_end_write_time - min_start_write_time).count();
369+
const auto total_duration = std::chrono::duration<double>(max_end_write_time - min_start_write_time).count() - avg_wait_time;
364370

365371
// Calculate average insert rate
366372
const double avg_rows_per_sec = total_duration > 0 ?
@@ -391,7 +397,7 @@ void InsertDataAction::execute() {
391397
// Print performance statistics
392398
double thread_latency = global_write_metrics.get_sum() / consumer_thread_count / 1000;
393399
double effective_ratio = thread_latency / total_duration * 100.0;
394-
double framework_ratio = (1 - (thread_latency + avg_wait_time) / total_duration) * 100.0;
400+
double framework_ratio = (1 - thread_latency / total_duration) * 100.0;
395401
TimeIntervalStrategy time_strategy(config_.time_interval, config_.timestamp_precision);
396402
std::cout << "\n=============================================== Insert Latency & Efficiency Metrics ==========================================\n"
397403
<< "Total Operations: " << global_write_metrics.get_samples().size() << "\n"
@@ -485,7 +491,7 @@ void InsertDataAction::producer_thread_function(
485491
// }, formatted_result);
486492

487493
// Push data to pipeline
488-
pipeline.push_data(std::move(formatted_result));
494+
pipeline.push_data(producer_id, std::move(formatted_result));
489495

490496
// std::cout << "Producer " << producer_id << ": Pushed batch for table(s): "
491497
// << batch_size << ", total rows: " << total_rows
@@ -531,7 +537,7 @@ void InsertDataAction::consumer_thread_function(
531537
// Data processing loop
532538
(void)running;
533539
while (true) {
534-
auto result = pipeline.fetch_data();
540+
auto result = pipeline.fetch_data(consumer_id);
535541

536542
switch (result.status) {
537543
case DataPipeline<FormatResult>::Status::Success:

0 commit comments

Comments
 (0)