@@ -293,22 +293,28 @@ bool MagmaKVStore::MagmaCompactionCB::canPurge(CollectionID collection) {
293293 return onlyThisCollection.value () == collection;
294294}
295295
296- struct MagmaKVStoreTransactionContext : public TransactionContext {
297- MagmaKVStoreTransactionContext (KVStore& kvstore,
298- Vbid vbid,
299- std::unique_ptr<PersistenceCallback> cb)
300- : TransactionContext(kvstore, vbid, std::move(cb)) {
301- }
302- /* *
303- * Container for pending Magma requests.
304- *
305- * Using deque as as the expansion behaviour is less aggressive compared to
306- * std::vector (MagmaRequest objects are ~176 bytes in size).
307- */
308- using PendingRequestQueue = std::deque<MagmaRequest>;
309-
310- PendingRequestQueue pendingReqs;
311- };
296+ MagmaKVStoreTransactionContext::MagmaKVStoreTransactionContext (
297+ KVStore& kvstore, Vbid vbid, std::unique_ptr<PersistenceCallback> cb)
298+ : TransactionContext(kvstore, vbid, std::move(cb)) {
299+ }
300+
301+ void MagmaKVStoreTransactionContext::preparePendingRequests () {
302+ // MB-55199: Magma requires key/seqno order, but ascending seqno.
303+ // KV-engine flusher writes out the batch in key/seqno, but descending seqno
304+ // order.
305+ std::sort (pendingReqs.begin (),
306+ pendingReqs.end (),
307+ [](const auto & lhs, const auto & rhs) {
308+ const auto comp = lhs->getItem ().getKey ().compare (
309+ rhs->getItem ().getKey ());
310+ // When keys are equal, sort by seqno.
311+ if (comp == 0 ) {
312+ return lhs->getItem ().getBySeqno () <
313+ rhs->getItem ().getBySeqno ();
314+ }
315+ return comp < 0 ;
316+ });
317+ }
312318
313319std::pair<Status, bool > MagmaKVStore::compactionCallBack (
314320 MagmaKVStore::MagmaCompactionCB& cbCtx,
@@ -778,7 +784,8 @@ void MagmaKVStore::commitCallback(MagmaKVStoreTransactionContext& txnCtx,
778784 int errCode,
779785 kvstats_ctx&) {
780786 const auto flushSuccess = (errCode == Status::Code::Ok);
781- for (const auto & req : txnCtx.pendingReqs ) {
787+ for (const auto & reqPtr : txnCtx.pendingReqs ) {
788+ const auto & req = *reqPtr;
782789 size_t mutationSize = req.getRawKeyLen () + req.getBodySize () +
783790 req.getDocMeta ().size ();
784791 st.io_num_write ++;
@@ -900,7 +907,8 @@ void MagmaKVStore::set(TransactionContext& txnCtx, queued_item item) {
900907 }
901908
902909 auto & ctx = static_cast <MagmaKVStoreTransactionContext&>(txnCtx);
903- ctx.pendingReqs .emplace_back (std::move (item));
910+ ctx.pendingReqs .emplace_back (
911+ std::make_unique<MagmaRequest>(std::move (item)));
904912}
905913
906914GetValue MagmaKVStore::get (const DiskDocKey& key,
@@ -1111,7 +1119,8 @@ void MagmaKVStore::del(TransactionContext& txnCtx, queued_item item) {
11111119 }
11121120
11131121 auto & ctx = dynamic_cast <MagmaKVStoreTransactionContext&>(txnCtx);
1114- ctx.pendingReqs .emplace_back (std::move (item));
1122+ ctx.pendingReqs .emplace_back (
1123+ std::make_unique<MagmaRequest>(std::move (item)));
11151124}
11161125
11171126void MagmaKVStore::delVBucket (Vbid vbid,
@@ -1487,6 +1496,7 @@ int MagmaKVStore::saveDocs(MagmaKVStoreTransactionContext& txnCtx,
14871496 };
14881497
14891498 auto & ctx = dynamic_cast <MagmaKVStoreTransactionContext&>(txnCtx);
1499+ ctx.preparePendingRequests ();
14901500
14911501 // Vector of updates to be written to the data store.
14921502 WriteOps writeOps;
@@ -1495,7 +1505,8 @@ int MagmaKVStore::saveDocs(MagmaKVStoreTransactionContext& txnCtx,
14951505 // TODO: Replace writeOps with Magma::WriteOperations when it
14961506 // becomes available. This will allow us to pass pendingReqs
14971507 // in and create the WriteOperation from the pendingReqs queue.
1498- for (auto & req : ctx.pendingReqs ) {
1508+ for (auto & reqPtr : ctx.pendingReqs ) {
1509+ auto & req = *reqPtr;
14991510 Slice valSlice{req.getBodyData (), req.getBodySize ()};
15001511
15011512 auto docMeta = req.getDocMeta ();
0 commit comments