Skip to content

Commit ef1b8c1

Browse files
authored
Feature/presistent deltas (#21)
* track deltas and add request service * fix callback * add success field * minor tweaks * continue if deltas do not exist * update service name * address nathan's comments
1 parent d4afbdb commit ef1b8c1

File tree

2 files changed

+50
-1
lines changed

2 files changed

+50
-1
lines changed

hydra_ros/include/hydra_ros/frontend/ros_frontend_publisher.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,23 @@
3737
#include <kimera_pgmo_ros/conversion/mesh_delta.h>
3838
#include <pose_graph_tools_ros/conversions.h>
3939

40+
#include <map>
41+
#include <queue>
42+
43+
#include <kimera_pgmo_msgs/srv/mesh_delta_query.hpp>
44+
4045
#include "hydra_ros/utils/dsg_streaming_interface.h"
4146

4247
namespace hydra {
4348

4449
class RosFrontendPublisher : public GraphBuilder::Sink {
4550
public:
51+
using MeshDeltaSrv = kimera_pgmo_msgs::srv::MeshDeltaQuery;
52+
4653
struct Config {
4754
//! @brief Configuration for dsg publisher
4855
DsgSender::Config dsg_sender;
56+
size_t mesh_delta_queue_size = 100; // Store mesh delta to resend. 0 for infinite
4957
} const config;
5058

5159
explicit RosFrontendPublisher(ianvs::NodeHandle);
@@ -57,9 +65,16 @@ class RosFrontendPublisher : public GraphBuilder::Sink {
5765
std::string printInfo() const override { return "RosFrontendPublisher"; }
5866

5967
protected:
68+
void processMeshDeltaQuery(const MeshDeltaSrv::Request::SharedPtr req,
69+
MeshDeltaSrv::Response::SharedPtr resp);
70+
6071
std::unique_ptr<DsgSender> dsg_sender_;
72+
mutable std::map<uint16_t, kimera_pgmo::MeshDelta::Ptr> stored_delta_;
73+
6174
pose_graph_tools::PoseGraphPublisher mesh_graph_pub_;
6275
kimera_pgmo::PgmoMeshDeltaPublisher mesh_update_pub_;
76+
rclcpp::Service<MeshDeltaSrv>::SharedPtr mesh_delta_server_;
77+
rclcpp::TypeAdapter<kimera_pgmo::MeshDelta, kimera_pgmo_msgs::msg::MeshDelta>
78+
mesh_delta_converter_;
6379
};
64-
6580
} // namespace hydra

hydra_ros/src/frontend/ros_frontend_publisher.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ namespace hydra {
4444

4545
using kimera_pgmo::MeshDeltaTypeAdapter;
4646
using pose_graph_tools::PoseGraphTypeAdapter;
47+
using BaseInterface = rclcpp::node_interfaces::NodeBaseInterface;
48+
using rclcpp::CallbackGroupType;
4749

4850
namespace {
4951

@@ -60,10 +62,19 @@ void declare_config(RosFrontendPublisher::Config& config) {
6062
using namespace config;
6163
name("RosFrontendPublisher::Config");
6264
field(config.dsg_sender, "");
65+
field(config.mesh_delta_queue_size, "mesh_delta_queue_size");
6366
}
6467

6568
RosFrontendPublisher::RosFrontendPublisher(ianvs::NodeHandle nh)
6669
: config(config::checkValid(get_config())) {
70+
auto group = nh.as<BaseInterface>()->create_callback_group(
71+
CallbackGroupType::MutuallyExclusive);
72+
mesh_delta_server_ =
73+
nh.create_service<MeshDeltaSrv>("mesh_delta_query",
74+
&RosFrontendPublisher::processMeshDeltaQuery,
75+
this,
76+
rclcpp::ServicesQoS(),
77+
group);
6778
dsg_sender_ = std::make_unique<DsgSender>(config.dsg_sender, nh);
6879
mesh_graph_pub_ = nh.create_publisher<PoseGraphTypeAdapter>(
6980
"mesh_graph_incremental", rclcpp::QoS(100).transient_local());
@@ -79,9 +90,32 @@ void RosFrontendPublisher::call(uint64_t timestamp_ns,
7990
if (backend_input.mesh_update) {
8091
backend_input.mesh_update->timestamp_ns = timestamp_ns;
8192
mesh_update_pub_->publish(*backend_input.mesh_update);
93+
stored_delta_.insert(
94+
{backend_input.mesh_update->sequence_number, backend_input.mesh_update});
95+
if (config.mesh_delta_queue_size > 0 &&
96+
stored_delta_.size() > static_cast<size_t>(config.mesh_delta_queue_size)) {
97+
stored_delta_.erase(stored_delta_.begin());
98+
}
8299
}
83100

84101
dsg_sender_->sendGraph(graph, rclcpp::Time(timestamp_ns));
85102
}
86103

104+
void RosFrontendPublisher::processMeshDeltaQuery(
105+
const MeshDeltaSrv::Request::SharedPtr req,
106+
MeshDeltaSrv::Response::SharedPtr resp) {
107+
LOG(INFO) << "Received request for " << req->sequence_numbers.size()
108+
<< " mesh deltas...";
109+
for (const auto& seq : req->sequence_numbers) {
110+
auto& msg = resp->deltas.emplace_back();
111+
// Check TypeAdater documentation TODO(Yun)
112+
if (!stored_delta_.count(seq)) {
113+
LOG(ERROR) << "Mesh delta sequence " << seq << " not found";
114+
continue;
115+
}
116+
mesh_delta_converter_.convert_to_ros_message(*stored_delta_.at(seq), msg);
117+
}
118+
LOG(INFO) << "Responding with " << resp->deltas.size() << " deltas...";
119+
}
120+
87121
} // namespace hydra

0 commit comments

Comments
 (0)