Skip to content

Commit 245784c

Browse files
committed
improve thread safety of journal.Reader class by simple protection with ref-counter; avoid segfault by closing journal across threads (closes gh-143)
1 parent 9031424 commit 245784c

File tree

1 file changed

+69
-39
lines changed

1 file changed

+69
-39
lines changed

systemd/_reader.c

Lines changed: 69 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@
6464
typedef struct {
6565
PyObject_HEAD
6666
sd_journal *j;
67+
unsigned closed;
68+
unsigned ref_count;
6769
} Reader;
6870
static PyTypeObject ReaderType;
6971

@@ -89,6 +91,31 @@ static PyStructSequence_Desc Monotonic_desc = {
8991
2,
9092
};
9193

94+
static PyObject *Reader_new(PyTypeObject *type, PyObject *args, PyObject *kwds) {
95+
Reader *self = (Reader *)PyType_GenericNew(type, args, kwds);
96+
self->j = NULL;
97+
self->closed = 0;
98+
self->ref_count = 1; /* initial reference */
99+
return (PyObject *)self;
100+
}
101+
102+
static inline void decr_ref_count(Reader *self) {
103+
if (!self->ref_count) return;
104+
if (!--self->ref_count && self->j) {
105+
sd_journal_close(self->j);
106+
self->j = NULL;
107+
}
108+
}
109+
110+
#define INCR_REF_BEGIN_ALLOW_THREADS(self) \
111+
self->ref_count++; \
112+
Py_BEGIN_ALLOW_THREADS
113+
#define DECR_REF_END_ALLOW_THREADS(self) \
114+
Py_END_ALLOW_THREADS \
115+
decr_ref_count(self);
116+
117+
118+
92119
/**
93120
* Convert a str or bytes object into a C-string path.
94121
* Returns NULL on error.
@@ -220,7 +247,10 @@ static int intlist_converter(PyObject* obj, int **_result, size_t *_len) {
220247
}
221248

222249
static void Reader_dealloc(Reader* self) {
223-
sd_journal_close(self->j);
250+
if (self->j) {
251+
sd_journal_close(self->j);
252+
self->j = NULL;
253+
}
224254
Py_TYPE(self)->tp_free((PyObject*)self);
225255
}
226256

@@ -271,9 +301,9 @@ static int Reader_init(Reader *self, PyObject *args, PyObject *keywds) {
271301
return -1;
272302

273303
#if HAVE_JOURNAL_OPEN_DIRECTORY_FD
274-
Py_BEGIN_ALLOW_THREADS
304+
INCR_REF_BEGIN_ALLOW_THREADS(self)
275305
r = sd_journal_open_directory_fd(&self->j, (int) fd, flags);
276-
Py_END_ALLOW_THREADS
306+
DECR_REF_END_ALLOW_THREADS(self)
277307
#else
278308
r = -ENOSYS;
279309
#endif
@@ -285,9 +315,9 @@ static int Reader_init(Reader *self, PyObject *args, PyObject *keywds) {
285315
if (!path)
286316
return -1;
287317

288-
Py_BEGIN_ALLOW_THREADS
318+
INCR_REF_BEGIN_ALLOW_THREADS(self)
289319
r = sd_journal_open_directory(&self->j, path, flags);
290-
Py_END_ALLOW_THREADS
320+
DECR_REF_END_ALLOW_THREADS(self)
291321
}
292322
} else if (_files) {
293323
_cleanup_Py_DECREF_ PyObject *item0 = NULL;
@@ -300,9 +330,9 @@ static int Reader_init(Reader *self, PyObject *args, PyObject *keywds) {
300330
return -1;
301331

302332
#if HAVE_JOURNAL_OPEN_FILES
303-
Py_BEGIN_ALLOW_THREADS
333+
INCR_REF_BEGIN_ALLOW_THREADS(self)
304334
r = sd_journal_open_files(&self->j, (const char**) files, flags);
305-
Py_END_ALLOW_THREADS
335+
DECR_REF_END_ALLOW_THREADS(self)
306336
#else
307337
r = -ENOSYS;
308338
#endif
@@ -314,9 +344,9 @@ static int Reader_init(Reader *self, PyObject *args, PyObject *keywds) {
314344
return -1;
315345

316346
#if HAVE_JOURNAL_OPEN_DIRECTORY_FD
317-
Py_BEGIN_ALLOW_THREADS
347+
INCR_REF_BEGIN_ALLOW_THREADS(self)
318348
r = sd_journal_open_files_fd(&self->j, fds, n_fds, flags);
319-
Py_END_ALLOW_THREADS
349+
DECR_REF_END_ALLOW_THREADS(self)
320350
#else
321351
r = -ENOSYS;
322352
#endif
@@ -329,16 +359,16 @@ static int Reader_init(Reader *self, PyObject *args, PyObject *keywds) {
329359
if (!namespace)
330360
return -1;
331361

332-
Py_BEGIN_ALLOW_THREADS
362+
INCR_REF_BEGIN_ALLOW_THREADS(self)
333363
r = sd_journal_open_namespace(&self->j, namespace, flags);
334-
Py_END_ALLOW_THREADS
364+
DECR_REF_END_ALLOW_THREADS(self)
335365
#else
336366
r = -ENOSYS;
337367
#endif
338368
} else {
339-
Py_BEGIN_ALLOW_THREADS
369+
INCR_REF_BEGIN_ALLOW_THREADS(self)
340370
r = sd_journal_open(&self->j, flags);
341-
Py_END_ALLOW_THREADS
371+
DECR_REF_END_ALLOW_THREADS(self)
342372
}
343373

344374
return set_error(r, NULL, "Opening the journal failed");
@@ -438,8 +468,8 @@ static PyObject* Reader_close(Reader *self, PyObject *args) {
438468
assert(self);
439469
assert(!args);
440470

441-
sd_journal_close(self->j);
442-
self->j = NULL;
471+
self->closed = 1;
472+
decr_ref_count(self); /* decrement initial reference (without incr) */
443473
Py_RETURN_NONE;
444474
}
445475

@@ -501,7 +531,7 @@ static PyObject* Reader_next(Reader *self, PyObject *args) {
501531
return NULL;
502532
}
503533

504-
Py_BEGIN_ALLOW_THREADS
534+
INCR_REF_BEGIN_ALLOW_THREADS(self)
505535
if (skip == 1LL)
506536
r = sd_journal_next(self->j);
507537
else if (skip == -1LL)
@@ -512,7 +542,7 @@ static PyObject* Reader_next(Reader *self, PyObject *args) {
512542
r = sd_journal_previous_skip(self->j, -skip);
513543
else
514544
assert(!"should be here");
515-
Py_END_ALLOW_THREADS
545+
DECR_REF_END_ALLOW_THREADS(self)
516546

517547
if (set_error(r, NULL, NULL) < 0)
518548
return NULL;
@@ -782,9 +812,9 @@ PyDoc_STRVAR(Reader_seek_head__doc__,
782812
"See :manpage:`sd_journal_seek_head(3)`.");
783813
static PyObject* Reader_seek_head(Reader *self, PyObject *args) {
784814
int r;
785-
Py_BEGIN_ALLOW_THREADS
815+
INCR_REF_BEGIN_ALLOW_THREADS(self)
786816
r = sd_journal_seek_head(self->j);
787-
Py_END_ALLOW_THREADS
817+
DECR_REF_END_ALLOW_THREADS(self)
788818

789819
if (set_error(r, NULL, NULL) < 0)
790820
return NULL;
@@ -800,9 +830,9 @@ PyDoc_STRVAR(Reader_seek_tail__doc__,
800830
static PyObject* Reader_seek_tail(Reader *self, PyObject *args) {
801831
int r;
802832

803-
Py_BEGIN_ALLOW_THREADS
833+
INCR_REF_BEGIN_ALLOW_THREADS(self)
804834
r = sd_journal_seek_tail(self->j);
805-
Py_END_ALLOW_THREADS
835+
DECR_REF_END_ALLOW_THREADS(self)
806836

807837
if (set_error(r, NULL, NULL) < 0)
808838
return NULL;
@@ -820,9 +850,9 @@ static PyObject* Reader_seek_realtime(Reader *self, PyObject *args) {
820850
if (!PyArg_ParseTuple(args, "K:seek_realtime", &timestamp))
821851
return NULL;
822852

823-
Py_BEGIN_ALLOW_THREADS
853+
INCR_REF_BEGIN_ALLOW_THREADS(self)
824854
r = sd_journal_seek_realtime_usec(self->j, timestamp);
825-
Py_END_ALLOW_THREADS
855+
DECR_REF_END_ALLOW_THREADS(self)
826856

827857
if (set_error(r, NULL, NULL) < 0)
828858
return NULL;
@@ -850,17 +880,17 @@ static PyObject* Reader_seek_monotonic(Reader *self, PyObject *args) {
850880
if (set_error(r, NULL, "Invalid bootid") < 0)
851881
return NULL;
852882
} else {
853-
Py_BEGIN_ALLOW_THREADS
883+
INCR_REF_BEGIN_ALLOW_THREADS(self)
854884
r = sd_id128_get_boot(&id);
855-
Py_END_ALLOW_THREADS
885+
DECR_REF_END_ALLOW_THREADS(self)
856886

857887
if (set_error(r, NULL, NULL) < 0)
858888
return NULL;
859889
}
860890

861-
Py_BEGIN_ALLOW_THREADS
891+
INCR_REF_BEGIN_ALLOW_THREADS(self)
862892
r = sd_journal_seek_monotonic_usec(self->j, id, timestamp);
863-
Py_END_ALLOW_THREADS
893+
DECR_REF_END_ALLOW_THREADS(self)
864894

865895
if (set_error(r, NULL, NULL) < 0)
866896
return NULL;
@@ -924,9 +954,9 @@ static PyObject* Reader_process(Reader *self, PyObject *args) {
924954

925955
assert(!args);
926956

927-
Py_BEGIN_ALLOW_THREADS
957+
INCR_REF_BEGIN_ALLOW_THREADS(self)
928958
r = sd_journal_process(self->j);
929-
Py_END_ALLOW_THREADS
959+
DECR_REF_END_ALLOW_THREADS(self)
930960
if (set_error(r, NULL, NULL) < 0)
931961
return NULL;
932962

@@ -950,9 +980,9 @@ static PyObject* Reader_wait(Reader *self, PyObject *args) {
950980
if (!PyArg_ParseTuple(args, "|L:wait", &timeout))
951981
return NULL;
952982

953-
Py_BEGIN_ALLOW_THREADS
983+
INCR_REF_BEGIN_ALLOW_THREADS(self)
954984
r = sd_journal_wait(self->j, timeout);
955-
Py_END_ALLOW_THREADS
985+
DECR_REF_END_ALLOW_THREADS(self)
956986

957987
if (set_error(r, NULL, NULL) < 0)
958988
return NULL;
@@ -970,9 +1000,9 @@ static PyObject* Reader_seek_cursor(Reader *self, PyObject *args) {
9701000
if (!PyArg_ParseTuple(args, "s:seek_cursor", &cursor))
9711001
return NULL;
9721002

973-
Py_BEGIN_ALLOW_THREADS
1003+
INCR_REF_BEGIN_ALLOW_THREADS(self)
9741004
r = sd_journal_seek_cursor(self->j, cursor);
975-
Py_END_ALLOW_THREADS
1005+
DECR_REF_END_ALLOW_THREADS(self)
9761006

9771007
if (set_error(r, NULL, "Invalid cursor") < 0)
9781008
return NULL;
@@ -1035,9 +1065,9 @@ static PyObject* Reader_query_unique(Reader *self, PyObject *args) {
10351065
if (!PyArg_ParseTuple(args, "s:query_unique", &query))
10361066
return NULL;
10371067

1038-
Py_BEGIN_ALLOW_THREADS
1068+
INCR_REF_BEGIN_ALLOW_THREADS(self)
10391069
r = sd_journal_query_unique(self->j, query);
1040-
Py_END_ALLOW_THREADS
1070+
DECR_REF_END_ALLOW_THREADS(self)
10411071

10421072
if (set_error(r, NULL, "Invalid field name") < 0)
10431073
return NULL;
@@ -1172,9 +1202,9 @@ static PyObject* Reader_get_catalog(Reader *self, PyObject *args) {
11721202
assert(self);
11731203
assert(!args);
11741204

1175-
Py_BEGIN_ALLOW_THREADS
1205+
INCR_REF_BEGIN_ALLOW_THREADS(self)
11761206
r = sd_journal_get_catalog(self->j, &msg);
1177-
Py_END_ALLOW_THREADS
1207+
DECR_REF_END_ALLOW_THREADS(self)
11781208

11791209
if (r == -ENOENT) {
11801210
const void* mid;
@@ -1264,7 +1294,7 @@ static int Reader_set_data_threshold(Reader *self, PyObject *value, void *closur
12641294
PyDoc_STRVAR(closed__doc__,
12651295
"True iff journal is closed");
12661296
static PyObject* Reader_get_closed(Reader *self, void *closure) {
1267-
return PyBool_FromLong(!self->j);
1297+
return PyBool_FromLong(self->closed || !self->j);
12681298
}
12691299

12701300
static PyGetSetDef Reader_getsetters[] = {
@@ -1330,7 +1360,7 @@ static PyTypeObject ReaderType = {
13301360
.tp_methods = Reader_methods,
13311361
.tp_getset = Reader_getsetters,
13321362
.tp_init = (initproc) Reader_init,
1333-
.tp_new = PyType_GenericNew,
1363+
.tp_new = Reader_new,
13341364
};
13351365

13361366
static PyMethodDef methods[] = {

0 commit comments

Comments
 (0)