Skip to content

Commit e277ba7

Browse files
[feat] Support multiple Parallax DBs via key hashing
1 parent fb38aea commit e277ba7

File tree

8 files changed

+124
-69
lines changed

8 files changed

+124
-69
lines changed

src/fdb5/parallax/ParallaxCatalogue.cpp

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,18 @@
1414

1515
namespace fdb5
1616
{
17+
// Key-based constructor: forwards `key`, a default-constructed
// ControlIdentifiers and `config` to the Catalogue base class, then calls
// par_init_db_handles().
ParallaxCatalogue::ParallaxCatalogue(const Key &key, const fdb5::Config &config)
18+
: Catalogue(key, ControlIdentifiers(), config)
19+
{
20+
// NOTE(review): presumably opens the "par_db<N>" handles that the hashed
// par_get_db(db_name) lookups elsewhere in this commit rely on -- confirm
// against the helper's definition, which is not shown here.
par_init_db_handles();
21+
}
22+
23+
// URI-based constructor: forwards an empty Key(), the supplied
// `controlIdentifiers` and `config` to the Catalogue base class. The `uri`
// argument is not used and the body is empty.
// NOTE(review): unlike the key-based constructor, this one does not call
// par_init_db_handles() -- confirm that is intentional.
ParallaxCatalogue::ParallaxCatalogue(const eckit::URI &uri, const ControlIdentifiers &controlIdentifiers,
24+
const fdb5::Config &config)
25+
: Catalogue(Key(), controlIdentifiers, config)
26+
{
27+
}
28+
1729
// Destructor, defaulted here in the .cpp rather than inline in the header.
ParallaxCatalogue::~ParallaxCatalogue() = default;
1830

1931
std::string ParallaxCatalogue::type() const
@@ -36,9 +48,6 @@ void ParallaxCatalogue::loadSchema()
3648
{
3749
eckit::Timer timer("ParallaxCatalogue::loadSchema()", eckit::Log::debug<fdb5::LibFdb5>());
3850

39-
par_handle db_handle = par_get_db(PARALLAX_GLOBAL_DB);
40-
const char *error_msg = nullptr;
41-
4251
struct par_key key;
4352
std::string key_str = "schema";
4453

@@ -51,6 +60,13 @@ void ParallaxCatalogue::loadSchema()
5160
throw eckit::Exception("Memory allocation failed for schema retrieval.");
5261
}
5362

63+
size_t hash = std::hash<std::string>{}(key_str.c_str());
64+
int db_index = hash % 16;
65+
66+
std::string db_name = "par_db" + std::to_string(db_index + 1);
67+
par_handle db_handle = par_get_db(db_name);
68+
const char *error_msg = nullptr;
69+
5470
par_get(db_handle, &key, &value, &error_msg);
5571
if (error_msg) {
5672
free(value.val_buffer);

src/fdb5/parallax/ParallaxCatalogue.h

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,17 @@
1111
#pragma once
1212

1313
#include "fdb5/database/DB.h"
14+
#include "fdb5/parallax/parallax_handle.h"
1415
#include <parallax.h>
1516

1617
namespace fdb5
1718
{
1819
class ParallaxCatalogue : public Catalogue {
1920
public:
20-
ParallaxCatalogue(const Key &key, const fdb5::Config &config)
21-
: Catalogue(key, ControlIdentifiers(), config)
22-
{
23-
}
21+
ParallaxCatalogue(const Key &key, const fdb5::Config &config);
2422

2523
ParallaxCatalogue(const eckit::URI &uri, const ControlIdentifiers &controlIdentifiers,
26-
const fdb5::Config &config)
27-
: Catalogue(Key(), controlIdentifiers, config)
28-
{
29-
}
24+
const fdb5::Config &config);
3025

3126
~ParallaxCatalogue() override;
3227

@@ -98,4 +93,4 @@ class ParallaxCatalogue : public Catalogue {
9893
Schema schema_;
9994
};
10095

101-
}
96+
}

src/fdb5/parallax/ParallaxCatalogueReader.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,9 @@ bool ParallaxCatalogueReader::selectIndex(const Key &key)
3535
currentIndexKey_ = key;
3636

3737
if (indexes_.find(key) == indexes_.end()) {
38-
par_handle db_handle = par_get_db(PARALLAX_GLOBAL_DB);
3938
const char *error_msg = nullptr;
4039

4140
std::string keyStr = key.valuesToString();
42-
4341
par_key keyData;
4442
keyData.size = keyStr.size() + 1;
4543
keyData.data = keyStr.c_str();
@@ -52,6 +50,12 @@ bool ParallaxCatalogueReader::selectIndex(const Key &key)
5250
throw eckit::Exception("Memory allocation failed for Parallax index retrieval");
5351
}
5452

53+
size_t hash = std::hash<std::string>{}(keyStr.c_str());
54+
int db_index = hash % 16;
55+
56+
std::string db_name = "par_db" + std::to_string(db_index + 1);
57+
par_handle db_handle = par_get_db(db_name);
58+
5559
par_get(db_handle, &keyData, &valueData, &error_msg);
5660

5761
if (error_msg != nullptr || valueData.val_size <= 0) {
@@ -108,4 +112,4 @@ bool ParallaxCatalogueReader::retrieve(const Key &key, Field &field) const
108112

109113
static fdb5::CatalogueBuilder<fdb5::ParallaxCatalogueReader> builder("parallax.reader");
110114

111-
}
115+
}

src/fdb5/parallax/ParallaxCatalogueWriter.cpp

Lines changed: 44 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,29 +21,6 @@ ParallaxCatalogueWriter::ParallaxCatalogueWriter(const Key &key, const fdb5::Con
2121
: ParallaxCatalogue(key, config)
2222
, firstIndexWrite_(false)
2323
{
24-
par_handle db_handle = par_get_db(PARALLAX_GLOBAL_DB);
25-
const char *error_msg = NULL;
26-
27-
par_key_value dbKey_kv;
28-
dbKey_kv.k.size = sizeof("key");
29-
dbKey_kv.k.data = "key";
30-
31-
std::ostringstream dbKeyStream;
32-
dbKeyStream << dbKey_;
33-
std::string dbKeyStr = dbKeyStream.str();
34-
35-
if (dbKeyStr.size() + 1 > 512) {
36-
throw eckit::Exception("Serialised db key exceeded maximum length.");
37-
}
38-
39-
dbKey_kv.v.val_size = dbKeyStr.size() + 1;
40-
dbKey_kv.v.val_buffer = (char *)dbKeyStr.c_str();
41-
42-
par_put(db_handle, &dbKey_kv, &error_msg);
43-
if (error_msg) {
44-
throw eckit::Exception("Failed to insert dbKey: " + std::string(error_msg));
45-
}
46-
4724
std::string path = config.schemaPath();
4825

4926
std::stringstream schema_buffer;
@@ -56,7 +33,6 @@ ParallaxCatalogueWriter::ParallaxCatalogueWriter(const Key &key, const fdb5::Con
5633
}
5734

5835
std::string schema_str = schema_buffer.str();
59-
error_msg = NULL;
6036
par_key_value schema_kv;
6137
std::string key_str = "schema";
6238

@@ -68,6 +44,13 @@ ParallaxCatalogueWriter::ParallaxCatalogueWriter(const Key &key, const fdb5::Con
6844
std::memcpy(schema_kv.v.val_buffer, schema_str.c_str(), schema_str.size());
6945
schema_kv.v.val_buffer[schema_str.size()] = '\0';
7046

47+
size_t hash = std::hash<std::string>{}(key_str.c_str());
48+
int db_index = hash % 16;
49+
50+
std::string db_name = "par_db" + std::to_string(db_index + 1);
51+
par_handle db_handle = par_get_db(db_name);
52+
const char *error_msg = NULL;
53+
7154
par_put(db_handle, &schema_kv, &error_msg);
7255
if (error_msg) {
7356
std::cout << "Sorry Parallax put failed reason: " << error_msg << std::endl;
@@ -99,9 +82,6 @@ bool ParallaxCatalogueWriter::selectIndex(const Key &key)
9982
currentIndexKey_ = key;
10083

10184
if (indexes_.find(key) == indexes_.end()) {
102-
par_handle db_handle = par_get_db(PARALLAX_GLOBAL_DB);
103-
const char *error_msg = nullptr;
104-
10585
par_key_value kv;
10686
std::string keyStr = key.valuesToString();
10787
kv.k.size = keyStr.size() + 1;
@@ -115,6 +95,13 @@ bool ParallaxCatalogueWriter::selectIndex(const Key &key)
11595
throw eckit::Exception("Memory allocation failed for index retrieval.");
11696
}
11797

98+
size_t hash = std::hash<std::string>{}(keyStr.c_str());
99+
int db_index = hash % 16;
100+
101+
std::string db_name = "par_db" + std::to_string(db_index + 1);
102+
par_handle db_handle = par_get_db(db_name);
103+
const char *error_msg = nullptr;
104+
118105
par_get(db_handle, &kv.k, &kv.v, &error_msg);
119106

120107
error_msg = nullptr;
@@ -130,6 +117,12 @@ bool ParallaxCatalogueWriter::selectIndex(const Key &key)
130117
strncpy(kv.v.val_buffer, placeholderValue.c_str(), kv.v.val_buffer_size - 1);
131118
kv.v.val_buffer[kv.v.val_buffer_size - 1] = '\0';
132119

120+
size_t hash = std::hash<std::string>{}(keyStr.c_str());
121+
int db_index = hash % 16;
122+
123+
std::string db_name = "par_db" + std::to_string(db_index + 1);
124+
db_handle = par_get_db(db_name);
125+
133126
par_put(db_handle, &kv, &error_msg);
134127

135128
if (error_msg) {
@@ -138,7 +131,6 @@ bool ParallaxCatalogueWriter::selectIndex(const Key &key)
138131
}
139132
}
140133
}
141-
142134
indexes_[key] = Index(new ParallaxIndex(key));
143135
current_ = indexes_[key];
144136
firstIndexWrite_ = true;
@@ -174,6 +166,11 @@ const Index &ParallaxCatalogueWriter::currentIndex()
174166

175167
void ParallaxCatalogueWriter::archive(const Key &key, std::unique_ptr<FieldLocation> fieldLocation)
176168
{
169+
par_handle db_handle;
170+
size_t hash;
171+
int db_index;
172+
std::string db_name;
173+
177174
if (current_.null()) {
178175
ASSERT(!currentIndexKey_.empty());
179176
selectIndex(currentIndexKey_);
@@ -206,12 +203,6 @@ void ParallaxCatalogueWriter::archive(const Key &key, std::unique_ptr<FieldLocat
206203

207204
current_.put(key, field);
208205

209-
par_handle db_handle = par_get_db("par_db");
210-
if (!db_handle) {
211-
std::cerr << "Failed to open Parallax database." << std::endl;
212-
_exit(EXIT_FAILURE);
213-
}
214-
215206
if (firstIndexWrite_) {
216207
par_key_value kv{};
217208
const char *error_message = nullptr;
@@ -224,6 +215,12 @@ void ParallaxCatalogueWriter::archive(const Key &key, std::unique_ptr<FieldLocat
224215
kv.v.val_buffer = axisNames.data();
225216
kv.v.val_size = axisNames.length();
226217

218+
hash = std::hash<std::string>{}(indexKeyWithAxes.c_str());
219+
db_index = hash % 16;
220+
221+
db_name = "par_db" + std::to_string(db_index + 1);
222+
db_handle = par_get_db(db_name);
223+
227224
par_put(db_handle, &kv, &error_message);
228225
if (error_message) {
229226
std::cerr << "Parallax put failed: " << error_message << std::endl;
@@ -250,6 +247,12 @@ void ParallaxCatalogueWriter::archive(const Key &key, std::unique_ptr<FieldLocat
250247
.val_size = 0,
251248
.val_buffer = value_buf.data() };
252249

250+
hash = std::hash<std::string>{}(axisKey.c_str());
251+
db_index = hash % 16;
252+
253+
db_name = "par_db" + std::to_string(db_index + 1);
254+
db_handle = par_get_db(db_name);
255+
253256
par_get(db_handle, &existing_key, &existing_value, &error_message2);
254257

255258
std::string updatedValueStr;
@@ -275,6 +278,12 @@ void ParallaxCatalogueWriter::archive(const Key &key, std::unique_ptr<FieldLocat
275278
kv2.v.val_size = updatedValueStr.size();
276279

277280
error_message2 = nullptr;
281+
282+
size_t hash = std::hash<std::string>{}(axisKey.c_str());
283+
int db_index = hash % 16;
284+
285+
std::string db_name = "par_db" + std::to_string(db_index + 1);
286+
db_handle = par_get_db(db_name);
278287
par_put(db_handle, &kv2, &error_message2);
279288
if (error_message2) {
280289
_exit(EXIT_FAILURE);
@@ -298,4 +307,4 @@ void ParallaxCatalogueWriter::closeIndexes()
298307

299308
static fdb5::CatalogueBuilder<fdb5::ParallaxCatalogueWriter> builder("parallax.writer");
300309

301-
}
310+
}

src/fdb5/parallax/ParallaxIndex.cpp

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,11 @@ ParallaxIndex::ParallaxIndex(const Key &key, bool readAxes)
3535

3636
void ParallaxIndex::updateAxes()
3737
{
38-
par_handle db_handle = par_get_db(PARALLAX_GLOBAL_DB);
39-
if (!db_handle) {
40-
throw eckit::Exception("Failed to open Parallax metadata database.");
41-
}
38+
par_handle db_handle;
39+
size_t hash;
40+
int db_index;
41+
42+
std::string db_name;
4243

4344
std::string keyStr = "axes";
4445
struct par_key axes_key {
@@ -51,6 +52,11 @@ void ParallaxIndex::updateAxes()
5152
.val_buffer = axes_data.data() };
5253

5354
const char *error_msg = nullptr;
55+
hash = std::hash<std::string>{}(keyStr.c_str());
56+
db_index = hash % 16;
57+
58+
db_name = "par_db" + std::to_string(db_index + 1);
59+
db_handle = par_get_db(db_name);
5460
par_get(db_handle, &axes_key, &axes_value, &error_msg);
5561

5662
std::vector<std::string> axis_names;
@@ -72,6 +78,12 @@ void ParallaxIndex::updateAxes()
7278
.val_size = 0,
7379
.val_buffer = axis_values_buf.data() };
7480

81+
hash = std::hash<std::string>{}(axisKeyStr.c_str());
82+
db_index = hash % 16;
83+
84+
db_name = "par_db" + std::to_string(db_index + 1);
85+
db_handle = par_get_db(db_name);
86+
7587
par_get(db_handle, &axis_key, &axis_value, &error_msg);
7688

7789
std::vector<std::string> values;
@@ -102,9 +114,13 @@ bool ParallaxIndex::get(const Key &key, const Key &remapKey, Field &field) const
102114
value.val_buffer_size = field_loc_max_len;
103115
value.val_size = 0;
104116

117+
size_t hash = std::hash<std::string>{}(query.c_str());
118+
int db_index = hash % 16;
119+
120+
std::string db_name = "par_db" + std::to_string(db_index + 1);
121+
par_handle db_handle = par_get_db(db_name);
105122
const char *error_msg = nullptr;
106123

107-
par_handle db_handle = par_get_db(PARALLAX_GLOBAL_DB);
108124
if (!db_handle) {
109125
throw eckit::Exception("Failed to open Parallax index database");
110126
}
@@ -142,16 +158,11 @@ void ParallaxIndex::add(const Key &key, const Field &field)
142158
throw eckit::Exception("Serialized field location exceeded maximum allowed length.");
143159
}
144160

145-
par_handle db_handle = par_get_db(PARALLAX_GLOBAL_DB);
146-
if (!db_handle) {
147-
throw eckit::Exception("Failed to get Parallax database handle.");
148-
}
149-
150161
std::string keyStr = key.valuesToString();
151162
par_value valueData;
152163
valueData.val_size = hs.bytesWritten();
153-
valueData.val_buffer_size = valueData.val_size;
154-
valueData.val_buffer = (char *)malloc(valueData.val_size);
164+
valueData.val_buffer_size = h.size();
165+
valueData.val_buffer = reinterpret_cast<char *>(const_cast<void *>(h.data()));
155166
if (!valueData.val_buffer) {
156167
throw eckit::Exception("Memory allocation failed for Parallax index storage");
157168
}
@@ -160,12 +171,18 @@ void ParallaxIndex::add(const Key &key, const Field &field)
160171
keyData.k.data = keyStr.c_str();
161172
keyData.v = valueData;
162173

163-
memcpy(valueData.val_buffer, h.data(), valueData.val_size);
174+
size_t hash = std::hash<std::string>{}(keyStr);
175+
int db_index = hash % 16;
164176

177+
std::string db_name = "par_db" + std::to_string(db_index + 1);
178+
par_handle db_handle = par_get_db(db_name);
165179
const char *error_msg = nullptr;
166-
par_put(db_handle, &keyData, &error_msg);
167180

168-
free(valueData.val_buffer);
181+
if (!db_handle) {
182+
throw eckit::Exception("Failed to get Parallax database handle.");
183+
}
184+
185+
par_put(db_handle, &keyData, &error_msg);
169186

170187
if (error_msg != nullptr) {
171188
throw eckit::Exception(std::string("Parallax index insertion failed: ") + error_msg);

src/fdb5/parallax/ParallaxStore.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,15 @@ std::unique_ptr<FieldLocation> ParallaxStore::archive(const Key &key, const void
6363
std::string internalKey = key.valuesToString();
6464
internalKey += std::to_string(archive_counter.fetch_add(1));
6565

66-
par_handle db_handle = par_get_db(PARALLAX_GLOBAL_DB);
66+
size_t hash = std::hash<std::string>{}(internalKey);
67+
int db_index = hash % 16;
68+
69+
std::string db_name = "par_db" + std::to_string(db_index + 1);
70+
par_handle db_handle = par_get_db(db_name);
71+
6772
const char *error_msg = nullptr;
6873

69-
struct par_key_value kv {
70-
};
74+
struct par_key_value kv {};
7175
kv.k.data = internalKey.data();
7276
kv.k.size = internalKey.size() + 1;
7377

0 commit comments

Comments
 (0)