Skip to content

Commit 490e7f7

Browse files
committed
cleanup(userspace/libsinsp): avoid calling sinsp_observer methods inline during parsing.
Instead, push them onto a queue owned by the inspector to be later called, 1 by 1, as requested. This ensures that the whole libsinsp state has been processed, even by plugins, before sinsp_observer methods are called. Signed-off-by: Federico Di Pierro <nierro92@gmail.com>
1 parent d1881b4 commit 490e7f7

File tree

3 files changed

+105
-47
lines changed

3 files changed

+105
-47
lines changed

userspace/libsinsp/parsers.cpp

Lines changed: 88 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1276,9 +1276,14 @@ void sinsp_parser::parse_clone_exit_caller(sinsp_evt *evt, int64_t child_tid) {
12761276
return;
12771277
}
12781278

1279-
/* If there's a listener, invoke it */
1279+
//
1280+
// If there's a listener, add a callback to later invoke it.
1281+
//
12801282
if(m_inspector->get_observer()) {
1281-
m_inspector->get_observer()->on_clone(evt, new_child.get(), tid_collision);
1283+
m_inspector->m_post_process_cbs.emplace(
1284+
[&new_child, tid_collision](sinsp_observer *observer, sinsp_evt *evt) {
1285+
observer->on_clone(evt, new_child.get(), tid_collision);
1286+
});
12821287
}
12831288

12841289
/* If we had to erase a previous entry for this tid and rebalance the table,
@@ -1292,8 +1297,6 @@ void sinsp_parser::parse_clone_exit_caller(sinsp_evt *evt, int64_t child_tid) {
12921297
new_child->m_comm.c_str());
12931298
}
12941299
/*=============================== ADD THREAD TO THE TABLE ===========================*/
1295-
1296-
return;
12971300
}
12981301

12991302
void sinsp_parser::parse_clone_exit_child(sinsp_evt *evt) {
@@ -1761,17 +1764,19 @@ void sinsp_parser::parse_clone_exit_child(sinsp_evt *evt) {
17611764
evt->set_tinfo(new_child.get());
17621765

17631766
//
1764-
// If there's a listener, invoke it
1767+
// If there's a listener, add a callback to later invoke it.
17651768
//
17661769
if(m_inspector->get_observer()) {
1767-
m_inspector->get_observer()->on_clone(evt, new_child.get(), tid_collision);
1770+
m_inspector->m_post_process_cbs.emplace(
1771+
[&new_child, tid_collision](sinsp_observer *observer, sinsp_evt *evt) {
1772+
observer->on_clone(evt, new_child.get(), tid_collision);
1773+
});
17681774
}
17691775

17701776
/* If we had to erase a previous entry for this tid and rebalance the table,
17711777
* make sure we reinitialize the child_tinfo pointer for this event, as the thread
17721778
* generating it might have gone away.
17731779
*/
1774-
17751780
if(tid_collision != -1) {
17761781
reset(evt);
17771782
/* Right now we have collisions only on the clone() caller */
@@ -1781,7 +1786,6 @@ void sinsp_parser::parse_clone_exit_child(sinsp_evt *evt) {
17811786
}
17821787

17831788
/*=============================== CREATE NEW THREAD-INFO ===========================*/
1784-
return;
17851789
}
17861790

17871791
void sinsp_parser::parse_clone_exit(sinsp_evt *evt) {
@@ -2235,10 +2239,11 @@ void sinsp_parser::parse_execve_exit(sinsp_evt *evt) {
22352239
evt->get_tinfo()->compute_program_hash();
22362240

22372241
//
2238-
// If there's a listener, invoke it
2242+
// If there's a listener, add a callback to later invoke it.
22392243
//
22402244
if(m_inspector->get_observer()) {
2241-
m_inspector->get_observer()->on_execve(evt);
2245+
m_inspector->m_post_process_cbs.emplace(
2246+
[](sinsp_observer *observer, sinsp_evt *evt) { observer->on_execve(evt); });
22422247
}
22432248

22442249
/* If any of the threads in a thread group performs an
@@ -2787,10 +2792,11 @@ void sinsp_parser::parse_bind_exit(sinsp_evt *evt) {
27872792
evt->get_fd_info()->m_name = evt->get_param_as_str(1, &parstr, sinsp_evt::PF_SIMPLE);
27882793

27892794
//
2790-
// If there's a listener callback, invoke it
2795+
// If there's a listener, add a callback to later invoke it.
27912796
//
27922797
if(m_inspector->get_observer()) {
2793-
m_inspector->get_observer()->on_bind(evt);
2798+
m_inspector->m_post_process_cbs.emplace(
2799+
[](sinsp_observer *observer, sinsp_evt *evt) { observer->on_bind(evt); });
27942800
}
27952801
}
27962802

@@ -2982,7 +2988,6 @@ inline void sinsp_parser::fill_client_socket_info(sinsp_evt *evt,
29822988

29832989
void sinsp_parser::parse_connect_exit(sinsp_evt *evt) {
29842990
const sinsp_evt_param *parinfo;
2985-
uint8_t *packed_data;
29862991
int64_t retval;
29872992
int64_t fd;
29882993
bool force_overwrite_stale_data = false;
@@ -3048,15 +3053,18 @@ void sinsp_parser::parse_connect_exit(sinsp_evt *evt) {
30483053
return;
30493054
}
30503055

3051-
packed_data = (uint8_t *)parinfo->m_val;
3056+
uint8_t *packed_data = (uint8_t *)parinfo->m_val;
30523057

30533058
fill_client_socket_info(evt, packed_data, force_overwrite_stale_data);
30543059

30553060
//
3056-
// If there's a listener callback, invoke it
3061+
// If there's a listener, add a callback to later invoke it.
30573062
//
30583063
if(m_inspector->get_observer()) {
3059-
m_inspector->get_observer()->on_connect(evt, packed_data);
3064+
m_inspector->m_post_process_cbs.emplace(
3065+
[&packed_data](sinsp_observer *observer, sinsp_evt *evt) {
3066+
observer->on_connect(evt, packed_data);
3067+
});
30603068
}
30613069
}
30623070

@@ -3144,8 +3152,14 @@ void sinsp_parser::parse_accept_exit(sinsp_evt *evt) {
31443152
fdi->m_name = evt->get_param_as_str(1, &parstr, sinsp_evt::PF_SIMPLE);
31453153
fdi->m_flags = 0;
31463154

3155+
//
3156+
// If there's a listener, add a callback to later invoke it.
3157+
//
31473158
if(m_inspector->get_observer()) {
3148-
m_inspector->get_observer()->on_accept(evt, fd, packed_data, fdi.get());
3159+
m_inspector->m_post_process_cbs.emplace(
3160+
[fd, &packed_data](sinsp_observer *observer, sinsp_evt *evt) {
3161+
observer->on_accept(evt, fd, packed_data, evt->get_fd_info());
3162+
});
31493163
}
31503164

31513165
//
@@ -3205,8 +3219,14 @@ void sinsp_parser::erase_fd(erase_fd_params *params) {
32053219
m_inspector->get_fds_to_remove().push_back(params->m_fd);
32063220
}
32073221

3222+
//
3223+
// If there's a listener, add a callback to later invoke it.
3224+
//
32083225
if(m_inspector->get_observer()) {
3209-
m_inspector->get_observer()->on_erase_fd(params);
3226+
m_inspector->m_post_process_cbs.emplace(
3227+
[&params](sinsp_observer *observer, sinsp_evt *evt) {
3228+
observer->on_erase_fd(params);
3229+
});
32103230
}
32113231
}
32123232

@@ -3669,8 +3689,6 @@ void sinsp_parser::parse_rw_exit(sinsp_evt *evt) {
36693689
}
36703690

36713691
if(eflags & EF_READS_FROM_FD) {
3672-
const char *data;
3673-
uint32_t datalen;
36743692
int32_t tupleparam = -1;
36753693

36763694
if(etype == PPME_SOCKET_RECVFROM_X) {
@@ -3729,20 +3747,23 @@ void sinsp_parser::parse_rw_exit(sinsp_evt *evt) {
37293747
parinfo = evt->get_param(1);
37303748
}
37313749

3732-
datalen = parinfo->m_len;
3733-
data = parinfo->m_val;
3750+
uint32_t datalen = parinfo->m_len;
3751+
const char *data = parinfo->m_val;
37343752

37353753
//
3736-
// If there's an fd listener, call it now
3754+
// If there's a listener, add a callback to later invoke it.
37373755
//
37383756
if(m_inspector->get_observer()) {
3739-
m_inspector->get_observer()->on_read(evt,
3740-
tid,
3741-
evt->get_tinfo()->m_lastevent_fd,
3742-
evt->get_fd_info(),
3743-
data,
3744-
(uint32_t)retval,
3745-
datalen);
3757+
m_inspector->m_post_process_cbs.emplace(
3758+
[tid, &data, retval, datalen](sinsp_observer *observer, sinsp_evt *evt) {
3759+
observer->on_read(evt,
3760+
tid,
3761+
evt->get_tinfo()->m_lastevent_fd,
3762+
evt->get_fd_info(),
3763+
data,
3764+
(uint32_t)retval,
3765+
datalen);
3766+
});
37463767
}
37473768

37483769
//
@@ -3791,7 +3812,6 @@ void sinsp_parser::parse_rw_exit(sinsp_evt *evt) {
37913812
#endif
37923813

37933814
} else {
3794-
const char *data;
37953815
uint32_t datalen;
37963816
int32_t tupleparam = -1;
37973817

@@ -3849,19 +3869,22 @@ void sinsp_parser::parse_rw_exit(sinsp_evt *evt) {
38493869
//
38503870
parinfo = evt->get_param(1);
38513871
datalen = parinfo->m_len;
3852-
data = parinfo->m_val;
3872+
const char *data = parinfo->m_val;
38533873

38543874
//
3855-
// If there's an fd listener, call it now
3875+
// If there's a listener, add a callback to later invoke it.
38563876
//
38573877
if(m_inspector->get_observer()) {
3858-
m_inspector->get_observer()->on_write(evt,
3859-
tid,
3860-
evt->get_tinfo()->m_lastevent_fd,
3861-
evt->get_fd_info(),
3862-
data,
3863-
(uint32_t)retval,
3864-
datalen);
3878+
m_inspector->m_post_process_cbs.emplace(
3879+
[tid, &data, retval, datalen](sinsp_observer *observer, sinsp_evt *evt) {
3880+
observer->on_write(evt,
3881+
tid,
3882+
evt->get_tinfo()->m_lastevent_fd,
3883+
evt->get_fd_info(),
3884+
data,
3885+
(uint32_t)retval,
3886+
datalen);
3887+
});
38653888
}
38663889

38673890
// perform syslog decoding if applicable
@@ -3873,8 +3896,14 @@ void sinsp_parser::parse_rw_exit(sinsp_evt *evt) {
38733896
if(evt->get_fd_info()->m_type == SCAP_FD_IPV4_SOCK ||
38743897
evt->get_fd_info()->m_type == SCAP_FD_IPV6_SOCK) {
38753898
evt->get_fd_info()->set_socket_failed();
3899+
//
3900+
// If there's a listener, add a callback to later invoke it.
3901+
//
38763902
if(m_inspector->get_observer()) {
3877-
m_inspector->get_observer()->on_socket_status_changed(evt);
3903+
m_inspector->m_post_process_cbs.emplace(
3904+
[](sinsp_observer *observer, sinsp_evt *evt) {
3905+
observer->on_socket_status_changed(evt);
3906+
});
38783907
}
38793908
}
38803909
}
@@ -3897,7 +3926,6 @@ void sinsp_parser::parse_sendfile_exit(sinsp_evt *evt) {
38973926
//
38983927
if(retval >= 0) {
38993928
sinsp_evt *enter_evt = &m_tmp_evt;
3900-
int64_t fdin;
39013929

39023930
if(!retrieve_enter_event(enter_evt, evt)) {
39033931
return;
@@ -3906,13 +3934,16 @@ void sinsp_parser::parse_sendfile_exit(sinsp_evt *evt) {
39063934
//
39073935
// Extract the in FD
39083936
//
3909-
fdin = enter_evt->get_param(1)->as<int64_t>();
3937+
int64_t fdin = enter_evt->get_param(1)->as<int64_t>();
39103938

39113939
//
3912-
// If there's an fd listener, call it now
3940+
// If there's a listener, add a callback to later invoke it.
39133941
//
39143942
if(m_inspector->get_observer()) {
3915-
m_inspector->get_observer()->on_sendfile(evt, fdin, (uint32_t)retval);
3943+
m_inspector->m_post_process_cbs.emplace(
3944+
[fdin, retval](sinsp_observer *observer, sinsp_evt *evt) {
3945+
observer->on_sendfile(evt, fdin, (uint32_t)retval);
3946+
});
39163947
}
39173948
}
39183949
}
@@ -4068,8 +4099,13 @@ void sinsp_parser::parse_shutdown_exit(sinsp_evt *evt) {
40684099
return;
40694100
}
40704101

4102+
//
4103+
// If there's a listener, add a callback to later invoke it.
4104+
//
40714105
if(m_inspector->get_observer()) {
4072-
m_inspector->get_observer()->on_socket_shutdown(evt);
4106+
m_inspector->m_post_process_cbs.emplace([](sinsp_observer *observer, sinsp_evt *evt) {
4107+
observer->on_socket_shutdown(evt);
4108+
});
40734109
}
40744110
}
40754111
}
@@ -5048,8 +5084,13 @@ void sinsp_parser::parse_getsockopt_exit(sinsp_evt *evt) {
50485084
} else {
50495085
evt->get_fd_info()->set_socket_connected();
50505086
}
5087+
//
5088+
// If there's a listener, add a callback to later invoke it.
5089+
//
50515090
if(m_inspector->get_observer()) {
5052-
m_inspector->get_observer()->on_socket_status_changed(evt);
5091+
m_inspector->m_post_process_cbs.emplace([](sinsp_observer *observer, sinsp_evt *evt) {
5092+
observer->on_socket_status_changed(evt);
5093+
});
50535094
}
50545095
}
50555096
}

userspace/libsinsp/sinsp.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1316,6 +1316,20 @@ int32_t sinsp::next(sinsp_evt** puevt) {
13161316
// todo(jason): should we log parsing errors here?
13171317
pp.process_event(evt, m_event_sources);
13181318
}
1319+
1320+
// Once we processed all events, make sure to call all
1321+
// requested post-process callbacks.
1322+
// At this point, any plugin could have modified state tables,
1323+
// thus we can guarantee that any post-process callback
1324+
// will see the full post-event-processed state.
1325+
// NOTE: we don't use a RAII object because
1326+
// we cannot guarantee that no exception will be thrown by the callbacks.
1327+
if(m_observer != nullptr) {
1328+
for(; !m_post_process_cbs.empty(); m_post_process_cbs.pop()) {
1329+
auto cb = m_post_process_cbs.front();
1330+
cb(m_observer, evt);
1331+
}
1332+
}
13191333
}
13201334

13211335
// Finally set output evt;

userspace/libsinsp/sinsp.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ limitations under the License.
8282
#include <string>
8383
#include <unordered_set>
8484
#include <vector>
85+
#include <queue>
8586

8687
#define ONE_SECOND_IN_NS 1000000000LL
8788

@@ -1242,6 +1243,8 @@ class SINSP_PUBLIC sinsp : public capture_stats_source {
12421243

12431244
sinsp_observer* m_observer{nullptr};
12441245

1246+
std::queue<std::function<void(sinsp_observer* observer, sinsp_evt* evt)>> m_post_process_cbs{};
1247+
12451248
bool m_inited;
12461249
static std::atomic<int> instance_count;
12471250
};

0 commit comments

Comments
 (0)