Skip to content

Commit ad42544

Browse files
authored
feat: implement robust of CSV data caching (#40)
1 parent 65b3b54 commit ad42544

File tree

9 files changed

+446
-121
lines changed

9 files changed

+446
-121
lines changed
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
#pragma once
2+
3+
#include "ColumnsCSVReader.hpp"
4+
#include <mutex>
5+
#include <memory>
6+
#include <string>
7+
#include <unordered_map>
8+
9+
10+
class CSVFileCache {
11+
public:
12+
using TableMap = std::unordered_map<std::string, TableData>;
13+
14+
const TableMap& get_data(const ColumnsCSV& config, const std::optional<ColumnConfigInstanceVector>& instances) {
15+
std::call_once(once_flag_, [this, &config, &instances]() {
16+
try {
17+
auto columns_csv = std::make_unique<ColumnsCSVReader>(config, instances);
18+
data_ = columns_csv->generate();
19+
} catch (const std::exception& e) {
20+
error_ = e.what();
21+
}
22+
});
23+
24+
if (error_) {
25+
throw std::runtime_error("Failed to load CSV file '" + config.file_path + "': " + *error_);
26+
}
27+
return data_;
28+
}
29+
30+
private:
31+
std::once_flag once_flag_;
32+
TableMap data_;
33+
std::optional<std::string> error_;
34+
};
35+
36+
37+
class CSVDataManager {
38+
public:
39+
static CSVDataManager& instance() {
40+
static CSVDataManager manager;
41+
return manager;
42+
}
43+
44+
std::shared_ptr<CSVFileCache> get_cache_for_file(const std::string& file_path) {
45+
std::lock_guard<std::mutex> lock(mutex_);
46+
47+
auto it = file_caches_.find(file_path);
48+
if (it == file_caches_.end()) {
49+
it = file_caches_.emplace(file_path, std::make_shared<CSVFileCache>()).first;
50+
}
51+
return it->second;
52+
}
53+
54+
void reset() {
55+
std::lock_guard<std::mutex> lock(mutex_);
56+
file_caches_.clear();
57+
}
58+
59+
private:
60+
CSVDataManager() = default;
61+
~CSVDataManager() = default;
62+
CSVDataManager(const CSVDataManager&) = delete;
63+
CSVDataManager& operator=(const CSVDataManager&) = delete;
64+
65+
std::mutex mutex_;
66+
std::unordered_map<std::string, std::shared_ptr<CSVFileCache>> file_caches_;
67+
};

src/actions/components/reader/csv/src/ColumnsCSVReader.cpp

Lines changed: 109 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
#include "ColumnsCSVReader.hpp"
2+
#include "LogUtils.hpp"
3+
#include "StringUtils.hpp"
4+
#include "TypeConverter.hpp"
5+
#include "TimestampUtils.hpp"
26
#include <stdexcept>
37
#include <algorithm>
48
#include <sstream>
@@ -12,9 +16,6 @@
1216
#include <iomanip>
1317
#include <ctime>
1418
#include <unordered_map>
15-
#include "StringUtils.hpp"
16-
#include "TypeConverter.hpp"
17-
#include "TimestampUtils.hpp"
1819

1920

2021
ColumnsCSVReader::ColumnsCSVReader(const ColumnsCSV& config, std::optional<ColumnConfigInstanceVector> instances)
@@ -125,126 +126,130 @@ std::unordered_map<std::string, TableData> ColumnsCSVReader::generate() const {
125126
for (size_t row_idx = 0; row_idx < rows.size(); ++row_idx) {
126127
const auto& row = rows[row_idx];
127128

128-
// Validate row has enough columns
129-
if (row.size() < total_columns_) {
130-
std::stringstream ss;
131-
ss << "Row " << (row_idx + 1) << " has only " << row.size()
132-
<< " columns, expected " << total_columns_
133-
<< " in file: " << config_.file_path;
134-
throw std::out_of_range(ss.str());
135-
}
136-
137-
// Get table name
138-
std::string table_name = "default_table";
139-
if (tbname_index >= 0) {
140-
table_name = row[static_cast<size_t>(tbname_index)];
141-
StringUtils::trim(table_name);
142-
}
143-
144-
// Get or create TableData
145-
auto& data = table_map[table_name];
146-
if (data.table_name.empty()) {
147-
data.table_name = table_name;
148-
}
129+
try {
130+
// Validate row has enough columns
131+
if (row.size() < total_columns_) {
132+
std::stringstream ss;
133+
ss << "Row " << (row_idx + 1) << " has only " << row.size()
134+
<< " columns, expected " << total_columns_
135+
<< " in file: " << config_.file_path;
136+
throw std::out_of_range(ss.str());
137+
}
149138

150-
// Handle timestamp
151-
int64_t timestamp = 0;
152-
153-
if (timestamp_index) {
154-
// original mode
155-
const auto& raw_value = row[*timestamp_index];
156-
int64_t raw_ts = TimestampUtils::parse_timestamp(raw_value, ts_config.timestamp_precision.value());
157-
158-
if (ts_config.offset_config) {
159-
const auto& offset = *ts_config.offset_config;
160-
if (offset.offset_type == "absolute") {
161-
// absolute mode
162-
int64_t& first_raw_ts = table_first_raw_ts[table_name];
163-
if (first_raw_ts == 0) {
164-
first_raw_ts = raw_ts;
165-
}
166-
timestamp = offset.absolute_value + (raw_ts - first_raw_ts);
167-
} else if (offset.offset_type == "relative") {
168-
// relative mode
169-
int64_t multiplier = TimestampUtils::get_precision_multiplier(ts_config.timestamp_precision.value());
170-
auto [years, months, days, hours, seconds] = offset.relative_offset;
171-
172-
// Convert timestamp to seconds
173-
std::time_t raw_time = raw_ts / multiplier;
174-
int64_t fraction = raw_ts % multiplier;
175-
176-
// Handle time offset
177-
std::tm* timeinfo = std::localtime(&raw_time);
178-
if (!timeinfo) {
179-
throw std::runtime_error("Failed to convert timestamp to local time, raw_ts: " + std::to_string(raw_ts));
180-
}
139+
// Get table name
140+
std::string table_name = "default_table";
141+
if (tbname_index >= 0) {
142+
table_name = row[static_cast<size_t>(tbname_index)];
143+
StringUtils::trim(table_name);
144+
}
181145

182-
// Apply year/month/day offset
183-
timeinfo->tm_year += years;
184-
timeinfo->tm_mon += months;
185-
timeinfo->tm_mday += days;
186-
timeinfo->tm_hour += hours;
187-
timeinfo->tm_sec += seconds;
146+
// Get or create TableData
147+
auto& data = table_map[table_name];
148+
if (data.table_name.empty()) {
149+
data.table_name = table_name;
150+
}
188151

189-
std::time_t new_time = std::mktime(timeinfo);
190-
if (new_time == -1) {
191-
throw std::runtime_error("Failed to apply time offset, raw_ts: " + std::to_string(raw_ts));
152+
// Handle timestamp
153+
int64_t timestamp = 0;
154+
155+
if (timestamp_index) {
156+
// original mode
157+
const auto& raw_value = row[*timestamp_index];
158+
int64_t raw_ts = TimestampUtils::parse_timestamp(raw_value, ts_config.timestamp_precision.value());
159+
160+
if (ts_config.offset_config) {
161+
const auto& offset = *ts_config.offset_config;
162+
if (offset.offset_type == "absolute") {
163+
// absolute mode
164+
int64_t& first_raw_ts = table_first_raw_ts[table_name];
165+
if (first_raw_ts == 0) {
166+
first_raw_ts = raw_ts;
167+
}
168+
timestamp = offset.absolute_value + (raw_ts - first_raw_ts);
169+
} else if (offset.offset_type == "relative") {
170+
// relative mode
171+
int64_t multiplier = TimestampUtils::get_precision_multiplier(ts_config.timestamp_precision.value());
172+
auto [years, months, days, hours, seconds] = offset.relative_offset;
173+
174+
// Convert timestamp to seconds
175+
std::time_t raw_time = raw_ts / multiplier;
176+
int64_t fraction = raw_ts % multiplier;
177+
178+
// Handle time offset
179+
std::tm* timeinfo = std::localtime(&raw_time);
180+
if (!timeinfo) {
181+
throw std::runtime_error("Failed to convert timestamp to local time, raw_ts: " + std::to_string(raw_ts));
182+
}
183+
184+
// Apply year/month/day offset
185+
timeinfo->tm_year += years;
186+
timeinfo->tm_mon += months;
187+
timeinfo->tm_mday += days;
188+
timeinfo->tm_hour += hours;
189+
timeinfo->tm_sec += seconds;
190+
191+
std::time_t new_time = std::mktime(timeinfo);
192+
if (new_time == -1) {
193+
throw std::runtime_error("Failed to apply time offset, raw_ts: " + std::to_string(raw_ts));
194+
}
195+
196+
timestamp = new_time * multiplier + fraction;
197+
} else {
198+
throw std::runtime_error("Unsupported offset type: " + offset.offset_type);
192199
}
193-
194-
timestamp = new_time * multiplier + fraction;
195200
} else {
196-
throw std::runtime_error("Unsupported offset type: " + offset.offset_type);
201+
// No offset
202+
timestamp = raw_ts;
197203
}
198-
} else {
199-
// No offset
200-
timestamp = raw_ts;
201-
}
202-
} else if (is_generator_mode) {
203-
// generator mode
204-
auto& gen_ptr = table_ts_generators[table_name];
205-
if (!gen_ptr) {
206-
gen_ptr = TimestampGenerator::create(gen_config);
204+
} else if (is_generator_mode) {
205+
// generator mode
206+
auto& gen_ptr = table_ts_generators[table_name];
207+
if (!gen_ptr) {
208+
gen_ptr = TimestampGenerator::create(gen_config);
209+
}
210+
timestamp = gen_ptr->generate();
207211
}
208-
timestamp = gen_ptr->generate();
209-
}
210212

211-
data.timestamps.push_back(timestamp);
213+
data.timestamps.push_back(timestamp);
212214

213215

214-
// Handle normal columns
215-
RowType data_row;
216-
data_row.reserve(actual_columns_);
216+
// Handle normal columns
217+
RowType data_row;
218+
data_row.reserve(actual_columns_);
217219

218-
size_t index = 0;
220+
size_t index = 0;
219221

220-
for (size_t col_idx = 0; col_idx < total_columns_; ++col_idx) {
221-
// Skip table name and timestamp columns
222-
if (static_cast<int>(col_idx) == tbname_index) continue;
223-
if (timestamp_index && col_idx == *timestamp_index) continue;
222+
for (size_t col_idx = 0; col_idx < total_columns_; ++col_idx) {
223+
// Skip table name and timestamp columns
224+
if (static_cast<int>(col_idx) == tbname_index) continue;
225+
if (timestamp_index && col_idx == *timestamp_index) continue;
224226

225-
// Convert value type
226-
if (instances_) {
227-
// Use provided column types
228-
const ColumnConfigInstance& instance = (*instances_)[index];
229-
data_row.push_back(convert_to_type(row[col_idx], instance.config().type_tag));
230-
index++;
231-
} else {
232-
// Default as string
233-
std::string val = row[col_idx];
234-
StringUtils::trim(val);
235-
data_row.push_back(val);
227+
// Convert value type
228+
if (instances_) {
229+
// Use provided column types
230+
const ColumnConfigInstance& instance = (*instances_)[index];
231+
data_row.push_back(convert_to_type(row[col_idx], instance.config().type_tag));
232+
index++;
233+
} else {
234+
// Default as string
235+
std::string val = row[col_idx];
236+
StringUtils::trim(val);
237+
data_row.push_back(val);
238+
}
236239
}
237-
}
238240

239-
data.rows.push_back(std::move(data_row));
241+
data.rows.push_back(std::move(data_row));
242+
} catch (const std::exception& e) {
243+
LogUtils::error("Failed to process row {} in file {}: Reason: {}. Row content: [{}]",
244+
row_idx + 1, config_.file_path, e.what(), fmt::join(row, ","));
245+
throw;
246+
}
240247
}
241248

242249
return table_map;
243250

244251
} catch (const std::exception& e) {
245-
std::stringstream ss;
246-
ss << "Failed to generate table data from CSV: " << config_.file_path
247-
<< " - " << e.what();
248-
throw std::runtime_error(ss.str());
252+
LogUtils::error("Failed to generate table data from CSV {}: {}", config_.file_path, e.what());
253+
throw;
249254
}
250255
}

src/actions/components/reader/csv/test/CMakeLists.txt

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,14 @@ target_link_libraries(TestColumnsCSVReader
3636
PRIVATE
3737
reader_csv
3838
)
39-
add_test(NAME TestColumnsCSVReader COMMAND TestColumnsCSVReader)
39+
add_test(NAME TestColumnsCSVReader COMMAND TestColumnsCSVReader)
40+
41+
# Test CSVDataManager
42+
add_executable(TestCSVDataManager
43+
TestCSVDataManager.cpp
44+
)
45+
target_link_libraries(TestCSVDataManager
46+
PRIVATE
47+
reader_csv
48+
)
49+
add_test(NAME TestCSVDataManager COMMAND TestCSVDataManager)

0 commit comments

Comments
 (0)