Skip to content

Commit 30ddbb0

Browse files
authored
Merge pull request #130 from willstott101/bug/builtin_dcps_topic
Fixing segfault with reading from BuiltinTopicDcpsTopic
2 parents 9f35714 + 0e20923 commit 30ddbb0

3 files changed

Lines changed: 308 additions & 15 deletions

File tree

clayer/pysertype.c

Lines changed: 214 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1773,8 +1773,7 @@ ddspy_read_endpoint(PyObject *self, PyObject *args)
17731773
const dds_typeinfo_t *type_info = NULL;
17741774

17751775
/// Fetch the type id
1776-
if (rcontainer[i]->qos != NULL)
1777-
dds_builtintopic_get_endpoint_type_info(rcontainer[i], &type_info);
1776+
dds_builtintopic_get_endpoint_type_info(rcontainer[i], &type_info);
17781777

17791778
/// convert to cdr bytes
17801779
if (type_info != NULL) {
@@ -1824,6 +1823,95 @@ ddspy_read_endpoint(PyObject *self, PyObject *args)
18241823
return list;
18251824
}
18261825

1826+
static PyObject *
1827+
ddspy_read_topic(PyObject *self, PyObject *args)
1828+
{
1829+
uint32_t Nu32;
1830+
long long N;
1831+
dds_entity_t reader;
1832+
dds_return_t sts;
1833+
1834+
PyObject* endpoint_constructor;
1835+
PyObject* cqos_to_qos;
1836+
(void)self;
1837+
1838+
if (!PyArg_ParseTuple(args, "iLOO", &reader, &N, &endpoint_constructor, &cqos_to_qos))
1839+
return NULL;
1840+
if (!(Nu32 = check_number_of_samples(N)))
1841+
return NULL;
1842+
1843+
dds_sample_info_t* info = dds_alloc(sizeof(dds_sample_info_t) * Nu32);
1844+
struct dds_builtintopic_topic** rcontainer = dds_alloc(sizeof(struct dds_builtintopic_topic*) * Nu32);
1845+
1846+
for(uint32_t i = 0; i < Nu32; ++i) {
1847+
rcontainer[i] = NULL;
1848+
}
1849+
1850+
sts = dds_read(reader, (void**) rcontainer, info, Nu32, Nu32);
1851+
if (sts < 0) {
1852+
return PyLong_FromLong((long) sts);
1853+
}
1854+
1855+
PyObject* list = PyList_New(sts);
1856+
1857+
for(uint32_t i = 0; i < ((uint32_t)sts > Nu32 ? Nu32 : (uint32_t)sts); ++i) {
1858+
PyObject *type_id_bytes = NULL;
1859+
1860+
#ifdef DDS_HAS_TYPE_DISCOVERY
1861+
dds_ostream_t type_obj_stream;
1862+
const dds_typeinfo_t *type_info = NULL;
1863+
1864+
/// Fetch the type id
1865+
// dds_builtintopic_get_endpoint_type_info(rcontainer[i], &type_info);
1866+
if (rcontainer[i]->qos && rcontainer[i]->qos->present & QP_TYPE_INFORMATION)
1867+
type_info = rcontainer[i]->qos->type_information;
1868+
1869+
/// convert to cdr bytes
1870+
if (type_info != NULL) {
1871+
dds_ostream_init(&type_obj_stream, 0, CDR_ENC_VERSION_2);
1872+
const dds_typeid_t *type_id = ddsi_typeinfo_complete_typeid(type_info);
1873+
ddspy_typeid_ser(&type_obj_stream, type_id);
1874+
type_id_bytes = Py_BuildValue("y#", type_obj_stream.m_buffer, type_obj_stream.m_index);
1875+
dds_ostream_fini(&type_obj_stream);
1876+
}
1877+
else {
1878+
type_id_bytes = Py_None;
1879+
Py_INCREF(type_id_bytes);
1880+
}
1881+
#else
1882+
type_id_bytes = Py_None;
1883+
Py_INCREF(type_id_bytes);
1884+
#endif
1885+
1886+
PyObject* sampleinfo = get_sampleinfo_pyobject(&info[i]);
1887+
if (PyErr_Occurred()) { return NULL; }
1888+
PyObject* qos_p = PyLong_FromVoidPtr(rcontainer[i]->qos);
1889+
if (PyErr_Occurred()) { return NULL; }
1890+
PyObject* qos = PyObject_CallFunction(cqos_to_qos, "O", qos_p);
1891+
if (PyErr_Occurred()) { return NULL; }
1892+
PyObject* item = PyObject_CallFunction( \
1893+
endpoint_constructor, "y#ssOOO", \
1894+
rcontainer[i]->key.d, 16, \
1895+
rcontainer[i]->topic_name,
1896+
rcontainer[i]->type_name,
1897+
qos,
1898+
sampleinfo,
1899+
type_id_bytes
1900+
);
1901+
if (PyErr_Occurred()) { return NULL; }
1902+
PyList_SetItem(list, i, item); // steals ref
1903+
Py_DECREF(sampleinfo);
1904+
Py_DECREF(qos_p);
1905+
Py_DECREF(qos);
1906+
}
1907+
1908+
dds_return_loan(reader, (void**) rcontainer, sts);
1909+
dds_free(info);
1910+
dds_free(rcontainer);
1911+
1912+
return list;
1913+
}
1914+
18271915
static PyObject *
18281916
ddspy_take_endpoint(PyObject *self, PyObject *args)
18291917
{
@@ -1863,8 +1951,7 @@ ddspy_take_endpoint(PyObject *self, PyObject *args)
18631951
const dds_typeinfo_t *type_info = NULL;
18641952

18651953
/// Fetch the type id
1866-
if (rcontainer[i]->qos != NULL)
1867-
dds_builtintopic_get_endpoint_type_info(rcontainer[i], &type_info);
1954+
dds_builtintopic_get_endpoint_type_info(rcontainer[i], &type_info);
18681955

18691956
/// convert to cdr bytes
18701957
if (type_info != NULL) {
@@ -1939,6 +2026,121 @@ ddspy_take_endpoint(PyObject *self, PyObject *args)
19392026

19402027
return list;
19412028
}
2029+
2030+
static PyObject *
2031+
ddspy_take_topic(PyObject *self, PyObject *args)
2032+
{
2033+
uint32_t Nu32;
2034+
long long N;
2035+
dds_entity_t reader;
2036+
dds_return_t sts;
2037+
2038+
PyObject* endpoint_constructor;
2039+
PyObject* cqos_to_qos;
2040+
(void)self;
2041+
2042+
if (!PyArg_ParseTuple(args, "iLOO", &reader, &N, &endpoint_constructor, &cqos_to_qos))
2043+
return NULL;
2044+
if (!(Nu32 = check_number_of_samples(N)))
2045+
return NULL;
2046+
2047+
dds_sample_info_t* info = dds_alloc(sizeof(dds_sample_info_t) * Nu32);
2048+
struct dds_builtintopic_topic** rcontainer = dds_alloc(sizeof(struct dds_builtintopic_topic*) * Nu32);
2049+
2050+
for(uint32_t i = 0; i < Nu32; ++i) {
2051+
rcontainer[i] = NULL;
2052+
}
2053+
2054+
sts = dds_take(reader, (void**) rcontainer, info, Nu32, Nu32);
2055+
if (sts < 0) {
2056+
return PyLong_FromLong((long) sts);
2057+
}
2058+
2059+
PyObject* list = PyList_New(sts);
2060+
2061+
for(uint32_t i = 0; i < ((uint32_t)sts > Nu32 ? Nu32 : (uint32_t)sts); ++i) {
2062+
PyObject *type_id_bytes = NULL;
2063+
2064+
#ifdef DDS_HAS_TYPE_DISCOVERY
2065+
dds_ostream_t type_obj_stream;
2066+
const dds_typeinfo_t *type_info = NULL;
2067+
2068+
/// Fetch the type id
2069+
// dds_builtintopic_get_endpoint_type_info(rcontainer[i], &type_info);
2070+
if (rcontainer[i]->qos && rcontainer[i]->qos->present & QP_TYPE_INFORMATION)
2071+
type_info = rcontainer[i]->qos->type_information;
2072+
2073+
/// convert to cdr bytes
2074+
if (type_info != NULL) {
2075+
dds_ostream_init(&type_obj_stream, 0, CDR_ENC_VERSION_2);
2076+
const dds_typeid_t *type_id = ddsi_typeinfo_complete_typeid(type_info);
2077+
ddspy_typeid_ser(&type_obj_stream, type_id);
2078+
type_id_bytes = Py_BuildValue("y#", type_obj_stream.m_buffer, type_obj_stream.m_index);
2079+
dds_ostream_fini(&type_obj_stream);
2080+
}
2081+
else {
2082+
type_id_bytes = Py_None;
2083+
Py_INCREF(type_id_bytes);
2084+
}
2085+
#else
2086+
type_id_bytes = Py_None;
2087+
Py_INCREF(type_id_bytes);
2088+
#endif
2089+
2090+
PyObject* sampleinfo = get_sampleinfo_pyobject(&info[i]);
2091+
if (PyErr_Occurred()) {
2092+
PyErr_Clear();
2093+
PyErr_SetString(PyExc_Exception, "Sampleinfo errored.");
2094+
return NULL;
2095+
}
2096+
2097+
PyObject* qos_p, *qos;
2098+
2099+
if (rcontainer[i]->qos != NULL) {
2100+
qos_p = PyLong_FromVoidPtr(rcontainer[i]->qos);
2101+
if (PyErr_Occurred()) {
2102+
PyErr_Clear();
2103+
PyErr_SetString(PyExc_Exception, "VoidPtr errored.");
2104+
return NULL;
2105+
}
2106+
qos = PyObject_CallFunction(cqos_to_qos, "O", qos_p);
2107+
if (PyErr_Occurred()) {
2108+
PyErr_Clear();
2109+
PyErr_SetString(PyExc_Exception, "Callfunc cqos errored.");
2110+
return NULL;
2111+
}
2112+
} else {
2113+
Py_INCREF(Py_None);
2114+
Py_INCREF(Py_None);
2115+
qos_p = Py_None;
2116+
qos = Py_None;
2117+
}
2118+
PyObject* item = PyObject_CallFunction( \
2119+
endpoint_constructor, "y#s#s#OOO", \
2120+
rcontainer[i]->key.d, (Py_ssize_t) 16, \
2121+
rcontainer[i]->topic_name, rcontainer[i]->topic_name == NULL ? 0 : strlen(rcontainer[i]->topic_name),
2122+
rcontainer[i]->type_name, rcontainer[i]->type_name == NULL ? 0 : strlen(rcontainer[i]->type_name),
2123+
qos,
2124+
sampleinfo,
2125+
type_id_bytes
2126+
);
2127+
if (PyErr_Occurred()) {
2128+
PyErr_Clear();
2129+
PyErr_SetString(PyExc_Exception, "Callfunc endpoint constructor errored.");
2130+
return NULL;
2131+
}
2132+
PyList_SetItem(list, i, item); // steals ref
2133+
Py_DECREF(sampleinfo);
2134+
Py_DECREF(qos_p);
2135+
Py_DECREF(qos);
2136+
}
2137+
2138+
dds_return_loan(reader, (void**) rcontainer, sts);
2139+
dds_free(info);
2140+
dds_free(rcontainer);
2141+
2142+
return list;
2143+
}
19422144
/* end builtin topic */
19432145

19442146

@@ -2109,6 +2311,14 @@ PyMethodDef ddspy_funcs[] = {
21092311
(PyCFunction)ddspy_take_endpoint,
21102312
METH_VARARGS,
21112313
ddspy_docs},
2314+
{ "ddspy_read_topic",
2315+
(PyCFunction)ddspy_read_topic,
2316+
METH_VARARGS,
2317+
ddspy_docs},
2318+
{ "ddspy_take_topic",
2319+
(PyCFunction)ddspy_take_topic,
2320+
METH_VARARGS,
2321+
ddspy_docs},
21122322
#ifdef DDS_HAS_TYPE_DISCOVERY
21132323
{ "ddspy_get_typeobj",
21142324
(PyCFunction)ddspy_get_typeobj,

cyclonedds/builtin.py

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from .internal import dds_c_t
2222
from .qos import _CQos
2323

24-
from cyclonedds._clayer import ddspy_read_participant, ddspy_take_participant, ddspy_read_endpoint, ddspy_take_endpoint
24+
from cyclonedds._clayer import ddspy_read_participant, ddspy_take_participant, ddspy_read_endpoint, ddspy_take_endpoint, ddspy_read_topic, ddspy_take_topic
2525
from cyclonedds.idl._typesupport.DDS.XTypes import TypeIdentifier
2626

2727

@@ -57,11 +57,37 @@ class DcpsParticipant:
5757
qos: Qos
5858

5959

60+
@dataclass
61+
class DcpsTopic:
62+
"""
63+
Data sample as returned when you subscribe to the BuiltinTopicDcpsTopic topic.
64+
65+
Attributes
66+
----------
67+
key:
68+
Unique identifier for the topic, publication or subscription endpoint.
69+
topic_name:
70+
Name of the associated topic.
71+
type_name:
72+
Name of the type.
73+
qos:
74+
Qos policies associated with the endpoint.
75+
typeid:
76+
Complete XTypes TypeIdentifier of the type, can be None.
77+
"""
78+
79+
key: uuid.UUID
80+
topic_name: str
81+
type_name: str
82+
qos: Qos
83+
type_id: Optional[TypeIdentifier]
84+
85+
6086
@dataclass
6187
class DcpsEndpoint:
6288
"""
63-
Data sample as returned when you subscribe to the BuiltinTopicDcpsTopic,
64-
BuiltinTopicDcpsPublication or BuiltinTopicDcpsSubscription topic.
89+
Data sample as returned when you subscribe to the BuiltinTopicDcpsPublication or
90+
BuiltinTopicDcpsSubscription topic.
6591
6692
Attributes
6793
----------
@@ -128,7 +154,8 @@ def __init__(self,
128154
builtin_topic._ref,
129155
cqos,
130156
listener._ref if listener else None
131-
)
157+
),
158+
listener=listener
132159
)
133160
self._next_condition = ReadCondition(self, ViewState.Any | SampleState.NotRead | InstanceState.Any)
134161
if cqos:
@@ -163,6 +190,24 @@ def endpoint_constructor(keybytes, participant_keybytes, p_instance_handle, topi
163190
s.sample_info = sampleinfo
164191
return s
165192

193+
def topic_constructor(keybytes, topic_name, type_name, qosobject, sampleinfo, typeid_bytes):
194+
ident = None
195+
if typeid_bytes is not None:
196+
try:
197+
ident = TypeIdentifier.deserialize(typeid_bytes, has_header=False, use_version_2=True)
198+
except Exception:
199+
pass
200+
201+
s = DcpsTopic(
202+
uuid.UUID(bytes=keybytes),
203+
topic_name,
204+
type_name,
205+
qosobject,
206+
ident
207+
)
208+
s.sample_info = sampleinfo
209+
return s
210+
166211
def cqos_to_qos(pointer):
167212
p = ct.cast(pointer, dds_c_t.qos_p)
168213
return _CQos.cqos_to_qos(p)
@@ -171,6 +216,10 @@ def cqos_to_qos(pointer):
171216
self._readfn = ddspy_read_participant
172217
self._takefn = ddspy_take_participant
173218
self._constructor = participant_constructor
219+
elif self._topic == BuiltinTopicDcpsTopic:
220+
self._readfn = ddspy_read_topic
221+
self._takefn = ddspy_take_topic
222+
self._constructor = topic_constructor
174223
else:
175224
self._readfn = ddspy_read_endpoint
176225
self._takefn = ddspy_take_endpoint

0 commit comments

Comments
 (0)