@@ -2697,133 +2697,126 @@ couchstore_error_t CouchKVStore::saveDocs(Vbid vbid,
26972697 vbid,
26982698 uint64_t (docs.size ()));
26992699 return errCode;
2700- } else {
2701- auto & db = *openResult.db ;
2702- uint64_t maxDBSeqno = 0 ;
2703-
2704- // Count of logical bytes written (key + ep-engine meta + value),
2705- // used to calculated Write Amplification.
2706- size_t docsLogicalBytes = 0 ;
2707-
2708- // Only do a couchstore_save_documents if there are docs
2709- if (!docs.empty ()) {
2710- for (size_t idx = 0 ; idx < docs.size (); idx++) {
2711- maxDBSeqno = std::max (maxDBSeqno, docinfos[idx]->db_seq );
2712-
2713- // Accumulate the size of the useful data in this docinfo.
2714- docsLogicalBytes +=
2715- calcLogicalDataSize (docs[idx], *docinfos[idx]);
2716- }
2700+ }
27172701
2718- kvctx.commitData .collections .setDroppedCollectionsForStore (
2719- getDroppedCollections (*db));
2720-
2721- auto cs_begin = std::chrono::steady_clock::now ();
2722-
2723- uint64_t flags = COMPRESS_DOC_BODIES | COUCHSTORE_SEQUENCE_AS_IS;
2724- errCode = couchstore_save_documents_and_callback (
2725- db,
2726- docs.data (),
2727- docinfos.data (),
2728- (unsigned )docs.size (),
2729- flags,
2730- &saveDocsCallback,
2731- &kvctx);
2732-
2733- st.saveDocsHisto .add (
2734- std::chrono::duration_cast<std::chrono::microseconds>(
2735- std::chrono::steady_clock::now () - cs_begin));
2736- if (errCode != COUCHSTORE_SUCCESS) {
2737- logger.warn (
2738- " CouchKVStore::saveDocs: couchstore_save_documents "
2739- " error:{} [{}], {}, numdocs:{}" ,
2740- couchstore_strerror (errCode),
2741- couchkvstore_strerrno (db, errCode),
2742- vbid,
2743- uint64_t (docs.size ()));
2744- return errCode;
2745- }
2746- }
2702+ auto & db = *openResult.db ;
2703+ uint64_t maxDBSeqno = 0 ;
27472704
2748- kvctx.commitData .collections .saveCollectionStats (
2749- std::bind (&CouchKVStore::saveCollectionStats,
2750- this ,
2751- std::ref (*db),
2752- std::placeholders::_1,
2753- std::placeholders::_2));
2705+ // Count of logical bytes written (key + ep-engine meta + value),
2706+ // used to calculated Write Amplification.
2707+ size_t docsLogicalBytes = 0 ;
27542708
2755- vbucket_state& state = kvctx. commitData . proposedVBState ;
2756- state. onDiskPrepares += kvctx. onDiskPrepareDelta ;
2757- pendingLocalReqsQ. emplace_back ( " _local/vbstate " ,
2758- makeJsonVBState (state) );
2709+ // Only do a couchstore_save_documents if there are docs
2710+ if (!docs. empty ()) {
2711+ for ( size_t idx = 0 ; idx < docs. size (); idx++) {
2712+ maxDBSeqno = std::max (maxDBSeqno, docinfos[idx]-> db_seq );
27592713
2760- if (kvctx. commitData . collections . isReadyForCommit ()) {
2761- updateCollectionsMeta (*db, kvctx. commitData . collections );
2714+ // Accumulate the size of the useful data in this docinfo.
2715+ docsLogicalBytes += calcLogicalDataSize (docs[idx], *docinfos[idx] );
27622716 }
27632717
2764- // / Update the local documents before we commit
2765- errCode = updateLocalDocuments (*db, pendingLocalReqsQ);
2766- if (errCode) {
2767- logger.warn (
2768- " CouchKVStore::saveDocs: updateLocalDocuments size:{} "
2769- " error:{} [{}]" ,
2770- pendingLocalReqsQ.size (),
2771- couchstore_strerror (errCode),
2772- couchkvstore_strerrno (db, errCode));
2773- }
2774- pendingLocalReqsQ.clear ();
2718+ kvctx.commitData .collections .setDroppedCollectionsForStore (
2719+ getDroppedCollections (*db));
27752720
27762721 auto cs_begin = std::chrono::steady_clock::now ();
27772722
2778- errCode = couchstore_commit (db);
2779- st.commitHisto .add (
2723+ uint64_t flags = COMPRESS_DOC_BODIES | COUCHSTORE_SEQUENCE_AS_IS;
2724+ errCode = couchstore_save_documents_and_callback (db,
2725+ docs.data (),
2726+ docinfos.data (),
2727+ (unsigned )docs.size (),
2728+ flags,
2729+ &saveDocsCallback,
2730+ &kvctx);
2731+
2732+ st.saveDocsHisto .add (
27802733 std::chrono::duration_cast<std::chrono::microseconds>(
27812734 std::chrono::steady_clock::now () - cs_begin));
2782- if (errCode) {
2735+ if (errCode != COUCHSTORE_SUCCESS ) {
27832736 logger.warn (
2784- " CouchKVStore::saveDocs: couchstore_commit error:{} [{}]" ,
2737+ " CouchKVStore::saveDocs: couchstore_save_documents "
2738+ " error:{} [{}], {}, numdocs:{}" ,
27852739 couchstore_strerror (errCode),
2786- couchkvstore_strerrno (db, errCode));
2740+ couchkvstore_strerrno (db, errCode),
2741+ vbid,
2742+ uint64_t (docs.size ()));
27872743 return errCode;
27882744 }
2745+ }
27892746
2790- st.batchSize .add (docs.size ());
2747+ kvctx.commitData .collections .saveCollectionStats (
2748+ std::bind (&CouchKVStore::saveCollectionStats,
2749+ this ,
2750+ std::ref (*db),
2751+ std::placeholders::_1,
2752+ std::placeholders::_2));
27912753
2792- // If available, record the write amplification we did for this commit -
2793- // i.e. for each byte of user data (key+value+meta) how many overhead
2794- // bytes were written.
2795- auto * stats = couchstore_get_db_filestats (db);
2796- if (stats != nullptr && docsLogicalBytes) {
2797- const auto writeBytes = stats->getWriteBytes ();
2798- uint64_t writeAmp = (writeBytes * 10 ) / docsLogicalBytes;
2799- st.flusherWriteAmplificationHisto .addValue (writeAmp);
2800- }
2754+ vbucket_state& state = kvctx.commitData .proposedVBState ;
2755+ state.onDiskPrepares += kvctx.onDiskPrepareDelta ;
2756+ pendingLocalReqsQ.emplace_back (" _local/vbstate" , makeJsonVBState (state));
28012757
2802- // retrieve storage system stats for file fragmentation computation
2803- const auto info = cb::couchstore::getHeader (*db.getDb ());
2804- cachedSpaceUsed[vbid.get ()] = info.spaceUsed ;
2805- cachedFileSize[vbid.get ()] = info.fileSize ;
2806- cachedDeleteCount[vbid.get ()] = info.deletedCount ;
2807- cachedDocCount[vbid.get ()] = info.docCount ;
2758+ if (kvctx.commitData .collections .isReadyForCommit ()) {
2759+ updateCollectionsMeta (*db, kvctx.commitData .collections );
2760+ }
28082761
2809- // Check seqno if we wrote documents
2810- if (!docs.empty () && maxDBSeqno != info.updateSeqNum ) {
2811- logger.warn (
2812- " CouchKVStore::saveDocs: Seqno in db header ({})"
2813- " is not matched with what was persisted ({})"
2814- " for {}" ,
2815- info.updateSeqNum ,
2816- maxDBSeqno,
2817- vbid);
2818- }
2819- state.highSeqno = info.updateSeqNum ;
2762+ // / Update the local documents before we commit
2763+ errCode = updateLocalDocuments (*db, pendingLocalReqsQ);
2764+ if (errCode) {
2765+ logger.warn (
2766+ " CouchKVStore::saveDocs: updateLocalDocuments size:{} "
2767+ " error:{} [{}]" ,
2768+ pendingLocalReqsQ.size (),
2769+ couchstore_strerror (errCode),
2770+ couchkvstore_strerrno (db, errCode));
2771+ }
2772+ pendingLocalReqsQ.clear ();
2773+
2774+ auto cs_begin = std::chrono::steady_clock::now ();
2775+
2776+ errCode = couchstore_commit (db);
2777+ st.commitHisto .add (std::chrono::duration_cast<std::chrono::microseconds>(
2778+ std::chrono::steady_clock::now () - cs_begin));
2779+ if (errCode) {
2780+ logger.warn (" CouchKVStore::saveDocs: couchstore_commit error:{} [{}]" ,
2781+ couchstore_strerror (errCode),
2782+ couchkvstore_strerrno (db, errCode));
2783+ return errCode;
28202784 }
28212785
2822- /* update stat */
2823- if (errCode == COUCHSTORE_SUCCESS) {
2824- st.docsCommitted = docs.size ();
2786+ st.batchSize .add (docs.size ());
2787+
2788+ // If available, record the write amplification we did for this commit -
2789+ // i.e. for each byte of user data (key+value+meta) how many overhead
2790+ // bytes were written.
2791+ auto * stats = couchstore_get_db_filestats (db);
2792+ if (stats != nullptr && docsLogicalBytes) {
2793+ const auto writeBytes = stats->getWriteBytes ();
2794+ uint64_t writeAmp = (writeBytes * 10 ) / docsLogicalBytes;
2795+ st.flusherWriteAmplificationHisto .addValue (writeAmp);
28252796 }
28262797
2798+ // retrieve storage system stats for file fragmentation computation
2799+ const auto info = cb::couchstore::getHeader (*db.getDb ());
2800+ cachedSpaceUsed[vbid.get ()] = info.spaceUsed ;
2801+ cachedFileSize[vbid.get ()] = info.fileSize ;
2802+ cachedDeleteCount[vbid.get ()] = info.deletedCount ;
2803+ cachedDocCount[vbid.get ()] = info.docCount ;
2804+
2805+ // Check seqno if we wrote documents
2806+ if (!docs.empty () && maxDBSeqno != info.updateSeqNum ) {
2807+ logger.warn (
2808+ " CouchKVStore::saveDocs: Seqno in db header ({})"
2809+ " is not matched with what was persisted ({})"
2810+ " for {}" ,
2811+ info.updateSeqNum ,
2812+ maxDBSeqno,
2813+ vbid);
2814+ }
2815+ state.highSeqno = info.updateSeqNum ;
2816+
2817+ // update stat
2818+ st.docsCommitted = docs.size ();
2819+
28272820 return errCode;
28282821}
28292822
0 commit comments