diff --git a/Cargo.lock b/Cargo.lock index 9b5e417e28..fda9edaac5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2103,6 +2103,7 @@ dependencies = [ "libloading", "log", "num-bigint", + "num_cpus", "pkcs8 0.11.0-rc.8", "pumpkin-config", "pumpkin-data", diff --git a/pumpkin-world/src/chunk/format/mod.rs b/pumpkin-world/src/chunk/format/mod.rs index 5ebf4b5503..9bf256a34b 100644 --- a/pumpkin-world/src/chunk/format/mod.rs +++ b/pumpkin-world/src/chunk/format/mod.rs @@ -299,9 +299,11 @@ impl ChunkEntityData { | (uuid[3] as u128), ), None => { - println!( + log::error!( "Entity in chunk {},{} is missing UUID: {:?}", - position.x, position.y, entity_nbt + position.x, + position.y, + entity_nbt ); continue; } diff --git a/pumpkin-world/src/chunk_system.rs b/pumpkin-world/src/chunk_system.rs index 25f2749b38..b7cc914e20 100644 --- a/pumpkin-world/src/chunk_system.rs +++ b/pumpkin-world/src/chunk_system.rs @@ -12,6 +12,8 @@ use crate::chunk::{ChunkData, ChunkHeightmapType, ChunkLight, ChunkSections, Sub use pumpkin_data::dimension::Dimension; use std::default::Default; use std::pin::Pin; +use std::time::Instant; +use tokio_util::sync::CancellationToken; use crate::generation::height_limit::HeightLimitView; @@ -33,6 +35,7 @@ use pumpkin_util::HeightMap; use pumpkin_util::math::position::BlockPos; use pumpkin_util::math::vector2::Vector2; use pumpkin_util::math::vector3::Vector3; +use tokio::runtime::Builder; use std::cmp::{Ordering, PartialEq, max, min}; use std::collections::hash_map::Entry; @@ -59,7 +62,34 @@ type HashMapType = FxHashMap; type HashSetType = FxHashSet; type ChunkPos = Vector2; type ChunkLevel = HashMapType; -type IOLock = Arc<(Mutex>, Condvar)>; + +#[derive(Debug, Clone)] +struct PendingWrites { + pending: Arc>>, +} + +impl PendingWrites { + fn new() -> Self { + Self { + pending: Default::default(), + } + } +} + +#[derive(Debug, Clone)] +struct PendingWriteEntry { + token: Arc, + count: usize, +} + +impl PendingWriteEntry { + fn new() -> Self { + Self { + token: Arc::new(CancellationToken::new()), + count: 1, + } + } +} pub struct HeapNode(i8, ChunkPos); impl PartialEq for HeapNode { @@ -1386,9 +1416,9 @@ pub struct GenerationSchedule { chunk_map: HashMap, unload_chunks: HashSetType, - io_lock: IOLock, + pending_writes: PendingWrites, running_task_count: u16, - recv_chunk: crossfire::MRx<(ChunkPos, RecvChunk)>, + recv_chunk: crossfire::MAsyncRx<(ChunkPos, RecvChunk)>, io_read: crossfire::MTx, io_write: crossfire::Tx>, generate: crossfire::MTx<(ChunkPos, Cache, StagedChunkEnum)>, @@ -1397,26 +1427,26 @@ pub struct GenerationSchedule { impl GenerationSchedule { pub fn create( - oi_read_thread_count: usize, + io_read_thread_count: usize, gen_thread_count: usize, level: Arc, level_channel: Arc, listener: Arc, thread_tracker: &mut Vec>, + write_futures: &mut Vec + Send + 'static>>>, ) { let tracker = &level.chunk_system_tasks; - let (send_chunk, recv_chunk) = crossfire::mpmc::unbounded_blocking(); - let (send_read_io, recv_read_io) = - crossfire::mpmc::bounded_tx_blocking_rx_async(oi_read_thread_count + 2); + let (send_chunk, recv_chunk) = crossfire::mpmc::unbounded_async(); + let (send_read_io, recv_read_io) = crossfire::mpmc::unbounded_async(); let (send_write_io, recv_write_io) = crossfire::spsc::unbounded_async(); - let (send_gen, recv_gen) = crossfire::mpmc::bounded_blocking(gen_thread_count + 5); - let io_lock = Arc::new((Mutex::new(HashMapType::default()), Condvar::new())); - for _ in 0..oi_read_thread_count { + let (send_gen, recv_gen) = crossfire::mpmc::unbounded_blocking(); + 
let pending_writes: PendingWrites = PendingWrites::new(); + for _ in 0..io_read_thread_count { tracker.spawn(Self::io_read_work( recv_read_io.clone(), send_chunk.clone(), level.clone(), - io_lock.clone(), + pending_writes.clone(), )); } for i in 0..gen_thread_count { @@ -1433,34 +1463,40 @@ impl GenerationSchedule { ); } - tracker.spawn(Self::io_write_work( + let level_clone = level.clone(); + let pending_writes_clone = pending_writes.clone(); + write_futures.push(Box::pin(Self::io_write_work( recv_write_io, - level.clone(), - io_lock.clone(), - )); + level_clone, + pending_writes_clone, + ))); let builder = thread::Builder::new().name("Schedule Thread".to_string()); thread_tracker.push( builder .spawn(move || { - Self { - queue: BinaryHeap::new(), - graph: DAG::default(), - last_level: ChunkLevel::default(), - last_high_priority: Vec::new(), - send_level: level_channel, - public_chunk_map: level.loaded_chunks.clone(), - unload_chunks: HashSetType::default(), - io_lock, - running_task_count: 0, - recv_chunk, - io_read: send_read_io, - io_write: send_write_io, - generate: send_gen, - listener, - chunk_map: Default::default(), - } - .work(level); + let rt = Builder::new_current_thread().enable_all().build().unwrap(); + rt.block_on(async move { + Self { + queue: BinaryHeap::new(), + graph: DAG::default(), + last_level: ChunkLevel::default(), + last_high_priority: Vec::new(), + send_level: level_channel, + public_chunk_map: level.loaded_chunks.clone(), + unload_chunks: HashSetType::default(), + pending_writes, + running_task_count: 0, + recv_chunk, + io_read: send_read_io, + io_write: send_write_io, + generate: send_gen, + listener, + chunk_map: Default::default(), + } + .work(level) + .await; + }); }) .unwrap(), ) @@ -1626,7 +1662,7 @@ impl GenerationSchedule { recv: crossfire::MAsyncRx, send: crossfire::MTx<(ChunkPos, RecvChunk)>, level: Arc, - lock: IOLock, + lock: PendingWrites, ) { log::info!("io read thread start"); use crate::biome::hash_seed; @@ -1635,12 +1671,19 @@ impl GenerationSchedule { let (t_send, mut t_recv) = tokio::sync::mpsc::channel(2); while let Ok(pos) = recv.recv().await { // debug!("io read thread receive chunk pos {pos:?}"); - { - let mut data = lock.0.lock().unwrap(); - while data.contains_key(&pos) { - data = tokio::task::block_in_place(|| lock.1.wait(data).unwrap()); + loop { + let notify = { + let pending_writes = lock.pending.lock().await; + pending_writes.get(&pos).cloned() + }; + match notify { + Some(n) => { + n.token.cancelled().await; + } + None => break, } } + level .chunk_saver .fetch_chunks(&level.level_folder, &[pos], t_send.clone()) @@ -1694,11 +1737,17 @@ impl GenerationSchedule { log::info!("io read thread stop"); } - async fn io_write_work(recv: AsyncRx>, level: Arc, lock: IOLock) { - log::info!("io write thread start",); + async fn io_write_work( + recv: AsyncRx>, + level: Arc, + lock: PendingWrites, + ) { + log::info!("io write thread start"); + while let Ok(data) = recv.recv().await { - // debug!("io write thread receive chunks size {}", data.len()); let mut vec = Vec::with_capacity(data.len()); + log::info!("io write thread receive chunks size {}", data.len()); + let start = Instant::now(); for (pos, chunk) in data { match chunk { Chunk::Level(chunk) => vec.push((pos, chunk)), @@ -1710,34 +1759,31 @@ impl GenerationSchedule { } } } - let pos = vec.iter().map(|(pos, _)| *pos).collect_vec(); + let positions = vec.iter().map(|(pos, _)| *pos).collect_vec(); level .chunk_saver .save_chunks(&level.level_folder, vec) .await .unwrap(); - for i in pos { - 
let mut data = lock.0.lock().unwrap(); - match data.entry(i) { - Entry::Occupied(mut entry) => { - let rc = entry.get_mut(); - if *rc == 1 { - entry.remove(); - drop(data); - lock.1.notify_all(); - } else { - *rc -= 1; + + { + let mut data = lock.pending.lock().await; + for pos in &positions { + if let Entry::Occupied(mut entry) = data.entry(*pos) { + let e = entry.get_mut(); + e.count -= 1; + if e.count == 0 { + let n = entry.remove(); + n.token.cancel(); } + } else { + panic!("Position {:?} not in lock map!", pos); } - Entry::Vacant(_) => panic!(), } } + log::info!("Completed write in {:?}", start.elapsed()); } - log::info!( - "io write thread stop id: {:?} name: {}", - thread::current().id(), - thread::current().name().unwrap_or("unknown") - ); + log::info!("io write task stop"); } fn generation_work( @@ -1745,15 +1791,13 @@ impl GenerationSchedule { send: crossfire::MTx<(ChunkPos, RecvChunk)>, level: Arc, ) { - log::debug!( - "generation thread start id: {:?} name: {}", - thread::current().id(), - thread::current().name().unwrap_or("unknown") - ); + log::info!("{:?} - generation thread start", thread::current().name()); let settings = gen_settings_from_dimension(&level.world_gen.dimension); + log::info!("Waiting for initial generation request!"); while let Ok((pos, mut cache, stage)) = recv.recv() { - // debug!("generation thread receive chunk pos {pos:?} to stage {stage:?}"); + log::info!("generation thread received chunk pos {pos:?} to stage {stage:?}"); + let start = Instant::now(); cache.advance( stage, level.block_registry.as_ref(), @@ -1763,18 +1807,23 @@ impl GenerationSchedule { &level.world_gen.base_router, level.world_gen.dimension, ); + log::info!( + "generation of {pos:?} to stage {stage:?} took {:?}", + start.elapsed() + ); if send.send((pos, RecvChunk::Generation(cache))).is_err() { + log::error!("Send of generated chunk failed, stopping thread!"); break; } + log::info!("Waiting for new generation request!"); } - log::debug!( - "generation thread stop id: {:?} name: {}", - thread::current().id(), - thread::current().name().unwrap_or("unknown") + log::info!( + "{:?} - generation thread stop", + thread::current().name().unwrap_or("Unknown") ); } - fn unload_chunk(&mut self) { + async fn unload_chunk(&mut self) { let mut unload_chunks = HashSetType::default(); swap(&mut unload_chunks, &mut self.unload_chunks); let mut chunks = Vec::with_capacity(unload_chunks.len()); @@ -1818,15 +1867,20 @@ impl GenerationSchedule { if chunks.is_empty() { return; } - let mut data = self.io_lock.0.lock().unwrap(); + let mut data = self.pending_writes.pending.lock().await; for (pos, _chunk) in &chunks { - *data.entry(*pos).or_insert(0) += 1; + if data.contains_key(pos) { + continue; + } + data.entry(*pos) + .and_modify(|it| it.count += 1) + .or_insert(PendingWriteEntry::new()); } drop(data); self.io_write.send(chunks).expect("io write thread stop"); } - fn save_all_chunk(&self, save_proto_chunk: bool) { + async fn save_all_chunks(&self, save_proto_chunk: bool) { let mut chunks = Vec::with_capacity(self.chunk_map.len()); for (pos, chunk) in &self.chunk_map { if let Some(chunk) = &chunk.chunk { @@ -1846,9 +1900,14 @@ impl GenerationSchedule { if chunks.is_empty() { return; } - let mut data = self.io_lock.0.lock().unwrap(); + let mut data = self.pending_writes.pending.lock().await; for (pos, _chunk) in &chunks { - *data.entry(*pos).or_insert(0) += 1; + if data.contains_key(pos) { + continue; + } + data.entry(*pos) + .and_modify(|it| it.count += 1) + .or_insert(PendingWriteEntry::new()); } 
drop(data); self.io_write.send(chunks).expect("io write thread stop"); @@ -1977,28 +2036,24 @@ impl GenerationSchedule { self.running_task_count -= 1; } - fn work(mut self, level: Arc) { - log::info!( - "schedule thread start id: {:?} name: {}", - thread::current().id(), - thread::current().name().unwrap_or("unknown") - ); + async fn work(mut self, level: Arc) { + log::info!("schedule thread start"); // let mut clock = Instant::now(); loop { + if level.shut_down_chunk_system.load(Relaxed) { + log::info!("shut down signal"); + break; + } if level.should_unload.load(Relaxed) { - // log::debug!("unload chunk signal"); - self.unload_chunk(); + log::info!("unload chunk signal"); + self.unload_chunk().await; level.should_unload.store(false, Relaxed); } if level.should_save.load(Relaxed) { - // log::debug!("save all chunk signal"); - self.save_all_chunk(false); + log::info!("save all chunk signal"); + self.save_all_chunks(false).await; level.should_save.store(false, Relaxed); } - if level.shut_down_chunk_system.load(Relaxed) { - // log::debug!("shut down signal"); - break; - } 'out2: while let Some(task) = self.queue.pop() { if self.resort_work(self.send_level.get()) { @@ -2113,7 +2168,7 @@ impl GenerationSchedule { self.running_task_count += 1; self.generate .send((node.pos, cache, node.stage)) - .expect("generate thread close unexpectedly"); + .expect("generate thread closed unexpectedly"); } } } @@ -2121,7 +2176,7 @@ impl GenerationSchedule { if self.queue.is_empty() { // debug!("the queue is empty. thread sleep"); while self.running_task_count > 0 && self.queue.is_empty() { - let (pos, data) = self.recv_chunk.recv().expect("recv_chunk stop"); + let (pos, data) = self.recv_chunk.recv().await.expect("recv_chunk stop"); self.receive_chunk(pos, data); self.resort_work(self.send_level.get()); } @@ -2133,19 +2188,18 @@ impl GenerationSchedule { } } } - log::info!("waiting all generation task finished"); + log::info!("Waiting for all generation tasks to be finished"); while self.running_task_count > 0 { - let (pos, data) = self.recv_chunk.recv().expect("recv_chunk stop"); + let (pos, data) = self.recv_chunk.recv().await.expect("recv_chunk stop"); self.receive_chunk(pos, data); } log::info!("saving all chunks"); - self.save_all_chunk(true); - log::info!("there are {} chunks to write", self.io_write.len()); log::info!( - "schedule thread stop id: {:?} name: {}", - thread::current().id(), - thread::current().name().unwrap_or("unknown") + "there are {} chunks to write for final save", + self.io_write.len() ); + self.save_all_chunks(true).await; + log::info!("schedule thread stop id"); } fn debug_check(&self) -> bool { diff --git a/pumpkin-world/src/dimension.rs b/pumpkin-world/src/dimension.rs index d6c3af4f0d..02f05e974b 100644 --- a/pumpkin-world/src/dimension.rs +++ b/pumpkin-world/src/dimension.rs @@ -1,4 +1,4 @@ -use std::{path::PathBuf, sync::Arc}; +use std::{path::PathBuf, pin::Pin, sync::Arc}; use pumpkin_config::world::LevelConfig; use pumpkin_data::dimension::Dimension; @@ -11,6 +11,7 @@ pub fn into_level( mut base_directory: PathBuf, block_registry: Arc, seed: i64, + write_futures: &mut Vec + Send + 'static>>>, ) -> Arc { if dimension == Dimension::OVERWORLD { } else if dimension == Dimension::THE_NETHER { @@ -24,5 +25,6 @@ pub fn into_level( block_registry, seed, dimension, + write_futures, ) } diff --git a/pumpkin-world/src/level.rs b/pumpkin-world/src/level.rs index 5b13ede20c..123de6679f 100644 --- a/pumpkin-world/src/level.rs +++ b/pumpkin-world/src/level.rs @@ -2,14 +2,14 @@ use 
crate::chunk_system::{ChunkListener, ChunkLoading, GenerationSchedule, Level use crate::generation::generator::VanillaGenerator; use crate::{ BlockStateId, - block::{RawBlockState, entities::BlockEntity}, + block::RawBlockState, chunk::{ ChunkData, ChunkEntityData, ChunkReadingError, format::{anvil::AnvilChunkFile, linear::LinearFile}, io::{Dirtiable, FileIO, LoadedData, file_manager::ChunkFileManager}, }, generation::get_world_gen, - tick::{OrderedTick, ScheduledTick, TickPriority}, + tick::{ScheduledTick, TickPriority}, world::BlockRegistryExt, }; use crossbeam::channel::Sender; @@ -19,12 +19,12 @@ use num_traits::Zero; use pumpkin_config::{chunk::ChunkConfig, world::LevelConfig}; use pumpkin_data::biome::Biome; use pumpkin_data::dimension::Dimension; -use pumpkin_data::{Block, block_properties::has_random_ticks, fluid::Fluid}; +use pumpkin_data::{Block, fluid::Fluid}; use pumpkin_util::math::{position::BlockPos, vector2::Vector2}; use pumpkin_util::world_seed::Seed; -use rand::{Rng, SeedableRng, rngs::SmallRng}; +use std::ops::Div; +use std::pin::Pin; use std::sync::Mutex; -// use std::time::Duration; use std::{ collections::HashMap, path::PathBuf, @@ -34,17 +34,17 @@ use std::{ }, thread, }; -// use tokio::runtime::Handle; use tokio::time::Instant; use tokio::{ select, sync::{ - Notify, RwLock, + RwLock, mpsc::{self, UnboundedReceiver}, oneshot, }, task::JoinHandle, }; +use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; pub type SyncChunk = Arc>; @@ -84,7 +84,7 @@ pub struct Level { tasks: TaskTracker, pub chunk_system_tasks: TaskTracker, /// Notification that interrupts tasks for shutdown - pub shutdown_notifier: Notify, + pub shutdown_notifier: CancellationToken, pub is_shutting_down: AtomicBool, pub shut_down_chunk_system: AtomicBool, @@ -99,13 +99,6 @@ pub struct Level { pub chunk_listener: Arc, } -pub struct TickData { - pub block_ticks: Vec>, - pub fluid_ticks: Vec>, - pub random_ticks: Vec>, - pub block_entities: Vec>, -} - #[derive(Clone)] pub struct LevelFolder { pub root_folder: PathBuf, @@ -133,6 +126,7 @@ impl Level { block_registry: Arc, seed: i64, dimension: Dimension, + write_futures: &mut Vec + Send + 'static>>>, ) -> Arc { // If we are using an already existing world we want to read the seed from the level.dat, If not we want to check if there is a seed in the config, if not lets create a random one let region_folder = root_folder.join("region"); @@ -191,7 +185,7 @@ impl Level { chunk_watchers: Arc::new(DashMap::new()), tasks: TaskTracker::new(), chunk_system_tasks: TaskTracker::new(), - shutdown_notifier: Notify::new(), + shutdown_notifier: CancellationToken::new(), is_shutting_down: AtomicBool::new(false), shut_down_chunk_system: AtomicBool::new(false), should_save: AtomicBool::new(false), @@ -203,7 +197,7 @@ impl Level { chunk_listener: listener.clone(), }); - let num_threads = num_cpus::get().saturating_sub(2).max(1); + let num_threads = (3 * num_cpus::get()).div(12).max(1); GenerationSchedule::create( 4, @@ -212,11 +206,12 @@ impl Level { level_channel, listener, level_ref.thread_tracker.lock().unwrap().as_mut(), + write_futures, ); // let mut tracker = level_ref.thread_tracker.lock().unwrap(); // Entity Chunks - for thread_id in 0..(num_threads / 2).max(1) { + for thread_id in 0..1 { let level_clone = level_ref.clone(); let pending_clone = pending_entity_generations.clone(); let rx = gen_entity_request_rx.clone(); @@ -284,10 +279,10 @@ impl Level { log::info!("Saving level..."); self.is_shutting_down.store(true, Ordering::Relaxed); - 
self.shutdown_notifier.notify_waiters(); + self.shutdown_notifier.cancel(); self.tasks.close(); - log::debug!("Awaiting level tasks"); + log::info!("Awaiting level tasks"); #[cfg(feature = "tokio_taskdump")] match tokio::time::timeout(std::time::Duration::from_secs(30), self.tasks.wait()).await { Ok(guard) => guard, @@ -297,7 +292,7 @@ impl Level { } }; self.tasks.wait().await; - log::debug!("Done awaiting level chunk tasks"); + log::info!("Done awaiting level chunk tasks"); self.shut_down_chunk_system.store(true, Ordering::Relaxed); self.level_channel.notify(); @@ -308,6 +303,9 @@ impl Level { lock.drain(..).collect() }; + log::info!("Wait chunk system tasks stop"); + self.chunk_system_tasks.close(); + for handle in handles { log::info!( "Waiting for thread {:?} ({}) to stop", @@ -320,8 +318,6 @@ impl Level { } } - log::info!("Wait chunk system tasks stop"); - self.chunk_system_tasks.close(); #[cfg(feature = "tokio_taskdump")] match tokio::time::timeout(std::time::Duration::from_secs(30), self.tasks.wait()).await { Ok(guard) => guard, @@ -491,82 +487,6 @@ impl Level { }); } - // Gets random ticks, block ticks and fluid ticks - pub async fn get_tick_data(&self) -> TickData { - let mut ticks = TickData { - block_ticks: Vec::new(), - fluid_ticks: Vec::new(), - random_ticks: Vec::with_capacity(self.loaded_chunks.len() * 3 * 16 * 16), - block_entities: Vec::new(), - }; - - let mut rng = SmallRng::from_rng(&mut rand::rng()); - let chunks = self - .loaded_chunks - .iter() - .map(|x| x.value().clone()) - .collect::>(); - for chunk in chunks { - let mut chunk = chunk.write().await; - ticks.block_ticks.append(&mut chunk.block_ticks.step_tick()); - ticks.fluid_ticks.append(&mut chunk.fluid_ticks.step_tick()); - - let chunk = chunk.downgrade(); - - let chunk_x_base = chunk.x * 16; - let chunk_z_base = chunk.z * 16; - - let mut section_blocks = Vec::new(); - for i in 0..chunk.section.sections.len() { - let mut section_block_data = Vec::new(); - - //TODO use game rules to determine how many random ticks to perform - for _ in 0..3 { - let r = rng.random::(); - let x_offset = (r & 0xF) as i32; - let y_offset = ((r >> 4) & 0xF) as i32 - 32; - let z_offset = (r >> 8 & 0xF) as i32; - - let random_pos = BlockPos::new( - chunk_x_base + x_offset, - i as i32 * 16 + y_offset, - chunk_z_base + z_offset, - ); - - let block_state_id = chunk - .section - .get_block_absolute_y(x_offset as usize, random_pos.0.y, z_offset as usize) - .unwrap_or(Block::AIR.default_state.id); - - section_block_data.push((random_pos, block_state_id)); - } - section_blocks.push(section_block_data); - } - - for section_data in section_blocks { - for (random_pos, block_state_id) in section_data { - if has_random_ticks(block_state_id) { - ticks.random_ticks.push(ScheduledTick { - position: random_pos, - delay: 0, - priority: TickPriority::Normal, - value: (), - }); - } - } - } - - ticks - .block_entities - .extend(chunk.block_entities.values().cloned()); - } - - ticks.block_ticks.sort_unstable(); - ticks.fluid_ticks.sort_unstable(); - - ticks - } - pub async fn clean_entity_chunk(self: &Arc, chunk: &Vector2) { self.clean_entity_chunks(&[*chunk]).await; } @@ -649,7 +569,7 @@ impl Level { let level = self.clone(); self.spawn_task(async move { - let cancel_notifier = level.shutdown_notifier.notified(); + let cancel_notifier = level.shutdown_notifier.cancelled(); let fetch_task = async { let mut to_fetch = Vec::new(); diff --git a/pumpkin/Cargo.toml b/pumpkin/Cargo.toml index e4f964e4b7..4f09b31d38 100644 --- a/pumpkin/Cargo.toml +++ 
b/pumpkin/Cargo.toml @@ -43,6 +43,7 @@ tokio = { workspace = true, features = [ ] } thiserror.workspace = true futures.workspace = true +num_cpus.workspace = true # config serde.workspace = true diff --git a/pumpkin/src/main.rs b/pumpkin/src/main.rs index b04e1f3d0e..bd0f4d334b 100644 --- a/pumpkin/src/main.rs +++ b/pumpkin/src/main.rs @@ -51,7 +51,7 @@ use std::{ use tokio::signal::ctrl_c; #[cfg(unix)] use tokio::signal::unix::{SignalKind, signal}; -use tokio::sync::RwLock; +use tokio::{runtime::Builder, sync::RwLock}; use pumpkin::{LoggerOption, PumpkinServer, SHOULD_STOP, STOP_INTERRUPT, stop_server}; @@ -96,82 +96,90 @@ const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION"); // WARNING: All rayon calls from the tokio runtime must be non-blocking! This includes things // like `par_iter`. These should be spawned in the the rayon pool and then passed to the tokio // runtime with a channel! See `Level::fetch_chunks` as an example! -#[tokio::main] -async fn main() { +fn main() { #[cfg(feature = "console-subscriber")] console_subscriber::init(); - let time = Instant::now(); + let runtime = Builder::new_multi_thread() + .enable_all() + .worker_threads(num_cpus::get() / 2) + .build() + .expect("Failed to create tokio runtime to start the server!"); + runtime.block_on(async move { + let time = Instant::now(); - let exec_dir = std::env::current_dir().unwrap(); - let config_dir = exec_dir.join("config"); + let exec_dir = std::env::current_dir().unwrap(); + let config_dir = exec_dir.join("config"); - let basic_config = BasicConfiguration::load(&config_dir); - let advanced_config = AdvancedConfiguration::load(&config_dir); + let basic_config = BasicConfiguration::load(&config_dir); + let advanced_config = AdvancedConfiguration::load(&config_dir); - pumpkin::init_logger(&advanced_config); + pumpkin::init_logger(&advanced_config); - if let Some((logger_impl, level)) = pumpkin::LOGGER_IMPL.wait() { - log::set_logger(logger_impl).unwrap(); - log::set_max_level(*level); - } - - let default_panic = std::panic::take_hook(); - std::panic::set_hook(Box::new(move |info| { - default_panic(info); - // TODO: Gracefully exit? - // We need to abide by the panic rules here. - std::process::exit(1); - })); - log::info!("Starting Pumpkin {CARGO_PKG_VERSION} Minecraft (Protocol {CURRENT_MC_PROTOCOL})",); - - log::debug!( - "Build info: FAMILY: \"{}\", OS: \"{}\", ARCH: \"{}\", BUILD: \"{}\"", - std::env::consts::FAMILY, - std::env::consts::OS, - std::env::consts::ARCH, - if cfg!(debug_assertions) { - "Debug" - } else { - "Release" + if let Some((logger_impl, level)) = pumpkin::LOGGER_IMPL.wait() { + log::set_logger(logger_impl).unwrap(); + log::set_max_level(*level); } - ); - - log::warn!("Pumpkin is currently under heavy development!"); - log::info!("Report issues on https://github.com/Pumpkin-MC/Pumpkin/issues"); - log::info!("Join our Discord for community support: https://discord.com/invite/wT8XjrjKkf"); - tokio::spawn(async { - setup_sighandler() - .await - .expect("Unable to setup signal handlers"); + let default_panic = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |info| { + default_panic(info); + // TODO: Gracefully exit? + // We need to abide by the panic rules here. 
+ std::process::exit(1); + })); + log::info!( + "Starting Pumpkin {CARGO_PKG_VERSION} Minecraft (Protocol {CURRENT_MC_PROTOCOL})", + ); + + log::debug!( + "Build info: FAMILY: \"{}\", OS: \"{}\", ARCH: \"{}\", BUILD: \"{}\"", + std::env::consts::FAMILY, + std::env::consts::OS, + std::env::consts::ARCH, + if cfg!(debug_assertions) { + "Debug" + } else { + "Release" + } + ); + + log::warn!("Pumpkin is currently under heavy development!"); + log::info!("Report issues on https://github.com/Pumpkin-MC/Pumpkin/issues"); + log::info!("Join our Discord for community support: https://discord.com/invite/wT8XjrjKkf"); + + tokio::spawn(async { + setup_sighandler() + .await + .expect("Unable to setup signal handlers"); + }); + + let pumpkin_server = PumpkinServer::new(basic_config, advanced_config).await; + pumpkin_server.init_plugins().await; + + log::info!("Started server; took {}ms", time.elapsed().as_millis()); + let basic_config = &pumpkin_server.server.basic_config; + log::info!( + "Server is now running. Connect using port: {}{}{}", + if basic_config.java_edition { + format!("Java Edition: {}", basic_config.java_edition_address) + } else { + String::new() + }, + if basic_config.java_edition && basic_config.bedrock_edition { + " | " // Separator if both are enabled + } else { + "" + }, + if basic_config.bedrock_edition { + format!("Bedrock Edition: {}", basic_config.bedrock_edition_address) + } else { + String::new() + } + ); + + pumpkin_server.start().await; + log::info!("The server has stopped."); }); - - let pumpkin_server = PumpkinServer::new(basic_config, advanced_config).await; - pumpkin_server.init_plugins().await; - - log::info!("Started server; took {}ms", time.elapsed().as_millis()); - let basic_config = &pumpkin_server.server.basic_config; - log::info!( - "Server is now running. 
Connect using port: {}{}{}", - if basic_config.java_edition { - format!("Java Edition: {}", basic_config.java_edition_address) - } else { - String::new() - }, - if basic_config.java_edition && basic_config.bedrock_edition { - " | " // Separator if both are enabled - } else { - "" - }, - if basic_config.bedrock_edition { - format!("Bedrock Edition: {}", basic_config.bedrock_edition_address) - } else { - String::new() - } - ); - - pumpkin_server.start().await; - log::info!("The server has stopped."); } fn handle_interrupt() { diff --git a/pumpkin/src/server/mod.rs b/pumpkin/src/server/mod.rs index 2600587d33..6b8dcffabd 100644 --- a/pumpkin/src/server/mod.rs +++ b/pumpkin/src/server/mod.rs @@ -15,6 +15,7 @@ use key_store::KeyStore; use pumpkin_config::{AdvancedConfiguration, BasicConfiguration}; use pumpkin_data::dimension::Dimension; use pumpkin_world::dimension::into_level; +use tokio::runtime::Builder; use crate::command::CommandSender; use pumpkin_macros::send_cancellable; @@ -34,13 +35,14 @@ use pumpkin_world::world_info::{LevelData, WorldInfoError, WorldInfoReader, Worl use rand::seq::{IndexedRandom, IteratorRandom, SliceRandom}; use rsa::RsaPublicKey; use std::collections::HashSet; -use std::fs; use std::net::IpAddr; +use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicI32, AtomicI64, AtomicU32}; +use std::{fs, thread}; use std::{future::Future, sync::atomic::Ordering, time::Duration}; use tokio::sync::{Mutex, RwLock}; -use tokio::task::JoinHandle; +use tokio::task::{JoinHandle, JoinSet}; use tokio_util::task::TaskTracker; mod connection_cache; @@ -107,6 +109,7 @@ pub struct Server { // Gets unlocked when dropped // TODO: Make this a trait _locker: Arc>, + write_thread: Mutex>>, } impl Server { @@ -207,8 +210,11 @@ impl Server { world_info_writer: Arc::new(AnvilLevelInfo), level_info: level_info.clone(), _locker: Arc::new(locker), + write_thread: Mutex::new(None), }; + let mut write_futures: Vec + Send + 'static>>> = Vec::new(); + let server = Arc::new(server); let weak = Arc::downgrade(&server); let level_config = &server.advanced_config.world; @@ -221,6 +227,7 @@ impl Server { world_path.clone(), block_registry.clone(), seed, + &mut write_futures, ), level_info.clone(), Dimension::OVERWORLD, @@ -235,6 +242,7 @@ impl Server { world_path.clone(), block_registry.clone(), seed, + &mut write_futures, ), level_info.clone(), Dimension::THE_NETHER, @@ -249,12 +257,35 @@ impl Server { world_path, block_registry.clone(), seed, + &mut write_futures, ), level_info, Dimension::THE_END, block_registry, weak, ); + + let join_handle = thread::Builder::new() + .name(String::from("Writer Thread")) + .spawn(move || { + Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let mut join_set = JoinSet::new(); + + for future in write_futures { + join_set.spawn(future); + } + + while join_set.join_next().await.is_some() {} + }); + }) + .expect("Failed to start Writer Thread!"); + + *server.write_thread.lock().await = Some(join_handle); + *server .worlds .try_write() @@ -416,13 +447,25 @@ impl Server { self.tasks.wait().await; log::debug!("Done awaiting tasks for server"); - log::info!("Starting worlds"); + log::info!("Shutting down worlds"); for world in self.worlds.read().await.iter() { + log::info!("Shutting down {:?}", world.dimension.minecraft_name); world.shutdown().await; } + + log::info!("Shutting down Writer Thread"); + self.write_thread + .lock() + .await + .take() + .expect("Writer Thread was never initialized!") + .join() + 
.expect("Writer Thread panicked during execution!"); + + log::info!("Writer Thread stopped!"); + let level_data = self.level_info.read().await; // then lets save the world info - if let Err(err) = self .world_info_writer .write_world_info(&level_data, &self.basic_config.get_world_path()) diff --git a/pumpkin/src/world/mod.rs b/pumpkin/src/world/mod.rs index 23c4b02081..51e7624748 100644 --- a/pumpkin/src/world/mod.rs +++ b/pumpkin/src/world/mod.rs @@ -1,7 +1,6 @@ use std::pin::Pin; use std::sync::Weak; use std::sync::atomic::Ordering::Relaxed; -use std::time::Duration; use std::{ collections::HashMap, sync::{Arc, atomic::Ordering}, @@ -11,10 +10,12 @@ pub mod chunker; pub mod explosion; pub mod loot; pub mod portal; +pub mod tick_data; pub mod time; use crate::block::RandomTickArgs; use crate::world::loot::LootContextParameters; +use crate::world::tick_data::TickData; use crate::{ PLUGIN_MANAGER, block::{ @@ -38,6 +39,7 @@ use bytes::BufMut; use crossbeam::queue::SegQueue; use explosion::Explosion; use pumpkin_config::BasicConfiguration; +use pumpkin_data::block_properties::has_random_ticks; use pumpkin_data::data_component_impl::EquipmentSlot; use pumpkin_data::dimension::Dimension; use pumpkin_data::entity::MobCategory; @@ -107,6 +109,7 @@ use pumpkin_util::{ random::{RandomImpl, get_seed, xoroshiro128::Xoroshiro}, }; use pumpkin_world::chunk::palette::BlockPalette; +use pumpkin_world::tick::ScheduledTick; use pumpkin_world::world::{GetBlockError, WorldFuture}; use pumpkin_world::{ BlockStateId, CURRENT_BEDROCK_MC_VERSION, biome, block::entities::BlockEntity, @@ -115,8 +118,9 @@ use pumpkin_world::{ use pumpkin_world::{chunk::ChunkData, world::BlockAccessor}; use pumpkin_world::{level::Level, tick::TickPriority}; use pumpkin_world::{world::BlockFlags, world_info::LevelData}; +use rand::rngs::SmallRng; use rand::seq::SliceRandom; -use rand::{Rng, rng}; +use rand::{Rng, SeedableRng, rng}; use scoreboard::Scoreboard; use serde::Serialize; use time::LevelTime; @@ -191,6 +195,7 @@ pub struct World { synced_block_event_queue: Mutex>, /// A map of unsent block changes, keyed by block position. 
unsent_block_changes: Mutex>, + tick_data: tokio::sync::Mutex, } impl World { @@ -222,6 +227,7 @@ impl World { decrease_block_light_queue: SegQueue::new(), increase_block_light_queue: SegQueue::new(), server, + tick_data: Mutex::new(TickData::default()), } } @@ -683,8 +689,8 @@ impl World { } pub async fn tick_chunks(self: &Arc) { - let tick_data = self.level.get_tick_data().await; - for scheduled_tick in tick_data.block_ticks { + self.calculate_tick_data().await; + for scheduled_tick in &self.tick_data.lock().await.block_ticks { let block = self.get_block(&scheduled_tick.position).await; if let Some(pumpkin_block) = self.block_registry.get_pumpkin_block(block) { pumpkin_block @@ -696,7 +702,8 @@ impl World { .await; } } - for scheduled_tick in tick_data.fluid_ticks { + + for scheduled_tick in &self.tick_data.lock().await.fluid_ticks { let fluid = self.get_fluid(&scheduled_tick.position).await; if let Some(pumpkin_fluid) = self.block_registry.get_pumpkin_fluid(fluid) { pumpkin_fluid @@ -705,9 +712,7 @@ impl World { } } - // TODO: Fix this deadlock - // TODO: ^ find this deadlock ^ - for scheduled_tick in tick_data.random_ticks { + for scheduled_tick in &self.tick_data.lock().await.random_ticks { let block = self.get_block(&scheduled_tick.position).await; if let Some(pumpkin_block) = self.block_registry.get_pumpkin_block(block) { pumpkin_block @@ -720,8 +725,6 @@ impl World { } } - let spawn_entity_clock_start = tokio::time::Instant::now(); - let mut spawning_chunks_map = HashMap::new(); // TODO use FixedPlayerDistanceChunkTracker @@ -745,9 +748,6 @@ impl World { let mut spawning_chunks: Vec<(Vector2, Arc>)> = spawning_chunks_map.into_iter().collect(); - let get_chunks_clock = spawn_entity_clock_start.elapsed(); - // log::debug!("spawning chunks size {}", spawning_chunks.len()); - let mut spawn_state = SpawnState::new(spawning_chunks.len() as i32, &self.entities, self).await; // TODO store it @@ -765,28 +765,82 @@ impl World { spawning_chunks.shuffle(&mut rng()); - // TODO i think it can be multithread for (pos, chunk) in &spawning_chunks { self.tick_spawning_chunk(pos, chunk, &spawn_list, &mut spawn_state) .await; } - log::trace!( - "Spawning entity took {:?}, getting chunks {:?}, spawning chunks: {}, avg {:?} per chunk", - spawn_entity_clock_start.elapsed(), - get_chunks_clock, - spawning_chunks.len(), - spawn_entity_clock_start - .elapsed() - .checked_div(spawning_chunks.len() as u32) - .unwrap_or(Duration::new(0, 0)) - ); - for block_entity in tick_data.block_entities { + for block_entity in &self.tick_data.lock().await.block_entities { let world: Arc = self.clone(); block_entity.tick(world).await; } } + /// Calculate Block-, Fluid- and Randomticks and write into the `tick_data` field + pub async fn calculate_tick_data(&self) { + let random_tick_speed = { + let lock = self.level_info.read().await; + lock.game_rules.random_tick_speed + }; + let mut ticks = self.tick_data.lock().await; + ticks.clear(); + + let mut rng = SmallRng::from_rng(&mut rand::rng()); + ticks + .cloned_chunks + .extend(self.level.loaded_chunks.iter().map(|x| x.value().clone())); + let TickData { + block_ticks, + fluid_ticks, + random_ticks, + block_entities, + cloned_chunks, + } = &mut *ticks; + for chunk in cloned_chunks { + let mut chunk = chunk.write().await; + block_ticks.append(&mut chunk.block_ticks.step_tick()); + fluid_ticks.append(&mut chunk.fluid_ticks.step_tick()); + + let chunk = chunk.downgrade(); + + let chunk_base_x = chunk.x * 16; + let chunk_base_z = chunk.z * 16; + for i in 
0..chunk.section.sections.len() { + for _ in 0..random_tick_speed { + let r = rng.random::(); + let x_offset = (r & 0xF) as i32; + let y_offset = ((r >> 4) & 0xF) as i32 - 32; + let z_offset = (r >> 8 & 0xF) as i32; + + let random_pos = BlockPos::new( + chunk_base_x + x_offset, + i as i32 * 16 + y_offset, + chunk_base_z + z_offset, + ); + + let block_id = chunk + .section + .get_block_absolute_y(x_offset as usize, random_pos.0.y, z_offset as usize) + .unwrap_or(Block::AIR.default_state.id); + + if has_random_ticks(block_id) { + random_ticks.push(ScheduledTick { + position: random_pos, + delay: 0, + priority: TickPriority::Normal, + value: (), + }); + } + } + } + + block_entities.extend(chunk.block_entities.values().cloned()); + } + + block_ticks.sort_unstable(); + fluid_ticks.sort_unstable(); + } + pub async fn get_fluid_collisions(self: &Arc, bounding_box: BoundingBox) -> Vec { let mut collisions = Vec::new(); diff --git a/pumpkin/src/world/tick_data.rs b/pumpkin/src/world/tick_data.rs new file mode 100644 index 0000000000..10b769e3cc --- /dev/null +++ b/pumpkin/src/world/tick_data.rs @@ -0,0 +1,28 @@ +use std::sync::Arc; + +use pumpkin_data::{Block, fluid::Fluid}; +use pumpkin_world::{ + block::entities::BlockEntity, + chunk::ChunkData, + tick::{OrderedTick, ScheduledTick}, +}; +use tokio::sync::RwLock; + +#[derive(Default)] +pub struct TickData { + pub block_ticks: Vec>, + pub fluid_ticks: Vec>, + pub random_ticks: Vec>, + pub block_entities: Vec>, + pub cloned_chunks: Vec>>, +} + +impl TickData { + pub fn clear(&mut self) { + self.block_entities.clear(); + self.block_ticks.clear(); + self.fluid_ticks.clear(); + self.random_ticks.clear(); + self.cloned_chunks.clear(); + } +}
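
Review notes:

1. Pending-write coordination. The patch replaces the old IOLock (a std
   Mutex + Condvar pair that forced a blocking wait on the read path) with
   an async map of refcounted CancellationTokens: a reader that needs chunk
   `pos` loops until no write for it is in flight, awaiting
   `token.cancelled()` instead of parking an OS thread. A minimal sketch of
   the protocol, assuming a plain `(i32, i32)` chunk key in place of
   `Vector2` and folding `PendingWriteEntry` into a tuple:

       use std::collections::{HashMap, hash_map::Entry};
       use std::sync::Arc;
       use tokio::sync::Mutex;
       use tokio_util::sync::CancellationToken;

       type Pos = (i32, i32);

       #[derive(Default, Clone)]
       struct PendingWrites {
           pending: Arc<Mutex<HashMap<Pos, (Arc<CancellationToken>, usize)>>>,
       }

       impl PendingWrites {
           // Read side: wait until no write for `pos` is in flight. The loop
           // re-checks the map after every wake-up, because a new write for
           // the same position may have been queued in the meantime.
           async fn wait_for(&self, pos: Pos) {
               loop {
                   let token = self.pending.lock().await.get(&pos).map(|(t, _)| t.clone());
                   match token {
                       Some(t) => t.cancelled().await, // fires on `cancel()` below
                       None => return,
                   }
               }
           }

           // Write side, on queueing: one increment per queued write of `pos`.
           async fn register(&self, pos: Pos) {
               self.pending
                   .lock()
                   .await
                   .entry(pos)
                   .and_modify(|(_, count)| *count += 1)
                   .or_insert_with(|| (Arc::new(CancellationToken::new()), 1));
           }

           // Write side, once the data hit disk: drop one reference; the
           // last one removes the entry and wakes every parked reader.
           async fn complete(&self, pos: Pos) {
               let mut map = self.pending.lock().await;
               if let Entry::Occupied(mut e) = map.entry(pos) {
                   e.get_mut().1 -= 1;
                   if e.get().1 == 0 {
                       let (token, _) = e.remove();
                       token.cancel();
                   }
               }
           }
       }

   One token per position (rather than one global Condvar) means a
   completed write only wakes the readers actually waiting on that chunk.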
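2. Refcount registration in `unload_chunk` / `save_all_chunks`. The
   decrement side in `io_write_work` assumes every chunk pushed into
   `io_write` was counted exactly once, and panics on a vacant entry. The
   new `if data.contains_key(pos) { continue; }` guard breaks that
   invariant: a position that is already pending is still sent to the
   writer, but the `and_modify(|it| it.count += 1)` below the guard is
   unreachable, so the first completed write drives the count to zero and
   removes the entry while a second write is still queued, and that second
   write then lands on the `panic!("Position {:?} not in lock map!")` arm.
   Assuming the count is meant to track queued writes one-to-one, as the
   decrement implies, the guard should simply go away:

       let mut data = self.pending_writes.pending.lock().await;
       for (pos, _chunk) in &chunks {
           data.entry(*pos)
               .and_modify(|entry| entry.count += 1)
               .or_insert_with(PendingWriteEntry::new);
       }
       drop(data);
       self.io_write.send(chunks).expect("io write thread stop");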
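3. Dedicated writer runtime. `io_write_work` is no longer spawned on
   `chunk_system_tasks`; each level pushes its write future into
   `write_futures`, and the server drains them all on one OS thread with
   its own current-thread runtime, so chunk flushing keeps making progress
   even when the main multi-thread runtime is saturated or shutting down.
   The pattern, factored out (the function name is illustrative):

       use std::{future::Future, pin::Pin, thread};
       use tokio::{runtime::Builder, task::JoinSet};

       type WriteFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

       fn spawn_writer_thread(write_futures: Vec<WriteFuture>) -> thread::JoinHandle<()> {
           thread::Builder::new()
               .name(String::from("Writer Thread"))
               .spawn(move || {
                   Builder::new_current_thread()
                       .enable_all()
                       .build()
                       .expect("failed to build writer runtime")
                       .block_on(async {
                           let mut join_set = JoinSet::new();
                           for future in write_futures {
                               join_set.spawn(future);
                           }
                           // Drain until every io_write_work future returns,
                           // i.e. until all write channels are closed.
                           while join_set.join_next().await.is_some() {}
                       });
               })
               .expect("Failed to start Writer Thread!")
       }

   Joining this handle in `Server::shutdown` only after every world's
   `shutdown()` has run is what guarantees the final `save_all_chunks`
   batches are on disk before `write_world_info` writes level.dat.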
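4. Runtime sizing. `(3 * num_cpus::get()).div(12)` is an opaque spelling
   of integer division by four, since (3n)/12 == n/4 for every n. Combined
   with `worker_threads(num_cpus::get() / 2)` in `main`, the split appears
   to be half the cores for the tokio runtime and a quarter, at least one,
   for generation threads:

       // (3 * n) / 12 == n / 4 in integer arithmetic, so this is simply
       // "a quarter of the logical cores, but never zero".
       let num_threads = (num_cpus::get() / 4).max(1);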
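5. Random-tick sampling. `calculate_tick_data` draws one `u16` per attempt
   and slices it into three 4-bit offsets (using 12 of the 16 bits), which
   is enough for a uniform position inside a 16x16x16 section without
   three separate RNG calls; the extra `- 32` applied to the y nibble in
   the diff then shifts the section-local offset into the dimension's
   negative build range before `get_block_absolute_y` resolves it. The
   unpacking in isolation (assumes rand 0.9 with the `small_rng` feature,
   as the imports above suggest):

       use rand::{Rng, SeedableRng, rngs::SmallRng};

       // Three independent 4-bit coordinates from a single 16-bit draw.
       fn random_offsets(rng: &mut SmallRng) -> (i32, i32, i32) {
           let r = rng.random::<u16>();
           let x = (r & 0xF) as i32; // bits 0..4
           let y = ((r >> 4) & 0xF) as i32; // bits 4..8
           let z = ((r >> 8) & 0xF) as i32; // bits 8..12
           (x, y, z)
       }

       fn main() {
           let mut rng = SmallRng::from_rng(&mut rand::rng());
           for _ in 0..1_000 {
               let (x, y, z) = random_offsets(&mut rng);
               assert!((0..16).contains(&x) && (0..16).contains(&y) && (0..16).contains(&z));
           }
       }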
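6. Buffer reuse in `calculate_tick_data`. `TickData` now lives on the
   `World` behind a tokio `Mutex` and is `clear()`ed each tick instead of
   rebuilt, and the `let TickData { block_ticks, .. } = &mut *ticks;`
   destructuring splits the guard into disjoint per-field borrows, so the
   loop can iterate `cloned_chunks` while pushing into the other vectors.
   Reduced to its essence:

       #[derive(Default)]
       struct TickData {
           block_ticks: Vec<i32>,
           cloned_chunks: Vec<i32>,
       }

       fn main() {
           let mut ticks = TickData::default();
           ticks.cloned_chunks.extend([1, 2, 3]);

           // Destructuring through a single `&mut` hands out one `&mut` per
           // field (through a MutexGuard it is written `&mut *guard`), which
           // a pair of method calls on `ticks` could not do simultaneously.
           let TickData { block_ticks, cloned_chunks } = &mut ticks;
           for chunk in cloned_chunks.iter() {
               block_ticks.push(chunk * 16);
           }
           assert_eq!(*block_ticks, vec![16, 32, 48]);
       }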