Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 18 additions & 31 deletions src/core/data-set/calculate-actual-storage.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import type { Synapse } from '@filoz/synapse-sdk'
import { PDPVerifier, type Synapse, WarmStorageService } from '@filoz/synapse-sdk'
import PQueue from 'p-queue'
import type { Logger } from 'pino'
import { PDP_LEAF_SIZE } from '../payments/constants.js'
import { getClientAddress } from '../synapse/index.js'
import type { ProgressEvent, ProgressEventHandler, Warning } from '../utils/types.js'
import { getDataSetPieces } from './get-data-set-pieces.js'
import type { DataSetSummary } from './types.js'

export interface ActualStorageResult {
Expand Down Expand Up @@ -44,16 +44,16 @@ const getProviderKey = ({ providerId, serviceProvider, dataSetId }: DataSetSumma
/**
* Calculate actual storage from all active data sets for an address
*
* This function queries all active/live data sets and sums up the actual piece sizes.
* It's more accurate than deriving storage from billing rates, but can be slow for
* users with many pieces.
* This function queries all active/live data sets and sums up their PDP leaf counts.
* It avoids fetching per-piece details, which makes it much faster than walking every
* piece in every data set.
*
* The calculation respects abort signals - if aborted, it will return partial results
* with a timedOut flag set to true.
*
* Example usage:
* ```typescript
* const result = await calculateActualStorage(synapse, {
* const result = await calculateActualStorage(synapse, dataSets, {
* address: '0x1234...',
* signal: AbortSignal.timeout(30000), // 30 second timeout
* logger: myLogger
Expand Down Expand Up @@ -138,35 +138,22 @@ export async function calculateActualStorage(
}

logger?.info({ dataSetCount: dataSets.length, address }, 'Calculating actual storage across data sets')
signal?.throwIfAborted()

const warmStorage = await WarmStorageService.create(synapse.getProvider(), synapse.getWarmStorageAddress())
const pdpVerifier = new PDPVerifier(synapse.getProvider(), warmStorage.getPDPVerifierAddress())

const processDataSet = async (dataSet: (typeof dataSets)[number]): Promise<void> => {
signal?.throwIfAborted()

try {
const storageContext = await synapse.storage.createContext({ dataSetId: dataSet.dataSetId })

signal?.throwIfAborted()

const getPiecesOptions: { logger?: Logger; signal?: AbortSignal } = {}
if (logger) {
getPiecesOptions.logger = logger
}
if (signal) {
getPiecesOptions.signal = signal
}
const result = await getDataSetPieces(synapse, storageContext, getPiecesOptions)

if (result.totalSizeBytes) {
totalBytes += result.totalSizeBytes
}

pieceCount += result.pieces.length
const dataSetId = Number(dataSet.dataSetId)
const leafCount = await pdpVerifier.getDataSetLeafCount(dataSetId)
const dataSetBytes = BigInt(leafCount) * BigInt(PDP_LEAF_SIZE)
totalBytes += dataSetBytes
pieceCount += dataSet.currentPieceCount ?? 0
dataSetsProcessed++

if (result.warnings && result.warnings.length > 0) {
warnings.push(...result.warnings)
}

onProgress?.({
type: 'actual-storage:progress',
data: {
Expand All @@ -178,16 +165,16 @@ export async function calculateActualStorage(
})
} catch (error) {
if (error instanceof Error && error.name === 'AbortError') {
logger?.warn('Piece retrieval aborted')
logger?.warn('Leaf count retrieval aborted')
throw error // Re-throw AbortError to propagate cancellation
}

const errorMessage = error instanceof Error ? error.message : String(error)
logger?.warn({ dataSetId: dataSet.dataSetId, error: errorMessage }, 'Failed to get pieces for data set')
logger?.warn({ dataSetId: dataSet.dataSetId, error: errorMessage }, 'Failed to get leaf count for data set')

warnings.push({
code: 'DATA_SET_QUERY_FAILED',
message: `Failed to query pieces for data set ${dataSet.dataSetId}`,
message: `Failed to query leaf count for data set ${dataSet.dataSetId}`,
context: {
dataSetId: dataSet.dataSetId,
error: errorMessage,
Expand Down
147 changes: 61 additions & 86 deletions src/test/unit/calculate-actual-storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,116 +8,98 @@ import { beforeEach, describe, expect, it, vi } from 'vitest'
import { calculateActualStorage } from '../../core/data-set/calculate-actual-storage.js'
import type { DataSetSummary } from '../../core/data-set/types.js'

// Mock the dependencies
const {
mockSynapse,
mockCreateStorageContext,
mockGetDataSetPieces,
defaultCreateStorageContext,
defaultGetDataSetPieces,
state,
} = vi.hoisted(() => {
const state = {
pieces: [] as Array<{ pieceId: number; pieceCid: string; size?: number }>,
}

const defaultGetDataSetPieces = async (_synapse: any, _context: any, _options?: any) => {
if (_options?.signal?.aborted) {
const error = new Error('This operation was aborted')
error.name = 'AbortError'
throw error
}

const pieces = state.pieces.map((p) => ({
pieceId: p.pieceId,
pieceCid: p.pieceCid,
size: p.size ?? undefined,
}))
vi.mock('../../core/payments/constants.js', () => ({
PDP_LEAF_SIZE: 32,
}))

const totalSizeBytes = pieces.reduce((sum, p) => sum + BigInt(p.size ?? 0), 0n)
vi.mock('../../core/synapse/index.js', () => ({
getClientAddress: (synapse: { client: { account: string | { address: string } } }) =>
typeof synapse.client.account === 'string' ? synapse.client.account : synapse.client.account.address,
}))

return {
pieces,
dataSetId: _context?.dataSetId ?? 1,
totalSizeBytes,
warnings: [],
// Mock the dependencies
const { mockSynapse, mockWarmStorageInstance, mockWarmStorageCreate, mockGetDataSetLeafCount, state } = vi.hoisted(
() => {
const state = {
leafCount: 0,
}
}

const mockGetDataSetPieces = vi.fn(defaultGetDataSetPieces)
const mockGetDataSetLeafCount = vi.fn(async (_dataSetId: number) => state.leafCount)

const defaultCreateStorageContext = async ({ dataSetId }: any) => ({
storage: { dataSetId },
providerInfo: { id: 1 },
})
const mockWarmStorageInstance = {
getPDPVerifierAddress: vi.fn(() => '0xpdp-verifier'),
}

const mockCreateStorageContext = vi.fn(defaultCreateStorageContext)
const mockWarmStorageCreate = vi.fn(async () => mockWarmStorageInstance)

const mockSynapse = {
client: {
account: {
address: '0xtest-address' as const,
const mockSynapse = {
client: {
account: {
address: '0xtest-address' as const,
},
},
},
storage: {
createContext: mockCreateStorageContext,
},
getProvider: () => '0xprovider',
getWarmStorageAddress: () => '0xwarm-storage',
}

return {
mockSynapse,
mockWarmStorageInstance,
mockWarmStorageCreate,
mockGetDataSetLeafCount,
state,
}
}
)

vi.mock('@filoz/synapse-sdk', async () => {
const sharedMock = await import('../mocks/synapse-sdk.js')
return {
mockSynapse,
mockCreateStorageContext,
mockGetDataSetPieces,
defaultCreateStorageContext,
defaultGetDataSetPieces,
state,
...sharedMock,
WarmStorageService: { create: mockWarmStorageCreate },
PDPVerifier: class MockPDPVerifier {
getDataSetLeafCount = mockGetDataSetLeafCount
},
}
})

vi.mock('../../core/data-set/get-data-set-pieces.js', () => ({
getDataSetPieces: mockGetDataSetPieces,
}))

describe('calculateActualStorage', () => {
beforeEach(() => {
vi.resetAllMocks()
state.pieces = []
state.leafCount = 0

mockCreateStorageContext.mockImplementation(defaultCreateStorageContext)
mockGetDataSetPieces.mockImplementation(defaultGetDataSetPieces)
mockWarmStorageCreate.mockImplementation(async () => mockWarmStorageInstance)
mockGetDataSetLeafCount.mockImplementation(async (_dataSetId: number) => state.leafCount)
})

describe('basic calculation', () => {
it('should calculate total storage from multiple data sets', async () => {
// Setup: 2 data sets with different piece sizes
const dataSets: DataSetSummary[] = [
{
dataSetId: 1,
providerId: 1,
serviceProvider: '0xprovider1',
isLive: true,
currentPieceCount: 2,
} as unknown as DataSetSummary,
{
dataSetId: 2,
providerId: 1,
serviceProvider: '0xprovider1',
isLive: true,
currentPieceCount: 3,
} as unknown as DataSetSummary,
]

const oneGiB = 1024n * 1024n * 1024n
// pieces apply to both data sets
state.pieces = [
{ pieceId: 1, pieceCid: 'bafy1', size: Number(oneGiB) },
{ pieceId: 2, pieceCid: 'bafy2', size: Number(oneGiB) },
]
const leavesPerGiB = (1024 * 1024 * 1024) / 32
state.leafCount = leavesPerGiB * 2

const result = await calculateActualStorage(mockSynapse as any, dataSets)

expect(result.dataSetCount).toBe(2)
expect(result.dataSetsProcessed).toBe(2)
expect(result.totalBytes).toBe(oneGiB * 2n * 2n) // 2 pieces × 2 datasets
expect(result.pieceCount).toBe(4)
expect(result.totalBytes).toBe(BigInt(leavesPerGiB) * 2n * 32n * 2n)
expect(result.pieceCount).toBe(5)
expect(result.timedOut).toBeFalsy()
expect(result.warnings).toHaveLength(0)
})
Expand All @@ -139,11 +121,10 @@ describe('calculateActualStorage', () => {
providerId: 1,
serviceProvider: '0xprovider1',
isLive: true,
currentPieceCount: 0,
} as unknown as DataSetSummary,
]

state.pieces = []

const result = await calculateActualStorage(mockSynapse as any, dataSets)

expect(result.dataSetCount).toBe(1)
Expand All @@ -161,6 +142,7 @@ describe('calculateActualStorage', () => {
providerId: 1,
serviceProvider: '0xprovider1',
isLive: true,
currentPieceCount: 1,
} as unknown as DataSetSummary,
]

Expand All @@ -184,28 +166,24 @@ describe('calculateActualStorage', () => {
providerId: 1,
serviceProvider: '0xprovider1',
isLive: true,
currentPieceCount: 1,
} as unknown as DataSetSummary,
{
dataSetId: 2,
providerId: 2,
serviceProvider: '0xprovider2',
isLive: true,
currentPieceCount: 2,
} as unknown as DataSetSummary,
]

const controller = new AbortController()

// Allow first dataset to complete
let callCount = 0
mockGetDataSetPieces.mockImplementation(async (_synapse: any, _context: any, _options?: any) => {
mockGetDataSetLeafCount.mockImplementation(async () => {
callCount++
if (callCount === 1) {
return {
pieces: [{ pieceId: 1, pieceCid: 'bafy1', size: 1024 }],
dataSetId: 1,
totalSizeBytes: 1024n,
warnings: [],
}
return 32
}

controller.abort()
Expand All @@ -219,7 +197,7 @@ describe('calculateActualStorage', () => {
})

expect(result.timedOut).toBe(true)
expect(result.totalBytes).toBe(1024n) // Partial result from first dataset
expect(result.totalBytes).toBe(1024n)
expect(result.dataSetsProcessed).toBe(1)
expect(result.pieceCount).toBe(1)
expect(result.warnings.some((w) => w.code === 'CALCULATION_ABORTED')).toBe(true)
Expand All @@ -234,28 +212,25 @@ describe('calculateActualStorage', () => {
providerId: 1,
serviceProvider: '0xprovider1',
isLive: true,
currentPieceCount: 1,
} as unknown as DataSetSummary,
{
dataSetId: 2,
providerId: 2,
serviceProvider: '0xprovider2',
isLive: true,
currentPieceCount: 2,
} as unknown as DataSetSummary,
]

let callCount = 0
mockGetDataSetPieces.mockImplementation(async (_synapse: any, _context: any, _options?: any) => {
mockGetDataSetLeafCount.mockImplementation(async () => {
callCount++
if (callCount === 1) {
throw new Error('Dataset query failed')
}

return {
pieces: [{ pieceId: 1, pieceCid: 'bafy1', size: 1024 }],
dataSetId: 2,
totalSizeBytes: 1024n,
warnings: [],
}
return 32
})

const result = await calculateActualStorage(mockSynapse as any, dataSets)
Expand Down