Skip to content

Commit 872e1b8

Browse files
Gazizonokigithub-actions[bot]
authored andcommitted
[C++ SDK] Custom executor (#20752)
1 parent 9ed4d6f commit 872e1b8

File tree

34 files changed

+332
-241
lines changed

34 files changed

+332
-241
lines changed

.github/last_commit.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
b997686248fd02a04de35b15816f27cf320eb67e
1+
234f6d55e1e71a242c2767a22b990d68ff7fce72

examples/executor/main.cpp

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
#include <ydb-cpp-sdk/client/driver/driver.h>
2+
#include <ydb-cpp-sdk/client/helpers/helpers.h>
3+
#include <ydb-cpp-sdk/client/query/client.h>
4+
5+
#include <library/cpp/getopt/last_getopt.h>
6+
7+
#include <util/thread/pool.h>
8+
9+
#include <thread>
10+
11+
12+
void ExecutorExample(const std::string& endpoint, const std::string& database) {
13+
auto driverConfig = NYdb::CreateFromEnvironment(endpoint + "/?database=" + database)
14+
.SetExecutor(NYdb::CreateThreadPoolExecutorAdapter(
15+
std::make_shared<TThreadPool>(TThreadPool::TParams()
16+
.SetBlocking(true)
17+
.SetCatching(false)
18+
.SetForkAware(false)),
19+
std::thread::hardware_concurrency())
20+
);
21+
22+
NYdb::TDriver driver(driverConfig);
23+
NYdb::NQuery::TQueryClient client(driver);
24+
25+
try {
26+
auto result = client.ExecuteQuery("SELECT 1", NYdb::NQuery::TTxControl::NoTx()).GetValueSync();
27+
NYdb::NStatusHelpers::ThrowOnError(result);
28+
auto parser = result.GetResultSetParser(0);
29+
parser.TryNextRow();
30+
std::cout << "Result: " << parser.ColumnParser(0).GetInt32() << std::endl;
31+
} catch (const std::exception& e) {
32+
std::cerr << "Execution failed: " << e.what() << std::endl;
33+
}
34+
35+
driver.Stop(true);
36+
}
37+
38+
int main(int argc, char** argv) {
39+
std::string endpoint;
40+
std::string database;
41+
42+
NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
43+
44+
opts.AddLongOption('e', "endpoint", "YDB endpoint").Required().RequiredArgument("HOST:PORT").StoreResult(&endpoint);
45+
opts.AddLongOption('d', "database", "YDB database").Required().RequiredArgument("DATABASE").StoreResult(&database);
46+
47+
opts.SetFreeArgsMin(0);
48+
NLastGetopt::TOptsParseResult result(&opts, argc, argv);
49+
50+
ExecutorExample(endpoint, database);
51+
return 0;
52+
}

include/ydb-cpp-sdk/client/driver/driver.h

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <ydb-cpp-sdk/client/types/fatal_error_handlers/handlers.h>
99
#include <ydb-cpp-sdk/client/types/request_settings.h>
1010
#include <ydb-cpp-sdk/client/types/status/status.h>
11+
#include <ydb-cpp-sdk/client/types/executor/executor.h>
1112

1213
#include <library/cpp/logger/backend.h>
1314

@@ -54,7 +55,7 @@ class TDriverConfig {
5455
//! Enable Ssl.
5556
//! caCerts - The buffer containing the PEM encoded root certificates for SSL/TLS connections.
5657
//! If this parameter is empty, the default roots will be used.
57-
TDriverConfig& UseSecureConnection(const std::string& caCerts = std::string());
58+
TDriverConfig& UseSecureConnection(const std::string& caCerts = "");
5859
TDriverConfig& SetUsePerChannelTcpConnection(bool usePerChannel);
5960
TDriverConfig& UseClientCertificate(const std::string& clientCert, const std::string& clientPrivateKey);
6061

@@ -107,8 +108,7 @@ class TDriverConfig {
107108
//! Set policy for balancing
108109
//! Params is a optionally field to set policy settings
109110
//! default: EBalancingPolicy::UsePreferableLocation
110-
TDriverConfig& SetBalancingPolicy(EBalancingPolicy policy, const std::string& params = std::string());
111-
111+
TDriverConfig& SetBalancingPolicy(EBalancingPolicy policy, const std::string& params = "");
112112
//! Set grpc level keep alive. If keepalive ping was delayed more than given timeout
113113
//! internal grpc routine fails request with TRANSIENT_FAILURE or TRANSPORT_UNAVAILABLE error
114114
//! Note: this timeout should not be too small to prevent fail due to
@@ -142,6 +142,11 @@ class TDriverConfig {
142142

143143
//! Log backend.
144144
TDriverConfig& SetLog(std::unique_ptr<TLogBackend>&& log);
145+
146+
//! Set executor for async responses.
147+
//! If not set, default executor will be used.
148+
TDriverConfig& SetExecutor(std::shared_ptr<IExecutor> executor);
149+
145150
private:
146151
class TImpl;
147152
std::shared_ptr<TImpl> Impl_;

include/ydb-cpp-sdk/client/federated_topic/federated_topic.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings {
394394
//! Executor for handlers.
395395
//! If not set, default single threaded executor will be used.
396396
//! Shared between subsessions
397-
FLUENT_SETTING(NTopic::IExecutor::TPtr, HandlersExecutor);
397+
FLUENT_SETTING(IExecutor::TPtr, HandlersExecutor);
398398
};
399399

400400
//! Federated event handlers.
@@ -492,10 +492,10 @@ struct TFederatedTopicClientSettings : public TCommonClientSettingsBase<TFederat
492492
using TSelf = TFederatedTopicClientSettings;
493493

494494
//! Default executor for compression tasks.
495-
FLUENT_SETTING_DEFAULT(NTopic::IExecutor::TPtr, DefaultCompressionExecutor, NTopic::CreateThreadPoolExecutor(2));
495+
FLUENT_SETTING_DEFAULT(IExecutor::TPtr, DefaultCompressionExecutor, CreateThreadPoolExecutor(2));
496496

497497
//! Default executor for callbacks.
498-
FLUENT_SETTING_DEFAULT(NTopic::IExecutor::TPtr, DefaultHandlersExecutor, NTopic::CreateThreadPoolExecutor(1));
498+
FLUENT_SETTING_DEFAULT(IExecutor::TPtr, DefaultHandlersExecutor, CreateThreadPoolExecutor(1));
499499

500500
//! Connection timeoout for federation discovery.
501501
FLUENT_SETTING_DEFAULT(TDuration, ConnectionTimeout, TDuration::Seconds(30));
Lines changed: 1 addition & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,9 @@
11
#pragma once
22

3-
#include <util/generic/ptr.h>
4-
#include <util/system/spinlock.h>
5-
#include <util/thread/pool.h>
6-
7-
#include <memory>
8-
#include <mutex>
3+
#include <ydb-cpp-sdk/client/types/executor/executor.h>
94

105
namespace NYdb::inline V3::NTopic {
116

12-
class IExecutor: public TThrRefBase {
13-
public:
14-
using TPtr = TIntrusivePtr<IExecutor>;
15-
using TFunction = std::function<void()>;
16-
17-
// Is executor asynchronous.
18-
virtual bool IsAsync() const = 0;
19-
20-
// Post function to execute.
21-
virtual void Post(TFunction&& f) = 0;
22-
23-
// Start method.
24-
// This method is idempotent.
25-
// It can be called many times. Only the first one has effect.
26-
void Start() {
27-
std::lock_guard guard(StartLock);
28-
if (!Started) {
29-
DoStart();
30-
Started = true;
31-
}
32-
}
33-
34-
private:
35-
virtual void DoStart() = 0;
36-
37-
private:
38-
bool Started = false;
39-
TAdaptiveLock StartLock;
40-
};
41-
IExecutor::TPtr CreateThreadPoolExecutorAdapter(
42-
std::shared_ptr<IThreadPool> threadPool); // Thread pool is expected to have been started.
43-
IExecutor::TPtr CreateThreadPoolExecutor(size_t threads);
44-
457
IExecutor::TPtr CreateSyncExecutor();
468

479
} // namespace NYdb::NTopic

include/ydb-cpp-sdk/client/types/credentials/oauth2_token_exchange/jwt_token_source.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ struct TJwtTokenSourceParams {
3636
class ISigningAlgorithm {
3737
public:
3838
virtual ~ISigningAlgorithm() = default;
39-
#ifdef YDB_SDK_USE_NEW_JWT
39+
#ifdef YDB_SDK_OSS
4040
virtual std::string sign(const std::string& data, std::error_code& ec) const = 0;
4141
#else
4242
virtual std::string sign(const std::string& data) const = 0;
@@ -54,7 +54,7 @@ struct TJwtTokenSourceParams {
5454
{
5555
}
5656

57-
#ifdef YDB_SDK_USE_NEW_JWT
57+
#ifdef YDB_SDK_OSS
5858
std::string sign(const std::string& data, std::error_code& ec) const override {
5959
return Alg.sign(data, ec);
6060
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#pragma once
2+
3+
#ifndef YDB_SDK_OSS
4+
#include <util/thread/pool.h>
5+
#endif
6+
7+
#include <functional>
8+
#include <mutex>
9+
10+
namespace NYdb::inline V3 {
11+
12+
class IExecutor {
13+
public:
14+
using TPtr = std::shared_ptr<IExecutor>;
15+
using TFunction = std::function<void()>;
16+
17+
// Start method.
18+
// This method is idempotent.
19+
void Start() {
20+
std::lock_guard guard(StartLock);
21+
if (!Started) {
22+
DoStart();
23+
Started = true;
24+
}
25+
}
26+
27+
virtual void Stop() = 0;
28+
29+
// Post function to execute.
30+
virtual void Post(TFunction&& f) = 0;
31+
32+
// Is executor asynchronous.
33+
virtual bool IsAsync() const = 0;
34+
35+
virtual ~IExecutor() = default;
36+
37+
protected:
38+
virtual void DoStart() = 0;
39+
40+
bool Started = false;
41+
std::mutex StartLock;
42+
};
43+
44+
// Create default executor for thread pool.
45+
IExecutor::TPtr CreateThreadPoolExecutor(std::size_t threadCount, std::size_t maxQueueSize = 0);
46+
47+
#ifndef YDB_SDK_OSS
48+
// Create executor adapter for util thread pool.
49+
// Thread pool is started and stopped by SDK.
50+
IExecutor::TPtr CreateThreadPoolExecutorAdapter(std::shared_ptr<IThreadPool> threadPool, std::size_t threadCount, std::size_t maxQueueSize = 0);
51+
52+
// Create executor adapter for util thread pool.
53+
// Thread pool is expected to have been started and stopped by user.
54+
IExecutor::TPtr CreateExternalThreadPoolExecutorAdapter(std::shared_ptr<IThreadPool> threadPool);
55+
#endif
56+
57+
} // namespace NYdb

src/client/driver/driver.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class TDriverConfig::TImpl : public IConnectionsParams {
5050
uint64_t GetMaxOutboundMessageSize() const override { return MaxOutboundMessageSize; }
5151
uint64_t GetMaxMessageSize() const override { return MaxMessageSize; }
5252
const TLog& GetLog() const override { return Log; }
53+
std::shared_ptr<IExecutor> GetExecutor() const override { return Executor; }
5354

5455
std::string Endpoint;
5556
size_t NetworkThreadsNum = 2;
@@ -78,6 +79,7 @@ class TDriverConfig::TImpl : public IConnectionsParams {
7879
uint64_t MaxOutboundMessageSize = 0;
7980
uint64_t MaxMessageSize = 0;
8081
TLog Log; // Null by default.
82+
std::shared_ptr<IExecutor> Executor;
8183
};
8284

8385
TDriverConfig::TDriverConfig(const std::string& connectionString)
@@ -214,6 +216,11 @@ TDriverConfig& TDriverConfig::SetLog(std::unique_ptr<TLogBackend>&& log) {
214216
return *this;
215217
}
216218

219+
TDriverConfig& TDriverConfig::SetExecutor(std::shared_ptr<IExecutor> executor) {
220+
Impl_->Executor = executor;
221+
return *this;
222+
}
223+
217224
////////////////////////////////////////////////////////////////////////////////
218225

219226
std::shared_ptr<TGRpcConnectionsImpl> CreateInternalInterface(const TDriver connection) {

src/client/federated_topic/impl/federated_topic_impl.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,10 @@ NThreading::TFuture<std::vector<TFederatedTopicClient::TClusterInfo>> TFederated
8383
});
8484
}
8585

86-
auto TFederatedTopicClient::TImpl::GetSubsessionHandlersExecutor() -> NTopic::IExecutor::TPtr {
86+
auto TFederatedTopicClient::TImpl::GetSubsessionHandlersExecutor() -> IExecutor::TPtr {
8787
with_lock (Lock) {
8888
if (!SubsessionHandlersExecutor) {
89-
SubsessionHandlersExecutor = NTopic::CreateThreadPoolExecutor(1);
89+
SubsessionHandlersExecutor = CreateThreadPoolExecutor(1);
9090
}
9191
return SubsessionHandlersExecutor;
9292
}

src/client/federated_topic/impl/federated_topic_impl.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ class TFederatedTopicClient::TImpl {
7676

7777
private:
7878

79-
// Use single-threaded executor to prevent deadlocks inside subsession event handlers.
80-
NTopic::IExecutor::TPtr GetSubsessionHandlersExecutor();
79+
// Use single-threaded executor to prevent deadlocks inside subsession event handlers.
80+
IExecutor::TPtr GetSubsessionHandlersExecutor();
8181

8282
private:
8383
std::shared_ptr<TGRpcConnectionsImpl> Connections;
@@ -86,7 +86,7 @@ class TFederatedTopicClient::TImpl {
8686
std::shared_ptr<std::unordered_map<NTopic::ECodec, std::unique_ptr<NTopic::ICodec>>> ProvidedCodecs =
8787
std::make_shared<std::unordered_map<NTopic::ECodec, std::unique_ptr<NTopic::ICodec>>>();
8888

89-
NTopic::IExecutor::TPtr SubsessionHandlersExecutor;
89+
IExecutor::TPtr SubsessionHandlersExecutor;
9090

9191
TAdaptiveLock Lock;
9292
};

0 commit comments

Comments
 (0)