@@ -11,6 +11,7 @@ import (
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/bson/primitive"
 	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
 	"golang.org/x/sync/errgroup"
 
 	"github.com/percona/percona-backup-mongodb/pbm/archive"
@@ -90,7 +91,23 @@ func (b *Backup) doLogical(
 		rsMeta.FirstWriteTS,
 		func(ctx context.Context, w io.WriterTo, from, till primitive.Timestamp) (int64, error) {
 			filename := rsMeta.OplogName + "/" + FormatChunkName(from, till, bcp.Compression)
-			return storage.Upload(ctx, w, stg, bcp.Compression, bcp.CompressionLevel, filename, -1)
+
+			bytesPerSecond, err := getOplogBytesPerSecond(ctx, b.nodeConn)
+			if err != nil {
+				return 0, errors.Wrap(err, "oplog stats unavailable")
+			}
+
+			// Estimate size with headroom and enforce minimum part size for cloud uploads.
+			estimatedSize := bytesPerSecond * float64(till.T - from.T)
+			if bcp.Compression == compress.CompressionTypeNone {
+				estimatedSize *= 2
+			}
+			estimatedSize *= 2
+			if estimatedSize < 1<<30 { // 1 GiB
+				estimatedSize = 1 << 30
+			}
+
+			return storage.Upload(ctx, w, stg, bcp.Compression, bcp.CompressionLevel, filename, int64(estimatedSize))
 		})
 	// ensure slicer is stopped in any case (done, error or canceled)
 	defer stopOplogSlicer() //nolint:errcheck
@@ -458,6 +475,58 @@ func getNamespacesSize(ctx context.Context, m *mongo.Client, nss []string) (map[
 	return rv, err
 }
 
+// getOplogBytesPerSecond returns the average number of bytes written to the
+// replica-set oplog each second. It fetches the total size of the
+// `local.oplog.rs` collection, reads the timestamps of the oldest and newest
+// entries, and divides the size by the seconds between those timestamps.
+func getOplogBytesPerSecond(ctx context.Context, m *mongo.Client) (float64, error) {
+	coll := m.Database("local").Collection("oplog.rs")
+
+	var stat struct {
+		Size int64 `bson:"size"`
+	}
+	cmd := bson.D{{Key: "collStats", Value: "oplog.rs"}}
+	res := m.Database("local").RunCommand(ctx, cmd)
+	if res.Err() != nil {
+		return 0, errors.Wrap(res.Err(), "collStats local.oplog.rs")
+	}
+	if err := res.Decode(&stat); err != nil {
+		return 0, errors.Wrap(err, "decode collStats")
+	}
+
+	oldestTS, err := oplogTimestamp(ctx, coll, 1)
+	if err != nil {
+		return 0, err
+	}
+	newestTS, err := oplogTimestamp(ctx, coll, -1)
+	if err != nil {
+		return 0, err
+	}
+
+	duration := int64(newestTS.T - oldestTS.T)
+	if duration <= 0 {
+		return 0, errors.New("invalid oplog window duration")
+	}
+
+	return float64(stat.Size) / float64(duration), nil
+}
+
+// oplogTimestamp returns the timestamp of the first (sort=1) or last (sort=-1) document in the oplog.
+func oplogTimestamp(ctx context.Context, coll *mongo.Collection, sort int) (primitive.Timestamp, error) {
+	var doc bson.M
+	err := coll.FindOne(ctx, bson.D{}, options.FindOne().SetSort(bson.D{{Key: "$natural", Value: sort}})).Decode(&doc)
+	if err != nil {
+		return primitive.Timestamp{}, errors.Wrap(err, "query oplog timestamp")
+	}
+
+	ts, ok := doc["ts"].(primitive.Timestamp)
+	if !ok {
+		return primitive.Timestamp{}, errors.New("missing or invalid 'ts' field")
+	}
+
+	return ts, nil
+}
+
 func (b *Backup) checkForTimeseries(ctx context.Context, nss []string) error {
 	if !b.brief.Version.IsShardedTimeseriesSupported() || !b.brief.Sharded {
 		return nil
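
For reference, a minimal standalone sketch of the sizing arithmetic introduced in the slicer callback above: the upload size estimate is the measured oplog write rate multiplied by the slice window, doubled for uncompressed uploads, doubled again as headroom, and floored at 1 GiB. The helper name and the sample numbers below are illustrative only, not part of this change.

// Illustrative sketch (not from this PR): the sizing arithmetic from the
// slicer callback above as a standalone program.
package main

import "fmt"

// estimateOplogUploadSize multiplies the measured oplog write rate by the
// slice window, doubles the result for uncompressed uploads, doubles it again
// as headroom, and never returns less than 1 GiB.
func estimateOplogUploadSize(bytesPerSecond float64, windowSeconds uint32, compressed bool) int64 {
	size := bytesPerSecond * float64(windowSeconds)
	if !compressed {
		size *= 2
	}
	size *= 2
	if size < 1<<30 {
		size = 1 << 30
	}
	return int64(size)
}

func main() {
	// Hypothetical numbers: ~5 MiB/s of oplog writes over a 10-minute slice,
	// uploaded without compression -> roughly 12.6 GB estimated.
	fmt.Println(estimateOplogUploadSize(5<<20, 600, false))
}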