@@ -123,8 +123,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
123123 }
124124 }
125125
126- bool NotifyFetched (EntryId id, string_view value, tiering::DiskSegment segment,
127- bool modified) override ;
126+ bool NotifyFetched (EntryId id, tiering::DiskSegment segment, tiering::Decoder* decoder) override ;
128127
129128 bool NotifyDelete (tiering::DiskSegment segment) override ;
130129
@@ -191,6 +190,8 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str
191190 if (!IsValid (it))
192191 continue ;
193192
193+ // TODO: Handle upload and cooling via type dependent decoders
194+
194195 stats_.total_defrags ++;
195196 PrimeValue& pv = it->second ;
196197 if (pv.IsCool ()) {
@@ -210,12 +211,13 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str
210211 }
211212}
212213
213- bool TieredStorage::ShardOpManager::NotifyFetched (EntryId id, string_view value ,
214- tiering::DiskSegment segment, bool modified ) {
214+ bool TieredStorage::ShardOpManager::NotifyFetched (EntryId id, tiering::DiskSegment segment ,
215+ tiering::Decoder* decoder ) {
215216 ++stats_.total_fetches ;
216217
217218 if (id == EntryId{kFragmentedBin }) { // Generally we read whole bins only for defrag
218- Defragment (segment, value);
219+ auto * bdecoder = static_cast <tiering::BareDecoder*>(decoder);
220+ Defragment (segment, bdecoder->slice );
219221 return true ; // delete
220222 }
221223
@@ -224,20 +226,20 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, string_view value,
224226 // Currently, our heuristic is not very smart, because we stop uploading any reads during
225227 // the snapshotting.
226228 // TODO: to revisit this when we rewrite it with more efficient snapshotting algorithm.
227- bool should_upload = modified;
228- should_upload |=
229- (ts_-> UploadBudget () > int64_t (value. length ())) && !SliceSnapshot::IsSnaphotInProgress ();
229+ bool should_upload = decoder-> modified ;
230+ should_upload |= (ts_-> UploadBudget () > int64_t (decoder-> estimated_mem_usage )) &&
231+ !SliceSnapshot::IsSnaphotInProgress ();
230232
231233 if (!should_upload)
232234 return false ;
233235
234236 auto key = get<OpManager::KeyRef>(id);
235237 auto * pv = Find (key);
236238 if (pv && pv->IsExternal () && segment == pv->GetExternalSlice ()) {
237- if (modified || pv->WasTouched ()) {
238- bool is_raw = !modified;
239+ if (decoder->modified || pv->WasTouched ()) {
239240 ++stats_.total_uploads ;
240- Upload (key.first , value, is_raw, segment.length , pv);
241+ decoder->Upload (pv);
242+ RecordDeleted (*pv, segment.length , GetDbTableStats (key.first ));
241243 return true ;
242244 }
243245 pv->SetTouched (true );
@@ -262,11 +264,7 @@ bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) {
262264 if (bin.fragmented ) {
263265 // Trigger read to signal need for defragmentation. NotifyFetched will handle it.
264266 DVLOG (2 ) << " Enqueueing bin defragmentation for: " << bin.segment .offset ;
265- auto cb = [dummy = 5 ](auto res) -> bool {
266- (void )dummy; // a hack to make cb non constexpr that confuses some old) compilers.
267- return false ;
268- };
269- Enqueue (kFragmentedBin , bin.segment , std::move (cb));
267+ Enqueue (kFragmentedBin , bin.segment , tiering::BareDecoder{}, [](auto res) {});
270268 }
271269
272270 return false ;
@@ -324,14 +322,11 @@ void TieredStorage::Read(DbIndex dbid, std::string_view key, const PrimeValue& v
324322 std::function<void (io::Result<std::string>)> readf) {
325323 DCHECK (value.IsExternal ());
326324 DCHECK (!value.IsCool ());
327- auto cb = [readf = std::move (readf), enc = value.GetStrEncoding ()](auto res) mutable {
328- readf (res.transform ([enc](tiering::OpManager::FetchedEntry entry) {
329- auto [ptr, raw] = entry;
330- return raw ? enc.Decode (*ptr).Take () : *ptr; // TODO(vlad): optimize last value copy
331- }));
332- return false ;
325+ auto cb = [readf = std::move (readf)](io::Result<tiering::StringDecoder*> res) mutable {
326+ readf (res.transform ([](auto * d) { return string{d->Read ()}; }));
333327 };
334- op_manager_->Enqueue (KeyRef (dbid, key), value.GetExternalSlice (), std::move (cb));
328+ op_manager_->Enqueue (KeyRef (dbid, key), value.GetExternalSlice (), tiering::StringDecoder{value},
329+ std::move (cb));
335330}
336331
337332template <typename T>
@@ -341,21 +336,11 @@ TieredStorage::TResult<T> TieredStorage::Modify(DbIndex dbid, std::string_view k
341336 DCHECK (value.IsExternal ());
342337
343338 util::fb2::Future<io::Result<T>> future;
344- auto cb = [future, modf = std::move (modf), enc = value.GetStrEncoding ()](auto res) mutable {
345- if (!res.has_value ()) {
346- future.Resolve (res.get_unexpected ());
347- return false ;
348- }
349-
350- auto [raw_val, is_raw] = *res;
351- if (is_raw) {
352- raw_val->resize (enc.DecodedSize (*raw_val));
353- enc.Decode (*raw_val, raw_val->data ());
354- }
355- future.Resolve (modf (raw_val));
356- return true ;
339+ auto cb = [future, modf = std::move (modf)](io::Result<tiering::StringDecoder*> res) mutable {
340+ future.Resolve (res.transform ([&modf](auto * d) { return modf (d->Write ()); }));
357341 };
358- op_manager_->Enqueue (KeyRef (dbid, key), value.GetExternalSlice (), std::move (cb));
342+ op_manager_->Enqueue (KeyRef (dbid, key), value.GetExternalSlice (), tiering::StringDecoder{value},
343+ std::move (cb));
359344 return future;
360345}
361346
0 commit comments