Skip to content

Commit 4ce799f

Browse files
[feat] Implement ParallaxStore interface for Parallax storage integration
1 parent ab59f94 commit 4ce799f

File tree

2 files changed

+166
-0
lines changed

2 files changed

+166
-0
lines changed
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* (C) Copyright 1996- ECMWF.
3+
*
4+
* This software is licensed under the terms of the Apache Licence Version 2.0
5+
* which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6+
* In applying this licence, ECMWF does not waive the privileges and immunities
7+
* granted to it by virtue of its status as an intergovernmental organisation nor
8+
* does it submit to any jurisdiction.
9+
*/
10+
11+
#include "ParallaxStore.h"
12+
13+
#include "eckit/config/Resource.h"
14+
#include "fdb5/parallax/ParallaxCatalogue.h"
15+
#include <atomic>
16+
17+
namespace fdb5
18+
{
19+
ParallaxStore::ParallaxStore(const Schema &schema, const Key &key, const Config &config)
20+
: Store(schema)
21+
{
22+
}
23+
24+
eckit::URI ParallaxStore::uri() const
25+
{
26+
throw std::logic_error("uri not implemented");
27+
}
28+
29+
bool ParallaxStore::uriBelongs(const eckit::URI &uri) const
30+
{
31+
throw std::logic_error("uriBelongs not implemented");
32+
}
33+
34+
bool ParallaxStore::uriExists(const eckit::URI &uri) const
35+
{
36+
throw std::logic_error("uriExists not implemented");
37+
}
38+
39+
std::vector<eckit::URI> ParallaxStore::collocatedDataURIs() const
40+
{
41+
throw std::logic_error("collocatedDataURIs not implemented");
42+
}
43+
44+
std::set<eckit::URI> ParallaxStore::asCollocatedDataURIs(const std::vector<eckit::URI> &uris) const
45+
{
46+
throw std::logic_error("asCollocatedDataURIs not implemented");
47+
}
48+
49+
bool ParallaxStore::exists() const
50+
{
51+
throw std::logic_error("exists not implemented");
52+
}
53+
54+
eckit::DataHandle *ParallaxStore::retrieve(Field &field) const
55+
{
56+
return field.dataHandle();
57+
}
58+
59+
std::unique_ptr<FieldLocation> ParallaxStore::archive(const Key &key, const void *data, eckit::Length length)
60+
{
61+
static std::atomic<uint64_t> archive_counter{ 0 };
62+
63+
std::string internalKey = key.valuesToString();
64+
internalKey += std::to_string(archive_counter.fetch_add(1));
65+
66+
par_handle db_handle = par_get_db(PARALLAX_GLOBAL_DB);
67+
const char *error_msg = nullptr;
68+
69+
struct par_key_value kv {
70+
};
71+
kv.k.data = internalKey.data();
72+
kv.k.size = internalKey.size() + 1;
73+
74+
kv.v.val_buffer = const_cast<char *>(reinterpret_cast<const char *>(data));
75+
kv.v.val_size = length;
76+
77+
par_put(db_handle, &kv, &error_msg);
78+
79+
if (error_msg) {
80+
std::cerr << "Parallax put failed: " << error_msg << std::endl;
81+
_exit(EXIT_FAILURE);
82+
}
83+
84+
eckit::URI uri("parallax", kv.k.data);
85+
return std::make_unique<ParallaxFieldLocation>(uri, 0, length, Key(nullptr, true));
86+
}
87+
88+
void ParallaxStore::flush()
89+
{
90+
}
91+
92+
void ParallaxStore::remove(const eckit::URI &uri, std::ostream &logAlways, std::ostream &logVerbose, bool doit) const
93+
{
94+
throw std::logic_error("remove not implemented");
95+
}
96+
97+
void ParallaxStore::print(std::ostream &out) const
98+
{
99+
throw std::logic_error("print not implemented");
100+
}
101+
102+
static StoreBuilder<ParallaxStore> builder("parallax");
103+
104+
}

src/fdb5/parallax/ParallaxStore.h

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* (C) Copyright 1996- ECMWF.
3+
*
4+
* This software is licensed under the terms of the Apache Licence Version 2.0
5+
* which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6+
* In applying this licence, ECMWF does not waive the privileges and immunities
7+
* granted to it by virtue of its status as an intergovernmental organisation nor
8+
* does it submit to any jurisdiction.
9+
*/
10+
11+
#pragma once
12+
13+
#include "fdb5/database/Store.h"
14+
#include "fdb5/parallax/ParallaxArray.h"
15+
#include "fdb5/parallax/ParallaxFieldLocation.h"
16+
#include "fdb5/parallax/parallax_handle.h"
17+
#include "fdb5/rules/Schema.h"
18+
19+
namespace fdb5
20+
{
21+
class ParallaxStore : public Store {
22+
public:
23+
ParallaxStore(const Schema &schema, const Key &key, const Config &config);
24+
25+
~ParallaxStore() override
26+
{
27+
}
28+
29+
eckit::URI uri() const override;
30+
bool uriBelongs(const eckit::URI &) const override;
31+
bool uriExists(const eckit::URI &) const override;
32+
std::vector<eckit::URI> collocatedDataURIs() const override;
33+
std::set<eckit::URI> asCollocatedDataURIs(const std::vector<eckit::URI> &) const override;
34+
35+
bool open() override
36+
{
37+
return true;
38+
}
39+
void flush() override;
40+
void close() override{};
41+
42+
void checkUID() const override
43+
{ /* nothing to do */
44+
}
45+
46+
protected:
47+
std::string type() const override
48+
{
49+
return "parallax";
50+
}
51+
52+
bool exists() const override;
53+
54+
eckit::DataHandle *retrieve(Field &field) const override;
55+
std::unique_ptr<FieldLocation> archive(const Key &key, const void *data, eckit::Length length) override;
56+
57+
void remove(const eckit::URI &uri, std::ostream &logAlways, std::ostream &logVerbose, bool doit) const override;
58+
59+
void print(std::ostream &out) const override;
60+
};
61+
62+
}

0 commit comments

Comments
 (0)