Skip to content

Commit 2502c44

Browse files
juslesanteogeb
andauthored
feat(sdk): [NET-1661] Configuration options for bypassing validation (#3302)
## Summary Add new configuration section `validation` that can be used to disable validation of stream permissions and partition counts. ## Changes - Added new `validation` section that can be used to set `permissions` and `partitions` to `false` to bypass validation. --------- Co-authored-by: Teo Gebhard <[email protected]>
1 parent fe18df1 commit 2502c44

20 files changed

+197
-76
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ Changes before Tatum release are not documented in this file.
1616
- Add `StreamrClient#findProxyNodes()` function for discovering proxy nodes via Operator nodes (https://github.com/streamr-dev/network/pull/3257)
1717
- Add `StreamrClient#publishRaw()` for publishing raw messages (https://github.com/streamr-dev/network/pull/3280)
1818
- Add new `keys` configuration to the `encryption` section (https://github.com/streamr-dev/network/pull/3284)
19+
- Add new `validation` configuration section (https://github.com/streamr-dev/network/pull/3302)
1920

2021
#### Changed
2122

packages/sdk/src/Config.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,11 @@ export interface StreamrClientConfig {
412412
pollInterval?: number
413413
}
414414

415+
validation?: {
416+
permissions?: boolean
417+
partitions?: boolean
418+
}
419+
415420
/**
416421
* Determines the telemetry metrics that are sent to the Streamr Network
417422
* at regular intervals.
@@ -457,6 +462,7 @@ export type StrictStreamrClientConfig = MarkOptional<Required<StreamrClientConfi
457462
network: Exclude<Required<StreamrClientConfig['network']>, undefined>
458463
contracts: Exclude<Required<StreamrClientConfig['contracts']>, undefined>
459464
encryption: Exclude<Required<StreamrClientConfig['encryption']>, undefined>
465+
validation: Exclude<Required<StreamrClientConfig['validation']>, undefined>
460466
cache: Exclude<Required<StreamrClientConfig['cache']>, undefined>
461467
/** @internal */
462468
_timeouts: Exclude<DeepRequired<StreamrClientConfig['_timeouts']>, undefined>

packages/sdk/src/config.schema.json

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,21 @@
388388
},
389389
"default": {}
390390
},
391+
"validation": {
392+
"type": "object",
393+
"additionalProperties": false,
394+
"properties": {
395+
"permissions": {
396+
"type": "boolean",
397+
"default": true
398+
},
399+
"partitions": {
400+
"type": "boolean",
401+
"default": true
402+
}
403+
},
404+
"default": {}
405+
},
391406
"metrics": {
392407
"anyOf": [
393408
{

packages/sdk/src/encryption/PublisherKeyExchange.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ export class PublisherKeyExchange {
5151
private readonly identity: Identity
5252
private readonly logger: Logger
5353
private readonly erc1271Publishers = new Set<UserID>()
54-
private readonly config: Pick<StrictStreamrClientConfig, 'encryption'>
54+
private readonly config: Pick<StrictStreamrClientConfig, 'encryption' | 'validation'>
5555

5656
constructor(
5757
networkNodeFacade: NetworkNodeFacade,
@@ -60,7 +60,7 @@ export class PublisherKeyExchange {
6060
messageSigner: MessageSigner,
6161
store: LocalGroupKeyStore,
6262
@inject(IdentityInjectionToken) identity: Identity,
63-
@inject(ConfigInjectionToken) config: Pick<StrictStreamrClientConfig, 'encryption'>,
63+
@inject(ConfigInjectionToken) config: Pick<StrictStreamrClientConfig, 'encryption' | 'validation'>,
6464
eventEmitter: StreamrClientEventEmitter,
6565
loggerFactory: LoggerFactory
6666
) {
@@ -110,7 +110,7 @@ export class PublisherKeyExchange {
110110
if (responseType !== ResponseType.NONE) {
111111
this.logger.debug('Handling group key request',
112112
{ requestId, responseType, keyEncryptionType: AsymmetricEncryptionType[keyEncryptionType] })
113-
await validateStreamMessage(request, this.streamRegistry, this.signatureValidator)
113+
await validateStreamMessage(request, this.streamRegistry, this.signatureValidator, this.config)
114114
const authenticatedUser = await this.identity.getUserId()
115115
const keys = without(
116116
await Promise.all(groupKeyIds.map((id: string) => this.store.get(id, authenticatedUser))),

packages/sdk/src/encryption/SubscriberKeyExchange.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ export class SubscriberKeyExchange {
4242
private readonly identity: Identity
4343
private readonly logger: Logger
4444
private readonly ensureStarted: () => Promise<void>
45+
private readonly config: Pick<StrictStreamrClientConfig, 'encryption' | 'validation'>
4546
requestGroupKey: (groupKeyId: string, publisherId: UserID, streamPartId: StreamPartID) => Promise<void>
4647

4748
constructor(
@@ -51,7 +52,7 @@ export class SubscriberKeyExchange {
5152
messageSigner: MessageSigner,
5253
store: LocalGroupKeyStore,
5354
subscriber: Subscriber,
54-
@inject(ConfigInjectionToken) config: Pick<StrictStreamrClientConfig, 'encryption'>,
55+
@inject(ConfigInjectionToken) config: Pick<StrictStreamrClientConfig, 'encryption' | 'validation'>,
5556
@inject(IdentityInjectionToken) identity: Identity,
5657
loggerFactory: LoggerFactory
5758
) {
@@ -63,6 +64,7 @@ export class SubscriberKeyExchange {
6364
this.subscriber = subscriber
6465
this.identity = identity
6566
this.logger = loggerFactory.createLogger(module)
67+
this.config = config
6668
// Setting explicit keys disables the key-exchange
6769
if (config.encryption.keys === undefined) {
6870
this.ensureStarted = pOnce(async () => {
@@ -142,7 +144,7 @@ export class SubscriberKeyExchange {
142144
if (await this.isAssignedToMe(msg.getStreamPartID(), recipientUserId, requestId)) {
143145
this.logger.debug('Handle group key response', { requestId })
144146
this.pendingRequests.delete(requestId)
145-
await validateStreamMessage(msg, this.streamRegistry, this.signatureValidator)
147+
await validateStreamMessage(msg, this.streamRegistry, this.signatureValidator, this.config)
146148
await Promise.all(encryptedGroupKeys.map(async (encryptedKey) => {
147149
const key = await EncryptionUtil.decryptWithPrivateKey(encryptedKey.data, this.keyPair!.getPrivateKey(), encryptionType)
148150
await this.store.set(encryptedKey.id, msg.getPublisherId(), key)

packages/sdk/src/generated/validateConfig.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/sdk/src/publish/MessageFactory.ts

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import { formLookupKey } from '../utils/utils'
2020
import { GroupKeyQueue } from './GroupKeyQueue'
2121
import { PublishMetadata } from './Publisher'
2222
import { createMessageRef, createRandomMsgChainId } from './messageChain'
23-
import { StreamrClientConfig } from '../Config'
23+
import { StrictStreamrClientConfig } from '../Config'
2424
import { isCompliantEncryptionType } from '../utils/encryptionCompliance'
2525

2626
export interface MessageFactoryOptions {
@@ -30,7 +30,7 @@ export interface MessageFactoryOptions {
3030
groupKeyQueue: GroupKeyQueue
3131
signatureValidator: SignatureValidator
3232
messageSigner: MessageSigner
33-
config: Pick<StreamrClientConfig, 'encryption'>
33+
config: Pick<StrictStreamrClientConfig, 'encryption' | 'validation'>
3434
}
3535

3636
export class MessageFactory {
@@ -45,7 +45,7 @@ export class MessageFactory {
4545
private readonly groupKeyQueue: GroupKeyQueue
4646
private readonly signatureValidator: SignatureValidator
4747
private readonly messageSigner: MessageSigner
48-
private readonly config: Pick<StreamrClientConfig, 'encryption'>
48+
private readonly config: Pick<StrictStreamrClientConfig, 'encryption' | 'validation'>
4949
private firstMessage = true
5050

5151
constructor(opts: MessageFactoryOptions) {
@@ -69,37 +69,49 @@ export class MessageFactory {
6969
explicitPartition?: number
7070
): Promise<StreamMessage> {
7171
const publisherId = await this.getPublisherId(metadata)
72-
const isPublisher = await this.streamRegistry.isStreamPublisher(this.streamId, publisherId)
73-
if (!isPublisher) {
74-
this.streamRegistry.invalidatePermissionCaches(this.streamId)
75-
throw new StreamrClientError(`You don't have permission to publish to this stream. Using address: ${publisherId}`, 'MISSING_PERMISSION')
72+
if (this.config.validation.permissions) {
73+
const isPublisher = await this.streamRegistry.isStreamPublisher(this.streamId, publisherId)
74+
if (!isPublisher) {
75+
this.streamRegistry.invalidatePermissionCaches(this.streamId)
76+
throw new StreamrClientError(
77+
`You don't have permission to publish to this stream. Using address: ${publisherId}`, 'MISSING_PERMISSION'
78+
)
79+
}
7680
}
77-
78-
const streamMetadata = await this.streamRegistry.getStreamMetadata(this.streamId)
79-
const partitionCount = getPartitionCount(streamMetadata)
8081
let partition
81-
if (explicitPartition !== undefined) {
82-
if ((explicitPartition < 0 || explicitPartition >= partitionCount)) {
83-
throw new Error(`Partition ${explicitPartition} is out of range (0..${partitionCount - 1})`)
84-
}
85-
if (metadata.partitionKey !== undefined) {
86-
throw new Error('Invalid combination of "partition" and "partitionKey"')
82+
if (!this.config.validation.partitions) {
83+
if (explicitPartition === undefined) {
84+
throw new Error(`Explicit partition must be set when partition validation is disabled`)
8785
}
8886
partition = explicitPartition
8987
} else {
90-
partition = (metadata.partitionKey !== undefined)
91-
? keyToArrayIndex(partitionCount, metadata.partitionKey)
92-
: this.getDefaultPartition(partitionCount)
88+
const streamMetadata = await this.streamRegistry.getStreamMetadata(this.streamId)
89+
const partitionCount = getPartitionCount(streamMetadata)
90+
if (explicitPartition !== undefined) {
91+
if ((explicitPartition < 0 || explicitPartition >= partitionCount)) {
92+
throw new Error(`Partition ${explicitPartition} is out of range (0..${partitionCount - 1})`)
93+
}
94+
if (metadata.partitionKey !== undefined) {
95+
throw new Error('Invalid combination of "partition" and "partitionKey"')
96+
}
97+
partition = explicitPartition
98+
} else {
99+
partition = (metadata.partitionKey !== undefined)
100+
? keyToArrayIndex(partitionCount, metadata.partitionKey)
101+
: this.getDefaultPartition(partitionCount)
102+
}
93103
}
94-
95104
const msgChainId = metadata.msgChainId ?? await this.defaultMessageChainIds.get(partition)
96105
const msgChainKey = formLookupKey([partition, msgChainId])
97106
const prevMsgRef = this.prevMsgRefs.get(msgChainKey)
98107
const msgRef = createMessageRef(metadata.timestamp, prevMsgRef)
99108
this.prevMsgRefs.set(msgChainKey, msgRef)
100109
const messageId = new MessageID(this.streamId, partition, msgRef.timestamp, msgRef.sequenceNumber, publisherId, msgChainId)
101-
102-
const encryptionType = (await this.streamRegistry.hasPublicSubscribePermission(this.streamId)) ? EncryptionType.NONE : EncryptionType.AES
110+
const encryptionType = this.config.validation.permissions
111+
? await this.streamRegistry.hasPublicSubscribePermission(this.streamId)
112+
? EncryptionType.NONE
113+
: EncryptionType.AES
114+
: EncryptionType.AES
103115
if (!isCompliantEncryptionType(encryptionType, this.config)) {
104116
throw new StreamrClientError(
105117
`Publishing to stream ${this.streamId} was prevented because configuration requires encryption!`,

packages/sdk/src/subscribe/messagePipeline.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ export interface MessagePipelineOptions {
2929
signatureValidator: SignatureValidator
3030
groupKeyManager: GroupKeyManager
3131
// eslint-disable-next-line max-len
32-
config: Pick<StrictStreamrClientConfig, 'encryption' | 'orderMessages' | 'gapFillTimeout' | 'retryResendAfter' | 'maxGapRequests' | 'gapFill' | 'gapFillStrategy'>
32+
config: Pick<StrictStreamrClientConfig, 'encryption' | 'orderMessages' | 'gapFillTimeout' | 'retryResendAfter' | 'maxGapRequests' | 'gapFill' | 'gapFillStrategy' | 'validation'>
3333
destroySignal: DestroySignal
3434
loggerFactory: LoggerFactory
3535
}
@@ -52,7 +52,7 @@ export const createMessagePipeline = (opts: MessagePipelineOptions): PushPipelin
5252

5353
const messageStream = new PushPipeline<StreamMessage, StreamMessage>
5454
const msgChainUtil = new MsgChainUtil(async (msg) => {
55-
await validateStreamMessage(msg, opts.streamRegistry, opts.signatureValidator)
55+
await validateStreamMessage(msg, opts.streamRegistry, opts.signatureValidator, opts.config)
5656

5757
if (msg.encryptionType !== EncryptionType.NONE && !isCompliantEncryptionType(msg.encryptionType, opts.config)) {
5858
throw new StreamrClientError(`A message in stream ${

packages/sdk/src/utils/validateStreamMessage.ts

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ import { SignatureValidator } from '../signature/SignatureValidator'
55
import { getPartitionCount } from '../StreamMetadata'
66
import { StreamrClientError } from '../StreamrClientError'
77
import { GroupKeyRequest, GroupKeyResponse } from '@streamr/trackerless-network'
8+
import { StrictStreamrClientConfig } from '../Config'
89

910
export const validateStreamMessage = async (
1011
msg: StreamMessage,
1112
streamRegistry: StreamRegistry,
12-
signatureValidator: SignatureValidator
13+
signatureValidator: SignatureValidator,
14+
config: Pick<StrictStreamrClientConfig, 'validation'>
1315
): Promise<void> => {
14-
await doValidate(msg, streamRegistry, signatureValidator).catch((err: any) => {
16+
await doValidate(msg, streamRegistry, signatureValidator, config).catch((err: any) => {
1517
// all StreamMessageError already have this streamMessage, maybe this is
1618
// here if e.g. contract call fails? TODO is this really needed as
1719
// the onError callback in messagePipeline knows which message
@@ -33,25 +35,28 @@ export const validateStreamMessage = async (
3335
const doValidate = async (
3436
streamMessage: StreamMessage,
3537
streamRegistry: StreamRegistry,
36-
signatureValidator: SignatureValidator
38+
signatureValidator: SignatureValidator,
39+
config: Pick<StrictStreamrClientConfig, 'validation'>
3740
): Promise<void> => {
3841
await signatureValidator.assertSignatureIsValid(streamMessage)
3942
switch (streamMessage.messageType) {
4043
case StreamMessageType.MESSAGE:
41-
return validateMessage(streamMessage, streamRegistry)
44+
return validateMessage(streamMessage, streamRegistry, config)
4245
case StreamMessageType.GROUP_KEY_REQUEST:
4346
return validateGroupKeyMessage(
4447
streamMessage,
4548
toUserId(GroupKeyRequest.fromBinary(streamMessage.content).recipientId),
4649
streamMessage.getPublisherId(),
47-
streamRegistry
50+
streamRegistry,
51+
config
4852
)
4953
case StreamMessageType.GROUP_KEY_RESPONSE:
5054
return validateGroupKeyMessage(
5155
streamMessage,
5256
streamMessage.getPublisherId(),
5357
toUserId(GroupKeyResponse.fromBinary(streamMessage.content).recipientId),
54-
streamRegistry
58+
streamRegistry,
59+
config
5560
)
5661
default:
5762
throw new StreamrClientError(`Unknown message type: ${streamMessage.messageType}!`, 'ASSERTION_FAILED', streamMessage)
@@ -60,38 +65,46 @@ const doValidate = async (
6065

6166
const validateMessage = async (
6267
streamMessage: StreamMessage,
63-
streamRegistry: StreamRegistry
68+
streamRegistry: StreamRegistry,
69+
config: Pick<StrictStreamrClientConfig, 'validation'>
6470
): Promise<void> => {
6571
const streamId = streamMessage.getStreamId()
66-
const streamMetadata = await streamRegistry.getStreamMetadata(streamId)
67-
const partitionCount = getPartitionCount(streamMetadata)
68-
if (streamMessage.getStreamPartition() < 0 || streamMessage.getStreamPartition() >= partitionCount) {
69-
throw new StreamrClientError(
70-
`Partition ${streamMessage.getStreamPartition()} is out of range (0..${partitionCount - 1})`,
71-
'INVALID_PARTITION',
72-
streamMessage
73-
)
72+
if (config.validation.partitions) {
73+
const streamMetadata = await streamRegistry.getStreamMetadata(streamId)
74+
const partitionCount = getPartitionCount(streamMetadata)
75+
if (streamMessage.getStreamPartition() < 0 || streamMessage.getStreamPartition() >= partitionCount) {
76+
throw new StreamrClientError(
77+
`Partition ${streamMessage.getStreamPartition()} is out of range (0..${partitionCount - 1})`,
78+
'INVALID_PARTITION',
79+
streamMessage
80+
)
81+
}
7482
}
75-
const sender = streamMessage.getPublisherId()
76-
const isPublisher = await streamRegistry.isStreamPublisher(streamId, sender)
77-
if (!isPublisher) {
78-
throw new StreamrClientError(`${sender} is not a publisher on stream ${streamId}`, 'MISSING_PERMISSION', streamMessage)
83+
if (config.validation.permissions) {
84+
const sender = streamMessage.getPublisherId()
85+
const isPublisher = await streamRegistry.isStreamPublisher(streamId, sender)
86+
if (!isPublisher) {
87+
throw new StreamrClientError(`${sender} is not a publisher on stream ${streamId}`, 'MISSING_PERMISSION', streamMessage)
88+
}
7989
}
8090
}
8191

8292
const validateGroupKeyMessage = async (
8393
streamMessage: StreamMessage,
8494
expectedPublisherId: UserID,
8595
expectedSubscriberId: UserID,
86-
streamRegistry: StreamRegistry
96+
streamRegistry: StreamRegistry,
97+
config: Pick<StrictStreamrClientConfig, 'validation'>
8798
): Promise<void> => {
88-
const streamId = streamMessage.getStreamId()
89-
const isPublisher = await streamRegistry.isStreamPublisher(streamId, expectedPublisherId)
90-
if (!isPublisher) {
91-
throw new StreamrClientError(`${expectedPublisherId} is not a publisher on stream ${streamId}`, 'MISSING_PERMISSION', streamMessage)
92-
}
93-
const isSubscriber = await streamRegistry.isStreamSubscriber(streamId, expectedSubscriberId)
94-
if (!isSubscriber) {
95-
throw new StreamrClientError(`${expectedSubscriberId} is not a subscriber on stream ${streamId}`, 'MISSING_PERMISSION', streamMessage)
99+
if (config.validation.permissions) {
100+
const streamId = streamMessage.getStreamId()
101+
const isPublisher = await streamRegistry.isStreamPublisher(streamId, expectedPublisherId)
102+
if (!isPublisher) {
103+
throw new StreamrClientError(`${expectedPublisherId} is not a publisher on stream ${streamId}`, 'MISSING_PERMISSION', streamMessage)
104+
}
105+
const isSubscriber = await streamRegistry.isStreamSubscriber(streamId, expectedSubscriberId)
106+
if (!isSubscriber) {
107+
throw new StreamrClientError(`${expectedSubscriberId} is not a subscriber on stream ${streamId}`, 'MISSING_PERMISSION', streamMessage)
108+
}
96109
}
97110
}

packages/sdk/test/integration/Resends.test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { SignatureValidator } from '../../src/signature/SignatureValidator'
1313
import { FakeEnvironment } from '../test-utils/fake/FakeEnvironment'
1414
import { createGroupKeyQueue, createStreamRegistry } from '../test-utils/utils'
1515
import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity'
16+
import { createStrictConfig } from '../../src/Config'
1617

1718
describe('Resends', () => {
1819

@@ -47,7 +48,7 @@ describe('Resends', () => {
4748
groupKeyQueue: await createGroupKeyQueue(identity, groupKey),
4849
signatureValidator: mock<SignatureValidator>(),
4950
messageSigner: new MessageSigner(identity),
50-
config: {},
51+
config: createStrictConfig()
5152
})
5253
// store the encryption key publisher's local group key store
5354
await publisher.updateEncryptionKey({

0 commit comments

Comments
 (0)