Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Changes before Tatum release are not documented in this file.
- Add `StreamrClient#findProxyNodes()` function for discovering proxy nodes via Operator nodes (https://github.com/streamr-dev/network/pull/3257)
- Add `StreamrClient#publishRaw()` for publishing raw messages (https://github.com/streamr-dev/network/pull/3280)
- Add new `keys` configuration to the `encryption` section (https://github.com/streamr-dev/network/pull/3284)
- Add new `validation` configuration section (https://github.com/streamr-dev/network/pull/3302)

#### Changed

Expand Down
6 changes: 6 additions & 0 deletions packages/sdk/src/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,11 @@ export interface StreamrClientConfig {
pollInterval?: number
}

validation?: {
permissions?: boolean
partitions?: boolean
}

/**
* Determines the telemetry metrics that are sent to the Streamr Network
* at regular intervals.
Expand Down Expand Up @@ -457,6 +462,7 @@ export type StrictStreamrClientConfig = MarkOptional<Required<StreamrClientConfi
network: Exclude<Required<StreamrClientConfig['network']>, undefined>
contracts: Exclude<Required<StreamrClientConfig['contracts']>, undefined>
encryption: Exclude<Required<StreamrClientConfig['encryption']>, undefined>
validation: Exclude<Required<StreamrClientConfig['validation']>, undefined>
cache: Exclude<Required<StreamrClientConfig['cache']>, undefined>
/** @internal */
_timeouts: Exclude<DeepRequired<StreamrClientConfig['_timeouts']>, undefined>
Expand Down
15 changes: 15 additions & 0 deletions packages/sdk/src/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,21 @@
},
"default": {}
},
"validation": {
"type": "object",
"additionalProperties": false,
"properties": {
"permissions": {
"type": "boolean",
"default": true
},
"partitions": {
"type": "boolean",
"default": true
}
},
"default": {}
},
"metrics": {
"anyOf": [
{
Expand Down
6 changes: 3 additions & 3 deletions packages/sdk/src/encryption/PublisherKeyExchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export class PublisherKeyExchange {
private readonly identity: Identity
private readonly logger: Logger
private readonly erc1271Publishers = new Set<UserID>()
private readonly config: Pick<StrictStreamrClientConfig, 'encryption'>
private readonly config: Pick<StrictStreamrClientConfig, 'encryption' | 'validation'>

constructor(
networkNodeFacade: NetworkNodeFacade,
Expand All @@ -60,7 +60,7 @@ export class PublisherKeyExchange {
messageSigner: MessageSigner,
store: LocalGroupKeyStore,
@inject(IdentityInjectionToken) identity: Identity,
@inject(ConfigInjectionToken) config: Pick<StrictStreamrClientConfig, 'encryption'>,
@inject(ConfigInjectionToken) config: Pick<StrictStreamrClientConfig, 'encryption' | 'validation'>,
eventEmitter: StreamrClientEventEmitter,
loggerFactory: LoggerFactory
) {
Expand Down Expand Up @@ -110,7 +110,7 @@ export class PublisherKeyExchange {
if (responseType !== ResponseType.NONE) {
this.logger.debug('Handling group key request',
{ requestId, responseType, keyEncryptionType: AsymmetricEncryptionType[keyEncryptionType] })
await validateStreamMessage(request, this.streamRegistry, this.signatureValidator)
await validateStreamMessage(request, this.streamRegistry, this.signatureValidator, this.config)
const authenticatedUser = await this.identity.getUserId()
const keys = without(
await Promise.all(groupKeyIds.map((id: string) => this.store.get(id, authenticatedUser))),
Expand Down
6 changes: 4 additions & 2 deletions packages/sdk/src/encryption/SubscriberKeyExchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export class SubscriberKeyExchange {
private readonly identity: Identity
private readonly logger: Logger
private readonly ensureStarted: () => Promise<void>
private readonly config: Pick<StrictStreamrClientConfig, 'encryption' | 'validation'>
requestGroupKey: (groupKeyId: string, publisherId: UserID, streamPartId: StreamPartID) => Promise<void>

constructor(
Expand All @@ -51,7 +52,7 @@ export class SubscriberKeyExchange {
messageSigner: MessageSigner,
store: LocalGroupKeyStore,
subscriber: Subscriber,
@inject(ConfigInjectionToken) config: Pick<StrictStreamrClientConfig, 'encryption'>,
@inject(ConfigInjectionToken) config: Pick<StrictStreamrClientConfig, 'encryption' | 'validation'>,
@inject(IdentityInjectionToken) identity: Identity,
loggerFactory: LoggerFactory
) {
Expand All @@ -63,6 +64,7 @@ export class SubscriberKeyExchange {
this.subscriber = subscriber
this.identity = identity
this.logger = loggerFactory.createLogger(module)
this.config = config
// Setting explicit keys disables the key-exchange
if (config.encryption.keys === undefined) {
this.ensureStarted = pOnce(async () => {
Expand Down Expand Up @@ -142,7 +144,7 @@ export class SubscriberKeyExchange {
if (await this.isAssignedToMe(msg.getStreamPartID(), recipientUserId, requestId)) {
this.logger.debug('Handle group key response', { requestId })
this.pendingRequests.delete(requestId)
await validateStreamMessage(msg, this.streamRegistry, this.signatureValidator)
await validateStreamMessage(msg, this.streamRegistry, this.signatureValidator, this.config)
await Promise.all(encryptedGroupKeys.map(async (encryptedKey) => {
const key = await EncryptionUtil.decryptWithPrivateKey(encryptedKey.data, this.keyPair!.getPrivateKey(), encryptionType)
await this.store.set(encryptedKey.id, msg.getPublisherId(), key)
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/generated/validateConfig.js

Large diffs are not rendered by default.

56 changes: 34 additions & 22 deletions packages/sdk/src/publish/MessageFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { formLookupKey } from '../utils/utils'
import { GroupKeyQueue } from './GroupKeyQueue'
import { PublishMetadata } from './Publisher'
import { createMessageRef, createRandomMsgChainId } from './messageChain'
import { StreamrClientConfig } from '../Config'
import { StrictStreamrClientConfig } from '../Config'
import { isCompliantEncryptionType } from '../utils/encryptionCompliance'

export interface MessageFactoryOptions {
Expand All @@ -30,7 +30,7 @@ export interface MessageFactoryOptions {
groupKeyQueue: GroupKeyQueue
signatureValidator: SignatureValidator
messageSigner: MessageSigner
config: Pick<StreamrClientConfig, 'encryption'>
config: Pick<StrictStreamrClientConfig, 'encryption' | 'validation'>
}

export class MessageFactory {
Expand All @@ -45,7 +45,7 @@ export class MessageFactory {
private readonly groupKeyQueue: GroupKeyQueue
private readonly signatureValidator: SignatureValidator
private readonly messageSigner: MessageSigner
private readonly config: Pick<StreamrClientConfig, 'encryption'>
private readonly config: Pick<StrictStreamrClientConfig, 'encryption' | 'validation'>
private firstMessage = true

constructor(opts: MessageFactoryOptions) {
Expand All @@ -69,37 +69,49 @@ export class MessageFactory {
explicitPartition?: number
): Promise<StreamMessage> {
const publisherId = await this.getPublisherId(metadata)
const isPublisher = await this.streamRegistry.isStreamPublisher(this.streamId, publisherId)
if (!isPublisher) {
this.streamRegistry.invalidatePermissionCaches(this.streamId)
throw new StreamrClientError(`You don't have permission to publish to this stream. Using address: ${publisherId}`, 'MISSING_PERMISSION')
if (this.config.validation.permissions) {
const isPublisher = await this.streamRegistry.isStreamPublisher(this.streamId, publisherId)
if (!isPublisher) {
this.streamRegistry.invalidatePermissionCaches(this.streamId)
throw new StreamrClientError(
`You don't have permission to publish to this stream. Using address: ${publisherId}`, 'MISSING_PERMISSION'
)
}
}

const streamMetadata = await this.streamRegistry.getStreamMetadata(this.streamId)
const partitionCount = getPartitionCount(streamMetadata)
let partition
if (explicitPartition !== undefined) {
if ((explicitPartition < 0 || explicitPartition >= partitionCount)) {
throw new Error(`Partition ${explicitPartition} is out of range (0..${partitionCount - 1})`)
}
if (metadata.partitionKey !== undefined) {
throw new Error('Invalid combination of "partition" and "partitionKey"')
if (!this.config.validation.partitions) {
if (explicitPartition === undefined) {
throw new Error(`Explicit partition must be set when partition validation is disabled`)
}
partition = explicitPartition
} else {
partition = (metadata.partitionKey !== undefined)
? keyToArrayIndex(partitionCount, metadata.partitionKey)
: this.getDefaultPartition(partitionCount)
const streamMetadata = await this.streamRegistry.getStreamMetadata(this.streamId)
const partitionCount = getPartitionCount(streamMetadata)
if (explicitPartition !== undefined) {
if ((explicitPartition < 0 || explicitPartition >= partitionCount)) {
throw new Error(`Partition ${explicitPartition} is out of range (0..${partitionCount - 1})`)
}
if (metadata.partitionKey !== undefined) {
throw new Error('Invalid combination of "partition" and "partitionKey"')
}
partition = explicitPartition
} else {
partition = (metadata.partitionKey !== undefined)
? keyToArrayIndex(partitionCount, metadata.partitionKey)
: this.getDefaultPartition(partitionCount)
}
}

const msgChainId = metadata.msgChainId ?? await this.defaultMessageChainIds.get(partition)
const msgChainKey = formLookupKey([partition, msgChainId])
const prevMsgRef = this.prevMsgRefs.get(msgChainKey)
const msgRef = createMessageRef(metadata.timestamp, prevMsgRef)
this.prevMsgRefs.set(msgChainKey, msgRef)
const messageId = new MessageID(this.streamId, partition, msgRef.timestamp, msgRef.sequenceNumber, publisherId, msgChainId)

const encryptionType = (await this.streamRegistry.hasPublicSubscribePermission(this.streamId)) ? EncryptionType.NONE : EncryptionType.AES
const encryptionType = this.config.validation.permissions
? await this.streamRegistry.hasPublicSubscribePermission(this.streamId)
? EncryptionType.NONE
: EncryptionType.AES
: EncryptionType.AES
if (!isCompliantEncryptionType(encryptionType, this.config)) {
throw new StreamrClientError(
`Publishing to stream ${this.streamId} was prevented because configuration requires encryption!`,
Expand Down
4 changes: 2 additions & 2 deletions packages/sdk/src/subscribe/messagePipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export interface MessagePipelineOptions {
signatureValidator: SignatureValidator
groupKeyManager: GroupKeyManager
// eslint-disable-next-line max-len
config: Pick<StrictStreamrClientConfig, 'encryption' | 'orderMessages' | 'gapFillTimeout' | 'retryResendAfter' | 'maxGapRequests' | 'gapFill' | 'gapFillStrategy'>
config: Pick<StrictStreamrClientConfig, 'encryption' | 'orderMessages' | 'gapFillTimeout' | 'retryResendAfter' | 'maxGapRequests' | 'gapFill' | 'gapFillStrategy' | 'validation'>
destroySignal: DestroySignal
loggerFactory: LoggerFactory
}
Expand All @@ -52,7 +52,7 @@ export const createMessagePipeline = (opts: MessagePipelineOptions): PushPipelin

const messageStream = new PushPipeline<StreamMessage, StreamMessage>
const msgChainUtil = new MsgChainUtil(async (msg) => {
await validateStreamMessage(msg, opts.streamRegistry, opts.signatureValidator)
await validateStreamMessage(msg, opts.streamRegistry, opts.signatureValidator, opts.config)

if (msg.encryptionType !== EncryptionType.NONE && !isCompliantEncryptionType(msg.encryptionType, opts.config)) {
throw new StreamrClientError(`A message in stream ${
Expand Down
69 changes: 41 additions & 28 deletions packages/sdk/src/utils/validateStreamMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import { SignatureValidator } from '../signature/SignatureValidator'
import { getPartitionCount } from '../StreamMetadata'
import { StreamrClientError } from '../StreamrClientError'
import { GroupKeyRequest, GroupKeyResponse } from '@streamr/trackerless-network'
import { StrictStreamrClientConfig } from '../Config'

export const validateStreamMessage = async (
msg: StreamMessage,
streamRegistry: StreamRegistry,
signatureValidator: SignatureValidator
signatureValidator: SignatureValidator,
config: Pick<StrictStreamrClientConfig, 'validation'>
): Promise<void> => {
await doValidate(msg, streamRegistry, signatureValidator).catch((err: any) => {
await doValidate(msg, streamRegistry, signatureValidator, config).catch((err: any) => {
// all StreamMessageError already have this streamMessage, maybe this is
// here if e.g. contract call fails? TODO is this really needed as
// the onError callback in messagePipeline knows which message
Expand All @@ -33,25 +35,28 @@ export const validateStreamMessage = async (
const doValidate = async (
streamMessage: StreamMessage,
streamRegistry: StreamRegistry,
signatureValidator: SignatureValidator
signatureValidator: SignatureValidator,
config: Pick<StrictStreamrClientConfig, 'validation'>
): Promise<void> => {
await signatureValidator.assertSignatureIsValid(streamMessage)
switch (streamMessage.messageType) {
case StreamMessageType.MESSAGE:
return validateMessage(streamMessage, streamRegistry)
return validateMessage(streamMessage, streamRegistry, config)
case StreamMessageType.GROUP_KEY_REQUEST:
return validateGroupKeyMessage(
streamMessage,
toUserId(GroupKeyRequest.fromBinary(streamMessage.content).recipientId),
streamMessage.getPublisherId(),
streamRegistry
streamRegistry,
config
)
case StreamMessageType.GROUP_KEY_RESPONSE:
return validateGroupKeyMessage(
streamMessage,
streamMessage.getPublisherId(),
toUserId(GroupKeyResponse.fromBinary(streamMessage.content).recipientId),
streamRegistry
streamRegistry,
config
)
default:
throw new StreamrClientError(`Unknown message type: ${streamMessage.messageType}!`, 'ASSERTION_FAILED', streamMessage)
Expand All @@ -60,38 +65,46 @@ const doValidate = async (

const validateMessage = async (
streamMessage: StreamMessage,
streamRegistry: StreamRegistry
streamRegistry: StreamRegistry,
config: Pick<StrictStreamrClientConfig, 'validation'>
): Promise<void> => {
const streamId = streamMessage.getStreamId()
const streamMetadata = await streamRegistry.getStreamMetadata(streamId)
const partitionCount = getPartitionCount(streamMetadata)
if (streamMessage.getStreamPartition() < 0 || streamMessage.getStreamPartition() >= partitionCount) {
throw new StreamrClientError(
`Partition ${streamMessage.getStreamPartition()} is out of range (0..${partitionCount - 1})`,
'INVALID_PARTITION',
streamMessage
)
if (config.validation.partitions) {
const streamMetadata = await streamRegistry.getStreamMetadata(streamId)
const partitionCount = getPartitionCount(streamMetadata)
if (streamMessage.getStreamPartition() < 0 || streamMessage.getStreamPartition() >= partitionCount) {
throw new StreamrClientError(
`Partition ${streamMessage.getStreamPartition()} is out of range (0..${partitionCount - 1})`,
'INVALID_PARTITION',
streamMessage
)
}
}
const sender = streamMessage.getPublisherId()
const isPublisher = await streamRegistry.isStreamPublisher(streamId, sender)
if (!isPublisher) {
throw new StreamrClientError(`${sender} is not a publisher on stream ${streamId}`, 'MISSING_PERMISSION', streamMessage)
if (config.validation.permissions) {
const sender = streamMessage.getPublisherId()
const isPublisher = await streamRegistry.isStreamPublisher(streamId, sender)
if (!isPublisher) {
throw new StreamrClientError(`${sender} is not a publisher on stream ${streamId}`, 'MISSING_PERMISSION', streamMessage)
}
}
}

const validateGroupKeyMessage = async (
streamMessage: StreamMessage,
expectedPublisherId: UserID,
expectedSubscriberId: UserID,
streamRegistry: StreamRegistry
streamRegistry: StreamRegistry,
config: Pick<StrictStreamrClientConfig, 'validation'>
): Promise<void> => {
const streamId = streamMessage.getStreamId()
const isPublisher = await streamRegistry.isStreamPublisher(streamId, expectedPublisherId)
if (!isPublisher) {
throw new StreamrClientError(`${expectedPublisherId} is not a publisher on stream ${streamId}`, 'MISSING_PERMISSION', streamMessage)
}
const isSubscriber = await streamRegistry.isStreamSubscriber(streamId, expectedSubscriberId)
if (!isSubscriber) {
throw new StreamrClientError(`${expectedSubscriberId} is not a subscriber on stream ${streamId}`, 'MISSING_PERMISSION', streamMessage)
if (config.validation.permissions) {
const streamId = streamMessage.getStreamId()
const isPublisher = await streamRegistry.isStreamPublisher(streamId, expectedPublisherId)
if (!isPublisher) {
throw new StreamrClientError(`${expectedPublisherId} is not a publisher on stream ${streamId}`, 'MISSING_PERMISSION', streamMessage)
}
const isSubscriber = await streamRegistry.isStreamSubscriber(streamId, expectedSubscriberId)
if (!isSubscriber) {
throw new StreamrClientError(`${expectedSubscriberId} is not a subscriber on stream ${streamId}`, 'MISSING_PERMISSION', streamMessage)
}
}
}
3 changes: 2 additions & 1 deletion packages/sdk/test/integration/Resends.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { SignatureValidator } from '../../src/signature/SignatureValidator'
import { FakeEnvironment } from '../test-utils/fake/FakeEnvironment'
import { createGroupKeyQueue, createStreamRegistry } from '../test-utils/utils'
import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity'
import { createStrictConfig } from '../../src/Config'

describe('Resends', () => {

Expand Down Expand Up @@ -47,7 +48,7 @@ describe('Resends', () => {
groupKeyQueue: await createGroupKeyQueue(identity, groupKey),
signatureValidator: mock<SignatureValidator>(),
messageSigner: new MessageSigner(identity),
config: {},
config: createStrictConfig()
})
// store the encryption key publisher's local group key store
await publisher.updateEncryptionKey({
Expand Down
Loading