Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ Changes before Tatum release are not documented in this file.
- it is supported for `PUBLISH` and `SUBSCRIBE` permissions
- new `StreamrClient#getUserId()` method
- Method `StreamrClient#getDiagnosticInfo()` provides diagnostic info about network (https://github.com/streamr-dev/network/pull/2740, https://github.com/streamr-dev/network/pull/2741)
- Add accessors for stream metadata fields: (https://github.com/streamr-dev/network/pull/2825, https://github.com/streamr-dev/network/pull/2845)
- Add accessors for stream metadata fields: (https://github.com/streamr-dev/network/pull/2825, https://github.com/streamr-dev/network/pull/2845, https://github.com/streamr-dev/network/pull/2883)
- `Stream#getPartitionCount()`
- `Stream#getDescription()` and `Stream#setDescription()`
- `Stream#getStorageDayCount()` and `Stream#setStorageDayCount()`
- Add method `StreamrClient#getStreamMetadata()` (https://github.com/streamr-dev/network/pull/2883)
- Add validation for public permissions (https://github.com/streamr-dev/network/pull/2819)
- Add `opts` parameter to `StreamrClient#addStreamToStorageNode` (https://github.com/streamr-dev/network/pull/2858)
- controls how long to wait for storage node to pick up on assignment
Expand All @@ -40,6 +41,7 @@ Changes before Tatum release are not documented in this file.
- **BREAKING CHANGE:** Replace methods `StreamrClient#updateStream()` and `Stream#update()`: (https://github.com/streamr-dev/network/pull/2826, https://github.com/streamr-dev/network/pull/2855, https://github.com/streamr-dev/network/pull/2859, https://github.com/streamr-dev/network/pull/2862)
- use `StreamrClient#setStreamMetadata()` and `Stream#setMetadata()` instead
- both methods overwrite metadata instead of merging it
- **BREAKING CHANGE:** Methods `Stream#getMetadata()` and `Stream#getStreamParts()` are async (https://github.com/streamr-dev/network/pull/2883)
- Caching changes:
- storage node addresses (https://github.com/streamr-dev/network/pull/2877, https://github.com/streamr-dev/network/pull/2878)
- stream metadata and permissions (https://github.com/streamr-dev/network/pull/2889)
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/usage/streams/partitioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const stream = await streamr.createStream({
});
console.log(
`Stream created: ${stream.id}. It has ${
stream.getPartitionCount()
await stream.getPartitionCount()
} partitions.`
);
```
Expand Down
6 changes: 3 additions & 3 deletions packages/cli-tools/bin/streamr-storage-node-list-streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import { createClientCommand } from '../src/command'
createClientCommand((async (client: StreamrClient, storageNodeAddress: string) => {
const { streams } = await client.getStoredStreams(storageNodeAddress)
if (streams.length > 0) {
console.info(EasyTable.print(streams.map((stream) => {
console.info(EasyTable.print(await Promise.all(streams.map(async (stream) => {
return {
id: stream.id,
partitions: stream.getPartitionCount()
partitions: await stream.getPartitionCount()
}
})))
}))))
}
}))
.arguments('<storageNodeAddress>')
Expand Down
2 changes: 1 addition & 1 deletion packages/cli-tools/bin/streamr-stream-create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ createClientCommand(async (client: StreamrClient, streamIdOrPath: string, option
partitions: options.partitions
}
const stream = await client.createStream(body)
console.info(JSON.stringify({ id: stream.id, ...stream.getMetadata() }, null, 2))
console.info(JSON.stringify({ id: stream.id, ...await stream.getMetadata() }, null, 2))
})
.arguments('<streamId>')
.description('create a new stream')
Expand Down
2 changes: 1 addition & 1 deletion packages/cli-tools/bin/streamr-stream-show.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const withRenamedField = (obj: any, from: string, to: string) => {

createClientCommand(async (client: StreamrClient, streamId: string, options: Options) => {
const stream = await client.getStream(streamId)
const obj: any = { id: stream.id, ...stream.getMetadata() }
const obj: any = { id: stream.id, ...await stream.getMetadata() }
if (options.includePermissions) {
const assigments = await stream.getPermissions()
obj.permissions = assigments.map((assignment) => {
Expand Down
2 changes: 1 addition & 1 deletion packages/cli-tools/test/stream-create.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ describe('create stream', () => {
})
const client = createTestClient()
const stream = await client.getStream(streamId)
expect(stream.getPartitionCount()).toBe(1)
expect(await stream.getPartitionCount()).toBe(1)
await client.destroy()
}, 20 * 1000)

Expand Down
2 changes: 1 addition & 1 deletion packages/node/src/plugins/storage/DeleteExpiredCmd.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ export class DeleteExpiredCmd {
return {
streamId: stream.streamId,
partition: stream.partition,
storageDays: streamFromChain.getStorageDayCount() ?? 365
storageDays: (await streamFromChain.getStorageDayCount()) ?? 365
}
} catch (err) { logger.error('Failed to fetch stream info', { err }) }
})
Expand Down
16 changes: 8 additions & 8 deletions packages/node/src/plugins/storage/StorageConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ export class StorageConfig {
this.clusterSize = clusterSize
this.myIndexInCluster = myIndexInCluster
this.listener = listener
this.storagePoller = new StoragePoller(clusterId, pollInterval, streamrClient, (streams, block) => {
const streamParts = streams.flatMap((stream: Stream) => ([
...this.createMyStreamParts(stream)
]))
this.storagePoller = new StoragePoller(clusterId, pollInterval, streamrClient, async (streams, block) => {
const streamParts = (await Promise.all(streams.map(async (stream: Stream) => {
return [...await this.createMyStreamParts(stream)]
}))).flat()
this.handleDiff(this.synchronizer.ingestSnapshot(new Set<StreamPartID>(streamParts), block))
})
this.storageEventListener = new StorageEventListener(clusterId, streamrClient, (stream, type, block) => {
const streamParts = this.createMyStreamParts(stream)
this.storageEventListener = new StorageEventListener(clusterId, streamrClient, async (stream, type, block) => {
const streamParts = await this.createMyStreamParts(stream)
this.handleDiff(this.synchronizer.ingestPatch(streamParts, type, block))
})
this.abortController = new AbortController()
Expand All @@ -82,8 +82,8 @@ export class StorageConfig {
return this.synchronizer.getState()
}

private createMyStreamParts(stream: Stream): Set<StreamPartID> {
return new Set<StreamPartID>(stream.getStreamParts().filter((streamPart) => {
private async createMyStreamParts(stream: Stream): Promise<Set<StreamPartID>> {
return new Set<StreamPartID>((await stream.getStreamParts()).filter((streamPart) => {
const hashedIndex = keyToArrayIndex(this.clusterSize, streamPart)
return hashedIndex === this.myIndexInCluster
}))
Expand Down
8 changes: 4 additions & 4 deletions packages/node/src/plugins/storage/StorageEventListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ const logger = new Logger(module)
export class StorageEventListener {
private readonly clusterId: EthereumAddress
private readonly streamrClient: StreamrClient
private readonly onEvent: (stream: Stream, type: 'added' | 'removed', block: number) => void
private readonly onEvent: (stream: Stream, type: 'added' | 'removed', block: number) => Promise<void>
private readonly onAddToStorageNode: (event: StorageNodeAssignmentEvent) => void
private readonly onRemoveFromStorageNode: (event: StorageNodeAssignmentEvent) => void

constructor(
clusterId: EthereumAddress,
streamrClient: StreamrClient,
onEvent: (stream: Stream, type: 'added' | 'removed', block: number) => void
onEvent: (stream: Stream, type: 'added' | 'removed', block: number) => Promise<void>
) {
this.clusterId = clusterId
this.streamrClient = streamrClient
Expand All @@ -26,14 +26,14 @@ export class StorageEventListener {
this.onRemoveFromStorageNode = (event: StorageNodeAssignmentEvent) => this.handleEvent(event, 'removed')
}

private async handleEvent(event: StorageNodeAssignmentEvent, type: 'added' | 'removed') {
private async handleEvent(event: StorageNodeAssignmentEvent, type: 'added' | 'removed'): Promise<void> {
if (event.nodeAddress !== this.clusterId) {
return
}
logger.info('Received StorageNodeAssignmentEvent', { type, event })
try {
const stream = await this.streamrClient.getStream(event.streamId)
this.onEvent(stream, type, event.blockNumber)
await this.onEvent(stream, type, event.blockNumber)
} catch (err) {
logger.warn('Encountered error handling StorageNodeAssignmentEvent', { err, event, type })
}
Expand Down
6 changes: 3 additions & 3 deletions packages/node/src/plugins/storage/StoragePoller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ export class StoragePoller {
private readonly clusterId: string
private readonly pollInterval: number
private readonly streamrClient: StreamrClient
private readonly onNewSnapshot: (streams: Stream[], block: number) => void
private readonly onNewSnapshot: (streams: Stream[], block: number) => Promise<void>

constructor(
clusterId: string,
pollInterval: number,
streamrClient: StreamrClient,
onNewSnapshot: (streams: Stream[], block: number) => void
onNewSnapshot: (streams: Stream[], block: number) => Promise<void>
) {
this.clusterId = clusterId
this.pollInterval = pollInterval
Expand All @@ -39,7 +39,7 @@ export class StoragePoller {
foundStreams: streams.length,
blockNumber
})
this.onNewSnapshot(streams, blockNumber)
await this.onNewSnapshot(streams, blockNumber)
}

private async tryPoll(): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,21 +101,21 @@ describe('MaintainTopologyService', () => {
await maintainTopologyHelper.start()

await until(async () => {
return containsAll(await getSubscribedStreamPartIds(client), stream1.getStreamParts())
return containsAll(await getSubscribedStreamPartIds(client), await stream1.getStreamParts())
}, 10000, 1000)

await stake(operatorContract, await sponsorship2.getAddress(), 10000)
await until(async () => {
return containsAll(await getSubscribedStreamPartIds(client), [
...stream1.getStreamParts(),
...stream2.getStreamParts()
...await stream1.getStreamParts(),
...await stream2.getStreamParts()
])
}, 10000, 1000)

await (await operatorContract.unstake(await sponsorship1.getAddress())).wait()
await until(async () => {
const state = await getSubscribedStreamPartIds(client)
return containsAll(state, stream2.getStreamParts()) && doesNotContainAny(state, stream1.getStreamParts())
return containsAll(state, await stream2.getStreamParts()) && doesNotContainAny(state, await stream1.getStreamParts())
}, 10000, 1000)
}, 120 * 1000)
})
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ function makeStubStream(streamId: string): Stream {
const partitions = PARTITION_COUNT_LOOKUP[streamId]
const stub: Partial<Stream> = {
id: toStreamID(streamId),
getStreamParts(): StreamPartID[] { // TODO: duplicated code from client
async getStreamParts(): Promise<StreamPartID[]> { // TODO: duplicated code from client
return range(0, partitions).map((p) => toStreamPartID(toStreamID(streamId), p))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const otherClusterId = toEthereumAddress('0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
describe(StorageEventListener, () => {
let stubClient: Pick<StreamrClient, 'getStream' | 'on' | 'off'>
const storageEventListeners: Map<keyof StreamrClientEvents, ((event: StorageNodeAssignmentEvent) => any)> = new Map()
let onEvent: jest.Mock<void, [stream: Stream, type: 'added' | 'removed', block: number]>
let onEvent: jest.Mock<Promise<void>, [stream: Stream, type: 'added' | 'removed', block: number]>
let listener: StorageEventListener

beforeEach(() => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const POLL_RESULT = Object.freeze({

describe(StoragePoller, () => {
let getStoredStreams: jest.Mock<Promise<{ streams: Stream[], blockNumber: number }>, [nodeAddress: EthereumAddress]>
let onNewSnapshot: jest.Mock<void, [streams: Stream[], block: number]>
let onNewSnapshot: jest.Mock<Promise<void>, [streams: Stream[], block: number]>
let stubClient: Pick<StreamrClient, 'getStoredStreams'>
let poller: StoragePoller
let abortController: AbortController
Expand Down
42 changes: 19 additions & 23 deletions packages/sdk/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,14 @@ import {
*/
export class Stream {
readonly id: StreamID
private metadata: StreamMetadata
private readonly client: StreamrClient

/** @internal */
constructor(
id: StreamID,
metadata: StreamMetadata,
client: StreamrClient
) {
this.id = id
this.metadata = metadata
this.client = client
}

Expand All @@ -49,14 +46,6 @@ export class Stream {
return this.client.publish(this.id, content, metadata)
}

/**
* Updates the metadata of the stream.
*/
async setMetadata(metadata: StreamMetadata): Promise<void> {
await this.client.setStreamMetadata(this.id, metadata)
this.metadata = metadata
}

/**
* See {@link StreamrClient.hasPermission | StreamrClient.hasPermission}.
*
Expand Down Expand Up @@ -122,16 +111,16 @@ export class Stream {
/**
* Returns the partitions of the stream.
*/
getStreamParts(): StreamPartID[] {
return range(0, this.getPartitionCount()).map((p) => toStreamPartID(this.id, p))
async getStreamParts(): Promise<StreamPartID[]> {
return range(0, await this.getPartitionCount()).map((p) => toStreamPartID(this.id, p))
}

getPartitionCount(): number {
return getPartitionCount(this.getMetadata())
async getPartitionCount(): Promise<number> {
return getPartitionCount(await this.getMetadata())
}

getDescription(): string | undefined {
const value = this.getMetadata().description
async getDescription(): Promise<string | undefined> {
const value = (await this.getMetadata()).description
if (isString(value)) {
return value
} else {
Expand All @@ -141,16 +130,16 @@ export class Stream {

async setDescription(description: string): Promise<void> {
await this.setMetadata({
...this.getMetadata(),
...await this.getMetadata(),
description
})
}

/**
* Gets the value of `storageDays` field
*/
getStorageDayCount(): number | undefined {
const value = this.getMetadata().storageDays
async getStorageDayCount(): Promise<number | undefined> {
const value = (await this.getMetadata()).storageDays
if (isNumber(value)) {
return value
} else {
Expand All @@ -163,15 +152,22 @@ export class Stream {
*/
async setStorageDayCount(count: number): Promise<void> {
await this.setMetadata({
...this.getMetadata(),
...await this.getMetadata(),
storageDays: count
})
}

/**
* Returns the metadata of the stream.
*/
getMetadata(): StreamMetadata {
return this.metadata
async getMetadata(): Promise<StreamMetadata> {
return this.client.getStreamMetadata(this.id)
}

/**
* Updates the metadata of the stream.
*/
async setMetadata(metadata: StreamMetadata): Promise<void> {
await this.client.setStreamMetadata(this.id, metadata)
}
}
Loading
Loading