21
21
22
22
import com .automq .stream .ByteBufSeqAlloc ;
23
23
import com .automq .stream .s3 .ByteBufAlloc ;
24
+ import com .automq .stream .s3 .metrics .stats .StorageOperationStats ;
24
25
import com .automq .stream .s3 .model .StreamRecordBatch ;
25
26
import com .automq .stream .s3 .operator .ObjectStorage ;
26
27
import com .automq .stream .s3 .wal .AppendResult ;
34
35
import com .automq .stream .s3 .wal .exception .RuntimeIOException ;
35
36
import com .automq .stream .s3 .wal .exception .WALFencedException ;
36
37
import com .automq .stream .s3 .wal .impl .DefaultRecordOffset ;
37
- import com .automq .stream .s3 .wal .metrics .ObjectWALMetricsManager ;
38
38
import com .automq .stream .s3 .wal .util .WALUtil ;
39
39
import com .automq .stream .utils .FutureUtil ;
40
40
import com .automq .stream .utils .Systems ;
@@ -123,9 +123,6 @@ public DefaultWriter(Time time, ObjectStorage objectStorage, ObjectWALConfig con
123
123
if (!(config .openMode () == OpenMode .READ_WRITE || config .openMode () == OpenMode .FAILOVER )) {
124
124
throw new IllegalArgumentException ("The open mode must be READ_WRITE or FAILOVER, but got " + config .openMode ());
125
125
}
126
- ObjectWALMetricsManager .setInflightUploadCountSupplier (() -> (long ) uploadingBulks .size ());
127
- ObjectWALMetricsManager .setBufferedDataInBytesSupplier (bufferedDataBytes ::get );
128
- ObjectWALMetricsManager .setObjectDataInBytesSupplier (objectDataBytes ::get );
129
126
}
130
127
131
128
public void start () {
@@ -239,7 +236,6 @@ public RecordOffset confirmOffset() {
239
236
240
237
public CompletableFuture <AppendResult > append0 (
241
238
StreamRecordBatch streamRecordBatch ) throws OverCapacityException , WALFencedException {
242
- long startTime = time .nanoseconds ();
243
239
checkWriteStatus ();
244
240
245
241
if (bufferedDataBytes .get () > config .maxUnflushedBytes ()) {
@@ -273,10 +269,7 @@ public CompletableFuture<AppendResult> append0(
273
269
bufferedDataBytes .addAndGet (-dataSize );
274
270
if (throwable != null ) {
275
271
LOGGER .error ("Failed to append record to S3 WAL" , throwable );
276
- } else {
277
- ObjectWALMetricsManager .recordOperationDataSize (dataSize , "append" );
278
272
}
279
- ObjectWALMetricsManager .recordOperationLatency (time .nanoseconds () - startTime , "append" , throwable == null );
280
273
});
281
274
}
282
275
@@ -386,12 +379,11 @@ private void uploadBulk0() {
386
379
FutureUtil .propagate (objectStorage .write (writeOptions , path , objectBuffer ), bulk .uploadCf );
387
380
long finalLastRecordOffset = lastRecordOffset ;
388
381
bulk .uploadCf .whenCompleteAsync ((rst , ex ) -> {
389
- ObjectWALMetricsManager .recordOperationLatency (time .nanoseconds () - startTime , "upload" , ex == null );
390
382
if (ex != null ) {
391
383
fenced = true ;
392
384
LOGGER .error ("S3WAL upload {} fail" , path , ex );
393
385
} else {
394
- ObjectWALMetricsManager . recordOperationDataSize ( objectLength , "upload" );
386
+ StorageOperationStats . getInstance (). appendWALWriteStats . record ( time . nanoseconds () - startTime );
395
387
lastRecordOffset2object .put (finalLastRecordOffset , new WALObject (rst .bucket (), path , config .epoch (), firstOffset , endOffset , objectLength ));
396
388
objectDataBytes .addAndGet (objectLength );
397
389
}
@@ -418,10 +410,8 @@ private void callback() {
418
410
}
419
411
// The inflight uploading bulks count was decreased, then trigger the upload of Bulk in waitingUploadBulks
420
412
tryUploadBulkInWaiting ();
421
- long commitStartTime = time .nanoseconds ();
422
413
return reservationService .verify (config .nodeId (), config .epoch (), config .openMode () == OpenMode .FAILOVER )
423
414
.whenComplete ((rst , ex ) -> {
424
- ObjectWALMetricsManager .recordOperationLatency (time .nanoseconds () - commitStartTime , "commit" , ex == null );
425
415
if (ex != null ) {
426
416
LOGGER .error ("Unexpected S3WAL lease check fail. Make the WAL fenced" , ex );
427
417
fenced = true ;
@@ -484,7 +474,6 @@ public CompletableFuture<Void> trim0(long newStartOffset) throws WALFencedExcept
484
474
checkStatus ();
485
475
List <ObjectStorage .ObjectPath > deleteObjectList = new ArrayList <>();
486
476
AtomicLong deletedObjectSize = new AtomicLong ();
487
- long startTime = time .nanoseconds ();
488
477
CompletableFuture <?> persistTrimOffsetCf ;
489
478
lock .writeLock ().lock ();
490
479
try {
@@ -530,7 +519,6 @@ public CompletableFuture<Void> trim0(long newStartOffset) throws WALFencedExcept
530
519
}
531
520
532
521
return objectStorage .delete (deleteObjectList ).whenComplete ((v , throwable ) -> {
533
- ObjectWALMetricsManager .recordOperationLatency (time .nanoseconds () - startTime , "trim" , throwable == null );
534
522
objectDataBytes .addAndGet (-1 * deletedObjectSize .get ());
535
523
// Never fail the delete task, the under layer storage will retry forever.
536
524
if (throwable != null ) {
0 commit comments