Skip to content

Commit b898823

Browse files
committed
fix(FAM): FamStore WIP
FDB-528
1 parent 5dbcdad commit b898823

File tree

2 files changed

+125
-19
lines changed

2 files changed

+125
-19
lines changed

src/fdb5/fam/FamStore.cc

Lines changed: 102 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <memory>
2020
#include <ostream>
2121
#include <set>
22+
#include <sstream>
2223
#include <string>
2324
#include <vector>
2425

@@ -29,13 +30,15 @@
2930
#include "eckit/io/fam/FamObject.h"
3031
#include "eckit/io/fam/FamObjectName.h"
3132
#include "eckit/io/fam/FamPath.h"
33+
#include "eckit/io/fam/FamRegion.h"
3234
#include "eckit/log/Log.h"
3335

3436
#include "fdb5/LibFdb5.h"
3537
#include "fdb5/database/Field.h"
3638
#include "fdb5/database/FieldLocation.h"
3739
#include "fdb5/database/Key.h"
3840
#include "fdb5/database/Store.h"
41+
#include "fdb5/database/WipeState.h"
3942
#include "fdb5/fam/FamCommon.h"
4043
#include "fdb5/fam/FamFieldLocation.h"
4144

@@ -45,12 +48,19 @@ static const StoreBuilder<FamStore> builder(FamCommon::type);
4548

4649
//----------------------------------------------------------------------------------------------------------------------
4750

48-
FamStore::FamStore(const Key& key, const Config& config) : FamCommon(config, key), config_(config) {}
51+
FamStore::FamStore(const Key& key, const Config& config) : FamCommon(key, config) {}
52+
53+
FamStore::FamStore(const eckit::URI& uri, const Config& config) : FamCommon(uri, config) {}
4954

5055
FamStore::~FamStore() = default;
5156

5257
//----------------------------------------------------------------------------------------------------------------------
5358

59+
eckit::URI FamStore::uri(const eckit::URI& dataURI) {
60+
ASSERT(dataURI.scheme() == FamCommon::type);
61+
return eckit::FamObjectName(dataURI).uri();
62+
}
63+
5464
auto FamStore::uri() const -> eckit::URI {
5565
return FamCommon::uri();
5666
}
@@ -79,12 +89,22 @@ void FamStore::close() {
7989
LOG_DEBUG_LIB(LibFdb5) << "FamStore::close() nothing to do!" << '\n';
8090
}
8191

82-
auto FamStore::asCollocatedDataURIs(const std::vector<eckit::URI>& /* uriList */) const -> std::set<eckit::URI> {
83-
NOTIMP;
92+
std::set<eckit::URI> FamStore::collocatedDataURIs() const {
93+
return {};
94+
}
95+
96+
std::set<eckit::URI> FamStore::asCollocatedDataURIs(const std::set<eckit::URI>& uris) const {
97+
std::set<eckit::URI> res;
98+
for (const auto& uri : uris) {
99+
ASSERT(uriBelongs(uri));
100+
res.insert(uri);
101+
}
102+
return res;
84103
}
85104

86-
auto FamStore::collocatedDataURIs() const -> std::vector<eckit::URI> {
87-
NOTIMP;
105+
std::vector<eckit::URI> FamStore::getAuxiliaryURIs(const eckit::URI& uri, bool /*onlyExisting*/) const {
106+
ASSERT(uri.scheme() == type());
107+
return {};
88108
}
89109

90110
auto FamStore::makeObject(const Key& key) const -> eckit::FamObjectName {
@@ -117,14 +137,6 @@ auto FamStore::archive(const Key& key, const void* data, eckit::Length length) -
117137
return std::make_unique<FamFieldLocation>(object.uri(), 0, length, fdb5::Key());
118138
}
119139

120-
void FamStore::remove(const Key& key) const {
121-
auto object = makeObject(key);
122-
123-
LOG_DEBUG_LIB(LibFdb5) << "FamStore removing object: " << object << '\n';
124-
125-
object.lookup().deallocate();
126-
}
127-
128140
void FamStore::remove(const eckit::URI& uri, std::ostream& logAlways, std::ostream& logVerbose, bool doit) const {
129141
ASSERT(root_.uriBelongs(uri));
130142

@@ -136,6 +148,83 @@ void FamStore::remove(const eckit::URI& uri, std::ostream& logAlways, std::ostre
136148
}
137149
}
138150

151+
void FamStore::finaliseWipeState(StoreWipeState& storeState, bool /*doit*/, bool /*unsafeWipeAll*/) {
152+
153+
const std::set<eckit::URI>& dataURIs = storeState.includedDataURIs();
154+
155+
if (dataURIs.empty()) {
156+
return;
157+
}
158+
159+
if (!root_.exists()) {
160+
storeState.markAllMissing();
161+
return;
162+
}
163+
164+
for (const auto& uri : dataURIs) {
165+
if (!uriBelongs(uri)) {
166+
std::stringstream msg;
167+
msg << "FamStore::finaliseWipeState: Index to be deleted has pointers to fields that don't belong to the "
168+
"configured store."
169+
<< '\n';
170+
msg << "Configured Store URI: " << this->uri().asString() << '\n';
171+
msg << "Pointed Store unit URI: " << uri.asString() << '\n';
172+
msg << "Impossible to delete such fields. Index deletion aborted to avoid leaking fields." << '\n';
173+
throw eckit::SeriousBug(msg.str(), Here());
174+
}
175+
176+
if (!uriExists(uri)) {
177+
storeState.markAsMissing(uri);
178+
}
179+
}
180+
}
181+
182+
bool FamStore::doWipeUnknowns(const std::set<eckit::URI>& unknownURIs) const {
183+
// if (!wipeDoit_ || !wipeUnsafeWipeAll_) {
184+
// return true;
185+
// }
186+
187+
for (const auto& uri : unknownURIs) {
188+
if (!uriBelongs(uri)) {
189+
continue;
190+
}
191+
if (uriExists(uri)) {
192+
remove(uri, eckit::Log::info(), eckit::Log::info(), true);
193+
}
194+
}
195+
196+
return true;
197+
}
198+
199+
bool FamStore::doWipeURIs(const StoreWipeState& wipeState) const {
200+
const bool wipeall = wipeState.safeURIs().empty();
201+
202+
for (const auto& uri : wipeState.dataAuxiliaryURIs()) {
203+
remove(uri, eckit::Log::info(), eckit::Log::info(), true);
204+
}
205+
206+
for (const auto& uri : wipeState.includedDataURIs()) {
207+
remove(uri, eckit::Log::info(), eckit::Log::info(), true);
208+
}
209+
210+
if (wipeall) {
211+
cleanupEmptyDatabase_ = true;
212+
}
213+
214+
return true;
215+
}
216+
217+
void FamStore::doWipeEmptyDatabase() const {
218+
if (!cleanupEmptyDatabase_) {
219+
return;
220+
}
221+
222+
if (root_.exists()) {
223+
root_.lookup().destroy();
224+
}
225+
cleanupEmptyDatabase_ = false;
226+
}
227+
139228
void FamStore::print(std::ostream& out) const {
140229
out << "FamStore[root=" << root_ << ']';
141230
}

src/fdb5/fam/FamStore.h

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
namespace fdb5 {
3939

4040
class Schema;
41+
class StoreWipeState;
4142

4243
//----------------------------------------------------------------------------------------------------------------------
4344

@@ -54,20 +55,30 @@ class FamStore : protected FamCommon, public Store {
5455
public: // methods
5556

5657
FamStore(const Key& key, const Config& config);
58+
FamStore(const eckit::URI& uri, const Config& config);
59+
60+
FamStore(const FamStore&) = delete;
61+
FamStore& operator=(const FamStore&) = delete;
62+
FamStore(FamStore&&) = delete;
63+
FamStore& operator=(FamStore&&) = delete;
5764

5865
~FamStore() override;
5966

6067
auto type() const -> std::string override { return FamCommon::type; }
6168

69+
static eckit::URI uri(const eckit::URI& dataURI);
70+
6271
auto uri() const -> eckit::URI override;
6372

6473
auto uriBelongs(const eckit::URI& uri) const -> bool override;
6574

6675
auto uriExists(const eckit::URI& uri) const -> bool override;
6776

68-
auto collocatedDataURIs() const -> std::vector<eckit::URI> override;
77+
std::set<eckit::URI> collocatedDataURIs() const override;
78+
79+
std::set<eckit::URI> asCollocatedDataURIs(const std::set<eckit::URI>& uris) const override;
6980

70-
auto asCollocatedDataURIs(const std::vector<eckit::URI>& uriList) const -> std::set<eckit::URI> override;
81+
std::vector<eckit::URI> getAuxiliaryURIs(const eckit::URI& uri, bool onlyExisting) const override;
7182

7283
auto open() -> bool override { return true; }
7384

@@ -77,6 +88,16 @@ class FamStore : protected FamCommon, public Store {
7788

7889
void checkUID() const override {}
7990

91+
void finaliseWipeState(StoreWipeState& storeState, bool doit, bool unsafeWipeAll) override;
92+
93+
bool doWipeUnknowns(const std::set<eckit::URI>& unknownURIs) const override;
94+
95+
bool doWipeURIs(const StoreWipeState& wipeState) const override;
96+
97+
void doWipeEmptyDatabase() const override;
98+
99+
bool doUnsafeFullWipe() const override { return false; }
100+
80101
auto makeObject(const Key& key) const -> eckit::FamObjectName;
81102

82103
protected: // methods
@@ -88,17 +109,13 @@ class FamStore : protected FamCommon, public Store {
88109
auto archive(const Key& key, const void* data, eckit::Length length)
89110
-> std::unique_ptr<const FieldLocation> override;
90111

91-
void remove(const Key& key) const override;
92-
93112
void remove(const eckit::URI& uri, std::ostream& logAlways, std::ostream& logVerbose, bool doit) const override;
94113

95114
void print(std::ostream& out) const override;
96115

97116
private: // members
98117

99118
mutable Stats stats_;
100-
101-
const Config& config_;
102119
};
103120

104121
//----------------------------------------------------------------------------------------------------------------------

0 commit comments

Comments
 (0)