diff --git a/dali/c_api_2/op_test/complex_pipeline_test.cc b/dali/c_api_2/op_test/complex_pipeline_test.cc index 227e711419e..c07d927ee0a 100644 --- a/dali/c_api_2/op_test/complex_pipeline_test.cc +++ b/dali/c_api_2/op_test/complex_pipeline_test.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ // limitations under the License. #include +#include "dali/dali.h" #include "dali/c_api_2/pipeline_test_utils.h" #include "dali/core/common.h" #include "dali/test/dali_test_config.h" @@ -30,6 +31,10 @@ ReaderDecoderPipe( PipelineParams params = {}) { std::string file_root = testing::dali_extra_path() + "/db/single/jpeg/"; std::string file_list = file_root + "image_list.txt"; + auto resize_device = decoder_device == "mixed" ? "gpu" : "cpu"; + auto out_dev_str = to_string(output_device); + auto decoder_out_dev = decoder_device == "mixed" + ? StorageDevice::GPU : StorageDevice::CPU; if (!params.max_batch_size) params.max_batch_size = 4; if (!params.num_threads) params.num_threads = 1; if (!params.seed) params.seed = 12345; @@ -46,10 +51,145 @@ ReaderDecoderPipe( .AddArg("device", decoder_device) .AddArg("output_type", DALI_RGB) .AddInput("compressed_images", StorageDevice::CPU) - .AddOutput("decoded", decoder_device == "cpu" ? StorageDevice::CPU : StorageDevice::GPU)); + .AddOutput("decoded", decoder_out_dev)); + + pipe->AddOperator(OpSpec("Resize") + .AddArg("device", resize_device) + .AddArg("output_type", DALI_RGB) + .AddArg("size", std::vector{ 224, 224 }) + .AddInput("decoded", decoder_out_dev) + .AddOutput("resized", decoder_out_dev)); + + pipe->SetOutputDescs({{ "resized", out_dev_str }, { "labels", out_dev_str }}); + return pipe; +} + +PipelineHandle +ReaderDecoderCApiPipe( + std::string_view decoder_device, + StorageDevice output_device, + daliPipelineParams_t params = {}) { + std::string file_root = testing::dali_extra_path() + "/db/single/jpeg/"; + std::string file_list = file_root + "image_list.txt"; + + if (!params.max_batch_size_present) { + params.max_batch_size_present = true; + params.max_batch_size = 4; + } + if (!params.num_threads_present) { + params.num_threads_present = true; + params.num_threads = 1; + } + if (!params.seed_present) { + params.seed_present = true; + params.seed = 12345; + } + if (!params.exec_type_present) { + params.exec_type_present = true; + params.exec_type = DALI_EXEC_DYNAMIC; + } + + daliPipeline_h h = nullptr; + CHECK_DALI(daliPipelineCreate(&h, ¶ms)); + PipelineHandle pipe(h); + + // FileReader: no inputs, outputs "compressed_images" and "labels" on CPU + daliArgDesc_t reader_args[2]; + reader_args[0].name = "file_root"; + reader_args[0].dtype = DALI_STRING; + reader_args[0].str = file_root.c_str(); + reader_args[1].name = "file_list"; + reader_args[1].dtype = DALI_STRING; + reader_args[1].str = file_list.c_str(); + + daliIODesc_t reader_out[2]; + reader_out[0].name = "compressed_images"; + reader_out[0].device_type = DALI_STORAGE_CPU; + reader_out[1].name = "labels"; + reader_out[1].device_type = DALI_STORAGE_CPU; + + daliOperatorDesc_t reader_op{}; + reader_op.schema_name = "FileReader"; + reader_op.backend = DALI_BACKEND_CPU; + reader_op.num_inputs = 0; + reader_op.num_outputs = std::size(reader_out); + reader_op.num_args = std::size(reader_args); + reader_op.outputs = reader_out; + reader_op.args = reader_args; + CHECK_DALI(daliPipelineAddOperator(h, &reader_op)); + + // ImageDecoder: input "compressed_images" (CPU), output "decoded" (CPU or GPU) + bool is_mixed = (decoder_device == "mixed"); + daliBackend_t decoder_backend = is_mixed ? DALI_BACKEND_MIXED : DALI_BACKEND_CPU; + daliStorageDevice_t decoded_dev = is_mixed ? DALI_STORAGE_GPU : DALI_STORAGE_CPU; + + daliArgDesc_t decoder_args[1]; + decoder_args[0].name = "output_type"; + decoder_args[0].dtype = DALI_INT32; + decoder_args[0].ivalue = DALI_RGB; + + daliIODesc_t decoder_in[1]; + decoder_in[0] = reader_out[0]; + + daliIODesc_t decoder_out[1]; + decoder_out[0].name = "decoded"; + decoder_out[0].device_type = decoded_dev; + + daliOperatorDesc_t decoder_op{}; + decoder_op.schema_name = "ImageDecoder"; + decoder_op.backend = decoder_backend; + decoder_op.num_inputs = std::size(decoder_in); + decoder_op.num_outputs = std::size(decoder_out); + decoder_op.num_args = std::size(decoder_args); + decoder_op.inputs = decoder_in; + decoder_op.outputs = decoder_out; + decoder_op.args = decoder_args; + CHECK_DALI(daliPipelineAddOperator(h, &decoder_op)); + + + // Resize: input "decoded" (CPU or GPU), output "resized" (same as input) + // If decoder is mixed, then resize is gpu + daliBackend_t resize_backend = is_mixed ? DALI_BACKEND_GPU : DALI_BACKEND_CPU; + + daliArgDesc_t resize_args[2]; + resize_args[0].name = "output_type"; + resize_args[0].dtype = DALI_INT32; + resize_args[0].ivalue = DALI_RGB; + resize_args[1].name = "size"; + resize_args[1].dtype = DALI_FLOAT_VEC; + float size[] = { 224, 224 }; + resize_args[1].arr = size; + resize_args[1].size = 2; + + daliIODesc_t resize_in[1]; + resize_in[0] = decoder_out[0]; + + daliIODesc_t resize_out[1]; + resize_out[0].name = "resized"; + resize_out[0].device_type = decoded_dev; + + daliOperatorDesc_t resize_op{}; + resize_op.schema_name = "Resize"; + resize_op.backend = resize_backend; + resize_op.num_inputs = std::size(resize_in); + resize_op.num_outputs = std::size(resize_out); + resize_op.num_args = std::size(resize_args); + resize_op.inputs = resize_in; + resize_op.outputs = resize_out; + resize_op.args = resize_args; + CHECK_DALI(daliPipelineAddOperator(h, &resize_op)); + + daliStorageDevice_t out_dev = + output_device == StorageDevice::GPU ? DALI_STORAGE_GPU : DALI_STORAGE_CPU; + daliPipelineIODesc_t out_descs[2]; + out_descs[0] = {}; + out_descs[0].name = "resized"; + out_descs[0].device = out_dev; + out_descs[1] = {}; + out_descs[1].name = "labels"; + out_descs[1].device = out_dev; + CHECK_DALI(daliPipelineSetOutputs(h, 2, out_descs)); - auto out_dev_str = to_string(output_device); - pipe->SetOutputDescs({{ "decoded", out_dev_str }, { "labels", out_dev_str }}); return pipe; } @@ -66,48 +206,60 @@ void TestReaderDecoder(std::string_view decoder_device, StorageDevice output_dev ComparePipelineOutputs(*ref_pipe, pipe); } -TEST(CAPI2_PipelineTest, ReaderDecoderCPU) { +TEST(CAPI2_SerializedPipelineTest, ReaderDecoderCPU) { TestReaderDecoder("cpu", StorageDevice::CPU); } -TEST(CAPI2_PipelineTest, ReaderDecoderCPU2GPU) { +TEST(CAPI2_SerializedPipelineTest, ReaderDecoderCPU2GPU) { TestReaderDecoder("cpu", StorageDevice::GPU); } -TEST(CAPI2_PipelineTest, ReaderDecoderMixed) { +TEST(CAPI2_SerializedPipelineTest, ReaderDecoderMixed) { if (!MixedOperatorRegistry::Registry().IsRegistered("ImageDecoder")) { GTEST_SKIP() << "ImageDecoder for mixed backend is not available"; } TestReaderDecoder("mixed", StorageDevice::GPU); } -TEST(CAPI2_PipelineTest, ReaderDecoderMixed2CPU) { +TEST(CAPI2_SerializedPipelineTest, ReaderDecoderMixed2CPU) { if (!MixedOperatorRegistry::Registry().IsRegistered("ImageDecoder")) { GTEST_SKIP() << "ImageDecoder for mixed backend is not available"; } TestReaderDecoder("mixed", StorageDevice::CPU); } -TEST(CAPI2_PipelineTest, Checkpointing) { - // This test creates three pipelines - a C++ pipeline (ref) and two C pipelines (pipe1, pipe2), - // created by deserializing the serialized representation of the C++ pipeline. - // - // (pipe1) advances 3 iterations and then a checkpoint is taken and restored in (ref), - // after which 5 iterations of outputs are compared. - // Then a checkpoint is taken in (ref) and restored in (pipe2), after which another 5 iterations - // are compared. - PipelineParams params{}; - params.enable_checkpointing = true; - params.seed = 1234; - auto ref = ReaderDecoderPipe("cpu", StorageDevice::GPU, params); - ref->Build(); - auto pipe_str = ref->SerializeToProtobuf(); // serialize the ref... - auto pipe1 = Deserialize(pipe_str, {}); // ...and create pipe1 - auto pipe2 = Deserialize(pipe_str, {}); // ...and pipe2 from serialized ref - CHECK_DALI(daliPipelineBuild(pipe1)); - CHECK_DALI(daliPipelineBuild(pipe2)); +void TestReaderDecoderBuilder(std::string_view decoder_device, StorageDevice output_device) { + auto ref_pipe = ReaderDecoderPipe(decoder_device, output_device); + ref_pipe->Build(); + auto test_pipe = ReaderDecoderCApiPipe(decoder_device, output_device); + CHECK_DALI(daliPipelineBuild(test_pipe)); + ComparePipelineOutputs(*ref_pipe, test_pipe); +} + +TEST(CAPI2_PipelineBuilderTest, ReaderDecoderCPU) { + TestReaderDecoderBuilder("cpu", StorageDevice::CPU); +} + +TEST(CAPI2_PipelineBuilderTest, ReaderDecoderCPU2GPU) { + TestReaderDecoderBuilder("cpu", StorageDevice::GPU); +} + +TEST(CAPI2_PipelineBuilderTest, ReaderDecoderMixed) { + if (!MixedOperatorRegistry::Registry().IsRegistered("ImageDecoder")) { + GTEST_SKIP() << "ImageDecoder for mixed backend is not available"; + } + TestReaderDecoderBuilder("mixed", StorageDevice::GPU); +} + +TEST(CAPI2_PipelineBuilderTest, ReaderDecoderMixed2CPU) { + if (!MixedOperatorRegistry::Registry().IsRegistered("ImageDecoder")) { + GTEST_SKIP() << "ImageDecoder for mixed backend is not available"; + } + TestReaderDecoderBuilder("mixed", StorageDevice::CPU); +} - // Advance a few iterations... +void RunCheckpointingTest(Pipeline &ref, daliPipeline_h pipe1, daliPipeline_h pipe2) { + // Advance a few iterations in pipe1... CHECK_DALI(daliPipelinePrefetch(pipe1)); daliPipelineOutputs_h out1_h{}; CHECK_DALI(daliPipelinePopOutputs(pipe1, &out1_h)); @@ -129,7 +281,7 @@ TEST(CAPI2_PipelineTest, Checkpointing) { ext.pipeline_data.size = strlen(ext.pipeline_data.data); daliCheckpoint_h checkpoint_h{}; - // Take a checkpoint... + // Take a checkpoint from pipe1... CHECK_DALI(daliPipelineGetCheckpoint(pipe1, &checkpoint_h, &ext)); CheckpointHandle checkpoint(checkpoint_h); @@ -138,15 +290,15 @@ TEST(CAPI2_PipelineTest, Checkpointing) { CHECK_DALI(daliPipelineSerializeCheckpoint(pipe1, checkpoint, &data, &size)); ASSERT_NE(data, nullptr); - // ...restore... - ref->RestoreFromSerializedCheckpoint(std::string(data, size)); + // ...restore into ref... + ref.RestoreFromSerializedCheckpoint(std::string(data, size)); // ...run and compare. - ComparePipelineOutputs(*ref, pipe1, 5, false); + ComparePipelineOutputs(ref, pipe1, 5, false); - // Now take another checkpoint... - auto chk_str = ref->GetSerializedCheckpoint({ ext.pipeline_data.data, ext.iterator_data.data }); + // Now take another checkpoint from ref... + auto chk_str = ref.GetSerializedCheckpoint({ ext.pipeline_data.data, ext.iterator_data.data }); - // ...deserialize... + // ...deserialize into pipe2... CHECK_DALI(daliPipelineDeserializeCheckpoint( pipe2, &checkpoint_h, chk_str.data(), chk_str.length())); CheckpointHandle checkpoint2(checkpoint_h); @@ -157,10 +309,153 @@ TEST(CAPI2_PipelineTest, Checkpointing) { EXPECT_STREQ(ext2.iterator_data.data, "ITER"); EXPECT_EQ(ext2.pipeline_data.size, pipeline_data_size); EXPECT_STREQ(ext2.pipeline_data.data, pipeline_data); - // ...restore... + // ...restore and compare. CHECK_DALI(daliPipelineRestoreCheckpoint(pipe2, checkpoint2)); - // ...run and compare. - ComparePipelineOutputs(*ref, pipe2, 5, false); + ComparePipelineOutputs(ref, pipe2, 5, false); +} + +TEST(CAPI2_SerializedPipelineTest, Checkpointing) { + // This test creates three pipelines - a C++ pipeline (ref) and two C pipelines (pipe1, pipe2), + // created by deserializing the serialized representation of the C++ pipeline. + PipelineParams params{}; + params.enable_checkpointing = true; + params.seed = 1234; + auto ref = ReaderDecoderPipe("cpu", StorageDevice::GPU, params); + ref->Build(); + auto pipe_str = ref->SerializeToProtobuf(); // serialize the ref... + auto pipe1 = Deserialize(pipe_str, {}); // ...and create pipe1 + auto pipe2 = Deserialize(pipe_str, {}); // ...and pipe2 from serialized ref + CHECK_DALI(daliPipelineBuild(pipe1)); + CHECK_DALI(daliPipelineBuild(pipe2)); + RunCheckpointingTest(*ref, pipe1, pipe2); +} + +TEST(CAPI2_PipelineBuilderTest, Checkpointing) { + // This test creates three pipelines - a C++ pipeline (ref) and two C API builder pipelines + // (pipe1, pipe2) with identical parameters, then verifies checkpoint save/restore. + PipelineParams cpp_params{}; + cpp_params.enable_checkpointing = true; + cpp_params.seed = 1234; + auto ref = ReaderDecoderPipe("cpu", StorageDevice::GPU, cpp_params); + ref->Build(); + + daliPipelineParams_t c_params{}; + c_params.enable_checkpointing_present = true; + c_params.enable_checkpointing = true; + c_params.seed_present = true; + c_params.seed = 1234; + auto pipe1 = ReaderDecoderCApiPipe("cpu", StorageDevice::GPU, c_params); + auto pipe2 = ReaderDecoderCApiPipe("cpu", StorageDevice::GPU, c_params); + CHECK_DALI(daliPipelineBuild(pipe1)); + CHECK_DALI(daliPipelineBuild(pipe2)); + RunCheckpointingTest(*ref, pipe1, pipe2); +} + +TEST(CAPI2_PipelineBuilderTest, ResizeWithArgumentInput) { + constexpr int kBatchSize = 4; + + daliPipelineParams_t params{}; + params.max_batch_size_present = true; + params.max_batch_size = kBatchSize; + params.num_threads_present = true; + params.num_threads = 2; + + daliPipeline_h h = nullptr; + CHECK_DALI(daliPipelineCreate(&h, ¶ms)); + PipelineHandle pipe(h); + + // Add ExternalInput "images" (CPU) + daliPipelineIODesc_t images_input_desc{}; + images_input_desc.name = "images"; + images_input_desc.device = DALI_STORAGE_CPU; + CHECK_DALI(daliPipelineAddExternalInput(h, &images_input_desc)); + + // Add ExternalInput "sizes" (CPU) — fed as argument input for Resize's "size" argument + daliPipelineIODesc_t sizes_input_desc{}; + sizes_input_desc.name = "sizes"; + sizes_input_desc.device = DALI_STORAGE_CPU; + CHECK_DALI(daliPipelineAddExternalInput(h, &sizes_input_desc)); + + // Resize: regular input "images", argument input "sizes" -> "size" + daliIODesc_t resize_in[1]; + resize_in[0].name = "images"; + resize_in[0].device_type = DALI_STORAGE_CPU; + + daliIODesc_t resize_out[1]; + resize_out[0].name = "resized"; + resize_out[0].device_type = DALI_STORAGE_CPU; + + daliArgInputDesc_t arg_inputs[1]; + arg_inputs[0].arg_name = "size"; + arg_inputs[0].input_name = "sizes"; + + daliOperatorDesc_t resize_op{}; + resize_op.schema_name = "Resize"; + resize_op.backend = DALI_BACKEND_CPU; + resize_op.num_inputs = std::size(resize_in); + resize_op.num_outputs = std::size(resize_out); + resize_op.num_arg_inputs = std::size(arg_inputs); + resize_op.inputs = resize_in; + resize_op.outputs = resize_out; + resize_op.arg_inputs = arg_inputs; + CHECK_DALI(daliPipelineAddOperator(h, &resize_op)); + + daliPipelineIODesc_t out_desc{}; + out_desc.name = "resized"; + out_desc.device = DALI_STORAGE_CPU; + CHECK_DALI(daliPipelineSetOutputs(h, 1, &out_desc)); + + CHECK_DALI(daliPipelineBuild(h)); + + // Per-sample target sizes [H, W] — at least two distinct shapes + const float sample_sizes[kBatchSize][2] = { + {100.f, 200.f}, + { 50.f, 80.f}, + {120.f, 90.f}, + { 30.f, 40.f}, + }; + + int feed_count = 0; + CHECK_DALI(daliPipelineGetFeedCount(h, &feed_count, "images")); + + for (int feed = 0; feed < feed_count; feed++) { + // Input images: HWC tensors of shape [64, 64, 3] (content unused, only shape matters) + auto images_tl = std::make_shared>(); + images_tl->Resize(uniform_list_shape(kBatchSize, TensorShape<3>{64, 64, 3}), DALI_UINT8); + + // Per-sample sizes: 1D float tensors of shape [2], each holding [H, W] + auto sizes_tl = std::make_shared>(); + sizes_tl->Resize(uniform_list_shape(kBatchSize, TensorShape<1>{2}), DALI_FLOAT); + for (int i = 0; i < kBatchSize; i++) { + float *ptr = (*sizes_tl)[i].mutable_data(); + ptr[0] = sample_sizes[i][0]; + ptr[1] = sample_sizes[i][1]; + } + + auto images_handle = Wrap(images_tl); + auto sizes_handle = Wrap(sizes_tl); + CHECK_DALI(daliPipelineFeedInput(h, "images", images_handle.get(), nullptr, {}, nullptr)); + CHECK_DALI(daliPipelineFeedInput(h, "sizes", sizes_handle.get(), nullptr, {}, nullptr)); + } + + CHECK_DALI(daliPipelinePrefetch(h)); + + auto outs = PopOutputs(h); + auto out_tl = GetOutput(outs, 0); + + int num_samples = 0, ndim = 0; + const int64_t *shape = nullptr; + CHECK_DALI(daliTensorListGetShape(out_tl, &num_samples, &ndim, &shape)); + + ASSERT_EQ(num_samples, kBatchSize); + ASSERT_EQ(ndim, 3); // H, W, C + + for (int i = 0; i < kBatchSize; i++) { + const int64_t *s = shape + i * ndim; + EXPECT_EQ(s[0], static_cast(sample_sizes[i][0])) << "Sample " << i << " H mismatch"; + EXPECT_EQ(s[1], static_cast(sample_sizes[i][1])) << "Sample " << i << " W mismatch"; + EXPECT_EQ(s[2], 3) << "Sample " << i << " C mismatch"; + } } } // namespace dali::c_api::test diff --git a/dali/c_api_2/pipeline.cc b/dali/c_api_2/pipeline.cc index 3004394a5fa..45ecd517bbc 100644 --- a/dali/c_api_2/pipeline.cc +++ b/dali/c_api_2/pipeline.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include "dali/c_api_2/pipeline.h" #include "dali/c_api_2/pipeline_outputs.h" #include "dali/c_api_2/checkpoint.h" #include "dali/c_api_2/error_handling.h" #include "dali/pipeline/pipeline.h" +#include "dali/pipeline/operator/op_spec.h" +#include "dali/pipeline/pipeline_output_desc.h" #include "dali/c_api_2/utils.h" #include "dali/c_api_2/validation.h" @@ -513,3 +516,228 @@ daliResult_t daliCheckpointDestroy(daliCheckpoint_h checkpoint) { delete ToPointer(checkpoint); DALI_EPILOG(); } + +namespace { + +std::string_view BackendToString(daliBackend_t backend) { + switch (backend) { + case DALI_BACKEND_CPU: return "cpu"; + case DALI_BACKEND_GPU: return "gpu"; + case DALI_BACKEND_MIXED: return "mixed"; + default: + throw std::invalid_argument(dali::make_string( + "Invalid backend type: ", static_cast(backend))); + } +} + +void AddArgToSpec(dali::OpSpec &spec, const daliArgDesc_t &arg) { + assert(arg.name != nullptr); // checked in daliPipelineAddOperator + std::string_view name = arg.name; + + auto check_not_null = [&](auto *x, auto &&field) { + dali::c_api::CheckNotNull(x, [&]() { + return dali::make_string("`", field, "` of argument \"", name, "\""); + }); + }; + + + auto check_arr = [&]() { + if (arg.size < 0) + throw std::invalid_argument( + dali::make_string("`arg.size` of argument \"", name, "\" has a negative size")); + if (arg.size > 0) + check_not_null(arg.arr, "arg.arr"); + }; + + switch (arg.dtype) { + // --- scalar types --- + case DALI_INT8: + spec.AddArg(name, static_cast(arg.ivalue)); + break; + case DALI_INT16: + spec.AddArg(name, static_cast(arg.ivalue)); + break; + case DALI_INT32: + spec.AddArg(name, static_cast(arg.ivalue)); + break; + case DALI_INT64: + spec.AddArg(name, arg.ivalue); + break; + case DALI_UINT8: + spec.AddArg(name, static_cast(arg.uvalue)); + break; + case DALI_UINT16: + spec.AddArg(name, static_cast(arg.uvalue)); + break; + case DALI_UINT32: + spec.AddArg(name, static_cast(arg.uvalue)); + break; + case DALI_UINT64: + spec.AddArg(name, arg.uvalue); + break; + case DALI_FLOAT: + spec.AddArg(name, arg.fvalue); + break; + // NOT SUPPORTED / NOT IMPLEMENTED! + /*case DALI_FLOAT64: + spec.AddArg(name, arg.dvalue); + break;*/ + case DALI_BOOL: + spec.AddArg(name, static_cast(arg.ivalue)); + break; + case DALI_STRING: + check_not_null(arg.str, "arg.str"); + spec.AddArg(name, std::string(arg.str)); + break; + // --- vector (list) types --- + case DALI_INT_VEC: { + check_arr(); + auto *d = static_cast(arg.arr); + spec.AddArg(name, std::vector(d, d + arg.size)); + break; + } + case DALI_FLOAT_VEC: { + check_arr(); + auto *d = static_cast(arg.arr); + spec.AddArg(name, std::vector(d, d + arg.size)); + break; + } + case DALI_BOOL_VEC: { + check_arr(); + auto *d = static_cast(arg.arr); + spec.AddArg(name, std::vector(d, d + arg.size)); + break; + } + case DALI_STRING_VEC: { + check_arr(); + auto *d = static_cast(arg.arr); + std::vector sv; + sv.reserve(arg.size); + for (int64_t i = 0; i < arg.size; i++) { + if (!d[i]) // the outer `if` prevents make_string in case of no error + check_not_null(d[i], dali::make_string("arg.arr[", i, "]")); + sv.emplace_back(d[i]); + } + spec.AddArg(name, std::move(sv)); + break; + } + default: + throw std::invalid_argument(dali::make_string( + "Unsupported argument `dtype`: ", static_cast(arg.dtype), + " in argument \"", arg.name, "\".")); + } +} + +inline dali::OpSpec MakeOpSpec(const daliOperatorDesc_t *op_desc) { + NOT_NULL(op_desc); + dali::c_api::CheckArg(op_desc->num_inputs >= 0, "`num_inputs` must not be negative."); + dali::c_api::CheckArg(op_desc->num_outputs >= 0, "`num_outputs` must not be negative."); + dali::c_api::CheckArg(op_desc->num_args >= 0, "`num_args` must not be negative."); + dali::c_api::CheckArg(op_desc->num_arg_inputs>= 0, "`num_arg_inputs` must not be negative."); + NOT_NULL(op_desc->schema_name); + if (op_desc->num_inputs > 0) + NOT_NULL(op_desc->inputs); + if (op_desc->num_outputs > 0) + NOT_NULL(op_desc->outputs); + if (op_desc->num_arg_inputs > 0) + NOT_NULL(op_desc->arg_inputs); + if (op_desc->num_args > 0) + NOT_NULL(op_desc->args); + dali::OpSpec spec(op_desc->schema_name); + spec.AddArg("device", std::string(BackendToString(op_desc->backend))); + for (int i = 0; i < op_desc->num_inputs; i++) { + dali::c_api::CheckNotNull(op_desc->inputs[i].name, [i]() { + return dali::make_string("`op_desc->inputs[", i, "].name`"); + }); + Validate(op_desc->inputs[i].device_type); + spec.AddInput(op_desc->inputs[i].name, + static_cast(op_desc->inputs[i].device_type)); + } + for (int i = 0; i < op_desc->num_outputs; i++) { + dali::c_api::CheckNotNull(op_desc->outputs[i].name, [i]() { + return dali::make_string("`op_desc->outputs[", i, "].name`"); + }); + Validate(op_desc->outputs[i].device_type); + spec.AddOutput(op_desc->outputs[i].name, + static_cast(op_desc->outputs[i].device_type)); + } + // Argument inputs need to be added after regular inputs + for (int i = 0; i < op_desc->num_arg_inputs; i++) { + dali::c_api::CheckNotNull(op_desc->arg_inputs[i].arg_name, [i]() { + return dali::make_string( + "`arg_input[", i, "].arg_name`"); + }); + dali::c_api::CheckNotNull(op_desc->arg_inputs[i].input_name, [i]() { + return dali::make_string( + "`arg_input[", i, "].input_name`"); + }); + spec.AddArgumentInput(op_desc->arg_inputs[i].arg_name, + op_desc->arg_inputs[i].input_name); + } + for (int i = 0; i < op_desc->num_args; i++) { + dali::c_api::CheckNotNull(op_desc->args[i].name, [i]() { + return dali::make_string("`op_desc->args[", i, "].name`"); + }); + AddArgToSpec(spec, op_desc->args[i]); + } + return spec; +} + +} // namespace + +daliResult_t daliPipelineAddExternalInput( + daliPipeline_h pipeline, + const daliPipelineIODesc_t *input_desc) { + DALI_PROLOG(); + auto pipe = ToPointer(pipeline); + NOT_NULL(input_desc); + Validate(*input_desc); + std::string device_str = input_desc->device == DALI_STORAGE_GPU ? "gpu" : "cpu"; + daliDataType_t dtype = input_desc->dtype_present ? input_desc->dtype : DALI_NO_TYPE; + dali::TensorLayout layout(input_desc->layout ? input_desc->layout : ""); + int ndim = input_desc->ndim_present ? input_desc->ndim : -1; + pipe->Unwrap()->AddExternalInput(input_desc->name, device_str, dtype, ndim, layout); + DALI_EPILOG(); +} + +daliResult_t daliPipelineAddOperator( + daliPipeline_h pipeline, + const daliOperatorDesc_t *op_desc) { + DALI_PROLOG(); + auto pipe = ToPointer(pipeline); + auto spec = MakeOpSpec(op_desc); + if (op_desc->instance_name && op_desc->instance_name[0] != '\0') + pipe->Unwrap()->AddOperator(spec, op_desc->instance_name); + else + pipe->Unwrap()->AddOperator(spec); + DALI_EPILOG(); +} + +daliResult_t daliPipelineSetOutputs( + daliPipeline_h pipeline, + int num_outputs, + const daliPipelineIODesc_t *outputs) { + DALI_PROLOG(); + auto pipe = ToPointer(pipeline); + dali::c_api::CheckArg(num_outputs >= 0, "`num_outputs` must not be negative"); + + std::vector descs; + if (num_outputs > 0) { + NOT_NULL(outputs); + descs.reserve(num_outputs); + for (int i = 0; i < num_outputs; i++) { + Validate(outputs[i]); + dali::PipelineOutputDesc desc; + desc.name = outputs[i].name; + desc.device = static_cast(outputs[i].device); + + if (outputs[i].dtype_present) desc.dtype = outputs[i].dtype; + if (outputs[i].layout) desc.layout = outputs[i].layout; + if (outputs[i].ndim_present) desc.ndim = outputs[i].ndim; + + descs.push_back(std::move(desc)); + } + } + pipe->Unwrap()->SetOutputDescs(std::move(descs)); + DALI_EPILOG(); +} diff --git a/dali/c_api_2/pipeline_builder_test.cc b/dali/c_api_2/pipeline_builder_test.cc new file mode 100644 index 00000000000..4fb43d869c4 --- /dev/null +++ b/dali/c_api_2/pipeline_builder_test.cc @@ -0,0 +1,256 @@ +// Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include "dali/dali.h" +#include "dali/dali_cpp_wrappers.h" +#include "dali/c_api_2/pipeline.h" +#include "dali/c_api_2/pipeline_test_utils.h" +#include "dali/c_api_2/data_objects.h" +#include "dali/pipeline/executor/executor2/exec2_ops_for_test.h" +#include "dali/pipeline/pipeline.h" +#include "dali/pipeline/pipeline_params.h" +#include "dali/pipeline/data/tensor_list.h" + +namespace dali::c_api::test { + +namespace { + +constexpr int kBatchSize = 4; +constexpr int kNumThreads = 4; + +// ----- Helpers for building reference (C++) pipelines ----- + +// Counter + two CPU TestOps: ctr -> op1(+1000), ctr -> op2(+2000); outputs: op1, op2 +std::unique_ptr BuildRefCPUPipeline(std::optional device_id) { + auto p = std::make_unique(MakePipelineParams( + kBatchSize, kNumThreads, device_id.value_or(CPU_ONLY_DEVICE_ID))); + + p->AddOperator( + OpSpec(exec2::test::kCounterOpName) + .AddArg("ctr", std::string("ctr")) + .AddOutput("ctr", StorageDevice::CPU), + "ctr"); + p->AddOperator( + OpSpec(exec2::test::kTestOpName) + .AddArg("name", std::string("op1")) + .AddArg("device", std::string("cpu")) + .AddInput("ctr", StorageDevice::CPU) + .AddOutput("op1", StorageDevice::CPU) + .AddArg("addend", 1000), + "op1"); + p->AddOperator( + OpSpec(exec2::test::kTestOpName) + .AddArg("name", std::string("op2")) + .AddArg("device", std::string("cpu")) + .AddInput("ctr", StorageDevice::CPU) + .AddOutput("op2", StorageDevice::CPU) + .AddArg("addend", 2000), + "op2"); + p->SetOutputDescs({ {"op1", "cpu"}, {"op2", "cpu"} }); + p->Build(); + return p; +} + +// ExternalSource (cpu) -> output "ext" directly; used for feed-input comparison +std::unique_ptr BuildRefExtSrcPipeline(std::optional device_id) { + auto p = std::make_unique(MakePipelineParams( + kBatchSize, kNumThreads, device_id.value_or(CPU_ONLY_DEVICE_ID))); + + p->AddExternalInput("ext", "cpu"); + p->SetOutputDescs({ {"ext", "cpu"} }); + p->Build(); + return p; +} + +// ----- Helpers for building C API pipelines ----- + +daliPipelineParams_t MakeCApiParams(std::optional device_id) { + daliPipelineParams_t params{}; + params.max_batch_size_present = true; + params.max_batch_size = kBatchSize; + params.num_threads_present = true; + params.num_threads = kNumThreads; + params.device_id_present = device_id.has_value(); + params.device_id = device_id.value_or(-1); + return params; +} + +// Counter + two CPU TestOps built via C API, matching BuildRefCPUPipeline +PipelineHandle BuildCApiCPUPipeline(std::optional device_id) { + auto params = MakeCApiParams(device_id); + daliPipeline_h h = nullptr; + CHECK_DALI(daliPipelineCreate(&h, ¶ms)); + PipelineHandle pipe(h); + + // CounterOp "ctr": schema arg "ctr"="ctr", output "ctr" (CPU) + daliArgDesc_t ctr_args[1]; + ctr_args[0].name = "ctr"; + ctr_args[0].dtype = DALI_STRING; + ctr_args[0].str = "ctr"; + + daliIODesc_t ctr_out[1]; + ctr_out[0].name = "ctr"; + ctr_out[0].device_type = DALI_STORAGE_CPU; + + daliOperatorDesc_t ctr_op{}; + ctr_op.schema_name = exec2::test::kCounterOpName; + ctr_op.instance_name = "ctr"; + ctr_op.backend = DALI_BACKEND_CPU; + ctr_op.num_outputs = 1; + ctr_op.num_args = 1; + ctr_op.outputs = ctr_out; + ctr_op.args = ctr_args; + CHECK_DALI(daliPipelineAddOperator(h, &ctr_op)); + + // TestOp "op1": input "ctr" (CPU), output "op1" (CPU), addend=1000 + daliArgDesc_t op1_args[2]; + op1_args[0].name = "name"; + op1_args[0].dtype = DALI_STRING; + op1_args[0].str = "op1"; + op1_args[1].name = "addend"; + op1_args[1].dtype = DALI_INT32; + op1_args[1].ivalue = 1000; + + daliIODesc_t op1_in[1]; + op1_in[0].name = "ctr"; + op1_in[0].device_type = DALI_STORAGE_CPU; + + daliIODesc_t op1_out[1]; + op1_out[0].name = "op1"; + op1_out[0].device_type = DALI_STORAGE_CPU; + + daliOperatorDesc_t op1_op{}; + op1_op.schema_name = exec2::test::kTestOpName; + op1_op.instance_name = "op1"; + op1_op.backend = DALI_BACKEND_CPU; + op1_op.num_inputs = 1; + op1_op.num_outputs = 1; + op1_op.num_args = 2; + op1_op.inputs = op1_in; + op1_op.outputs = op1_out; + op1_op.args = op1_args; + CHECK_DALI(daliPipelineAddOperator(h, &op1_op)); + + // TestOp "op2": input "ctr" (CPU), output "op2" (CPU), addend=2000 + daliArgDesc_t op2_args[2]; + op2_args[0].name = "name"; + op2_args[0].dtype = DALI_STRING; + op2_args[0].str = "op2"; + op2_args[1].name = "addend"; + op2_args[1].dtype = DALI_INT32; + op2_args[1].ivalue = 2000; + + daliIODesc_t op2_in[1]; + op2_in[0].name = "ctr"; + op2_in[0].device_type = DALI_STORAGE_CPU; + + daliIODesc_t op2_out[1]; + op2_out[0].name = "op2"; + op2_out[0].device_type = DALI_STORAGE_CPU; + + daliOperatorDesc_t op2_op{}; + op2_op.schema_name = exec2::test::kTestOpName; + op2_op.instance_name = "op2"; + op2_op.backend = DALI_BACKEND_CPU; + op2_op.num_inputs = 1; + op2_op.num_outputs = 1; + op2_op.num_args = 2; + op2_op.inputs = op2_in; + op2_op.outputs = op2_out; + op2_op.args = op2_args; + CHECK_DALI(daliPipelineAddOperator(h, &op2_op)); + + // Set outputs: op1 (cpu), op2 (cpu) + daliPipelineIODesc_t out_descs[2]; + out_descs[0] = {}; + out_descs[0].name = "op1"; + out_descs[0].device = DALI_STORAGE_CPU; + out_descs[1] = {}; + out_descs[1].name = "op2"; + out_descs[1].device = DALI_STORAGE_CPU; + CHECK_DALI(daliPipelineSetOutputs(h, 2, out_descs)); + + CHECK_DALI(daliPipelineBuild(h)); + return pipe; +} + +// ExternalSource "ext" (CPU) built via C API, matching BuildRefExtSrcPipeline +PipelineHandle BuildCApiExtSrcPipeline(std::optional device_id) { + auto params = MakeCApiParams(device_id); + daliPipeline_h h = nullptr; + CHECK_DALI(daliPipelineCreate(&h, ¶ms)); + PipelineHandle pipe(h); + + daliPipelineIODesc_t ext_desc{}; + ext_desc.name = "ext"; + ext_desc.device = DALI_STORAGE_CPU; + CHECK_DALI(daliPipelineAddExternalInput(h, &ext_desc)); + + daliPipelineIODesc_t out_desc{}; + out_desc.name = "ext"; + out_desc.device = DALI_STORAGE_CPU; + CHECK_DALI(daliPipelineSetOutputs(h, 1, &out_desc)); + + CHECK_DALI(daliPipelineBuild(h)); + return pipe; +} + +} // namespace + +// --- Test 1: Operator pipeline (counter + two CPU TestOps) --- +// Build both C++ and C API pipelines, run 5 iterations, compare outputs. + +TEST(CAPI2_PipelineBuilderTest, AddOperator_CPUOnly) { + auto ref = BuildRefCPUPipeline(std::nullopt); + auto test = BuildCApiCPUPipeline(std::nullopt); + ComparePipelineOutputs(*ref, test, /*iters=*/5, /*prefetch_on_first_iter=*/true); +} + +// --- Test 2: ExternalSource pipeline --- +// Build both C++ (AddExternalInput) and C API (daliPipelineAddExternalInput) pipelines. +// Feed identical data to both, run, compare outputs. + +TEST(CAPI2_PipelineBuilderTest, AddExternalInput) { + auto ref = BuildRefExtSrcPipeline(0); + auto test = BuildCApiExtSrcPipeline(0); + + // Determine how many times we need to feed before prefetching + int feed_count = ref->InputFeedCount("ext"); + + // Create and feed identical scalar TensorLists + for (int i = 0; i < feed_count; i++) { + auto cpp_tl = std::make_shared>(); + cpp_tl->Resize(uniform_list_shape(kBatchSize, TensorShape<>{}), DALI_INT32); + for (int s = 0; s < kBatchSize; s++) + (*cpp_tl)[s].mutable_data()[0] = i * kBatchSize + s; + + // Feed to reference C++ pipeline + ref->SetExternalInput("ext", *cpp_tl); + + // Feed to test C API pipeline + auto tl_handle = Wrap(cpp_tl); + CHECK_DALI(daliPipelineFeedInput(test, "ext", tl_handle.get(), nullptr, {}, nullptr)); + } + + // Prefetch both, then compare outputs for each prefetched batch + ref->Prefetch(); + CHECK_DALI(daliPipelinePrefetch(test)); + + for (int i = 0; i < feed_count; i++) + ComparePipelineOutput(*ref, test); +} + +} // namespace dali::c_api::test diff --git a/dali/c_api_2/pipeline_test_utils.h b/dali/c_api_2/pipeline_test_utils.h index 6e28dc490da..ad09af94698 100644 --- a/dali/c_api_2/pipeline_test_utils.h +++ b/dali/c_api_2/pipeline_test_utils.h @@ -1,4 +1,4 @@ -// Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -54,7 +54,7 @@ auto &Unwrap(daliTensorList_h h) { return static_cast(h)->Unwrap(); } -void CompareTensorLists(const TensorList &a, const TensorList &b) { +inline void CompareTensorLists(const TensorList &a, const TensorList &b) { ASSERT_EQ(a.type(), b.type()); ASSERT_EQ(a.sample_dim(), b.sample_dim()); ASSERT_EQ(a.num_samples(), b.num_samples()); @@ -65,7 +65,7 @@ void CompareTensorLists(const TensorList &a, const TensorList &a, const TensorList &b) { +inline void CompareTensorLists(const TensorList &a, const TensorList &b) { TensorList a_cpu, b_cpu; a_cpu.set_order(AccessOrder::host()); b_cpu.set_order(AccessOrder::host()); @@ -74,7 +74,7 @@ void CompareTensorLists(const TensorList &a, const TensorList #include #include #include +#include #include "dali/dali.h" #include "dali/core/format.h" #include "dali/core/span.h" @@ -99,6 +101,19 @@ inline void Validate(daliStorageDevice_t device_type) { throw std::invalid_argument(make_string("Invalid storage device type: ", device_type)); } +inline void Validate(const daliPipelineIODesc_t &desc) { + if (desc.name == nullptr) + throw std::invalid_argument("input/output name must not be NULL"); + Validate(desc.device); + if (desc.dtype_present) + Validate(desc.dtype); + if (desc.ndim_present) { + ValidateNDim(desc.ndim); + if (desc.layout) + Validate(desc.layout, desc.ndim); + } +} + DLL_PUBLIC void ValidateDeviceId(int device_id, bool allow_cpu_only); inline void Validate(const daliBufferPlacement_t &placement) { @@ -112,11 +127,25 @@ inline void CheckArg(bool assertion, const std::string &what) { throw std::invalid_argument(what); } +template CB> +inline void CheckArg(bool assertion, CB &&message_callback) { + if (!assertion) + throw std::invalid_argument(std::forward(message_callback)()); +} + template void CheckNotNull(T *x, std::string_view what) { CheckArg(x != nullptr, make_string(what, " must not be NULL.")); } +template CB> +void CheckNotNull(T *x, CB &&message_callback) { + if (x == nullptr) { + throw std::invalid_argument(make_string( + std::forward(message_callback)(), " must not be NULL.")); + } +} + #define CHECK_OUTPUT(output_param) \ ::dali::c_api::CheckNotNull(output_param, "The output parameter `" #output_param "`"); diff --git a/include/dali/dali.h b/include/dali/dali.h index dfc701d524a..811e2a39cd9 100644 --- a/include/dali/dali.h +++ b/include/dali/dali.h @@ -1,4 +1,4 @@ -// Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -352,6 +352,78 @@ typedef struct _DALIPipelineIODesc { const char *layout; } daliPipelineIODesc_t; +/** Specifies the processing backend of an operator */ +typedef enum _DALIBackend { + DALI_BACKEND_CPU = 0, + DALI_BACKEND_GPU = 1, + DALI_BACKEND_MIXED = 2, + DALI_BACKEND_FORCE_INT32 = 0x7fffffff +} daliBackend_t; + +/** Describes a single input or output of an operator */ +typedef struct _DALIIODesc { + const char *name; + daliStorageDevice_t device_type; +} daliIODesc_t; + +/** Describes a tensor argument input (argument fed as a TensorList at runtime) */ +typedef struct _DALIArgInputDesc { + const char *arg_name; + const char *input_name; +} daliArgInputDesc_t; + +/** Describes an operator argument value. + * + * The `dtype` field determines which union member to use: + * + * Scalar types: + * - DALI_INT8..DALI_INT64 -> ivalue + * - DALI_UINT8..DALI_UINT64 -> uvalue + * - DALI_FLOAT -> fvalue + * - DALI_BOOL -> ivalue (0 = false, non-zero = true) + * - DALI_STRING -> str (NULL-terminated) + * + * Vector (list) types — use the {size, arr} struct: + * - DALI_INT_VEC -> arr points to int[], size = element count + * - DALI_FLOAT_VEC -> arr points to float[], size = element count + * - DALI_BOOL_VEC -> arr points to bool[], size = element count + * - DALI_STRING_VEC -> arr points to const char*[], size = element count + * + * Other types (including DALI_FLOAT64) are not supported. + * + * For vector types, `size` must be >= 0 and `arr` must be non-NULL when `size` > 0. + */ +typedef struct _DALIArgDesc { + const char *name; + daliDataType_t dtype; + union { + int64_t ivalue; /**< signed integer scalar types and bool */ + uint64_t uvalue; /**< unsigned integer scalar types */ + float fvalue; /**< DALI_FLOAT */ + double dvalue; /**< DALI_FLOAT64, reserved for future use */ + const char *str; /**< DALI_STRING — NULL-terminated C string */ + struct { + int64_t size; /**< number of elements */ + const void *arr; /**< pointer to the element data for vector types */ + }; + }; +} daliArgDesc_t; + +/** Describes an operator to be added to the pipeline */ +typedef struct _DALIOperatorDesc { + const char *schema_name; + const char *instance_name; /**< may be NULL or empty */ + daliBackend_t backend; + int num_inputs; + int num_outputs; + int num_args; + int num_arg_inputs; + const daliIODesc_t *inputs; + const daliIODesc_t *outputs; + const daliArgDesc_t *args; + const daliArgInputDesc_t *arg_inputs; +} daliOperatorDesc_t; + /** Creates an empty pipeline. */ DALI_API daliResult_t daliPipelineCreate( daliPipeline_h *out_pipe_handle, @@ -380,6 +452,41 @@ DALI_API daliResult_t daliPipelineDeserialize( const daliPipelineParams_t *param_overrides); +/** Adds an external input to the pipeline. + * + * Equivalent to Pipeline::AddExternalInput. + * The input is described by a daliPipelineIODesc_t: + * - name: the name of the input (also the name of the output produced by ExternalSource) + * - device: DALI_STORAGE_CPU or DALI_STORAGE_GPU + * - dtype, ndim, layout: optional metadata (use the _present flags) + */ +DALI_API daliResult_t daliPipelineAddExternalInput( + daliPipeline_h pipeline, + const daliPipelineIODesc_t *input_desc); + +/** Adds an operator to the pipeline. + * + * Constructs an OpSpec from op_desc and calls Pipeline::AddOperator. + * The `instance_name` field of op_desc may be NULL or empty to use an auto-generated name. + */ +DALI_API daliResult_t daliPipelineAddOperator( + daliPipeline_h pipeline, + const daliOperatorDesc_t *op_desc); + +/** Sets the output descriptors of the pipeline. + * + * Must be called before daliPipelineBuild. Equivalent to Pipeline::SetOutputDescs. + * + * @param pipeline the pipeline + * @param num_outputs number of elements in `outputs` + * @param outputs array of output descriptors; the `name` and `device` fields are required; + * `dtype`, `ndim`, and `layout` are optional (use the _present flags) + */ +DALI_API daliResult_t daliPipelineSetOutputs( + daliPipeline_h pipeline, + int num_outputs, + const daliPipelineIODesc_t *outputs); + /** Prepares the pipeline for execution */ DALI_API daliResult_t daliPipelineBuild(daliPipeline_h pipeline);