Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Include/internal/pycore_range.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ typedef struct {
long start;
long step;
long len;
#ifdef Py_GIL_DISABLED
// Make sure multi-threaded use of a single iterator doesn't produce
// values past the end of the range (even though it is NOT guaranteed to
// uniquely produce all the values in the range!)
long stop;
#endif
} _PyRangeIterObject;

#ifdef __cplusplus
Expand Down
1 change: 1 addition & 0 deletions Lib/test/libregrtest/tsan.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
'test_threadsignals',
'test_weakref',
'test_free_threading.test_slots',
'test_free_threading.test_iteration',
]


Expand Down
123 changes: 123 additions & 0 deletions Lib/test/test_free_threading/test_iteration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import sys
import threading
import unittest
from test import support

# The race conditions these tests were written for only happen every now and
# then, even with the current numbers. To find rare race conditions, bumping
# these up will help, but it makes the test runtime highly variable under
# free-threading. Overhead is much higher under ThreadSanitizer, but it's
# also much better at detecting certain races, so we don't need as many
# items/threads.
if support.check_sanitizer(thread=True):
NUMITEMS = 1000
NUMTHREADS = 2
else:
NUMITEMS = 50000
NUMTHREADS = 3
NUMMUTATORS = 2

class ContendedTupleIterationTest(unittest.TestCase):
def make_testdata(self, n):
return tuple(range(n))

def assert_iterator_results(self, results, expected):
# Most iterators are not atomic (yet?) so they can skip or duplicate
# items, but they should not invent new items (like the range
# iterator has done in the past).
extra_items = set(results) - set(expected)
self.assertEqual(set(), extra_items)

def run_threads(self, func, *args, numthreads=NUMTHREADS):
threads = []
for _ in range(numthreads):
t = threading.Thread(target=func, args=args)
t.start()
threads.append(t)
return threads

def test_iteration(self):
"""Test iteration over a shared container"""
seq = self.make_testdata(NUMITEMS)
results = []
start = threading.Event()
def worker():
idx = 0
start.wait()
for item in seq:
idx += 1
results.append(idx)
threads = self.run_threads(worker)
start.set()
for t in threads:
t.join()
# Each thread has its own iterator, so results should be entirely predictable.
self.assertEqual(results, [NUMITEMS] * NUMTHREADS)

def test_shared_iterator(self):
"""Test iteration over a shared iterator"""
seq = self.make_testdata(NUMITEMS)
it = iter(seq)
results = []
start = threading.Event()
def worker():
items = []
start.wait()
# We want a tight loop, so put items in the shared list at the end.
for item in it:
items.append(item)
results.extend(items)
threads = self.run_threads(worker)
start.set()
for t in threads:
t.join()
self.assert_iterator_results(sorted(results), seq)

class ContendedListIterationTest(ContendedTupleIterationTest):
def make_testdata(self, n):
return list(range(n))

def test_iteration_while_mutating(self):
"""Test iteration over a shared mutating container."""
seq = self.make_testdata(NUMITEMS)
results = []
start = threading.Event()
endmutate = threading.Event()
def mutator():
orig = seq[:]
# Make changes big enough to cause resizing of the list, with
# items shifted around for good measure.
replacement = (orig * 3)[NUMITEMS//2:]
start.wait()
while not endmutate.is_set():
seq[:] = replacement
seq[:] = orig
def worker():
items = []
start.wait()
# We want a tight loop, so put items in the shared list at the end.
for item in seq:
items.append(item)
results.extend(items)
mutators = ()
try:
threads = self.run_threads(worker)
mutators = self.run_threads(mutator, numthreads=NUMMUTATORS)
start.set()
for t in threads:
t.join()
finally:
endmutate.set()
for m in mutators:
m.join()
self.assert_iterator_results(results, list(seq))


class ContendedRangeIterationTest(ContendedTupleIterationTest):
def make_testdata(self, n):
return range(n)


class ContendedLongRangeIterationTest(ContendedTupleIterationTest):
def make_testdata(self, n):
return range(0, sys.maxsize*n, sys.maxsize)
6 changes: 5 additions & 1 deletion Lib/test/test_sys.py
Original file line number Diff line number Diff line change
Expand Up @@ -1689,7 +1689,11 @@ def delx(self): del self.__x
# PyCapsule
check(_datetime.datetime_CAPI, size('6P'))
# rangeiterator
check(iter(range(1)), size('3l'))
if support.Py_GIL_DISABLED:
RANGE_ITER_SIZE = size('4l')
else:
RANGE_ITER_SIZE = size('3l')
check(iter(range(1)), RANGE_ITER_SIZE)
check(iter(range(2**65)), size('3P'))
# reverse
check(reversed(''), size('nP'))
Expand Down
4 changes: 2 additions & 2 deletions Objects/listobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ list_get_item_ref(PyListObject *op, Py_ssize_t i)
return NULL;
}
Py_ssize_t cap = list_capacity(ob_item);
assert(cap != -1 && cap >= size);
assert(cap != -1);
if (!valid_index(i, cap)) {
return NULL;
}
Expand Down Expand Up @@ -937,7 +937,7 @@ list_ass_slice_lock_held(PyListObject *a, Py_ssize_t ilow, Py_ssize_t ihigh, PyO
}
for (k = 0; k < n; k++, ilow++) {
PyObject *w = vitem[k];
item[ilow] = Py_XNewRef(w);
FT_ATOMIC_STORE_PTR_RELAXED(item[ilow], Py_XNewRef(w));
}
for (k = norig - 1; k >= 0; --k)
Py_XDECREF(recycle[k]);
Expand Down
94 changes: 66 additions & 28 deletions Objects/rangeobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "pycore_modsupport.h" // _PyArg_NoKwnames()
#include "pycore_range.h"
#include "pycore_tuple.h" // _PyTuple_ITEMS()
#include "pycore_pyatomic_ft_wrappers.h"


/* Support objects whose length is > PY_SSIZE_T_MAX.
Expand Down Expand Up @@ -816,10 +817,19 @@ PyTypeObject PyRange_Type = {
static PyObject *
rangeiter_next(_PyRangeIterObject *r)
{
if (r->len > 0) {
long result = r->start;
r->start = result + r->step;
r->len--;
long len = FT_ATOMIC_LOAD_LONG_RELAXED(r->len);
if (len > 0) {
long result = FT_ATOMIC_LOAD_LONG_RELAXED(r->start);
FT_ATOMIC_STORE_LONG_RELAXED(r->start, result + r->step);
FT_ATOMIC_STORE_LONG_RELAXED(r->len, len - 1);
#ifdef Py_GIL_DISABLED
// Concurrent calls can cause start to be updated by another thread
// after our len check, so double-check that we're not past the end.
if ((r->step > 0 && result >= r->stop) ||
(r->step < 0 && result <= r->stop)) {
return NULL;
}
#endif
return PyLong_FromLong(result);
}
return NULL;
Expand All @@ -828,7 +838,7 @@ rangeiter_next(_PyRangeIterObject *r)
static PyObject *
rangeiter_len(_PyRangeIterObject *r, PyObject *Py_UNUSED(ignored))
{
return PyLong_FromLong(r->len);
return PyLong_FromLong(FT_ATOMIC_LOAD_LONG_RELAXED(r->len));
}

PyDoc_STRVAR(length_hint_doc,
Expand All @@ -841,10 +851,14 @@ rangeiter_reduce(_PyRangeIterObject *r, PyObject *Py_UNUSED(ignored))
PyObject *range;

/* create a range object for pickling */
start = PyLong_FromLong(r->start);
start = PyLong_FromLong(FT_ATOMIC_LOAD_LONG_RELAXED(r->start));
if (start == NULL)
goto err;
#ifdef Py_GIL_DISABLED
stop = PyLong_FromLong(r->stop);
#else
stop = PyLong_FromLong(r->start + r->len * r->step);
#endif
if (stop == NULL)
goto err;
step = PyLong_FromLong(r->step);
Expand All @@ -870,13 +884,15 @@ rangeiter_setstate(_PyRangeIterObject *r, PyObject *state)
long index = PyLong_AsLong(state);
if (index == -1 && PyErr_Occurred())
return NULL;
long len = FT_ATOMIC_LOAD_LONG_RELAXED(r->len);
/* silently clip the index value */
if (index < 0)
index = 0;
else if (index > r->len)
index = r->len; /* exhausted iterator */
r->start += index * r->step;
r->len -= index;
else if (index > len)
index = len; /* exhausted iterator */
FT_ATOMIC_STORE_LONG_RELAXED(r->start,
FT_ATOMIC_LOAD_LONG_RELAXED(r->start) + index * r->step);
FT_ATOMIC_STORE_LONG_RELAXED(r->len, len - index);
Py_RETURN_NONE;
}

Expand Down Expand Up @@ -966,6 +982,9 @@ fast_range_iter(long start, long stop, long step, long len)
it->start = start;
it->step = step;
it->len = len;
#ifdef Py_GIL_DISABLED
it->stop = stop;
#endif
return (PyObject *)it;
}

Expand All @@ -979,75 +998,89 @@ typedef struct {
static PyObject *
longrangeiter_len(longrangeiterobject *r, PyObject *no_args)
{
Py_INCREF(r->len);
return r->len;
PyObject *len;
Py_BEGIN_CRITICAL_SECTION(r);
len = Py_NewRef(r->len);
Py_END_CRITICAL_SECTION();
return len;
}

static PyObject *
longrangeiter_reduce(longrangeiterobject *r, PyObject *Py_UNUSED(ignored))
{
PyObject *product, *stop=NULL;
PyObject *range;
PyObject *range, *result=NULL;

Py_BEGIN_CRITICAL_SECTION(r);
/* create a range object for pickling. Must calculate the "stop" value */
product = PyNumber_Multiply(r->len, r->step);
if (product == NULL)
return NULL;
goto fail;
stop = PyNumber_Add(r->start, product);
Py_DECREF(product);
if (stop == NULL)
return NULL;
goto fail;
range = (PyObject*)make_range_object(&PyRange_Type,
Py_NewRef(r->start), stop, Py_NewRef(r->step));
if (range == NULL) {
Py_DECREF(r->start);
Py_DECREF(stop);
Py_DECREF(r->step);
return NULL;
goto fail;
}

/* return the result */
return Py_BuildValue("N(N)O", _PyEval_GetBuiltin(&_Py_ID(iter)),
range, Py_None);
result = Py_BuildValue("N(N)O", _PyEval_GetBuiltin(&_Py_ID(iter)),
range, Py_None);
fail:
; // A statement must follow the label before Py_END_CRITICAL_SECTION.
Py_END_CRITICAL_SECTION();
return result;
}

static PyObject *
longrangeiter_setstate(longrangeiterobject *r, PyObject *state)
{
PyObject *zero = _PyLong_GetZero(); // borrowed reference
int cmp;
PyObject *result = NULL;

Py_BEGIN_CRITICAL_SECTION(r);
/* clip the value */
cmp = PyObject_RichCompareBool(state, zero, Py_LT);
if (cmp < 0)
return NULL;
goto fail;
if (cmp > 0) {
state = zero;
}
else {
cmp = PyObject_RichCompareBool(r->len, state, Py_LT);
if (cmp < 0)
return NULL;
goto fail;
if (cmp > 0)
state = r->len;
}
PyObject *product = PyNumber_Multiply(state, r->step);
if (product == NULL)
return NULL;
goto fail;
PyObject *new_start = PyNumber_Add(r->start, product);
Py_DECREF(product);
if (new_start == NULL)
return NULL;
goto fail;
PyObject *new_len = PyNumber_Subtract(r->len, state);
if (new_len == NULL) {
Py_DECREF(new_start);
return NULL;
goto fail;
}
PyObject *tmp = r->start;
r->start = new_start;
Py_SETREF(r->len, new_len);
Py_DECREF(tmp);
Py_RETURN_NONE;
result = Py_NewRef(Py_None);
fail:
; // A statement must follow the label before Py_END_CRITICAL_SECTION.
Py_END_CRITICAL_SECTION();
return result;
}

static PyMethodDef longrangeiter_methods[] = {
Expand All @@ -1072,21 +1105,26 @@ longrangeiter_dealloc(longrangeiterobject *r)
static PyObject *
longrangeiter_next(longrangeiterobject *r)
{
PyObject *result = NULL;
Py_BEGIN_CRITICAL_SECTION(r);
if (PyObject_RichCompareBool(r->len, _PyLong_GetZero(), Py_GT) != 1)
return NULL;
goto fail;

PyObject *new_start = PyNumber_Add(r->start, r->step);
if (new_start == NULL) {
return NULL;
goto fail;
}
PyObject *new_len = PyNumber_Subtract(r->len, _PyLong_GetOne());
if (new_len == NULL) {
Py_DECREF(new_start);
return NULL;
goto fail;
}
PyObject *result = r->start;
result = r->start;
r->start = new_start;
Py_SETREF(r->len, new_len);
fail:
; // A statement must follow the label before Py_END_CRITICAL_SECTION.
Py_END_CRITICAL_SECTION();
return result;
}

Expand Down
Loading
Loading