@@ -2565,38 +2565,27 @@ void RdbLoader::CreateObjectOnShard(const DbContext& db_cntx, const Item* item,
2565
2565
item->val .rdb_type );
2566
2566
};
2567
2567
2568
- // The scope is important here, as we need to ensure that the object memory is properly
2569
- // accounted for.
2570
- DbSlice::ItAndUpdater append_res;
2571
-
2572
- LoadConfig tmp_load_config = item->load_config ;
2573
-
2574
- // If we're appending the item to an existing key, first load the
2575
- // object.
2576
- if (tmp_load_config.append ) {
2577
- append_res = db_slice->FindMutable (db_cntx, item->key );
2578
- if (IsValid (append_res.it )) {
2579
- pv_ptr = &append_res.it ->second ;
2568
+ // Streamed big values are stored in a separate map. unique_ptr for pointer stability
2569
+ thread_local std::unordered_map<std::string, std::unique_ptr<PrimeValue>> now_streamed;
2570
+ LoadConfig config_copy = item->load_config ;
2571
+ if (item->load_config .streamed && item->load_config .append ) {
2572
+ if (auto it = now_streamed.find (item->key ); it != now_streamed.end ()) {
2573
+ pv_ptr = &*now_streamed[item->key ];
2580
2574
} else {
2581
- // If the item has expired we may not find the key. Note if the key
2582
- // is found, but expired since we started loading, we still append to
2583
- // avoid an inconsistent state where only part of the key is loaded.
2584
- // We allow expiration for values inside sset/hmap,
2585
- // so the object can unexist if all keys in it were expired
2575
+ // Sets and hashes are deleted when all their entries are expired.
2576
+ // If it's the case, set reset append flag and start from scratch.
2586
2577
bool key_is_not_expired = item->expire_ms == 0 || db_cntx.time_now_ms < item->expire_ms ;
2587
2578
bool is_set_expiry_type = item->val .rdb_type == RDB_TYPE_HASH_WITH_EXPIRY ||
2588
2579
item->val .rdb_type == RDB_TYPE_SET_WITH_EXPIRY;
2589
2580
if (!is_set_expiry_type && key_is_not_expired) {
2590
2581
LOG (ERROR) << " Count not to find append key '" << item->key << " ' in DB " << db_ind;
2591
2582
return ;
2592
2583
}
2593
- // Because the previous batches of items for this key were fully expired,
2594
- // we need to create a new key
2595
- tmp_load_config.append = false ;
2584
+ config_copy.append = false ;
2596
2585
}
2597
2586
}
2598
2587
2599
- if (auto ec = FromOpaque (item->val , tmp_load_config , pv_ptr); ec) {
2588
+ if (auto ec = FromOpaque (item->val , config_copy , pv_ptr); ec) {
2600
2589
if (ec.value () == errc::value_expired) {
2601
2590
// hmap and sset values can expire and we ok with it,
2602
2591
// so we don't set ec_ in this case
@@ -2614,11 +2603,18 @@ void RdbLoader::CreateObjectOnShard(const DbContext& db_cntx, const Item* item,
2614
2603
}
2615
2604
LOG (ERROR) << " Could not load value for key '" << item->key << " ' in DB " << db_ind;
2616
2605
stop_early_ = true ;
2606
+ now_streamed.clear ();
2617
2607
return ;
2618
2608
}
2619
2609
2620
- if (tmp_load_config.append ) {
2621
- return ;
2610
+ if (item->load_config .streamed ) {
2611
+ if (now_streamed.find (item->key ) == now_streamed.end ())
2612
+ now_streamed.emplace (item->key , make_unique<PrimeValue>(std::move (pv)));
2613
+
2614
+ if (!item->load_config .finalize )
2615
+ return ;
2616
+
2617
+ pv = std::move (*now_streamed.extract (item->key ).mapped ());
2622
2618
}
2623
2619
2624
2620
// We need this extra check because we don't return empty_key
@@ -2722,7 +2718,8 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
2722
2718
continue ;
2723
2719
}
2724
2720
2725
- if (pending_read_.remaining > 0 ) {
2721
+ item->load_config .finalize = pending_read_.remaining == 0 ;
2722
+ if (!item->load_config .finalize ) {
2726
2723
item->key = key;
2727
2724
streamed = true ;
2728
2725
} else {
@@ -2760,7 +2757,7 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
2760
2757
FlushShardAsync (sid);
2761
2758
}
2762
2759
}
2763
- } while (pending_read_.remaining > 0 );
2760
+ } while (pending_read_.remaining > 0 && !stop_early_. load (memory_order_relaxed) );
2764
2761
2765
2762
int delta_ms = (absl::GetCurrentTimeNanos () - start) / 1000'000 ;
2766
2763
LOG_IF (INFO, delta_ms > 1000 ) << " Took " << delta_ms << " ms to load rdb_type " << type;
0 commit comments