Skip to content

Commit 75301ac

Browse files
authored
feat: add .echo method to echo protocol (#2766)
To make it easier to invoke the echo protocol, add a method to the service.
1 parent c5bbb25 commit 75301ac

File tree

4 files changed

+61
-3
lines changed

4 files changed

+61
-3
lines changed

packages/protocol-echo/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@
5353
"dependencies": {
5454
"@libp2p/interface": "^2.1.3",
5555
"@libp2p/interface-internal": "^2.0.8",
56+
"@multiformats/multiaddr": "^12.3.1",
57+
"it-byte-stream": "^1.1.0",
5658
"it-pipe": "^3.0.1"
5759
},
5860
"devDependencies": {

packages/protocol-echo/src/echo.ts

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
import { byteStream } from 'it-byte-stream'
12
import { pipe } from 'it-pipe'
23
import { PROTOCOL_NAME, PROTOCOL_VERSION } from './constants.js'
34
import type { Echo as EchoInterface, EchoComponents, EchoInit } from './index.js'
4-
import type { Logger, Startable } from '@libp2p/interface'
5+
import type { AbortOptions, Logger, PeerId, Startable } from '@libp2p/interface'
6+
import type { Multiaddr } from '@multiformats/multiaddr'
57

68
/**
79
* A simple echo stream, any data received will be sent back to the sender
@@ -31,7 +33,8 @@ export class Echo implements Startable, EchoInterface {
3133
})
3234
}, {
3335
maxInboundStreams: this.init.maxInboundStreams,
34-
maxOutboundStreams: this.init.maxOutboundStreams
36+
maxOutboundStreams: this.init.maxOutboundStreams,
37+
runOnLimitedConnection: this.init.runOnLimitedConnection
3538
})
3639
this.started = true
3740
}
@@ -44,4 +47,22 @@ export class Echo implements Startable, EchoInterface {
4447
isStarted (): boolean {
4548
return this.started
4649
}
50+
51+
async echo (peer: PeerId | Multiaddr | Multiaddr[], buf: Uint8Array, options?: AbortOptions): Promise<Uint8Array> {
52+
const conn = await this.components.connectionManager.openConnection(peer, options)
53+
const stream = await conn.newStream(this.protocol, {
54+
...this.init,
55+
...options
56+
})
57+
const bytes = byteStream(stream)
58+
59+
const [, output] = await Promise.all([
60+
bytes.write(buf, options),
61+
bytes.read(buf.byteLength, options)
62+
])
63+
64+
await stream.close(options)
65+
66+
return output.subarray()
67+
}
4768
}

packages/protocol-echo/src/index.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,15 @@
4343
*/
4444

4545
import { Echo as EchoClass } from './echo.js'
46-
import type { ComponentLogger } from '@libp2p/interface'
46+
import type { ComponentLogger, PeerId } from '@libp2p/interface'
4747
import type { ConnectionManager, Registrar } from '@libp2p/interface-internal'
48+
import type { Multiaddr } from '@multiformats/multiaddr'
4849

4950
export interface EchoInit {
5051
protocolPrefix?: string
5152
maxInboundStreams?: number
5253
maxOutboundStreams?: number
54+
runOnLimitedConnection?: boolean
5355
}
5456

5557
export interface EchoComponents {
@@ -60,6 +62,7 @@ export interface EchoComponents {
6062

6163
export interface Echo {
6264
protocol: string
65+
echo(peer: PeerId | Multiaddr | Multiaddr[], buf: Uint8Array): Promise<Uint8Array>
6366
}
6467

6568
export function echo (init: EchoInit = {}): (components: EchoComponents) => Echo {

packages/protocol-echo/test/index.spec.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import { start, stop } from '@libp2p/interface'
44
import { defaultLogger } from '@libp2p/logger'
5+
import { multiaddr } from '@multiformats/multiaddr'
56
import { expect } from 'aegir/chai'
67
import all from 'it-all'
78
import { duplexPair } from 'it-pair/duplex'
@@ -76,4 +77,35 @@ describe('echo', () => {
7677

7778
expect(output).to.equalBytes(input)
7879
})
80+
81+
it('should echo data using method', async () => {
82+
await start(echo)
83+
84+
const duplex = duplexPair<any>()
85+
const outgoingStream = stubInterface<Stream>()
86+
outgoingStream.source = duplex[0].source
87+
outgoingStream.sink.callsFake(async source => duplex[0].sink(source))
88+
89+
const incomingStream = stubInterface<Stream>()
90+
incomingStream.source = duplex[1].source
91+
incomingStream.sink.callsFake(async source => duplex[1].sink(source))
92+
93+
const handler = components.registrar.handle.getCall(0).args[1]
94+
handler({
95+
stream: incomingStream,
96+
connection: stubInterface<Connection>()
97+
})
98+
99+
const ma = multiaddr('/ip4/123.123.123.123/tcp/1234')
100+
101+
components.connectionManager.openConnection.withArgs(ma).resolves(stubInterface<Connection>({
102+
newStream: async () => outgoingStream
103+
}))
104+
105+
const input = Uint8Array.from([0, 1, 2, 3])
106+
107+
const output = await echo.echo(ma, input)
108+
109+
expect(output).to.equalBytes(output)
110+
})
79111
})

0 commit comments

Comments
 (0)