GPUFuture Joins errors out #2676

@niklasha

Description

I think I have stepped on a mine in Vulkano while porting Candle (the Rust PyTorch-like tensor/ML environment) to Vulkan. I have come pretty far with explicit GpuFuture syncing, and actually got a small LLM to generate correct responses to prompts. Fun, fun.
(You can follow it here if you're keen, but it is usually slow-moving, even if I found some time in the last few weeks to progress rather quickly.)

However, since the performance is abysmal, I went on to do real GpuFuture chaining, and found that I got a check_buffer_access failure inside vulkano. I am using 0.35.1. So I boiled it down to the following test case, which shows my troubles, at least on macOS (MacBook M1 Pro).
I have debugged it a bit, but as I am not very well versed in Vulkano internals, it is pure speculation when I say I believe the ResourceUsage does not build the proper ranges that the access check inspects. It seems to think that both branches of the joined futures work on the full buffer, when in fact they use two disjoint subbuffers allocated from the same Vulkan buffer. That is a no-no and trips the check.
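To illustrate what I mean by proper ranges: two subbuffers carved out of the same Vulkan buffer should only count as conflicting exclusive accesses when their byte ranges actually intersect. Here is a minimal sketch in plain Rust of the kind of check I would expect (hypothetical types, not vulkano's actual implementation):

```rust
/// Hypothetical sketch of a per-subbuffer access check.
/// A byte range carved out of a parent buffer: [offset, offset + size).
#[derive(Debug, Clone, Copy)]
struct SubRange {
    offset: u64,
    size: u64,
}

impl SubRange {
    /// Two half-open ranges conflict only if they intersect.
    fn overlaps(&self, other: &SubRange) -> bool {
        self.offset < other.offset + other.size && other.offset < self.offset + self.size
    }
}

fn main() {
    // Two disjoint subbuffers allocated back-to-back from one arena,
    // as in the test case below (one `Data` element each).
    let a = SubRange { offset: 0, size: 4 };
    let b = SubRange { offset: 4, size: 4 };
    assert!(!a.overlaps(&b)); // disjoint: no exclusive-access conflict expected

    // Treating each branch as the whole buffer, however, always conflicts:
    let whole = SubRange { offset: 0, size: 8 };
    assert!(whole.overlaps(&whole));

    println!("disjoint ranges overlap: {}", a.overlaps(&b)); // prints "disjoint ranges overlap: false"
}
```

With a check like this, the two disjoint subbuffers in my test case would not be reported as the same resource; the observed panic suggests the comparison happens at whole-buffer granularity instead.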

I hope I have not misunderstood how things are supposed to work; please tell me if so.

Here is my boiled-down test case; below it is the output from my machine:

src/main.rs
mod test {
    use bytemuck::{Pod, Zeroable};
    use std::sync::{Arc, Mutex};
    use vulkano::buffer::allocator::{SubbufferAllocator, SubbufferAllocatorCreateInfo};
    use vulkano::buffer::{Buffer, BufferCreateInfo, BufferUsage};
    use vulkano::command_buffer::allocator::StandardCommandBufferAllocator;
    use vulkano::command_buffer::{
        AutoCommandBufferBuilder, CommandBufferUsage, CopyBufferInfo, PrimaryCommandBufferAbstract,
    };
    use vulkano::descriptor_set::allocator::{
        DescriptorSetAllocator, StandardDescriptorSetAllocator,
    };
    use vulkano::descriptor_set::{DescriptorSet, WriteDescriptorSet};
    use vulkano::device::physical::PhysicalDeviceType;
    use vulkano::device::{
        Device, DeviceCreateInfo, DeviceExtensions, DeviceFeatures, Queue, QueueCreateInfo,
        QueueFlags,
    };
    use vulkano::instance::{Instance, InstanceCreateFlags, InstanceCreateInfo};
    use vulkano::memory::allocator::{
        AllocationCreateInfo, DeviceLayout, MemoryTypeFilter, StandardMemoryAllocator,
    };
    use vulkano::pipeline::compute::ComputePipelineCreateInfo;
    use vulkano::pipeline::layout::PipelineDescriptorSetLayoutCreateInfo;
    use vulkano::pipeline::{
        ComputePipeline, Pipeline, PipelineBindPoint, PipelineLayout, PipelineShaderStageCreateInfo,
    };
    use vulkano::shader::EntryPoint;
    use vulkano::sync::future::GpuFuture;
    use vulkano::{DeviceSize, VulkanLibrary};

    #[derive(Clone, Copy, Debug, Default, Zeroable, Pod, PartialEq)]
    #[repr(C)]
    struct Data {
        value: i32,
    }

    mod accumulate_cs {
        vulkano_shaders::shader! {
            ty: "compute",
            src: r"
            #version 450

            layout(local_size_x = 1) in;

            layout(set = 0, binding = 0) buffer A { int a[]; };
            layout(set = 0, binding = 1) buffer B { int b[]; };

            void main() {
                uint idx = gl_GlobalInvocationID.x;
                a[idx] += b[idx];
            }
        "
        }
    }

    struct VulkanContext {
        _device: Arc<Device>,
        queue: Arc<Queue>,
        memory_allocator: Arc<StandardMemoryAllocator>,
        command_buffer_allocator: Arc<StandardCommandBufferAllocator>,
        descriptor_set_allocator: Arc<dyn DescriptorSetAllocator>,
        arena: Arc<Mutex<SubbufferAllocator>>,
        pipeline: Arc<ComputePipeline>,
    }

    fn init_vulkan_context() -> Result<VulkanContext, Box<dyn std::error::Error>> {
        let library = VulkanLibrary::new()?;
        let layers = vec!["VK_LAYER_KHRONOS_validation".to_string()];
        let instance = Instance::new(
            library,
            InstanceCreateInfo {
                enabled_layers: layers,
                enabled_extensions: vulkano::instance::InstanceExtensions::empty(),
                flags: InstanceCreateFlags::ENUMERATE_PORTABILITY,
                ..Default::default()
            },
        )?;
        let required_extensions = DeviceExtensions {
            khr_storage_buffer_storage_class: true,
            ..DeviceExtensions::empty()
        };
        let (physical_device, queue_family_index) = instance
            .enumerate_physical_devices()?
            .filter(|p| p.supported_extensions().contains(&required_extensions))
            .filter_map(|p| {
                p.queue_family_properties()
                    .iter()
                    .position(|q| {
                        q.queue_flags
                            .intersects(QueueFlags::COMPUTE | QueueFlags::TRANSFER)
                    })
                    .map(|i| (p, i as u32))
            })
            .min_by_key(|(p, _)| match p.properties().device_type {
                PhysicalDeviceType::DiscreteGpu => 0,
                _ => 1,
            })
            .ok_or("No suitable physical device found")?;
        let features = DeviceFeatures::empty();
        let (device, mut queues) = Device::new(
            physical_device,
            DeviceCreateInfo {
                enabled_extensions: required_extensions,
                enabled_features: features,
                queue_create_infos: vec![QueueCreateInfo {
                    queue_family_index,
                    ..Default::default()
                }],
                ..Default::default()
            },
        )?;
        let queue = queues.next().ok_or("Failed to get queue")?;
        let memory_allocator = Arc::new(StandardMemoryAllocator::new_default(device.clone()));
        let command_buffer_allocator = Arc::new(StandardCommandBufferAllocator::new(
            device.clone(),
            Default::default(),
        ));
        let descriptor_set_allocator = Arc::new(StandardDescriptorSetAllocator::new(
            device.clone(),
            Default::default(),
        ));
        let subbuffer_allocator = SubbufferAllocator::new(
            memory_allocator.clone(),
            SubbufferAllocatorCreateInfo {
                arena_size: 1024 * 1024,
                buffer_usage: BufferUsage::STORAGE_BUFFER
                    | BufferUsage::TRANSFER_DST
                    | BufferUsage::TRANSFER_SRC,
                memory_type_filter: MemoryTypeFilter::PREFER_DEVICE,
                ..Default::default()
            },
        );
        let arena = Arc::new(Mutex::new(subbuffer_allocator));
        let (pipeline, _entry_point) = init_shader_pipeline(&device)?;
        Ok(VulkanContext {
            _device: device,
            queue,
            memory_allocator,
            command_buffer_allocator,
            descriptor_set_allocator,
            arena,
            pipeline,
        })
    }

    fn init_shader_pipeline(
        device: &Arc<Device>,
    ) -> Result<(Arc<ComputePipeline>, Arc<EntryPoint>), Box<dyn std::error::Error>> {
        let shader = accumulate_cs::load(device.clone())?;
        let cs_entry = shader.entry_point("main").unwrap();
        let stage = PipelineShaderStageCreateInfo::new(cs_entry.clone());

        let set_layout_create_info =
            PipelineDescriptorSetLayoutCreateInfo::from_stages(&[stage.clone()]);
        let pipeline_layout = PipelineLayout::new(
            device.clone(),
            set_layout_create_info.into_pipeline_layout_create_info(device.clone())?,
        )?;
        let pipeline = ComputePipeline::new(
            device.clone(),
            None,
            ComputePipelineCreateInfo::stage_layout(stage, pipeline_layout.clone()),
        )?;
        Ok((pipeline, cs_entry.into()))
    }

    fn create_and_fill_buffer_ctx(
        initial_value: i32,
        ctx: &VulkanContext,
    ) -> Result<
        (
            vulkano::buffer::Subbuffer<[Data]>,
            Box<dyn GpuFuture + Send>,
        ),
        Box<dyn std::error::Error>,
    > {
        let data = [Data {
            value: initial_value,
        }];
        let size_bytes = std::mem::size_of::<Data>() as DeviceSize;
        let alignment = std::mem::align_of::<Data>() as DeviceSize;
        let layout = DeviceLayout::from_size_alignment(size_bytes, alignment)
            .ok_or("could not create layout for buffer")?;

        // Use arena from context
        let device_buffer = ctx
            .arena
            .lock()
            .unwrap()
            .allocate(layout)?
            .reinterpret::<[Data]>();
        let host_buffer = Buffer::from_iter(
            ctx.memory_allocator.clone(),
            BufferCreateInfo {
                usage: BufferUsage::TRANSFER_SRC,
                ..Default::default()
            },
            AllocationCreateInfo {
                memory_type_filter: MemoryTypeFilter::PREFER_HOST
                    | MemoryTypeFilter::HOST_SEQUENTIAL_WRITE,
                ..Default::default()
            },
            data.iter().copied(),
        )?;
        let mut builder = AutoCommandBufferBuilder::primary(
            ctx.command_buffer_allocator.clone(),
            ctx.queue.queue_family_index(),
            CommandBufferUsage::OneTimeSubmit,
        )?;
        builder.copy_buffer(CopyBufferInfo::buffers(host_buffer, device_buffer.clone()))?;
        let cb_copy = builder.build()?;
        let copy_future = cb_copy.execute(ctx.queue.clone())?.boxed_send();
        Ok((device_buffer, copy_future))
    }

    // --- Test Function ---
    #[test]
    fn minimal_join_test() -> Result<(), Box<dyn std::error::Error>> {
        let ctx = init_vulkan_context()?;
        let (buf_a, future_a) = create_and_fill_buffer_ctx(0, &ctx)?;
        let (buf_b, future_b) = create_and_fill_buffer_ctx(0, &ctx)?;
        let current_future = future_a.join(future_b).boxed_send();
        let set_layout = ctx
            .pipeline
            .layout()
            .set_layouts()
            .get(0)
            .ok_or("Pipeline missing set 0 layout")?;
        let set = DescriptorSet::new(
            ctx.descriptor_set_allocator.clone(),
            set_layout.clone(),
            [
                WriteDescriptorSet::buffer(0, buf_a),
                WriteDescriptorSet::buffer(1, buf_b),
            ],
            [],
        )?;
        let mut builder = AutoCommandBufferBuilder::primary(
            ctx.command_buffer_allocator.clone(),
            ctx.queue.queue_family_index(),
            CommandBufferUsage::OneTimeSubmit,
        )?;
        builder
            .bind_pipeline_compute(ctx.pipeline.clone())?
            .bind_descriptor_sets(
                PipelineBindPoint::Compute,
                ctx.pipeline.layout().clone(),
                0,
                set,
            )?;
        unsafe { builder.dispatch([1, 1, 1]) }?;
        let cb = builder.build()?;
        let future = current_future
            .then_execute(ctx.queue.clone(), cb)?
            .boxed_send();
        assert!(future.then_signal_fence_and_flush()?.wait(None).is_ok());
        Ok(())
    }
}
Cargo.toml
# Cargo.toml
[package]
name = "vulkano-demo-1"
version = "0.1.0"
edition = "2021"

[dependencies]
vulkano = "0.35.1"
vulkano-shaders = "0.35"
bytemuck = "1.22"

cargo test output

running 1 test
test test::minimal_join_test ... FAILED

failures:

---- test::minimal_join_test stdout ----

thread 'test::minimal_join_test' panicked at /Users/niklas/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/vulkano-0.35.1/src/sync/future/join.rs:205:9:
Two futures gave exclusive access to the same resource
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


failures:
    test::minimal_join_test

test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.14s

Metadata
    Labels

    status: wontfix / old sync (this issue relates to the old synchronization using GpuFuture and co.)
