Skip to content

Commit bde94c9

Browse files
authored
feat(cli-tools): Binary publish (#3282)
Added a `--binary` flag to the `stream subscribe` and `stream publish` commands. When this flag is enabled, the commands read and write binary bytes instead of normal JSON or hex strings. If the `--with-metadata` flag is also used, the bytes represent the protobuf-encoded `StreamMessage` objects. All messages use a length-prefixed frame format: each item is prefixed with a 4-byte header indicating the frame length. A new `--raw` flag was added to `stream publish`, allowing protobuf binary messages to be sent exactly as provided. Currently this flag is supported only when both `--binary` and `--with-metadata` are used, since raw publishing requires `StreamMessage` objects as input. ## Helpers These existing helpers may be useful when working with the new features introduced in this PR: - `convertStreamMessageToBytes()` and `convertBytesToStreamMessage()` from `@streamr/sdk` can be used to convert between `StreamMessage` objects and their protobuf byte form - `toLengthPrefixedFrame()` and `LengthPrefixedFrameDecoder` from `@streamr/utils` can be used to create and parse length-prefixed frames ## Related changes Added `MessageSigner` export to the SDK. This class is required in `stream-publish.test.ts`. Refactored the CLI tool’s `startCommand` test utilities so that they can capture either text or binary output. ## Future improvements We could support `--raw` publishing without the `--binary` flag by constructing `StreamMessage` instances directly in the CLI tool. See `stream-publish.test.ts` for an example of how to create messages.
1 parent 1723408 commit bde94c9

File tree

8 files changed

+273
-69
lines changed

8 files changed

+273
-69
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ Changes before Tatum release are not documented in this file.
4848

4949
- Add `--partition` flag to `stream publish` (https://github.com/streamr-dev/network/pull/3262)
5050
- Add `--with-metadata` flag to `stream publish` (https://github.com/streamr-dev/network/pull/3265)
51+
- Add `--binary` flag to `stream publish` and `stream subcribe` (https://github.com/streamr-dev/network/pull/3282)
52+
- Add `--raw` flag to `stream publish` (https://github.com/streamr-dev/network/pull/3282)
5153

5254
#### Added
5355

Lines changed: 73 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
#!/usr/bin/env node
22
import '../src/logLevel'
33

4-
import { PublishMetadata, StreamrClient } from '@streamr/sdk'
5-
import { hexToBinary, merge, wait } from '@streamr/utils'
4+
import { convertBytesToStreamMessage, PublishMetadata, StreamMessage, StreamrClient } from '@streamr/sdk'
5+
import { hexToBinary, LengthPrefixedFrameDecoder, merge, toEthereumAddress, toStreamID, wait } from '@streamr/utils'
66
import es from 'event-stream'
77
import { Writable } from 'stream'
88
import { Options as BaseOptions, createClientCommand } from '../src/command'
@@ -11,53 +11,83 @@ import { createFnParseInt } from '../src/common'
1111
interface Options extends BaseOptions {
1212
partition?: number
1313
partitionKeyField?: string
14+
raw: boolean
1415
withMetadata: boolean
16+
binary: boolean
1517
}
1618

1719
const isHexadecimal = (str: string): boolean => {
1820
return /^[0-9a-fA-F]+$/.test(str)
1921
}
2022

21-
const publishStream = (
23+
const publishStream = async (
2224
streamId: string,
2325
partition: number | undefined,
2426
partitionKeyField: string | undefined,
27+
raw: boolean,
2528
withMetadata: boolean,
29+
binary: boolean,
2630
client: StreamrClient
27-
): Writable => {
31+
): Promise<Writable> => {
32+
const fullStreamId = toStreamID(streamId, toEthereumAddress(await client.getAddress()))
2833
const writable = new Writable({
2934
objectMode: true,
3035
write: (data: any, _: any, done: any) => {
3136
let content: any
3237
let metadata: PublishMetadata
33-
// ignore newlines, etc
34-
if (!data || String(data).trim() === '') {
35-
done()
36-
return
37-
}
38-
const trimmedData = String(data).trim()
39-
try {
38+
let streamMessage: StreamMessage | undefined = undefined
39+
if (binary) {
4040
if (withMetadata) {
41-
const payload = JSON.parse(trimmedData)
42-
if (payload.content === undefined) {
43-
throw new Error('invalid input: no content')
41+
streamMessage = convertBytesToStreamMessage(data)
42+
content = streamMessage.content
43+
metadata = {
44+
timestamp: streamMessage.getTimestamp(),
45+
msgChainId: streamMessage.getMsgChainId()
4446
}
45-
content = isHexadecimal(payload.content) ? hexToBinary(payload.content) : payload.content
46-
metadata = payload.metadata ?? {}
4747
} else {
48-
content = isHexadecimal(trimmedData) ? hexToBinary(trimmedData) : JSON.parse(trimmedData)
48+
content = data
4949
metadata = {}
5050
}
51-
} catch (e) {
52-
console.error(data.toString())
53-
done(e)
54-
return
51+
} else {
52+
// ignore newlines, etc
53+
if (!data || String(data).trim() === '') {
54+
done()
55+
return
56+
}
57+
const trimmedData = String(data).trim()
58+
try {
59+
if (withMetadata) {
60+
const payload = JSON.parse(trimmedData)
61+
if (payload.content === undefined) {
62+
throw new Error('invalid input: no content')
63+
}
64+
content = isHexadecimal(payload.content) ? hexToBinary(payload.content) : payload.content
65+
metadata = payload.metadata ?? {}
66+
} else {
67+
content = isHexadecimal(trimmedData) ? hexToBinary(trimmedData) : JSON.parse(trimmedData)
68+
metadata = {}
69+
}
70+
} catch (e) {
71+
console.error(data.toString())
72+
done(e)
73+
return
74+
}
75+
}
76+
if (raw) {
77+
if (streamMessage!.getStreamId() !== fullStreamId) {
78+
throw new Error(`invalid input: stream IDs don't match: expected=${fullStreamId}, actual=${streamMessage!.getStreamId()}`)
79+
}
80+
client.publishRaw(streamMessage!).then(
81+
() => done(),
82+
(err) => done(err)
83+
)
84+
} else {
85+
const partitionKey = (partitionKeyField !== undefined && typeof content === 'object') ? content[partitionKeyField] : undefined
86+
client.publish({ streamId, partition }, content, merge(metadata, { partitionKey })).then(
87+
() => done(),
88+
(err) => done(err)
89+
)
5590
}
56-
const partitionKey = (partitionKeyField !== undefined && typeof content === 'object') ? content[partitionKeyField] : undefined
57-
client.publish({ streamId, partition }, content, merge(metadata, { partitionKey })).then(
58-
() => done(),
59-
(err) => done(err)
60-
)
6191
}
6292
})
6393
return writable
@@ -68,10 +98,22 @@ createClientCommand(async (client: StreamrClient, streamId: string, options: Opt
6898
console.error('Invalid combination of "partition" and "partition-key-field"')
6999
process.exit(1)
70100
}
71-
const ps = publishStream(streamId, options.partition, options.partitionKeyField, options.withMetadata, client)
101+
if (options.raw) {
102+
if (!options.binary || !options.withMetadata) {
103+
console.error('raw publish not supported when publishing without metadata and binary')
104+
process.exit(1)
105+
}
106+
if (options.partitionKeyField !== undefined) {
107+
console.error('partition key field not supported when publishing raw')
108+
process.exit(1)
109+
}
110+
}
111+
const ps = await publishStream(streamId, options.partition, options.partitionKeyField, options.raw, options.withMetadata, options.binary, client)
72112
return new Promise((resolve, reject) => {
73-
process.stdin
74-
.pipe(es.split())
113+
const inputStream = options.binary
114+
? process.stdin.pipe(new LengthPrefixedFrameDecoder())
115+
: process.stdin.pipe(es.split())
116+
inputStream
75117
.pipe(ps)
76118
.once('finish', async () => {
77119
// We need to wait some time because the client.publish() may resolve the promise
@@ -92,5 +134,7 @@ createClientCommand(async (client: StreamrClient, streamId: string, options: Opt
92134
.option('-p, --partition <partition>', 'partition', createFnParseInt('--partition'))
93135
// eslint-disable-next-line max-len
94136
.option('-k, --partition-key-field <string>', 'field name in each message to use for assigning the message to a stream partition (only for JSON data)')
137+
.option('-r, --raw', 'publish raw', false)
95138
.option('-m, --with-metadata', 'each input contains both the content and the metadata', false)
139+
.option('-b, --binary', 'binary input using length-prefixed frames', false)
96140
.parseAsync()
Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,52 @@
11
#!/usr/bin/env node
22
import '../src/logLevel'
33

4-
import omit from 'lodash/omit'
5-
import isString from 'lodash/isString'
4+
import { convertStreamMessageToBytes, MessageMetadata, StreamMessage, StreamrClient } from '@streamr/sdk'
5+
import { binaryToHex, toLengthPrefixedFrame } from '@streamr/utils'
66
import mapValues from 'lodash/mapValues'
7-
import { StreamrClient, MessageMetadata } from '@streamr/sdk'
8-
import { createClientCommand, Options as BaseOptions } from '../src/command'
7+
import isString from 'lodash/isString'
8+
import omit from 'lodash/omit'
9+
import { Options as BaseOptions, createClientCommand } from '../src/command'
910
import { createFnParseInt } from '../src/common'
10-
import { binaryToHex } from '@streamr/utils'
1111

1212
interface Options extends BaseOptions {
1313
partition: number
1414
disableOrdering: boolean
1515
raw: boolean
1616
withMetadata: boolean
17+
binary: boolean
1718
}
1819

1920
const withBinaryFieldsAsHex = (metadata: Record<string, any>) => {
2021
return mapValues(metadata, (value) => value instanceof Uint8Array ? binaryToHex(value) : value)
2122
}
2223

2324
createClientCommand(async (client: StreamrClient, streamId: string, options: Options) => {
24-
const formContent = (content: unknown) => content instanceof Uint8Array ? binaryToHex(content) : content
25-
const formMessage = options.withMetadata
26-
? (content: unknown, metadata: MessageMetadata) => ({
27-
content: formContent(content),
28-
metadata: withBinaryFieldsAsHex(omit(metadata, 'streamMessage'))
29-
})
30-
: (content: unknown) => formContent(content)
31-
await client.subscribe({
25+
const sub = await client.subscribe({
3226
streamId,
3327
partition: options.partition,
3428
raw: options.raw
35-
}, (content, metadata) => {
36-
const output = formMessage(content, metadata)
37-
console.info(isString(output) ? output : JSON.stringify(output))
3829
})
30+
for await (const msg of sub) {
31+
if (options.binary) {
32+
// @ts-expect-error private field
33+
const streamMessage = msg.streamMessage as StreamMessage
34+
const binaryData = options.withMetadata
35+
? convertStreamMessageToBytes(streamMessage)
36+
: streamMessage.content
37+
process.stdout.write(toLengthPrefixedFrame(binaryData))
38+
} else {
39+
const formContent = (content: unknown) => content instanceof Uint8Array ? binaryToHex(content) : content
40+
const formMessage = options.withMetadata
41+
? (content: unknown, metadata: MessageMetadata) => ({
42+
content: formContent(content),
43+
metadata: withBinaryFieldsAsHex(omit(metadata, 'streamMessage'))
44+
})
45+
: (content: unknown) => formContent(content)
46+
const output = formMessage(msg.content, omit(msg, 'content'))
47+
console.info(isString(output) ? output : JSON.stringify(output))
48+
}
49+
}
3950
}, {
4051
autoDestroyClient: false,
4152
clientOptionsFactory: (options) => ({
@@ -48,4 +59,5 @@ createClientCommand(async (client: StreamrClient, streamId: string, options: Opt
4859
.option('-d, --disable-ordering', 'disable ordering of messages by OrderingUtil', false)
4960
.option('-r, --raw', 'subscribe raw', false)
5061
.option('-m, --with-metadata', 'print each message with its metadata included', false)
62+
.option('-b, --binary', 'binary output using length-prefixed frames', false)
5163
.parseAsync()

packages/cli-tools/test/mock-data.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ describe('mock-data', () => {
99
const outputIterable = startCommand('mock-data generate', {
1010
abortSignal: abortController.signal,
1111
devEnvironment: false
12-
})
12+
}).asLines()
1313
const firstLine = (await collect(outputIterable, 1))[0]
1414
abortController.abort()
1515
const json = JSON.parse(firstLine)
@@ -21,7 +21,7 @@ describe('mock-data', () => {
2121
const outputIterable = startCommand('mock-data generate --binary --min-length 32 --max-length 64', {
2222
abortSignal: abortController.signal,
2323
devEnvironment: false
24-
})
24+
}).asLines()
2525
const firstLine = (await collect(outputIterable, 1))[0]
2626
abortController.abort()
2727
expect(firstLine).toMatch(/^[0-9a-f]+$/)

0 commit comments

Comments
 (0)