@@ -63,22 +63,38 @@ namespace frpc = eprosima::fastdds::dds::rpc;
63
63
namespace frtps = eprosima::fastdds::rtps;
64
64
65
65
class CalculatorServerLogic
66
- : public CalculatorServer
66
+ : public frpc::RpcServer
67
+ , public std::enable_shared_from_this<CalculatorServerLogic>
67
68
{
68
69
using RequestType = Calculator_Request;
69
70
using ReplyType = Calculator_Reply;
70
71
71
72
public:
72
73
73
74
CalculatorServerLogic (
74
- eprosima::fastdds::dds ::DomainParticipant& part,
75
+ fdds ::DomainParticipant& part,
75
76
const char * service_name,
76
- const eprosima::fastdds::dds ::ReplierQos& qos,
77
+ const fdds ::ReplierQos& qos,
77
78
size_t thread_pool_size,
78
79
std::shared_ptr<CalculatorServer_IServerImplementation> implementation)
79
- : CalculatorServer()
80
+ : CalculatorServerLogic(
81
+ part,
82
+ service_name,
83
+ qos,
84
+ std::make_shared<ThreadPool>(*this , thread_pool_size),
85
+ std::move (implementation))
86
+ {
87
+ }
88
+
89
+ CalculatorServerLogic (
90
+ fdds::DomainParticipant& part,
91
+ const char * service_name,
92
+ const fdds::ReplierQos& qos,
93
+ std::shared_ptr<frpc::RpcServerSchedulingStrategy> scheduler,
94
+ std::shared_ptr<CalculatorServer_IServerImplementation> implementation)
95
+ : frpc::RpcServer()
80
96
, participant_(part)
81
- , thread_pool_(* this , thread_pool_size )
97
+ , request_scheduler_(scheduler )
82
98
, implementation_(std::move(implementation))
83
99
{
84
100
// Register the service type support
@@ -106,8 +122,6 @@ class CalculatorServerLogic
106
122
107
123
~CalculatorServerLogic () override
108
124
{
109
- stop ();
110
-
111
125
if (nullptr != replier_)
112
126
{
113
127
participant_.delete_service_replier (service_->get_service_name (), replier_);
@@ -174,7 +188,21 @@ class CalculatorServerLogic
174
188
}
175
189
176
190
// Wait for all threads to finish
177
- thread_pool_.stop ();
191
+ request_scheduler_->server_stopped (shared_from_this ());
192
+ }
193
+
194
+ void execute_request (
195
+ const std::shared_ptr<frpc::RpcRequest>& request) override
196
+ {
197
+ auto ctx = std::dynamic_pointer_cast<RequestContext>(request);
198
+ if (ctx)
199
+ {
200
+ execute_request (ctx);
201
+ }
202
+ else
203
+ {
204
+ throw std::runtime_error (" Invalid request context type" );
205
+ }
178
206
}
179
207
180
208
private:
@@ -724,7 +752,7 @@ class CalculatorServerLogic
724
752
725
753
// } operation filter
726
754
727
- struct RequestContext : CalculatorServer_ClientContext
755
+ struct RequestContext : frpc::RpcRequest
728
756
{
729
757
RequestType request;
730
758
frpc::RequestInfo info;
@@ -999,6 +1027,7 @@ class CalculatorServerLogic
999
1027
};
1000
1028
1001
1029
struct ThreadPool
1030
+ : public frpc::RpcServerSchedulingStrategy
1002
1031
{
1003
1032
ThreadPool (
1004
1033
CalculatorServerLogic& server,
@@ -1015,7 +1044,7 @@ class CalculatorServerLogic
1015
1044
{
1016
1045
while (!finished_)
1017
1046
{
1018
- std::shared_ptr<RequestContext > req;
1047
+ std::shared_ptr<frpc::RpcRequest > req;
1019
1048
{
1020
1049
std::unique_lock<std::mutex> lock (mtx_);
1021
1050
cv_.wait (lock, [this ]()
@@ -1041,9 +1070,12 @@ class CalculatorServerLogic
1041
1070
}
1042
1071
}
1043
1072
1044
- void new_request (
1045
- const std::shared_ptr<RequestContext>& req)
1073
+ void schedule_request (
1074
+ const std::shared_ptr<frpc::RpcRequest>& req,
1075
+ const std::shared_ptr<frpc::RpcServer>& server) override
1046
1076
{
1077
+ static_cast <void >(server);
1078
+
1047
1079
std::lock_guard<std::mutex> lock (mtx_);
1048
1080
if (!finished_)
1049
1081
{
@@ -1052,8 +1084,11 @@ class CalculatorServerLogic
1052
1084
}
1053
1085
}
1054
1086
1055
- void stop ()
1087
+ void server_stopped (
1088
+ const std::shared_ptr<frpc::RpcServer>& server) override
1056
1089
{
1090
+ static_cast <void >(server);
1091
+
1057
1092
// Notify all threads in the pool to stop
1058
1093
{
1059
1094
std::lock_guard<std::mutex> lock (mtx_);
@@ -1075,7 +1110,7 @@ class CalculatorServerLogic
1075
1110
CalculatorServerLogic& server_;
1076
1111
std::mutex mtx_;
1077
1112
std::condition_variable cv_;
1078
- std::queue<std::shared_ptr<RequestContext >> requests_;
1113
+ std::queue<std::shared_ptr<frpc::RpcRequest >> requests_;
1079
1114
bool finished_{ false };
1080
1115
std::vector<std::thread> threads_;
1081
1116
};
@@ -1107,7 +1142,7 @@ class CalculatorServerLogic
1107
1142
processing_requests_[id] = ctx;
1108
1143
}
1109
1144
1110
- thread_pool_. new_request (ctx);
1145
+ request_scheduler_-> schedule_request (ctx, shared_from_this () );
1111
1146
}
1112
1147
1113
1148
void execute_request (
@@ -1312,22 +1347,73 @@ class CalculatorServerLogic
1312
1347
fdds::GuardCondition finish_condition_;
1313
1348
std::mutex mtx_;
1314
1349
std::map<frtps::SampleIdentity, std::shared_ptr<RequestContext>> processing_requests_;
1315
- ThreadPool thread_pool_ ;
1350
+ std::shared_ptr<frpc::RpcServerSchedulingStrategy> request_scheduler_ ;
1316
1351
std::shared_ptr<CalculatorServer_IServerImplementation> implementation_;
1317
1352
1318
1353
};
1319
1354
1355
+ struct CalculatorServerProxy
1356
+ : public frpc::RpcServer
1357
+ {
1358
+ CalculatorServerProxy (
1359
+ std::shared_ptr<frpc::RpcServer> impl)
1360
+ : impl_(std::move(impl))
1361
+ {
1362
+ }
1363
+
1364
+ ~CalculatorServerProxy () override
1365
+ {
1366
+ if (impl_)
1367
+ {
1368
+ impl_->stop ();
1369
+ }
1370
+ }
1371
+
1372
+ void run () override
1373
+ {
1374
+ impl_->run ();
1375
+ }
1376
+
1377
+ void stop () override
1378
+ {
1379
+ impl_->stop ();
1380
+ }
1381
+
1382
+ void execute_request (
1383
+ const std::shared_ptr<frpc::RpcRequest>& request) override
1384
+ {
1385
+ impl_->execute_request (request);
1386
+ }
1387
+
1388
+ private:
1389
+
1390
+ std::shared_ptr<frpc::RpcServer> impl_;
1391
+ };
1392
+
1320
1393
} // namespace detail
1321
1394
1322
- std::shared_ptr<CalculatorServer > create_CalculatorServer (
1395
+ std::shared_ptr<eprosima::fastdds::dds::rpc::RpcServer > create_CalculatorServer (
1323
1396
eprosima::fastdds::dds::DomainParticipant& part,
1324
1397
const char * service_name,
1325
1398
const eprosima::fastdds::dds::ReplierQos& qos,
1326
1399
size_t thread_pool_size,
1327
1400
std::shared_ptr<CalculatorServer_IServerImplementation> implementation)
1328
1401
{
1329
- return std::make_shared<detail::CalculatorServerLogic>(
1402
+ auto ptr = std::make_shared<detail::CalculatorServerLogic>(
1330
1403
part, service_name, qos, thread_pool_size, implementation);
1404
+ return std::make_shared<detail::CalculatorServerProxy>(ptr);
1405
+ }
1406
+
1407
+ std::shared_ptr<eprosima::fastdds::dds::rpc::RpcServer> create_CalculatorServer (
1408
+ eprosima::fastdds::dds::DomainParticipant& part,
1409
+ const char * service_name,
1410
+ const eprosima::fastdds::dds::ReplierQos& qos,
1411
+ std::shared_ptr<eprosima::fastdds::dds::rpc::RpcServerSchedulingStrategy> scheduler,
1412
+ std::shared_ptr<CalculatorServer_IServerImplementation> implementation)
1413
+ {
1414
+ auto ptr = std::make_shared<detail::CalculatorServerLogic>(
1415
+ part, service_name, qos, scheduler, implementation);
1416
+ return std::make_shared<detail::CalculatorServerProxy>(ptr);
1331
1417
}
1332
1418
1333
1419
// } interface Calculator
0 commit comments