Skip to content

Commit a8e682e

Browse files
GrigoriyPA and Copilot authored
YQ-5013 fixed S3 async decompression (#31655)
Co-authored-by: Copilot <[email protected]>
1 parent 1dc03cf commit a8e682e

File tree

19 files changed

+472
-204
lines changed

19 files changed

+472
-204
lines changed

ydb/core/kqp/ut/federated_query/common/common.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
102102
appConfig->MutableQueryServiceConfig()->SetAllExternalDataSourcesAreAvailable(true);
103103
}
104104

105+
appConfig->MutableQueryServiceConfig()->MutableS3()->SetAllowLocalFiles(true);
106+
105107
auto settings = TKikimrSettings(*appConfig);
106108

107109
NYql::IHTTPGateway::TPtr httpGateway;

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2870,6 +2870,177 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
28702870
UNIT_ASSERT_STRING_CONTAINS(issues, "Field 'data' has incompatible with S3 json_list input format type: Variant");
28712871
}
28722872

2873+
// Verifies that reads through an external data source whose LOCATION is a
// file:// URL are rejected with GENERIC_ERROR and a descriptive issue for three
// configurations: the raw format, the s3.AsyncDecompressing pragma, and a read
// of the root path with the s3.UseRuntimeListing pragma enabled.
Y_UNIT_TEST(TestReadingFromFileValidation) {
2874+
// Write the input file into the current working directory. The enclosing scope
// ends the lifetime of the output stream before any query is executed.
{
2875+
TFileOutput out("data.txt");
2876+
out << R"({"data": "test_data"})";
2877+
}
2878+
2879+
auto kikimr = NTestUtils::MakeKikimrRunner();
2880+
auto tc = kikimr->GetTableClient();
2881+
auto session = tc.CreateSession().ExtractValueSync().GetSession();
2882+
// Register an ObjectStorage data source that points at the current directory
// through a file:// location; creation itself is expected to succeed.
{
2883+
const TString query = R"(
2884+
CREATE EXTERNAL DATA SOURCE test_bucket WITH (
2885+
SOURCE_TYPE = "ObjectStorage",
2886+
LOCATION = "file://./",
2887+
AUTH_METHOD = "NONE"
2888+
);)";
2889+
const auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
2890+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2891+
}
2892+
2893+
auto db = kikimr->GetQueryClient();
2894+
// Case 1: the raw format must be rejected for local files.
{
2895+
const TString query = R"(
2896+
SELECT * FROM test_bucket.`data.txt` WITH (
2897+
FORMAT = raw,
2898+
SCHEMA (
2899+
data String
2900+
)
2901+
);
2902+
)";
2903+
const auto result = db.ExecuteQuery(query, TTxControl::NoTx()).ExtractValueSync();
2904+
const auto& issues = result.GetIssues().ToString();
2905+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, issues);
2906+
UNIT_ASSERT_STRING_CONTAINS(issues, "Reading from files is not supported with format 'raw'");
2907+
}
2908+
2909+
// Case 2: enabling async decompression must be rejected for local files, and
// the issue text is expected to tell the user how to turn the pragma off.
{
2910+
const TString query = R"(
2911+
PRAGMA s3.AsyncDecompressing = "TRUE";
2912+
2913+
SELECT * FROM test_bucket.`data.txt` WITH (
2914+
FORMAT = json_each_row,
2915+
SCHEMA (
2916+
data String
2917+
)
2918+
);
2919+
)";
2920+
const auto result = db.ExecuteQuery(query, TTxControl::NoTx()).ExtractValueSync();
2921+
const auto& issues = result.GetIssues().ToString();
2922+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, issues);
2923+
UNIT_ASSERT_STRING_CONTAINS(issues, "Reading from files is not supported with enabled pragma s3.AsyncDecompressing, to read from files use: PRAGMA s3.AsyncDecompressing = \"FALSE\"");
2924+
}
2925+
2926+
// Case 3: reading the root path with runtime listing enabled must fail with the
// subdirectory listing issue.
// NOTE(review): this case formats issues with ToOneLineString() while the cases
// above use ToString() - presumably so the expected text is matched on a single
// line; confirm.
{
2927+
const TString query = R"(
2928+
PRAGMA s3.UseRuntimeListing = "TRUE";
2929+
2930+
SELECT * FROM test_bucket.`/` WITH (
2931+
FORMAT = json_each_row,
2932+
SCHEMA (
2933+
data String
2934+
)
2935+
);
2936+
)";
2937+
const auto result = db.ExecuteQuery(query, TTxControl::NoTx()).ExtractValueSync();
2938+
const auto& issues = result.GetIssues().ToOneLineString();
2939+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, issues);
2940+
UNIT_ASSERT_STRING_CONTAINS(issues, "Subdirectory listing is not supported for local files (can not use delimiter: '/')");
2941+
}
2942+
}
2943+
2944+
// Verifies that a decompression failure on the async decompression path is
// reported to the client as BAD_REQUEST, with issues that name the object being
// read and carry the decompressor error text.
Y_UNIT_TEST(TestAsyncDecompressionErrorHandle) {
2945+
const TString bucket = "test_async_decompressing_error_bucket";
2946+
// The object is uploaded as plain JSON text, while the SELECT below declares
// COMPRESSION = zstd, so decompression is expected to fail.
CreateBucketWithObject(bucket, "test/data.json", R"({"data": "test_data"})");
2947+
2948+
auto kikimr = NTestUtils::MakeKikimrRunner();
2949+
auto tc = kikimr->GetTableClient();
2950+
auto session = tc.CreateSession().ExtractValueSync().GetSession();
2951+
// Register the bucket as an external data source; this step must succeed.
{
2952+
const TString query = fmt::format(R"(
2953+
CREATE EXTERNAL DATA SOURCE test_bucket WITH (
2954+
SOURCE_TYPE = "ObjectStorage",
2955+
LOCATION = "{location}",
2956+
AUTH_METHOD = "NONE"
2957+
);)",
2958+
"location"_a = GetBucketLocation(bucket)
2959+
);
2960+
const auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
2961+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2962+
}
2963+
2964+
auto db = kikimr->GetQueryClient();
2965+
2966+
// Read the object with async decompression enabled and zstd declared.
const TString query = R"(
2967+
PRAGMA s3.AsyncDecompressing = "TRUE";
2968+
2969+
SELECT * FROM test_bucket.`test/data.json` WITH (
2970+
FORMAT = json_each_row,
2971+
COMPRESSION = zstd,
2972+
SCHEMA (
2973+
data String
2974+
)
2975+
);
2976+
)";
2977+
const auto result = db.ExecuteQuery(query, TTxControl::NoTx()).ExtractValueSync();
2978+
const auto& issues = result.GetIssues().ToString();
2979+
// Both the file-level context and the underlying decompressor message must be
// present in the returned issues.
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, issues);
2980+
UNIT_ASSERT_STRING_CONTAINS(issues, "Error while reading file test/data.json");
2981+
UNIT_ASSERT_STRING_CONTAINS(issues, "Decompress failed: Unknown frame descriptor");
2982+
}
2983+
2984+
// Round-trip test for async decompression: writes 10000 zstd-compressed CSV rows
// into a bucket, reads them back with s3.AsyncDecompressing enabled, and checks
// that COUNT(*) returns all 10000 rows.
Y_UNIT_TEST(TestAsyncDecompressionWithLargeFile) {
2985+
const TString bucket = "test_async_decompressing_with_large_file_bucket";
2986+
CreateBucket(bucket);
2987+
2988+
NKikimrConfig::TAppConfig appConfig;
2989+
// NOTE(review): DataInflight is set to 1 KB, presumably far below the size of
// the data written below so that the read cannot fit into a single inflight
// window; confirm against the S3 source implementation.
appConfig.MutableQueryServiceConfig()->MutableS3()->SetDataInflight(1_KB);
2990+
2991+
auto kikimr = NTestUtils::MakeKikimrRunner(appConfig);
2992+
auto tc = kikimr->GetTableClient();
2993+
auto session = tc.CreateSession().ExtractValueSync().GetSession();
2994+
// Register the bucket as an external data source; this step must succeed.
{
2995+
const TString query = fmt::format(R"(
2996+
CREATE EXTERNAL DATA SOURCE test_bucket WITH (
2997+
SOURCE_TYPE = "ObjectStorage",
2998+
LOCATION = "{location}",
2999+
AUTH_METHOD = "NONE"
3000+
);)",
3001+
"location"_a = GetBucketLocation(bucket)
3002+
);
3003+
const auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
3004+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
3005+
}
3006+
3007+
auto db = kikimr->GetQueryClient();
3008+
// Produce the input: 10000 rows, each with a 200-character payload and a random
// number column, written as zstd-compressed csv_with_names under the test/ prefix.
{
3009+
const TString query = fmt::format(R"(
3010+
INSERT INTO test_bucket.`test/` WITH (
3011+
FORMAT = csv_with_names,
3012+
COMPRESSION = zstd
3013+
) SELECT
3014+
data,
3015+
RandomNumber(TableRow()) AS guid
3016+
FROM AS_TABLE(ListReplicate(<|data: "{payload}"|>, 10000)))",
3017+
"payload"_a = TString(200, 'X')
3018+
);
3019+
const auto result = db.ExecuteQuery(query, TTxControl::NoTx()).ExtractValueSync();
3020+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
3021+
}
3022+
3023+
// Read everything back through the async decompression path.
const TString query = R"(
3024+
PRAGMA s3.AsyncDecompressing = "TRUE";
3025+
3026+
SELECT COUNT(*) FROM test_bucket.`test/` WITH (
3027+
FORMAT = csv_with_names,
3028+
COMPRESSION = zstd,
3029+
SCHEMA (
3030+
data String NOT NULL,
3031+
guid Uint64 NOT NULL,
3032+
)
3033+
);
3034+
)";
3035+
const auto result = db.ExecuteQuery(query, TTxControl::NoTx()).ExtractValueSync();
3036+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
3037+
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
3038+
3039+
// The single result row must report every row that was written above.
auto resultSet = result.GetResultSetParser(0);
3040+
UNIT_ASSERT(resultSet.TryNextRow());
3041+
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUint64(), 10000);
3042+
}
3043+
28733044
Y_UNIT_TEST(TestRestartQueryAndCleanupWithGetOperation) {
28743045
auto kikimr = NTestUtils::MakeKikimrRunner(std::nullopt, {.EnableScriptExecutionBackgroundChecks = false});
28753046
auto db = kikimr->GetQueryClient();

ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -562,9 +562,10 @@ TDqPqRdReadActor::TDqPqRdReadActor(
562562
const auto programBuilder = std::make_unique<TProgramBuilder>(typeEnv, *holderFactory.GetFunctionRegistry());
563563

564564
// Parse output schema (expected struct output type)
565-
const auto& outputTypeYson = SourceParams.GetRowType();
566-
const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(outputTypeYson), *programBuilder, Cerr);
567-
YQL_ENSURE(outputItemType, "Failed to parse output type: " << outputTypeYson);
565+
const TStringBuf outputTypeYson(SourceParams.GetRowType());
566+
TStringStream error;
567+
const auto outputItemType = NCommon::ParseTypeFromYson(outputTypeYson, *programBuilder, error);
568+
YQL_ENSURE(outputItemType, "Failed to parse output type: " << outputTypeYson << ", reason: " << error.Str());
568569
YQL_ENSURE(outputItemType->IsStruct(), "Output type " << outputTypeYson << " is not struct");
569570
const auto structType = static_cast<TStructType*>(outputItemType);
570571

ydb/library/yql/providers/s3/actors/yql_s3_decompressor_actor.cpp

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
3030
private:
3131
class TCoroReadBuffer : public NDB::ReadBuffer {
3232
public:
33-
TCoroReadBuffer(TS3DecompressorCoroImpl* coro)
34-
: NDB::ReadBuffer(nullptr, 0ULL)
33+
explicit TCoroReadBuffer(TS3DecompressorCoroImpl* coro)
34+
: NDB::ReadBuffer(nullptr, 0)
3535
, Coro(coro)
36-
{ }
36+
{}
37+
3738
private:
3839
bool nextImpl() final {
3940
while (!Coro->InputFinished || !Coro->Requests.empty()) {
@@ -50,7 +51,8 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
5051
}
5152
return false;
5253
}
53-
TS3DecompressorCoroImpl *const Coro;
54+
55+
TS3DecompressorCoroImpl* const Coro;
5456
TString RawDataBuffer;
5557
};
5658

@@ -107,7 +109,7 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
107109
InputBuffer = std::move(event.Data);
108110
}
109111

110-
TDuration GetCpuTimeDelta() {
112+
TDuration GetCpuTimeDelta() const {
111113
return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - StartCycleCount));
112114
}
113115

@@ -129,16 +131,17 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
129131

130132
class TS3DecompressorCoroActor : public TActorCoro {
131133
public:
132-
TS3DecompressorCoroActor(THolder<TS3DecompressorCoroImpl> impl)
133-
: TActorCoro(THolder<TS3DecompressorCoroImpl>(impl.Release()))
134+
explicit TS3DecompressorCoroActor(THolder<TS3DecompressorCoroImpl> impl)
135+
: TActorCoro(std::move(impl))
134136
{}
137+
135138
private:
136139
void Registered(TActorSystem* actorSystem, const TActorId& parent) override {
137140
TActorCoro::Registered(actorSystem, parent); // Calls TActorCoro::OnRegister and sends bootstrap event to ourself.
138141
}
139142
};
140143

141-
}
144+
} // anonymous namespace
142145

143146
NActors::IActor* CreateS3DecompressorActor(const NActors::TActorId& parent, const TString& compression) {
144147
return new TS3DecompressorCoroActor(MakeHolder<TS3DecompressorCoroImpl>(parent, compression));

ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
111111
}
112112

113113
void Bootstrap() {
114+
if (Url.StartsWith("file://")) {
115+
OnFatalError({TIssue("Reading from files is not supported in raw read actor, please contact internal support")}, NDqProto::StatusIds::INTERNAL_ERROR);
116+
return;
117+
}
118+
114119
if (!UseRuntimeListing) {
115120
FileQueueActor = RegisterWithSameMailbox(CreateS3FileQueueActor(
116121
TxId,

0 commit comments

Comments
 (0)