11#include < cstdint>
2- #include < cstdio>
32#include < iostream>
43#include < limits.h>
5- #include < sys/epoll.h>
6- #include < sys/inotify.h>
74
85#include < csignal>
96#include < filesystem>
107#include < unistd.h>
118
129#include " hopper/daemon/daemon.hpp"
1310#include " hopper/daemon/endpoint.hpp"
11+ #include " hopper/daemon/pipe.hpp"
1412#include " hopper/daemon/util.hpp"
1513
1614namespace hopper {
1715
1816HopperDaemon::HopperDaemon (std::filesystem::path path, int max_events,
1917 int timeout)
20- : m_path(path), m_max_events(max_events), m_timeout(timeout) {
18+ : m_max_events(max_events), m_timeout(timeout), m_path(path ) {
2119 if (!std::filesystem::exists (path)) {
2220 std::filesystem::create_directories (path);
2321 }
@@ -28,160 +26,59 @@ HopperDaemon::HopperDaemon(std::filesystem::path path, int max_events,
2826 if ((m_epoll_fd = epoll_create1 (0 )) < 0 )
2927 throw_errno (" epoll_create1" );
3028
31- if ((m_inotify_fd = inotify_init ()) < 0 )
32- throw_errno (" inotify_init" );
33-
34- // Set up an event for inotify
35- struct HopperEvent *inotify_ev = new HopperEvent;
36- inotify_ev->fd = m_inotify_fd;
37- inotify_ev->data .u64 = 0 ;
38-
39- // C++ lambda syntax is really weird
40- inotify_ev->callback = [this ](HopperEvent *ev) {
41- return this ->handle_inotify (ev);
42- };
43-
44- if (add_event (inotify_ev) != 0 )
45- throw_errno (" HopperDaemon::add_event" );
46-
47- if ((m_inotify_watch_fd =
48- inotify_add_watch (m_inotify_fd, path.c_str (),
49- IN_CREATE | IN_DELETE | IN_DELETE_SELF)) < 0 )
50- throw_errno (" inotify_add_watch" );
29+ setup_inotify ();
5130}
5231
5332HopperDaemon::~HopperDaemon () {
5433 for (const auto &[_, endpoint] : m_endpoints)
5534 delete endpoint;
56-
57- for (auto *event : m_events)
58- delete event;
59- }
60-
61- int HopperDaemon::create_endpoint (std::filesystem::path path) {
62- int endpoint_id = next_endpoint_id ();
63- auto *endpoint = new HopperEndpoint (endpoint_id, path);
64- m_endpoints[endpoint_id] = endpoint;
65-
66- std::cout << " CREATE " << endpoint->path () << std::endl;
67-
68- return endpoint->refresh (this );
6935}
7036
71- int HopperDaemon::delete_endpoint (int id) {
72- if (m_endpoints[id] == nullptr )
73- return 1 ;
37+ void HopperDaemon::try_add_pipe (std::pair<uint64_t , int > pipe, PipeType type) {
38+ // Pipe has bad ID or bad FD
39+ if (pipe.first == 0 || pipe.second == -1 )
40+ return ;
7441
75- std::cout << " DELETE " << m_endpoints[id]->path () << std::endl;
76-
77- delete m_endpoints[id];
78- m_endpoints.erase (id);
42+ struct epoll_event ev = {};
43+ ev.events = (type == PipeType::IN ? EPOLLIN | EPOLLHUP : EPOLLHUP);
44+ ev.data .u64 = pipe.first ;
7945
80- return 0 ;
46+ if (epoll_ctl (m_epoll_fd, EPOLL_CTL_ADD, pipe.second , &ev) != 0 )
47+ throw_errno (" epoll_ctl ADD" );
8148}
8249
83- int HopperDaemon::delete_endpoint (std::filesystem::path path) {
50+ void HopperDaemon::refresh_pipes () {
51+ // Try to open any inactive pipes again
8452 for (const auto &[_, endpoint] : m_endpoints) {
85- if (endpoint->path () == path) {
86- return delete_endpoint (endpoint->id ());
53+ for (const auto &[id, pipe] : endpoint->outputs ()) {
54+ if (pipe->status () == PipeStatus::ACTIVE || !pipe->open_pipe ())
55+ continue ;
56+ try_add_pipe (std::make_pair (id, pipe->fd ()), PipeType::OUT);
8757 }
8858 }
89- return 1 ;
90- }
91-
92- int HopperDaemon::handle_inotify (HopperEvent *ev) {
93- // I can't remember how to do this with new
94- struct inotify_event *iev = reinterpret_cast <struct inotify_event *>(
95- std::malloc (sizeof (struct inotify_event ) + NAME_MAX + 1 ));
96-
97- if (read (ev->fd , iev, sizeof (struct inotify_event ) + NAME_MAX + 1 ) < 0 ) {
98- throw_errno (" read" );
99- return -1 ;
100- }
101-
102- if (iev->mask & IN_DELETE_SELF) {
103- // The hopper got deleted, this is fatal
104- std::cerr << " (ENOENT) Hopper " << m_path
105- << " was deleted, exiting... :(" ;
106- _exit (1 );
107- }
108-
109- if (iev->mask & IN_CREATE) {
110- std::string path;
111- path.resize (PATH_MAX);
112- std::snprintf (path.data (), PATH_MAX, " %s/%s" , m_path.c_str (),
113- iev->name );
114-
115- std::free (iev);
116- return create_endpoint (path);
117- }
118-
119- if (iev->mask & IN_DELETE) {
120- std::string path;
121- path.resize (PATH_MAX);
122- std::snprintf (path.data (), PATH_MAX, " %s/%s" , m_path.c_str (),
123- iev->name );
124-
125- std::free (iev);
126- return delete_endpoint (path);
127- }
128-
129- std::free (iev);
130- return 0 ;
131- }
132-
133- int HopperDaemon::add_event (HopperEvent *event, int events) {
134- uint64_t event_id = next_event_id ();
135- event->id = event_id;
136-
137- struct epoll_event ev = {};
138- ev.events = events;
139- ev.data .u64 = event_id;
140-
141- if (epoll_ctl (m_epoll_fd, EPOLL_CTL_ADD, event->fd , &ev) != 0 )
142- return -1 ;
143-
144- m_events.push_back (event);
145-
146- return 0 ;
14759}
14860
149- int HopperDaemon::remove_event (uint64_t id) {
150- for (size_t i = 0 ; i < m_events.size (); i++) {
151- if (m_events[i]->id == id) {
152- if (epoll_ctl (m_epoll_fd, EPOLL_CTL_DEL, m_events[i]->fd ,
153- nullptr ) != 0 )
154- return -1 ;
61+ void HopperDaemon::process_events (struct epoll_event *events, int n_events) {
62+ for (int i = 0 ; i < n_events; i++) {
63+ struct epoll_event ev = events[i];
15564
156- m_events.erase (m_events.begin () + i);
157- break ;
65+ // inotify events use 0 as ID
66+ if (ev.data .u64 == 0 ) {
67+ handle_inotify ();
68+ continue ;
15869 }
159- }
16070
161- return 0 ;
162- }
71+ uint32_t endpoint_id = (ev.data .u64 >> 40 ) & 0xFFFFFFFFFF ;
72+ if (!m_endpoints.contains (endpoint_id))
73+ continue ;
16374
164- int HopperDaemon::remove_event (HopperEvent *event) {
165- for (size_t i = 0 ; i < m_events.size (); i++) {
166- if (m_events[i] == event) {
167- if (epoll_ctl (m_epoll_fd, EPOLL_CTL_DEL, m_events[i]->fd ,
168- nullptr ) != 0 )
169- return -1 ;
75+ HopperEndpoint *endpoint = m_endpoints[endpoint_id];
17076
171- m_events.erase (m_events.begin () + i);
172- break ;
173- }
77+ if (ev.events & EPOLLIN)
78+ endpoint->on_pipe_readable (ev.data .u64 );
79+ else if (ev.events & EPOLLOUT)
80+ endpoint->on_pipe_writable (ev.data .u64 );
17481 }
175-
176- return 0 ;
177- }
178-
179- HopperEvent *HopperDaemon::get_event (uint64_t id) {
180- for (size_t i = 0 ; i < m_events.size (); i++)
181- if (m_events[i]->id == id)
182- return m_events[i];
183-
184- return nullptr ;
18582}
18683
18784int HopperDaemon::run () {
@@ -195,35 +92,16 @@ int HopperDaemon::run() {
19592
19693 int n = epoll_wait (m_epoll_fd, events, m_max_events, m_timeout);
19794 if (n < 0 ) {
95+ if (errno == EINTR)
96+ continue ;
97+
19898 delete[] events;
19999 throw_errno (" epoll_wait" );
200100 return -1 ;
201101 }
202102
203- HopperEvent *ev;
204-
205- for (int i = 0 ; i < n; i++) {
206- if ((ev = get_event (i)) == nullptr )
207- continue ;
208-
209- if (ev->callback != nullptr ) {
210- int r = ev->callback (ev);
211- if (r != 0 )
212- std::cerr << " Failed to run callback for Ev(fd=" << ev->fd
213- << " ), code " << r << std::endl;
214- }
215- }
216-
217- delete[] events;
218-
219- for (const auto &[_, endpoint] : m_endpoints) {
220- // This is absolutely disgusting, but I haven't thought of a better
221- // way yet
222- int r = endpoint->refresh (this );
223- if (r != 0 )
224- std::cerr << " Failed to run refresh for Endpoint(path="
225- << endpoint->path () << " )" << std::endl;
226- }
103+ process_events (events, n);
104+ refresh_pipes ();
227105 }
228106
229107 return res;
0 commit comments