1- import * as lp from 'it-length-prefixed '
2- import { pipe } from 'it-pipe '
1+ import { TimeoutError } from '@libp2p/interface '
2+ import { pbStream } from 'it-protobuf-stream '
33import { Message , MessageType } from '../message/dht.js'
44import { AddProviderHandler } from './handlers/add-provider.js'
55import { FindNodeHandler } from './handlers/find-node.js'
@@ -30,6 +30,7 @@ export interface RPCInit {
3030 metricsPrefix : string
3131 datastorePrefix : string
3232 peerInfoMapper : PeerInfoMapper
33+ incomingMessageTimeout ?: number
3334}
3435
3536export interface RPCComponents extends GetValueHandlerComponents , PutValueHandlerComponents , FindNodeHandlerComponents , GetProvidersHandlerComponents {
@@ -38,21 +39,22 @@ export interface RPCComponents extends GetValueHandlerComponents, PutValueHandle
3839
3940export class RPC {
4041 private readonly handlers : Record < string , DHTMessageHandler >
41- private readonly routingTable : RoutingTable
4242 private readonly log : Logger
4343 private readonly metrics : {
4444 operations ?: CounterGroup
4545 errors ?: CounterGroup
4646 }
4747
48+ private readonly incomingMessageTimeout : number
49+
4850 constructor ( components : RPCComponents , init : RPCInit ) {
4951 this . metrics = {
5052 operations : components . metrics ?. registerCounterGroup ( `${ init . metricsPrefix } _inbound_rpc_requests_total` ) ,
5153 errors : components . metrics ?. registerCounterGroup ( `${ init . metricsPrefix } _inbound_rpc_errors_total` )
5254 }
5355
5456 this . log = components . logger . forComponent ( `${ init . logPrefix } :rpc` )
55- this . routingTable = init . routingTable
57+ this . incomingMessageTimeout = init . incomingMessageTimeout ?? 10_000
5658 this . handlers = {
5759 [ MessageType . GET_VALUE . toString ( ) ] : new GetValueHandler ( components , init ) ,
5860 [ MessageType . PUT_VALUE . toString ( ) ] : new PutValueHandler ( components , init ) ,
@@ -92,34 +94,46 @@ export class RPC {
9294 * Handle incoming streams on the dht protocol
9395 */
9496 onIncomingStream ( data : IncomingStreamData ) : void {
95- let message = 'unknown'
97+ const message = 'unknown'
9698
9799 Promise . resolve ( ) . then ( async ( ) => {
98100 const { stream, connection } = data
99- const peerId = connection . remotePeer
100-
101- const self = this
102-
103- await pipe (
104- stream ,
105- ( source ) => lp . decode ( source ) ,
106- async function * ( source ) {
107- for await ( const msg of source ) {
108- // handle the message
109- const desMessage = Message . decode ( msg )
110- message = desMessage . type
111- self . log ( 'incoming %s from %p' , desMessage . type , peerId )
112- const res = await self . handleMessage ( peerId , desMessage )
113-
114- // Not all handlers will return a response
115- if ( res != null ) {
116- yield Message . encode ( res )
117- }
101+
102+ const abortListener = ( ) : void => {
103+ stream . abort ( new TimeoutError ( ) )
104+ }
105+
106+ let signal = AbortSignal . timeout ( this . incomingMessageTimeout )
107+ signal . addEventListener ( 'abort' , abortListener )
108+
109+ const messages = pbStream ( stream ) . pb ( Message )
110+
111+ try {
112+ while ( true ) {
113+ const message = await messages . read ( {
114+ signal
115+ } )
116+
117+ // handle the message
118+ this . log ( 'incoming %s from %p' , message . type , connection . remotePeer )
119+ const res = await this . handleMessage ( connection . remotePeer , message )
120+
121+ // Not all handlers will return a response
122+ if ( res != null ) {
123+ await messages . write ( res , {
124+ signal
125+ } )
118126 }
119- } ,
120- ( source ) => lp . encode ( source ) ,
121- stream
122- )
127+
128+ // we have received a message so reset the timeout controller to
129+ // allow the remote to send another
130+ signal . removeEventListener ( 'abort' , abortListener )
131+ signal = AbortSignal . timeout ( this . incomingMessageTimeout )
132+ signal . addEventListener ( 'abort' , abortListener )
133+ }
134+ } catch ( err : any ) {
135+ stream . abort ( err )
136+ }
123137 } )
124138 . catch ( err => {
125139 this . log . error ( 'error handling %s RPC message from %p - %e' , message , data . connection . remotePeer , err )
0 commit comments