Skip to content
This repository was archived by the owner on Jan 30, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,12 @@ Execute the test suite with:

## Usage

Run the Mix protocol proof-of-concept:
Run the Mix protocol proof-of-concepts:

```bash
nim c -r src/mix_poc.nim
nim c -r examples/poc_gossipsub.nim
nim c -r examples/poc_resp_ping.nim
nim c -r examples/poc_noresp_ping.nim
```

## Current Implementation Challenges
Expand Down
3 changes: 2 additions & 1 deletion examples/poc_noresp_ping.nim
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ proc mixnetSimulation() {.async: (raises: [Exception]).} =
return

discard await noRespPingProto[senderIndex].noRespPing(conn)
await sleepAsync(1.seconds)

await sleepAsync(3.seconds)

deleteNodeInfoFolder()
deletePubInfoFolder()
Expand Down
1 change: 1 addition & 0 deletions mix.nim
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export MixParameters
export destReadBehaviorCb
export registerDestReadBehavior
export MixNodes
export initMixMultiAddrByIndex

proc readLp*(maxSize: int): destReadBehaviorCb =
## create callback to read length prefixed msg, with the length encoded as a varint
Expand Down
43 changes: 8 additions & 35 deletions mix/exit_layer.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import std/strutils
import chronicles, chronos, metrics
import libp2p, libp2p/[builders, stream/connection]
import ./[mix_metrics, reply_connection, serialization, utils]
import ./[mix_metrics, reply_connection, serialization]

type OnReplyDialer* =
proc(surb: SURB, message: seq[byte]) {.async: (raises: [CancelledError]).}
Expand Down Expand Up @@ -58,43 +57,17 @@ proc reply(
mix_messages_error.inc(labelValues = ["ExitLayer", "REPLY_FAILED"])

proc onMessage*(
self: ExitLayer, codec: string, message: seq[byte], nextHop: Hop, surbs: seq[SURB]
self: ExitLayer,
codec: string,
message: seq[byte],
destAddr: MultiAddress,
destPeerId: PeerId,
surbs: seq[SURB],
) {.async: (raises: [CancelledError]).} =
if nextHop == Hop():
error "no destination available"
return

# Forward to destination
let destBytes = getHop(nextHop)

let fullAddrStr = bytesToMultiAddr(destBytes).valueOr:
error "Failed to convert bytes to multiaddress", err = error
mix_messages_error.inc(labelValues = ["ExitLayer", "INVALID_DEST"])
return

let parts = fullAddrStr.split("/p2p/")
if parts.len != 2:
error "Invalid multiaddress format", parts
mix_messages_error.inc(labelValues = ["ExitLayer", "INVALID_DEST"])
return

# Create MultiAddress and PeerId
let locationAddr = MultiAddress.init(parts[0]).valueOr:
error "Failed to parse location multiaddress: ", err = error
mix_messages_error.inc(labelValues = ["ExitLayer", "INVALID_DEST"])
return

let peerId = PeerId.init(parts[1]).valueOr:
error "Failed to initialize PeerId", err = error
mix_messages_error.inc(labelValues = ["ExitLayer", "INVALID_DEST"])
return

trace "onMessage - exit is not destination", peerId, locationAddr, codec, message

var destConn: Connection
var response: seq[byte]
try:
destConn = await self.switch.dial(peerId, @[locationAddr], codec)
destConn = await self.switch.dial(destPeerId, @[destAddr], codec)
await destConn.write(message)

if surbs.len != 0:
Expand Down
8 changes: 8 additions & 0 deletions mix/mix_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -304,3 +304,11 @@ proc findByPeerId*(self: MixNodes, peerId: PeerId): Result[MixNodeInfo, string]
if peerIdRes == peerId:
return ok(node)
return err("No node with peer id: " & $peerId)

proc initMixMultiAddrByIndex*(
self: var MixNodes, index: int, multiAddr: string
): Result[void, string] =
if index < 0 or index >= self.len:
return err("Index must be between 0 and " & $(self.len))
self[index].multiAddr = multiAddr
ok()
156 changes: 152 additions & 4 deletions mix/mix_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import
libp2p/
[protocols/ping, protocols/protocol, stream/connection, stream/lpstream, switch]

when defined(enable_mix_benchmarks):
import stew/endians2

const MixProtocolID* = "/mix/1.0.0"

type ConnCreds = object
Expand All @@ -32,6 +35,30 @@ type MixProtocol* = ref object of LPProtocol
connCreds: Table[I, ConnCreds]
destReadBehavior: TableRef[string, destReadBehaviorCb]

proc benchmarkLog*(
eventName: static[string],
myPeerId: PeerId,
startTime: Moment,
msgId: uint64,
orig: uint64,
fromPeerId: Opt[PeerId],
toPeerId: Opt[PeerId],
) =
let endTime = Moment.now()
let procDelay = (endTime - startTime).milliseconds()
let fromPeerId =
if fromPeerId.isNone:
"None"
else:
fromPeerId.get().shortLog()
let toPeerId =
if toPeerId.isNone:
"None"
else:
toPeerId.get().shortLog()
info eventName,
msgId, fromPeerId, toPeerId, myPeerId, orig, current = startTime, procDelay

proc hasDestReadBehavior*(mixProto: MixProtocol, codec: string): bool =
return mixProto.destReadBehavior.hasKey(codec)

Expand Down Expand Up @@ -77,7 +104,16 @@ proc handleMixNodeConnection(
mixProto: MixProtocol, conn: Connection
) {.async: (raises: [CancelledError]).} =
var receivedBytes: seq[byte]

when defined(enable_mix_benchmarks):
var metadata: seq[byte]
var fromPeerId: PeerId

try:
when defined(enable_mix_benchmarks):
metadata = await conn.readLp(16)
fromPeerId = conn.peerId

receivedBytes = await conn.readLp(packetSize)
except Exception as e:
error "Failed to read: ", err = e.msg
Expand All @@ -88,6 +124,13 @@ proc handleMixNodeConnection(
except CatchableError as e:
error "Failed to close incoming stream: ", err = e.msg

when defined(enable_mix_benchmarks):
let startTime = Moment.now()

if metadata.len == 0:
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "NO_DATA"])
return # No data, end of stream

if receivedBytes.len == 0:
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "NO_DATA"])
return # No data, end of stream
Expand All @@ -105,6 +148,11 @@ proc handleMixNodeConnection(
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "INVALID_SPHINX"])
return

when defined(enable_mix_benchmarks):
let
orig = uint64.fromBytesLE(metadata[0 ..< 8])
msgId = uint64.fromBytesLE(metadata[8 ..< 16])

case processedSP.status
of Exit:
mix_messages_recvd.inc(labelValues = [$processedSP.status])
Expand Down Expand Up @@ -132,8 +180,46 @@ proc handleMixNodeConnection(
trace "Exit node - Received mix message",
receiver = multiAddr, message = deserialized.message, codec = deserialized.codec

if processedSP.destination == Hop():
error "no destination available"
mix_messages_error.inc(labelValues = ["Exit", "NO_DESTINATION"])
return

let destBytes = getHop(processedSP.destination)

let fullAddrStr = bytesToMultiAddr(destBytes).valueOr:
error "Failed to convert bytes to multiaddress", err = error
mix_messages_error.inc(labelValues = ["Exit", "INVALID_DEST"])
return

let parts = fullAddrStr.split("/p2p/")
if parts.len != 2:
error "Invalid multiaddress format", parts
mix_messages_error.inc(labelValues = ["Exit", "INVALID_DEST"])
return

# Create MultiAddress and PeerId
let destAddr = MultiAddress.init(parts[0]).valueOr:
error "Failed to parse location multiaddress: ", err = error
mix_messages_error.inc(labelValues = ["Exit", "INVALID_DEST"])
return

let destPeerId = PeerId.init(parts[1]).valueOr:
error "Failed to initialize PeerId", err = error
mix_messages_error.inc(labelValues = ["Exit", "INVALID_DEST"])
return

when defined(enable_mix_benchmarks):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: If we plan to expand benchmark support or keep these logs around, it might be worth extracting the repeated logging logic in handleMixNodeConnection (e.g., startTime, endTime, procDelay, shared log fields) into a small helper.
This would complement the existing SendPacketConfig abstraction used in sendPacket and help reduce duplication across Exit, Reply, etc. paths.

benchmarkLog "Exit",
mixProto.switch.peerInfo.peerId,
startTime,
msgId,
orig,
Opt.some(fromPeerId),
Opt.some(destPeerId)

await mixProto.exitLayer.onMessage(
deserialized.codec, message, processedSP.destination, surbs
deserialized.codec, message, destAddr, destPeerId, surbs
)

mix_messages_forwarded.inc(labelValues = ["Exit"])
Expand Down Expand Up @@ -172,6 +258,15 @@ proc handleMixNodeConnection(
mix_messages_error.inc(labelValues = ["Reply", "INVALID_SPHINX"])
return

when defined(enable_mix_benchmarks):
benchmarkLog "Reply",
mixProto.switch.peerInfo.peerId,
startTime,
msgId,
orig,
Opt.some(fromPeerId),
Opt.none(PeerId)

await connCred.incoming.put(deserialized.message)
else:
error "could not process reply", id = processedSP.id
Expand Down Expand Up @@ -212,9 +307,22 @@ proc handleMixNodeConnection(
mix_messages_error.inc(labelValues = ["Intermediate", "INVALID_NEXTHOP"])
return

when defined(enable_mix_benchmarks):
benchmarkLog "Intermediate",
mixProto.switch.peerInfo.peerId,
startTime,
msgId,
orig,
Opt.some(fromPeerId),
Opt.some(peerId)

var nextHopConn: Connection
try:
nextHopConn = await mixProto.switch.dial(peerId, @[locationAddr], MixProtocolID)

when defined(enable_mix_benchmarks):
await nextHopConn.writeLp(metadata)

await nextHopConn.writeLp(processedSP.serializedSphinxPacket)
mix_messages_forwarded.inc(labelValues = ["Intermediate"])
except CatchableError as e:
Expand Down Expand Up @@ -342,9 +450,25 @@ proc prepareMsgWithSurbs(

ok(serialized)

type SendPacketType* = enum
Entry
Reply

type SendPacketConfig = object
logType: SendPacketType
when defined(enable_mix_benchmarks):
startTime: Moment
orig: uint64
msgId: uint64
origAndMsgId: seq[byte]

proc sendPacket(
mixProto: MixProtocol, multiAddrs: string, sphinxPacket: seq[byte], label: string
mixProto: MixProtocol,
multiAddrs: string,
sphinxPacket: seq[byte],
config: SendPacketConfig,
) {.async: (raises: []).} =
let label = $config.logType
# Send the wrapped message to the first mix node in the selected path
let parts = multiAddrs.split("/p2p/")
if parts.len != 2:
Expand All @@ -362,10 +486,24 @@ proc sendPacket(
mix_messages_error.inc(labelValues = [label, "NON_RECOVERABLE"])
return

when defined(enable_mix_benchmarks):
if config.logType == Entry:
benchmarkLog "Sender",
mixProto.switch.peerInfo.peerId,
config.startTime,
config.msgId,
config.orig,
Opt.none(PeerId),
Opt.some(firstMixPeerId)

var nextHopConn: Connection
try:
nextHopConn =
await mixProto.switch.dial(firstMixPeerId, @[firstMixAddr], @[MixProtocolID])

when defined(enable_mix_benchmarks):
await nextHopConn.writeLp(config.origAndMsgId)

await nextHopConn.writeLp(sphinxPacket)
mix_messages_forwarded.inc(labelValues = ["Entry"])
except CatchableError as e:
Expand Down Expand Up @@ -420,8 +558,18 @@ proc anonymizeLocalProtocolSend*(
destination: MixDestination,
numSurbs: uint8,
) {.async.} =
var config = SendPacketConfig(logType: Entry)
when defined(enable_mix_benchmarks):
config.startTime = Moment.now()

let (multiAddr, _, _, _, _) = getMixNodeInfo(mixProto.mixNodeInfo)

when defined(enable_mix_benchmarks):
# Assumes a fixed gossipsub message layout of 100
config.orig = uint64.fromBytesLE(msg[5 ..< 13])
config.msgId = uint64.fromBytesLE(msg[13 ..< 21])
config.origAndMsgId = msg[5 ..< 21]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor suggestion: IIRC, this assumes a fixed GossipSub message layout (message size = 100) as defined in tmp/mix-gossipsub-logging branch. Might be worth adding a short comment or defining constants just to make that assumption clearer.


mix_messages_recvd.inc(labelValues = ["Entry"])

var
Expand Down Expand Up @@ -521,7 +669,7 @@ proc anonymizeLocalProtocolSend*(
return

# Send the wrapped message to the first mix node in the selected path
await mixProto.sendPacket(multiAddrs[0], sphinxPacket, "Entry")
await mixProto.sendPacket(multiAddrs[0], sphinxPacket, config)

proc reply(
mixProto: MixProtocol, surb: SURB, msg: seq[byte]
Expand All @@ -539,7 +687,7 @@ proc reply(
error "Use SURB error", err = error
return

await mixProto.sendPacket(multiAddr, sphinxPacket, "Reply")
await mixProto.sendPacket(multiAddr, sphinxPacket, SendPacketConfig(logType: Reply))

proc new*(
T: typedesc[MixProtocol],
Expand Down
Loading