Skip to content

Commit cbbff2d

Browse files
authored
YQ-4221 fixed s3 write with large path (#16902)
1 parent e1ad79a commit cbbff2d

File tree

2 files changed

+32
-3
lines changed

2 files changed

+32
-3
lines changed

ydb/core/kqp/proxy_service/kqp_script_executions.cpp

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,30 @@ NYql::TIssues DeserializeIssues(const TString& issuesSerialized) {
7070
return issues;
7171
}
7272

73+
template <typename TProto>
74+
void SerializeBinaryProto(const TProto& proto, NJson::TJsonValue& value) {
75+
value.SetType(NJson::EJsonValueType::JSON_MAP);
76+
77+
const auto config = NProtobufJson::TProto2JsonConfig()
78+
.AddStringTransform(MakeIntrusive<NProtobufJson::TBase64EncodeBytesTransform>());
79+
80+
NProtobufJson::Proto2Json(proto, value["encoded_proto"], config);
81+
}
82+
83+
template <typename TProto>
84+
void DeserializeBinaryProto(const NJson::TJsonValue& value, TProto& proto) {
85+
const auto& valueMap = value.GetMap();
86+
const auto encodedProto = valueMap.find("encoded_proto");
87+
if (encodedProto == valueMap.end()) {
88+
return NProtobufJson::Json2Proto(value, proto, NProtobufJson::TJson2ProtoConfig());
89+
}
90+
91+
const auto config = NProtobufJson::TJson2ProtoConfig()
92+
.AddStringTransform(MakeIntrusive<NProtobufJson::TBase64DecodeBytesTransform>());
93+
94+
NProtobufJson::Json2Proto(encodedProto->second, proto, config);
95+
}
96+
7397

7498
class TQueryBase : public NKikimr::TQueryBase {
7599
public:
@@ -2272,7 +2296,7 @@ class TSaveScriptExternalEffectActor : public TQueryBase {
22722296
NJson::TJsonValue::TArray& jsonArray = value.GetArraySafe();
22732297
jsonArray.resize(sinks.size());
22742298
for (size_t i = 0; i < sinks.size(); ++i) {
2275-
NProtobufJson::Proto2Json(sinks[i], jsonArray[i], NProtobufJson::TProto2JsonConfig());
2299+
SerializeBinaryProto(sinks[i], jsonArray[i]);
22762300
}
22772301

22782302
NJsonWriter::TBuf serializedSinks;
@@ -2416,7 +2440,7 @@ class TSaveScriptFinalStatusActor : public TQueryBase {
24162440
value.GetValuePointer(i, &serializedSink);
24172441

24182442
NKqpProto::TKqpExternalSink sink;
2419-
NProtobufJson::Json2Proto(*serializedSink, sink);
2443+
DeserializeBinaryProto(*serializedSink, sink);
24202444
Response->Sinks.push_back(sink);
24212445
}
24222446
}

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -785,7 +785,10 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
785785
Y_UNIT_TEST(InsertIntoBucketWithSelect) {
786786
const TString writeDataSourceName = "/Root/write_data_source";
787787
const TString writeBucket = "test_bucket_write_with_select";
788-
const TString writeObject = "test_object_write/";
788+
789+
// Also tests large object path with size >= 128
790+
// for atomic upload commit case
791+
const TString writeObject = TStringBuilder() << "test_object_write/" << TString(512, 'x') << "/";
789792

790793
{
791794
Aws::S3::S3Client s3Client = MakeS3Client();
@@ -811,6 +814,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
811814
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
812815

813816
const TString sql = fmt::format(R"(
817+
PRAGMA s3.AtomicUploadCommit = "true";
818+
814819
INSERT INTO `{write_source}`.`{write_object}` WITH (FORMAT = "csv_with_names")
815820
SELECT * FROM AS_TABLE([<|id: 0, payload: "#######"|>]);
816821

0 commit comments

Comments
 (0)