Skip to content

Commit a53f21a

Browse files
authored
refactor(sdk)!: Stateless Stream (#2883)
**This is a breaking changes as this changes the API** `Stream` instances no longer have metadata field. Instead the metadata is queried from `StreamRegistry`. - `StreamRegistry` already caches the metadata. Now we explicity populate the cache e.g. in `searchStreams()` (which queries `id` + `metadata`) from The Graph). ## API These `Stream` methods are now async: - `getMetadata()` - `getPartitionCount()` - `getDescription()` - `getStorageDayCount()` Also added `StreamrClient#getStreamMetadata()` method which is used by `Stream#getMetadata()`. ## Testing Added new end-to-end tests to assert cache population logic. The new `ProxyHttpServer` helper class which is needed by those tests.
1 parent 5c639da commit a53f21a

31 files changed

+285
-152
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@ Changes before Tatum release are not documented in this file.
1616
- it is supported for `PUBLISH` and `SUBSCRIBE` permissions
1717
- new `StreamrClient#getUserId()` method
1818
- Method `StreamrClient#getDiagnosticInfo()` provides diagnostic info about network (https://github.com/streamr-dev/network/pull/2740, https://github.com/streamr-dev/network/pull/2741)
19-
- Add accessors for stream metadata fields: (https://github.com/streamr-dev/network/pull/2825, https://github.com/streamr-dev/network/pull/2845)
19+
- Add accessors for stream metadata fields: (https://github.com/streamr-dev/network/pull/2825, https://github.com/streamr-dev/network/pull/2845, https://github.com/streamr-dev/network/pull/2883)
2020
- `Stream#getPartitionCount()`
2121
- `Stream#getDescription()` and `Stream#setDescription()`
2222
- `Stream#getStorageDayCount()` and `Stream#setStorageDayCount()`
23+
- Add method `StreamrClient#getStreamMetadata()` (https://github.com/streamr-dev/network/pull/2883)
2324
- Add validation for public permissions (https://github.com/streamr-dev/network/pull/2819)
2425
- Add `opts` parameter to `StreamrClient#addStreamToStorageNode` (https://github.com/streamr-dev/network/pull/2858)
2526
- controls how long to wait for storage node to pick up on assignment
@@ -40,6 +41,7 @@ Changes before Tatum release are not documented in this file.
4041
- **BREAKING CHANGE:** Replace methods `StreamrClient#updateStream()` and `Stream#update()`: (https://github.com/streamr-dev/network/pull/2826, https://github.com/streamr-dev/network/pull/2855, https://github.com/streamr-dev/network/pull/2859, https://github.com/streamr-dev/network/pull/2862)
4142
- use `StreamrClient#setStreamMetadata()` and `Stream#setMetadata()` instead
4243
- both methods overwrite metadata instead of merging it
44+
- **BREAKING CHANGE:** Methods `Stream#getMetadata()` and `Stream#getStreamParts()` are async (https://github.com/streamr-dev/network/pull/2883)
4345
- Caching changes:
4446
- storage node addresses (https://github.com/streamr-dev/network/pull/2877, https://github.com/streamr-dev/network/pull/2878)
4547
- stream metadata and permissions (https://github.com/streamr-dev/network/pull/2889)

docs/docs/usage/streams/partitioning.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ const stream = await streamr.createStream({
3838
});
3939
console.log(
4040
`Stream created: ${stream.id}. It has ${
41-
stream.getPartitionCount()
41+
await stream.getPartitionCount()
4242
} partitions.`
4343
);
4444
```

packages/cli-tools/bin/streamr-storage-node-list-streams.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ import { createClientCommand } from '../src/command'
88
createClientCommand((async (client: StreamrClient, storageNodeAddress: string) => {
99
const { streams } = await client.getStoredStreams(storageNodeAddress)
1010
if (streams.length > 0) {
11-
console.info(EasyTable.print(streams.map((stream) => {
11+
console.info(EasyTable.print(await Promise.all(streams.map(async (stream) => {
1212
return {
1313
id: stream.id,
14-
partitions: stream.getPartitionCount()
14+
partitions: await stream.getPartitionCount()
1515
}
16-
})))
16+
}))))
1717
}
1818
}))
1919
.arguments('<storageNodeAddress>')

packages/cli-tools/bin/streamr-stream-create.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ createClientCommand(async (client: StreamrClient, streamIdOrPath: string, option
1919
partitions: options.partitions
2020
}
2121
const stream = await client.createStream(body)
22-
console.info(JSON.stringify({ id: stream.id, ...stream.getMetadata() }, null, 2))
22+
console.info(JSON.stringify({ id: stream.id, ...await stream.getMetadata() }, null, 2))
2323
})
2424
.arguments('<streamId>')
2525
.description('create a new stream')

packages/cli-tools/bin/streamr-stream-show.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const withRenamedField = (obj: any, from: string, to: string) => {
1919

2020
createClientCommand(async (client: StreamrClient, streamId: string, options: Options) => {
2121
const stream = await client.getStream(streamId)
22-
const obj: any = { id: stream.id, ...stream.getMetadata() }
22+
const obj: any = { id: stream.id, ...await stream.getMetadata() }
2323
if (options.includePermissions) {
2424
const assigments = await stream.getPermissions()
2525
obj.permissions = assigments.map((assignment) => {

packages/cli-tools/test/stream-create.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ describe('create stream', () => {
1919
})
2020
const client = createTestClient()
2121
const stream = await client.getStream(streamId)
22-
expect(stream.getPartitionCount()).toBe(1)
22+
expect(await stream.getPartitionCount()).toBe(1)
2323
await client.destroy()
2424
}, 20 * 1000)
2525

packages/node/src/plugins/storage/DeleteExpiredCmd.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ export class DeleteExpiredCmd {
116116
return {
117117
streamId: stream.streamId,
118118
partition: stream.partition,
119-
storageDays: streamFromChain.getStorageDayCount() ?? 365
119+
storageDays: (await streamFromChain.getStorageDayCount()) ?? 365
120120
}
121121
} catch (err) { logger.error('Failed to fetch stream info', { err }) }
122122
})

packages/node/src/plugins/storage/StorageConfig.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,14 @@ export class StorageConfig {
5151
this.clusterSize = clusterSize
5252
this.myIndexInCluster = myIndexInCluster
5353
this.listener = listener
54-
this.storagePoller = new StoragePoller(clusterId, pollInterval, streamrClient, (streams, block) => {
55-
const streamParts = streams.flatMap((stream: Stream) => ([
56-
...this.createMyStreamParts(stream)
57-
]))
54+
this.storagePoller = new StoragePoller(clusterId, pollInterval, streamrClient, async (streams, block) => {
55+
const streamParts = (await Promise.all(streams.map(async (stream: Stream) => {
56+
return [...await this.createMyStreamParts(stream)]
57+
}))).flat()
5858
this.handleDiff(this.synchronizer.ingestSnapshot(new Set<StreamPartID>(streamParts), block))
5959
})
60-
this.storageEventListener = new StorageEventListener(clusterId, streamrClient, (stream, type, block) => {
61-
const streamParts = this.createMyStreamParts(stream)
60+
this.storageEventListener = new StorageEventListener(clusterId, streamrClient, async (stream, type, block) => {
61+
const streamParts = await this.createMyStreamParts(stream)
6262
this.handleDiff(this.synchronizer.ingestPatch(streamParts, type, block))
6363
})
6464
this.abortController = new AbortController()
@@ -82,8 +82,8 @@ export class StorageConfig {
8282
return this.synchronizer.getState()
8383
}
8484

85-
private createMyStreamParts(stream: Stream): Set<StreamPartID> {
86-
return new Set<StreamPartID>(stream.getStreamParts().filter((streamPart) => {
85+
private async createMyStreamParts(stream: Stream): Promise<Set<StreamPartID>> {
86+
return new Set<StreamPartID>((await stream.getStreamParts()).filter((streamPart) => {
8787
const hashedIndex = keyToArrayIndex(this.clusterSize, streamPart)
8888
return hashedIndex === this.myIndexInCluster
8989
}))

packages/node/src/plugins/storage/StorageEventListener.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ const logger = new Logger(module)
1010
export class StorageEventListener {
1111
private readonly clusterId: EthereumAddress
1212
private readonly streamrClient: StreamrClient
13-
private readonly onEvent: (stream: Stream, type: 'added' | 'removed', block: number) => void
13+
private readonly onEvent: (stream: Stream, type: 'added' | 'removed', block: number) => Promise<void>
1414
private readonly onAddToStorageNode: (event: StorageNodeAssignmentEvent) => void
1515
private readonly onRemoveFromStorageNode: (event: StorageNodeAssignmentEvent) => void
1616

1717
constructor(
1818
clusterId: EthereumAddress,
1919
streamrClient: StreamrClient,
20-
onEvent: (stream: Stream, type: 'added' | 'removed', block: number) => void
20+
onEvent: (stream: Stream, type: 'added' | 'removed', block: number) => Promise<void>
2121
) {
2222
this.clusterId = clusterId
2323
this.streamrClient = streamrClient
@@ -26,14 +26,14 @@ export class StorageEventListener {
2626
this.onRemoveFromStorageNode = (event: StorageNodeAssignmentEvent) => this.handleEvent(event, 'removed')
2727
}
2828

29-
private async handleEvent(event: StorageNodeAssignmentEvent, type: 'added' | 'removed') {
29+
private async handleEvent(event: StorageNodeAssignmentEvent, type: 'added' | 'removed'): Promise<void> {
3030
if (event.nodeAddress !== this.clusterId) {
3131
return
3232
}
3333
logger.info('Received StorageNodeAssignmentEvent', { type, event })
3434
try {
3535
const stream = await this.streamrClient.getStream(event.streamId)
36-
this.onEvent(stream, type, event.blockNumber)
36+
await this.onEvent(stream, type, event.blockNumber)
3737
} catch (err) {
3838
logger.warn('Encountered error handling StorageNodeAssignmentEvent', { err, event, type })
3939
}

packages/node/src/plugins/storage/StoragePoller.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@ export class StoragePoller {
1010
private readonly clusterId: string
1111
private readonly pollInterval: number
1212
private readonly streamrClient: StreamrClient
13-
private readonly onNewSnapshot: (streams: Stream[], block: number) => void
13+
private readonly onNewSnapshot: (streams: Stream[], block: number) => Promise<void>
1414

1515
constructor(
1616
clusterId: string,
1717
pollInterval: number,
1818
streamrClient: StreamrClient,
19-
onNewSnapshot: (streams: Stream[], block: number) => void
19+
onNewSnapshot: (streams: Stream[], block: number) => Promise<void>
2020
) {
2121
this.clusterId = clusterId
2222
this.pollInterval = pollInterval
@@ -39,7 +39,7 @@ export class StoragePoller {
3939
foundStreams: streams.length,
4040
blockNumber
4141
})
42-
this.onNewSnapshot(streams, blockNumber)
42+
await this.onNewSnapshot(streams, blockNumber)
4343
}
4444

4545
private async tryPoll(): Promise<void> {

0 commit comments

Comments
 (0)