Skip to content
This repository was archived by the owner on Jan 30, 2026. It is now read-only.

Commit 67cad04

Browse files
feat: create surbs and send response back (#73)
1 parent 2cb3cba commit 67cad04

16 files changed

+734
-143
lines changed

.github/workflows/examples.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@ jobs:
1414
nimble c ./examples/poc_gossipsub.nim
1515
nimble c ./examples/poc_gossipsub_repeated_runs.nim
1616
nimble c ./examples/poc_noresp_ping.nim
17+
nimble c ./examples/poc_resp_ping.nim
1718
nim-versions: '["version-2-0", "version-2-2", "devel"]'

examples/poc_gossipsub.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ proc makeMixConnCb(mixProto: MixProtocol): CustomConnCreationProc =
114114
destAddr: Option[MultiAddress], destPeerId: PeerId, codec: string
115115
): Connection {.gcsafe, raises: [].} =
116116
try:
117-
return mixProto.toConnection(destPeerId, codec)
117+
return mixProto.toConnection(destPeerId, codec).get()
118118
except CatchableError as e:
119119
error "Error during execution of MixEntryConnection callback: ", err = e.msg
120120
return nil

examples/poc_gossipsub_repeated_runs.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ proc makeMixConnCb(mixProto: MixProtocol): CustomConnCreationProc =
116116
destAddr: Option[MultiAddress], destPeerId: PeerId, codec: string
117117
): Connection {.gcsafe, raises: [].} =
118118
try:
119-
return mixProto.toConnection(destPeerId, codec)
119+
return mixProto.toConnection(destPeerId, codec).get()
120120
except CatchableError as e:
121121
error "Error during execution of MixEntryConnection callback: ", err = e.msg
122122
return nil

examples/poc_noresp_ping.nim

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ proc mixnetSimulation() {.async: (raises: [Exception]).} =
8585

8686
let proto = MixProtocol.new(index, numberOfNodes, nodes[index]).valueOr:
8787
error "Mix protocol initialization failed", err = error
88-
return
88+
return # We'll fwd requests, so let's register how should the exit node behave
8989

9090
mixProto.add(proto)
9191

@@ -109,7 +109,9 @@ proc mixnetSimulation() {.async: (raises: [Exception]).} =
109109
nodes[receiverIndex].peerInfo.peerId, nodes[receiverIndex].peerInfo.addrs[0]
110110
),
111111
NoRespPingCodec,
112-
)
112+
).valueOr:
113+
error "could not obtain connection", err = error
114+
return
113115

114116
discard await noRespPingProto[senderIndex].noRespPing(conn)
115117
await sleepAsync(1.seconds)

examples/poc_resp_ping.nim

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
import chronicles, chronos, results, strutils
2+
import std/[enumerate, sysrand]
3+
import libp2p
4+
import libp2p/[crypto/secp, protocols/ping]
5+
import ../mix
6+
7+
proc cryptoRandomInt(max: int): Result[int, string] =
8+
if max == 0:
9+
return err("Max cannot be zero.")
10+
var bytes: array[8, byte]
11+
discard urandom(bytes)
12+
let value = cast[uint64](bytes)
13+
return ok(int(value mod uint64(max)))
14+
15+
proc createSwitch(libp2pPrivKey: SkPrivateKey, multiAddr: MultiAddress): Switch =
16+
result = SwitchBuilder
17+
.new()
18+
.withPrivateKey(PrivateKey(scheme: Secp256k1, skkey: libp2pPrivKey))
19+
.withAddress(multiAddr)
20+
.withRng(crypto.newRng())
21+
.withYamux()
22+
.withTcpTransport()
23+
.withNoise()
24+
.build()
25+
26+
# Set up nodes
27+
proc setUpNodes(numNodes: int): seq[Switch] =
28+
# This is not actually GC-safe
29+
{.gcsafe.}:
30+
# Initialize mix nodes
31+
discard initializeMixNodes(numNodes)
32+
33+
var nodes: seq[Switch] = @[]
34+
35+
for index, node in enumerate(mixNodes):
36+
# Write public info of all mix nodes
37+
let nodePubInfoRes = getMixPubInfoByIndex(index)
38+
if nodePubInfoRes.isErr:
39+
error "Get mix pub info by index error", err = nodePubInfoRes.error
40+
continue
41+
let nodePubInfo = nodePubInfoRes.get()
42+
43+
let writePubRes = writeMixPubInfoToFile(nodePubInfo, index)
44+
if writePubRes.isErr:
45+
error "Failed to write pub info to file", nodeIndex = index
46+
continue
47+
48+
# Write info of all mix nodes
49+
let writeNodeRes = writeMixNodeInfoToFile(node, index)
50+
if writeNodeRes.isErr:
51+
error "Failed to write mix info to file", nodeIndex = index
52+
continue
53+
54+
# Extract private key and multiaddress
55+
let (multiAddrStr, _, _, _, libp2pPrivKey) = getMixNodeInfo(node)
56+
57+
let multiAddr = MultiAddress.init(multiAddrStr.split("/p2p/")[0]).valueOr:
58+
error "Failed to initialize MultiAddress", err = error
59+
return
60+
61+
# Create switch
62+
let switch = createSwitch(libp2pPrivKey, multiAddr)
63+
if not switch.isNil:
64+
nodes.add(switch)
65+
else:
66+
warn "Failed to set up node", nodeIndex = index
67+
68+
return nodes
69+
70+
proc mixnetSimulation() {.async: (raises: [Exception]).} =
71+
let
72+
numberOfNodes = 10
73+
nodes = setUpNodes(numberOfNodes)
74+
75+
var
76+
mixProto: seq[MixProtocol] = @[]
77+
pingProto: seq[Ping] = @[]
78+
79+
# Start nodes
80+
for index, _ in enumerate(nodes):
81+
pingProto.add(Ping.new())
82+
83+
let proto = MixProtocol.new(index, numberOfNodes, nodes[index]).valueOr:
84+
error "Mix protocol initialization failed", err = error
85+
return
86+
87+
# We'll fwd requests, so let's register how should the exit node will read responses
88+
proto.registerFwdReadBehavior(PingCodec, readExactly(32))
89+
90+
mixProto.add(proto)
91+
92+
nodes[index].mount(pingProto[index])
93+
nodes[index].mount(mixProto[index])
94+
95+
await nodes[index].start()
96+
await sleepAsync(1.seconds)
97+
98+
let cryptoRandomIntResult = cryptoRandomInt(numberOfNodes)
99+
if cryptoRandomIntResult.isErr:
100+
error "Failed to generate random number", err = cryptoRandomIntResult.error
101+
return
102+
let senderIndex = cryptoRandomIntResult.value
103+
var receiverIndex = 0
104+
if senderIndex < numberOfNodes - 1:
105+
receiverIndex = senderIndex + 1
106+
107+
let conn = mixProto[senderIndex].toConnection(
108+
Destination.forwardToAddr(
109+
nodes[receiverIndex].peerInfo.peerId, nodes[receiverIndex].peerInfo.addrs[0]
110+
),
111+
PingCodec,
112+
Opt.some(MixParameters(expectReply: Opt.some(true), numSurbs: Opt.some(byte(1)))),
113+
).valueOr:
114+
error "Could not obtain connection", err = error
115+
return
116+
117+
let response = await pingProto[senderIndex].ping(conn)
118+
119+
await sleepAsync(1.seconds)
120+
121+
deleteNodeInfoFolder()
122+
deletePubInfoFolder()
123+
124+
when isMainModule:
125+
waitFor(mixnetSimulation())

mix.nim

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import results
2-
import ./mix/[mix_protocol, mix_node, entry_connection]
2+
import chronos
3+
import libp2p
4+
import ./mix/[mix_protocol, mix_node, entry_connection, exit_layer]
35

46
export results
57

@@ -21,3 +23,20 @@ export Destination
2123
export DestinationType
2224
export forwardToAddr
2325
export mixNode
26+
export MixParameters
27+
export fwdReadBehaviorCb
28+
export registerFwdReadBehavior
29+
30+
proc readLp*(maxSize: int): fwdReadBehaviorCb =
31+
return proc(
32+
conn: Connection
33+
): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]).} =
34+
await conn.readLp(maxSize)
35+
36+
proc readExactly*(nBytes: int): fwdReadBehaviorCb =
37+
return proc(
38+
conn: Connection
39+
): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]).} =
40+
let buf = newSeqUninitialized[byte](nBytes)
41+
await conn.readExactly(addr buf[0], nBytes)
42+
return buf

mix/config.nim

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,8 @@ const
1212
packetSize* = 4608 # Total packet size (from spec)
1313
messageSize* = packetSize - headerSize - k # Size of the message itself
1414
payloadSize* = messageSize + k # Total payload size
15+
surbSize* = headerSize + k + addrSize
16+
# Size of a surb packet inside the message payload
17+
surbLenSize* = 1 # Size of the field storing the number of surbs
18+
surbIdLen* = 16 # Size of the identifier used when sending a message with surb
19+
defaultSurbs* = uint8(4) # Default number of SURBs to send

mix/entry_connection.nim

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import hashes, chronos, stew/byteutils, results, chronicles
22
import libp2p/stream/connection
33
import ./mix_protocol
4+
import ./config
45
from fragmentation import dataSize
56

67
type
@@ -33,14 +34,20 @@ type MixDialer* = proc(
3334
msg: seq[byte], codec: string, destination: Destination
3435
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true).}
3536

37+
type MixParameters* = object
38+
expectReply*: Opt[bool]
39+
numSurbs*: Opt[uint8]
40+
3641
type MixEntryConnection* = ref object of Connection
3742
destination: Destination
3843
codec: string
3944
mixDialer: MixDialer
45+
params: Opt[MixParameters]
4046

4147
method readExactly*(
4248
self: MixEntryConnection, pbytes: pointer, nbytes: int
4349
): Future[void] {.async: (raises: [CancelledError, LPStreamError]), public.} =
50+
await sleepAsync(10.minutes) # TODO: implement readExactly
4451
raise
4552
newException(LPStreamError, "readExactly not implemented for MixEntryConnection")
4653

@@ -102,9 +109,6 @@ method writeLp*(
102109
proc shortLog*(self: MixEntryConnection): string {.raises: [].} =
103110
"[MixEntryConnection] Destination: " & $self.destination
104111

105-
method initStream*(self: MixEntryConnection) =
106-
discard
107-
108112
method closeImpl*(
109113
self: MixEntryConnection
110114
): Future[void] {.async: (raises: [], raw: true).} =
@@ -125,8 +129,10 @@ proc new*(
125129
destination: Destination,
126130
codec: string,
127131
mixDialer: MixDialer,
132+
params: Opt[MixParameters],
128133
): T =
129-
let instance = T(destination: destination, codec: codec, mixDialer: mixDialer)
134+
let instance =
135+
T(destination: destination, codec: codec, mixDialer: mixDialer, params: params)
130136

131137
when defined(libp2p_agents_metrics):
132138
instance.shortAgent = connection.shortAgent
@@ -138,7 +144,16 @@ proc new*(
138144
srcMix: MixProtocol,
139145
destination: Destination,
140146
codec: string,
147+
params: Opt[MixParameters],
141148
): T {.raises: [].} =
149+
let params = params.get(MixParameters())
150+
let expectReply = params.expectReply.get(false)
151+
let surbs =
152+
if expectReply:
153+
params.numSurbs.get(defaultSurbs)
154+
else:
155+
0
156+
142157
var sendDialerFunc = proc(
143158
msg: seq[byte], codec: string, dest: Destination
144159
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
@@ -149,19 +164,28 @@ proc new*(
149164
else:
150165
(Opt.none(PeerId), Opt.some(MixDestination.init(dest.peerId, dest.address)))
151166

152-
await srcMix.anonymizeLocalProtocolSend(msg, codec, peerId, destination)
167+
await srcMix.anonymizeLocalProtocolSend(msg, codec, peerId, destination, surbs)
153168
except CatchableError as e:
154169
error "Error during execution of anonymizeLocalProtocolSend: ", err = e.msg
155170
return
156171

157-
T.new(srcMix, destination, codec, sendDialerFunc)
172+
T.new(srcMix, destination, codec, sendDialerFunc, Opt.some(params))
158173

159174
proc toConnection*(
160-
srcMix: MixProtocol, destination: Destination | PeerId, codec: string
161-
): Connection {.gcsafe, raises: [].} =
175+
srcMix: MixProtocol,
176+
destination: Destination | PeerId,
177+
codec: string,
178+
params: Opt[MixParameters] = Opt.none(MixParameters),
179+
): Result[Connection, string] {.gcsafe, raises: [].} =
162180
let dest =
163181
when destination is PeerId:
164182
Destination.mixNode(destination)
165183
else:
166184
destination
167-
MixEntryConnection.new(srcMix, dest, codec)
185+
186+
if dest.kind == DestinationType.ForwardAddr and
187+
params.get(MixParameters()).expectReply.get(false) and
188+
not srcMix.hasFwdBehavior(codec):
189+
return err("no forward behavior for codec")
190+
191+
ok(MixEntryConnection.new(srcMix, dest, codec, params))

mix/exit_connection.nim

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ from fragmentation import dataSize
44

55
type MixExitConnection* = ref object of Connection
66
message: seq[byte]
7+
response: seq[byte]
78

89
method join*(
910
self: MixExitConnection
@@ -117,8 +118,7 @@ method readLp*(
117118
method write*(
118119
self: MixExitConnection, msg: seq[byte]
119120
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
120-
# TODO: dial back
121-
discard
121+
self.response.add(msg)
122122

123123
proc write*(
124124
self: MixExitConnection, msg: string
@@ -148,7 +148,7 @@ method writeLp*(
148148
buf[0 ..< vbytes.len] = vbytes.toOpenArray(0, vbytes.len - 1)
149149
buf[vbytes.len ..< buf.len] = msg
150150

151-
# TODO: dial back
151+
self.write(buf)
152152

153153
method writeLp*(
154154
self: MixExitConnection, msg: string
@@ -171,6 +171,11 @@ method closeImpl*(
171171
func hash*(self: MixExitConnection): Hash =
172172
discard
173173

174+
proc getResponse*(self: MixExitConnection): seq[byte] =
175+
let r = self.response
176+
self.response = @[]
177+
return r
178+
174179
proc new*(T: typedesc[MixExitConnection], message: seq[byte]): T =
175180
let instance = T(message: message)
176181

0 commit comments

Comments
 (0)