Skip to content

Commit a62a946

Browse files
feat: update to flecs with safety checks (#864)
1 parent 3523db7 commit a62a946

File tree

8 files changed

+299
-265
lines changed

8 files changed

+299
-265
lines changed

Cargo.lock

Lines changed: 223 additions & 100 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/hyperion/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ valence_registry = { workspace = true }
7272
valence_server = { workspace = true }
7373
valence_text = { workspace = true }
7474
ordered-float = { workspace = true }
75+
dashmap = "6.1.0"
7576

7677
[dev-dependencies]
7778
approx = { workspace = true }

crates/hyperion/src/egress/sync_entity_state.rs

Lines changed: 17 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -73,59 +73,29 @@ impl Module for EntityStateSyncModule {
7373
.singleton()
7474
.multi_threaded()
7575
.kind::<flecs::pipeline::OnStore>()
76-
.run(|mut table| {
76+
.each_iter(|table, idx, (compose, net, prev_xp, current)| {
77+
const {
78+
assert!(size_of::<Xp>() == size_of::<u16>());
79+
assert!(align_of::<Xp>() == align_of::<u16>());
80+
}
7781
let system = table.system();
78-
while table.next() {
79-
let count = table.count();
80-
81-
unsafe {
82-
const _: () = assert!(size_of::<Xp>() == size_of::<u16>());
83-
const _: () = assert!(align_of::<Xp>() == align_of::<u16>());
84-
85-
/// Number of lanes in the SIMD vector
86-
const LANES: usize = 32; // up to AVX512
87-
88-
let compose = table.field_unchecked::<Compose>(0);
89-
let compose = compose.first().unwrap();
90-
91-
let net = table.field_unchecked::<ConnectionId>(1);
92-
let net = net.get(..).unwrap();
93-
94-
let mut prev_xp = table.field_unchecked::<Xp>(2);
95-
let prev_xp = prev_xp.get_mut(..).unwrap();
96-
let prev_xp: &mut [u16] =
97-
core::slice::from_raw_parts_mut(prev_xp.as_mut_ptr().cast(), count);
98-
99-
let mut xp = table.field_unchecked::<Xp>(3);
100-
let xp = xp.get_mut(..).unwrap();
101-
let xp: &mut [u16] =
102-
core::slice::from_raw_parts_mut(xp.as_mut_ptr().cast(), count);
10382

104-
simd_utils::copy_and_get_diff::<_, LANES>(
105-
prev_xp,
106-
xp,
107-
|idx, prev, current| {
108-
debug_assert!(prev != current);
83+
if prev_xp != current {
84+
let visual = current.get_visual();
10985

110-
let net = net.get(idx).unwrap();
111-
112-
let current = Xp::from(*current);
113-
let visual = current.get_visual();
114-
115-
let packet = play::ExperienceBarUpdateS2c {
116-
bar: visual.prop,
117-
level: VarInt(i32::from(visual.level)),
118-
total_xp: VarInt::default(),
119-
};
86+
let packet = play::ExperienceBarUpdateS2c {
87+
bar: visual.prop,
88+
level: VarInt(i32::from(visual.level)),
89+
total_xp: VarInt::default(),
90+
};
12091

121-
let entity = table.entity(idx);
122-
entity.modified::<Xp>();
92+
let entity = table.entity(idx);
93+
entity.modified::<Xp>();
12394

124-
compose.unicast(&packet, *net, system).unwrap();
125-
},
126-
);
127-
}
95+
compose.unicast(&packet, *net, system).unwrap();
12896
}
97+
98+
*prev_xp = *current;
12999
});
130100

131101
system!("entity_metadata_sync", world, &Compose($), &mut MetadataChanges)

crates/hyperion/src/ingress/mod.rs

Lines changed: 24 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use crate::{
3535
skin::PlayerSkin,
3636
},
3737
storage::{Events, PlayerJoinServer, SkinHandler},
38-
util::{SendableRef, TracingExt, mojang::MojangClient},
38+
util::{TracingExt, mojang::MojangClient},
3939
};
4040

4141
#[derive(Component, Debug)]
@@ -311,9 +311,9 @@ impl Module for IngressModule {
311311

312312
let world = it.world();
313313

314-
let mut recv = receive.0.lock();
314+
let recv = &receive.0;
315315

316-
for connect in recv.player_connect.drain(..) {
316+
for connect in recv.player_connect.lock().drain(..) {
317317
info!("player_connect");
318318
let view = world
319319
.entity()
@@ -328,7 +328,7 @@ impl Module for IngressModule {
328328
lookup.insert(connect, view.id());
329329
}
330330

331-
for disconnect in recv.player_disconnect.drain(..) {
331+
for disconnect in recv.player_disconnect.lock().drain(..) {
332332
// will initiate the removal of entity
333333
info!("queue pending remove");
334334
let Some(id) = lookup.get(&disconnect).copied() else {
@@ -341,64 +341,28 @@ impl Module for IngressModule {
341341
}
342342
});
343343

344-
#[expect(
345-
clippy::unwrap_used,
346-
reason = "this is only called once on startup; it should be fine. we mostly care \
347-
about crashing during server execution"
348-
)]
349-
let num_threads = i32::try_from(rayon::current_num_threads()).unwrap();
350-
351-
let worlds = (0..num_threads)
352-
// SAFETY: promoting world to static lifetime, system won't outlive world
353-
.map(|i| unsafe { std::mem::transmute(world.stage(i)) })
354-
.map(SendableRef)
355-
.collect::<Vec<_>>();
356-
357-
system!(
358-
"ingress_to_ecs",
359-
world,
360-
&StreamLookup($),
361-
&ReceiveState($),
362-
)
363-
.immediate(true)
364-
.kind::<flecs::pipeline::PostLoad>()
365-
.each(move |(lookup, receive)| {
366-
use rayon::prelude::*;
367-
368-
// 134µs with par_iter
369-
// 150-208µs with regular drain
370-
let span = info_span!("ingress_to_ecs");
371-
let _enter = span.enter();
372-
373-
let mut recv = receive.0.lock();
374-
375-
recv.packets.par_drain().for_each(|(entity_id, bytes)| {
376-
#[expect(
377-
clippy::indexing_slicing,
378-
reason = "it should be impossible to get a thread index that is out of bounds \
379-
unless the rayon thread pool changes size which does not occur"
380-
)]
381-
let world = &worlds[rayon::current_thread_index().unwrap_or_default()];
382-
let world = &world.0;
383-
384-
let Some(entity_id) = lookup.get(&entity_id) else {
385-
// this is not necessarily a bug; race conditions occur
386-
warn!("player_packets: entity for {entity_id:?}");
387-
return;
388-
};
389-
390-
if !world.is_alive(*entity_id) {
344+
world
345+
.system_named::<(&ReceiveState, &ConnectionId, &mut PacketDecoder)>("ingress_to_ecs")
346+
.term_at(0u32)
347+
.singleton() // StreamLookup
348+
// .multi_threaded()
349+
.immediate(true)
350+
.kind::<flecs::pipeline::PostLoad>()
351+
.each(move |(receive, connection_id, decoder)| {
352+
// 134µs with par_iter
353+
// 150-208µs with regular drain
354+
let span = info_span!("ingress_to_ecs");
355+
let _enter = span.enter();
356+
357+
let mut bytes = receive.0.packets.get_mut(&connection_id.inner()).unwrap();
358+
if bytes.is_empty() {
391359
return;
392360
}
393361

394-
let entity = world.entity_from_id(*entity_id);
395-
396-
entity.get::<&mut PacketDecoder>(|decoder| {
397-
decoder.shift_excess();
398-
decoder.queue_slice(bytes.as_ref());
399-
});
362+
decoder.shift_excess();
363+
decoder.queue_slice(bytes.as_ref());
364+
bytes.clear();
400365
});
401-
});
402366

403367
system!(
404368
"remove_player_from_visibility",
@@ -591,9 +555,10 @@ impl Module for IngressModule {
591555
// todo: better way?
592556
if let Some((position, pose)) = position.as_mut().zip(pose.as_mut()) {
593557
let world = &world;
558+
let id = entity.id();
594559

595560
let mut query = PacketSwitchQuery {
596-
id: entity.id(),
561+
id,
597562
view: entity,
598563
compose,
599564
io_ref,

crates/hyperion/src/net/proxy.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
//! Communication to a proxy which forwards packets to the players.
22
3-
use std::{collections::HashMap, io::Cursor, net::SocketAddr, process::Command, sync::Arc};
3+
use std::{io::Cursor, net::SocketAddr, process::Command, sync::Arc};
44

55
use bytes::{Buf, BytesMut};
6+
use dashmap::DashMap;
67
use flecs_ecs::macros::Component;
78
use hyperion_proto::ArchivedProxyToServerMessage;
89
use parking_lot::Mutex;
@@ -15,11 +16,11 @@ use crate::{runtime::AsyncRuntime, simulation::EgressComm};
1516
#[derive(Default)]
1617
pub struct ReceiveStateInner {
1718
/// All players who have recently connected to the server.
18-
pub player_connect: Vec<u64>,
19+
pub player_connect: Mutex<Vec<u64>>,
1920
/// All players who have recently disconnected from the server.
20-
pub player_disconnect: Vec<u64>,
21+
pub player_disconnect: Mutex<Vec<u64>>,
2122
/// A map of stream ids to the corresponding [`BytesMut`] buffers. This represents data from the client to the server.
22-
pub packets: HashMap<u64, BytesMut>,
23+
pub packets: DashMap<u64, BytesMut>,
2324
}
2425

2526
fn get_pid_from_port(port: u16) -> Result<Option<u32>, std::io::Error> {
@@ -44,7 +45,7 @@ fn get_pid_from_port(port: u16) -> Result<Option<u32>, std::io::Error> {
4445
async fn inner(
4546
socket: SocketAddr,
4647
mut server_to_proxy: tokio::sync::mpsc::UnboundedReceiver<bytes::Bytes>,
47-
shared: Arc<Mutex<ReceiveStateInner>>,
48+
shared: Arc<ReceiveStateInner>,
4849
) {
4950
let listener = match tokio::net::TcpListener::bind(socket).await {
5051
Ok(listener) => listener,
@@ -120,17 +121,16 @@ async fn inner(
120121
ArchivedProxyToServerMessage::PlayerConnect(message) => {
121122
let Ok(stream) = rkyv::deserialize::<u64, !>(&message.stream);
122123

123-
shared.lock().player_connect.push(stream);
124+
shared.player_connect.lock().push(stream);
124125
}
125126
ArchivedProxyToServerMessage::PlayerDisconnect(message) => {
126127
let Ok(stream) = rkyv::deserialize::<u64, !>(&message.stream);
127-
shared.lock().player_disconnect.push(stream);
128+
shared.player_disconnect.lock().push(stream);
128129
}
129130
ArchivedProxyToServerMessage::PlayerPackets(message) => {
130131
let Ok(stream) = rkyv::deserialize::<u64, !>(&message.stream);
131132

132133
shared
133-
.lock()
134134
.packets
135135
.entry(stream)
136136
.or_default()
@@ -152,13 +152,13 @@ async fn inner(
152152

153153
/// A wrapper around [`ReceiveStateInner`]
154154
#[derive(Component)]
155-
pub struct ReceiveState(pub Arc<Mutex<ReceiveStateInner>>);
155+
pub struct ReceiveState(pub Arc<ReceiveStateInner>);
156156

157157
/// Initializes proxy communications.
158158
#[must_use]
159159
pub fn init_proxy_comms(tasks: &AsyncRuntime, socket: SocketAddr) -> (ReceiveState, EgressComm) {
160160
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
161-
let shared = Arc::new(Mutex::new(ReceiveStateInner::default()));
161+
let shared = Arc::new(ReceiveStateInner::default());
162162

163163
tasks.block_on(async {
164164
inner(socket, rx, shared.clone()).await;

crates/hyperion/src/simulation/handlers.rs

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -88,25 +88,23 @@ fn change_position_or_correct_client(
8888
.entity_view(query.world)
8989
.set(PendingTeleportation::new(pose.position));
9090
}
91-
query
92-
.view
93-
.get::<(&mut MovementTracking, &Yaw)>(|(tracking, yaw)| {
94-
tracking.received_movement_packets += 1;
95-
let y_delta = proposed.y - pose.y;
96-
97-
if y_delta > 0. && tracking.was_on_ground && !on_ground {
98-
tracking.server_velocity.y = 0.419_999_986_886_978_15;
99-
100-
if tracking.sprinting {
101-
let smth = yaw.yaw * 0.017_453_292;
102-
tracking.server_velocity += DVec3::new(
103-
f64::from(-smth.sin()) * 0.2,
104-
0.0,
105-
f64::from(smth.cos()) * 0.2,
106-
);
107-
}
91+
query.view.get::<&mut MovementTracking>(|tracking| {
92+
tracking.received_movement_packets += 1;
93+
let y_delta = proposed.y - pose.y;
94+
95+
if y_delta > 0. && tracking.was_on_ground && !on_ground {
96+
tracking.server_velocity.y = 0.419_999_986_886_978_15;
97+
98+
if tracking.sprinting {
99+
let smth = query.yaw.yaw * 0.017_453_292;
100+
tracking.server_velocity += DVec3::new(
101+
f64::from(-smth.sin()) * 0.2,
102+
0.0,
103+
f64::from(smth.cos()) * 0.2,
104+
);
108105
}
109-
});
106+
}
107+
});
110108

111109
**pose = proposed;
112110
}
@@ -589,13 +587,13 @@ pub fn confirm_teleportation(
589587
) -> anyhow::Result<()> {
590588
let entity = query.id.entity_view(query.world);
591589

592-
entity.get::<(Option<&PendingTeleportation>, &mut Position)>(|(pending_teleport, position)| {
590+
entity.get::<Option<&PendingTeleportation>>(|pending_teleport| {
593591
if let Some(pending_teleport) = pending_teleport {
594592
if VarInt(pending_teleport.teleport_id) != pkt.teleport_id {
595593
return;
596594
}
597595

598-
position.position = pending_teleport.destination;
596+
**query.position = pending_teleport.destination;
599597
entity.remove::<PendingTeleportation>();
600598
}
601599
});

crates/hyperion/src/simulation/metadata/mod.rs

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -47,34 +47,11 @@ where
4747
)>(system_name)
4848
.multi_threaded()
4949
.kind::<flecs::pipeline::OnUpdate>()
50-
.run(move |mut table| {
51-
while table.next() {
52-
unsafe {
53-
let mut prev = table.field_unchecked::<T>(0);
54-
let prev = prev.get_mut(..).unwrap();
55-
56-
let current = table.field_unchecked::<T>(1);
57-
let current = current.get(..).unwrap();
58-
59-
let mut metadata_changes = table.field_unchecked::<MetadataChanges>(2);
60-
let metadata_changes = metadata_changes.get_mut(..).unwrap();
61-
62-
// todo(perf): big optimization treating as raw bytes and SIMD
63-
// or code that can easily be compiled to SIMD
64-
// also can do copy_from_slice in one pass but want SIMD-optimized
65-
// first
66-
// todo(learn): reborrowing in-depth
67-
for (idx, (prev, current)) in itertools::zip_eq(&mut *prev, current).enumerate()
68-
{
69-
if prev != current {
70-
let metadata_changes = metadata_changes.get_unchecked_mut(idx);
71-
metadata_changes.encode(*current);
72-
}
73-
}
74-
75-
prev.copy_from_slice(current);
76-
}
50+
.each(|(prev, current, metadata_changes)| {
51+
if prev != current {
52+
metadata_changes.encode(*current);
7753
}
54+
*prev = *current;
7855
});
7956

8057
let register = |view: &mut EntityView<'_>| {

justfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ fmt:
4949
cargo fmt
5050

5151
proxy:
52-
ulimit -Sn {{fds}} && cargo run --profile release-full --bin hyperion-proxy
52+
ulimit -Sn {{fds}} && cargo run --profile release-full --bin hyperion-proxy -- --server "127.0.0.1:35565" "0.0.0.0:25565"
5353

5454
tag-full:
5555
cargo run --bin tag --profile release-full
@@ -83,7 +83,7 @@ watch:
8383
debug:
8484
#!/usr/bin/env -S parallel --shebang --ungroup --jobs 3
8585
RUST_BACKTRACE=full RUN_MODE=debug-{{arch}} cargo watch --postpone --no-vcs-ignores -w {{project_root}}/.trigger-debug -s './target/debug/tag'
86-
RUST_BACKTRACE=full ulimit -Sn {{fds}} && cargo run --bin hyperion-proxy --release-full -- --server "127.0.0.1:35565" "0.0.0.0:25565"
86+
RUST_BACKTRACE=full ulimit -Sn {{fds}} && cargo run --bin hyperion-proxy -- --server "127.0.0.1:35565" "0.0.0.0:25565"
8787
cargo watch -w '{{project_root}}/crates/hyperion' -w '{{project_root}}/events/tag' -s 'cargo check -p tag && cargo build -p tag' -s 'touch {{project_root}}/.trigger-debug'
8888

8989
# run in release mode with tracy; auto-restarts on changes

0 commit comments

Comments
 (0)