5 changes: 5 additions & 0 deletions src/accountsdb/snapshot/download.zig
@@ -1060,6 +1060,7 @@ test "downloadInfo Incremental" {
.tpu_addr = null,
.tvu_addr = null,
.tpu_quic_addr = null,
.tpu_vote_addr = null,
},
.full_snapshot = .{ .slot = 100, .hash = .ZEROES },
.inc_snapshot = .{ .slot = 101, .hash = .ZEROES },
@@ -1076,6 +1077,7 @@ test "downloadInfo Incremental" {
.tpu_addr = null,
.tvu_addr = null,
.tpu_quic_addr = null,
.tpu_vote_addr = null,
},
.full_snapshot = .{ .slot = 100, .hash = .ZEROES },
.inc_snapshot = null,
@@ -1092,6 +1094,7 @@ test "downloadInfo Incremental" {
.tpu_addr = null,
.tvu_addr = null,
.tpu_quic_addr = null,
.tpu_vote_addr = null,
},
.full_snapshot = .{ .slot = 100, .hash = .ZEROES },
.inc_snapshot = .{ .slot = 101, .hash = .ZEROES },
@@ -1156,6 +1159,7 @@ test "downloadInfo Full" {
.tpu_addr = null,
.tvu_addr = null,
.tpu_quic_addr = null,
.tpu_vote_addr = null,
},
.full_snapshot = .{ .slot = 100, .hash = .ZEROES },
.inc_snapshot = .{ .slot = 101, .hash = .ZEROES },
@@ -1172,6 +1176,7 @@ test "downloadInfo Full" {
.tpu_addr = null,
.tvu_addr = null,
.tpu_quic_addr = null,
.tpu_vote_addr = null,
},
.full_snapshot = .{ .slot = 100, .hash = .ZEROES },
.inc_snapshot = null,
15 changes: 10 additions & 5 deletions src/cmd.zig
@@ -67,18 +67,16 @@ pub fn main() !void {
var gpa_state: GpaOrCAllocator(.{}) = .{};
// defer _ = gpa_state.deinit();

var tracing_gpa = tracy.TracingAllocator{
var tracing_gpa: tracy.TracingAllocator = .{
.name = "gpa",
.parent = gpa_state.allocator(),
};
const gpa = tracing_gpa.allocator();

var gossip_gpa_state: GpaOrCAllocator(.{ .stack_trace_frames = 100 }) = .{};
var tracing_gossip_gpa = tracy.TracingAllocator{
var tracing_gossip_gpa: tracy.TracingAllocator = .{
.name = "gossip gpa",
.parent = gossip_gpa_state.allocator(),
.parent = tracing_gpa.allocator(),
};
// defer _ = gossip_gpa_state.deinit();
const gossip_gpa = tracing_gossip_gpa.allocator();

const argv = try std.process.argsAlloc(gpa);
@@ -182,6 +180,7 @@ pub fn main() !void {
params.gossip_base.apply(&current_config);
params.gossip_node.apply(&current_config);
params.repair.apply(&current_config);
current_config.genesis_file_path = params.genesis_file_path;
current_config.shred_network.dump_shred_tracker = params.repair.dump_shred_tracker;
current_config.shred_network.log_finished_slots = params.repair.log_finished_slots;
current_config.turbine.overwrite_stake_for_testing =
@@ -978,6 +977,7 @@ const Cmd = struct {
gossip_base: GossipArgumentsCommon,
gossip_node: GossipArgumentsNode,
repair: RepairArgumentsBase,
genesis_file_path: ?[]const u8,
/// TODO: Remove when no longer needed
overwrite_stake_for_testing: bool,
no_retransmit: bool,
@@ -1005,6 +1005,7 @@ const Cmd = struct {
.gossip_base = GossipArgumentsCommon.cmd_info,
.gossip_node = GossipArgumentsNode.cmd_info,
.repair = RepairArgumentsBase.cmd_info,
.genesis_file_path = genesis_file_path_arg,
.overwrite_stake_for_testing = .{
.kind = .named,
.name_override = null,
@@ -1681,6 +1682,7 @@ fn validator(
&app_base,
if (maybe_vote_sockets) |*vs| vs else null,
&gossip_votes,
&gossip_service.gossip_table_rw,
);

const rpc_server_thread = if (cfg.rpc_port) |rpc_port|
@@ -1866,6 +1868,7 @@ fn replayOffline(
&app_base,
null,
null,
null,
);

replay_thread.join();
@@ -2762,6 +2765,7 @@ const ReplayAndConsensusServiceState = struct {
app_base: *const AppBase,
vote_sockets: ?*const replay.consensus.core.VoteSockets,
gossip_votes: ?*sig.sync.Channel(sig.gossip.data.Vote),
gossip_table: ?*sig.sync.RwMux(sig.gossip.GossipTable),
) !std.Thread {
return try app_base.spawnService(
"replay",
@@ -2782,6 +2786,7 @@ const ReplayAndConsensusServiceState = struct {
.senders = c.senders,
.receivers = c.receivers,
.vote_sockets = vote_sockets,
.gossip_table = gossip_table,
} else null,
},
);
17 changes: 13 additions & 4 deletions src/core/transaction.zig
@@ -144,12 +144,21 @@ pub const Transaction = struct {
const msg_bytes_bounded = message.serializeBounded(version) catch return error.BadMessage;
const msg_bytes = msg_bytes_bounded.constSlice();

const signatures = try allocator.alloc(Signature, keypairs.len);
// TODO: remove these allocations, we have well-known bounds on all the numbers
const signatures = try allocator.alloc(Signature, message.signature_count);
errdefer allocator.free(signatures);

for (signatures, keypairs) |*signature, keypair| {
const msg_signature = keypair.sign(msg_bytes, null) catch return error.SigningError;
signature.* = .fromSignature(msg_signature);
// TODO: This method can result in us signing the same message for the same keypair more than once.
// Should investigate whether a hashmap to amortize the signing is faster. The current only usecase
// is when we send votes, and that *does* sign the same message twice.
Comment on lines +151 to +153

Contributor:
It's rare to have more than 2-3 signers in a transaction, so I doubt a hashmap would be favorable over a linear search.

Is "This method can result in us signing the same message for the same keypair more than once." supposed to be part of the todo, or is it a separate note?

Contributor (Author):
Fair point - I left this in as part of @Rexicon226's original suggestion (pinged in case he has any specific views or intel on this).

The redundant signing should probably be its own comment.
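To make the tradeoff discussed above concrete, here is a hypothetical sketch of the deduplication idea — not code from this PR; it assumes the same `Pubkey`/`Signature` helpers used in the surrounding diff and keeps a linear scan over already-filled slots rather than a hashmap, which is cheap with the usual 2-3 signers:

```zig
// Hypothetical sketch only: reuse an already-computed signature when two signing
// keys resolve to the same keypair, so each (keypair, message) pair is signed once.
const signing_keys = message.account_keys[0..message.signature_count];
for (signing_keys, 0..) |key, i| {
    for (keypairs) |kp| {
        const public_key: Pubkey = .fromPublicKey(&kp.public_key);
        if (!key.equals(&public_key)) continue;

        // Linear scan over earlier slots for a signature made with the same keypair.
        var reused: ?Signature = null;
        for (signing_keys[0..i], 0..) |prev_key, j| {
            if (prev_key.equals(&public_key)) {
                reused = signatures[j];
                break;
            }
        }

        if (reused) |sig| {
            signatures[i] = sig;
        } else {
            const msg_signature = kp.sign(msg_bytes, null) catch return error.SigningError;
            signatures[i] = .fromSignature(msg_signature);
        }
    }
}
```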

const signing_keys = message.account_keys[0..message.signature_count];
for (signing_keys, 0..) |key, i| {
for (keypairs) |kp| {
const public_key: Pubkey = .fromPublicKey(&kp.public_key);
if (!key.equals(&public_key)) continue;
const msg_signature = kp.sign(msg_bytes, null) catch return error.SigningError;
signatures[i] = .fromSignature(msg_signature);
}
}

return .{
16 changes: 10 additions & 6 deletions src/gossip/data.zig
@@ -1500,20 +1500,22 @@ pub const ThreadSafeContactInfo = struct {
tpu_addr: ?SocketAddr,
tvu_addr: ?SocketAddr,
tpu_quic_addr: ?SocketAddr,
tpu_vote_addr: ?SocketAddr,

pub fn initRandom(
random: std.Random,
pubkey: Pubkey,
shred_version: u16,
) !ThreadSafeContactInfo {
) ThreadSafeContactInfo {
return .{
.pubkey = pubkey,
.shred_version = shred_version,
.gossip_addr = SocketAddr.initRandom(random),
.rpc_addr = SocketAddr.initRandom(random),
.tpu_addr = SocketAddr.initRandom(random),
.tvu_addr = SocketAddr.initRandom(random),
.tpu_quic_addr = SocketAddr.initRandom(random),
.gossip_addr = .initRandom(random),
.rpc_addr = .initRandom(random),
.tpu_addr = .initRandom(random),
.tvu_addr = .initRandom(random),
.tpu_quic_addr = .initRandom(random),
.tpu_vote_addr = .initRandom(random),
};
}

@@ -1526,6 +1528,7 @@ pub const ThreadSafeContactInfo = struct {
.tpu_addr = contact_info.getSocket(.tpu),
.tvu_addr = contact_info.getSocket(.turbine_recv),
.tpu_quic_addr = contact_info.getSocket(.tpu_quic),
.tpu_vote_addr = contact_info.getSocket(.tpu_vote),
};
}

@@ -1538,6 +1541,7 @@ pub const ThreadSafeContactInfo = struct {
.tpu_addr = legacy_contact_info.tpu,
.tvu_addr = legacy_contact_info.turbine_recv,
.tpu_quic_addr = null,
.tpu_vote_addr = legacy_contact_info.tpu_vote,
};
}
};
39 changes: 23 additions & 16 deletions src/gossip/service.zig
@@ -66,7 +66,7 @@ const GossipMessageWithEndpoint = struct {

pub const PULL_REQUEST_RATE: Duration = .fromSecs(1);
pub const PULL_RESPONSE_TIMEOUT: Duration = .fromSecs(5);
pub const ACTIVE_SET_REFRESH_RATE: Duration = .fromSecs(15);
pub const ACTIVE_SET_REFRESH_RATE: Duration = .fromMillis(500);
pub const DATA_TIMEOUT: Duration = .fromSecs(15);
pub const TABLE_TRIM_RATE: Duration = .fromSecs(10);
pub const BUILD_MESSAGE_LOOP_MIN: Duration = .fromSecs(1);
@@ -384,6 +384,8 @@ pub const GossipService = struct {
for (push_msg_queue.queue.items) |*v| v.deinit(push_msg_queue.data_allocator);
push_msg_queue.queue.deinit();
}

// self.broker.deinit();
Contributor:
Is this intentional?

Contributor (Author):
No, nice catch. At some point early on I was debugging gossip and wanted to fix some leaks in some local testing code, but I believe the way this works caused issues in the actual validator on exit, so I commented it out. I'll probably just delete it, unless you think it's worth handling deallocation here for whatever reason.
}

pub const RunThreadsParams = struct {
Expand Down Expand Up @@ -926,15 +928,16 @@ pub const GossipService = struct {
defer pull_req_timer.reset();
// this also includes sending ping messages to other peers
const now = getWallclockMs();
const pull_req_packets = self.buildPullRequests(
var pull_req_packets = self.buildPullRequests(
self.allocator,
random,
pull_request.MAX_BLOOM_SIZE,
now,
) catch |e| {
self.logger.err().logf("failed to generate pull requests: {any}", .{e});
break :pull_blk;
};
defer pull_req_packets.deinit();
defer pull_req_packets.deinit(self.allocator);
for (pull_req_packets.items) |packet| {
try self.packet_outgoing_channel.send(packet);
}
@@ -1211,11 +1214,12 @@ pub const GossipService = struct {
/// to be sent to a random set of gossip nodes.
fn buildPullRequests(
self: *GossipService,
gpa: std.mem.Allocator,
random: std.Random,
/// the bloomsize of the pull request's filters
bloom_size: usize,
now: u64,
) !ArrayList(Packet) {
) !std.ArrayListUnmanaged(Packet) {
const zone = tracy.Zone.init(@src(), .{ .name = "gossip buildPullRequests" });
defer zone.deinit();

@@ -1278,13 +1282,13 @@
if (num_peers != 0) n_packets += filters.items.len;
if (entrypoint_index != null) n_packets += filters.items.len;

var packet_batch = try ArrayList(Packet).initCapacity(self.allocator, n_packets);
packet_batch.appendNTimesAssumeCapacity(Packet.ANY_EMPTY, n_packets);
var packet_index: usize = 0;
var packet_batch: std.ArrayListUnmanaged(Packet) = .empty;
errdefer packet_batch.deinit(gpa);
try packet_batch.ensureTotalCapacityPrecise(gpa, n_packets);

// update wallclock and sign
self.my_contact_info.wallclock = now;
const my_contact_info_value = SignedGossipData.initSigned(
const my_contact_info_value: SignedGossipData = .initSigned(
&self.my_keypair,
// safe to copy contact info since it is immediately serialized
.{ .ContactInfo = self.my_contact_info },
@@ -1302,12 +1306,10 @@ pub const GossipService = struct {
}
if (peer_contact_info.gossip_addr) |gossip_addr| {
const message: GossipMessage = .{ .PullRequest = .{ filter_i, my_contact_info_value } };
var packet = &packet_batch.items[packet_index];

const bytes = try bincode.writeToSlice(&packet.buffer, message, bincode.Params{});
const packet = try packet_batch.addOne(gpa);
const bytes = try bincode.writeToSlice(&packet.buffer, message, .standard);
packet.size = bytes.len;
packet.addr = gossip_addr;
packet_index += 1;
}
}
}
@@ -1316,10 +1318,9 @@ pub const GossipService = struct {
if (entrypoint_index) |entrypoint_idx| {
const entrypoint = self.entrypoints[entrypoint_idx];
for (filters.items) |filter| {
const packet = &packet_batch.items[packet_index];
const packet = try packet_batch.addOne(gpa);
const message: GossipMessage = .{ .PullRequest = .{ filter, my_contact_info_value } };
try packet.populateFromBincode(entrypoint.addr, message);
packet_index += 1;
}
}

@@ -2049,6 +2050,12 @@ pub const LocalMessageBroker = struct {
else => {},
}
}

fn deinit(self: *const LocalMessageBroker) void {
if (self.vote_collector) |vcs| {
while (vcs.tryReceive()) |vote| vote.deinit(vcs.allocator);
}
}
};

/// stats that we publish to prometheus
@@ -2955,11 +2962,11 @@ fn testBuildPullRequests(
}
}

var packets = gossip_service.buildPullRequests(random, 2, now) catch |err| {
var packets = gossip_service.buildPullRequests(allocator, random, 2, now) catch |err| {
std.log.err("\nThe failing now time is: '{d}'\n", .{now});
return err;
};
defer packets.deinit();
defer packets.deinit(allocator);

try std.testing.expect(packets.items.len > 1);
try std.testing.expect(!std.mem.eql(u8, packets.items[0].data(), packets.items[1].data()));
2 changes: 1 addition & 1 deletion src/net/socket_utils.zig
@@ -663,7 +663,7 @@ fn sendmmsg(
while (true) {
const rc = std.os.linux.sendmmsg(sockfd, msgvec.ptr, @intCast(msgvec.len), flags);

return switch (std.posix.errno(rc)) {
return switch (std.os.linux.E.init(rc)) {
.SUCCESS => @intCast(rc),
.ACCES => error.AccessDenied,
.AGAIN => error.WouldBlock,
3 changes: 3 additions & 0 deletions src/replay/consensus/cluster_sync.zig
@@ -189,6 +189,9 @@ pub const SlotData = struct {
self.purge_repair_slot_counter.deinit(allocator);
self.unfrozen_gossip_verified_vote_hashes.deinit(allocator);
self.duplicate_slots.deinit(allocator);

var latest_validator_votes = self.latest_validator_votes;
latest_validator_votes.deinit(allocator);
}
};
