diff --git a/packages/synapse-sdk/src/pdp/index.ts b/packages/synapse-sdk/src/pdp/index.ts
index 0dae698e..ce357273 100644
--- a/packages/synapse-sdk/src/pdp/index.ts
+++ b/packages/synapse-sdk/src/pdp/index.ts
@@ -13,6 +13,7 @@ export { PDPAuthHelper } from './auth.ts'
 export type {
   AddPiecesResponse,
   CreateDataSetResponse,
+  CreateDataSetWithPiecesResponse,
   DataSetCreationStatusResponse,
   FindPieceResponse,
   PieceAdditionStatusResponse,
diff --git a/packages/synapse-sdk/src/pdp/server.ts b/packages/synapse-sdk/src/pdp/server.ts
index fcae6bfb..f20ef5ac 100644
--- a/packages/synapse-sdk/src/pdp/server.ts
+++ b/packages/synapse-sdk/src/pdp/server.ts
@@ -49,6 +49,16 @@ export interface CreateDataSetResponse {
   statusUrl: string
 }
 
+/**
+ * Response from creating a data set with pieces (combined flow)
+ */
+export interface CreateDataSetWithPiecesResponse extends CreateDataSetResponse {
+  /** Transaction hash for the piece addition (if pieces were provided) */
+  piecesTxHash?: string
+  /** URL to check piece addition status (if pieces were provided) */
+  piecesStatusUrl?: string
+}
+
 /**
  * Response from checking data set creation status
  */
@@ -217,6 +227,127 @@ export class PDPServer {
     }
   }
 
+  /**
+   * Create a new data set with pieces in a single operation (M3 combined flow)
+   * @param clientDataSetId - Unique ID for the client's data set
+   * @param payee - Address that will receive payments (service provider)
+   * @param metadata - Metadata entries for the data set (key-value pairs)
+   * @param recordKeeper - Address of the Warm Storage contract
+   * @param pieces - Optional pieces to add to the data set immediately after creation
+   * @param piecesMetadata - Optional metadata for each piece (array of arrays, one per piece)
+   * @returns Promise that resolves with the transaction hash and status URL for the combined operation
+   */
+  async createDataSetWithPieces(
+    clientDataSetId: number,
+    payee: string,
+    metadata: MetadataEntry[],
+    recordKeeper: string,
+    pieces?: PieceCID[] | string[],
+    piecesMetadata?: MetadataEntry[][]
+  ): Promise<CreateDataSetWithPiecesResponse> {
+    validateDataSetMetadata(metadata)
+
+    if (pieces != null && pieces.length > 0) {
+      if (piecesMetadata != null) {
+        for (let i = 0; i < piecesMetadata.length; i++) {
+          if (piecesMetadata[i] != null && piecesMetadata[i].length > 0) {
+            try {
+              validatePieceMetadata(piecesMetadata[i])
+            } catch (error: any) {
+              throw new Error(`Piece ${i} metadata validation failed: ${error.message}`)
+            }
+          }
+        }
+      }
+
+      for (const pieceData of pieces) {
+        const pieceCid = asPieceCID(pieceData)
+        if (pieceCid == null) {
+          throw new Error(`Invalid PieceCID: ${String(pieceData)}`)
+        }
+      }
+
+      const finalPiecesMetadata = piecesMetadata ?? pieces.map(() => [])
+      if (finalPiecesMetadata.length !== pieces.length) {
+        throw new Error(
+          `Pieces metadata length (${finalPiecesMetadata.length}) must match pieces length (${pieces.length})`
+        )
+      }
+    }
+
+    const authData = await this.getAuthHelper().signCreateDataSet(clientDataSetId, payee, metadata)
+
+    const extraData = this._encodeDataSetCreateData({
+      payer: await this.getAuthHelper().getSignerAddress(),
+      metadata,
+      signature: authData.signature,
+    })
+
+    const requestBody: any = {
+      recordKeeper,
+      extraData: `0x${extraData}`,
+    }
+
+    if (pieces != null && pieces.length > 0) {
+      const piecesAuthData = await this.getAuthHelper().signAddPieces(
+        clientDataSetId,
+        0, // nextPieceId will be determined by the server after data set creation
+        pieces,
+        piecesMetadata ?? pieces.map(() => [])
+      )
+
+      const piecesExtraData = this._encodeAddPiecesExtraData({
+        signature: piecesAuthData.signature,
+        metadata: piecesMetadata ?? pieces.map(() => []),
+      })
+
+      requestBody.pieces = {
+        pieces: pieces.map((pieceData) => {
+          const cidString = typeof pieceData === 'string' ? pieceData : pieceData.toString()
+          return {
+            pieceCid: cidString,
+            subPieces: [
+              {
+                subPieceCid: cidString,
+              },
+            ],
+          }
+        }),
+        extraData: `0x${piecesExtraData}`,
+      }
+    }
+
+    const response = await fetch(`${this._serviceURL}/pdp/data-sets`, {
+      method: 'POST',
+      headers: {
+        'Content-Type': 'application/json',
+      },
+      body: JSON.stringify(requestBody),
+    })
+
+    if (response.status !== 201) {
+      const errorText = await response.text()
+      throw new Error(`Failed to create data set with pieces: ${response.status} ${response.statusText} - ${errorText}`)
+    }
+
+    const location = response.headers.get('Location')
+    if (location == null) {
+      throw new Error('Server did not provide Location header in response')
+    }
+
+    const locationMatch = location.match(/\/pdp\/data-sets\/created\/(.+)$/)
+    if (locationMatch == null) {
+      throw new Error(`Invalid Location header format: ${location}`)
+    }
+
+    const txHash = locationMatch[1]
+
+    return {
+      txHash,
+      statusUrl: `${this._serviceURL}${location}`,
+    }
+  }
+
   /**
    * Add pieces to an existing data set
    * @param dataSetId - The ID of the data set to add pieces to
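// Editorial usage sketch (not part of the patch): driving the new combined
// endpoint directly. The import path, service URL, and concrete values are
// assumptions; the PDPAuthHelper/PDPServer constructor shapes follow their
// usage in storage/context.ts below.
import type { Signer } from 'ethers'
import { PDPAuthHelper, PDPServer } from '@filoz/synapse-sdk/pdp'

async function createWithOnePiece(
  signer: Signer,
  chainId: bigint,
  warmStorageAddress: string,
  payee: string,
  pieceCid: string
): Promise<void> {
  const authHelper = new PDPAuthHelper(warmStorageAddress, signer, chainId)
  const pdpServer = new PDPServer(authHelper, 'https://provider.example/pdp')

  // One signed request creates the data set and schedules the piece addition
  const { txHash, statusUrl } = await pdpServer.createDataSetWithPieces(
    0, // clientDataSetId: next unused data set ID for this client
    payee,
    [], // data set metadata
    warmStorageAddress, // recordKeeper (Warm Storage contract)
    [pieceCid],
    [[{ key: 'label', value: 'first-piece' }]] // one metadata array per piece
  )
  console.log(`creation tx ${txHash}; poll ${statusUrl} for confirmation`)
}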
diff --git a/packages/synapse-sdk/src/storage/context.ts b/packages/synapse-sdk/src/storage/context.ts
index 12fdce3b..d4c49f8a 100644
--- a/packages/synapse-sdk/src/storage/context.ts
+++ b/packages/synapse-sdk/src/storage/context.ts
@@ -233,6 +233,164 @@ export class StorageContext {
     )
   }
 
+  /**
+   * Create a new data set with pieces in a single operation (M3 combined flow)
+   */
+  private static async createDataSetWithPieces(
+    synapse: Synapse,
+    warmStorageService: WarmStorageService,
+    provider: ProviderInfo,
+    withCDN: boolean,
+    pieces: PieceCID[],
+    piecesMetadata: MetadataEntry[][],
+    callbacks?: StorageCreationCallbacks,
+    metadata?: Record<string, string>
+  ): Promise<number> {
+    performance.mark('synapse:createDataSetWithPieces-start')
+
+    const signer = synapse.getSigner()
+    const signerAddress = await signer.getAddress()
+
+    const nextDatasetId = await warmStorageService.getNextClientDataSetId(signerAddress)
+
+    const warmStorageAddress = synapse.getWarmStorageAddress()
+    const authHelper = new PDPAuthHelper(warmStorageAddress, signer, BigInt(synapse.getChainId()))
+
+    if (!provider.products.PDP?.data.serviceURL) {
+      throw new Error(`Provider ${provider.id} does not have a PDP product with serviceURL`)
+    }
+    const pdpServer = new PDPServer(authHelper, provider.products.PDP.data.serviceURL)
+
+    const baseMetadataObj = metadata ?? {}
+    const metadataObj =
+      withCDN && !(METADATA_KEYS.WITH_CDN in baseMetadataObj)
+        ? { ...baseMetadataObj, [METADATA_KEYS.WITH_CDN]: '' }
+        : baseMetadataObj
+
+    const finalMetadata = objectToEntries(metadataObj)
+
+    performance.mark('synapse:pdpServer.createDataSetWithPieces-start')
+    const createResult = await pdpServer.createDataSetWithPieces(
+      nextDatasetId,
+      provider.payee,
+      finalMetadata,
+      warmStorageAddress,
+      pieces,
+      piecesMetadata
+    )
+    performance.mark('synapse:pdpServer.createDataSetWithPieces-end')
+    performance.measure(
+      'synapse:pdpServer.createDataSetWithPieces',
+      'synapse:pdpServer.createDataSetWithPieces-start',
+      'synapse:pdpServer.createDataSetWithPieces-end'
+    )
+
+    const { txHash, statusUrl } = createResult
+
+    const ethersProvider = synapse.getProvider()
+    let transaction: ethers.TransactionResponse | null = null
+
+    const txRetryStartTime = Date.now()
+    const txPropagationTimeout = TIMING_CONSTANTS.TRANSACTION_PROPAGATION_TIMEOUT_MS
+    const txPropagationPollInterval = TIMING_CONSTANTS.TRANSACTION_PROPAGATION_POLL_INTERVAL_MS
+
+    performance.mark('synapse:getTransaction-start')
+    while (Date.now() - txRetryStartTime < txPropagationTimeout) {
+      try {
+        transaction = await ethersProvider.getTransaction(txHash)
+        if (transaction !== null) {
+          break
+        }
+      } catch (error) {
+        console.warn(`Failed to fetch transaction ${txHash}, retrying...`, error)
+      }
+
+      await new Promise((resolve) => setTimeout(resolve, txPropagationPollInterval))
+    }
+    performance.mark('synapse:getTransaction-end')
+    performance.measure('synapse:getTransaction', 'synapse:getTransaction-start', 'synapse:getTransaction-end')
+
+    if (transaction === null) {
+      throw createError(
+        'StorageContext',
+        'createDataSetWithPieces',
+        `Transaction ${txHash} not found after ${
+          txPropagationTimeout / 1000
+        } seconds. The transaction may not have propagated to the RPC node.`
+      )
+    }
+
+    try {
+      callbacks?.onDataSetCreationStarted?.(transaction, statusUrl)
+    } catch (error) {
+      console.error('Error in onDataSetCreationStarted callback:', error)
+    }
+
+    let finalStatus: Awaited<ReturnType<typeof warmStorageService.waitForDataSetCreationWithStatus>>
+
+    performance.mark('synapse:waitForDataSetCreationWithStatus-start')
+    try {
+      finalStatus = await warmStorageService.waitForDataSetCreationWithStatus(
+        transaction,
+        pdpServer,
+        TIMING_CONSTANTS.DATA_SET_CREATION_TIMEOUT_MS,
+        TIMING_CONSTANTS.DATA_SET_CREATION_POLL_INTERVAL_MS,
+        async (status, elapsedMs) => {
+          try {
+            callbacks?.onDataSetCreationProgress?.({
+              transactionMined: status.chainStatus.transactionMined,
+              transactionSuccess: status.chainStatus.transactionSuccess,
+              dataSetLive: status.chainStatus.dataSetLive,
+              serverConfirmed: status.serverStatus?.dataSetCreated ?? false,
+              dataSetId: status.summary.dataSetId ?? undefined,
+              elapsedMs,
+            })
+          } catch (error) {
+            console.error('Error in onDataSetCreationProgress callback:', error)
+          }
+        }
+      )
+    } catch (error) {
+      performance.mark('synapse:waitForDataSetCreationWithStatus-end')
+      performance.measure(
+        'synapse:waitForDataSetCreationWithStatus',
+        'synapse:waitForDataSetCreationWithStatus-start',
+        'synapse:waitForDataSetCreationWithStatus-end'
+      )
+      throw createError('StorageContext', 'createDataSetWithPieces', 'Failed to wait for data set creation', error)
+    }
+    performance.mark('synapse:waitForDataSetCreationWithStatus-end')
+    performance.measure(
+      'synapse:waitForDataSetCreationWithStatus',
+      'synapse:waitForDataSetCreationWithStatus-start',
+      'synapse:waitForDataSetCreationWithStatus-end'
+    )
+
+    const dataSetId = finalStatus.summary.dataSetId
+    if (dataSetId == null) {
+      throw createError('StorageContext', 'createDataSetWithPieces', 'Data set ID not found in creation status')
+    }
+
+    try {
+      callbacks?.onDataSetResolved?.({
+        isExisting: false,
+        dataSetId,
+        provider,
+      })
+    } catch (error) {
+      console.error('Error in onDataSetResolved callback:', error)
+    }
+
+    performance.mark('synapse:createDataSetWithPieces-end')
+    performance.measure(
+      'synapse:createDataSetWithPieces',
+      'synapse:createDataSetWithPieces-start',
+      'synapse:createDataSetWithPieces-end'
+    )
+
+    return dataSetId
+  }
+
   /**
    * Create a new data set with the selected provider
    */
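// Editorial sketch (not part of the patch): the callback surface the combined
// flow drives, as invoked above. Field names mirror the progress object built
// in createDataSetWithPieces; the import path is an assumption.
import type { StorageCreationCallbacks } from '@filoz/synapse-sdk'

const callbacks: StorageCreationCallbacks = {
  onDataSetCreationStarted: (transaction, statusUrl) => {
    console.log(`creation tx sent: ${transaction.hash}; status at ${statusUrl}`)
  },
  onDataSetCreationProgress: (progress) => {
    // Fires on each poll until the data set is live and server-confirmed
    console.log(
      `mined=${progress.transactionMined} live=${progress.dataSetLive} ` +
        `serverConfirmed=${progress.serverConfirmed} after ${progress.elapsedMs}ms`
    )
  },
  onDataSetResolved: (info) => {
    console.log(`using data set ${info.dataSetId} (existing: ${info.isExisting})`)
  },
}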
@@ -912,6 +1070,109 @@ export class StorageContext {
     }
   }
 
+  /**
+   * Upload data and create a new data set with the piece in a single operation (M3 combined flow)
+   * This method combines data set creation and piece addition for improved performance
+   * @param data - The data to upload
+   * @param options - Optional upload options including metadata and callbacks
+   * @returns Promise that resolves with the upload result, including the piece ID
+   */
+  async uploadAndCreate(data: Uint8Array | ArrayBuffer, options?: UploadOptions): Promise<UploadResult> {
+    performance.mark('synapse:uploadAndCreate-start')
+
+    // Validation Phase: Check data size
+    const dataBytes = data instanceof ArrayBuffer ? new Uint8Array(data) : data
+    const sizeBytes = dataBytes.length
+
+    // Validate size before proceeding
+    StorageContext.validateRawSize(sizeBytes, 'uploadAndCreate')
+
+    // Upload Phase: Upload data to service provider
+    let uploadResult: { pieceCid: PieceCID; size: number }
+    try {
+      performance.mark('synapse:pdpServer.uploadPiece-start')
+      uploadResult = await this._pdpServer.uploadPiece(dataBytes)
+      performance.mark('synapse:pdpServer.uploadPiece-end')
+      performance.measure(
+        'synapse:pdpServer.uploadPiece',
+        'synapse:pdpServer.uploadPiece-start',
+        'synapse:pdpServer.uploadPiece-end'
+      )
+    } catch (error) {
+      performance.mark('synapse:pdpServer.uploadPiece-end')
+      performance.measure(
+        'synapse:pdpServer.uploadPiece',
+        'synapse:pdpServer.uploadPiece-start',
+        'synapse:pdpServer.uploadPiece-end'
+      )
+      throw createError('StorageContext', 'uploadPiece', 'Failed to upload piece to service provider', error)
+    }
+
+    // Poll for the piece to be "parked" (ready) on the provider
+    const maxWaitTime = TIMING_CONSTANTS.PIECE_PARKING_TIMEOUT_MS
+    const pollInterval = TIMING_CONSTANTS.PIECE_PARKING_POLL_INTERVAL_MS
+    const startTime = Date.now()
+    let pieceReady = false
+
+    performance.mark('synapse:findPiece-start')
+    while (Date.now() - startTime < maxWaitTime) {
+      try {
+        await this._pdpServer.findPiece(uploadResult.pieceCid)
+        pieceReady = true
+        break
+      } catch {
+        // Piece not ready yet; wait and retry if we haven't exceeded the timeout
+        if (Date.now() - startTime + pollInterval < maxWaitTime) {
+          await new Promise((resolve) => setTimeout(resolve, pollInterval))
+        }
+      }
+    }
+    performance.mark('synapse:findPiece-end')
+    performance.measure('synapse:findPiece', 'synapse:findPiece-start', 'synapse:findPiece-end')
+
+    if (!pieceReady) {
+      throw createError('StorageContext', 'findPiece', 'Timeout waiting for piece to be parked on service provider')
+    }
+
+    // Notify upload complete
+    if (options?.onUploadComplete != null) {
+      options.onUploadComplete(uploadResult.pieceCid)
+    }
+
+    // Validate metadata early (before data set creation) to fail fast
+    if (options?.metadata != null) {
+      validatePieceMetadata(options.metadata)
+    }
+
+    // Create the data set with the piece using the combined flow
+    const pieceData = uploadResult.pieceCid
+    const pieceMetadata = options?.metadata ? objectToEntries(options.metadata) : []
+
+    const dataSetId = await StorageContext.createDataSetWithPieces(
+      this._synapse,
+      this._warmStorageService,
+      this._provider,
+      this._withCDN,
+      [pieceData],
+      [pieceMetadata],
+      undefined,
+      this._dataSetMetadata
+    )
+
+    // Update this context's data set ID
+    ;(this as any)._dataSetId = dataSetId
+
+    // Return the upload result; the piece is the first in the new data set, so its ID is 0
+    performance.mark('synapse:uploadAndCreate-end')
+    performance.measure('synapse:uploadAndCreate', 'synapse:uploadAndCreate-start', 'synapse:uploadAndCreate-end')
+    return {
+      pieceCid: uploadResult.pieceCid,
+      size: uploadResult.size,
+      pieceId: 0, // First piece in new data set
+    }
+  }
+
   /**
    * Upload data to the service provider
    */
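// Editorial sketch (not part of the patch): the context-level combined flow.
// Assumes `context` came from the StorageManager's createContext(); the
// options shape mirrors the UploadOptions handled above.
const payload = new TextEncoder().encode('hello, combined flow')
const result = await context.uploadAndCreate(payload, {
  metadata: { label: 'greeting' }, // piece metadata, validated before creation
  onUploadComplete: (pieceCid) => {
    console.log(`piece ${pieceCid} parked on the service provider`)
  },
})
console.log(`piece ${result.pieceId} (${result.pieceCid}) in a new data set, ${result.size} bytes`)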
diff --git a/packages/synapse-sdk/src/storage/manager.ts b/packages/synapse-sdk/src/storage/manager.ts
index 265e3c7d..9c7483cc 100644
--- a/packages/synapse-sdk/src/storage/manager.ts
+++ b/packages/synapse-sdk/src/storage/manager.ts
@@ -165,6 +165,59 @@ export class StorageManager {
     })
   }
 
+  /**
+   * Upload data and create a new data set with the piece in a single operation (M3 combined flow)
+   * This method combines data set creation and piece addition for improved performance
+   * @param data - The data to upload
+   * @param options - Optional settings including provider selection, metadata, and callbacks
+   * @returns Promise that resolves with the upload result and data set information
+   */
+  async uploadAndCreate(
+    data: Uint8Array | ArrayBuffer,
+    options?: StorageManagerUploadOptions
+  ): Promise<UploadResult & { dataSetId: number }> {
+    if (options?.context != null) {
+      const invalidOptions: string[] = []
+      if (options.providerId !== undefined) invalidOptions.push('providerId')
+      if (options.providerAddress !== undefined) invalidOptions.push('providerAddress')
+      if (options.dataSetId !== undefined) invalidOptions.push('dataSetId')
+      if (options.withCDN !== undefined) invalidOptions.push('withCDN')
+      if (options.forceCreateDataSet !== undefined) invalidOptions.push('forceCreateDataSet')
+      if (options.uploadBatchSize !== undefined) invalidOptions.push('uploadBatchSize')
+
+      if (invalidOptions.length > 0) {
+        throw createError(
+          'StorageManager',
+          'uploadAndCreate',
+          `Cannot specify both 'context' and other options: ${invalidOptions.join(', ')}`
+        )
+      }
+    }
+
+    const context =
+      options?.context ??
+      (await this.createContext({
+        providerId: options?.providerId,
+        providerAddress: options?.providerAddress,
+        dataSetId: options?.dataSetId,
+        withCDN: options?.withCDN,
+        forceCreateDataSet: true,
+        uploadBatchSize: options?.uploadBatchSize,
+        callbacks: options?.callbacks,
+      }))
+
+    const uploadResult = await context.uploadAndCreate(data, {
+      ...options?.callbacks,
+      metadata: options?.metadata,
+    })
+
+    return {
+      ...uploadResult,
+      dataSetId: context.dataSetId,
+    }
+  }
+
   /**
    * Download data from storage
    * If context is provided, routes to context.download()
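// Editorial sketch (not part of the patch): the top-level entry point,
// assuming the StorageManager is exposed as `synapse.storage` as elsewhere in
// the SDK. Option names match the StorageManagerUploadOptions fields checked
// above; note that `context` is mutually exclusive with the provider and
// data-set selection options.
const { pieceCid, size, pieceId, dataSetId } = await synapse.storage.uploadAndCreate(
  new TextEncoder().encode('hello, combined flow'),
  {
    metadata: { label: 'greeting' },
    callbacks: {
      onDataSetResolved: (info) => console.log(`data set ${info.dataSetId} ready`),
    },
  }
)
console.log(`stored ${size} bytes as piece ${pieceId} (${pieceCid}) in data set ${dataSetId}`)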
diff --git a/packages/synapse-sdk/src/test/pdp-server.test.ts b/packages/synapse-sdk/src/test/pdp-server.test.ts
index 37571eb3..53953a5c 100644
--- a/packages/synapse-sdk/src/test/pdp-server.test.ts
+++ b/packages/synapse-sdk/src/test/pdp-server.test.ts
@@ -93,6 +93,95 @@ describe('PDPServer', () => {
     })
   })
 
+  describe('createDataSetWithPieces', () => {
+    it('should handle successful data set creation with pieces', async () => {
+      const mockTxHash = '0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef'
+
+      server.use(
+        http.post('http://pdp.local/pdp/data-sets', () => {
+          return new HttpResponse(null, {
+            status: 201,
+            headers: { Location: `/pdp/data-sets/created/${mockTxHash}` },
+          })
+        })
+      )
+
+      const testData = new Uint8Array([1, 2, 3, 4, 5])
+      const pieceCid = calculatePieceCID(testData)
+
+      const result = await pdpServer.createDataSetWithPieces(
+        0, // clientDataSetId
+        '0x70997970C51812dc3A010C7d01b50e0d17dc79C8', // payee
+        [], // metadata (empty for no CDN)
+        TEST_CONTRACT_ADDRESS, // recordKeeper
+        [pieceCid], // pieces
+        [[{ key: 'test', value: 'metadata' }]] // piecesMetadata
+      )
+
+      assert.strictEqual(result.txHash, mockTxHash)
+      assert.include(result.statusUrl, mockTxHash)
+    })
+
+    it('should handle data set creation without pieces', async () => {
+      const mockTxHash = '0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef'
+
+      server.use(
+        http.post('http://pdp.local/pdp/data-sets', () => {
+          return new HttpResponse(null, {
+            status: 201,
+            headers: { Location: `/pdp/data-sets/created/${mockTxHash}` },
+          })
+        })
+      )
+
+      const result = await pdpServer.createDataSetWithPieces(
+        0, // clientDataSetId
+        '0x70997970C51812dc3A010C7d01b50e0d17dc79C8', // payee
+        [], // metadata (empty for no CDN)
+        TEST_CONTRACT_ADDRESS // recordKeeper
+        // No pieces provided
+      )
+
+      assert.strictEqual(result.txHash, mockTxHash)
+      assert.include(result.statusUrl, mockTxHash)
+    })
+
+    it('should validate piece metadata length matches pieces length', async () => {
+      const testData = new Uint8Array([1, 2, 3, 4, 5])
+      const pieceCid = calculatePieceCID(testData)
+
+      try {
+        await pdpServer.createDataSetWithPieces(
+          0, // clientDataSetId
+          '0x70997970C51812dc3A010C7d01b50e0d17dc79C8', // payee
+          [], // metadata (empty for no CDN)
+          TEST_CONTRACT_ADDRESS, // recordKeeper
+          [pieceCid], // pieces
+          [[{ key: 'test', value: 'metadata' }], [{ key: 'extra', value: 'metadata' }]] // piecesMetadata - wrong length
+        )
+        assert.fail('Should have thrown an error for mismatched metadata length')
+      } catch (error: any) {
+        assert.include(error.message, 'Pieces metadata length')
+      }
+    })
+
+    it('should validate piece CIDs', async () => {
+      try {
+        await pdpServer.createDataSetWithPieces(
+          0, // clientDataSetId
+          '0x70997970C51812dc3A010C7d01b50e0d17dc79C8', // payee
+          [], // metadata (empty for no CDN)
+          TEST_CONTRACT_ADDRESS, // recordKeeper
+          ['invalid-cid'], // pieces - invalid CID
+          [[]] // piecesMetadata
+        )
+        assert.fail('Should have thrown an error for invalid PieceCID')
+      } catch (error: any) {
+        assert.include(error.message, 'Invalid PieceCID')
+      }
+    })
+  })
+
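// Editorial sketch (not part of the patch): if request-shape coverage is
// wanted, the msw handler can capture the POST body. Field names mirror the
// requestBody built in PDPServer.createDataSetWithPieces above.
it('should send pieces and extraData in the request body', async () => {
  let capturedBody: any
  server.use(
    http.post('http://pdp.local/pdp/data-sets', async ({ request }) => {
      capturedBody = await request.json()
      return new HttpResponse(null, {
        status: 201,
        headers: { Location: '/pdp/data-sets/created/0xabc' },
      })
    })
  )

  const pieceCid = calculatePieceCID(new Uint8Array([1, 2, 3]))
  await pdpServer.createDataSetWithPieces(
    0, // clientDataSetId
    '0x70997970C51812dc3A010C7d01b50e0d17dc79C8', // payee
    [], // metadata
    TEST_CONTRACT_ADDRESS, // recordKeeper
    [pieceCid],
    [[]] // piecesMetadata
  )

  assert.match(capturedBody.extraData, /^0x/)
  assert.strictEqual(capturedBody.pieces.pieces[0].pieceCid, pieceCid.toString())
  assert.strictEqual(capturedBody.pieces.pieces[0].subPieces[0].subPieceCid, pieceCid.toString())
})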
   describe('getPieceAdditionStatus', () => {
     it('should handle successful status check', async () => {
       const mockTxHash = '0x7890abcdef1234567890abcdef1234567890abcdef1234567890abcdef123456'