Skip to content

Commit d8d873b

Browse files
committed
chore: fix ping
1 parent f52569f commit d8d873b

File tree

6 files changed

+65
-76
lines changed

6 files changed

+65
-76
lines changed

packages/interface-internal/src/connection-manager.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { AbortOptions, PendingDial, Connection, MultiaddrConnection, PeerId, IsDialableOptions, OpenConnectionProgressEvents, Stream } from '@libp2p/interface'
1+
import type { AbortOptions, PendingDial, Connection, MultiaddrConnection, PeerId, IsDialableOptions, OpenConnectionProgressEvents, Stream, NewStreamOptions } from '@libp2p/interface'
22
import type { PeerMap } from '@libp2p/peer-collections'
33
import type { Multiaddr } from '@multiformats/multiaddr'
44
import type { ProgressOptions } from 'progress-events'
@@ -96,7 +96,7 @@ export interface ConnectionManager {
9696
* @param options - Optional parameters for connection handling.
9797
* @returns A promise that resolves to a `Connection` object.
9898
*/
99-
openStream(peer: PeerId | Multiaddr | Multiaddr[], protocol: string | string[], options?: OpenConnectionOptions): Promise<Stream>
99+
openStream(peer: PeerId | Multiaddr | Multiaddr[], protocol: string | string[], options?: OpenConnectionOptions & NewStreamOptions): Promise<Stream>
100100

101101
/**
102102
* Close our connections to a peer

packages/libp2p/src/connection-manager/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import { ReconnectQueue } from './reconnect-queue.js'
1212
import { dnsaddrResolver } from './resolvers/index.ts'
1313
import { findExistingConnection, multiaddrToIpNet } from './utils.js'
1414
import type { IpNet } from '@chainsafe/netmask'
15-
import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, MultiaddrConnection, ConnectionGater, Metrics, PeerId, PeerStore, Startable, PendingDialStatus, PeerRouting, IsDialableOptions, MultiaddrResolver, Stream } from '@libp2p/interface'
15+
import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, MultiaddrConnection, ConnectionGater, Metrics, PeerId, PeerStore, Startable, PendingDialStatus, PeerRouting, IsDialableOptions, MultiaddrResolver, Stream, NewStreamOptions } from '@libp2p/interface'
1616
import type { ConnectionManager, OpenConnectionOptions, TransportManager } from '@libp2p/interface-internal'
1717
import type { JobStatus } from '@libp2p/utils'
1818
import type { Multiaddr } from '@multiformats/multiaddr'
@@ -614,7 +614,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
614614
}
615615
}
616616

617-
async openStream (peerIdOrMultiaddr: PeerId | Multiaddr | Multiaddr[], protocol: string | string[], options: OpenConnectionOptions = {}): Promise<Stream> {
617+
async openStream (peerIdOrMultiaddr: PeerId | Multiaddr | Multiaddr[], protocol: string | string[], options: OpenConnectionOptions & NewStreamOptions = {}): Promise<Stream> {
618618
const connection = await this.openConnection(peerIdOrMultiaddr, options)
619619

620620
return connection.newStream(protocol, options)

packages/protocol-echo/src/echo.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { ProtocolError } from '@libp2p/interface'
1+
import { ProtocolError, setMaxListeners } from '@libp2p/interface'
22
import { UnexpectedEOFError } from '@libp2p/utils'
33
import { pEvent } from 'p-event'
44
import { Uint8ArrayList } from 'uint8arraylist'
@@ -33,6 +33,7 @@ export class Echo implements Startable, EchoInterface {
3333
const log = stream.log.newScope('echo')
3434
const start = Date.now()
3535
const signal = AbortSignal.timeout(this.timeout)
36+
setMaxListeners(Infinity, signal)
3637
let bytes = 0
3738

3839
for await (const buf of stream) {
@@ -44,9 +45,11 @@ export class Echo implements Startable, EchoInterface {
4445
}
4546

4647
if (!stream.send(buf)) {
47-
log('waiting for stream to drain')
48-
await pEvent(stream, 'drain')
49-
log('stream drained')
48+
await pEvent(stream, 'drain', {
49+
rejectionEvents: [
50+
'close'
51+
]
52+
})
5053
}
5154
}
5255

packages/protocol-ping/package.json

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,15 @@
4646
"@libp2p/crypto": "^5.1.7",
4747
"@libp2p/interface": "^2.10.5",
4848
"@libp2p/interface-internal": "^2.3.18",
49-
"@libp2p/utils": "^6.7.1",
5049
"@multiformats/multiaddr": "^12.4.4",
51-
"main-event": "^1.0.1",
50+
"p-event": "^6.0.1",
51+
"race-signal": "^2.0.0",
52+
"uint8arraylist": "^2.4.8",
5253
"uint8arrays": "^5.1.0"
5354
},
5455
"devDependencies": {
55-
"@libp2p/logger": "^5.1.21",
5656
"@libp2p/peer-id": "^5.1.8",
57+
"@libp2p/utils": "^6.7.1",
5758
"aegir": "^47.0.14",
5859
"delay": "^6.0.0",
5960
"sinon": "^21.0.0",

packages/protocol-ping/src/ping.ts

Lines changed: 48 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import { randomBytes } from '@libp2p/crypto'
2-
import { ProtocolError, serviceCapabilities } from '@libp2p/interface'
3-
import { byteStream } from '@libp2p/utils'
4-
import { setMaxListeners } from 'main-event'
2+
import { ProtocolError, serviceCapabilities, setMaxListeners, TimeoutError } from '@libp2p/interface'
3+
import { pEvent } from 'p-event'
4+
import { raceSignal } from 'race-signal'
5+
import { Uint8ArrayList } from 'uint8arraylist'
56
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
67
import { PROTOCOL_PREFIX, PROTOCOL_NAME, PING_LENGTH, PROTOCOL_VERSION, TIMEOUT, MAX_INBOUND_STREAMS, MAX_OUTBOUND_STREAMS } from './constants.js'
78
import type { PingComponents, PingInit, Ping as PingInterface } from './index.js'
@@ -59,25 +60,32 @@ export class Ping implements Startable, PingInterface {
5960
async handlePing (stream: Stream, connection: Connection): Promise<void> {
6061
const log = stream.log.newScope('ping')
6162
log.trace('ping from %p', connection.remotePeer)
62-
6363
const signal = AbortSignal.timeout(this.timeout)
6464
setMaxListeners(Infinity, signal)
65-
65+
signal.addEventListener('abort', () => {
66+
stream.abort(new TimeoutError('Ping timed out'))
67+
})
6668
const start = Date.now()
67-
const bytes = byteStream(stream)
6869

69-
while (stream.readStatus === 'readable') {
70-
const buf = await bytes.read({
71-
bytes: PING_LENGTH,
72-
signal
73-
})
74-
await bytes.write(buf, {
75-
signal
76-
})
70+
for await (const buf of stream) {
71+
if (stream.status !== 'open') {
72+
log('stream status changed to %s', stream.status)
73+
break
74+
}
7775

78-
log('ping from %p complete in %dms', connection.remotePeer, Date.now() - start)
76+
if (!stream.send(buf)) {
77+
log('waiting for stream to drain')
78+
await pEvent(stream, 'drain', {
79+
rejectionEvents: [
80+
'close'
81+
]
82+
})
83+
log('stream drained')
84+
}
7985
}
8086

87+
log('ping from %p complete in %dms', connection.remotePeer, Date.now() - start)
88+
8189
await stream.close({
8290
signal
8391
})
@@ -87,50 +95,38 @@ export class Ping implements Startable, PingInterface {
8795
* Ping a given peer and wait for its response, getting the operation latency.
8896
*/
8997
async ping (peer: PeerId | Multiaddr | Multiaddr[], options: AbortOptions = {}): Promise<number> {
90-
const start = Date.now()
9198
const data = randomBytes(PING_LENGTH)
92-
const connection = await this.components.connectionManager.openConnection(peer, options)
93-
const log = connection.log.newScope('ping')
94-
let stream: Stream | undefined
95-
96-
if (options.signal == null) {
97-
const signal = AbortSignal.timeout(this.timeout)
98-
99-
options = {
100-
...options,
101-
signal
102-
}
103-
}
99+
const stream = await this.components.connectionManager.openStream(peer, this.protocol, {
100+
runOnLimitedConnection: this.runOnLimitedConnection,
101+
...options
102+
})
103+
const log = stream.log.newScope('ping')
104104

105105
try {
106-
stream = await connection.newStream(this.protocol, {
107-
...options,
108-
runOnLimitedConnection: this.runOnLimitedConnection
106+
const start = Date.now()
107+
const finished = Promise.withResolvers<number>()
108+
const received = new Uint8ArrayList()
109+
110+
stream.addEventListener('message', (evt) => {
111+
received.append(evt.data)
112+
113+
if (received.byteLength === PING_LENGTH) {
114+
const rtt = Date.now() - start
115+
116+
if (!uint8ArrayEquals(data, received.subarray())) {
117+
finished.reject(new ProtocolError(`Received wrong ping ack after ${rtt}ms`))
118+
} else {
119+
finished.resolve(rtt)
120+
}
121+
}
109122
})
110123

111-
const bytes = byteStream(stream)
112-
113-
const [, result] = await Promise.all([
114-
bytes.write(data, options),
115-
bytes.read({
116-
...options,
117-
bytes: PING_LENGTH
118-
})
119-
])
120-
121-
const ms = Date.now() - start
122-
123-
stream.close()
124-
125-
if (!uint8ArrayEquals(data, result.subarray())) {
126-
throw new ProtocolError(`Received wrong ping ack after ${ms}ms`)
127-
}
128-
129-
log('ping %p complete in %dms', connection.remotePeer, ms)
124+
stream.send(data)
125+
await stream.close(options)
130126

131-
return ms
127+
return await raceSignal(finished.promise, options.signal)
132128
} catch (err: any) {
133-
log.error('error while pinging %p', connection.remotePeer, err)
129+
log.error('error while pinging %o - %e', peer, err)
134130

135131
stream?.abort(err)
136132

packages/protocol-ping/test/index.spec.ts

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import { generateKeyPair } from '@libp2p/crypto/keys'
44
import { start } from '@libp2p/interface'
5-
import { defaultLogger } from '@libp2p/logger'
65
import { peerIdFromPrivateKey } from '@libp2p/peer-id'
76
import { streamPair, byteStream, echo } from '@libp2p/utils'
87
import { expect } from 'aegir/chai'
@@ -50,12 +49,7 @@ describe('ping', () => {
5049
incomingStream.close()
5150
})
5251

53-
const connection = stubInterface<Connection>({
54-
log: defaultLogger().forComponent('connection')
55-
})
56-
components.connectionManager.openConnection.withArgs(remotePeer).resolves(connection)
57-
58-
connection.newStream.withArgs(PING_PROTOCOL).resolves(outgoingStream)
52+
components.connectionManager.openStream.withArgs(remotePeer, PING_PROTOCOL).resolves(outgoingStream)
5953

6054
// Run ping
6155
await expect(ping.ping(remotePeer)).to.eventually.be.gte(0)
@@ -71,12 +65,7 @@ describe('ping', () => {
7165

7266
void echo(incomingStream)
7367

74-
const connection = stubInterface<Connection>({
75-
log: defaultLogger().forComponent('connection')
76-
})
77-
components.connectionManager.openConnection.withArgs(remotePeer).resolves(connection)
78-
79-
connection.newStream.withArgs(PING_PROTOCOL).resolves(outgoingStream)
68+
components.connectionManager.openStream.withArgs(remotePeer, PING_PROTOCOL).resolves(outgoingStream)
8069

8170
// 10 ms timeout
8271
const signal = AbortSignal.timeout(timeout)

0 commit comments

Comments
 (0)