99 */
1010#include " datalake/tests/record_generator.h"
1111
12+ #include " pandaproxy/schema_registry/protobuf.h"
13+ #include " pandaproxy/schema_registry/types.h"
1214#include " schema/registry.h"
13- #include " serde/avro/tests/data_generator.h"
1415#include " storage/record_batch_builder.h"
16+ #include " utils/vint.h"
1517
18+ #include < seastar/core/temporary_buffer.hh>
1619#include < seastar/coroutine/as_future.hh>
20+ #include < seastar/util/variant_utils.hh>
1721
1822#include < avro/Encoder.hh>
1923#include < avro/Generic.hh>
2024#include < avro/Specific.hh>
2125#include < avro/Stream.hh>
26+ #include < google/protobuf/descriptor.h>
27+ #include < google/protobuf/descriptor_database.h>
28+ #include < google/protobuf/dynamic_message.h>
29+ #include < google/protobuf/text_format.h>
2230
2331namespace datalake ::tests {
2432
@@ -41,11 +49,111 @@ record_generator::register_avro_schema(
4149 co_return std::nullopt ;
4250}
4351
52+ ss::future<checked<std::nullopt_t , record_generator::error>>
53+ record_generator::register_protobuf_schema (
54+ std::string_view name, std::string_view schema) {
55+ using namespace pandaproxy ::schema_registry;
56+ auto id = co_await ss::coroutine::as_future (
57+ _sr->create_schema (unparsed_schema{
58+ subject{" foo" },
59+ unparsed_schema_definition{schema, schema_type::protobuf}}));
60+ if (id.failed ()) {
61+ co_return error{fmt::format (
62+ " Error creating schema {}: {}" , name, id.get_exception ())};
63+ }
64+ auto [_, added] = _id_by_name.emplace (name, id.get ());
65+ if (!added) {
66+ co_return error{fmt::format (" Failed to add schema {} to map" , name)};
67+ }
68+ co_return std::nullopt ;
69+ }
70+
71+ iobuf encode_protobuf_message_index (const std::vector<int32_t >& message_index) {
72+ iobuf ret;
73+ if (message_index.size () == 1 && message_index[0 ] == 0 ) {
74+ ret.append (" \0 " , 1 );
75+ return ret;
76+ }
77+
78+ std::array<uint8_t , vint::max_length> bytes{0 };
79+ size_t res_size = vint::serialize (message_index.size (), &bytes[0 ]);
80+ ret.append (&bytes[0 ], res_size);
81+
82+ for (const auto & o : message_index) {
83+ size_t res_size = vint::serialize (o, &bytes[0 ]);
84+ ret.append (&bytes[0 ], res_size);
85+ }
86+
87+ return ret;
88+ }
89+
90+ ss::future<checked<std::nullopt_t , record_generator::error>>
91+ record_generator::add_random_protobuf_record (
92+ storage::record_batch_builder& b,
93+ std::string_view name,
94+ const std::vector<int32_t >& message_index,
95+ std::optional<iobuf> key,
96+ testing::protobuf_generator_config config) {
97+ using namespace pandaproxy ::schema_registry;
98+ auto it = _id_by_name.find (name);
99+ if (it == _id_by_name.end ()) {
100+ co_return error{fmt::format (" Schema {} is missing" , name)};
101+ }
102+ auto schema_id = it->second ;
103+ auto schema_def = co_await _sr->get_valid_schema (schema_id);
104+ if (!schema_def) {
105+ co_return error{
106+ fmt::format (" Unable to find schema def for id: {}" , schema_id)};
107+ }
108+ if (schema_def->type () != schema_type::protobuf) {
109+ co_return error{fmt::format (
110+ " Schema {} has wrong type: {}" , name, schema_def->type ())};
111+ }
112+
113+ auto protobuf_def = schema_def
114+ ->visit (ss::make_visitor (
115+ [](const avro_schema_definition&)
116+ -> std::optional<protobuf_schema_definition> {
117+ return std::nullopt ;
118+ },
119+ [](const protobuf_schema_definition& pb_def)
120+ -> std::optional<protobuf_schema_definition> {
121+ return {pb_def};
122+ },
123+ [](const json_schema_definition&)
124+ -> std::optional<protobuf_schema_definition> {
125+ return std::nullopt ;
126+ }))
127+ .value ();
128+ auto md_res = pandaproxy::schema_registry::descriptor (
129+ protobuf_def, message_index);
130+ if (md_res.has_error ()) {
131+ co_return error{fmt::format (
132+ " Wasn't able to get descriptor for protobuf def with id: {}" ,
133+ schema_id)};
134+ }
135+
136+ iobuf val;
137+ val.append (" \0 " , 1 );
138+ int32_t encoded_id = ss::cpu_to_be (schema_id ());
139+ val.append ((const uint8_t *)(&encoded_id), 4 );
140+
141+ testing::protobuf_generator pb_gen (config);
142+ auto msg = pb_gen.generate_protobuf_message (&md_res.value ().get ());
143+
144+ val.append (encode_protobuf_message_index (message_index));
145+ val.append (iobuf::from (msg->SerializeAsString ()));
146+
147+ b.add_raw_kv (std::move (key), std::move (val));
148+ co_return std::nullopt ;
149+ }
150+
44151ss::future<checked<std::nullopt_t , record_generator::error>>
45152record_generator::add_random_avro_record (
46153 storage::record_batch_builder& b,
47154 std::string_view name,
48- std::optional<iobuf> key) {
155+ std::optional<iobuf> key,
156+ testing::avro_generator_config config) {
49157 using namespace pandaproxy ::schema_registry;
50158 auto it = _id_by_name.find (name);
51159 if (it == _id_by_name.end ()) {
@@ -83,8 +191,8 @@ record_generator::add_random_avro_record(
83191 co_return error{
84192 fmt::format (" Schema {} didn't resolve Avro node" , name)};
85193 }
86- testing::generator_state gs ;
87- auto datum = generate_datum (node_ptr, gs, 10 );
194+ testing::avro_generator gen (config) ;
195+ auto datum = gen. generate_datum (node_ptr);
88196 std::unique_ptr<avro::OutputStream> out = avro::memoryOutputStream ();
89197 avro::EncoderPtr e = avro::binaryEncoder ();
90198 e->init (*out);
0 commit comments