11import hashes, chronos, stew/ byteutils, results, chronicles
22import libp2p/ stream/ connection
3+ import libp2p/ varint
34import ./ mix_protocol
45import ./ config
56from fragmentation import dataSize
@@ -44,27 +45,134 @@ type MixEntryConnection* = ref object of Connection
4445 mixDialer: MixDialer
4546 params: Opt [MixParameters ]
4647
48+ incoming: AsyncQueue [seq [byte ]]
49+ incomingFut: Future [void ]
50+ replyReceivedFut: Future [void ]
51+ cached: seq [byte ]
52+
53+ method readOnce * (
54+ s: MixEntryConnection , pbytes: pointer , nbytes: int
55+ ): Future [int ] {.async : (raises: [CancelledError , LPStreamError ]), public .} =
56+ if s.isEof:
57+ raise newLPStreamEOFError ()
58+
59+ try :
60+ await s.replyReceivedFut
61+ s.isEof = true
62+ if s.cached.len == 0 :
63+ raise newLPStreamEOFError ()
64+ except CancelledError as exc:
65+ raise exc
66+ except LPStreamEOFError as exc:
67+ raise exc
68+ except CatchableError as exc:
69+ raise (ref LPStreamError )(msg: " error in readOnce: " & exc.msg, parent: exc)
70+
71+ let toRead = min (nbytes, s.cached.len)
72+ copyMem (pbytes, addr s.cached[0 ], toRead)
73+ s.cached = s.cached[toRead ..^ 1 ]
74+ return toRead
75+
4776method readExactly * (
48- self : MixEntryConnection , pbytes: pointer , nbytes: int
77+ s : MixEntryConnection , pbytes: pointer , nbytes: int
4978): Future [void ] {.async : (raises: [CancelledError , LPStreamError ]), public .} =
50- await sleepAsync (10 .minutes) # TODO : implement readExactly
51- raise
52- newException (LPStreamError , " readExactly not implemented for MixEntryConnection" )
79+ # # Waits for `nbytes` to be available, then read
80+ # # them and return them
81+ if s.atEof:
82+ var ch: char
83+ discard await s.readOnce (addr ch, 1 )
84+ raise newLPStreamEOFError ()
85+
86+ if nbytes == 0 :
87+ return
88+
89+ logScope:
90+ s
91+ nbytes = nbytes
92+ objName = s.objName
93+
94+ var pbuffer = cast [ptr UncheckedArray [byte ]](pbytes)
95+ var read = 0
96+ while read < nbytes and not (s.atEof ()):
97+ read += await s.readOnce (addr pbuffer[read], nbytes - read)
98+
99+ if read == 0 :
100+ doAssert s.atEof ()
101+ trace " couldn't read all bytes, stream EOF" , s, nbytes, read
102+ # Re-readOnce to raise a more specific error than EOF
103+ # Raise EOF if it doesn't raise anything(shouldn't happen)
104+ discard await s.readOnce (addr pbuffer[read], nbytes - read)
105+
106+ raise newLPStreamEOFError ()
107+
108+ if read < nbytes:
109+ trace " couldn't read all bytes, incomplete data" , s, nbytes, read
110+ raise newLPStreamIncompleteError ()
53111
54112method readLine * (
55- self : MixEntryConnection , limit = 0 , sep = " \r\n "
113+ s : MixEntryConnection , limit = 0 , sep = " \r\n "
56114): Future [string ] {.async : (raises: [CancelledError , LPStreamError ]), public .} =
57- raise newException (LPStreamError , " readLine not implemented for MixEntryConnection" )
115+ # # Reads up to `limit` bytes are read, or a `sep` is found
116+ # TODO replace with something that exploits buffering better
117+ var lim = if limit <= 0 : - 1 else : limit
118+ var state = 0
119+
120+ while true :
121+ var ch: char
122+ await readExactly (s, addr ch, 1 )
123+
124+ if sep[state] == ch:
125+ inc (state)
126+ if state == len (sep):
127+ break
128+ else :
129+ state = 0
130+ if limit > 0 :
131+ let missing = min (state, lim - len (result ) - 1 )
132+ result .add (sep[0 ..< missing])
133+ else :
134+ result .add (sep[0 ..< state])
135+
136+ result .add (ch)
137+ if len (result ) == lim:
138+ break
58139
59140method readVarint * (
60- self : MixEntryConnection
141+ conn : MixEntryConnection
61142): Future [uint64 ] {.async : (raises: [CancelledError , LPStreamError ]), public .} =
62- raise newException (LPStreamError , " readVarint not implemented for MixEntryConnection" )
143+ var buffer: array [10 , byte ]
144+
145+ for i in 0 ..< len (buffer):
146+ await conn.readExactly (addr buffer[i], 1 )
147+
148+ var
149+ varint: uint64
150+ length: int
151+ let res = PB .getUVarint (buffer.toOpenArray (0 , i), length, varint)
152+ if res.isOk ():
153+ return varint
154+ if res.error () != VarintError .Incomplete :
155+ break
156+ if true : # can't end with a raise apparently
157+ raise (ref InvalidVarintError )(msg: " Cannot parse varint" )
63158
64159method readLp * (
65- self : MixEntryConnection , maxSize: int
160+ s : MixEntryConnection , maxSize: int
66161): Future [seq [byte ]] {.async : (raises: [CancelledError , LPStreamError ]), public .} =
67- raise newException (LPStreamError , " readLp not implemented for MixEntryConnection" )
162+ # # read length prefixed msg, with the length encoded as a varint
163+ let
164+ length = await s.readVarint ()
165+ maxLen = uint64 (if maxSize < 0 : int .high else : maxSize)
166+
167+ if length > maxLen:
168+ raise (ref MaxSizeError )(msg: " Message exceeds maximum length" )
169+
170+ if length == 0 :
171+ return
172+
173+ var res = newSeqUninitialized [byte ](length)
174+ await s.readExactly (addr res[0 ], res.len)
175+ res
68176
69177method write * (
70178 self: MixEntryConnection , msg: seq [byte ]
@@ -112,6 +220,7 @@ proc shortLog*(self: MixEntryConnection): string {.raises: [].} =
112220method closeImpl * (
113221 self: MixEntryConnection
114222): Future [void ] {.async : (raises: [], raw: true ).} =
223+ self.incomingFut.cancelSoon ()
115224 let fut = newFuture [void ]()
116225 fut.complete ()
117226 return fut
@@ -123,22 +232,6 @@ when defined(libp2p_agents_metrics):
123232 proc setShortAgent * (self: MixEntryConnection , shortAgent: string ) =
124233 discard
125234
126- proc new * (
127- T: typedesc [MixEntryConnection ],
128- srcMix: MixProtocol ,
129- destination: Destination ,
130- codec: string ,
131- mixDialer: MixDialer ,
132- params: Opt [MixParameters ],
133- ): T =
134- let instance =
135- T (destination: destination, codec: codec, mixDialer: mixDialer, params: params)
136-
137- when defined (libp2p_agents_metrics):
138- instance.shortAgent = connection.shortAgent
139-
140- instance
141-
142235proc new * (
143236 T: typedesc [MixEntryConnection ],
144237 srcMix: MixProtocol ,
@@ -154,7 +247,20 @@ proc new*(
154247 else :
155248 0
156249
157- var sendDialerFunc = proc (
250+ var instance = T ()
251+ instance.destination = destination
252+ instance.codec = codec
253+ instance.params = Opt .some (params)
254+
255+ if expectReply:
256+ instance.incoming = newAsyncQueue [seq [byte ]]()
257+ instance.replyReceivedFut = newFuture [void ]()
258+ let checkForIncoming = proc (): Future [void ] {.async : (raises: [CancelledError ]).} =
259+ instance.cached = await instance.incoming.get ()
260+ instance.replyReceivedFut.complete ()
261+ instance.incomingFut = checkForIncoming ()
262+
263+ instance.mixDialer = proc (
158264 msg: seq [byte ], codec: string , dest: Destination
159265 ): Future [void ] {.async : (raises: [CancelledError , LPStreamError ]).} =
160266 try :
@@ -164,12 +270,17 @@ proc new*(
164270 else :
165271 (Opt .none (PeerId ), Opt .some (MixDestination .init (dest.peerId, dest.address)))
166272
167- await srcMix.anonymizeLocalProtocolSend (msg, codec, peerId, destination, surbs)
273+ await srcMix.anonymizeLocalProtocolSend (
274+ instance.incoming, msg, codec, peerId, destination, surbs
275+ )
168276 except CatchableError as e:
169277 error " Error during execution of anonymizeLocalProtocolSend: " , err = e.msg
170278 return
171279
172- T.new (srcMix, destination, codec, sendDialerFunc, Opt .some (params))
280+ when defined (libp2p_agents_metrics):
281+ instance.shortAgent = connection.shortAgent
282+
283+ instance
173284
174285proc toConnection * (
175286 srcMix: MixProtocol ,
0 commit comments