Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions Doc/library/heapq.rst
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,42 @@ Heap elements can be tuples. This is useful for assigning comparison values
(1, 'write spec')


Other Applications
------------------

`Medians <https://en.wikipedia.org/wiki/Median>`_ are a measure of
central tendency for a set of numbers. In distributions skewed by
outliers, the median provides a more stable estimate than an average
(arithmetic mean). A running median is an `online algorithm
<https://en.wikipedia.org/wiki/Online_algorithm>`_ that updates
continuously as new data arrives.

A running median can be efficiently implemented by balancing two heaps,
a max-heap for values at or below the midpoint and a min-heap for values
above the midpoint. When the two heaps have the same size, the new
median is the average of the tops of the two heaps; otherwise, the
median is at the top of the larger heap::

def running_median(iterable):
"Yields the cumulative median of values seen so far."

lo = [] # max-heap
hi = [] # min-heap (same size as or one smaller than lo)

for x in iterable:
if len(lo) == len(hi):
heappush_max(lo, heappushpop(hi, x))
yield lo[0]
else:
heappush(hi, heappushpop_max(lo, x))
yield (lo[0] + hi[0]) / 2

For example::

>>> list(running_median([5.0, 9.0, 4.0, 12.0, 8.0, 9.0]))
[5.0, 7.0, 5.0, 7.0, 8.0, 8.5]


Priority Queue Implementation Notes
-----------------------------------

Expand Down
6 changes: 4 additions & 2 deletions Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,8 @@ def _pipe_closed(self, fut):
class _ProactorDatagramTransport(_ProactorBasePipeTransport,
transports.DatagramTransport):
max_size = 256 * 1024
_header_size = 8

def __init__(self, loop, sock, protocol, address=None,
waiter=None, extra=None):
self._address = address
Expand Down Expand Up @@ -499,7 +501,7 @@ def sendto(self, data, addr=None):

# Ensure that what we buffer is immutable.
self._buffer.append((bytes(data), addr))
self._buffer_size += len(data) + 8 # include header bytes
self._buffer_size += len(data) + self._header_size

if self._write_fut is None:
# No current write operations are active, kick one off
Expand All @@ -526,7 +528,7 @@ def _loop_writing(self, fut=None):
return

data, addr = self._buffer.popleft()
self._buffer_size -= len(data)
self._buffer_size -= len(data) + self._header_size
if self._address is not None:
self._write_fut = self._loop._proactor.send(self._sock,
data)
Expand Down
7 changes: 4 additions & 3 deletions Lib/asyncio/selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1212,6 +1212,7 @@ def close(self):
class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport):

_buffer_factory = collections.deque
_header_size = 8

def __init__(self, loop, sock, protocol, address=None,
waiter=None, extra=None):
Expand Down Expand Up @@ -1285,21 +1286,21 @@ def sendto(self, data, addr=None):

# Ensure that what we buffer is immutable.
self._buffer.append((bytes(data), addr))
self._buffer_size += len(data) + 8 # include header bytes
self._buffer_size += len(data) + self._header_size
self._maybe_pause_protocol()

def _sendto_ready(self):
while self._buffer:
data, addr = self._buffer.popleft()
self._buffer_size -= len(data)
self._buffer_size -= len(data) + self._header_size
try:
if self._extra['peername']:
self._sock.send(data)
else:
self._sock.sendto(data, addr)
except (BlockingIOError, InterruptedError):
self._buffer.appendleft((data, addr)) # Try again later.
self._buffer_size += len(data)
self._buffer_size += len(data) + self._header_size
break
except OSError as exc:
self._protocol.error_received(exc)
Expand Down
5 changes: 3 additions & 2 deletions Lib/heapq.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@
From all times, sorting has always been a Great Art! :-)
"""

__all__ = ['heappush', 'heappop', 'heapify', 'heapreplace', 'merge',
'nlargest', 'nsmallest', 'heappushpop']
__all__ = ['heappush', 'heappop', 'heapify', 'heapreplace', 'heappushpop',
'heappush_max', 'heappop_max', 'heapify_max', 'heapreplace_max',
'heappushpop_max', 'nlargest', 'nsmallest', 'merge']

def heappush(heap, item):
"""Push item onto heap, maintaining the heap invariant."""
Expand Down
2 changes: 2 additions & 0 deletions Lib/test/test_asyncio/test_proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,8 @@ def test_sendto(self):
self.assertTrue(self.proactor.sendto.called)
self.proactor.sendto.assert_called_with(
self.sock, data, addr=('0.0.0.0', 1234))
self.assertFalse(transport._buffer)
self.assertEqual(0, transport._buffer_size)

def test_sendto_bytearray(self):
data = bytearray(b'data')
Expand Down
41 changes: 41 additions & 0 deletions Lib/test/test_asyncio/test_selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1497,6 +1497,47 @@ def test_sendto_closing(self):
transport.sendto(b'data', (1,))
self.assertEqual(transport._conn_lost, 2)

def test_sendto_sendto_ready(self):
data = b'data'

# First queue up the buffer by having the socket blocked
self.sock.sendto.side_effect = BlockingIOError
transport = self.datagram_transport()
transport.sendto(data, ('0.0.0.0', 12345))
self.loop.assert_writer(7, transport._sendto_ready)
self.assertEqual(1, len(transport._buffer))
self.assertEqual(transport._buffer_size, len(data) + transport._header_size)

# Now let the socket send the buffer
self.sock.sendto.side_effect = None
transport._sendto_ready()
self.assertTrue(self.sock.sendto.called)
self.assertEqual(
self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345)))
self.assertFalse(self.loop.writers)
self.assertFalse(transport._buffer)
self.assertEqual(transport._buffer_size, 0)

def test_sendto_sendto_ready_blocked(self):
data = b'data'

# First queue up the buffer by having the socket blocked
self.sock.sendto.side_effect = BlockingIOError
transport = self.datagram_transport()
transport.sendto(data, ('0.0.0.0', 12345))
self.loop.assert_writer(7, transport._sendto_ready)
self.assertEqual(1, len(transport._buffer))
self.assertEqual(transport._buffer_size, len(data) + transport._header_size)

# Now try to send the buffer, it will be added to buffer again if it fails
transport._sendto_ready()
self.assertTrue(self.sock.sendto.called)
self.assertEqual(
self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345)))
self.assertTrue(self.loop.writers)
self.assertEqual(1, len(transport._buffer))
self.assertEqual(transport._buffer_size, len(data) + transport._header_size)

def test_sendto_ready(self):
data = b'data'
self.sock.sendto.return_value = len(data)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix :meth:`asyncio.DatagramTransport.sendto` to account for datagram header size when
data cannot be sent.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:mod:`heapq`: Update :data:`!heapq.__all__` with ``*_max`` functions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Have Tools/wasm/wasi detect a WASI SDK install in /opt when it was directly
extracted from a release tarball.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Add a ``--logdir`` option to ``Tools/wasm/wasi`` for specifying where to
write log files.
57 changes: 42 additions & 15 deletions Tools/wasm/wasi/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
LOCAL_SETUP_MARKER = ("# Generated by Tools/wasm/wasi .\n"
"# Required to statically build extension modules.").encode("utf-8")

WASI_SDK_VERSION = 24

WASMTIME_VAR_NAME = "WASMTIME"
WASMTIME_HOST_RUNNER_VAR = f"{{{WASMTIME_VAR_NAME}}}"

Expand Down Expand Up @@ -91,18 +93,25 @@ def wrapper(context):
return decorator


def call(command, *, quiet, **kwargs):
def call(command, *, context=None, quiet=False, logdir=None, **kwargs):
"""Execute a command.

If 'quiet' is true, then redirect stdout and stderr to a temporary file.
"""
if context is not None:
quiet = context.quiet
logdir = context.logdir
elif quiet and logdir is None:
raise ValueError("When quiet is True, logdir must be specified")

print("❯", " ".join(map(str, command)))
if not quiet:
stdout = None
stderr = None
else:
stdout = tempfile.NamedTemporaryFile("w", encoding="utf-8",
delete=False,
dir=logdir,
prefix="cpython-wasi-",
suffix=".log")
stderr = subprocess.STDOUT
Expand Down Expand Up @@ -154,14 +163,14 @@ def configure_build_python(context, working_dir):
if context.args:
configure.extend(context.args)

call(configure, quiet=context.quiet)
call(configure, context=context)


@subdir(BUILD_DIR)
def make_build_python(context, working_dir):
"""Make/build the build Python."""
call(["make", "--jobs", str(cpu_count()), "all"],
quiet=context.quiet)
context=context)

binary = build_python_path()
cmd = [binary, "-c",
Expand All @@ -173,10 +182,22 @@ def make_build_python(context, working_dir):


def find_wasi_sdk():
"""Find the path to wasi-sdk."""
"""Find the path to the WASI SDK."""
if wasi_sdk_path := os.environ.get("WASI_SDK_PATH"):
return pathlib.Path(wasi_sdk_path)
elif (default_path := pathlib.Path("/opt/wasi-sdk")).exists():

opt_path = pathlib.Path("/opt")
# WASI SDK versions have a ``.0`` suffix, but it's a constant; the WASI SDK team
# has said they don't plan to ever do a point release and all of their Git tags
# lack the ``.0`` suffix.
# Starting with WASI SDK 23, the tarballs went from containing a directory named
# ``wasi-sdk-{WASI_SDK_VERSION}.0`` to e.g.
# ``wasi-sdk-{WASI_SDK_VERSION}.0-x86_64-linux``.
potential_sdks = [path for path in opt_path.glob(f"wasi-sdk-{WASI_SDK_VERSION}.0*")
if path.is_dir()]
if len(potential_sdks) == 1:
return potential_sdks[0]
elif (default_path := opt_path / "wasi-sdk").is_dir():
return default_path


Expand Down Expand Up @@ -261,7 +282,7 @@ def configure_wasi_python(context, working_dir):
configure.extend(context.args)
call(configure,
env=updated_env(env_additions | wasi_sdk_env(context)),
quiet=context.quiet)
context=context)

python_wasm = working_dir / "python.wasm"
exec_script = working_dir / "python.sh"
Expand All @@ -277,7 +298,7 @@ def make_wasi_python(context, working_dir):
"""Run `make` for the WASI/host build."""
call(["make", "--jobs", str(cpu_count()), "all"],
env=updated_env(),
quiet=context.quiet)
context=context)

exec_script = working_dir / "python.sh"
call([exec_script, "--version"], quiet=False)
Expand Down Expand Up @@ -306,6 +327,8 @@ def clean_contents(context):


def main():
default_host_triple = "wasm32-wasip1"
default_wasi_sdk = find_wasi_sdk()
default_host_runner = (f"{WASMTIME_HOST_RUNNER_VAR} run "
# Make sure the stack size will work for a pydebug
# build.
Expand All @@ -317,6 +340,7 @@ def main():
"--dir {HOST_DIR}::{GUEST_DIR} "
# Set PYTHONPATH to the sysconfig data.
"--env {ENV_VAR_NAME}={ENV_VAR_VALUE}")
default_logdir = pathlib.Path(tempfile.gettempdir())

parser = argparse.ArgumentParser()
subcommands = parser.add_subparsers(dest="subcommand")
Expand All @@ -339,6 +363,9 @@ def main():
subcommand.add_argument("--quiet", action="store_true", default=False,
dest="quiet",
help="Redirect output from subprocesses to a log file")
subcommand.add_argument("--logdir", type=pathlib.Path, default=default_logdir,
help="Directory to store log files; "
f"defaults to {default_logdir}")
for subcommand in configure_build, configure_host:
subcommand.add_argument("--clean", action="store_true", default=False,
dest="clean",
Expand All @@ -349,17 +376,17 @@ def main():
for subcommand in build, configure_host:
subcommand.add_argument("--wasi-sdk", type=pathlib.Path,
dest="wasi_sdk_path",
default=find_wasi_sdk(),
help="Path to wasi-sdk; defaults to "
"$WASI_SDK_PATH or /opt/wasi-sdk")
default=default_wasi_sdk,
help=f"Path to the WASI SDK; defaults to {default_wasi_sdk}")
subcommand.add_argument("--host-runner", action="store",
default=default_host_runner, dest="host_runner",
help="Command template for running the WASI host "
"(default designed for wasmtime 14 or newer: "
f"`{default_host_runner}`)")
help="Command template for running the WASI host; defaults to "
f"`{default_host_runner}`")
for subcommand in build, configure_host, make_host:
subcommand.add_argument("--host-triple", action="store", default="wasm32-wasip1",
help="The target triple for the WASI host build")
subcommand.add_argument("--host-triple", action="store",
default=default_host_triple,
help="The target triple for the WASI host build; "
f"defaults to {default_host_triple}")

context = parser.parse_args()
context.init_dir = pathlib.Path().absolute()
Expand Down
Loading