From 2b65d6ba4b67873d09c006af5353f3c6c0fe54da Mon Sep 17 00:00:00 2001 From: Alexey Semenyuk Date: Fri, 4 Jul 2025 18:35:24 +0500 Subject: [PATCH 1/3] gh-136336: add tests for PySys_Audit() and PySys_AuditTuple() --- Lib/test/test_capi/test_sys.py | 141 +++++++++++++++++++++++++++++++-- Modules/_testlimitedcapi/sys.c | 45 +++++++++++ 2 files changed, 181 insertions(+), 5 deletions(-) diff --git a/Lib/test/test_capi/test_sys.py b/Lib/test/test_capi/test_sys.py index 3793ce2461effd..356901a23b489d 100644 --- a/Lib/test/test_capi/test_sys.py +++ b/Lib/test/test_capi/test_sys.py @@ -12,11 +12,6 @@ NULL = None class CAPITest(unittest.TestCase): - # TODO: Test the following functions: - # - # PySys_Audit() - # PySys_AuditTuple() - maxDiff = None @unittest.skipIf(_testlimitedcapi is None, 'need _testlimitedcapi module') @@ -211,6 +206,142 @@ def test_sys_writestderr(self): # Test PySys_WriteStderr() self._test_sys_writestream('PySys_WriteStderr', 'stderr') + @unittest.skipIf(_testlimitedcapi is None, 'need _testlimitedcapi module') + def test_sys_audit(self): + sys_audit = _testlimitedcapi.sys_audit + + audit_events = [] + def audit_hook(event, args): + audit_events.append((event, args)) + return None + + import sys + sys.addaudithook(audit_hook) + + try: + result = sys_audit("cpython.run_command", "") + self.assertEqual(result, 0) + self.assertEqual(len(audit_events), 1) + self.assertEqual(audit_events[-1][0], "cpython.run_command") + self.assertEqual(audit_events[-1][1], ()) + + result = sys_audit("open", "OOO", "test.txt", "r", 0) + self.assertEqual(result, 0) + self.assertEqual(len(audit_events), 2) + self.assertEqual(audit_events[-1][0], "open") + self.assertEqual(len(audit_events[-1][1]), 3) + self.assertEqual(audit_events[-1][1][0], "test.txt") + self.assertEqual(audit_events[-1][1][1], "r") + self.assertEqual(audit_events[-1][1][2], 0) + + test_dict = {"key": "value"} + test_list = [1, 2, 3] + result = sys_audit("test.objects", "OO", test_dict, test_list) + self.assertEqual(result, 0) + self.assertEqual(len(audit_events), 3) + self.assertEqual(audit_events[-1][0], "test.objects") + self.assertEqual(audit_events[-1][1][0], test_dict) + self.assertEqual(audit_events[-1][1][1], test_list) + + result = sys_audit("test.mixed_types", "OOO", "string", 42, 123456789) + self.assertEqual(result, 0) + self.assertEqual(len(audit_events), 4) + self.assertEqual(audit_events[-1][0], "test.mixed_types") + self.assertEqual(audit_events[-1][1][0], "string") + self.assertEqual(audit_events[-1][1][1], 42) + self.assertEqual(audit_events[-1][1][2], 123456789) + + finally: + sys.audit_hooks = [] + + result = sys_audit("cpython.run_file", "") + self.assertEqual(result, 0) + + result = sys_audit("os.chdir", "(O)", "/tmp") + self.assertEqual(result, 0) + + result = sys_audit("ctypes.dlopen", "O", "libc.so.6") + self.assertEqual(result, 0) + + self.assertRaises(TypeError, sys_audit, 123, "O", "arg") + self.assertRaises(TypeError, sys_audit, None, "O", "arg") + self.assertRaises(TypeError, sys_audit, ["not", "a", "string"], "O", "arg") + + self.assertRaises(TypeError, sys_audit, "test.event", 456, "arg") + self.assertRaises(TypeError, sys_audit, "test.event", None, "arg") + self.assertRaises(TypeError, sys_audit, "test.event", {"format": "string"}, "arg") + + @unittest.skipIf(_testlimitedcapi is None, 'need _testlimitedcapi module') + def test_sys_audittuple(self): + sys_audittuple = _testlimitedcapi.sys_audittuple + + # Test with audit hook to verify internal behavior + audit_events = [] + def audit_hook(event, args): + audit_events.append((event, args)) + return None + + import sys + sys.addaudithook(audit_hook) + + try: + result = sys_audittuple("cpython.run_command", ()) + self.assertEqual(result, 0) + self.assertEqual(len(audit_events), 1) + self.assertEqual(audit_events[-1][0], "cpython.run_command") + self.assertEqual(audit_events[-1][1], ()) + + result = sys_audittuple("os.chdir", ("/tmp",)) + self.assertEqual(result, 0) + self.assertEqual(len(audit_events), 2) + self.assertEqual(audit_events[-1][0], "os.chdir") + self.assertEqual(audit_events[-1][1], ("/tmp",)) + + result = sys_audittuple("open", ("test.txt", "r", 0)) + self.assertEqual(result, 0) + self.assertEqual(len(audit_events), 3) + self.assertEqual(audit_events[-1][0], "open") + self.assertEqual(audit_events[-1][1], ("test.txt", "r", 0)) + + test_dict = {"key": "value"} + test_list = [1, 2, 3] + result = sys_audittuple("test.objects", (test_dict, test_list)) + self.assertEqual(result, 0) + self.assertEqual(len(audit_events), 4) + self.assertEqual(audit_events[-1][0], "test.objects") + self.assertEqual(audit_events[-1][1][0], test_dict) + self.assertEqual(audit_events[-1][1][1], test_list) + + result = sys_audittuple("test.complex", ("text", 3.14, True, None)) + self.assertEqual(result, 0) + self.assertEqual(len(audit_events), 5) + self.assertEqual(audit_events[-1][0], "test.complex") + self.assertEqual(audit_events[-1][1][0], "text") + self.assertEqual(audit_events[-1][1][1], 3.14) + self.assertEqual(audit_events[-1][1][2], True) + self.assertEqual(audit_events[-1][1][3], None) + + finally: + sys.audit_hooks = [] + + result = sys_audittuple("cpython.run_file", ()) + self.assertEqual(result, 0) + + result = sys_audittuple("ctypes.dlopen", ("libc.so.6",)) + self.assertEqual(result, 0) + + result = sys_audittuple("sqlite3.connect", ("test.db",)) + self.assertEqual(result, 0) + + self.assertRaises(TypeError, sys_audittuple, 123, ("arg",)) + self.assertRaises(TypeError, sys_audittuple, None, ("arg",)) + self.assertRaises(TypeError, sys_audittuple, ["not", "a", "string"], ("arg",)) + + self.assertRaises(TypeError, sys_audittuple, "test.event", "not_a_tuple") + self.assertRaises(TypeError, sys_audittuple, "test.event", ["list", "not", "tuple"]) + self.assertRaises(TypeError, sys_audittuple, "test.event", {"dict": "not_tuple"}) + self.assertRaises(TypeError, sys_audittuple, "test.event", None) + if __name__ == "__main__": unittest.main() diff --git a/Modules/_testlimitedcapi/sys.c b/Modules/_testlimitedcapi/sys.c index cec7f8ab612019..47635645226af1 100644 --- a/Modules/_testlimitedcapi/sys.c +++ b/Modules/_testlimitedcapi/sys.c @@ -5,6 +5,7 @@ #endif #include "parts.h" #include "util.h" +#include "audit.h" static PyObject * @@ -106,6 +107,48 @@ sys_getxoptions(PyObject *Py_UNUSED(module), PyObject *Py_UNUSED(ignored)) return Py_XNewRef(result); } +static PyObject * +sys_audit(PyObject *Py_UNUSED(module), PyObject *args) +{ + const char *event; + const char *argFormat; + PyObject *arg1 = NULL, *arg2 = NULL, *arg3 = NULL; + + if (!PyArg_ParseTuple(args, "ss|OOO", &event, &argFormat, &arg1, &arg2, &arg3)) { + return NULL; + } + + int result; + if (arg1 == NULL) { + result = PySys_Audit(event, argFormat); + } else if (arg2 == NULL) { + result = PySys_Audit(event, argFormat, arg1); + } else if (arg3 == NULL) { + result = PySys_Audit(event, argFormat, arg1, arg2); + } else { + result = PySys_Audit(event, argFormat, arg1, arg2, arg3); + } + + RETURN_INT(result); +} + +static PyObject * +sys_audittuple(PyObject *Py_UNUSED(module), PyObject *args) +{ + const char *event; + PyObject *tuple_args; + + if (!PyArg_ParseTuple(args, "sO", &event, &tuple_args)) { + return NULL; + } + + if (!PyTuple_Check(tuple_args)) { + PyErr_SetString(PyExc_TypeError, "second argument must be a tuple"); + return NULL; + } + + RETURN_INT(PySys_AuditTuple(event, tuple_args)); +} static PyMethodDef test_methods[] = { {"sys_getattr", sys_getattr, METH_O}, @@ -115,6 +158,8 @@ static PyMethodDef test_methods[] = { {"sys_getobject", sys_getobject, METH_O}, {"sys_setobject", sys_setobject, METH_VARARGS}, {"sys_getxoptions", sys_getxoptions, METH_NOARGS}, + {"sys_audit", sys_audit, METH_VARARGS}, + {"sys_audittuple", sys_audittuple, METH_VARARGS}, {NULL}, }; From a0faa3f035f2af22985d20be164616e94ba3d450 Mon Sep 17 00:00:00 2001 From: Alexey Semenyuk Date: Wed, 9 Jul 2025 01:47:23 +0500 Subject: [PATCH 2/3] Fix based on comments --- Lib/test/test_capi/test_sys.py | 126 ++++++++------------------------- Modules/_testlimitedcapi/sys.c | 19 +++-- 2 files changed, 44 insertions(+), 101 deletions(-) diff --git a/Lib/test/test_capi/test_sys.py b/Lib/test/test_capi/test_sys.py index 356901a23b489d..a8bef9e071183c 100644 --- a/Lib/test/test_capi/test_sys.py +++ b/Lib/test/test_capi/test_sys.py @@ -208,140 +208,72 @@ def test_sys_writestderr(self): @unittest.skipIf(_testlimitedcapi is None, 'need _testlimitedcapi module') def test_sys_audit(self): + # Test PySys_Audit() sys_audit = _testlimitedcapi.sys_audit - audit_events = [] def audit_hook(event, args): audit_events.append((event, args)) return None - import sys sys.addaudithook(audit_hook) - try: - result = sys_audit("cpython.run_command", "") + result = sys_audit("test.event", "OO", 1, "a") self.assertEqual(result, 0) - self.assertEqual(len(audit_events), 1) - self.assertEqual(audit_events[-1][0], "cpython.run_command") - self.assertEqual(audit_events[-1][1], ()) + self.assertEqual(audit_events[-1][0], "test.event") + self.assertEqual(audit_events[-1][1], (1, "a")) - result = sys_audit("open", "OOO", "test.txt", "r", 0) + result = sys_audit("test.no_args", "") self.assertEqual(result, 0) - self.assertEqual(len(audit_events), 2) - self.assertEqual(audit_events[-1][0], "open") - self.assertEqual(len(audit_events[-1][1]), 3) - self.assertEqual(audit_events[-1][1][0], "test.txt") - self.assertEqual(audit_events[-1][1][1], "r") - self.assertEqual(audit_events[-1][1][2], 0) - - test_dict = {"key": "value"} - test_list = [1, 2, 3] - result = sys_audit("test.objects", "OO", test_dict, test_list) - self.assertEqual(result, 0) - self.assertEqual(len(audit_events), 3) - self.assertEqual(audit_events[-1][0], "test.objects") - self.assertEqual(audit_events[-1][1][0], test_dict) - self.assertEqual(audit_events[-1][1][1], test_list) + self.assertEqual(audit_events[-1][0], "test.no_args") + self.assertEqual(audit_events[-1][1], ()) + + with self.assertRaises(TypeError): + sys_audit(123, "O", 1) - result = sys_audit("test.mixed_types", "OOO", "string", 42, 123456789) + result = sys_audit("テスト.イベント", "O", 42) self.assertEqual(result, 0) - self.assertEqual(len(audit_events), 4) - self.assertEqual(audit_events[-1][0], "test.mixed_types") - self.assertEqual(audit_events[-1][1][0], "string") - self.assertEqual(audit_events[-1][1][1], 42) - self.assertEqual(audit_events[-1][1][2], 123456789) + self.assertEqual(audit_events[-1][0], "テスト.イベント") + self.assertEqual(audit_events[-1][1], (42,)) + with self.assertRaises(UnicodeDecodeError): + sys_audit(b"test.non_utf8\xff", "O", 1) finally: sys.audit_hooks = [] - result = sys_audit("cpython.run_file", "") - self.assertEqual(result, 0) - - result = sys_audit("os.chdir", "(O)", "/tmp") - self.assertEqual(result, 0) - - result = sys_audit("ctypes.dlopen", "O", "libc.so.6") - self.assertEqual(result, 0) - - self.assertRaises(TypeError, sys_audit, 123, "O", "arg") - self.assertRaises(TypeError, sys_audit, None, "O", "arg") - self.assertRaises(TypeError, sys_audit, ["not", "a", "string"], "O", "arg") - - self.assertRaises(TypeError, sys_audit, "test.event", 456, "arg") - self.assertRaises(TypeError, sys_audit, "test.event", None, "arg") - self.assertRaises(TypeError, sys_audit, "test.event", {"format": "string"}, "arg") - @unittest.skipIf(_testlimitedcapi is None, 'need _testlimitedcapi module') def test_sys_audittuple(self): + # Test PySys_AuditTuple() sys_audittuple = _testlimitedcapi.sys_audittuple - - # Test with audit hook to verify internal behavior audit_events = [] def audit_hook(event, args): audit_events.append((event, args)) return None - import sys sys.addaudithook(audit_hook) - try: - result = sys_audittuple("cpython.run_command", ()) - self.assertEqual(result, 0) - self.assertEqual(len(audit_events), 1) - self.assertEqual(audit_events[-1][0], "cpython.run_command") - self.assertEqual(audit_events[-1][1], ()) - - result = sys_audittuple("os.chdir", ("/tmp",)) + result = sys_audittuple("test.event", (1, "a")) self.assertEqual(result, 0) - self.assertEqual(len(audit_events), 2) - self.assertEqual(audit_events[-1][0], "os.chdir") - self.assertEqual(audit_events[-1][1], ("/tmp",)) + self.assertEqual(audit_events[-1][0], "test.event") + self.assertEqual(audit_events[-1][1], (1, "a")) - result = sys_audittuple("open", ("test.txt", "r", 0)) + result = sys_audittuple("test.null_tuple") self.assertEqual(result, 0) - self.assertEqual(len(audit_events), 3) - self.assertEqual(audit_events[-1][0], "open") - self.assertEqual(audit_events[-1][1], ("test.txt", "r", 0)) + self.assertEqual(audit_events[-1][0], "test.null_tuple") + self.assertEqual(audit_events[-1][1], ()) - test_dict = {"key": "value"} - test_list = [1, 2, 3] - result = sys_audittuple("test.objects", (test_dict, test_list)) - self.assertEqual(result, 0) - self.assertEqual(len(audit_events), 4) - self.assertEqual(audit_events[-1][0], "test.objects") - self.assertEqual(audit_events[-1][1][0], test_dict) - self.assertEqual(audit_events[-1][1][1], test_list) + with self.assertRaises(TypeError): + sys_audittuple("test.bad_tuple", [1, 2]) - result = sys_audittuple("test.complex", ("text", 3.14, True, None)) + result = sys_audittuple("テスト.イベント", (42,)) self.assertEqual(result, 0) - self.assertEqual(len(audit_events), 5) - self.assertEqual(audit_events[-1][0], "test.complex") - self.assertEqual(audit_events[-1][1][0], "text") - self.assertEqual(audit_events[-1][1][1], 3.14) - self.assertEqual(audit_events[-1][1][2], True) - self.assertEqual(audit_events[-1][1][3], None) + self.assertEqual(audit_events[-1][0], "テスト.イベント") + self.assertEqual(audit_events[-1][1], (42,)) + with self.assertRaises(UnicodeDecodeError): + sys_audittuple(b"test.non_utf8\xff", (1,)) finally: sys.audit_hooks = [] - result = sys_audittuple("cpython.run_file", ()) - self.assertEqual(result, 0) - - result = sys_audittuple("ctypes.dlopen", ("libc.so.6",)) - self.assertEqual(result, 0) - - result = sys_audittuple("sqlite3.connect", ("test.db",)) - self.assertEqual(result, 0) - - self.assertRaises(TypeError, sys_audittuple, 123, ("arg",)) - self.assertRaises(TypeError, sys_audittuple, None, ("arg",)) - self.assertRaises(TypeError, sys_audittuple, ["not", "a", "string"], ("arg",)) - - self.assertRaises(TypeError, sys_audittuple, "test.event", "not_a_tuple") - self.assertRaises(TypeError, sys_audittuple, "test.event", ["list", "not", "tuple"]) - self.assertRaises(TypeError, sys_audittuple, "test.event", {"dict": "not_tuple"}) - self.assertRaises(TypeError, sys_audittuple, "test.event", None) - if __name__ == "__main__": unittest.main() diff --git a/Modules/_testlimitedcapi/sys.c b/Modules/_testlimitedcapi/sys.c index 47635645226af1..863c164dadae9e 100644 --- a/Modules/_testlimitedcapi/sys.c +++ b/Modules/_testlimitedcapi/sys.c @@ -111,13 +111,19 @@ static PyObject * sys_audit(PyObject *Py_UNUSED(module), PyObject *args) { const char *event; + Py_ssize_t event_len; const char *argFormat; + Py_ssize_t format_len; PyObject *arg1 = NULL, *arg2 = NULL, *arg3 = NULL; - if (!PyArg_ParseTuple(args, "ss|OOO", &event, &argFormat, &arg1, &arg2, &arg3)) { + if (!PyArg_ParseTuple(args, "z#z#|OOO", &event, &event_len, &argFormat, &format_len, &arg1, &arg2, &arg3)) { return NULL; } + if (event == NULL) { + RETURN_INT(0); + } + int result; if (arg1 == NULL) { result = PySys_Audit(event, argFormat); @@ -136,13 +142,18 @@ static PyObject * sys_audittuple(PyObject *Py_UNUSED(module), PyObject *args) { const char *event; - PyObject *tuple_args; + Py_ssize_t event_len; + PyObject *tuple_args = NULL; - if (!PyArg_ParseTuple(args, "sO", &event, &tuple_args)) { + if (!PyArg_ParseTuple(args, "z#|O", &event, &event_len, &tuple_args)) { return NULL; } - if (!PyTuple_Check(tuple_args)) { + if (event == NULL) { + RETURN_INT(0); + } + + if (tuple_args != NULL && !PyTuple_Check(tuple_args)) { PyErr_SetString(PyExc_TypeError, "second argument must be a tuple"); return NULL; } From c89cf8fa36a7bbfa8daa5d2a83e08b22ab35f12b Mon Sep 17 00:00:00 2001 From: Alexey Semenyuk Date: Thu, 10 Jul 2025 00:00:12 +0500 Subject: [PATCH 3/3] Address comments --- Lib/test/test_capi/test_sys.py | 103 +++++++++++++++------------------ Modules/_testlimitedcapi/sys.c | 13 ----- 2 files changed, 47 insertions(+), 69 deletions(-) diff --git a/Lib/test/test_capi/test_sys.py b/Lib/test/test_capi/test_sys.py index a8bef9e071183c..6541d34c4f961f 100644 --- a/Lib/test/test_capi/test_sys.py +++ b/Lib/test/test_capi/test_sys.py @@ -210,69 +210,60 @@ def test_sys_writestderr(self): def test_sys_audit(self): # Test PySys_Audit() sys_audit = _testlimitedcapi.sys_audit - audit_events = [] - def audit_hook(event, args): - audit_events.append((event, args)) - return None - sys.addaudithook(audit_hook) - try: - result = sys_audit("test.event", "OO", 1, "a") - self.assertEqual(result, 0) - self.assertEqual(audit_events[-1][0], "test.event") - self.assertEqual(audit_events[-1][1], (1, "a")) - - result = sys_audit("test.no_args", "") - self.assertEqual(result, 0) - self.assertEqual(audit_events[-1][0], "test.no_args") - self.assertEqual(audit_events[-1][1], ()) - - with self.assertRaises(TypeError): - sys_audit(123, "O", 1) - - result = sys_audit("テスト.イベント", "O", 42) - self.assertEqual(result, 0) - self.assertEqual(audit_events[-1][0], "テスト.イベント") - self.assertEqual(audit_events[-1][1], (42,)) - - with self.assertRaises(UnicodeDecodeError): - sys_audit(b"test.non_utf8\xff", "O", 1) - finally: - sys.audit_hooks = [] + result = sys_audit("test.event", "OO", 1, "a") + self.assertEqual(result, 0) + + result = sys_audit("test.no_args", "") + self.assertEqual(result, 0) + + with self.assertRaises(TypeError): + sys_audit(123, "O", 1) + + result = sys_audit("テスト.イベント", "O", 42) + self.assertEqual(result, 0) + + result = sys_audit(None, "O", 1) + self.assertEqual(result, 0) + + result = sys_audit(b"test.non_utf8\xff", "O", 1) + self.assertEqual(result, 0) + + result = sys_audit("test.event", "(") + self.assertEqual(result, 0) + + result = sys_audit("test.event", "&") + self.assertEqual(result, 0) + + result = sys_audit("test.event", b"\xff") + self.assertEqual(result, 0) + + result = sys_audit("test.event", "{OO}", [], []) + self.assertEqual(result, 0) + @unittest.skipIf(_testlimitedcapi is None, 'need _testlimitedcapi module') def test_sys_audittuple(self): # Test PySys_AuditTuple() sys_audittuple = _testlimitedcapi.sys_audittuple - audit_events = [] - def audit_hook(event, args): - audit_events.append((event, args)) - return None - sys.addaudithook(audit_hook) - try: - result = sys_audittuple("test.event", (1, "a")) - self.assertEqual(result, 0) - self.assertEqual(audit_events[-1][0], "test.event") - self.assertEqual(audit_events[-1][1], (1, "a")) - - result = sys_audittuple("test.null_tuple") - self.assertEqual(result, 0) - self.assertEqual(audit_events[-1][0], "test.null_tuple") - self.assertEqual(audit_events[-1][1], ()) - - with self.assertRaises(TypeError): - sys_audittuple("test.bad_tuple", [1, 2]) - - result = sys_audittuple("テスト.イベント", (42,)) - self.assertEqual(result, 0) - self.assertEqual(audit_events[-1][0], "テスト.イベント") - self.assertEqual(audit_events[-1][1], (42,)) - - with self.assertRaises(UnicodeDecodeError): - sys_audittuple(b"test.non_utf8\xff", (1,)) - finally: - sys.audit_hooks = [] + result = sys_audittuple("test.event", (1, "a")) + self.assertEqual(result, 0) + + result = sys_audittuple("test.null_tuple") + self.assertEqual(result, 0) + + with self.assertRaises(TypeError): + sys_audittuple("test.bad_tuple", [1, 2]) + + result = sys_audittuple("テスト.イベント", (42,)) + self.assertEqual(result, 0) + + result = sys_audittuple(None, (123,)) + self.assertEqual(result, 0) + + result = sys_audittuple(b"test.non_utf8\xff", (1,)) + self.assertEqual(result, 0) if __name__ == "__main__": diff --git a/Modules/_testlimitedcapi/sys.c b/Modules/_testlimitedcapi/sys.c index 863c164dadae9e..5c2940c273bc0d 100644 --- a/Modules/_testlimitedcapi/sys.c +++ b/Modules/_testlimitedcapi/sys.c @@ -120,10 +120,6 @@ sys_audit(PyObject *Py_UNUSED(module), PyObject *args) return NULL; } - if (event == NULL) { - RETURN_INT(0); - } - int result; if (arg1 == NULL) { result = PySys_Audit(event, argFormat); @@ -149,15 +145,6 @@ sys_audittuple(PyObject *Py_UNUSED(module), PyObject *args) return NULL; } - if (event == NULL) { - RETURN_INT(0); - } - - if (tuple_args != NULL && !PyTuple_Check(tuple_args)) { - PyErr_SetString(PyExc_TypeError, "second argument must be a tuple"); - return NULL; - } - RETURN_INT(PySys_AuditTuple(event, tuple_args)); }