Skip to content

fix: panic handling in batchmap#161

Merged
yhl25 merged 3 commits intomainfrom
panic/batchmap
Jan 10, 2026
Merged

fix: panic handling in batchmap#161
yhl25 merged 3 commits intomainfrom
panic/batchmap

Conversation

@adarsh0728
Copy link
Member

@adarsh0728 adarsh0728 commented Dec 20, 2025

fixes #159
fixes #132

Testing

Simulating Panic in BatchMap

#[tonic::async_trait]
impl batchmap::BatchMapper for Cat {
    async fn batchmap(&self, mut input: tokio::sync::mpsc::Receiver<Datum>) -> Vec<BatchResponse> {
        let mut responses: Vec<BatchResponse> = Vec::new();
        while let Some(datum) = input.recv().await {
            println!("datum: id: {}, keys: {:?}", datum.id, datum.keys);
            panic!("Simulated panic");
            let mut response = BatchResponse::from_id(datum.id);
            response.append(Message::new(datum.value).with_keys(datum.keys.clone()));
            responses.push(response);
        }
        responses
    }
}

Pipeline

apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: crash-batch-map-pipeline
spec:
  vertices:
    - name: in
      scale:
        min: 1
      # A self data generating source
      source:
        generator:
          rpu: 1
          duration: 1s
    - name: batch-cat
      scale:
        min: 1
      udf:
        container:
          image: quay.io/numaio/numaflow-rs/batchmap-cat:test # A UDF which simply cats the message
          imagePullPolicy: Never
    - name: out
      scale:
        min: 1
      sink:
        log: {}
  edges:
    - from: in
      to: batch-cat
    - from: batch-cat
      to: out

Logs

Numa

2026-01-05T06:01:08.311555Z ERROR numaflow_core::mapper::map: error received while performing batch map operation e=Grpc(Status { code: Internal, message: "UDF_EXECUTION_ERROR(udf): Simulated panic at examples/batchmap-cat/src/main.rs:17:13", details: b"\x08\r\x12TUDF_EXECUTION_ERROR(udf): Simulated panic at examples/batchmap-cat/src/main.rs:17:13\x1a\x8b\x0b\n(type.googleapis.com/google.rpc.DebugInfo\x12\xde\n\x12\xdb\n   0: std::backtrace::Backtrace::create\n   1: numaflow::shared::panic::init_panic_hook::{{closure}}::{{closure}}\n   2: std::panicking::rust_panic_with_hook\n   3: std::panicking::begin_panic_handler::{{closure}}\n   4: std::sys::backtrace::__rust_end_short_backtrace\n   5: rust_begin_unwind\n   6: core::panicking::panic_fmt\n   7: <batchmap_cat::Cat as numaflow::batchmap::BatchMapper>::batchmap::{{closure}}\n   8: numaflow::batchmap::BatchMapService<T>::process_map_batch::{{closure}}::{{closure}}\n   9: tokio::runtime::task::core::Core<T,S>::poll\n  10: tokio::runtime::task::harness::Harness<T,S>::poll\n  11: tokio::runtime::scheduler::multi_thread::worker::Context::run_task\n  12: tokio::runtime::scheduler::multi_thread::worker::Context::run\n  13: tokio::runtime::context::scoped::Scoped<T>::set\n  14: tokio::runtime::context::runtime::enter_runtime\n  15: tokio::runtime::scheduler::multi_thread::worker::run\n  16: <tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::poll\n  17: tokio::runtime::task::core::Core<T,S>::poll\n  18: tokio::runtime::task::harness::Harness<T,S>::poll\n  19: tokio::runtime::blocking::pool::Inner::run\n  20: std::sys::backtrace::__rust_begin_short_backtrace\n  21: core::ops::function::FnOnce::call_once{{vtable.shim}}\n  22: std::sys::pal::unix::thread::Thread::new::thread_start\n  23: start_thread\n  24: <unknown>\n", source: None })
2026-01-05T06:06:22.496518Z ERROR numaflow_core: Pipeline failed because of UDF failure error=Status { code: Internal, message: "UDF_EXECUTION_ERROR(udf): Simulated panic at examples/batchmap-cat/src/main.rs:17:13", details: b"\x08\r\x12TUDF_EXECUTION_ERROR(udf): Simulated panic at examples/batchmap-cat/src/main.rs:17:13\x1a\x8b\x0b\n(type.googleapis.com/google.rpc.DebugInfo\x12\xde\n\x12\xdb\n   0: std::backtrace::Backtrace::create\n   1: numaflow::shared::panic::init_panic_hook::{{closure}}::{{closure}}\n   2: std::panicking::rust_panic_with_hook\n   3: std::panicking::begin_panic_handler::{{closure}}\n   4: std::sys::backtrace::__rust_end_short_backtrace\n   5: rust_begin_unwind\n   6: core::panicking::panic_fmt\n   7: <batchmap_cat::Cat as numaflow::batchmap::BatchMapper>::batchmap::{{closure}}\n   8: numaflow::batchmap::BatchMapService<T>::process_map_batch::{{closure}}::{{closure}}\n   9: tokio::runtime::task::core::Core<T,S>::poll\n  10: tokio::runtime::task::harness::Harness<T,S>::poll\n  11: tokio::runtime::scheduler::multi_thread::worker::Context::run_task\n  12: tokio::runtime::scheduler::multi_thread::worker::Context::run\n  13: tokio::runtime::context::scoped::Scoped<T>::set\n  14: tokio::runtime::context::runtime::enter_runtime\n  15: tokio::runtime::scheduler::multi_thread::worker::run\n  16: <tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::poll\n  17: tokio::runtime::task::core::Core<T,S>::poll\n  18: tokio::runtime::task::harness::Harness<T,S>::poll\n  19: tokio::runtime::blocking::pool::Inner::run\n  20: std::sys::backtrace::__rust_begin_short_backtrace\n  21: core::ops::function::FnOnce::call_once{{vtable.shim}}\n  22: std::sys::pal::unix::thread::Thread::new::thread_start\n  23: start_thread\n  24: <unknown>\n", source: None }

UDF

datum: id: 165-0, keys: []

UI

Numa

image

UDF

image

Errors Tab

image

Success Scenario

2026-01-05T06:44:52.780264Z  INFO numaflow_core::pipeline::forwarder::map_forwarder: Creating buffer reader for stream Stream { name: "default-batch-map-pipeline-batch-cat-0", vertex: "batch-cat", partition: 0 }
2026-01-05T06:44:52.780302Z  INFO numaflow_shared::server_info: Server info file container_type=Mapper server_info=ServerInfo { protocol: "uds", language: "rust", minimum_numaflow_version: "1.4.0-z", version: "0.4.0", metadata: Some({"MAP_MODE": "batch-map"}) }
2026-01-05T06:44:52.780445Z  INFO numaflow_core::shared::grpc: Waiting for mapper client to be ready...
2026-01-05T06:44:52.781483Z  INFO numaflow_core::pipeline::forwarder::map_forwarder: Starting pending reader
2026-01-05T06:44:53.777342Z  INFO numaflow_core::tracker: Processed messages per second processed=0
2026-01-05T06:44:54.777269Z  INFO numaflow_core::tracker: Processed messages per second processed=0
2026-01-05T06:44:55.777131Z  INFO numaflow_core::tracker: Processed messages per second processed=6
2026-01-05T06:44:56.775412Z  INFO numaflow_core::tracker: Processed messages per second processed=2

Signed-off-by: adarsh0728 <gooneriitk@gmail.com>
Signed-off-by: adarsh0728 <gooneriitk@gmail.com>
@adarsh0728 adarsh0728 requested a review from BulkBeing January 5, 2026 06:13
@adarsh0728 adarsh0728 marked this pull request as ready for review January 5, 2026 06:13
Signed-off-by: Vigith Maurice <vigith@gmail.com>
@yhl25 yhl25 merged commit 2f60ef2 into main Jan 10, 2026
2 checks passed
@yhl25 yhl25 deleted the panic/batchmap branch January 10, 2026 22:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Incorrect panic handling in batchmap Improve Error Handling In Batch Map

3 participants