Skip to content

Commit 8c5f0cd

Browse files
authored
feat(sdk): Raw publish (#3280)
Added the `StreamrClient#publishRaw()` method. It is the counterpart to the existing `StreamrClient#subscribe({ raw: true }`) method. In raw publish/subscribe mode, the SDK’s message pipeline is bypassed: e.g. validation, decryption, and message ordering are skipped.
1 parent 3187e70 commit 8c5f0cd

File tree

3 files changed

+69
-8
lines changed

3 files changed

+69
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ Changes before Tatum release are not documented in this file.
1414

1515
- Proxy connections now support bidirectionality and it is the default behavior. (https://github.com/streamr-dev/network/pull/3260)
1616
- Added `findProxyNodes` function to `StreamrClient` for discovering proxy nodes via Operator nodes.
17+
- Add `StreamrClient#publishRaw()` for publishing raw messages (https://github.com/streamr-dev/network/pull/3280)
1718

1819
#### Changed
1920

packages/sdk/src/StreamrClient.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,22 @@
22
* Importing 'timers' ensures `setImmediate` is available in browsers,
33
* as it's polyfilled by `timers-browserify`. In Node.js, it's already global.
44
*/
5-
import 'timers'
65
import 'reflect-metadata'
6+
import 'timers'
77
import './utils/PatchTsyringe'
88

99
import { DhtAddress } from '@streamr/dht'
1010
import { ProxyDirection, StreamPartDeliveryOptions } from '@streamr/trackerless-network'
11-
import { DEFAULT_PARTITION_COUNT, EthereumAddress, HexString, Logger, StreamID,
12-
TheGraphClient, toEthereumAddress, toUserId } from '@streamr/utils'
11+
import {
12+
DEFAULT_PARTITION_COUNT, EthereumAddress, HexString, Logger, StreamID,
13+
TheGraphClient, toEthereumAddress, toUserId
14+
} from '@streamr/utils'
1315
import type { Overrides } from 'ethers'
1416
import EventEmitter from 'eventemitter3'
1517
import merge from 'lodash/merge'
1618
import omit from 'lodash/omit'
1719
import { container as rootContainer } from 'tsyringe'
1820
import { PublishMetadata, Publisher } from '../src/publish/Publisher'
19-
import { Identity, IdentityInjectionToken, SignerWithProvider } from './identity/Identity'
2021
import {
2122
ConfigInjectionToken,
2223
NetworkPeerDescriptor,
@@ -29,6 +30,7 @@ import { DestroySignal } from './DestroySignal'
2930
import { Message, convertStreamMessageToMessage } from './Message'
3031
import { MetricsPublisher } from './MetricsPublisher'
3132
import { NetworkNodeFacade } from './NetworkNodeFacade'
33+
import { ProxyNodeFinder } from './ProxyNodeFinder'
3234
import { RpcProviderSource } from './RpcProviderSource'
3335
import { Stream } from './Stream'
3436
import { StreamIDBuilder } from './StreamIDBuilder'
@@ -37,6 +39,7 @@ import { ChainEventPoller } from './contracts/ChainEventPoller'
3739
import { ContractFactory } from './contracts/ContractFactory'
3840
import { Operator } from './contracts/Operator'
3941
import { OperatorRegistry } from './contracts/OperatorRegistry'
42+
import { SponsorshipFactory } from './contracts/SponsorshipFactory'
4043
import { StorageNodeMetadata, StorageNodeRegistry } from './contracts/StorageNodeRegistry'
4144
import { StreamRegistry } from './contracts/StreamRegistry'
4245
import { StreamStorageRegistry } from './contracts/StreamStorageRegistry'
@@ -46,7 +49,10 @@ import { LocalGroupKeyStore, UpdateEncryptionKeyOptions } from './encryption/Loc
4649
import { PublisherKeyExchange } from './encryption/PublisherKeyExchange'
4750
import { getEthersOverrides as _getEthersOverrides } from './ethereumUtils'
4851
import { StreamrClientEventEmitter, StreamrClientEvents } from './events'
52+
import { Identity, IdentityInjectionToken, SignerWithProvider } from './identity/Identity'
53+
import { createIdentityFromConfig } from './identity/IdentityMapping'
4954
import { PermissionAssignment, PermissionQuery, toInternalPermissionAssignment, toInternalPermissionQuery } from './permission'
55+
import { StreamMessage } from './protocol/StreamMessage'
5056
import { MessageListener, MessageStream } from './subscribe/MessageStream'
5157
import { ResendOptions, Resends, toInternalResendOptions } from './subscribe/Resends'
5258
import { Subscriber } from './subscribe/Subscriber'
@@ -57,12 +63,9 @@ import { StreamDefinition } from './types'
5763
import { map } from './utils/GeneratorUtils'
5864
import { LoggerFactory } from './utils/LoggerFactory'
5965
import { addStreamToStorageNode } from './utils/addStreamToStorageNode'
66+
import { assertCompliantIdentity } from './utils/encryptionCompliance'
6067
import { pOnce } from './utils/promises'
6168
import { convertPeerDescriptorToNetworkPeerDescriptor, createTheGraphClient } from './utils/utils'
62-
import { createIdentityFromConfig } from './identity/IdentityMapping'
63-
import { assertCompliantIdentity } from './utils/encryptionCompliance'
64-
import { SponsorshipFactory } from './contracts/SponsorshipFactory'
65-
import { ProxyNodeFinder } from './ProxyNodeFinder'
6669

6770
// TODO: this type only exists to enable tsdoc to generate proper documentation
6871
export type SubscribeOptions = StreamDefinition & ExtraSubscribeOptions
@@ -181,6 +184,11 @@ export class StreamrClient {
181184
return convertStreamMessageToMessage(result)
182185
}
183186

187+
async publishRaw(message: StreamMessage, deliveryOptions?: StreamPartDeliveryOptions): Promise<void> {
188+
await this.node.broadcast(message, deliveryOptions)
189+
this.eventEmitter.emit('messagePublished', message)
190+
}
191+
184192
/**
185193
* Manually updates the encryption key used when publishing messages to a given stream.
186194
*/
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import { StreamID, toUserId } from '@streamr/utils'
2+
import { createTestClient, createTestStream } from '../test-utils/utils'
3+
import { nextValue } from '../../src/utils/iterators'
4+
import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity'
5+
import { MessageSigner } from '../../src/signature/MessageSigner'
6+
import { Wallet } from 'ethers'
7+
import { MessageID } from '../../src/protocol/MessageID'
8+
import { ContentType, EncryptionType, SignatureType } from '@streamr/trackerless-network'
9+
import { StreamMessageType, StreamPermission } from '../../src'
10+
import { createTestWallet } from '@streamr/test-utils'
11+
12+
describe('publish-subscribe-raw', () => {
13+
14+
let streamId: StreamID
15+
let publisherWallet: Wallet
16+
17+
beforeEach(async () => {
18+
const creatorWallet = await createTestWallet({ gas: true })
19+
const creatorClient = createTestClient(creatorWallet.privateKey)
20+
const stream = await createTestStream(creatorClient, module)
21+
streamId = stream.id
22+
publisherWallet = await createTestWallet()
23+
stream.grantPermissions({
24+
userId: publisherWallet.address.toLowerCase(),
25+
permissions: [StreamPermission.PUBLISH]
26+
})
27+
await creatorClient.destroy()
28+
})
29+
30+
async function createTestMessage() {
31+
const messageSigner = new MessageSigner(EthereumKeyPairIdentity.fromPrivateKey(publisherWallet.privateKey))
32+
return await messageSigner.createSignedMessage({
33+
messageId: new MessageID(streamId, 0, 123456789, 0, toUserId(publisherWallet.address), 'mock-msgChainId'),
34+
content: new Uint8Array([1, 2, 3]),
35+
contentType: ContentType.BINARY,
36+
encryptionType: EncryptionType.NONE,
37+
messageType: StreamMessageType.MESSAGE
38+
}, SignatureType.ECDSA_SECP256K1_EVM)
39+
}
40+
41+
it('happy path', async () => {
42+
const publisher = createTestClient()
43+
const subscriber = createTestClient()
44+
const subcription = await subscriber.subscribe({ streamId, raw: true })
45+
const sentMessage = await createTestMessage()
46+
await publisher.publishRaw(sentMessage)
47+
const receivedMessage = await nextValue(subcription[Symbol.asyncIterator]())
48+
expect(receivedMessage! .streamMessage).toEqual(sentMessage)
49+
await publisher.destroy()
50+
await subscriber.destroy()
51+
})
52+
})

0 commit comments

Comments
 (0)