From c64d736ac0f708ca8186b022f55aa6bc30d18df0 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Wed, 11 Mar 2026 17:27:46 +0100 Subject: [PATCH 01/11] Add pipeline building capabilities to C API 2.0. Signed-off-by: Michal Zientkiewicz --- dali/c_api_2/op_test/pipeline_builder_test.cc | 252 ++++++++++++++++++ dali/c_api_2/pipeline.cc | 175 +++++++++++- include/dali/dali.h | 108 +++++++- 3 files changed, 533 insertions(+), 2 deletions(-) create mode 100644 dali/c_api_2/op_test/pipeline_builder_test.cc diff --git a/dali/c_api_2/op_test/pipeline_builder_test.cc b/dali/c_api_2/op_test/pipeline_builder_test.cc new file mode 100644 index 00000000000..35fcf33974d --- /dev/null +++ b/dali/c_api_2/op_test/pipeline_builder_test.cc @@ -0,0 +1,252 @@ +// Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include "dali/dali.h" +#include "dali/dali_cpp_wrappers.h" +#include "dali/c_api_2/pipeline.h" +#include "dali/c_api_2/pipeline_test_utils.h" +#include "dali/c_api_2/data_objects.h" +#include "dali/pipeline/executor/executor2/exec2_ops_for_test.h" +#include "dali/pipeline/pipeline.h" +#include "dali/pipeline/pipeline_params.h" +#include "dali/pipeline/data/tensor_list.h" + +namespace dali::c_api::test { + +namespace { + +constexpr int kBatchSize = 4; +constexpr int kNumThreads = 4; + +// ----- Helpers for building reference (C++) pipelines ----- + +// Counter + two CPU TestOps: ctr -> op1(+1000), ctr -> op2(+2000); outputs: op1, op2 +std::unique_ptr BuildRefCPUPipeline(int device_id) { + auto p = std::make_unique(MakePipelineParams(kBatchSize, kNumThreads, device_id)); + p->AddOperator( + OpSpec(exec2::test::kCounterOpName) + .AddArg("ctr", std::string("ctr")) + .AddOutput("ctr", StorageDevice::CPU), + "ctr"); + p->AddOperator( + OpSpec(exec2::test::kTestOpName) + .AddArg("name", std::string("op1")) + .AddArg("device", std::string("cpu")) + .AddInput("ctr", StorageDevice::CPU) + .AddOutput("op1", StorageDevice::CPU) + .AddArg("addend", 1000), + "op1"); + p->AddOperator( + OpSpec(exec2::test::kTestOpName) + .AddArg("name", std::string("op2")) + .AddArg("device", std::string("cpu")) + .AddInput("ctr", StorageDevice::CPU) + .AddOutput("op2", StorageDevice::CPU) + .AddArg("addend", 2000), + "op2"); + p->SetOutputDescs({ {"op1", "cpu"}, {"op2", "cpu"} }); + p->Build(); + return p; +} + +// ExternalSource (cpu) -> output "ext" directly; used for feed-input comparison +std::unique_ptr BuildRefExtSrcPipeline(int device_id) { + auto p = std::make_unique(MakePipelineParams(kBatchSize, kNumThreads, device_id)); + p->AddExternalInput("ext", "cpu"); + p->SetOutputDescs({ {"ext", "cpu"} }); + p->Build(); + return p; +} + +// ----- Helpers for building C API pipelines ----- + +daliPipelineParams_t MakeCApiParams(int device_id) { + daliPipelineParams_t params{}; + params.max_batch_size_present = true; + params.max_batch_size = kBatchSize; + params.num_threads_present = true; + params.num_threads = kNumThreads; + params.device_id_present = true; + params.device_id = device_id; + return params; +} + +// Counter + two CPU TestOps built via C API, matching BuildRefCPUPipeline +PipelineHandle BuildCApiCPUPipeline(int device_id) { + auto params = MakeCApiParams(device_id); + daliPipeline_h h = nullptr; + CHECK_DALI(daliPipelineCreate(&h, ¶ms)); + PipelineHandle pipe(h); + + // CounterOp "ctr": schema arg "ctr"="ctr", output "ctr" (CPU) + daliArgDesc_t ctr_args[1]; + ctr_args[0].arg_name = "ctr"; + ctr_args[0].dtype = DALI_STRING; + ctr_args[0].str = "ctr"; + + daliIODesc_t ctr_out[1]; + ctr_out[0].name = "ctr"; + ctr_out[0].device_type = DALI_STORAGE_CPU; + + daliOperatorDesc_t ctr_op{}; + ctr_op.schema_name = exec2::test::kCounterOpName; + ctr_op.instance_name = "ctr"; + ctr_op.backend = DALI_BACKEND_CPU; + ctr_op.num_outputs = 1; + ctr_op.num_args = 1; + ctr_op.outputs = ctr_out; + ctr_op.args = ctr_args; + CHECK_DALI(daliPipelineAddOperator(h, &ctr_op)); + + // TestOp "op1": input "ctr" (CPU), output "op1" (CPU), addend=1000 + daliArgDesc_t op1_args[2]; + op1_args[0].arg_name = "name"; + op1_args[0].dtype = DALI_STRING; + op1_args[0].str = "op1"; + op1_args[1].arg_name = "addend"; + op1_args[1].dtype = DALI_INT32; + op1_args[1].ivalue = 1000; + + daliIODesc_t op1_in[1]; + op1_in[0].name = "ctr"; + op1_in[0].device_type = DALI_STORAGE_CPU; + + daliIODesc_t op1_out[1]; + op1_out[0].name = "op1"; + op1_out[0].device_type = DALI_STORAGE_CPU; + + daliOperatorDesc_t op1_op{}; + op1_op.schema_name = exec2::test::kTestOpName; + op1_op.instance_name = "op1"; + op1_op.backend = DALI_BACKEND_CPU; + op1_op.num_inputs = 1; + op1_op.num_outputs = 1; + op1_op.num_args = 2; + op1_op.inputs = op1_in; + op1_op.outputs = op1_out; + op1_op.args = op1_args; + CHECK_DALI(daliPipelineAddOperator(h, &op1_op)); + + // TestOp "op2": input "ctr" (CPU), output "op2" (CPU), addend=2000 + daliArgDesc_t op2_args[2]; + op2_args[0].arg_name = "name"; + op2_args[0].dtype = DALI_STRING; + op2_args[0].str = "op2"; + op2_args[1].arg_name = "addend"; + op2_args[1].dtype = DALI_INT32; + op2_args[1].ivalue = 2000; + + daliIODesc_t op2_in[1]; + op2_in[0].name = "ctr"; + op2_in[0].device_type = DALI_STORAGE_CPU; + + daliIODesc_t op2_out[1]; + op2_out[0].name = "op2"; + op2_out[0].device_type = DALI_STORAGE_CPU; + + daliOperatorDesc_t op2_op{}; + op2_op.schema_name = exec2::test::kTestOpName; + op2_op.instance_name = "op2"; + op2_op.backend = DALI_BACKEND_CPU; + op2_op.num_inputs = 1; + op2_op.num_outputs = 1; + op2_op.num_args = 2; + op2_op.inputs = op2_in; + op2_op.outputs = op2_out; + op2_op.args = op2_args; + CHECK_DALI(daliPipelineAddOperator(h, &op2_op)); + + // Set outputs: op1 (cpu), op2 (cpu) + daliPipelineIODesc_t out_descs[2]; + out_descs[0] = {}; + out_descs[0].name = "op1"; + out_descs[0].device = DALI_STORAGE_CPU; + out_descs[1] = {}; + out_descs[1].name = "op2"; + out_descs[1].device = DALI_STORAGE_CPU; + CHECK_DALI(daliPipelineSetOutputs(h, 2, out_descs)); + + CHECK_DALI(daliPipelineBuild(h)); + return pipe; +} + +// ExternalSource "ext" (CPU) built via C API, matching BuildRefExtSrcPipeline +PipelineHandle BuildCApiExtSrcPipeline(int device_id) { + auto params = MakeCApiParams(device_id); + daliPipeline_h h = nullptr; + CHECK_DALI(daliPipelineCreate(&h, ¶ms)); + PipelineHandle pipe(h); + + daliPipelineIODesc_t ext_desc{}; + ext_desc.name = "ext"; + ext_desc.device = DALI_STORAGE_CPU; + CHECK_DALI(daliPipelineAddExternalInput(h, &ext_desc)); + + daliPipelineIODesc_t out_desc{}; + out_desc.name = "ext"; + out_desc.device = DALI_STORAGE_CPU; + CHECK_DALI(daliPipelineSetOutputs(h, 1, &out_desc)); + + CHECK_DALI(daliPipelineBuild(h)); + return pipe; +} + +} // namespace + +// --- Test 1: Operator pipeline (counter + two CPU TestOps) --- +// Build both C++ and C API pipelines, run 5 iterations, compare outputs. + +TEST(CAPI2_PipelineBuilderTest, AddOperator_CPUOnly) { + auto ref = BuildRefCPUPipeline(CPU_ONLY_DEVICE_ID); + auto test = BuildCApiCPUPipeline(CPU_ONLY_DEVICE_ID); + ComparePipelineOutputs(*ref, test, /*iters=*/5, /*prefetch_on_first_iter=*/true); +} + +// --- Test 2: ExternalSource pipeline --- +// Build both C++ (AddExternalInput) and C API (daliPipelineAddExternalInput) pipelines. +// Feed identical data to both, run, compare outputs. + +TEST(CAPI2_PipelineBuilderTest, AddExternalInput) { + auto ref = BuildRefExtSrcPipeline(CPU_ONLY_DEVICE_ID); + auto test = BuildCApiExtSrcPipeline(CPU_ONLY_DEVICE_ID); + + // Determine how many times we need to feed before prefetching + int feed_count = ref->InputFeedCount("ext"); + + // Create and feed identical scalar TensorLists + for (int i = 0; i < feed_count; i++) { + auto cpp_tl = std::make_shared>(); + cpp_tl->Resize(uniform_list_shape(kBatchSize, TensorShape<>{}), DALI_INT32); + for (int s = 0; s < kBatchSize; s++) + (*cpp_tl)[s].mutable_data()[0] = i * kBatchSize + s; + + // Feed to reference C++ pipeline + ref->SetExternalInput("ext", *cpp_tl); + + // Feed to test C API pipeline + auto tl_handle = Wrap(cpp_tl); + CHECK_DALI(daliPipelineFeedInput(test, "ext", tl_handle.get(), nullptr, 0, nullptr)); + } + + // Prefetch both, then compare outputs for each prefetched batch + ref->Prefetch(); + CHECK_DALI(daliPipelinePrefetch(test)); + + for (int i = 0; i < feed_count; i++) + ComparePipelineOutput(*ref, test); +} + +} // namespace dali::c_api::test diff --git a/dali/c_api_2/pipeline.cc b/dali/c_api_2/pipeline.cc index 3004394a5fa..ef57521faa3 100644 --- a/dali/c_api_2/pipeline.cc +++ b/dali/c_api_2/pipeline.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,6 +17,8 @@ #include "dali/c_api_2/checkpoint.h" #include "dali/c_api_2/error_handling.h" #include "dali/pipeline/pipeline.h" +#include "dali/pipeline/operator/op_spec.h" +#include "dali/pipeline/pipeline_output_desc.h" #include "dali/c_api_2/utils.h" #include "dali/c_api_2/validation.h" @@ -513,3 +515,174 @@ daliResult_t daliCheckpointDestroy(daliCheckpoint_h checkpoint) { delete ToPointer(checkpoint); DALI_EPILOG(); } + +namespace { + +std::string_view BackendToString(daliBackend_t backend) { + switch (backend) { + case DALI_BACKEND_CPU: return "cpu"; + case DALI_BACKEND_GPU: return "gpu"; + case DALI_BACKEND_MIXED: return "mixed"; + default: + throw std::invalid_argument(dali::make_string( + "Invalid backend value: ", static_cast(backend))); + } +} + +void AddArgToSpec(dali::OpSpec &spec, const daliArgDesc_t &arg) { + dali::c_api::CheckNotNull(arg.arg_name, "arg.arg_name"); + std::string_view name = arg.arg_name; + switch (arg.dtype) { + // --- scalar types --- + case DALI_INT8: + spec.AddArg(name, static_cast(arg.ivalue)); + break; + case DALI_INT16: + spec.AddArg(name, static_cast(arg.ivalue)); + break; + case DALI_INT32: + spec.AddArg(name, static_cast(arg.ivalue)); + break; + case DALI_INT64: + spec.AddArg(name, arg.ivalue); + break; + case DALI_UINT8: + spec.AddArg(name, static_cast(arg.uvalue)); + break; + case DALI_UINT16: + spec.AddArg(name, static_cast(arg.uvalue)); + break; + case DALI_UINT32: + spec.AddArg(name, static_cast(arg.uvalue)); + break; + case DALI_UINT64: + spec.AddArg(name, arg.uvalue); + break; + case DALI_FLOAT: + spec.AddArg(name, arg.fvalue); + break; + // NOT SUPPORTED / NOT IMPLEMENTED! + /*case DALI_FLOAT64: + spec.AddArg(name, arg.dvalue); + break;*/ + case DALI_BOOL: + spec.AddArg(name, static_cast(arg.ivalue)); + break; + case DALI_STRING: + dali::c_api::CheckNotNull(arg.str, "arg.str"); + spec.AddArg(name, std::string(arg.str)); + break; + // --- vector (list) types --- + case DALI_INT_VEC: { + if (arg.size > 0) + dali::c_api::CheckNotNull(arg.arr, "arg.arr"); + auto *d = static_cast(arg.arr); + spec.AddArg(name, std::vector(d, d + arg.size)); + break; + } + case DALI_FLOAT_VEC: { + if (arg.size > 0) + dali::c_api::CheckNotNull(arg.arr, "arg.arr"); + auto *d = static_cast(arg.arr); + spec.AddArg(name, std::vector(d, d + arg.size)); + break; + } + case DALI_BOOL_VEC: { + if (arg.size > 0) + dali::c_api::CheckNotNull(arg.arr, "arg.arr"); + auto *d = static_cast(arg.arr); + spec.AddArg(name, std::vector(d, d + arg.size)); + break; + } + case DALI_STRING_VEC: { + if (arg.size > 0) + dali::c_api::CheckNotNull(arg.arr, "arg.arr"); + auto *d = static_cast(arg.arr); + std::vector sv; + sv.reserve(arg.size); + for (int64_t i = 0; i < arg.size; i++) { + dali::c_api::CheckNotNull(d[i], "arg.arr[i]"); + sv.emplace_back(d[i]); + } + spec.AddArg(name, std::move(sv)); + break; + } + default: + throw std::invalid_argument(dali::make_string( + "Unsupported argument dtype: ", static_cast(arg.dtype))); + } +} + +} // namespace + +daliResult_t daliPipelineAddExternalInput( + daliPipeline_h pipeline, + const daliPipelineIODesc_t *input_desc) { + DALI_PROLOG(); + auto pipe = ToPointer(pipeline); + NOT_NULL(input_desc); + NOT_NULL(input_desc->name); + std::string device_str = input_desc->device == DALI_STORAGE_GPU ? "gpu" : "cpu"; + daliDataType_t dtype = input_desc->dtype_present ? input_desc->dtype : DALI_NO_TYPE; + int ndim = input_desc->ndim_present ? input_desc->ndim : -1; + const char *layout = input_desc->layout ? input_desc->layout : ""; + pipe->Unwrap()->AddExternalInput(input_desc->name, device_str, dtype, ndim, layout); + DALI_EPILOG(); +} + +daliResult_t daliPipelineAddOperator( + daliPipeline_h pipeline, + const daliOperatorDesc_t *op_desc) { + DALI_PROLOG(); + auto pipe = ToPointer(pipeline); + NOT_NULL(op_desc); + NOT_NULL(op_desc->schema_name); + dali::OpSpec spec(op_desc->schema_name); + spec.AddArg("device", std::string(BackendToString(op_desc->backend))); + for (int i = 0; i < op_desc->num_inputs; i++) { + NOT_NULL(op_desc->inputs[i].name); + spec.AddInput(op_desc->inputs[i].name, + static_cast(op_desc->inputs[i].device_type)); + } + for (int i = 0; i < op_desc->num_outputs; i++) { + NOT_NULL(op_desc->outputs[i].name); + spec.AddOutput(op_desc->outputs[i].name, + static_cast(op_desc->outputs[i].device_type)); + } + for (int i = 0; i < op_desc->num_arg_inputs; i++) { + NOT_NULL(op_desc->arg_inputs[i].arg_name); + NOT_NULL(op_desc->arg_inputs[i].input_name); + spec.AddArgumentInput(op_desc->arg_inputs[i].arg_name, + op_desc->arg_inputs[i].input_name); + } + for (int i = 0; i < op_desc->num_args; i++) + AddArgToSpec(spec, op_desc->args[i]); + if (op_desc->instance_name && op_desc->instance_name[0] != '\0') + pipe->Unwrap()->AddOperator(spec, op_desc->instance_name); + else + pipe->Unwrap()->AddOperator(spec); + DALI_EPILOG(); +} + +daliResult_t daliPipelineSetOutputs( + daliPipeline_h pipeline, + int num_outputs, + const daliPipelineIODesc_t *outputs) { + DALI_PROLOG(); + auto pipe = ToPointer(pipeline); + NOT_NULL(outputs); + std::vector descs; + descs.reserve(num_outputs); + for (int i = 0; i < num_outputs; i++) { + NOT_NULL(outputs[i].name); + dali::PipelineOutputDesc desc; + desc.name = outputs[i].name; + desc.device = static_cast(outputs[i].device); + if (outputs[i].dtype_present) desc.dtype = outputs[i].dtype; + if (outputs[i].ndim_present) desc.ndim = outputs[i].ndim; + if (outputs[i].layout) desc.layout = outputs[i].layout; + descs.push_back(std::move(desc)); + } + pipe->Unwrap()->SetOutputDescs(std::move(descs)); + DALI_EPILOG(); +} diff --git a/include/dali/dali.h b/include/dali/dali.h index dfc701d524a..11b5a8764eb 100644 --- a/include/dali/dali.h +++ b/include/dali/dali.h @@ -1,4 +1,4 @@ -// Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -352,6 +352,77 @@ typedef struct _DALIPipelineIODesc { const char *layout; } daliPipelineIODesc_t; +/** Specifies the processing backend of an operator */ +typedef enum _DALIBackend { + DALI_BACKEND_CPU = 0, + DALI_BACKEND_GPU = 1, + DALI_BACKEND_MIXED = 2, + DALI_BACKEND_FORCE_INT32 = 0x7fffffff +} daliBackend_t; + +/** Describes a single input or output of an operator */ +typedef struct _DALIIODesc { + const char *name; + daliStorageDevice_t device_type; +} daliIODesc_t; + +/** Describes a tensor argument input (argument fed as a TensorList at runtime) */ +typedef struct _DALIArgInputDesc { + const char *arg_name; + const char *input_name; +} daliArgInputDesc_t; + +/** Describes an operator argument value. + * + * The `dtype` field determines which union member to use: + * + * Scalar types: + * - DALI_INT8..DALI_INT64 -> ivalue + * - DALI_UINT8..DALI_UINT64 -> uvalue + * - DALI_FLOAT -> fvalue + * - DALI_FLOAT64 -> dvalue + * - DALI_BOOL -> ivalue (0 = false, non-zero = true) + * - DALI_STRING -> str (NULL-terminated) + * + * Vector (list) types — use the {size, arr} struct: + * - DALI_INT_VEC -> arr points to int[], size = element count + * - DALI_FLOAT_VEC -> arr points to float[], size = element count + * - DALI_BOOL_VEC -> arr points to bool[], size = element count + * - DALI_STRING_VEC -> arr points to const char*[], size = element count + * + * For vector types, `arr` must be non-NULL and `size` must be >= 0. + */ +typedef struct _DALIArgDesc { + const char *arg_name; + daliDataType_t dtype; + union { + int64_t ivalue; /**< signed integer scalar types and bool */ + uint64_t uvalue; /**< unsigned integer scalar types */ + float fvalue; /**< DALI_FLOAT */ + double dvalue; /**< DALI_FLOAT64 */ + const char *str; /**< DALI_STRING — NULL-terminated C string */ + struct { + int64_t size; /**< number of elements */ + const void *arr; /**< pointer to the element data for vector types */ + }; + }; +} daliArgDesc_t; + +/** Describes an operator to be added to the pipeline */ +typedef struct _DALIOperatorDesc { + const char *schema_name; + const char *instance_name; /**< may be NULL or empty */ + daliBackend_t backend; + int num_inputs; + int num_outputs; + int num_args; + int num_arg_inputs; + const daliIODesc_t *inputs; + const daliIODesc_t *outputs; + const daliArgDesc_t *args; + const daliArgInputDesc_t *arg_inputs; +} daliOperatorDesc_t; + /** Creates an empty pipeline. */ DALI_API daliResult_t daliPipelineCreate( daliPipeline_h *out_pipe_handle, @@ -380,6 +451,41 @@ DALI_API daliResult_t daliPipelineDeserialize( const daliPipelineParams_t *param_overrides); +/** Adds an external input to the pipeline. + * + * Equivalent to Pipeline::AddExternalInput. + * The input is described by a daliPipelineIODesc_t: + * - name: the name of the input (also the name of the output produced by ExternalSource) + * - device: DALI_STORAGE_CPU or DALI_STORAGE_GPU + * - dtype, ndim, layout: optional metadata (use the _present flags) + */ +DALI_API daliResult_t daliPipelineAddExternalInput( + daliPipeline_h pipeline, + const daliPipelineIODesc_t *input_desc); + +/** Adds an operator to the pipeline. + * + * Constructs an OpSpec from op_desc and calls Pipeline::AddOperator. + * The `instance_name` field of op_desc may be NULL or empty to use an auto-generated name. + */ +DALI_API daliResult_t daliPipelineAddOperator( + daliPipeline_h pipeline, + const daliOperatorDesc_t *op_desc); + +/** Sets the output descriptors of the pipeline. + * + * Must be called before daliPipelineBuild. Equivalent to Pipeline::SetOutputDescs. + * + * @param pipeline the pipeline + * @param num_outputs number of elements in `outputs` + * @param outputs array of output descriptors; the `name` and `device` fields are required; + * `dtype`, `ndim`, and `layout` are optional (use the _present flags) + */ +DALI_API daliResult_t daliPipelineSetOutputs( + daliPipeline_h pipeline, + int num_outputs, + const daliPipelineIODesc_t *outputs); + /** Prepares the pipeline for execution */ DALI_API daliResult_t daliPipelineBuild(daliPipeline_h pipeline); From 860ee56218fda73613dbf77bcc1779ce0e74807b Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Thu, 12 Mar 2026 10:34:57 +0100 Subject: [PATCH 02/11] Add builder tests with a complex pipeline and real operators. Signed-off-by: Michal Zientkiewicz --- dali/c_api_2/op_test/complex_pipeline_test.cc | 221 +++++++++++++++--- .../{op_test => }/pipeline_builder_test.cc | 32 +-- dali/c_api_2/pipeline_test_utils.h | 10 +- 3 files changed, 210 insertions(+), 53 deletions(-) rename dali/c_api_2/{op_test => }/pipeline_builder_test.cc (89%) diff --git a/dali/c_api_2/op_test/complex_pipeline_test.cc b/dali/c_api_2/op_test/complex_pipeline_test.cc index 227e711419e..81022868cac 100644 --- a/dali/c_api_2/op_test/complex_pipeline_test.cc +++ b/dali/c_api_2/op_test/complex_pipeline_test.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ // limitations under the License. #include +#include "dali/dali.h" #include "dali/c_api_2/pipeline_test_utils.h" #include "dali/core/common.h" #include "dali/test/dali_test_config.h" @@ -53,6 +54,110 @@ ReaderDecoderPipe( return pipe; } +PipelineHandle +ReaderDecoderCApiPipe( + std::string_view decoder_device, + StorageDevice output_device, + daliPipelineParams_t params = {}) { + std::string file_root = testing::dali_extra_path() + "/db/single/jpeg/"; + std::string file_list = file_root + "image_list.txt"; + + if (!params.max_batch_size_present) { + params.max_batch_size_present = true; + params.max_batch_size = 4; + } + if (!params.num_threads_present) { + params.num_threads_present = true; + params.num_threads = 1; + } + if (!params.seed_present) { + params.seed_present = true; + params.seed = 12345; + } + if (!params.exec_type_present) { + params.exec_type_present = true; + params.exec_type = DALI_EXEC_DYNAMIC; + } + + daliPipeline_h h = nullptr; + CHECK_DALI(daliPipelineCreate(&h, ¶ms)); + PipelineHandle pipe(h); + + // FileReader: no inputs, outputs "compressed_images" and "labels" on CPU + daliArgDesc_t reader_args[3]; + reader_args[0].arg_name = "device"; + reader_args[0].dtype = DALI_STRING; + reader_args[0].str = "cpu"; + reader_args[1].arg_name = "file_root"; + reader_args[1].dtype = DALI_STRING; + reader_args[1].str = file_root.c_str(); + reader_args[2].arg_name = "file_list"; + reader_args[2].dtype = DALI_STRING; + reader_args[2].str = file_list.c_str(); + + daliIODesc_t reader_out[2]; + reader_out[0].name = "compressed_images"; + reader_out[0].device_type = DALI_STORAGE_CPU; + reader_out[1].name = "labels"; + reader_out[1].device_type = DALI_STORAGE_CPU; + + daliOperatorDesc_t reader_op{}; + reader_op.schema_name = "FileReader"; + reader_op.backend = DALI_BACKEND_CPU; + reader_op.num_inputs = 0; + reader_op.num_outputs = 2; + reader_op.num_args = 3; + reader_op.outputs = reader_out; + reader_op.args = reader_args; + CHECK_DALI(daliPipelineAddOperator(h, &reader_op)); + + // ImageDecoder: input "compressed_images" (CPU), output "decoded" (CPU or GPU) + bool is_mixed = (decoder_device == "mixed"); + daliBackend_t decoder_backend = is_mixed ? DALI_BACKEND_MIXED : DALI_BACKEND_CPU; + daliStorageDevice_t decoded_dev = is_mixed ? DALI_STORAGE_GPU : DALI_STORAGE_CPU; + std::string decoder_device_str(decoder_device); + + daliArgDesc_t decoder_args[2]; + decoder_args[0].arg_name = "device"; + decoder_args[0].dtype = DALI_STRING; + decoder_args[0].str = decoder_device_str.c_str(); + decoder_args[1].arg_name = "output_type"; + decoder_args[1].dtype = DALI_INT32; + decoder_args[1].ivalue = DALI_RGB; + + daliIODesc_t decoder_in[1]; + decoder_in[0].name = "compressed_images"; + decoder_in[0].device_type = DALI_STORAGE_CPU; + + daliIODesc_t decoder_out[1]; + decoder_out[0].name = "decoded"; + decoder_out[0].device_type = decoded_dev; + + daliOperatorDesc_t decoder_op{}; + decoder_op.schema_name = "ImageDecoder"; + decoder_op.backend = decoder_backend; + decoder_op.num_inputs = 1; + decoder_op.num_outputs = 1; + decoder_op.num_args = 2; + decoder_op.inputs = decoder_in; + decoder_op.outputs = decoder_out; + decoder_op.args = decoder_args; + CHECK_DALI(daliPipelineAddOperator(h, &decoder_op)); + + daliStorageDevice_t out_dev = + output_device == StorageDevice::GPU ? DALI_STORAGE_GPU : DALI_STORAGE_CPU; + daliPipelineIODesc_t out_descs[2]; + out_descs[0] = {}; + out_descs[0].name = "decoded"; + out_descs[0].device = out_dev; + out_descs[1] = {}; + out_descs[1].name = "labels"; + out_descs[1].device = out_dev; + CHECK_DALI(daliPipelineSetOutputs(h, 2, out_descs)); + + return pipe; +} + void TestReaderDecoder(std::string_view decoder_device, StorageDevice output_device) { auto ref_pipe = ReaderDecoderPipe(decoder_device, output_device); auto proto = ref_pipe->SerializeToProtobuf(); @@ -66,48 +171,60 @@ void TestReaderDecoder(std::string_view decoder_device, StorageDevice output_dev ComparePipelineOutputs(*ref_pipe, pipe); } -TEST(CAPI2_PipelineTest, ReaderDecoderCPU) { +TEST(CAPI2_SerializedPipelineTest, ReaderDecoderCPU) { TestReaderDecoder("cpu", StorageDevice::CPU); } -TEST(CAPI2_PipelineTest, ReaderDecoderCPU2GPU) { +TEST(CAPI2_SerializedPipelineTest, ReaderDecoderCPU2GPU) { TestReaderDecoder("cpu", StorageDevice::GPU); } -TEST(CAPI2_PipelineTest, ReaderDecoderMixed) { +TEST(CAPI2_SerializedPipelineTest, ReaderDecoderMixed) { if (!MixedOperatorRegistry::Registry().IsRegistered("ImageDecoder")) { GTEST_SKIP() << "ImageDecoder for mixed backend is not available"; } TestReaderDecoder("mixed", StorageDevice::GPU); } -TEST(CAPI2_PipelineTest, ReaderDecoderMixed2CPU) { +TEST(CAPI2_SerializedPipelineTest, ReaderDecoderMixed2CPU) { if (!MixedOperatorRegistry::Registry().IsRegistered("ImageDecoder")) { GTEST_SKIP() << "ImageDecoder for mixed backend is not available"; } TestReaderDecoder("mixed", StorageDevice::CPU); } -TEST(CAPI2_PipelineTest, Checkpointing) { - // This test creates three pipelines - a C++ pipeline (ref) and two C pipelines (pipe1, pipe2), - // created by deserializing the serialized representation of the C++ pipeline. - // - // (pipe1) advances 3 iterations and then a checkpoint is taken and restored in (ref), - // after which 5 iterations of outputs are compared. - // Then a checkpoint is taken in (ref) and restored in (pipe2), after which another 5 iterations - // are compared. - PipelineParams params{}; - params.enable_checkpointing = true; - params.seed = 1234; - auto ref = ReaderDecoderPipe("cpu", StorageDevice::GPU, params); - ref->Build(); - auto pipe_str = ref->SerializeToProtobuf(); // serialize the ref... - auto pipe1 = Deserialize(pipe_str, {}); // ...and create pipe1 - auto pipe2 = Deserialize(pipe_str, {}); // ...and pipe2 from serialized ref - CHECK_DALI(daliPipelineBuild(pipe1)); - CHECK_DALI(daliPipelineBuild(pipe2)); +void TestReaderDecoderBuilder(std::string_view decoder_device, StorageDevice output_device) { + auto ref_pipe = ReaderDecoderPipe(decoder_device, output_device); + ref_pipe->Build(); + auto test_pipe = ReaderDecoderCApiPipe(decoder_device, output_device); + CHECK_DALI(daliPipelineBuild(test_pipe)); + ComparePipelineOutputs(*ref_pipe, test_pipe); +} + +TEST(CAPI2_PipelineBuilderTest, ReaderDecoderCPU) { + TestReaderDecoderBuilder("cpu", StorageDevice::CPU); +} + +TEST(CAPI2_PipelineBuilderTest, ReaderDecoderCPU2GPU) { + TestReaderDecoderBuilder("cpu", StorageDevice::GPU); +} + +TEST(CAPI2_PipelineBuilderTest, ReaderDecoderMixed) { + if (!MixedOperatorRegistry::Registry().IsRegistered("ImageDecoder")) { + GTEST_SKIP() << "ImageDecoder for mixed backend is not available"; + } + TestReaderDecoderBuilder("mixed", StorageDevice::GPU); +} - // Advance a few iterations... +TEST(CAPI2_PipelineBuilderTest, ReaderDecoderMixed2CPU) { + if (!MixedOperatorRegistry::Registry().IsRegistered("ImageDecoder")) { + GTEST_SKIP() << "ImageDecoder for mixed backend is not available"; + } + TestReaderDecoderBuilder("mixed", StorageDevice::CPU); +} + +void RunCheckpointingTest(Pipeline &ref, daliPipeline_h pipe1, daliPipeline_h pipe2) { + // Advance a few iterations in pipe1... CHECK_DALI(daliPipelinePrefetch(pipe1)); daliPipelineOutputs_h out1_h{}; CHECK_DALI(daliPipelinePopOutputs(pipe1, &out1_h)); @@ -129,7 +246,7 @@ TEST(CAPI2_PipelineTest, Checkpointing) { ext.pipeline_data.size = strlen(ext.pipeline_data.data); daliCheckpoint_h checkpoint_h{}; - // Take a checkpoint... + // Take a checkpoint from pipe1... CHECK_DALI(daliPipelineGetCheckpoint(pipe1, &checkpoint_h, &ext)); CheckpointHandle checkpoint(checkpoint_h); @@ -138,15 +255,15 @@ TEST(CAPI2_PipelineTest, Checkpointing) { CHECK_DALI(daliPipelineSerializeCheckpoint(pipe1, checkpoint, &data, &size)); ASSERT_NE(data, nullptr); - // ...restore... - ref->RestoreFromSerializedCheckpoint(std::string(data, size)); + // ...restore into ref... + ref.RestoreFromSerializedCheckpoint(std::string(data, size)); // ...run and compare. - ComparePipelineOutputs(*ref, pipe1, 5, false); + ComparePipelineOutputs(ref, pipe1, 5, false); - // Now take another checkpoint... - auto chk_str = ref->GetSerializedCheckpoint({ ext.pipeline_data.data, ext.iterator_data.data }); + // Now take another checkpoint from ref... + auto chk_str = ref.GetSerializedCheckpoint({ ext.pipeline_data.data, ext.iterator_data.data }); - // ...deserialize... + // ...deserialize into pipe2... CHECK_DALI(daliPipelineDeserializeCheckpoint( pipe2, &checkpoint_h, chk_str.data(), chk_str.length())); CheckpointHandle checkpoint2(checkpoint_h); @@ -157,10 +274,46 @@ TEST(CAPI2_PipelineTest, Checkpointing) { EXPECT_STREQ(ext2.iterator_data.data, "ITER"); EXPECT_EQ(ext2.pipeline_data.size, pipeline_data_size); EXPECT_STREQ(ext2.pipeline_data.data, pipeline_data); - // ...restore... + // ...restore and compare. CHECK_DALI(daliPipelineRestoreCheckpoint(pipe2, checkpoint2)); - // ...run and compare. - ComparePipelineOutputs(*ref, pipe2, 5, false); + ComparePipelineOutputs(ref, pipe2, 5, false); +} + +TEST(CAPI2_SerializedPipelineTest, Checkpointing) { + // This test creates three pipelines - a C++ pipeline (ref) and two C pipelines (pipe1, pipe2), + // created by deserializing the serialized representation of the C++ pipeline. + PipelineParams params{}; + params.enable_checkpointing = true; + params.seed = 1234; + auto ref = ReaderDecoderPipe("cpu", StorageDevice::GPU, params); + ref->Build(); + auto pipe_str = ref->SerializeToProtobuf(); // serialize the ref... + auto pipe1 = Deserialize(pipe_str, {}); // ...and create pipe1 + auto pipe2 = Deserialize(pipe_str, {}); // ...and pipe2 from serialized ref + CHECK_DALI(daliPipelineBuild(pipe1)); + CHECK_DALI(daliPipelineBuild(pipe2)); + RunCheckpointingTest(*ref, pipe1, pipe2); +} + +TEST(CAPI2_PipelineBuilderTest, Checkpointing) { + // This test creates three pipelines - a C++ pipeline (ref) and two C API builder pipelines + // (pipe1, pipe2) with identical parameters, then verifies checkpoint save/restore. + PipelineParams cpp_params{}; + cpp_params.enable_checkpointing = true; + cpp_params.seed = 1234; + auto ref = ReaderDecoderPipe("cpu", StorageDevice::GPU, cpp_params); + ref->Build(); + + daliPipelineParams_t c_params{}; + c_params.enable_checkpointing_present = true; + c_params.enable_checkpointing = true; + c_params.seed_present = true; + c_params.seed = 1234; + auto pipe1 = ReaderDecoderCApiPipe("cpu", StorageDevice::GPU, c_params); + auto pipe2 = ReaderDecoderCApiPipe("cpu", StorageDevice::GPU, c_params); + CHECK_DALI(daliPipelineBuild(pipe1)); + CHECK_DALI(daliPipelineBuild(pipe2)); + RunCheckpointingTest(*ref, pipe1, pipe2); } } // namespace dali::c_api::test diff --git a/dali/c_api_2/op_test/pipeline_builder_test.cc b/dali/c_api_2/pipeline_builder_test.cc similarity index 89% rename from dali/c_api_2/op_test/pipeline_builder_test.cc rename to dali/c_api_2/pipeline_builder_test.cc index 35fcf33974d..d3bb6cc2019 100644 --- a/dali/c_api_2/op_test/pipeline_builder_test.cc +++ b/dali/c_api_2/pipeline_builder_test.cc @@ -34,8 +34,10 @@ constexpr int kNumThreads = 4; // ----- Helpers for building reference (C++) pipelines ----- // Counter + two CPU TestOps: ctr -> op1(+1000), ctr -> op2(+2000); outputs: op1, op2 -std::unique_ptr BuildRefCPUPipeline(int device_id) { - auto p = std::make_unique(MakePipelineParams(kBatchSize, kNumThreads, device_id)); +std::unique_ptr BuildRefCPUPipeline(std::optional device_id) { + auto p = std::make_unique(MakePipelineParams( + kBatchSize, kNumThreads, device_id.value_or(CPU_ONLY_DEVICE_ID))); + p->AddOperator( OpSpec(exec2::test::kCounterOpName) .AddArg("ctr", std::string("ctr")) @@ -63,8 +65,10 @@ std::unique_ptr BuildRefCPUPipeline(int device_id) { } // ExternalSource (cpu) -> output "ext" directly; used for feed-input comparison -std::unique_ptr BuildRefExtSrcPipeline(int device_id) { - auto p = std::make_unique(MakePipelineParams(kBatchSize, kNumThreads, device_id)); +std::unique_ptr BuildRefExtSrcPipeline(std::optional device_id) { + auto p = std::make_unique(MakePipelineParams( + kBatchSize, kNumThreads, device_id.value_or(CPU_ONLY_DEVICE_ID))); + p->AddExternalInput("ext", "cpu"); p->SetOutputDescs({ {"ext", "cpu"} }); p->Build(); @@ -73,19 +77,19 @@ std::unique_ptr BuildRefExtSrcPipeline(int device_id) { // ----- Helpers for building C API pipelines ----- -daliPipelineParams_t MakeCApiParams(int device_id) { +daliPipelineParams_t MakeCApiParams(std::optional device_id) { daliPipelineParams_t params{}; params.max_batch_size_present = true; params.max_batch_size = kBatchSize; params.num_threads_present = true; params.num_threads = kNumThreads; - params.device_id_present = true; - params.device_id = device_id; + params.device_id_present = device_id.has_value(); + params.device_id = device_id.value_or(-1); return params; } // Counter + two CPU TestOps built via C API, matching BuildRefCPUPipeline -PipelineHandle BuildCApiCPUPipeline(int device_id) { +PipelineHandle BuildCApiCPUPipeline(std::optional device_id) { auto params = MakeCApiParams(device_id); daliPipeline_h h = nullptr; CHECK_DALI(daliPipelineCreate(&h, ¶ms)); @@ -184,7 +188,7 @@ PipelineHandle BuildCApiCPUPipeline(int device_id) { } // ExternalSource "ext" (CPU) built via C API, matching BuildRefExtSrcPipeline -PipelineHandle BuildCApiExtSrcPipeline(int device_id) { +PipelineHandle BuildCApiExtSrcPipeline(std::optional device_id) { auto params = MakeCApiParams(device_id); daliPipeline_h h = nullptr; CHECK_DALI(daliPipelineCreate(&h, ¶ms)); @@ -210,8 +214,8 @@ PipelineHandle BuildCApiExtSrcPipeline(int device_id) { // Build both C++ and C API pipelines, run 5 iterations, compare outputs. TEST(CAPI2_PipelineBuilderTest, AddOperator_CPUOnly) { - auto ref = BuildRefCPUPipeline(CPU_ONLY_DEVICE_ID); - auto test = BuildCApiCPUPipeline(CPU_ONLY_DEVICE_ID); + auto ref = BuildRefCPUPipeline(std::nullopt); + auto test = BuildCApiCPUPipeline(std::nullopt); ComparePipelineOutputs(*ref, test, /*iters=*/5, /*prefetch_on_first_iter=*/true); } @@ -220,8 +224,8 @@ TEST(CAPI2_PipelineBuilderTest, AddOperator_CPUOnly) { // Feed identical data to both, run, compare outputs. TEST(CAPI2_PipelineBuilderTest, AddExternalInput) { - auto ref = BuildRefExtSrcPipeline(CPU_ONLY_DEVICE_ID); - auto test = BuildCApiExtSrcPipeline(CPU_ONLY_DEVICE_ID); + auto ref = BuildRefExtSrcPipeline(0); + auto test = BuildCApiExtSrcPipeline(0); // Determine how many times we need to feed before prefetching int feed_count = ref->InputFeedCount("ext"); @@ -238,7 +242,7 @@ TEST(CAPI2_PipelineBuilderTest, AddExternalInput) { // Feed to test C API pipeline auto tl_handle = Wrap(cpp_tl); - CHECK_DALI(daliPipelineFeedInput(test, "ext", tl_handle.get(), nullptr, 0, nullptr)); + CHECK_DALI(daliPipelineFeedInput(test, "ext", tl_handle.get(), nullptr, {}, nullptr)); } // Prefetch both, then compare outputs for each prefetched batch diff --git a/dali/c_api_2/pipeline_test_utils.h b/dali/c_api_2/pipeline_test_utils.h index 6e28dc490da..ad09af94698 100644 --- a/dali/c_api_2/pipeline_test_utils.h +++ b/dali/c_api_2/pipeline_test_utils.h @@ -1,4 +1,4 @@ -// Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -54,7 +54,7 @@ auto &Unwrap(daliTensorList_h h) { return static_cast(h)->Unwrap(); } -void CompareTensorLists(const TensorList &a, const TensorList &b) { +inline void CompareTensorLists(const TensorList &a, const TensorList &b) { ASSERT_EQ(a.type(), b.type()); ASSERT_EQ(a.sample_dim(), b.sample_dim()); ASSERT_EQ(a.num_samples(), b.num_samples()); @@ -65,7 +65,7 @@ void CompareTensorLists(const TensorList &a, const TensorList &a, const TensorList &b) { +inline void CompareTensorLists(const TensorList &a, const TensorList &b) { TensorList a_cpu, b_cpu; a_cpu.set_order(AccessOrder::host()); b_cpu.set_order(AccessOrder::host()); @@ -74,7 +74,7 @@ void CompareTensorLists(const TensorList &a, const TensorList Date: Thu, 12 Mar 2026 14:05:22 +0100 Subject: [PATCH 03/11] Rename arg_name to name in ArgDesc. Extend tests to include vector arguments. Signed-off-by: Michal Zientkiewicz --- dali/c_api_2/op_test/complex_pipeline_test.cc | 93 +++++++++++++------ dali/c_api_2/pipeline.cc | 6 +- dali/c_api_2/pipeline_builder_test.cc | 30 +++--- include/dali/dali.h | 2 +- 4 files changed, 84 insertions(+), 47 deletions(-) diff --git a/dali/c_api_2/op_test/complex_pipeline_test.cc b/dali/c_api_2/op_test/complex_pipeline_test.cc index 81022868cac..54c79945f87 100644 --- a/dali/c_api_2/op_test/complex_pipeline_test.cc +++ b/dali/c_api_2/op_test/complex_pipeline_test.cc @@ -31,6 +31,10 @@ ReaderDecoderPipe( PipelineParams params = {}) { std::string file_root = testing::dali_extra_path() + "/db/single/jpeg/"; std::string file_list = file_root + "image_list.txt"; + auto resize_device = decoder_device == "mixed" ? "gpu" : "cpu"; + auto out_dev_str = to_string(output_device); + auto decoder_out_dev = decoder_device == "mixed" + ? StorageDevice::GPU : StorageDevice::CPU; if (!params.max_batch_size) params.max_batch_size = 4; if (!params.num_threads) params.num_threads = 1; if (!params.seed) params.seed = 12345; @@ -47,10 +51,16 @@ ReaderDecoderPipe( .AddArg("device", decoder_device) .AddArg("output_type", DALI_RGB) .AddInput("compressed_images", StorageDevice::CPU) - .AddOutput("decoded", decoder_device == "cpu" ? StorageDevice::CPU : StorageDevice::GPU)); + .AddOutput("decoded", decoder_out_dev)); - auto out_dev_str = to_string(output_device); - pipe->SetOutputDescs({{ "decoded", out_dev_str }, { "labels", out_dev_str }}); + pipe->AddOperator(OpSpec("Resize") + .AddArg("device", resize_device) + .AddArg("output_type", DALI_RGB) + .AddArg("size", std::vector{ 224, 224 }) + .AddInput("decoded", decoder_out_dev) + .AddOutput("resized", decoder_out_dev)); + + pipe->SetOutputDescs({{ "resized", out_dev_str }, { "labels", out_dev_str }}); return pipe; } @@ -84,16 +94,13 @@ ReaderDecoderCApiPipe( PipelineHandle pipe(h); // FileReader: no inputs, outputs "compressed_images" and "labels" on CPU - daliArgDesc_t reader_args[3]; - reader_args[0].arg_name = "device"; - reader_args[0].dtype = DALI_STRING; - reader_args[0].str = "cpu"; - reader_args[1].arg_name = "file_root"; - reader_args[1].dtype = DALI_STRING; - reader_args[1].str = file_root.c_str(); - reader_args[2].arg_name = "file_list"; - reader_args[2].dtype = DALI_STRING; - reader_args[2].str = file_list.c_str(); + daliArgDesc_t reader_args[2]; + reader_args[0].name = "file_root"; + reader_args[0].dtype = DALI_STRING; + reader_args[0].str = file_root.c_str(); + reader_args[1].name = "file_list"; + reader_args[1].dtype = DALI_STRING; + reader_args[1].str = file_list.c_str(); daliIODesc_t reader_out[2]; reader_out[0].name = "compressed_images"; @@ -105,8 +112,8 @@ ReaderDecoderCApiPipe( reader_op.schema_name = "FileReader"; reader_op.backend = DALI_BACKEND_CPU; reader_op.num_inputs = 0; - reader_op.num_outputs = 2; - reader_op.num_args = 3; + reader_op.num_outputs = std::size(reader_out); + reader_op.num_args = std::size(reader_args); reader_op.outputs = reader_out; reader_op.args = reader_args; CHECK_DALI(daliPipelineAddOperator(h, &reader_op)); @@ -115,19 +122,14 @@ ReaderDecoderCApiPipe( bool is_mixed = (decoder_device == "mixed"); daliBackend_t decoder_backend = is_mixed ? DALI_BACKEND_MIXED : DALI_BACKEND_CPU; daliStorageDevice_t decoded_dev = is_mixed ? DALI_STORAGE_GPU : DALI_STORAGE_CPU; - std::string decoder_device_str(decoder_device); - daliArgDesc_t decoder_args[2]; - decoder_args[0].arg_name = "device"; - decoder_args[0].dtype = DALI_STRING; - decoder_args[0].str = decoder_device_str.c_str(); - decoder_args[1].arg_name = "output_type"; - decoder_args[1].dtype = DALI_INT32; - decoder_args[1].ivalue = DALI_RGB; + daliArgDesc_t decoder_args[1]; + decoder_args[0].name = "output_type"; + decoder_args[0].dtype = DALI_INT32; + decoder_args[0].ivalue = DALI_RGB; daliIODesc_t decoder_in[1]; - decoder_in[0].name = "compressed_images"; - decoder_in[0].device_type = DALI_STORAGE_CPU; + decoder_in[0] = reader_out[0]; daliIODesc_t decoder_out[1]; decoder_out[0].name = "decoded"; @@ -136,19 +138,52 @@ ReaderDecoderCApiPipe( daliOperatorDesc_t decoder_op{}; decoder_op.schema_name = "ImageDecoder"; decoder_op.backend = decoder_backend; - decoder_op.num_inputs = 1; - decoder_op.num_outputs = 1; - decoder_op.num_args = 2; + decoder_op.num_inputs = std::size(decoder_in); + decoder_op.num_outputs = std::size(decoder_out); + decoder_op.num_args = std::size(decoder_args); decoder_op.inputs = decoder_in; decoder_op.outputs = decoder_out; decoder_op.args = decoder_args; CHECK_DALI(daliPipelineAddOperator(h, &decoder_op)); + + // Resize: input "decoded" (CPU or GPU), output "resized" (same as input) + // If decoder is mixed, then resize is gpu + daliBackend_t resize_backend = is_mixed ? DALI_BACKEND_GPU : DALI_BACKEND_CPU; + + daliArgDesc_t resize_args[2]; + resize_args[0].name = "output_type"; + resize_args[0].dtype = DALI_INT32; + resize_args[0].ivalue = DALI_RGB; + resize_args[1].name = "size"; + resize_args[1].dtype = DALI_FLOAT_VEC; + float size[] = { 224, 224 }; + resize_args[1].arr = size; + resize_args[1].size = 2; + + daliIODesc_t resize_in[1]; + resize_in[0] = decoder_out[0]; + + daliIODesc_t resize_out[1]; + resize_out[0].name = "resized"; + resize_out[0].device_type = decoded_dev; + + daliOperatorDesc_t resize_op{}; + resize_op.schema_name = "Resize"; + resize_op.backend = resize_backend; + resize_op.num_inputs = std::size(resize_in); + resize_op.num_outputs = std::size(resize_out); + resize_op.num_args = std::size(resize_args); + resize_op.inputs = resize_in; + resize_op.outputs = resize_out; + resize_op.args = resize_args; + CHECK_DALI(daliPipelineAddOperator(h, &resize_op)); + daliStorageDevice_t out_dev = output_device == StorageDevice::GPU ? DALI_STORAGE_GPU : DALI_STORAGE_CPU; daliPipelineIODesc_t out_descs[2]; out_descs[0] = {}; - out_descs[0].name = "decoded"; + out_descs[0].name = "resized"; out_descs[0].device = out_dev; out_descs[1] = {}; out_descs[1].name = "labels"; diff --git a/dali/c_api_2/pipeline.cc b/dali/c_api_2/pipeline.cc index ef57521faa3..f33978836c6 100644 --- a/dali/c_api_2/pipeline.cc +++ b/dali/c_api_2/pipeline.cc @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include "dali/c_api_2/pipeline.h" #include "dali/c_api_2/pipeline_outputs.h" #include "dali/c_api_2/checkpoint.h" @@ -530,8 +531,8 @@ std::string_view BackendToString(daliBackend_t backend) { } void AddArgToSpec(dali::OpSpec &spec, const daliArgDesc_t &arg) { - dali::c_api::CheckNotNull(arg.arg_name, "arg.arg_name"); - std::string_view name = arg.arg_name; + dali::c_api::CheckNotNull(arg.name, "arg.name"); + std::string_view name = arg.name; switch (arg.dtype) { // --- scalar types --- case DALI_INT8: @@ -649,6 +650,7 @@ daliResult_t daliPipelineAddOperator( spec.AddOutput(op_desc->outputs[i].name, static_cast(op_desc->outputs[i].device_type)); } + // Argument inputs need to be added after regular inputs for (int i = 0; i < op_desc->num_arg_inputs; i++) { NOT_NULL(op_desc->arg_inputs[i].arg_name); NOT_NULL(op_desc->arg_inputs[i].input_name); diff --git a/dali/c_api_2/pipeline_builder_test.cc b/dali/c_api_2/pipeline_builder_test.cc index d3bb6cc2019..4fb43d869c4 100644 --- a/dali/c_api_2/pipeline_builder_test.cc +++ b/dali/c_api_2/pipeline_builder_test.cc @@ -97,9 +97,9 @@ PipelineHandle BuildCApiCPUPipeline(std::optional device_id) { // CounterOp "ctr": schema arg "ctr"="ctr", output "ctr" (CPU) daliArgDesc_t ctr_args[1]; - ctr_args[0].arg_name = "ctr"; - ctr_args[0].dtype = DALI_STRING; - ctr_args[0].str = "ctr"; + ctr_args[0].name = "ctr"; + ctr_args[0].dtype = DALI_STRING; + ctr_args[0].str = "ctr"; daliIODesc_t ctr_out[1]; ctr_out[0].name = "ctr"; @@ -117,12 +117,12 @@ PipelineHandle BuildCApiCPUPipeline(std::optional device_id) { // TestOp "op1": input "ctr" (CPU), output "op1" (CPU), addend=1000 daliArgDesc_t op1_args[2]; - op1_args[0].arg_name = "name"; - op1_args[0].dtype = DALI_STRING; - op1_args[0].str = "op1"; - op1_args[1].arg_name = "addend"; - op1_args[1].dtype = DALI_INT32; - op1_args[1].ivalue = 1000; + op1_args[0].name = "name"; + op1_args[0].dtype = DALI_STRING; + op1_args[0].str = "op1"; + op1_args[1].name = "addend"; + op1_args[1].dtype = DALI_INT32; + op1_args[1].ivalue = 1000; daliIODesc_t op1_in[1]; op1_in[0].name = "ctr"; @@ -146,12 +146,12 @@ PipelineHandle BuildCApiCPUPipeline(std::optional device_id) { // TestOp "op2": input "ctr" (CPU), output "op2" (CPU), addend=2000 daliArgDesc_t op2_args[2]; - op2_args[0].arg_name = "name"; - op2_args[0].dtype = DALI_STRING; - op2_args[0].str = "op2"; - op2_args[1].arg_name = "addend"; - op2_args[1].dtype = DALI_INT32; - op2_args[1].ivalue = 2000; + op2_args[0].name = "name"; + op2_args[0].dtype = DALI_STRING; + op2_args[0].str = "op2"; + op2_args[1].name = "addend"; + op2_args[1].dtype = DALI_INT32; + op2_args[1].ivalue = 2000; daliIODesc_t op2_in[1]; op2_in[0].name = "ctr"; diff --git a/include/dali/dali.h b/include/dali/dali.h index 11b5a8764eb..4d2b4f24561 100644 --- a/include/dali/dali.h +++ b/include/dali/dali.h @@ -393,7 +393,7 @@ typedef struct _DALIArgInputDesc { * For vector types, `arr` must be non-NULL and `size` must be >= 0. */ typedef struct _DALIArgDesc { - const char *arg_name; + const char *name; daliDataType_t dtype; union { int64_t ivalue; /**< signed integer scalar types and bool */ From 1f45179b7b1e4c3739f0236e292cf91a6402fd9d Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Thu, 12 Mar 2026 17:43:57 +0100 Subject: [PATCH 04/11] Improve error messages. Signed-off-by: Michal Zientkiewicz --- dali/c_api_2/pipeline.cc | 57 ++++++++++++++++++++++++++++++--------- dali/c_api_2/validation.h | 18 ++++++++++++- 2 files changed, 61 insertions(+), 14 deletions(-) diff --git a/dali/c_api_2/pipeline.cc b/dali/c_api_2/pipeline.cc index f33978836c6..0223b271967 100644 --- a/dali/c_api_2/pipeline.cc +++ b/dali/c_api_2/pipeline.cc @@ -531,8 +531,15 @@ std::string_view BackendToString(daliBackend_t backend) { } void AddArgToSpec(dali::OpSpec &spec, const daliArgDesc_t &arg) { - dali::c_api::CheckNotNull(arg.name, "arg.name"); + assert(arg.name != nullptr); // checked in daliPipelineAddOperator std::string_view name = arg.name; + + auto check_not_null = [&](auto *x, auto &&field) { + dali::c_api::CheckNotNull(x, [&]() { + return dali::make_string("`", field, "` of argument \"", name, "\""); + }); + }; + switch (arg.dtype) { // --- scalar types --- case DALI_INT8: @@ -570,39 +577,40 @@ void AddArgToSpec(dali::OpSpec &spec, const daliArgDesc_t &arg) { spec.AddArg(name, static_cast(arg.ivalue)); break; case DALI_STRING: - dali::c_api::CheckNotNull(arg.str, "arg.str"); + check_not_null(arg.str, "arg.str"); spec.AddArg(name, std::string(arg.str)); break; // --- vector (list) types --- case DALI_INT_VEC: { if (arg.size > 0) - dali::c_api::CheckNotNull(arg.arr, "arg.arr"); + check_not_null(arg.arr, "arg.arr"); auto *d = static_cast(arg.arr); spec.AddArg(name, std::vector(d, d + arg.size)); break; } case DALI_FLOAT_VEC: { if (arg.size > 0) - dali::c_api::CheckNotNull(arg.arr, "arg.arr"); + check_not_null(arg.arr, "arg.arr"); auto *d = static_cast(arg.arr); spec.AddArg(name, std::vector(d, d + arg.size)); break; } case DALI_BOOL_VEC: { if (arg.size > 0) - dali::c_api::CheckNotNull(arg.arr, "arg.arr"); + check_not_null(arg.arr, "arg.arr"); auto *d = static_cast(arg.arr); spec.AddArg(name, std::vector(d, d + arg.size)); break; } case DALI_STRING_VEC: { if (arg.size > 0) - dali::c_api::CheckNotNull(arg.arr, "arg.arr"); + check_not_null(arg.arr, "arg.arr"); auto *d = static_cast(arg.arr); std::vector sv; sv.reserve(arg.size); for (int64_t i = 0; i < arg.size; i++) { - dali::c_api::CheckNotNull(d[i], "arg.arr[i]"); + if (!d[i]) + check_not_null(d[i], dali::make_string("arg.arr[", i, "]")); sv.emplace_back(d[i]); } spec.AddArg(name, std::move(sv)); @@ -610,7 +618,8 @@ void AddArgToSpec(dali::OpSpec &spec, const daliArgDesc_t &arg) { } default: throw std::invalid_argument(dali::make_string( - "Unsupported argument dtype: ", static_cast(arg.dtype))); + "Unsupported argument `dtype`: ", static_cast(arg.dtype), + " in argument \"", arg.name, "\".")); } } @@ -638,27 +647,49 @@ daliResult_t daliPipelineAddOperator( auto pipe = ToPointer(pipeline); NOT_NULL(op_desc); NOT_NULL(op_desc->schema_name); + if (op_desc->num_inputs > 0) + NOT_NULL(op_desc->inputs); + if (op_desc->num_outputs > 0) + NOT_NULL(op_desc->outputs); + if (op_desc->num_arg_inputs > 0) + NOT_NULL(op_desc->arg_inputs); + if (op_desc->num_args > 0) + NOT_NULL(op_desc->args); dali::OpSpec spec(op_desc->schema_name); spec.AddArg("device", std::string(BackendToString(op_desc->backend))); for (int i = 0; i < op_desc->num_inputs; i++) { - NOT_NULL(op_desc->inputs[i].name); + dali::c_api::CheckNotNull(op_desc->inputs[i].name, [i]() { + return dali::make_string("`op_desc->inputs[", i, "].name`"); + }); spec.AddInput(op_desc->inputs[i].name, static_cast(op_desc->inputs[i].device_type)); } for (int i = 0; i < op_desc->num_outputs; i++) { - NOT_NULL(op_desc->outputs[i].name); + dali::c_api::CheckNotNull(op_desc->outputs[i].name, [i]() { + return dali::make_string("`op_desc->outputs[", i, "].name`"); + }); spec.AddOutput(op_desc->outputs[i].name, static_cast(op_desc->outputs[i].device_type)); } // Argument inputs need to be added after regular inputs for (int i = 0; i < op_desc->num_arg_inputs; i++) { - NOT_NULL(op_desc->arg_inputs[i].arg_name); - NOT_NULL(op_desc->arg_inputs[i].input_name); + dali::c_api::CheckNotNull(op_desc->arg_inputs[i].arg_name, [i]() { + return dali::make_string( + "`arg_input[", i, "].arg_name`"); + }); + dali::c_api::CheckNotNull(op_desc->arg_inputs[i].input_name, [i]() { + return dali::make_string( + "`arg_input[", i, "].input_name`"); + }); spec.AddArgumentInput(op_desc->arg_inputs[i].arg_name, op_desc->arg_inputs[i].input_name); } - for (int i = 0; i < op_desc->num_args; i++) + for (int i = 0; i < op_desc->num_args; i++) { + dali::c_api::CheckNotNull(op_desc->args[i].name, [i]() { + return dali::make_string("`op_desc->args[", i, "].name`"); + }); AddArgToSpec(spec, op_desc->args[i]); + } if (op_desc->instance_name && op_desc->instance_name[0] != '\0') pipe->Unwrap()->AddOperator(spec, op_desc->instance_name); else diff --git a/dali/c_api_2/validation.h b/dali/c_api_2/validation.h index 09ec421f994..dd1a2711f65 100644 --- a/dali/c_api_2/validation.h +++ b/dali/c_api_2/validation.h @@ -1,4 +1,4 @@ -// Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,9 +15,11 @@ #ifndef DALI_C_API_2_VALIDATION_H_ #define DALI_C_API_2_VALIDATION_H_ +#include #include #include #include +#include #include "dali/dali.h" #include "dali/core/format.h" #include "dali/core/span.h" @@ -112,11 +114,25 @@ inline void CheckArg(bool assertion, const std::string &what) { throw std::invalid_argument(what); } +template CB> +inline void CheckArg(bool assertion, CB &&message_callback) { + if (!assertion) + throw std::invalid_argument(std::forward(message_callback)()); +} + template void CheckNotNull(T *x, std::string_view what) { CheckArg(x != nullptr, make_string(what, " must not be NULL.")); } +template CB> +void CheckNotNull(T *x, CB &&message_callback) { + if (x == nullptr) { + throw std::invalid_argument(make_string( + std::forward(message_callback)(), " must not be NULL.")); + } +} + #define CHECK_OUTPUT(output_param) \ ::dali::c_api::CheckNotNull(output_param, "The output parameter `" #output_param "`"); From cf466f46fe68a7cb40e9cad0fcd2c8afe649e4b8 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Thu, 12 Mar 2026 18:01:44 +0100 Subject: [PATCH 05/11] Added tests with argument input. Signed-off-by: Michal Zientkiewicz --- dali/c_api_2/op_test/complex_pipeline_test.cc | 107 ++++++++++++++++++ 1 file changed, 107 insertions(+) diff --git a/dali/c_api_2/op_test/complex_pipeline_test.cc b/dali/c_api_2/op_test/complex_pipeline_test.cc index 54c79945f87..c07d927ee0a 100644 --- a/dali/c_api_2/op_test/complex_pipeline_test.cc +++ b/dali/c_api_2/op_test/complex_pipeline_test.cc @@ -351,4 +351,111 @@ TEST(CAPI2_PipelineBuilderTest, Checkpointing) { RunCheckpointingTest(*ref, pipe1, pipe2); } +TEST(CAPI2_PipelineBuilderTest, ResizeWithArgumentInput) { + constexpr int kBatchSize = 4; + + daliPipelineParams_t params{}; + params.max_batch_size_present = true; + params.max_batch_size = kBatchSize; + params.num_threads_present = true; + params.num_threads = 2; + + daliPipeline_h h = nullptr; + CHECK_DALI(daliPipelineCreate(&h, ¶ms)); + PipelineHandle pipe(h); + + // Add ExternalInput "images" (CPU) + daliPipelineIODesc_t images_input_desc{}; + images_input_desc.name = "images"; + images_input_desc.device = DALI_STORAGE_CPU; + CHECK_DALI(daliPipelineAddExternalInput(h, &images_input_desc)); + + // Add ExternalInput "sizes" (CPU) — fed as argument input for Resize's "size" argument + daliPipelineIODesc_t sizes_input_desc{}; + sizes_input_desc.name = "sizes"; + sizes_input_desc.device = DALI_STORAGE_CPU; + CHECK_DALI(daliPipelineAddExternalInput(h, &sizes_input_desc)); + + // Resize: regular input "images", argument input "sizes" -> "size" + daliIODesc_t resize_in[1]; + resize_in[0].name = "images"; + resize_in[0].device_type = DALI_STORAGE_CPU; + + daliIODesc_t resize_out[1]; + resize_out[0].name = "resized"; + resize_out[0].device_type = DALI_STORAGE_CPU; + + daliArgInputDesc_t arg_inputs[1]; + arg_inputs[0].arg_name = "size"; + arg_inputs[0].input_name = "sizes"; + + daliOperatorDesc_t resize_op{}; + resize_op.schema_name = "Resize"; + resize_op.backend = DALI_BACKEND_CPU; + resize_op.num_inputs = std::size(resize_in); + resize_op.num_outputs = std::size(resize_out); + resize_op.num_arg_inputs = std::size(arg_inputs); + resize_op.inputs = resize_in; + resize_op.outputs = resize_out; + resize_op.arg_inputs = arg_inputs; + CHECK_DALI(daliPipelineAddOperator(h, &resize_op)); + + daliPipelineIODesc_t out_desc{}; + out_desc.name = "resized"; + out_desc.device = DALI_STORAGE_CPU; + CHECK_DALI(daliPipelineSetOutputs(h, 1, &out_desc)); + + CHECK_DALI(daliPipelineBuild(h)); + + // Per-sample target sizes [H, W] — at least two distinct shapes + const float sample_sizes[kBatchSize][2] = { + {100.f, 200.f}, + { 50.f, 80.f}, + {120.f, 90.f}, + { 30.f, 40.f}, + }; + + int feed_count = 0; + CHECK_DALI(daliPipelineGetFeedCount(h, &feed_count, "images")); + + for (int feed = 0; feed < feed_count; feed++) { + // Input images: HWC tensors of shape [64, 64, 3] (content unused, only shape matters) + auto images_tl = std::make_shared>(); + images_tl->Resize(uniform_list_shape(kBatchSize, TensorShape<3>{64, 64, 3}), DALI_UINT8); + + // Per-sample sizes: 1D float tensors of shape [2], each holding [H, W] + auto sizes_tl = std::make_shared>(); + sizes_tl->Resize(uniform_list_shape(kBatchSize, TensorShape<1>{2}), DALI_FLOAT); + for (int i = 0; i < kBatchSize; i++) { + float *ptr = (*sizes_tl)[i].mutable_data(); + ptr[0] = sample_sizes[i][0]; + ptr[1] = sample_sizes[i][1]; + } + + auto images_handle = Wrap(images_tl); + auto sizes_handle = Wrap(sizes_tl); + CHECK_DALI(daliPipelineFeedInput(h, "images", images_handle.get(), nullptr, {}, nullptr)); + CHECK_DALI(daliPipelineFeedInput(h, "sizes", sizes_handle.get(), nullptr, {}, nullptr)); + } + + CHECK_DALI(daliPipelinePrefetch(h)); + + auto outs = PopOutputs(h); + auto out_tl = GetOutput(outs, 0); + + int num_samples = 0, ndim = 0; + const int64_t *shape = nullptr; + CHECK_DALI(daliTensorListGetShape(out_tl, &num_samples, &ndim, &shape)); + + ASSERT_EQ(num_samples, kBatchSize); + ASSERT_EQ(ndim, 3); // H, W, C + + for (int i = 0; i < kBatchSize; i++) { + const int64_t *s = shape + i * ndim; + EXPECT_EQ(s[0], static_cast(sample_sizes[i][0])) << "Sample " << i << " H mismatch"; + EXPECT_EQ(s[1], static_cast(sample_sizes[i][1])) << "Sample " << i << " W mismatch"; + EXPECT_EQ(s[2], 3) << "Sample " << i << " C mismatch"; + } +} + } // namespace dali::c_api::test From e08deb53b1b1e5e6a81faa3b350039047c3a8f09 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Fri, 13 Mar 2026 10:23:34 +0100 Subject: [PATCH 06/11] Review issues. Signed-off-by: Michal Zientkiewicz --- dali/c_api_2/pipeline.cc | 6 +++++- include/dali/dali.h | 5 +++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/dali/c_api_2/pipeline.cc b/dali/c_api_2/pipeline.cc index 0223b271967..b65d8895ba2 100644 --- a/dali/c_api_2/pipeline.cc +++ b/dali/c_api_2/pipeline.cc @@ -609,7 +609,7 @@ void AddArgToSpec(dali::OpSpec &spec, const daliArgDesc_t &arg) { std::vector sv; sv.reserve(arg.size); for (int64_t i = 0; i < arg.size; i++) { - if (!d[i]) + if (!d[i]) // the outer `if` prevents make_string in case of no error check_not_null(d[i], dali::make_string("arg.arr[", i, "]")); sv.emplace_back(d[i]); } @@ -703,6 +703,10 @@ daliResult_t daliPipelineSetOutputs( const daliPipelineIODesc_t *outputs) { DALI_PROLOG(); auto pipe = ToPointer(pipeline); + dali::c_api::CheckArg(num_outputs >= 0, "`num_outputs` must not be negative"); + if (num_outputs == 0) + return DALI_SUCCESS; // nothing to do + NOT_NULL(outputs); std::vector descs; descs.reserve(num_outputs); diff --git a/include/dali/dali.h b/include/dali/dali.h index 4d2b4f24561..afb78e844ab 100644 --- a/include/dali/dali.h +++ b/include/dali/dali.h @@ -380,7 +380,6 @@ typedef struct _DALIArgInputDesc { * - DALI_INT8..DALI_INT64 -> ivalue * - DALI_UINT8..DALI_UINT64 -> uvalue * - DALI_FLOAT -> fvalue - * - DALI_FLOAT64 -> dvalue * - DALI_BOOL -> ivalue (0 = false, non-zero = true) * - DALI_STRING -> str (NULL-terminated) * @@ -390,7 +389,9 @@ typedef struct _DALIArgInputDesc { * - DALI_BOOL_VEC -> arr points to bool[], size = element count * - DALI_STRING_VEC -> arr points to const char*[], size = element count * - * For vector types, `arr` must be non-NULL and `size` must be >= 0. + * Other types (including DALI_FLOAT64) are not supported. + * + * For vector types, `size` must be >= 0 and `arr` must be non-NULL when `size` > 0. */ typedef struct _DALIArgDesc { const char *name; From eb00a53234b3c619881e1fd27da889760daa5082 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Fri, 13 Mar 2026 11:05:03 +0100 Subject: [PATCH 07/11] Extended validation. Signed-off-by: Michal Zientkiewicz --- dali/c_api_2/pipeline.cc | 49 ++++++++++++++++++++++++++++------------ 1 file changed, 35 insertions(+), 14 deletions(-) diff --git a/dali/c_api_2/pipeline.cc b/dali/c_api_2/pipeline.cc index b65d8895ba2..74c6145c752 100644 --- a/dali/c_api_2/pipeline.cc +++ b/dali/c_api_2/pipeline.cc @@ -540,6 +540,15 @@ void AddArgToSpec(dali::OpSpec &spec, const daliArgDesc_t &arg) { }); }; + + auto check_arr = [&]() { + if (arg.size < 0) + throw std::invalid_argument( + dali::make_string("`arg.size` of argument \"", name, "\" has a negative size")); + if (arg.size > 0) + check_not_null(arg.arr, "arg.arr"); + }; + switch (arg.dtype) { // --- scalar types --- case DALI_INT8: @@ -582,29 +591,25 @@ void AddArgToSpec(dali::OpSpec &spec, const daliArgDesc_t &arg) { break; // --- vector (list) types --- case DALI_INT_VEC: { - if (arg.size > 0) - check_not_null(arg.arr, "arg.arr"); + check_arr(); auto *d = static_cast(arg.arr); spec.AddArg(name, std::vector(d, d + arg.size)); break; } case DALI_FLOAT_VEC: { - if (arg.size > 0) - check_not_null(arg.arr, "arg.arr"); + check_arr(); auto *d = static_cast(arg.arr); spec.AddArg(name, std::vector(d, d + arg.size)); break; } case DALI_BOOL_VEC: { - if (arg.size > 0) - check_not_null(arg.arr, "arg.arr"); + check_arr(); auto *d = static_cast(arg.arr); spec.AddArg(name, std::vector(d, d + arg.size)); break; } case DALI_STRING_VEC: { - if (arg.size > 0) - check_not_null(arg.arr, "arg.arr"); + check_arr(); auto *d = static_cast(arg.arr); std::vector sv; sv.reserve(arg.size); @@ -640,12 +645,12 @@ daliResult_t daliPipelineAddExternalInput( DALI_EPILOG(); } -daliResult_t daliPipelineAddOperator( - daliPipeline_h pipeline, - const daliOperatorDesc_t *op_desc) { - DALI_PROLOG(); - auto pipe = ToPointer(pipeline); +inline dali::OpSpec MakeOpSpec(const daliOperatorDesc_t *op_desc) { NOT_NULL(op_desc); + dali::c_api::CheckArg(op_desc->num_inputs >= 0, "`num_inputs` must not be negative."); + dali::c_api::CheckArg(op_desc->num_outputs >= 0, "`num_outputs` must not be negative."); + dali::c_api::CheckArg(op_desc->num_args >= 0, "`num_args` must not be negative."); + dali::c_api::CheckArg(op_desc->num_arg_inputs>= 0, "`num_arg_inputs` must not be negative."); NOT_NULL(op_desc->schema_name); if (op_desc->num_inputs > 0) NOT_NULL(op_desc->inputs); @@ -690,6 +695,15 @@ daliResult_t daliPipelineAddOperator( }); AddArgToSpec(spec, op_desc->args[i]); } + return spec; +} + +daliResult_t daliPipelineAddOperator( + daliPipeline_h pipeline, + const daliOperatorDesc_t *op_desc) { + DALI_PROLOG(); + auto pipe = ToPointer(pipeline); + auto spec = MakeOpSpec(op_desc); if (op_desc->instance_name && op_desc->instance_name[0] != '\0') pipe->Unwrap()->AddOperator(spec, op_desc->instance_name); else @@ -712,12 +726,19 @@ daliResult_t daliPipelineSetOutputs( descs.reserve(num_outputs); for (int i = 0; i < num_outputs; i++) { NOT_NULL(outputs[i].name); + dali::PipelineOutputDesc desc; desc.name = outputs[i].name; desc.device = static_cast(outputs[i].device); + if (outputs[i].dtype_present) desc.dtype = outputs[i].dtype; - if (outputs[i].ndim_present) desc.ndim = outputs[i].ndim; if (outputs[i].layout) desc.layout = outputs[i].layout; + + if (outputs[i].ndim_present) { + ValidateNDim(outputs[i].ndim); + desc.ndim = outputs[i].ndim; + } + descs.push_back(std::move(desc)); } pipe->Unwrap()->SetOutputDescs(std::move(descs)); From 7640b6daae0cc09931f59b8224873b619797ac02 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Fri, 13 Mar 2026 11:58:45 +0100 Subject: [PATCH 08/11] Extended validation. Signed-off-by: Michal Zientkiewicz --- dali/c_api_2/pipeline.cc | 47 ++++++++++++++++++--------------------- dali/c_api_2/validation.h | 12 ++++++++++ 2 files changed, 34 insertions(+), 25 deletions(-) diff --git a/dali/c_api_2/pipeline.cc b/dali/c_api_2/pipeline.cc index 74c6145c752..6f3ca5ad628 100644 --- a/dali/c_api_2/pipeline.cc +++ b/dali/c_api_2/pipeline.cc @@ -526,7 +526,7 @@ std::string_view BackendToString(daliBackend_t backend) { case DALI_BACKEND_MIXED: return "mixed"; default: throw std::invalid_argument(dali::make_string( - "Invalid backend value: ", static_cast(backend))); + "Invalid backend type: ", static_cast(backend))); } } @@ -628,23 +628,6 @@ void AddArgToSpec(dali::OpSpec &spec, const daliArgDesc_t &arg) { } } -} // namespace - -daliResult_t daliPipelineAddExternalInput( - daliPipeline_h pipeline, - const daliPipelineIODesc_t *input_desc) { - DALI_PROLOG(); - auto pipe = ToPointer(pipeline); - NOT_NULL(input_desc); - NOT_NULL(input_desc->name); - std::string device_str = input_desc->device == DALI_STORAGE_GPU ? "gpu" : "cpu"; - daliDataType_t dtype = input_desc->dtype_present ? input_desc->dtype : DALI_NO_TYPE; - int ndim = input_desc->ndim_present ? input_desc->ndim : -1; - const char *layout = input_desc->layout ? input_desc->layout : ""; - pipe->Unwrap()->AddExternalInput(input_desc->name, device_str, dtype, ndim, layout); - DALI_EPILOG(); -} - inline dali::OpSpec MakeOpSpec(const daliOperatorDesc_t *op_desc) { NOT_NULL(op_desc); dali::c_api::CheckArg(op_desc->num_inputs >= 0, "`num_inputs` must not be negative."); @@ -666,6 +649,7 @@ inline dali::OpSpec MakeOpSpec(const daliOperatorDesc_t *op_desc) { dali::c_api::CheckNotNull(op_desc->inputs[i].name, [i]() { return dali::make_string("`op_desc->inputs[", i, "].name`"); }); + Validate(op_desc->inputs[i].device_type); spec.AddInput(op_desc->inputs[i].name, static_cast(op_desc->inputs[i].device_type)); } @@ -673,6 +657,7 @@ inline dali::OpSpec MakeOpSpec(const daliOperatorDesc_t *op_desc) { dali::c_api::CheckNotNull(op_desc->outputs[i].name, [i]() { return dali::make_string("`op_desc->outputs[", i, "].name`"); }); + Validate(op_desc->outputs[i].device_type); spec.AddOutput(op_desc->outputs[i].name, static_cast(op_desc->outputs[i].device_type)); } @@ -698,6 +683,23 @@ inline dali::OpSpec MakeOpSpec(const daliOperatorDesc_t *op_desc) { return spec; } +} // namespace + +daliResult_t daliPipelineAddExternalInput( + daliPipeline_h pipeline, + const daliPipelineIODesc_t *input_desc) { + DALI_PROLOG(); + auto pipe = ToPointer(pipeline); + NOT_NULL(input_desc); + Validate(*input_desc); + std::string device_str = input_desc->device == DALI_STORAGE_GPU ? "gpu" : "cpu"; + daliDataType_t dtype = input_desc->dtype_present ? input_desc->dtype : DALI_NO_TYPE; + dali::TensorLayout layout(input_desc->layout ? input_desc->layout : ""); + int ndim = input_desc->ndim_present ? input_desc->ndim : -1; + pipe->Unwrap()->AddExternalInput(input_desc->name, device_str, dtype, ndim, layout); + DALI_EPILOG(); +} + daliResult_t daliPipelineAddOperator( daliPipeline_h pipeline, const daliOperatorDesc_t *op_desc) { @@ -725,19 +727,14 @@ daliResult_t daliPipelineSetOutputs( std::vector descs; descs.reserve(num_outputs); for (int i = 0; i < num_outputs; i++) { - NOT_NULL(outputs[i].name); - + Validate(outputs[i]); dali::PipelineOutputDesc desc; desc.name = outputs[i].name; desc.device = static_cast(outputs[i].device); if (outputs[i].dtype_present) desc.dtype = outputs[i].dtype; if (outputs[i].layout) desc.layout = outputs[i].layout; - - if (outputs[i].ndim_present) { - ValidateNDim(outputs[i].ndim); - desc.ndim = outputs[i].ndim; - } + if (outputs[i].ndim_present) desc.ndim = outputs[i].ndim; descs.push_back(std::move(desc)); } diff --git a/dali/c_api_2/validation.h b/dali/c_api_2/validation.h index dd1a2711f65..d49e35936f2 100644 --- a/dali/c_api_2/validation.h +++ b/dali/c_api_2/validation.h @@ -101,6 +101,18 @@ inline void Validate(daliStorageDevice_t device_type) { throw std::invalid_argument(make_string("Invalid storage device type: ", device_type)); } +inline void Validate(const daliPipelineIODesc_t &desc) { + if (desc.name == nullptr) + throw std::invalid_argument("input/output name must not be NULL"); + Validate(desc.device); + Validate(desc.dtype); + if (desc.ndim_present) { + ValidateNDim(desc.ndim); + if (desc.layout) + Validate(desc.layout, desc.ndim); + } +} + DLL_PUBLIC void ValidateDeviceId(int device_id, bool allow_cpu_only); inline void Validate(const daliBufferPlacement_t &placement) { From caaccf89bb90150a8ab846890932179a25f7dc42 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Fri, 13 Mar 2026 15:57:02 +0100 Subject: [PATCH 09/11] Don't validate dtype if dtype_present is not set. Signed-off-by: Michal Zientkiewicz --- dali/c_api_2/validation.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dali/c_api_2/validation.h b/dali/c_api_2/validation.h index d49e35936f2..b026bee53cc 100644 --- a/dali/c_api_2/validation.h +++ b/dali/c_api_2/validation.h @@ -105,7 +105,8 @@ inline void Validate(const daliPipelineIODesc_t &desc) { if (desc.name == nullptr) throw std::invalid_argument("input/output name must not be NULL"); Validate(desc.device); - Validate(desc.dtype); + if (desc.dtype_present) + Validate(desc.dtype); if (desc.ndim_present) { ValidateNDim(desc.ndim); if (desc.layout) From 4638be64ccb130cd8b7bf81b4d8b8a9f8e3f47ab Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Fri, 13 Mar 2026 16:11:39 +0100 Subject: [PATCH 10/11] Reset outputs when num_outputs is 0. Signed-off-by: Michal Zientkiewicz --- dali/c_api_2/pipeline.cc | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/dali/c_api_2/pipeline.cc b/dali/c_api_2/pipeline.cc index 6f3ca5ad628..45ecd517bbc 100644 --- a/dali/c_api_2/pipeline.cc +++ b/dali/c_api_2/pipeline.cc @@ -720,23 +720,23 @@ daliResult_t daliPipelineSetOutputs( DALI_PROLOG(); auto pipe = ToPointer(pipeline); dali::c_api::CheckArg(num_outputs >= 0, "`num_outputs` must not be negative"); - if (num_outputs == 0) - return DALI_SUCCESS; // nothing to do - NOT_NULL(outputs); std::vector descs; - descs.reserve(num_outputs); - for (int i = 0; i < num_outputs; i++) { - Validate(outputs[i]); - dali::PipelineOutputDesc desc; - desc.name = outputs[i].name; - desc.device = static_cast(outputs[i].device); - - if (outputs[i].dtype_present) desc.dtype = outputs[i].dtype; - if (outputs[i].layout) desc.layout = outputs[i].layout; - if (outputs[i].ndim_present) desc.ndim = outputs[i].ndim; - - descs.push_back(std::move(desc)); + if (num_outputs > 0) { + NOT_NULL(outputs); + descs.reserve(num_outputs); + for (int i = 0; i < num_outputs; i++) { + Validate(outputs[i]); + dali::PipelineOutputDesc desc; + desc.name = outputs[i].name; + desc.device = static_cast(outputs[i].device); + + if (outputs[i].dtype_present) desc.dtype = outputs[i].dtype; + if (outputs[i].layout) desc.layout = outputs[i].layout; + if (outputs[i].ndim_present) desc.ndim = outputs[i].ndim; + + descs.push_back(std::move(desc)); + } } pipe->Unwrap()->SetOutputDescs(std::move(descs)); DALI_EPILOG(); From 681bc5617632d8b8a299490dc001c21deed9f0cf Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Fri, 13 Mar 2026 16:12:34 +0100 Subject: [PATCH 11/11] Improve comments. Signed-off-by: Michal Zientkiewicz --- include/dali/dali.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/dali/dali.h b/include/dali/dali.h index afb78e844ab..811e2a39cd9 100644 --- a/include/dali/dali.h +++ b/include/dali/dali.h @@ -400,7 +400,7 @@ typedef struct _DALIArgDesc { int64_t ivalue; /**< signed integer scalar types and bool */ uint64_t uvalue; /**< unsigned integer scalar types */ float fvalue; /**< DALI_FLOAT */ - double dvalue; /**< DALI_FLOAT64 */ + double dvalue; /**< DALI_FLOAT64, reserved for future use */ const char *str; /**< DALI_STRING — NULL-terminated C string */ struct { int64_t size; /**< number of elements */