Skip to content

Commit 2e188b8

Browse files
committed
YQL-20072: Support mrrun + yqlworker
commit_hash:a70465d9a63f8a86c6990ae04e9c615827498fe0
1 parent 949037f commit 2e188b8

File tree

19 files changed

+96
-20
lines changed

19 files changed

+96
-20
lines changed

yql/essentials/cfg/tests/fs_http.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
MaxRedirects: 10
2-
SocketTimeoutMs: 300000
2+
TimeoutMs: 300000
33
MaxRetries: 3
44

55
MinDelayMs: 1000

yql/essentials/core/facade/yql_facade.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,10 @@ void TProgramFactory::SetCredentials(TCredentials::TPtr credentials) {
203203
Credentials_ = std::move(credentials);
204204
}
205205

206+
void TProgramFactory::AddRemoteLayersProvider(const TString& alias, NLayers::IRemoteLayerProviderPtr provider) {
207+
RemoteLayersProviders_.emplace(alias, std::move(provider));
208+
}
209+
206210
void TProgramFactory::SetGatewaysConfig(const TGatewaysConfig* gatewaysConfig) {
207211
GatewaysConfig_ = gatewaysConfig;
208212
}
@@ -281,7 +285,7 @@ TProgramPtr TProgramFactory::Create(
281285
LangVer_, MaxLangVer_, VolatileResults_, UserDataTable_, Credentials_, moduleResolver, urlListerManager,
282286
udfResolver, udfIndex, udfIndexPackageSet, FileStorage_, UrlPreprocessing_,
283287
GatewaysConfig_, filename, sourceCode, sessionId, Runner_, EnableRangeComputeFor_, ArrowResolver_, hiddenMode,
284-
qContext, gatewaysForMerge);
288+
qContext, gatewaysForMerge, RemoteLayersProviders_);
285289
}
286290

287291
///////////////////////////////////////////////////////////////////////////////
@@ -314,7 +318,8 @@ TProgram::TProgram(
314318
const IArrowResolver::TPtr& arrowResolver,
315319
EHiddenMode hiddenMode,
316320
const TQContext& qContext,
317-
TMaybe<TString> gatewaysForMerge)
321+
TMaybe<TString> gatewaysForMerge,
322+
THashMap<TString, NLayers::IRemoteLayerProviderPtr> remoteLayersProviders)
318323
: FunctionRegistry_(functionRegistry)
319324
, RandomProvider_(randomProvider)
320325
, TimeProvider_(timeProvider)
@@ -347,6 +352,7 @@ TProgram::TProgram(
347352
, HiddenMode_(hiddenMode)
348353
, QContext_(qContext)
349354
, GatewaysForMerge_(gatewaysForMerge)
355+
, RemoteLayersProviders_(std::move(remoteLayersProviders))
350356
{
351357
if (SessionId_.empty()) {
352358
SessionId_ = CreateGuidAsString();
@@ -2073,6 +2079,9 @@ TTypeAnnotationContextPtr TProgram::BuildTypeAnnotationContext(const TString& us
20732079
typeAnnotationContext->FileStorage = FileStorage_;
20742080
typeAnnotationContext->QContext = QContext_;
20752081
typeAnnotationContext->HiddenMode = HiddenMode_;
2082+
for (auto& [alias, provider] : RemoteLayersProviders_) {
2083+
typeAnnotationContext->AddRemoteLayersProvider(alias, provider);
2084+
}
20762085

20772086
if (UdfIndex_ && UdfIndexPackageSet_) {
20782087
// setup default versions at the beginning

yql/essentials/core/facade/yql_facade.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <yql/essentials/core/url_preprocessing/interface/url_preprocessing.h>
99
#include <yql/essentials/core/yql_type_annotation.h>
1010
#include <yql/essentials/core/yql_user_data.h>
11+
#include <yql/essentials/core/layers/remote_layer_provider.h>
1112
#include <yql/essentials/core/qplayer/storage/interface/yql_qstorage.h>
1213
#include <yql/essentials/providers/config/yql_config_provider.h>
1314
#include <yql/essentials/providers/result/provider/yql_result_provider.h>
@@ -65,6 +66,7 @@ class TProgramFactory: public TThrRefBase, private TMoveOnly {
6566
void EnableRangeComputeFor();
6667
void SetArrowResolver(IArrowResolver::TPtr arrowResolver);
6768
void SetUdfResolverLogfile(const TString& path);
69+
void AddRemoteLayersProvider(const TString& alias, NLayers::IRemoteLayerProviderPtr provider);
6870

6971
TProgramPtr Create(
7072
const TFile& file,
@@ -105,6 +107,7 @@ class TProgramFactory: public TThrRefBase, private TMoveOnly {
105107
bool EnableRangeComputeFor_ = false;
106108
IArrowResolver::TPtr ArrowResolver_;
107109
TMaybe<TString> UdfResolverLogfile_;
110+
THashMap<TString, NLayers::IRemoteLayerProviderPtr> RemoteLayersProviders_;
108111
};
109112

110113
///////////////////////////////////////////////////////////////////////////////
@@ -387,7 +390,8 @@ class TProgram: public TThrRefBase, private TNonCopyable {
387390
const IArrowResolver::TPtr& arrowResolver,
388391
EHiddenMode hiddenMode,
389392
const TQContext& qContext,
390-
TMaybe<TString> gatewaysForMerge);
393+
TMaybe<TString> gatewaysForMerge,
394+
THashMap<TString, NLayers::IRemoteLayerProviderPtr> remoteLayersProviders);
391395

392396
TTypeAnnotationContextPtr BuildTypeAnnotationContext(const TString& username);
393397
TTypeAnnotationContextPtr GetAnnotationContext() const;
@@ -490,6 +494,7 @@ class TProgram: public TThrRefBase, private TNonCopyable {
490494
TIssues FinalIssues_;
491495
TMaybe<TIssue> ParametersIssue_;
492496
bool EnableLineage_ = false;
497+
THashMap<TString, NLayers::IRemoteLayerProviderPtr> RemoteLayersProviders_;
493498
};
494499

495500
void UpdateSqlFlagsFromQContext(const TQContext& qContext, THashSet<TString>& flags, TMaybe<TString> gatewaysPatch = {});

yql/essentials/core/file_storage/file_storage_ut.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#include <yql/essentials/utils/test_http_server/test_http_server.h>
44
#include <yql/essentials/core/file_storage/proto/file_storage.pb.h>
55
#include <yql/essentials/core/file_storage/http_download/http_download.h>
6-
#include <yql/essentials/core/file_storage/http_download/proto/http_download.pb.h>
6+
#include <yql/essentials/utils/fetch/proto/fetch_config.pb.h>
77

88
#include <library/cpp/threading/future/future.h>
99
#include <library/cpp/threading/future/async.h>
@@ -26,7 +26,7 @@ static TString ReadFileContent(const TString& path) {
2626
return TIFStream(path).ReadAll();
2727
}
2828

29-
static TFileStoragePtr CreateTestFS(TFileStorageConfig params = {}, const THttpDownloaderConfig* httpCfg = nullptr) {
29+
static TFileStoragePtr CreateTestFS(TFileStorageConfig params = {}, const TFetchConfig* httpCfg = nullptr) {
3030
if (httpCfg) {
3131
TStringStream strCfg;
3232
SerializeToTextFormat(*httpCfg, strCfg);
@@ -550,8 +550,8 @@ Y_UNIT_TEST(SocketTimeout) {
550550
return TTestHttpServer::TReply::Ok("ABC");
551551
});
552552

553-
THttpDownloaderConfig httpCfg;
554-
httpCfg.SetSocketTimeoutMs(1000);
553+
TFetchConfig httpCfg;
554+
httpCfg.SetTimeoutMs(1000);
555555
TFileStorageConfig params;
556556
TFileStoragePtr fs = CreateTestFS(params, &httpCfg);
557557

yql/essentials/core/file_storage/http_download/http_download.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
#include "http_download.h"
22

33
#include <yql/essentials/core/file_storage/proto/file_storage.pb.h>
4-
#include <yql/essentials/core/file_storage/http_download/proto/http_download.pb.h>
54
#include <yql/essentials/core/file_storage/download/download_stream.h>
65
#include <yql/essentials/core/file_storage/download/download_config.h>
76
#include <yql/essentials/core/file_storage/defs/downloader.h>
7+
#include <yql/essentials/utils/fetch/proto/fetch_config.pb.h>
88
#include <yql/essentials/utils/fetch/fetch.h>
99
#include <yql/essentials/utils/log/context.h>
1010
#include <yql/essentials/utils/md5_stream.h>
@@ -22,7 +22,7 @@
2222

2323
namespace NYql {
2424

25-
class THttpDownloader: public TDownloadConfig<THttpDownloader, THttpDownloaderConfig>, public NYql::NFS::IDownloader {
25+
class THttpDownloader: public TDownloadConfig<THttpDownloader, TFetchConfig>, public NYql::NFS::IDownloader {
2626
public:
2727
THttpDownloader(const TFileStorageConfig& config)
2828
: UseFakeChecksums(GetEnv("YQL_LOCAL") == "1")
@@ -31,7 +31,7 @@ class THttpDownloader: public TDownloadConfig<THttpDownloader, THttpDownloaderCo
3131
}
3232
~THttpDownloader() = default;
3333

34-
void DoConfigure(const THttpDownloaderConfig& cfg) {
34+
void DoConfigure(const TFetchConfig& cfg) {
3535
Policy_ = IRetryPolicy<unsigned>::GetExponentialBackoffPolicy(
3636
DefaultClassifyHttpCode,
3737
TDuration::MilliSeconds(cfg.GetMinDelayMs()),
@@ -41,7 +41,7 @@ class THttpDownloader: public TDownloadConfig<THttpDownloader, THttpDownloaderCo
4141
TDuration::MilliSeconds(cfg.GetMaxTotalDelayTimeMs()),
4242
cfg.GetScale());
4343
Redirects_ = cfg.GetMaxRedirects();
44-
SocketTimeoutMs = cfg.GetSocketTimeoutMs();
44+
TimeoutMs = cfg.GetTimeoutMs();
4545
}
4646

4747
bool Accept(const THttpURL& url) final {
@@ -56,7 +56,7 @@ class THttpDownloader: public TDownloadConfig<THttpDownloader, THttpDownloaderCo
5656
}
5757

5858
std::tuple<NYql::NFS::TDataProvider, TString, TString> Download(const THttpURL& url, const TString& token, const TString& oldEtag, const TString& oldLastModified) final {
59-
TFetchResultPtr fr1 = FetchWithETagAndLastModified(url, token, oldEtag, oldLastModified, SocketTimeoutMs, Redirects_, Policy_);
59+
TFetchResultPtr fr1 = FetchWithETagAndLastModified(url, token, oldEtag, oldLastModified, TimeoutMs, Redirects_, Policy_);
6060
switch (fr1->GetRetCode()) {
6161
case HTTP_NOT_MODIFIED:
6262
return std::make_tuple(NYql::NFS::TDataProvider{}, TString{}, TString{});
@@ -76,7 +76,7 @@ class THttpDownloader: public TDownloadConfig<THttpDownloader, THttpDownloaderCo
7676
}
7777

7878
private:
79-
static TFetchResultPtr FetchWithETagAndLastModified(const THttpURL& url, const TString& token, const TString& oldEtag, const TString& oldLastModified, ui32 socketTimeoutMs, size_t redirects, const IRetryPolicy<unsigned>::TPtr& policy) {
79+
static TFetchResultPtr FetchWithETagAndLastModified(const THttpURL& url, const TString& token, const TString& oldEtag, const TString& oldLastModified, ui32 timeoutMs, size_t redirects, const IRetryPolicy<unsigned>::TPtr& policy) {
8080
// more details about ETag and ModifiedSince: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.26
8181
THttpHeaders headers;
8282
if (!token.empty()) {
@@ -91,7 +91,7 @@ class THttpDownloader: public TDownloadConfig<THttpDownloader, THttpDownloaderCo
9191
}
9292

9393
try {
94-
return Fetch(url, headers, TDuration::MilliSeconds(socketTimeoutMs), redirects, policy);
94+
return Fetch(url, headers, TDuration::MilliSeconds(timeoutMs), redirects, policy);
9595
} catch (const std::exception& e) {
9696
// remap exception type to leverage retry logic
9797
throw TDownloadError() << e.what();
@@ -159,7 +159,7 @@ class THttpDownloader: public TDownloadConfig<THttpDownloader, THttpDownloaderCo
159159
private:
160160
const bool UseFakeChecksums = false;
161161
IRetryPolicy<unsigned>::TPtr Policy_;
162-
ui32 SocketTimeoutMs = 300000;
162+
ui32 TimeoutMs = 300000;
163163
size_t Redirects_ = 10;
164164
};
165165

yql/essentials/core/file_storage/http_download/ya.make

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ PEERDIR(
88
yql/essentials/core/file_storage/defs
99
yql/essentials/core/file_storage/download
1010
yql/essentials/core/file_storage/proto
11-
yql/essentials/core/file_storage/http_download/proto
1211
yql/essentials/utils/fetch
1312
yql/essentials/utils/log
1413
yql/essentials/utils

yql/essentials/core/file_storage/ut/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ PEERDIR(
1111
library/cpp/threading/future
1212
library/cpp/deprecated/atomic
1313
yql/essentials/utils/test_http_server
14+
yql/essentials/utils/fetch/proto
1415
)
1516

1617
END()

yql/essentials/core/layers/layers.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,21 @@ class TLayersRegistry: public ILayersRegistry {
264264
THashMap<TString, TLayerInfo> InfoByUrl_;
265265
THashMap<TString, TLogicalInfo> LogicalInfoByName_;
266266
};
267+
268+
class TDummyRemoteLayerProvider: public IRemoteLayerProvider {
269+
public:
270+
TDummyRemoteLayerProvider(TString errorMessage)
271+
: ErrorMessage_(std::move(errorMessage))
272+
{
273+
}
274+
275+
NThreading::TFuture<TLayerInfo> GetLayerInfo(const TMaybe<TString>&, const TString&) const override {
276+
return NThreading::MakeErrorFuture<TLayerInfo>(std::make_exception_ptr(yexception() << ErrorMessage_));
277+
}
278+
279+
private:
280+
TString ErrorMessage_;
281+
};
267282
} // namespace
268283

269284
namespace NYql::NLayers {
@@ -343,4 +358,9 @@ TMaybe<TVector<TLocations>> RemoveDuplicates(const TVector<std::pair<TKey, const
343358
ILayersRegistryPtr MakeLayersRegistry(const THashMap<TString, IRemoteLayerProviderPtr>& remoteProviders, const THashMap<TString, ILayersIntegrationPtr>& integrations) {
344359
return MakeIntrusive<TLayersRegistry>(remoteProviders, integrations);
345360
}
361+
362+
IRemoteLayerProviderPtr MakeDummyRemoteLayerProvider(TString errorMessage) {
363+
return MakeIntrusive<TDummyRemoteLayerProvider>(std::move(errorMessage));
364+
}
365+
346366
} // namespace NYql::NLayers

yql/essentials/core/layers/remote_layer_provider.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,7 @@ class IRemoteLayerProvider: public TThrRefBase {
1111
};
1212

1313
using IRemoteLayerProviderPtr = TIntrusivePtr<IRemoteLayerProvider>;
14+
15+
IRemoteLayerProviderPtr MakeDummyRemoteLayerProvider(TString errorMessage);
16+
1417
} // namespace NYql::NLayers

yql/essentials/providers/common/proto/gateways_config.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package NYql;
22
option java_package = "ru.yandex.yql.proto";
33

44
import "yql/essentials/protos/clickhouse.proto";
5+
import "yql/essentials/utils/fetch/proto/fetch_config.proto";
56

67

78
/////////////////////////////// common ///////////////////////////////
@@ -933,6 +934,11 @@ message TFmrGatewayConfig {
933934
repeated TFmrConfig FmrConfigurations = 1;
934935
}
935936

937+
message TLayersConfig {
938+
optional string FetchUrl = 1;
939+
optional TFetchConfig FetchConfig = 2;
940+
}
941+
936942
/////////////////////////////// Root ///////////////////////////////
937943

938944
message TGatewaysConfig {
@@ -961,4 +967,5 @@ message TGatewaysConfig {
961967
optional TDbResolverConfig DbResolver = 23;
962968
optional TYtflowGatewayConfig Ytflow = 24;
963969
optional TFmrGatewayConfig Fmr = 25;
970+
optional TLayersConfig Layers = 26;
964971
}

0 commit comments

Comments
 (0)