Skip to content
This repository was archived by the owner on Jan 30, 2026. It is now read-only.

Commit d856fcc

Browse files
committed
feat: add back exit == destination
1 parent 80a97d3 commit d856fcc

File tree

5 files changed

+377
-68
lines changed

5 files changed

+377
-68
lines changed

README.md

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,31 +38,56 @@ It provides a basis for future development and invites community experimentation
3838
```bash
3939
nimble install
4040
```
41-
41+
4242
## Running Tests
4343

4444
Execute the test suite with:
4545

46-
```bash
47-
nimble test
48-
```
46+
```bash
47+
nimble test
48+
```
4949

5050
## Usage
5151

5252
Run the Mix protocol proof-of-concepts:
5353

54-
```bash
55-
nim c -r examples/poc_gossipsub.nim
56-
nim c -r examples/poc_resp_ping.nim
57-
nim c -r examples/poc_noresp_ping.nim
58-
```
54+
```bash
55+
nim c -r examples/poc_gossipsub.nim
56+
nim c -r examples/poc_resp_ping.nim
57+
nim c -r examples/poc_noresp_ping.nim
58+
```
59+
60+
## Using experimental `exit == destination`
61+
62+
1. Compile with: `-d:mix_experimental_exit_is_destination`
63+
2. In `toConnection` you can now specify the behavior the exit node will have:`
64+
65+
```
66+
# Exit != destination (the default)
67+
# The exit node will forward the request to the destination
68+
# You can also use MixDestination.init instead
69+
let theDestination = MixDestination.forwardToAddr(thePeerId, theMultiAddress)
70+
let conn = mixProto.toConnection(
71+
theDestination,
72+
theCodec,
73+
).expect("should build connection")
74+
75+
76+
# Exit == destination
77+
# The protocol handler will be executed at the exit node
78+
let theDestination = MixDestination.exitNode(thePeerId)
79+
let conn = mixProto.toConnection(
80+
theDestination,
81+
theCodec,
82+
).expect("should build connection")
83+
```
5984

6085
## Current Implementation Challenges
6186

6287
1. **Protocol Handler Diversity**: Existing protocols have diverse input formats for handlers and send functions,
63-
complicating the integration.
88+
complicating the integration.
6489
2. **Function Call Complexity**: Difficulty in calling Mix send/handler functions from existing protocols
65-
without significant overrides to send functions (and handlers in some cases, *e.g.,* ping).
90+
without significant overrides to send functions (and handlers in some cases, _e.g.,_ ping).
6691

6792
## Transport Approach
6893

mix.nim

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ export registerDestReadBehavior
2626
export MixNodes
2727
export initMixMultiAddrByIndex
2828

29+
when defined(mix_experimental_exit_is_destination):
30+
export exitNode
31+
export forwardToAddr
32+
2933
proc readLp*(maxSize: int): destReadBehaviorCb =
3034
## create callback to read length prefixed msg, with the length encoded as a varint
3135
return proc(

mix/exit_connection.nim

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
import hashes, chronos, libp2p/varint, stew/byteutils
2+
import libp2p/stream/connection
3+
from fragmentation import dataSize
4+
5+
type MixExitConnection* = ref object of Connection
6+
message: seq[byte]
7+
response: seq[byte]
8+
9+
method join*(
10+
self: MixExitConnection
11+
): Future[void] {.async: (raises: [CancelledError], raw: true), public.} =
12+
discard
13+
14+
method readExactly*(
15+
self: MixExitConnection, pbytes: pointer, nbytes: int
16+
): Future[void] {.async: (raises: [CancelledError, LPStreamError]), public.} =
17+
if nbytes == 0:
18+
return
19+
20+
if self.message.len < nbytes:
21+
raise newException(
22+
LPStreamError, "Not enough data in to read exactly " & $nbytes & " bytes."
23+
)
24+
25+
var pbuffer = cast[ptr UncheckedArray[byte]](pbytes)
26+
for i in 0 ..< nbytes:
27+
pbuffer[i] = self.message[i]
28+
29+
if nbytes < self.message.len:
30+
self.message = self.message[nbytes .. ^1]
31+
else:
32+
self.isEof = true
33+
self.message = @[]
34+
35+
# ToDo: Check readLine, readVarint implementations
36+
method readLine*(
37+
self: MixExitConnection, limit = 0, sep = "\r\n"
38+
): Future[string] {.async: (raises: [CancelledError, LPStreamError]), public.} =
39+
var
40+
lim = if limit <= 0: -1 else: limit
41+
result: seq[byte] = @[]
42+
state = 0
43+
44+
while true:
45+
if state < len(sep):
46+
if self.message.len == 0:
47+
raise newException(LPStreamError, "Not enough data to read line.")
48+
49+
let ch = self.message[0]
50+
self.message.delete(0)
51+
52+
if byte(sep[state]) == ch:
53+
inc(state)
54+
if state == len(sep):
55+
break
56+
else:
57+
result.add(ch)
58+
state = 0
59+
60+
if lim > 0 and len(result) == lim:
61+
break
62+
else:
63+
break
64+
65+
return cast[string](result)
66+
67+
method readVarint*(
68+
self: MixExitConnection
69+
): Future[uint64] {.async: (raises: [CancelledError, LPStreamError]), public.} =
70+
var
71+
buffer: array[10, byte]
72+
bytesRead = 0
73+
74+
while bytesRead < buffer.len:
75+
if self.message.len == 0:
76+
raise newException(LPStreamError, "Not enough data to read varint")
77+
78+
buffer[bytesRead] = self.message[0]
79+
self.message.delete(0)
80+
bytesRead += 1
81+
82+
var
83+
varint: uint64
84+
length: int
85+
let res = PB.getUVarint(buffer.toOpenArray(0, bytesRead - 1), length, varint)
86+
if res.isOk():
87+
return varint
88+
if res.error() != VarintError.Incomplete:
89+
break
90+
91+
raise newException(LPStreamError, "Cannot parse varint")
92+
93+
method readLp*(
94+
self: MixExitConnection, maxSize: int
95+
): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]), public.} =
96+
let
97+
length = await self.readVarint()
98+
maxLen = uint64(if maxSize < 0: int.high else: maxSize)
99+
100+
if length > maxLen:
101+
raise (ref MaxSizeError)(msg: "Message exceeds maximum length")
102+
103+
if length == 0:
104+
self.isEof = true
105+
return @[]
106+
107+
if self.message.len < int(length):
108+
raise newException(LPStreamError, "Not enough data to read " & $length & " bytes.")
109+
110+
result = self.message[0 ..< int(length)]
111+
if int(length) == self.message.len:
112+
self.isEof = true
113+
self.message = @[]
114+
else:
115+
self.message = self.message[int(length) .. ^1]
116+
return result
117+
118+
method write*(
119+
self: MixExitConnection, msg: seq[byte]
120+
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
121+
self.response.add(msg)
122+
let fut = newFuture[void]()
123+
fut.complete()
124+
return fut
125+
126+
proc write*(
127+
self: MixExitConnection, msg: string
128+
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
129+
self.write(msg.toBytes())
130+
131+
method writeLp*(
132+
self: MixExitConnection, msg: openArray[byte]
133+
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
134+
if msg.len() > dataSize:
135+
let fut = newFuture[void]()
136+
fut.fail(
137+
newException(LPStreamError, "exceeds max msg size of " & $dataSize & " bytes")
138+
)
139+
return fut
140+
141+
var
142+
vbytes: seq[byte] = @[]
143+
value = msg.len().uint64
144+
145+
while value >= 128:
146+
vbytes.add(byte((value and 127) or 128))
147+
value = value shr 7
148+
vbytes.add(byte(value))
149+
150+
var buf = newSeqUninitialized[byte](msg.len() + vbytes.len)
151+
buf[0 ..< vbytes.len] = vbytes.toOpenArray(0, vbytes.len - 1)
152+
buf[vbytes.len ..< buf.len] = msg
153+
154+
self.write(buf)
155+
156+
method writeLp*(
157+
self: MixExitConnection, msg: string
158+
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
159+
self.writeLp(msg.toOpenArrayByte(0, msg.high))
160+
161+
func shortLog*(self: MixExitConnection): string {.raises: [].} =
162+
"MixExitConnection"
163+
164+
method initStream*(self: MixExitConnection) =
165+
discard
166+
167+
method closeImpl*(
168+
self: MixExitConnection
169+
): Future[void] {.async: (raises: [], raw: true).} =
170+
let fut = newFuture[void]()
171+
fut.complete()
172+
return fut
173+
174+
func hash*(self: MixExitConnection): Hash =
175+
discard
176+
177+
proc getResponse*(self: MixExitConnection): seq[byte] =
178+
let r = self.response
179+
self.response = @[]
180+
return r
181+
182+
proc new*(T: typedesc[MixExitConnection], message: seq[byte]): T =
183+
let instance = T(message: message)
184+
185+
when defined(libp2p_agents_metrics):
186+
instance.shortAgent = connection.shortAgent
187+
188+
instance
189+
190+
when defined(libp2p_agents_metrics):
191+
proc setShortAgent*(self: MixExitConnection, shortAgent: string) =
192+
discard

mix/exit_layer.nim

Lines changed: 76 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import chronicles, chronos, metrics
1+
import chronicles, chronos, metrics, std/[enumerate, strutils]
22
import libp2p, libp2p/[builders, stream/connection]
3-
import ./[mix_metrics, reply_connection, serialization]
3+
import ./[mix_metrics, exit_connection, reply_connection, serialization, utils]
44

55
type OnReplyDialer* =
66
proc(surb: SURB, message: seq[byte]) {.async: (raises: [CancelledError]).}
@@ -56,14 +56,68 @@ proc reply(
5656
error "could not reply", description = exc.msg
5757
mix_messages_error.inc(labelValues = ["ExitLayer", "REPLY_FAILED"])
5858

59-
proc onMessage*(
59+
when defined(mix_experimental_exit_is_destination):
60+
proc runHandler(
61+
self: ExitLayer, codec: string, message: seq[byte], surbs: seq[SURB]
62+
) {.async: (raises: [CancelledError]).} =
63+
let exitConn = MixExitConnection.new(message)
64+
defer:
65+
if not exitConn.isNil:
66+
await exitConn.close()
67+
68+
var hasHandler: bool = false
69+
for index, handler in enumerate(self.switch.ms.handlers):
70+
if codec in handler.protos:
71+
try:
72+
hasHandler = true
73+
await handler.protocol.handler(exitConn, codec)
74+
except CatchableError as e:
75+
error "Error during execution of MixProtocol handler: ", err = e.msg
76+
77+
if not hasHandler:
78+
error "Handler doesn't exist", codec = codec
79+
return
80+
81+
if surbs.len != 0:
82+
let response = exitConn.getResponse()
83+
await self.reply(surbs, response)
84+
85+
proc fwdRequest(
6086
self: ExitLayer,
6187
codec: string,
6288
message: seq[byte],
63-
destAddr: MultiAddress,
64-
destPeerId: PeerId,
89+
destination: Hop,
6590
surbs: seq[SURB],
6691
) {.async: (raises: [CancelledError]).} =
92+
if destination == Hop():
93+
error "no destination available"
94+
mix_messages_error.inc(labelValues = ["Exit", "NO_DESTINATION"])
95+
return
96+
97+
let destBytes = getHop(destination)
98+
99+
let fullAddrStr = bytesToMultiAddr(destBytes).valueOr:
100+
error "Failed to convert bytes to multiaddress", err = error
101+
mix_messages_error.inc(labelValues = ["Exit", "INVALID_DEST"])
102+
return
103+
104+
let parts = fullAddrStr.split("/p2p/")
105+
if parts.len != 2:
106+
error "Invalid multiaddress format", parts
107+
mix_messages_error.inc(labelValues = ["Exit", "INVALID_DEST"])
108+
return
109+
110+
# Create MultiAddress and PeerId
111+
let destAddr = MultiAddress.init(parts[0]).valueOr:
112+
error "Failed to parse location multiaddress: ", err = error
113+
mix_messages_error.inc(labelValues = ["Exit", "INVALID_DEST"])
114+
return
115+
116+
let destPeerId = PeerId.init(parts[1]).valueOr:
117+
error "Failed to initialize PeerId", err = error
118+
mix_messages_error.inc(labelValues = ["Exit", "INVALID_DEST"])
119+
return
120+
67121
var destConn: Connection
68122
var response: seq[byte]
69123
try:
@@ -91,3 +145,20 @@ proc onMessage*(
91145
await destConn.close()
92146

93147
await self.reply(surbs, response)
148+
149+
proc onMessage*(
150+
self: ExitLayer,
151+
codec: string,
152+
message: seq[byte],
153+
destination: Hop,
154+
surbs: seq[SURB],
155+
) {.async: (raises: [CancelledError]).} =
156+
when defined(mix_experimental_exit_is_destination):
157+
if destination == Hop():
158+
trace "onMessage - exit is destination", codec, message
159+
await self.runHandler(codec, message, surbs)
160+
else:
161+
trace "onMessage - exist is not destination", codec, message
162+
await self.fwdRequest(codec, message, destination, surbs)
163+
else:
164+
await self.fwdRequest(codec, message, destination, surbs)

0 commit comments

Comments
 (0)