Skip to content

Commit 87a40a7

Browse files
authored
feat: support writing of tag data (#41)
* feat: support writing of tag data * test: add test cases for memory pool * test: add test cases for sql insert formatter * test: add test cases for stmt insert formatter * test: add test cases for table data manager * enh: simplify the writer parameters * test: add test cases for insert data action * test: add test cases for row serializer * test: add test cases for pattern generator * test: add test cases for topic generator * test: add test cases for key generator * test: add test cases for mqtt insert formatter * test: add test cases for kafka insert formatter * feat: optimize configuration checking * feat: optimize sql insert formatter * fix: fix incorrect index when fetching tag data * chore: remove the redundant database information from the sql * test: fix failed test cases * test: fix failed test cases
1 parent ad42544 commit 87a40a7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+2210
-566
lines changed

conf/tdengine-gen-with-tags.yaml

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
tdengine:
2+
dsn: taos+ws://root:taosdata@127.0.0.1:6041/tsbench
3+
drop_if_exists: true
4+
props: precision 'us' vgroups 4
5+
6+
schema:
7+
name: meters
8+
tbname:
9+
prefix: d
10+
count: 100000
11+
from: 0
12+
columns:
13+
- name: ts
14+
type: timestamp
15+
start: now + 10s
16+
precision : us
17+
step: 1
18+
- name: current
19+
type: float
20+
min: 0
21+
max: 100
22+
- name: voltage
23+
type: int
24+
min: 200
25+
max: 240
26+
- name: phase
27+
type: float
28+
expr: _i * math.pi % 180
29+
tags:
30+
- name: groupid
31+
type: int
32+
min: 1
33+
max: 10
34+
- name: location
35+
type: binary(24)
36+
values:
37+
- New York
38+
- Los Angeles
39+
- Chicago
40+
- Houston
41+
- Phoenix
42+
- Philadelphia
43+
- San Antonio
44+
- San Diego
45+
- Dallas
46+
- Austin
47+
generation:
48+
interlace: 1
49+
rows_per_table: 100
50+
rows_per_batch: 10000
51+
52+
jobs:
53+
# TDengine insert job
54+
insert-data:
55+
steps:
56+
- uses: tdengine/create-super-table
57+
- uses: tdengine/insert
58+
with:
59+
concurrency: 8
60+
auto_create_table: true

src/actions/components/formatter/inc/BaseInsertData.hpp

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,22 @@ struct BaseInsertData {
1717
int64_t end_time;
1818
size_t total_rows;
1919

20-
BaseInsertData(MemoryPool::MemoryBlock* block, const ColumnConfigInstanceVector& col_instances)
21-
: BaseInsertData(DataType::BASE, block, col_instances) {}
20+
BaseInsertData(MemoryPool::MemoryBlock* block,
21+
const ColumnConfigInstanceVector& col_instances,
22+
const ColumnConfigInstanceVector& tag_instances)
23+
: BaseInsertData(DataType::BASE, block, col_instances, tag_instances) {}
2224

23-
BaseInsertData(DataType t, MemoryPool::MemoryBlock* block, const ColumnConfigInstanceVector& col_instances)
25+
BaseInsertData(DataType t,
26+
MemoryPool::MemoryBlock* block,
27+
const ColumnConfigInstanceVector& col_instances,
28+
const ColumnConfigInstanceVector& tag_instances)
2429
: type(t),
2530
start_time(block->start_time),
2631
end_time(block->end_time),
2732
total_rows(block->total_rows),
2833
block_(block),
29-
col_instances_(col_instances) {}
34+
col_instances_(col_instances),
35+
tag_instances_(tag_instances) {}
3036

3137
// Move constructor
3238
BaseInsertData(BaseInsertData&& other) noexcept
@@ -35,7 +41,8 @@ struct BaseInsertData {
3541
end_time(other.end_time),
3642
total_rows(other.total_rows),
3743
block_(other.block_),
38-
col_instances_(other.col_instances_) {
44+
col_instances_(other.col_instances_),
45+
tag_instances_(other.tag_instances_) {
3946
other.block_ = nullptr;
4047
other.type = DataType::BASE;
4148
}
@@ -60,6 +67,10 @@ struct BaseInsertData {
6067
return col_instances_.size();
6168
}
6269

70+
size_t tag_count() const noexcept {
71+
return tag_instances_.size();
72+
}
73+
6374
const TAOS_STMT2_BINDV* bindv_ptr() const noexcept {
6475
return block_ ? &block_->bindv_ : nullptr;
6576
}
@@ -75,4 +86,5 @@ struct BaseInsertData {
7586
private:
7687
MemoryPool::MemoryBlock* block_;
7788
const ColumnConfigInstanceVector& col_instances_;
89+
const ColumnConfigInstanceVector& tag_instances_;
7890
};

src/actions/components/formatter/inc/IFormatter.hpp

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,22 +26,29 @@ class ISuperTableFormatter : public IFormatter {
2626

2727
class IChildTableFormatter : public IFormatter {
2828
public:
29-
virtual FormatResult format(const CreateChildTableConfig& config,
30-
const std::vector<std::string>& table_names,
29+
virtual FormatResult format(const CreateChildTableConfig& config,
30+
const std::vector<std::string>& table_names,
3131
const std::vector<RowType>& tags) const = 0;
3232
};
3333

3434

3535
enum class InsertMode {
36-
SubTable, // INSERT INTO ? VALUES(?,cols-qmark)
37-
SuperTable, // INSERT INTO `db_name`.`stb_name`(tbname,ts,cols-name) VALUES(?,?,col-qmark)
38-
AutoCreate // INSERT INTO ? USING `db_name`.`stb_name` TAGS (tags-qmark) VALUES(?,cols-qmark)
36+
SubTable, // INSERT INTO ? VALUES(?,cols-qmark)
37+
SuperTable, // INSERT INTO `db_name`.`stb_name`(tbname,ts,cols-name) VALUES(?,?,col-qmark)
38+
AutoCreateTable // INSERT INTO ? USING `db_name`.`stb_name` TAGS (tags-qmark) VALUES(?,cols-qmark)
3939
};
4040

4141
class IInsertDataFormatter : public IFormatter {
4242
public:
43-
virtual std::string prepare(const InsertDataConfig& config, const ColumnConfigInstanceVector& col_instances) = 0;
44-
virtual FormatResult format(const InsertDataConfig& config, const ColumnConfigInstanceVector& col_instances, MemoryPool::MemoryBlock* batch, bool is_checkpoint_recover = false) const = 0;
43+
virtual std::string prepare(const InsertDataConfig& config,
44+
const ColumnConfigInstanceVector& col_instances,
45+
const ColumnConfigInstanceVector& tag_instances) = 0;
46+
47+
virtual FormatResult format(const InsertDataConfig& config,
48+
const ColumnConfigInstanceVector& col_instances,
49+
const ColumnConfigInstanceVector& tag_instances,
50+
MemoryPool::MemoryBlock* batch,
51+
bool is_checkpoint_recover = false) const = 0;
4552

4653
protected:
4754
InsertMode mode_;

src/actions/components/formatter/inc/KafkaInsertData.hpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@ using KafkaMessageBatch = std::vector<KeyValuePayload>;
99
struct KafkaInsertData : public BaseInsertData {
1010
KafkaMessageBatch data;
1111

12-
KafkaInsertData(MemoryPool::MemoryBlock* block, const ColumnConfigInstanceVector& col_instances, KafkaMessageBatch&& msgs) noexcept
13-
: BaseInsertData(DataType::KAFKA, block, col_instances), data(std::move(msgs)) {}
12+
KafkaInsertData(MemoryPool::MemoryBlock* block,
13+
const ColumnConfigInstanceVector& col_instances,
14+
const ColumnConfigInstanceVector& tag_instances,
15+
KafkaMessageBatch&& msgs) noexcept
16+
: BaseInsertData(DataType::KAFKA, block, col_instances, tag_instances), data(std::move(msgs)) {}
1417

1518
KafkaInsertData(KafkaInsertData&& other) noexcept
1619
: BaseInsertData(std::move(other))

src/actions/components/formatter/inc/KafkaInsertDataFormatter.hpp

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,25 @@ class KafkaInsertDataFormatter final : public IInsertDataFormatter {
1212
explicit KafkaInsertDataFormatter(const DataFormat& format) : format_(format) {}
1313

1414
std::string prepare(const InsertDataConfig& config,
15-
const ColumnConfigInstanceVector& col_instances) override;
15+
const ColumnConfigInstanceVector& col_instances,
16+
const ColumnConfigInstanceVector& tag_instances) override;
1617

1718
FormatResult format(const InsertDataConfig& config,
1819
const ColumnConfigInstanceVector& col_instances,
19-
MemoryPool::MemoryBlock* batch, bool is_checkpoint_recover = false) const override;
20+
const ColumnConfigInstanceVector& tag_instances,
21+
MemoryPool::MemoryBlock* batch,
22+
bool is_checkpoint_recover = false) const override;
2023

2124
private:
2225
const DataFormat& format_;
2326

24-
KafkaInsertData format_json(const ColumnConfigInstanceVector& col_instances, MemoryPool::MemoryBlock* batch) const;
27+
KafkaInsertData format_json(const ColumnConfigInstanceVector& col_instances,
28+
const ColumnConfigInstanceVector& tag_instances,
29+
MemoryPool::MemoryBlock* batch) const;
2530

26-
KafkaInsertData format_influx(const ColumnConfigInstanceVector& col_instances, MemoryPool::MemoryBlock* batch) const;
31+
KafkaInsertData format_influx(const ColumnConfigInstanceVector& col_instances,
32+
const ColumnConfigInstanceVector& tag_instances,
33+
MemoryPool::MemoryBlock* batch) const;
2734

2835
inline static bool registered_ = []() {
2936
FormatterFactory::instance().register_formatter<InsertDataConfig>(

src/actions/components/formatter/inc/KeyGenerator.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33

44
class KeyGenerator final : public PatternGenerator {
55
public:
6-
KeyGenerator(const std::string& pattern, const std::string& serializer, const ColumnConfigInstanceVector& col_instances);
6+
KeyGenerator(const std::string& pattern,
7+
const std::string& serializer,
8+
const ColumnConfigInstanceVector& col_instances,
9+
const ColumnConfigInstanceVector& tag_instances);
710

811
std::string generate(const MemoryPool::TableBlock& data, size_t row_index) const override;
912

src/actions/components/formatter/inc/MqttInsertData.hpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@ using MqttMessageBatch = std::vector<TopicPayloadPair>;
99
struct MqttInsertData : public BaseInsertData {
1010
MqttMessageBatch data;
1111

12-
MqttInsertData(MemoryPool::MemoryBlock* block, const ColumnConfigInstanceVector& col_instances, MqttMessageBatch&& msgs) noexcept
13-
: BaseInsertData(DataType::MQTT, block, col_instances), data(std::move(msgs)) {}
12+
MqttInsertData(MemoryPool::MemoryBlock* block,
13+
const ColumnConfigInstanceVector& col_instances,
14+
const ColumnConfigInstanceVector& tag_instances,
15+
MqttMessageBatch&& msgs) noexcept
16+
: BaseInsertData(DataType::MQTT, block, col_instances, tag_instances), data(std::move(msgs)) {}
1417

1518
MqttInsertData(MqttInsertData&& other) noexcept
1619
: BaseInsertData(std::move(other))

src/actions/components/formatter/inc/MqttInsertDataFormatter.hpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,21 @@ class MqttInsertDataFormatter final : public IInsertDataFormatter {
1414
explicit MqttInsertDataFormatter(const DataFormat& format) : format_(format) {}
1515

1616
std::string prepare(const InsertDataConfig& config,
17-
const ColumnConfigInstanceVector& col_instances) override;
17+
const ColumnConfigInstanceVector& col_instances,
18+
const ColumnConfigInstanceVector& tag_instances) override;
1819

1920
FormatResult format(const InsertDataConfig& config,
2021
const ColumnConfigInstanceVector& col_instances,
21-
MemoryPool::MemoryBlock* batch, bool is_checkpoint_recover = false) const override;
22+
const ColumnConfigInstanceVector& tag_instances,
23+
MemoryPool::MemoryBlock* batch,
24+
bool is_checkpoint_recover = false) const override;
2225

2326
private:
2427
const DataFormat& format_;
2528

26-
MqttInsertData format_json(const ColumnConfigInstanceVector& col_instances, MemoryPool::MemoryBlock* batch) const;
29+
MqttInsertData format_json(const ColumnConfigInstanceVector& col_instances,
30+
const ColumnConfigInstanceVector& tag_instances,
31+
MemoryPool::MemoryBlock* batch) const;
2732

2833
inline static bool registered_ = []() {
2934
FormatterFactory::instance().register_formatter<InsertDataConfig>(

src/actions/components/formatter/inc/PatternGenerator.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ class PatternGenerator {
2525

2626
static std::vector<Token> parse_pattern(const std::string& pattern);
2727
std::string get_value_as_string(const std::string& key, const MemoryPool::TableBlock& data, size_t row_index) const;
28+
void build_mapping(const ColumnConfigInstanceVector& col_instances, const ColumnConfigInstanceVector& tag_instances);
2829

2930
std::vector<Token> tokens_;
3031
std::unordered_map<std::string, size_t> col_index_map_;
32+
std::unordered_map<std::string, size_t> tag_index_map_;
3133
};

src/actions/components/formatter/inc/RowSerializer.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@ class RowSerializer {
88
public:
99
static nlohmann::ordered_json to_json(
1010
const ColumnConfigInstanceVector& col_instances,
11+
const ColumnConfigInstanceVector& tag_instances,
1112
const MemoryPool::TableBlock& table,
1213
size_t row_index,
1314
const std::string& tbname_key
1415
);
1516

1617
static void to_json_inplace(
1718
const ColumnConfigInstanceVector& col_instances,
19+
const ColumnConfigInstanceVector& tag_instances,
1820
const MemoryPool::TableBlock& table,
1921
size_t row_index,
2022
const std::string& tbname_key,

0 commit comments

Comments
 (0)