Skip to content

Commit 3f70b94

Browse files
authored
Module service insulate (#343)
1 parent 5536ef8 commit 3f70b94

File tree

2 files changed

+173
-184
lines changed

2 files changed

+173
-184
lines changed

src/viam/sdk/module/service.cpp

Lines changed: 168 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -42,185 +42,193 @@
4242
namespace viam {
4343
namespace sdk {
4444

45-
Dependencies ModuleService::get_dependencies_(
46-
google::protobuf::RepeatedPtrField<std::string> const& proto,
47-
std::string const& resource_name) {
48-
Dependencies deps;
49-
for (const auto& dep : proto) {
50-
auto dep_name = Name::from_string(dep);
51-
const std::shared_ptr<Resource> dep_resource = get_parent_resource_(dep_name);
52-
if (!dep_resource) {
53-
std::ostringstream buffer;
54-
buffer << resource_name << ": Dependency "
55-
<< "`" << dep_name << "` was not found during (re)configuration";
56-
throw Exception(ErrorCondition::k_resource_not_found, buffer.str());
45+
struct ModuleService::ServiceImpl : viam::module::v1::ModuleService::Service {
46+
ServiceImpl(ModuleService& p) : parent(p) {}
47+
48+
ModuleService& parent;
49+
50+
// TODO(RSDK-6528) - to the extent possible, switch to using `server_helper`
51+
::grpc::Status AddResource(::grpc::ServerContext*,
52+
const ::viam::module::v1::AddResourceRequest* request,
53+
::viam::module::v1::AddResourceResponse*) override {
54+
const viam::app::v1::ComponentConfig& proto = request->config();
55+
const ResourceConfig cfg = v2::from_proto(proto);
56+
const std::lock_guard<std::mutex> lock(parent.lock_);
57+
58+
std::shared_ptr<Resource> res;
59+
const Dependencies deps = parent.get_dependencies_(request->dependencies(), cfg.name());
60+
const std::shared_ptr<const ModelRegistration> reg =
61+
Registry::lookup_model(cfg.api(), cfg.model());
62+
if (reg) {
63+
try {
64+
res = reg->construct_resource(deps, cfg);
65+
} catch (const std::exception& exc) {
66+
return grpc::Status(::grpc::INTERNAL, exc.what());
67+
}
68+
};
69+
try {
70+
parent.server_->add_resource(res);
71+
} catch (const std::exception& exc) {
72+
return grpc::Status(::grpc::INTERNAL, exc.what());
5773
}
58-
deps.emplace(dep_name, dep_resource);
59-
}
60-
return deps;
61-
}
6274

63-
std::shared_ptr<Resource> ModuleService::get_parent_resource_(const Name& name) {
64-
if (!parent_) {
65-
parent_ = RobotClient::at_local_socket(parent_addr_, {0, boost::none});
75+
return grpc::Status();
6676
}
6777

68-
return parent_->resource_by_name(name);
69-
}
78+
::grpc::Status ReconfigureResource(
79+
::grpc::ServerContext*,
80+
const ::viam::module::v1::ReconfigureResourceRequest* request,
81+
::viam::module::v1::ReconfigureResourceResponse*) override {
82+
const viam::app::v1::ComponentConfig& proto = request->config();
83+
ResourceConfig cfg = v2::from_proto(proto);
7084

71-
// TODO(RSDK-6528) - to the extent possible, switch to using `server_helper`
72-
::grpc::Status ModuleService::AddResource(::grpc::ServerContext*,
73-
const ::viam::module::v1::AddResourceRequest* request,
74-
::viam::module::v1::AddResourceResponse*) {
75-
const viam::app::v1::ComponentConfig& proto = request->config();
76-
const ResourceConfig cfg = v2::from_proto(proto);
77-
const std::lock_guard<std::mutex> lock(lock_);
85+
const Dependencies deps = parent.get_dependencies_(request->dependencies(), cfg.name());
7886

79-
std::shared_ptr<Resource> res;
80-
const Dependencies deps = get_dependencies_(request->dependencies(), cfg.name());
81-
const std::shared_ptr<const ModelRegistration> reg =
82-
Registry::lookup_model(cfg.api(), cfg.model());
83-
if (reg) {
87+
auto resource_server = parent.server_->lookup_resource_server(cfg.api());
88+
if (!resource_server) {
89+
return grpc::Status(grpc::UNKNOWN,
90+
"no rpc service for config: " + cfg.api().to_string());
91+
}
92+
auto manager = resource_server->resource_manager();
93+
94+
// see if our resource is reconfigurable. if it is, reconfigure
95+
const std::shared_ptr<Resource> res = manager->resource(cfg.resource_name().name());
96+
if (!res) {
97+
return grpc::Status(grpc::UNKNOWN,
98+
"unable to reconfigure resource " + cfg.resource_name().name() +
99+
" as it doesn't exist.");
100+
}
84101
try {
85-
res = reg->construct_resource(deps, cfg);
102+
Reconfigurable::reconfigure_if_reconfigurable(res, deps, cfg);
103+
return grpc::Status();
86104
} catch (const std::exception& exc) {
87105
return grpc::Status(::grpc::INTERNAL, exc.what());
88106
}
89-
};
90-
try {
91-
server_->add_resource(res);
92-
} catch (const std::exception& exc) {
93-
return grpc::Status(::grpc::INTERNAL, exc.what());
94-
}
95-
96-
return grpc::Status();
97-
};
98107

99-
::grpc::Status ModuleService::ReconfigureResource(
100-
::grpc::ServerContext*,
101-
const ::viam::module::v1::ReconfigureResourceRequest* request,
102-
::viam::module::v1::ReconfigureResourceResponse*) {
103-
const viam::app::v1::ComponentConfig& proto = request->config();
104-
ResourceConfig cfg = v2::from_proto(proto);
108+
// if the type isn't reconfigurable by default, replace it
109+
try {
110+
Stoppable::stop_if_stoppable(res);
111+
} catch (const std::exception& err) {
112+
BOOST_LOG_TRIVIAL(error) << "unable to stop resource: " << err.what();
113+
}
105114

106-
const Dependencies deps = get_dependencies_(request->dependencies(), cfg.name());
115+
const std::shared_ptr<const ModelRegistration> reg = Registry::lookup_model(cfg.name());
116+
if (reg) {
117+
try {
118+
const std::shared_ptr<Resource> res = reg->construct_resource(deps, cfg);
119+
manager->replace_one(cfg.resource_name(), res);
120+
} catch (const std::exception& exc) {
121+
return grpc::Status(::grpc::INTERNAL, exc.what());
122+
}
123+
}
107124

108-
auto resource_server = server_->lookup_resource_server(cfg.api());
109-
if (!resource_server) {
110-
return grpc::Status(grpc::UNKNOWN, "no rpc service for config: " + cfg.api().to_string());
111-
}
112-
auto manager = resource_server->resource_manager();
113-
114-
// see if our resource is reconfigurable. if it is, reconfigure
115-
const std::shared_ptr<Resource> res = manager->resource(cfg.resource_name().name());
116-
if (!res) {
117-
return grpc::Status(grpc::UNKNOWN,
118-
"unable to reconfigure resource " + cfg.resource_name().name() +
119-
" as it doesn't exist.");
120-
}
121-
try {
122-
Reconfigurable::reconfigure_if_reconfigurable(res, deps, cfg);
123125
return grpc::Status();
124-
} catch (const std::exception& exc) {
125-
return grpc::Status(::grpc::INTERNAL, exc.what());
126-
}
127-
128-
// if the type isn't reconfigurable by default, replace it
129-
try {
130-
Stoppable::stop_if_stoppable(res);
131-
} catch (const std::exception& err) {
132-
BOOST_LOG_TRIVIAL(error) << "unable to stop resource: " << err.what();
133126
}
134127

135-
const std::shared_ptr<const ModelRegistration> reg = Registry::lookup_model(cfg.name());
136-
if (reg) {
128+
::grpc::Status ValidateConfig(::grpc::ServerContext*,
129+
const ::viam::module::v1::ValidateConfigRequest* request,
130+
::viam::module::v1::ValidateConfigResponse* response) override {
131+
const viam::app::v1::ComponentConfig& proto = request->config();
132+
ResourceConfig cfg = v2::from_proto(proto);
133+
134+
const std::shared_ptr<const ModelRegistration> reg =
135+
Registry::lookup_model(cfg.api(), cfg.model());
136+
if (!reg) {
137+
return grpc::Status(grpc::UNKNOWN,
138+
"unable to validate resource " + cfg.resource_name().name() +
139+
" as it hasn't been registered.");
140+
}
137141
try {
138-
const std::shared_ptr<Resource> res = reg->construct_resource(deps, cfg);
139-
manager->replace_one(cfg.resource_name(), res);
140-
} catch (const std::exception& exc) {
141-
return grpc::Status(::grpc::INTERNAL, exc.what());
142+
const std::vector<std::string> implicit_deps = reg->validate(cfg);
143+
for (const auto& dep : implicit_deps) {
144+
response->add_dependencies(dep);
145+
}
146+
} catch (const std::exception& err) {
147+
return grpc::Status(grpc::UNKNOWN,
148+
"validation failure in resource " + cfg.name() + ": " + err.what());
142149
}
150+
return grpc::Status();
143151
}
144152

145-
return grpc::Status();
146-
};
153+
::grpc::Status RemoveResource(::grpc::ServerContext*,
154+
const ::viam::module::v1::RemoveResourceRequest* request,
155+
::viam::module::v1::RemoveResourceResponse*) override {
156+
auto name = Name::from_string(request->name());
157+
auto resource_server = parent.server_->lookup_resource_server(name.api());
158+
if (!resource_server) {
159+
return grpc::Status(grpc::UNKNOWN, "no grpc service for " + name.api().to_string());
160+
}
161+
const std::shared_ptr<ResourceManager> manager = resource_server->resource_manager();
162+
const std::shared_ptr<Resource> res = manager->resource(name.name());
163+
if (!res) {
164+
return grpc::Status(
165+
grpc::UNKNOWN,
166+
"unable to remove resource " + name.to_string() + " as it doesn't exist.");
167+
}
147168

148-
::grpc::Status ModuleService::ValidateConfig(
149-
::grpc::ServerContext*,
150-
const ::viam::module::v1::ValidateConfigRequest* request,
151-
::viam::module::v1::ValidateConfigResponse* response) {
152-
const viam::app::v1::ComponentConfig& proto = request->config();
153-
ResourceConfig cfg = v2::from_proto(proto);
154-
155-
const std::shared_ptr<const ModelRegistration> reg =
156-
Registry::lookup_model(cfg.api(), cfg.model());
157-
if (!reg) {
158-
return grpc::Status(grpc::UNKNOWN,
159-
"unable to validate resource " + cfg.resource_name().name() +
160-
" as it hasn't been registered.");
161-
}
162-
try {
163-
const std::vector<std::string> implicit_deps = reg->validate(cfg);
164-
for (const auto& dep : implicit_deps) {
165-
response->add_dependencies(dep);
169+
try {
170+
Stoppable::stop_if_stoppable(res);
171+
} catch (const std::exception& err) {
172+
BOOST_LOG_TRIVIAL(error) << "unable to stop resource: " << err.what();
166173
}
167-
} catch (const std::exception& err) {
168-
return grpc::Status(grpc::UNKNOWN,
169-
"validation failure in resource " + cfg.name() + ": " + err.what());
170-
}
171-
return grpc::Status();
172-
};
173174

174-
::grpc::Status ModuleService::RemoveResource(
175-
::grpc::ServerContext*,
176-
const ::viam::module::v1::RemoveResourceRequest* request,
177-
::viam::module::v1::RemoveResourceResponse*) {
178-
auto name = Name::from_string(request->name());
179-
auto resource_server = server_->lookup_resource_server(name.api());
180-
if (!resource_server) {
181-
return grpc::Status(grpc::UNKNOWN, "no grpc service for " + name.api().to_string());
175+
manager->remove(name);
176+
return grpc::Status();
182177
}
183-
const std::shared_ptr<ResourceManager> manager = resource_server->resource_manager();
184-
const std::shared_ptr<Resource> res = manager->resource(name.name());
185-
if (!res) {
186-
return grpc::Status(
187-
grpc::UNKNOWN,
188-
"unable to remove resource " + name.to_string() + " as it doesn't exist.");
178+
179+
::grpc::Status Ready(::grpc::ServerContext*,
180+
const ::viam::module::v1::ReadyRequest* request,
181+
::viam::module::v1::ReadyResponse* response) override {
182+
const std::lock_guard<std::mutex> lock(parent.lock_);
183+
const viam::module::v1::HandlerMap hm = parent.module_->handles().to_proto();
184+
*response->mutable_handlermap() = hm;
185+
parent.parent_addr_ = request->parent_address();
186+
response->set_ready(parent.module_->ready());
187+
return grpc::Status();
189188
}
189+
};
190190

191-
try {
192-
Stoppable::stop_if_stoppable(res);
193-
} catch (const std::exception& err) {
194-
BOOST_LOG_TRIVIAL(error) << "unable to stop resource: " << err.what();
191+
Dependencies ModuleService::get_dependencies_(
192+
google::protobuf::RepeatedPtrField<std::string> const& proto,
193+
std::string const& resource_name) {
194+
Dependencies deps;
195+
for (const auto& dep : proto) {
196+
auto dep_name = Name::from_string(dep);
197+
const std::shared_ptr<Resource> dep_resource = get_parent_resource_(dep_name);
198+
if (!dep_resource) {
199+
std::ostringstream buffer;
200+
buffer << resource_name << ": Dependency "
201+
<< "`" << dep_name << "` was not found during (re)configuration";
202+
throw Exception(ErrorCondition::k_resource_not_found, buffer.str());
203+
}
204+
deps.emplace(dep_name, dep_resource);
195205
}
206+
return deps;
207+
}
196208

197-
manager->remove(name);
198-
return grpc::Status();
199-
};
209+
std::shared_ptr<Resource> ModuleService::get_parent_resource_(const Name& name) {
210+
if (!parent_) {
211+
parent_ = RobotClient::at_local_socket(parent_addr_, {0, boost::none});
212+
}
200213

201-
::grpc::Status ModuleService::Ready(::grpc::ServerContext*,
202-
const ::viam::module::v1::ReadyRequest* request,
203-
::viam::module::v1::ReadyResponse* response) {
204-
const std::lock_guard<std::mutex> lock(lock_);
205-
const viam::module::v1::HandlerMap hm = this->module_->handles().to_proto();
206-
*response->mutable_handlermap() = hm;
207-
parent_addr_ = request->parent_address();
208-
response->set_ready(module_->ready());
209-
return grpc::Status();
210-
};
214+
return parent_->resource_by_name(name);
215+
}
211216

212217
ModuleService::ModuleService(std::string addr)
213-
: module_(std::make_unique<Module>(std::move(addr))), server_(std::make_unique<Server>()) {}
218+
: module_(std::make_unique<Module>(std::move(addr))), server_(std::make_unique<Server>()) {
219+
impl_ = std::make_unique<ServiceImpl>(*this);
220+
}
214221

215222
ModuleService::ModuleService(int argc,
216223
char** argv,
217-
const std::vector<std::shared_ptr<ModelRegistration>>& registrations) {
218-
if (argc < 2) {
219-
throw Exception(ErrorCondition::k_connection, "Need socket path as command line argument");
220-
}
221-
module_ = std::make_unique<Module>(argv[1]);
222-
server_ = std::make_unique<Server>();
223-
signal_manager_ = SignalManager();
224+
const std::vector<std::shared_ptr<ModelRegistration>>& registrations)
225+
: ModuleService([argc, argv] {
226+
if (argc < 2) {
227+
throw Exception(ErrorCondition::k_connection,
228+
"Need socket path as command line argument");
229+
}
230+
return argv[1];
231+
}()) {
224232
set_logger_severity_from_args(argc, argv);
225233

226234
for (auto&& mr : registrations) {
@@ -229,13 +237,27 @@ ModuleService::ModuleService(int argc,
229237
}
230238
}
231239

240+
ModuleService::~ModuleService() {
241+
// TODO(RSDK-5509): Run registered cleanup functions here.
242+
BOOST_LOG_TRIVIAL(info) << "Shutting down gracefully.";
243+
server_->shutdown();
244+
245+
if (parent_) {
246+
try {
247+
parent_->close();
248+
} catch (const std::exception& exc) {
249+
BOOST_LOG_TRIVIAL(error) << exc.what();
250+
}
251+
}
252+
}
253+
232254
void ModuleService::serve() {
233255
const mode_t old_mask = umask(0077);
234256
const int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
235257
listen(sockfd, 10);
236258
umask(old_mask);
237259

238-
server_->register_service(this);
260+
server_->register_service(impl_.get());
239261
const std::string address = "unix://" + module_->addr();
240262
server_->add_listening_port(address);
241263

@@ -249,20 +271,6 @@ void ModuleService::serve() {
249271
signal_manager_.wait();
250272
}
251273

252-
ModuleService::~ModuleService() {
253-
// TODO(RSDK-5509): Run registered cleanup functions here.
254-
BOOST_LOG_TRIVIAL(info) << "Shutting down gracefully.";
255-
server_->shutdown();
256-
257-
if (parent_) {
258-
try {
259-
parent_->close();
260-
} catch (const std::exception& exc) {
261-
BOOST_LOG_TRIVIAL(error) << exc.what();
262-
}
263-
}
264-
}
265-
266274
void ModuleService::add_model_from_registry_inlock_(API api,
267275
Model model,
268276
const std::lock_guard<std::mutex>&) {

0 commit comments

Comments
 (0)