1010#endif
1111
1212#include < nlohmann/json.hpp>
13+ #include < openssl/evp.h>
1314
15+ #include < algorithm>
1416#include < atomic>
1517#include < cctype>
1618#include < chrono>
@@ -34,6 +36,43 @@ using internal::IHttpClient;
3436using internal::ParseStatus;
3537using internal::QueryOptionsToJsonString;
3638
39+ namespace {
40+
41+ std::string Base64Encode (const std::string& input) {
42+ const auto encoded_len = 4 * ((input.size () + 2 ) / 3 );
43+ std::string output (encoded_len + 1 , ' \0 ' );
44+ EVP_EncodeBlock (reinterpret_cast <unsigned char *>(output.data ()), reinterpret_cast <const unsigned char *>(input.data ()),
45+ static_cast <int >(input.size ()));
46+ output.resize (encoded_len);
47+ return output;
48+ }
49+
50+ std::string Base64Decode (const std::string& input) {
51+ const auto max_decoded_len = 3 * input.size () / 4 ;
52+ std::string output (max_decoded_len, ' \0 ' );
53+ int decoded_len =
54+ EVP_DecodeBlock (reinterpret_cast <unsigned char *>(output.data ()),
55+ reinterpret_cast <const unsigned char *>(input.data ()), static_cast <int >(input.size ()));
56+ // EVP_DecodeBlock doesn't account for padding, trim trailing zeros
57+ if (input.size () >= 2 && input[input.size () - 1 ] == ' =' ) decoded_len--;
58+ if (input.size () >= 2 && input[input.size () - 2 ] == ' =' ) decoded_len--;
59+ output.resize (decoded_len);
60+ return output;
61+ }
62+
63+ bool IsJsonContentType (std::string_view content_type) {
64+ auto pos = content_type.find (' ;' );
65+ auto ct = content_type.substr (0 , pos);
66+ while (!ct.empty () && ct.back () == ' ' ) ct.remove_suffix (1 );
67+ while (!ct.empty () && ct.front () == ' ' ) ct.remove_prefix (1 );
68+ std::string lower (ct);
69+ std::transform (lower.begin (), lower.end (), lower.begin (), ::tolower);
70+ return lower == " application/json" || lower == " text/json" ||
71+ (lower.size () >= 5 && lower.substr (lower.size () - 5 ) == " +json" );
72+ }
73+
74+ } // namespace
75+
3776class Bucket : public IBucket {
3877 using BatchType = internal::BatchType;
3978
@@ -182,22 +221,31 @@ class Bucket : public IBucket {
182221 return ProcessBatchV2 (std::move (callback), BatchType::kUpdate );
183222 }
184223
185- Error WriteAttachments (std::string_view entry_name,
186- const AttachmentMap& attachments ) const noexcept override {
224+ Error WriteAttachments (std::string_view entry_name, const AttachmentMap& attachments,
225+ std::string_view content_type ) const noexcept override {
187226 if (attachments.empty ()) {
188227 return Error::kOk ;
189228 }
190229
230+ const auto ct = content_type.empty () ? " application/json" : std::string (content_type);
231+ const bool is_json = IsJsonContentType (ct);
232+
191233 Batch batch;
192234 const auto meta_entry = fmt::format (" {}/$meta" , entry_name);
193235 auto timestamp = std::chrono::time_point_cast<std::chrono::microseconds>(Time::clock::now ());
194236 for (const auto & [key, payload] : attachments) {
195- try {
196- [[maybe_unused]] auto parsed = nlohmann::json::parse (payload);
197- } catch (const std::exception& ex) {
198- return Error{.code = -1 , .message = ex.what ()};
237+ std::string data;
238+ if (is_json) {
239+ try {
240+ [[maybe_unused]] auto parsed = nlohmann::json::parse (payload);
241+ } catch (const std::exception& ex) {
242+ return Error{.code = -1 , .message = ex.what ()};
243+ }
244+ data = payload;
245+ } else {
246+ data = Base64Decode (payload);
199247 }
200- batch.AddRecord (meta_entry, timestamp, payload, " application/json " , {{" key" , key}});
248+ batch.AddRecord (meta_entry, timestamp, data, ct , {{" key" , key}});
201249 timestamp += std::chrono::microseconds (1 );
202250 }
203251
@@ -226,14 +274,17 @@ class Bucket : public IBucket {
226274 return false ;
227275 }
228276
229- try {
230- [[maybe_unused]] auto parsed = nlohmann::json::parse (payload);
231- } catch (const std::exception& ex) {
232- callback_err = Error{.code = -1 , .message = ex.what ()};
233- return false ;
277+ if (IsJsonContentType (record.content_type )) {
278+ try {
279+ [[maybe_unused]] auto parsed = nlohmann::json::parse (payload);
280+ } catch (const std::exception& ex) {
281+ callback_err = Error{.code = -1 , .message = ex.what ()};
282+ return false ;
283+ }
284+ attachments[key->second ] = std::move (payload);
285+ } else {
286+ attachments[key->second ] = Base64Encode (payload);
234287 }
235-
236- attachments[key->second ] = std::move (payload);
237288 return true ;
238289 });
239290
@@ -275,13 +326,13 @@ class Bucket : public IBucket {
275326
276327 Batch remove_batch;
277328 const auto meta_entry = fmt::format (" {}/$meta" , entry_name);
278- auto query_err = QueryV2 ({meta_entry}, std:: nullopt , std:: nullopt , std::move (options),
279- [&remove_batch](const auto & record) {
280- auto labels = record.labels ;
281- labels[" remove" ] = " true" ;
282- remove_batch.AddOnlyLabels (record.entry , record.timestamp , std::move (labels));
283- return true ;
284- });
329+ auto query_err =
330+ QueryV2 ({meta_entry}, std:: nullopt , std:: nullopt , std::move (options), [&remove_batch](const auto & record) {
331+ auto labels = record.labels ;
332+ labels[" remove" ] = " true" ;
333+ remove_batch.AddOnlyLabels (record.entry , record.timestamp , std::move (labels));
334+ return true ;
335+ });
285336
286337 if (query_err) {
287338 return query_err;
@@ -846,9 +897,8 @@ class Bucket : public IBucket {
846897 if (internal::IsCompatible (" 1.19" , api_version) && !options.record_entry .has_value ()) {
847898 return {{},
848899 Error{.code = -1 ,
849- .message =
850- " record entry and timestamp must be provided for ReductStore API v1.19+; use "
851- " record_entry and record_timestamp" }};
900+ .message = " record entry and timestamp must be provided for ReductStore API v1.19+; use "
901+ " record_entry and record_timestamp" }};
852902 }
853903
854904 return {std::move (options), Error::kOk };
0 commit comments