Skip to content

Commit 4fb66a4

Browse files
Allow injection of execution scheduling to RPC servers (#482)
* Refs #23271. Add `execute_request` to Server interface. Signed-off-by: Miguel Company <[email protected]> * Refs #23271. Add xxxServerSchedulingStrategy interface. Signed-off-by: Miguel Company <[email protected]> * Refs #23271. ThreadPool implements xxxServerSchedulingStrategy. Signed-off-by: Miguel Company <[email protected]> * Refs #23271. Add `server_stopped` to xxxServerSchedulingStrategy. Signed-off-by: Miguel Company <[email protected]> * Refs #23271. Keep pointer to interface instead of ThreadPool. Signed-off-by: Miguel Company <[email protected]> * Refs #23271. Add Server constructor that receives the scheduling strategy. Signed-off-by: Miguel Company <[email protected]> * Refs #23271. Add create_xxxServer overload that receives the scheduling strategy. Signed-off-by: Miguel Company <[email protected]> * Refs #23271. Fix documentation of `thread_pool_size`. Signed-off-by: Miguel Company <[email protected]> * Refs #23271. Publicly inherit from `enable_shared_from_this`. Signed-off-by: Miguel Company <[email protected]> * Refs #23271. Move `stop()` call from destructor to server proxy. Signed-off-by: Miguel Company <[email protected]> * Refs #23271. Override of `execute_request` moved to public. Signed-off-by: Miguel Company <[email protected]> * Refs #23271. Improve doxygen documentation. Signed-off-by: Miguel Company <[email protected]> * Refs #23271. Improve doxygen documentation on `stop`. Signed-off-by: Miguel Company <[email protected]> --------- Signed-off-by: Miguel Company <[email protected]>
1 parent d7a19f7 commit 4fb66a4

File tree

2 files changed

+163
-16
lines changed

2 files changed

+163
-16
lines changed

src/main/java/com/eprosima/fastdds/idl/templates/ServerHeader.stg

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,55 @@ struct $interface.name$Server
9696
* @brief Stop the server.
9797
*
9898
* This method stops the server and releases all resources.
99-
* It will cancel all pending requests, and wait for all processing threads to finish before returning.
99+
* It will cancel all pending requests, and then @c call server_stopped on the request scheduler to let
100+
* it release any resources associated to this server.
101+
*
102+
* When the server has been created with the factory method that receives a @c thread_pool_size argument,
103+
* it will wait for all threads in the pool to finish before returning.
100104
*/
101105
virtual void stop() = 0;
102106

107+
/**
108+
* @brief Perform execution of a client request.
109+
*
110+
* @param request The client request to execute.
111+
*/
112+
virtual void execute_request(
113+
const std::shared_ptr<$interface.name$Server_ClientContext>& request) = 0;
114+
115+
};
116+
117+
struct $interface.name$ServerSchedulingStrategy
118+
{
119+
virtual ~$interface.name$ServerSchedulingStrategy() = default;
120+
121+
/**
122+
* @brief Schedule a request for processing.
123+
*
124+
* This method is called when a request is received and should be processed by the server.
125+
* The implementation should decide how to handle the request, whether to process it immediately,
126+
* or to queue it for later processing.
127+
*
128+
* A call to server->execute_request(request) should eventually be made to process the request.
129+
*
130+
* @note This method is called from the thread that takes requests and input feed values, so it
131+
* should not directly execute the request for operations that have input feed parameters.
132+
*
133+
* @param request The request to schedule.
134+
* @param server The server instance that should process the request.
135+
*/
136+
virtual void schedule_request(
137+
const std::shared_ptr<$interface.name$Server_ClientContext>& request,
138+
const std::shared_ptr<$interface.name$Server>& server) = 0;
139+
140+
/**
141+
* @brief Informs that a server has been stopped and all its requests have been cancelled.
142+
*
143+
* @param server The server instance that has been stopped.
144+
*/
145+
virtual void server_stopped(
146+
const std::shared_ptr<$interface.name$Server>& server) = 0;
147+
103148
};
104149

105150
/**
@@ -109,7 +154,7 @@ struct $interface.name$Server
109154
* @param service_name The name of the service.
110155
* @param qos The QoS settings for the server.
111156
* @param thread_pool_size The size of the thread pool to use for processing requests.
112-
* When set to 0, a new thread will be created when no threads are available.
157+
* When set to 0, a pool with a single thread will be created.
113158
* @param implementation The implementation of the server interface.
114159
*/
115160
extern eProsima_user_DllExport std::shared_ptr<$interface.name$Server> create_$interface.name$Server(
@@ -118,6 +163,22 @@ extern eProsima_user_DllExport std::shared_ptr<$interface.name$Server> create_$i
118163
const eprosima::fastdds::dds::ReplierQos& qos,
119164
size_t thread_pool_size,
120165
std::shared_ptr<$interface.name$Server_IServerImplementation> implementation);
166+
167+
/**
168+
* @brief Create a $interface.name$Server instance.
169+
*
170+
* @param part The DomainParticipant to use for the server.
171+
* @param service_name The name of the service.
172+
* @param qos The QoS settings for the server.
173+
* @param scheduler The request scheduling strategy to use for the server.
174+
* @param implementation The implementation of the server interface.
175+
*/
176+
extern eProsima_user_DllExport std::shared_ptr<$interface.name$Server> create_$interface.name$Server(
177+
eprosima::fastdds::dds::DomainParticipant& part,
178+
const char* service_name,
179+
const eprosima::fastdds::dds::ReplierQos& qos,
180+
std::shared_ptr<$interface.name$ServerSchedulingStrategy> scheduler,
181+
std::shared_ptr<$interface.name$Server_IServerImplementation> implementation);
121182
$endif$
122183
>>
123184

src/main/java/com/eprosima/fastdds/idl/templates/ServerSource.stg

Lines changed: 100 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -78,21 +78,37 @@ namespace frtps = eprosima::fastdds::rtps;
7878

7979
class $interface.name$ServerLogic
8080
: public $interface.name$Server
81+
, public std::enable_shared_from_this<$interface.name$ServerLogic>
8182
{
8283
using RequestType = $interface.name$_Request;
8384
using ReplyType = $interface.name$_Reply;
8485

8586
public:
8687

8788
$interface.name$ServerLogic(
88-
eprosima::fastdds::dds::DomainParticipant& part,
89+
fdds::DomainParticipant& part,
8990
const char* service_name,
90-
const eprosima::fastdds::dds::ReplierQos& qos,
91+
const fdds::ReplierQos& qos,
9192
size_t thread_pool_size,
9293
std::shared_ptr<$interface.name$Server_IServerImplementation> implementation)
94+
: $interface.name$ServerLogic(
95+
part,
96+
service_name,
97+
qos,
98+
std::make_shared<ThreadPool>(*this, thread_pool_size),
99+
std::move(implementation))
100+
{
101+
}
102+
103+
$interface.name$ServerLogic(
104+
fdds::DomainParticipant& part,
105+
const char* service_name,
106+
const fdds::ReplierQos& qos,
107+
std::shared_ptr<$interface.name$ServerSchedulingStrategy> scheduler,
108+
std::shared_ptr<$interface.name$Server_IServerImplementation> implementation)
93109
: $interface.name$Server()
94110
, participant_(part)
95-
, thread_pool_(*this, thread_pool_size)
111+
, request_scheduler_(scheduler)
96112
, implementation_(std::move(implementation))
97113
{
98114
// Register the service type support
@@ -120,8 +136,6 @@ public:
120136

121137
~$interface.name$ServerLogic() override
122138
{
123-
stop();
124-
125139
if (nullptr != replier_)
126140
{
127141
participant_.delete_service_replier(service_->get_service_name(), replier_);
@@ -188,7 +202,21 @@ public:
188202
}
189203

190204
// Wait for all threads to finish
191-
thread_pool_.stop();
205+
request_scheduler_->server_stopped(shared_from_this());
206+
}
207+
208+
void execute_request(
209+
const std::shared_ptr<$interface.name$Server_ClientContext>& request) override
210+
{
211+
auto ctx = std::dynamic_pointer_cast<RequestContext>(request);
212+
if (ctx)
213+
{
214+
execute_request(ctx);
215+
}
216+
else
217+
{
218+
throw std::runtime_error("Invalid request context type");
219+
}
192220
}
193221

194222
private:
@@ -346,6 +374,7 @@ $endif$
346374
};
347375

348376
struct ThreadPool
377+
: public $interface.name$ServerSchedulingStrategy
349378
{
350379
ThreadPool(
351380
$interface.name$ServerLogic& server,
@@ -362,7 +391,7 @@ $endif$
362391
{
363392
while (!finished_)
364393
{
365-
std::shared_ptr<RequestContext> req;
394+
std::shared_ptr<$interface.name$Server_ClientContext> req;
366395
{
367396
std::unique_lock<std::mutex> lock(mtx_);
368397
cv_.wait(lock, [this]()
@@ -388,9 +417,12 @@ $endif$
388417
}
389418
}
390419

391-
void new_request(
392-
const std::shared_ptr<RequestContext>& req)
420+
void schedule_request(
421+
const std::shared_ptr<$interface.name$Server_ClientContext>& req,
422+
const std::shared_ptr<$interface.name$Server>& server) override
393423
{
424+
static_cast<void>(server);
425+
394426
std::lock_guard<std::mutex> lock(mtx_);
395427
if (!finished_)
396428
{
@@ -399,8 +431,11 @@ $endif$
399431
}
400432
}
401433

402-
void stop()
434+
void server_stopped(
435+
const std::shared_ptr<$interface.name$Server>& server) override
403436
{
437+
static_cast<void>(server);
438+
404439
// Notify all threads in the pool to stop
405440
{
406441
std::lock_guard<std::mutex> lock(mtx_);
@@ -422,7 +457,7 @@ $endif$
422457
$interface.name$ServerLogic& server_;
423458
std::mutex mtx_;
424459
std::condition_variable cv_;
425-
std::queue<std::shared_ptr<RequestContext\>> requests_;
460+
std::queue<std::shared_ptr<$interface.name$Server_ClientContext\>> requests_;
426461
bool finished_{ false };
427462
std::vector<std::thread> threads_;
428463
};
@@ -454,7 +489,7 @@ $endif$
454489
processing_requests_[id] = ctx;
455490
}
456491

457-
thread_pool_.new_request(ctx);
492+
request_scheduler_->schedule_request(ctx, shared_from_this());
458493
}
459494

460495
void execute_request(
@@ -491,11 +526,49 @@ $endif$
491526
fdds::GuardCondition finish_condition_;
492527
std::mutex mtx_;
493528
std::map<frtps::SampleIdentity, std::shared_ptr<RequestContext\>> processing_requests_;
494-
ThreadPool thread_pool_;
529+
std::shared_ptr<$interface.name$ServerSchedulingStrategy> request_scheduler_;
495530
std::shared_ptr<$interface.name$Server_IServerImplementation> implementation_;
496531

497532
};
498533

534+
struct $interface.name$ServerProxy
535+
: public $interface.name$Server
536+
{
537+
$interface.name$ServerProxy(
538+
std::shared_ptr<$interface.name$Server> impl)
539+
: impl_(std::move(impl))
540+
{
541+
}
542+
543+
~$interface.name$ServerProxy() override
544+
{
545+
if (impl_)
546+
{
547+
impl_->stop();
548+
}
549+
}
550+
551+
void run() override
552+
{
553+
impl_->run();
554+
}
555+
556+
void stop() override
557+
{
558+
impl_->stop();
559+
}
560+
561+
void execute_request(
562+
const std::shared_ptr<$interface.name$Server_ClientContext>& request) override
563+
{
564+
impl_->execute_request(request);
565+
}
566+
567+
private:
568+
569+
std::shared_ptr<$interface.name$Server> impl_;
570+
};
571+
499572
} // namespace detail
500573

501574
std::shared_ptr<$interface.name$Server> create_$interface.name$Server(
@@ -505,8 +578,21 @@ std::shared_ptr<$interface.name$Server> create_$interface.name$Server(
505578
size_t thread_pool_size,
506579
std::shared_ptr<$interface.name$Server_IServerImplementation> implementation)
507580
{
508-
return std::make_shared<detail::$interface.name$ServerLogic>(
581+
auto ptr = std::make_shared<detail::$interface.name$ServerLogic>(
509582
part, service_name, qos, thread_pool_size, implementation);
583+
return std::make_shared<detail::$interface.name$ServerProxy>(ptr);
584+
}
585+
586+
std::shared_ptr<$interface.name$Server> create_$interface.name$Server(
587+
eprosima::fastdds::dds::DomainParticipant& part,
588+
const char* service_name,
589+
const eprosima::fastdds::dds::ReplierQos& qos,
590+
std::shared_ptr<$interface.name$ServerSchedulingStrategy> scheduler,
591+
std::shared_ptr<$interface.name$Server_IServerImplementation> implementation)
592+
{
593+
auto ptr = std::make_shared<detail::$interface.name$ServerLogic>(
594+
part, service_name, qos, scheduler, implementation);
595+
return std::make_shared<detail::$interface.name$ServerProxy>(ptr);
510596
}
511597

512598
//} interface $interface.name$

0 commit comments

Comments
 (0)