@@ -10,7 +10,7 @@ import { createReadStream, createWriteStream } from 'node:fs'
1010import fs from 'node:fs/promises'
1111import { Agent } from 'node:https'
1212import path from 'node:path'
13- import { PassThrough , Readable } from 'node:stream'
13+ import { PassThrough , Readable , Transform } from 'node:stream'
1414import { pipeline } from 'node:stream/promises'
1515import { createSingletonPromise } from '@antfu/utils'
1616import {
@@ -29,7 +29,43 @@ import { match } from 'ts-pattern'
2929import { getDatabase } from './db'
3030import { env } from './env'
3131import { generateNumberId } from './helpers'
32- import { getMetrics } from './metrics'
32+ import { getMetrics , type Metrics } from './metrics'
33+
34+ function createByteCountingTransform (
35+ metrics : Metrics ,
36+ operation : 'upload' | 'download' ,
37+ adapter : string ,
38+ ) : Transform {
39+ let bytesTransferred = 0
40+
41+ return new Transform ( {
42+ transform ( chunk : any , _encoding , callback ) {
43+ bytesTransferred += chunk . length
44+ callback ( null , chunk )
45+ } ,
46+
47+ flush ( callback ) {
48+ try {
49+ if ( operation === 'upload' ) {
50+ metrics . cacheBytesUploadedTotal . add ( bytesTransferred , {
51+ operation,
52+ adapter,
53+ route : '/upload/:uploadId' ,
54+ } )
55+ } else {
56+ metrics . cacheBytesDownloadedTotal . add ( bytesTransferred , {
57+ operation,
58+ adapter,
59+ route : '/download/:cacheEntryId' ,
60+ } )
61+ }
62+ } catch ( err ) {
63+ console . error ( 'Failed to record byte transfer metrics:' , err )
64+ }
65+ callback ( )
66+ } ,
67+ } )
68+ }
3369
3470class Storage {
3571 adapter
@@ -62,10 +98,24 @@ class Storage {
6298 const metrics = await getMetrics ( )
6399 const startTime = performance . now ( )
64100
65- await this . adapter . uploadStream (
66- `${ upload . folderName } /parts/${ partIndex } ` ,
67- Readable . fromWeb ( stream ) ,
68- )
101+ const nodeStream = Readable . fromWeb ( stream )
102+
103+ if ( metrics ) {
104+ const countingTransform = createByteCountingTransform (
105+ metrics ,
106+ 'upload' ,
107+ env . STORAGE_DRIVER ,
108+ )
109+ await this . adapter . uploadStream (
110+ `${ upload . folderName } /parts/${ partIndex } ` ,
111+ nodeStream . pipe ( countingTransform ) ,
112+ )
113+ } else {
114+ await this . adapter . uploadStream (
115+ `${ upload . folderName } /parts/${ partIndex } ` ,
116+ nodeStream ,
117+ )
118+ }
69119
70120 if ( metrics ) {
71121 const duration = ( performance . now ( ) - startTime ) / 1000
@@ -245,7 +295,21 @@ class Storage {
245295 }
246296
247297 private async downloadFromCacheEntryLocation ( location : StorageLocation ) {
248- if ( location . mergedAt ) return this . adapter . createDownloadStream ( `${ location . folderName } /merged` )
298+ if ( location . mergedAt ) {
299+ const stream = await this . adapter . createDownloadStream ( `${ location . folderName } /merged` )
300+ const metrics = await getMetrics ( )
301+
302+ if ( metrics ) {
303+ const countingTransform = createByteCountingTransform (
304+ metrics ,
305+ 'download' ,
306+ env . STORAGE_DRIVER ,
307+ )
308+ return stream . pipe ( countingTransform )
309+ }
310+
311+ return stream
312+ }
249313
250314 return Readable . from ( this . streamParts ( location ) )
251315 }
@@ -274,12 +338,31 @@ class Storage {
274338 private async * streamParts ( location : StorageLocation ) {
275339 if ( location . partsDeletedAt ) throw new Error ( 'No parts to feed for location with deleted parts' )
276340
341+ const metrics = await getMetrics ( )
342+
277343 for ( let i = 0 ; i < location . partCount ; i ++ ) {
278344 const partStream = await this . adapter . createDownloadStream (
279345 `${ location . folderName } /parts/${ i } ` ,
280346 )
281347
282- for await ( const chunk of partStream ) yield chunk
348+ let bytesInPart = 0
349+ for await ( const chunk of partStream ) {
350+ bytesInPart += chunk . length
351+ yield chunk
352+ }
353+
354+ // Record bytes for this part
355+ if ( metrics ) {
356+ try {
357+ metrics . cacheBytesDownloadedTotal . add ( bytesInPart , {
358+ operation : 'download' ,
359+ adapter : env . STORAGE_DRIVER ,
360+ route : '/download/:cacheEntryId' ,
361+ } )
362+ } catch ( err ) {
363+ console . error ( 'Failed to record download bytes:' , err )
364+ }
365+ }
283366
284367 await globalThis . gc ?.( )
285368 }
0 commit comments