diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 4be1fb7..9e841dd 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -4,12 +4,12 @@ - Python - `pip` -- `componentize-py` 0.13.3 +- `componentize-py` 0.16.0 Once you have `pip` installed, you can install `componentize-py` using: ```bash -pip install componentize-py==0.13.3 +pip install componentize-py==0.16.0 ``` ### Generating the bindings @@ -18,7 +18,13 @@ The bindings are generated from [src/spin_sdk/wit/spin.wit](./src/spin_sdk/wit/spin.wit). ```bash -componentize-py -d src/spin_sdk/wit -w spin-all bindings bindings --world-module spin_sdk.wit +componentize-py \ + -d src/spin_sdk/wit \ + -w spin-all \ + --import-interface-name fermyon:spin/postgres@2.0.0=postgres \ + bindings \ + bindings \ + --world-module spin_sdk.wit rm -r src/spin_sdk/wit/imports src/spin_sdk/wit/exports mv bindings/spin_sdk/wit/* src/spin_sdk/wit/ rm -r bindings diff --git a/README.md b/README.md index feb1fc1..1081b46 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ enter a virtual environment and then install the desired packages ```shell python -m venv .venv source .venv/bin/activate -pip install componentize-py==0.13.3 spin-sdk==3.2.1 mypy==1.8.0 +pip install componentize-py==0.16.0 spin-sdk==3.3.0 mypy==1.8.0 ``` ### Hello, World diff --git a/docs/v3/http/index.html b/docs/v3/http/index.html index 0f2df21..3db7af2 100644 --- a/docs/v3/http/index.html +++ b/docs/v3/http/index.html @@ -2,18 +2,21 @@ - - + + spin_sdk.http API documentation - - - - - - + + + + + + - - + +
@@ -23,217 +26,6 @@

Module spin_sdk.http

Module with helpers for wasi http

-
- -Expand source code - -
"""Module with helpers for wasi http"""
-
-import traceback
-
-from spin_sdk.wit import exports
-from spin_sdk.wit.types import Ok, Err
-from spin_sdk.wit.imports import types, outgoing_handler
-from spin_sdk.wit.imports.types import (
-    Method, Method_Get, Method_Head, Method_Post, Method_Put, Method_Delete, Method_Connect, Method_Options,
-    Method_Trace, Method_Patch, Method_Other, IncomingRequest, IncomingBody, ResponseOutparam, OutgoingResponse,
-    Fields, Scheme, Scheme_Http, Scheme_Https, Scheme_Other, OutgoingRequest, OutgoingBody
-)
-from spin_sdk.wit.imports.streams import StreamError_Closed
-from dataclasses import dataclass
-from collections.abc import Mapping
-from typing import Optional
-from urllib import parse
-
-@dataclass
-class Request:
-    """An HTTP request"""
-    method: str
-    uri: str
-    headers: Mapping[str, str]
-    body: Optional[bytes]
-
-@dataclass
-class Response:
-    """An HTTP response"""
-    status: int
-    headers: Mapping[str, str]
-    body: Optional[bytes]
-
-class IncomingHandler(exports.IncomingHandler):
-    """Simplified handler for incoming HTTP requests using blocking, buffered I/O."""
-    
-    def handle_request(self, request: Request) -> Response:
-        """Handle an incoming HTTP request and return a response or raise an error"""
-        raise NotImplementedError
-    
-    def handle(self, request: IncomingRequest, response_out: ResponseOutparam):
-        method = request.method()
-
-        if isinstance(method, Method_Get):
-            method_str = "GET"
-        elif isinstance(method, Method_Head):
-            method_str = "HEAD"
-        elif isinstance(method, Method_Post):
-            method_str = "POST"
-        elif isinstance(method, Method_Put):
-            method_str = "PUT"
-        elif isinstance(method, Method_Delete):
-            method_str = "DELETE"
-        elif isinstance(method, Method_Connect):
-            method_str = "CONNECT"
-        elif isinstance(method, Method_Options):
-            method_str = "OPTIONS"
-        elif isinstance(method, Method_Trace):
-            method_str = "TRACE"
-        elif isinstance(method, Method_Patch):
-            method_str = "PATCH"
-        elif isinstance(method, Method_Other):
-            method_str = method.value
-        else:
-            raise AssertionError
-
-        request_body = request.consume()
-        request_stream = request_body.stream()
-        body = bytearray()
-        while True:
-            try:
-                body += request_stream.blocking_read(16 * 1024)
-            except Err as e:
-                if isinstance(e.value, StreamError_Closed):
-                    request_stream.__exit__()
-                    IncomingBody.finish(request_body)
-                    break
-                else:
-                    raise e
-
-        request_uri = request.path_with_query()
-        if request_uri is None:
-            uri = "/"
-        else:
-            uri = request_uri
-
-        try:
-            simple_response = self.handle_request(Request(
-                method_str,
-                uri,
-                dict(map(lambda pair: (pair[0], str(pair[1], "utf-8")), request.headers().entries())),
-                bytes(body)
-            ))
-        except:
-            traceback.print_exc()
-
-            response = OutgoingResponse(Fields())
-            response.set_status_code(500)
-            ResponseOutparam.set(response_out, Ok(response))
-            return
-
-        response = OutgoingResponse(Fields.from_list(list(map(
-            lambda pair: (pair[0], bytes(pair[1], "utf-8")),
-            simple_response.headers.items()
-        ))))
-        response_body = response.body()
-        response.set_status_code(simple_response.status)
-        ResponseOutparam.set(response_out, Ok(response))
-        response_stream = response_body.write()
-        if simple_response.body is not None:
-            MAX_BLOCKING_WRITE_SIZE = 4096
-            offset = 0
-            while offset < len(simple_response.body):
-                count = min(len(simple_response.body) - offset, MAX_BLOCKING_WRITE_SIZE)
-                response_stream.blocking_write_and_flush(simple_response.body[offset:offset+count])
-                offset += count
-        response_stream.__exit__()
-        OutgoingBody.finish(response_body, None)
-    
-def send(request: Request) -> Response:
-    """Send an HTTP request and return a response or raise an error"""
-
-    match request.method:
-        case "GET":
-            method: Method = Method_Get()
-        case "HEAD":
-            method = Method_Head()
-        case "POST":
-            method = Method_Post()
-        case "PUT":
-            method = Method_Put()
-        case "DELETE":
-            method = Method_Delete()
-        case "CONNECT":
-            method = Method_Connect()
-        case "OPTIONS":
-            method = Method_Options()
-        case "TRACE":
-            method = Method_Trace()
-        case "PATCH":
-            method = Method_Patch()
-        case _:
-            method = Method_Other(request.method)
-    
-    url_parsed = parse.urlparse(request.uri)
-
-    match url_parsed.scheme:
-        case "http":
-            scheme: Scheme = Scheme_Http()
-        case "https":
-            scheme = Scheme_Https()
-        case _:
-            scheme = Scheme_Other(url_parsed.scheme)
-
-    outgoing_request = OutgoingRequest(Fields.from_list(list(map(
-        lambda pair: (pair[0], bytes(pair[1], "utf-8")),
-        request.headers.items()
-    ))))
-    outgoing_request.set_method(method)
-    outgoing_request.set_scheme(scheme)
-    outgoing_request.set_authority(url_parsed.netloc)
-    outgoing_request.set_path_with_query(url_parsed.path)
-
-    if request.body is not None:
-        raise NotImplementedError("todo: handle outgoing request bodies")
-
-    future = outgoing_handler.handle(outgoing_request, None)
-    pollable = future.subscribe()
-
-    while True:
-        response = future.get()
-        if response is None:
-            pollable.block()
-        else:
-            pollable.__exit__()
-            future.__exit__()
-            
-            if isinstance(response, Ok):
-                if isinstance(response.value, Ok):
-                    response_value = response.value.value
-                    response_body = response_value.consume()
-                    response_stream = response_body.stream()
-                    body = bytearray()
-                    while True:
-                        try:
-                            body += response_stream.blocking_read(16 * 1024)
-                        except Err as e:
-                            if isinstance(e.value, StreamError_Closed):
-                                response_stream.__exit__()
-                                IncomingBody.finish(response_body)
-                                simple_response = Response(
-                                    response_value.status(),
-                                    dict(map(
-                                        lambda pair: (pair[0], str(pair[1], "utf-8")),
-                                        response_value.headers().entries()
-                                    )),
-                                    bytes(body)
-                                )
-                                response_value.__exit__()
-                                return simple_response
-                            else:
-                                raise e
-                else:
-                    raise response.value
-            else:
-                raise response
-

Sub-modules

@@ -254,98 +46,18 @@

Functions

Send an HTTP request and return a response or raise an error

-
- -Expand source code - -
def send(request: Request) -> Response:
-    """Send an HTTP request and return a response or raise an error"""
-
-    match request.method:
-        case "GET":
-            method: Method = Method_Get()
-        case "HEAD":
-            method = Method_Head()
-        case "POST":
-            method = Method_Post()
-        case "PUT":
-            method = Method_Put()
-        case "DELETE":
-            method = Method_Delete()
-        case "CONNECT":
-            method = Method_Connect()
-        case "OPTIONS":
-            method = Method_Options()
-        case "TRACE":
-            method = Method_Trace()
-        case "PATCH":
-            method = Method_Patch()
-        case _:
-            method = Method_Other(request.method)
-    
-    url_parsed = parse.urlparse(request.uri)
-
-    match url_parsed.scheme:
-        case "http":
-            scheme: Scheme = Scheme_Http()
-        case "https":
-            scheme = Scheme_Https()
-        case _:
-            scheme = Scheme_Other(url_parsed.scheme)
-
-    outgoing_request = OutgoingRequest(Fields.from_list(list(map(
-        lambda pair: (pair[0], bytes(pair[1], "utf-8")),
-        request.headers.items()
-    ))))
-    outgoing_request.set_method(method)
-    outgoing_request.set_scheme(scheme)
-    outgoing_request.set_authority(url_parsed.netloc)
-    outgoing_request.set_path_with_query(url_parsed.path)
-
-    if request.body is not None:
-        raise NotImplementedError("todo: handle outgoing request bodies")
-
-    future = outgoing_handler.handle(outgoing_request, None)
-    pollable = future.subscribe()
-
-    while True:
-        response = future.get()
-        if response is None:
-            pollable.block()
-        else:
-            pollable.__exit__()
-            future.__exit__()
-            
-            if isinstance(response, Ok):
-                if isinstance(response.value, Ok):
-                    response_value = response.value.value
-                    response_body = response_value.consume()
-                    response_stream = response_body.stream()
-                    body = bytearray()
-                    while True:
-                        try:
-                            body += response_stream.blocking_read(16 * 1024)
-                        except Err as e:
-                            if isinstance(e.value, StreamError_Closed):
-                                response_stream.__exit__()
-                                IncomingBody.finish(response_body)
-                                simple_response = Response(
-                                    response_value.status(),
-                                    dict(map(
-                                        lambda pair: (pair[0], str(pair[1], "utf-8")),
-                                        response_value.headers().entries()
-                                    )),
-                                    bytes(body)
-                                )
-                                response_value.__exit__()
-                                return simple_response
-                            else:
-                                raise e
-                else:
-                    raise response.value
-            else:
-                raise response
-
+
+
+async def send_and_close(sink: Sink,
data: bytes)
+
+
+
+
+
+async def send_async(request: Request) ‑> Response +
+
+
@@ -403,7 +115,7 @@

Classes

body += request_stream.blocking_read(16 * 1024) except Err as e: if isinstance(e.value, StreamError_Closed): - request_stream.__exit__() + request_stream.__exit__(None, None, None) IncomingBody.finish(request_body) break else: @@ -430,6 +142,10 @@

Classes

ResponseOutparam.set(response_out, Ok(response)) return + if simple_response.headers.get('content-length') is None: + content_length = len(simple_response.body) if simple_response.body is not None else 0 + simple_response.headers['content-length'] = str(content_length) + response = OutgoingResponse(Fields.from_list(list(map( lambda pair: (pair[0], bytes(pair[1], "utf-8")), simple_response.headers.items() @@ -445,7 +161,7 @@

Classes

count = min(len(simple_response.body) - offset, MAX_BLOCKING_WRITE_SIZE) response_stream.blocking_write_and_flush(simple_response.body[offset:offset+count]) offset += count - response_stream.__exit__() + response_stream.__exit__(None, None, None) OutgoingBody.finish(response_body, None)

Ancestors

@@ -457,18 +173,10 @@

Ancestors

Methods

-def handle_request(self, request: Request) ‑> Response +def handle_request(self,
request: Request) ‑> Response

Handle an incoming HTTP request and return a response or raise an error

-
- -Expand source code - -
def handle_request(self, request: Request) -> Response:
-    """Handle an incoming HTTP request and return a response or raise an error"""
-    raise NotImplementedError
-

Inherited members

@@ -482,7 +190,7 @@

Inherited members

class Request -(method: str, uri: str, headers: collections.abc.Mapping[str, str], body: Optional[bytes]) +(method: str, uri: str, headers: MutableMapping[str, str], body: bytes | None)

An HTTP request

@@ -495,16 +203,16 @@

Inherited members

"""An HTTP request""" method: str uri: str - headers: Mapping[str, str] + headers: MutableMapping[str, str] body: Optional[bytes]

Class variables

-
var body : Optional[bytes]
+
var body : bytes | None
-
var headers : collections.abc.Mapping[str, str]
+
var headers : MutableMapping[str, str]
@@ -520,7 +228,7 @@

Class variables

class Response -(status: int, headers: collections.abc.Mapping[str, str], body: Optional[bytes]) +(status: int, headers: MutableMapping[str, str], body: bytes | None)

An HTTP response

@@ -532,16 +240,16 @@

Class variables

class Response: """An HTTP response""" status: int - headers: Mapping[str, str] + headers: MutableMapping[str, str] body: Optional[bytes]

Class variables

-
var body : Optional[bytes]
+
var body : bytes | None
-
var headers : collections.abc.Mapping[str, str]
+
var headers : MutableMapping[str, str]
@@ -555,7 +263,6 @@

Class variables

- \ No newline at end of file + diff --git a/docs/v3/http/poll_loop.html b/docs/v3/http/poll_loop.html index a84559f..faceaa9 100644 --- a/docs/v3/http/poll_loop.html +++ b/docs/v3/http/poll_loop.html @@ -2,18 +2,21 @@ - - + + spin_sdk.http.poll_loop API documentation - - - - - - + + + + + + - - + +
@@ -28,381 +31,6 @@

Module spin_sdk.http.poll_loop

asynchronous functions and streams. We expect that little or none of this boilerplate will be needed once those features arrive in Preview 3.

-
- -Expand source code - -
"""Defines a custom `asyncio` event loop backed by `wasi:io/poll#poll`.
-
-This also includes helper classes and functions for working with `wasi:http`.
-
-As of WASI Preview 2, there is not yet a standard for first-class, composable
-asynchronous functions and streams.  We expect that little or none of this
-boilerplate will be needed once those features arrive in Preview 3.
-"""
-
-import asyncio
-import socket
-import subprocess
-
-from spin_sdk.wit.types import Ok, Err
-from spin_sdk.wit.imports import types, streams, poll, outgoing_handler
-from spin_sdk.wit.imports.types import IncomingBody, OutgoingBody, OutgoingRequest, IncomingResponse
-from spin_sdk.wit.imports.streams import StreamError_Closed, InputStream
-from spin_sdk.wit.imports.poll import Pollable
-from typing import Optional, cast
-
-# Maximum number of bytes to read at a time
-READ_SIZE: int = 16 * 1024
-
-async def send(request: OutgoingRequest) -> IncomingResponse:
-    """Send the specified request and wait asynchronously for the response."""
-    
-    future = outgoing_handler.handle(request, None)
-
-    while True:
-        response = future.get()
-        if response is None:
-            await register(cast(PollLoop, asyncio.get_event_loop()), future.subscribe())
-        else:
-            future.__exit__()
-            
-            if isinstance(response, Ok):
-                if isinstance(response.value, Ok):
-                    return response.value.value
-                else:
-                    raise response.value
-            else:
-                raise response
-
-class Stream:
-    """Reader abstraction over `wasi:http/types#incoming-body`."""
-    def __init__(self, body: IncomingBody):
-        self.body: Optional[IncomingBody] = body
-        self.stream: Optional[InputStream] = body.stream()
-
-    async def next(self) -> Optional[bytes]:
-        """Wait for the next chunk of data to arrive on the stream.
-
-        This will return `None` when the end of the stream has been reached.
-        """
-        while True:
-            try:
-                if self.stream is None:
-                    return None
-                else:
-                    buffer = self.stream.read(READ_SIZE)
-                    if len(buffer) == 0:
-                        await register(cast(PollLoop, asyncio.get_event_loop()), self.stream.subscribe())
-                    else:
-                        return buffer
-            except Err as e:
-                if isinstance(e.value, StreamError_Closed):
-                    if self.stream is not None:
-                        self.stream.__exit__()
-                        self.stream = None
-                    if self.body is not None:
-                        IncomingBody.finish(self.body)
-                        self.body = None
-                else:
-                    raise e
-
-class Sink:
-    """Writer abstraction over `wasi-http/types#outgoing-body`."""
-    def __init__(self, body: OutgoingBody):
-        self.body = body
-        self.stream = body.write()
-
-    async def send(self, chunk: bytes):
-        """Write the specified bytes to the sink.
-
-        This may need to yield according to the backpressure requirements of the sink.
-        """
-        offset = 0
-        flushing = False
-        while True:
-            count = self.stream.check_write()
-            if count == 0:
-                await register(cast(PollLoop, asyncio.get_event_loop()), self.stream.subscribe())
-            elif offset == len(chunk):
-                if flushing:
-                    return
-                else:
-                    self.stream.flush()
-                    flushing = True
-            else:
-                count = min(count, len(chunk) - offset)
-                self.stream.write(chunk[offset:offset+count])
-                offset += count
-
-    def close(self):
-        """Close the stream, indicating no further data will be written."""
-
-        self.stream.__exit__()
-        self.stream = None
-        OutgoingBody.finish(self.body, None)
-        self.body = None
-        
-class PollLoop(asyncio.AbstractEventLoop):
-    """Custom `asyncio` event loop backed by `wasi:io/poll#poll`."""
-    
-    def __init__(self):
-        self.wakers = []
-        self.running = False
-        self.handles = []
-        self.exception = None
-
-    def get_debug(self):
-        return False
-
-    def run_until_complete(self, future):
-        future = asyncio.ensure_future(future, loop=self)
-
-        self.running = True
-        asyncio.events._set_running_loop(self)
-        while self.running and not future.done():
-            handle = self.handles[0]
-            self.handles = self.handles[1:]
-            if not handle._cancelled:
-                handle._run()
-                
-            if self.wakers:
-                [pollables, wakers] = list(map(list, zip(*self.wakers)))
-                
-                new_wakers = []
-                ready = [False] * len(pollables)
-                for index in poll.poll(pollables):
-                    ready[index] = True
-                
-                for (ready, pollable), waker in zip(zip(ready, pollables), wakers):
-                    if ready:
-                        pollable.__exit__()
-                        waker.set_result(None)
-                    else:
-                        new_wakers.append((pollable, waker))
-
-                self.wakers = new_wakers
-
-            if self.exception is not None:
-                raise self.exception
-            
-        return future.result()
-
-    def is_running(self):
-        return self.running
-
-    def is_closed(self):
-        return not self.running
-
-    def stop(self):
-        self.running = False
-
-    def close(self):
-        self.running = False
-
-    def shutdown_asyncgens(self):
-        pass
-
-    def call_exception_handler(self, context):
-        self.exception = context.get('exception', None)
-
-    def call_soon(self, callback, *args, context=None):
-        handle = asyncio.Handle(callback, args, self, context)
-        self.handles.append(handle)
-        return handle
-
-    def create_task(self, coroutine):
-        return asyncio.Task(coroutine, loop=self)
-
-    def create_future(self):
-        return asyncio.Future(loop=self)
-
-    # The remaining methods should be irrelevant for our purposes and thus unimplemented
-
-    def run_forever(self):
-        raise NotImplementedError
-
-    async def shutdown_default_executor(self):
-        raise NotImplementedError
-
-    def _timer_handle_cancelled(self, handle):
-        raise NotImplementedError
-
-    def call_later(self, delay, callback, *args, context=None):
-        raise NotImplementedError
-
-    def call_at(self, when, callback, *args, context=None):
-        raise NotImplementedError
-
-    def time(self):
-        raise NotImplementedError
-
-    def call_soon_threadsafe(self, callback, *args, context=None):
-        raise NotImplementedError
-
-    def run_in_executor(self, executor, func, *args):
-        raise NotImplementedError
-
-    def set_default_executor(self, executor):
-        raise NotImplementedError
-
-    async def getaddrinfo(self, host, port, *,
-                          family=0, type=0, proto=0, flags=0):
-        raise NotImplementedError
-
-    async def getnameinfo(self, sockaddr, flags=0):
-        raise NotImplementedError
-
-    async def create_connection(
-            self, protocol_factory, host=None, port=None,
-            *, ssl=None, family=0, proto=0,
-            flags=0, sock=None, local_addr=None,
-            server_hostname=None,
-            ssl_handshake_timeout=None,
-            ssl_shutdown_timeout=None,
-            happy_eyeballs_delay=None, interleave=None):
-        raise NotImplementedError
-
-    async def create_server(
-            self, protocol_factory, host=None, port=None,
-            *, family=socket.AF_UNSPEC,
-            flags=socket.AI_PASSIVE, sock=None, backlog=100,
-            ssl=None, reuse_address=None, reuse_port=None,
-            ssl_handshake_timeout=None,
-            ssl_shutdown_timeout=None,
-            start_serving=True):
-        raise NotImplementedError
-
-    async def sendfile(self, transport, file, offset=0, count=None,
-                       *, fallback=True):
-        raise NotImplementedError
-
-    async def start_tls(self, transport, protocol, sslcontext, *,
-                        server_side=False,
-                        server_hostname=None,
-                        ssl_handshake_timeout=None,
-                        ssl_shutdown_timeout=None):
-        raise NotImplementedError
-
-    async def create_unix_connection(
-            self, protocol_factory, path=None, *,
-            ssl=None, sock=None,
-            server_hostname=None,
-            ssl_handshake_timeout=None,
-            ssl_shutdown_timeout=None):
-        raise NotImplementedError
-
-    async def create_unix_server(
-            self, protocol_factory, path=None, *,
-            sock=None, backlog=100, ssl=None,
-            ssl_handshake_timeout=None,
-            ssl_shutdown_timeout=None,
-            start_serving=True):
-        raise NotImplementedError
-
-    async def connect_accepted_socket(
-            self, protocol_factory, sock,
-            *, ssl=None,
-            ssl_handshake_timeout=None,
-            ssl_shutdown_timeout=None):
-        raise NotImplementedError
-
-    async def create_datagram_endpoint(self, protocol_factory,
-                                       local_addr=None, remote_addr=None, *,
-                                       family=0, proto=0, flags=0,
-                                       reuse_address=None, reuse_port=None,
-                                       allow_broadcast=None, sock=None):
-        raise NotImplementedError
-
-    async def connect_read_pipe(self, protocol_factory, pipe):
-        raise NotImplementedError
-
-    async def connect_write_pipe(self, protocol_factory, pipe):
-        raise NotImplementedError
-
-    async def subprocess_shell(self, protocol_factory, cmd, *,
-                               stdin=subprocess.PIPE,
-                               stdout=subprocess.PIPE,
-                               stderr=subprocess.PIPE,
-                               **kwargs):
-        raise NotImplementedError
-
-    async def subprocess_exec(self, protocol_factory, *args,
-                              stdin=subprocess.PIPE,
-                              stdout=subprocess.PIPE,
-                              stderr=subprocess.PIPE,
-                              **kwargs):
-        raise NotImplementedError
-
-    def add_reader(self, fd, callback, *args):
-        raise NotImplementedError
-
-    def remove_reader(self, fd):
-        raise NotImplementedError
-
-    def add_writer(self, fd, callback, *args):
-        raise NotImplementedError
-
-    def remove_writer(self, fd):
-        raise NotImplementedError
-
-    async def sock_recv(self, sock, nbytes):
-        raise NotImplementedError
-
-    async def sock_recv_into(self, sock, buf):
-        raise NotImplementedError
-
-    async def sock_recvfrom(self, sock, bufsize):
-        raise NotImplementedError
-
-    async def sock_recvfrom_into(self, sock, buf, nbytes=0):
-        raise NotImplementedError
-
-    async def sock_sendall(self, sock, data):
-        raise NotImplementedError
-
-    async def sock_sendto(self, sock, data, address):
-        raise NotImplementedError
-
-    async def sock_connect(self, sock, address):
-        raise NotImplementedError
-
-    async def sock_accept(self, sock):
-        raise NotImplementedError
-
-    async def sock_sendfile(self, sock, file, offset=0, count=None,
-                            *, fallback=None):
-        raise NotImplementedError
-
-    def add_signal_handler(self, sig, callback, *args):
-        raise NotImplementedError
-
-    def remove_signal_handler(self, sig):
-        raise NotImplementedError
-
-    def set_task_factory(self, factory):
-        raise NotImplementedError
-
-    def get_task_factory(self):
-        raise NotImplementedError
-
-    def get_exception_handler(self):
-        raise NotImplementedError
-
-    def set_exception_handler(self, handler):
-        raise NotImplementedError
-
-    def default_exception_handler(self, context):
-        raise NotImplementedError
-
-    def set_debug(self, enabled):
-        raise NotImplementedError
-
-async def register(loop: PollLoop, pollable: Pollable):
-    waker = loop.create_future()
-    loop.wakers.append((pollable, waker))
-    await waker
-
@@ -412,49 +40,16 @@

Module spin_sdk.http.poll_loop

Functions

-async def register(loop: PollLoop, pollable: Pollable) +async def register(loop: PollLoop,
pollable: Pollable)
-
- -Expand source code - -
async def register(loop: PollLoop, pollable: Pollable):
-    waker = loop.create_future()
-    loop.wakers.append((pollable, waker))
-    await waker
-
async def send(request: OutgoingRequest) ‑> IncomingResponse

Send the specified request and wait asynchronously for the response.

-
- -Expand source code - -
async def send(request: OutgoingRequest) -> IncomingResponse:
-    """Send the specified request and wait asynchronously for the response."""
-    
-    future = outgoing_handler.handle(request, None)
-
-    while True:
-        response = future.get()
-        if response is None:
-            await register(cast(PollLoop, asyncio.get_event_loop()), future.subscribe())
-        else:
-            future.__exit__()
-            
-            if isinstance(response, Ok):
-                if isinstance(response.value, Ok):
-                    return response.value.value
-                else:
-                    raise response.value
-            else:
-                raise response
-
@@ -488,10 +83,11 @@

Classes

self.running = True asyncio.events._set_running_loop(self) while self.running and not future.done(): - handle = self.handles[0] - self.handles = self.handles[1:] - if not handle._cancelled: - handle._run() + handles = self.handles + self.handles = [] + for handle in handles: + if not handle._cancelled: + handle._run() if self.wakers: [pollables, wakers] = list(map(list, zip(*self.wakers))) @@ -503,7 +99,7 @@

Classes

for (ready, pollable), waker in zip(zip(ready, pollables), wakers): if ready: - pollable.__exit__() + pollable.__exit__(None, None, None) waker.set_result(None) else: new_wakers.append((pollable, waker)) @@ -736,106 +332,48 @@

Methods

-
- -Expand source code - -
def add_reader(self, fd, callback, *args):
-    raise NotImplementedError
-
def add_signal_handler(self, sig, callback, *args)
-
- -Expand source code - -
def add_signal_handler(self, sig, callback, *args):
-    raise NotImplementedError
-
def add_writer(self, fd, callback, *args)
-
- -Expand source code - -
def add_writer(self, fd, callback, *args):
-    raise NotImplementedError
-
def call_at(self, when, callback, *args, context=None)
-
- -Expand source code - -
def call_at(self, when, callback, *args, context=None):
-    raise NotImplementedError
-
def call_exception_handler(self, context)
-
- -Expand source code - -
def call_exception_handler(self, context):
-    self.exception = context.get('exception', None)
-
def call_later(self, delay, callback, *args, context=None)
-
- -Expand source code - -
def call_later(self, delay, callback, *args, context=None):
-    raise NotImplementedError
-
def call_soon(self, callback, *args, context=None)
-
- -Expand source code - -
def call_soon(self, callback, *args, context=None):
-    handle = asyncio.Handle(callback, args, self, context)
-    self.handles.append(handle)
-    return handle
-
def call_soon_threadsafe(self, callback, *args, context=None)
-
- -Expand source code - -
def call_soon_threadsafe(self, callback, *args, context=None):
-    raise NotImplementedError
-
def close(self) @@ -845,16 +383,9 @@

Methods

The loop should not be running.

This is idempotent and irreversible.

No other methods should be called after this one.

-
- -Expand source code - -
def close(self):
-    self.running = False
-
-async def connect_accepted_socket(self, protocol_factory, sock, *, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None) +async def connect_accepted_socket(self,
protocol_factory,
sock,
*,
ssl=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None)

Handle an accepted connection.

@@ -863,17 +394,6 @@

Methods

This method is a coroutine. When completed, the coroutine returns a (transport, protocol) pair.

-
- -Expand source code - -
async def connect_accepted_socket(
-        self, protocol_factory, sock,
-        *, ssl=None,
-        ssl_handshake_timeout=None,
-        ssl_shutdown_timeout=None):
-    raise NotImplementedError
-
async def connect_read_pipe(self, protocol_factory, pipe) @@ -884,13 +404,6 @@

Methods

pipe is a file-like object. Return pair (transport, protocol), where transport supports the ReadTransport interface.

-
- -Expand source code - -
async def connect_read_pipe(self, protocol_factory, pipe):
-    raise NotImplementedError
-
async def connect_write_pipe(self, protocol_factory, pipe) @@ -901,36 +414,15 @@

Methods

Pipe is file-like object already switched to nonblocking. Return pair (transport, protocol), where transport support WriteTransport interface.

-
- -Expand source code - -
async def connect_write_pipe(self, protocol_factory, pipe):
-    raise NotImplementedError
-
-async def create_connection(self, protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None) +async def create_connection(self,
protocol_factory,
host=None,
port=None,
*,
ssl=None,
family=0,
proto=0,
flags=0,
sock=None,
local_addr=None,
server_hostname=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None,
happy_eyeballs_delay=None,
interleave=None)
-
- -Expand source code - -
async def create_connection(
-        self, protocol_factory, host=None, port=None,
-        *, ssl=None, family=0, proto=0,
-        flags=0, sock=None, local_addr=None,
-        server_hostname=None,
-        ssl_handshake_timeout=None,
-        ssl_shutdown_timeout=None,
-        happy_eyeballs_delay=None, interleave=None):
-    raise NotImplementedError
-
-async def create_datagram_endpoint(self, protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0, reuse_address=None, reuse_port=None, allow_broadcast=None, sock=None) +async def create_datagram_endpoint(self,
protocol_factory,
local_addr=None,
remote_addr=None,
*,
family=0,
proto=0,
flags=0,
reuse_address=None,
reuse_port=None,
allow_broadcast=None,
sock=None)

A coroutine which creates a datagram endpoint.

@@ -953,33 +445,15 @@

Methods

messages to the broadcast address.

sock can optionally be specified in order to use a preexisting socket object.

-
- -Expand source code - -
async def create_datagram_endpoint(self, protocol_factory,
-                                   local_addr=None, remote_addr=None, *,
-                                   family=0, proto=0, flags=0,
-                                   reuse_address=None, reuse_port=None,
-                                   allow_broadcast=None, sock=None):
-    raise NotImplementedError
-
def create_future(self)
-
- -Expand source code - -
def create_future(self):
-    return asyncio.Future(loop=self)
-
-async def create_server(self, protocol_factory, host=None, port=None, *, family=0, flags=1, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True) +async def create_server(self,
protocol_factory,
host=None,
port=None,
*,
family=0,
flags=1,
sock=None,
backlog=100,
ssl=None,
reuse_address=None,
reuse_port=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None,
start_serving=True)

A coroutine which creates a TCP server bound to host and port.

@@ -1018,54 +492,21 @@

Methods

When set to False, the user should await Server.start_serving() or Server.serve_forever() to make the server to start accepting connections.

-
- -Expand source code - -
async def create_server(
-        self, protocol_factory, host=None, port=None,
-        *, family=socket.AF_UNSPEC,
-        flags=socket.AI_PASSIVE, sock=None, backlog=100,
-        ssl=None, reuse_address=None, reuse_port=None,
-        ssl_handshake_timeout=None,
-        ssl_shutdown_timeout=None,
-        start_serving=True):
-    raise NotImplementedError
-
def create_task(self, coroutine)
-
- -Expand source code - -
def create_task(self, coroutine):
-    return asyncio.Task(coroutine, loop=self)
-
-async def create_unix_connection(self, protocol_factory, path=None, *, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None) +async def create_unix_connection(self,
protocol_factory,
path=None,
*,
ssl=None,
sock=None,
server_hostname=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None)
-
- -Expand source code - -
async def create_unix_connection(
-        self, protocol_factory, path=None, *,
-        ssl=None, sock=None,
-        server_hostname=None,
-        ssl_handshake_timeout=None,
-        ssl_shutdown_timeout=None):
-    raise NotImplementedError
-
-async def create_unix_server(self, protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True) +async def create_unix_server(self,
protocol_factory,
path=None,
*,
sock=None,
backlog=100,
ssl=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None,
start_serving=True)

A coroutine which creates a UNIX Domain Socket server.

@@ -1088,188 +529,84 @@

Methods

When set to False, the user should await Server.start_serving() or Server.serve_forever() to make the server to start accepting connections.

-
- -Expand source code - -
async def create_unix_server(
-        self, protocol_factory, path=None, *,
-        sock=None, backlog=100, ssl=None,
-        ssl_handshake_timeout=None,
-        ssl_shutdown_timeout=None,
-        start_serving=True):
-    raise NotImplementedError
-
def default_exception_handler(self, context)
-
- -Expand source code - -
def default_exception_handler(self, context):
-    raise NotImplementedError
-
def get_debug(self)
-
- -Expand source code - -
def get_debug(self):
-    return False
-
def get_exception_handler(self)
-
- -Expand source code - -
def get_exception_handler(self):
-    raise NotImplementedError
-
def get_task_factory(self)
-
- -Expand source code - -
def get_task_factory(self):
-    raise NotImplementedError
-
async def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0)
-
- -Expand source code - -
async def getaddrinfo(self, host, port, *,
-                      family=0, type=0, proto=0, flags=0):
-    raise NotImplementedError
-
async def getnameinfo(self, sockaddr, flags=0)
-
- -Expand source code - -
async def getnameinfo(self, sockaddr, flags=0):
-    raise NotImplementedError
-
def is_closed(self)

Returns True if the event loop was closed.

-
- -Expand source code - -
def is_closed(self):
-    return not self.running
-
def is_running(self)

Return whether the event loop is currently running.

-
- -Expand source code - -
def is_running(self):
-    return self.running
-
def remove_reader(self, fd)
-
- -Expand source code - -
def remove_reader(self, fd):
-    raise NotImplementedError
-
def remove_signal_handler(self, sig)
-
- -Expand source code - -
def remove_signal_handler(self, sig):
-    raise NotImplementedError
-
def remove_writer(self, fd)
-
- -Expand source code - -
def remove_writer(self, fd):
-    raise NotImplementedError
-
def run_forever(self)

Run the event loop until stop() is called.

-
- -Expand source code - -
def run_forever(self):
-    raise NotImplementedError
-
def run_in_executor(self, executor, func, *args)
-
- -Expand source code - -
def run_in_executor(self, executor, func, *args):
-    raise NotImplementedError
-
def run_until_complete(self, future) @@ -1277,43 +614,6 @@

Methods

Run the event loop until a Future is done.

Return the Future's result, or raise its exception.

-
- -Expand source code - -
def run_until_complete(self, future):
-    future = asyncio.ensure_future(future, loop=self)
-
-    self.running = True
-    asyncio.events._set_running_loop(self)
-    while self.running and not future.done():
-        handle = self.handles[0]
-        self.handles = self.handles[1:]
-        if not handle._cancelled:
-            handle._run()
-            
-        if self.wakers:
-            [pollables, wakers] = list(map(list, zip(*self.wakers)))
-            
-            new_wakers = []
-            ready = [False] * len(pollables)
-            for index in poll.poll(pollables):
-                ready[index] = True
-            
-            for (ready, pollable), waker in zip(zip(ready, pollables), wakers):
-                if ready:
-                    pollable.__exit__()
-                    waker.set_result(None)
-                else:
-                    new_wakers.append((pollable, waker))
-
-            self.wakers = new_wakers
-
-        if self.exception is not None:
-            raise self.exception
-        
-    return future.result()
-
async def sendfile(self, transport, file, offset=0, count=None, *, fallback=True) @@ -1321,229 +621,104 @@

Methods

Send a file through a transport.

Return an amount of sent bytes.

-
- -Expand source code - -
async def sendfile(self, transport, file, offset=0, count=None,
-                   *, fallback=True):
-    raise NotImplementedError
-
def set_debug(self, enabled)
-
- -Expand source code - -
def set_debug(self, enabled):
-    raise NotImplementedError
-
def set_default_executor(self, executor)
-
- -Expand source code - -
def set_default_executor(self, executor):
-    raise NotImplementedError
-
def set_exception_handler(self, handler)
-
- -Expand source code - -
def set_exception_handler(self, handler):
-    raise NotImplementedError
-
def set_task_factory(self, factory)
-
- -Expand source code - -
def set_task_factory(self, factory):
-    raise NotImplementedError
-
def shutdown_asyncgens(self)

Shutdown all active asynchronous generators.

-
- -Expand source code - -
def shutdown_asyncgens(self):
-    pass
-
async def shutdown_default_executor(self)

Schedule the shutdown of the default executor.

-
- -Expand source code - -
async def shutdown_default_executor(self):
-    raise NotImplementedError
-
async def sock_accept(self, sock)
-
- -Expand source code - -
async def sock_accept(self, sock):
-    raise NotImplementedError
-
async def sock_connect(self, sock, address)
-
- -Expand source code - -
async def sock_connect(self, sock, address):
-    raise NotImplementedError
-
async def sock_recv(self, sock, nbytes)
-
- -Expand source code - -
async def sock_recv(self, sock, nbytes):
-    raise NotImplementedError
-
async def sock_recv_into(self, sock, buf)
-
- -Expand source code - -
async def sock_recv_into(self, sock, buf):
-    raise NotImplementedError
-
async def sock_recvfrom(self, sock, bufsize)
-
- -Expand source code - -
async def sock_recvfrom(self, sock, bufsize):
-    raise NotImplementedError
-
async def sock_recvfrom_into(self, sock, buf, nbytes=0)
-
- -Expand source code - -
async def sock_recvfrom_into(self, sock, buf, nbytes=0):
-    raise NotImplementedError
-
async def sock_sendall(self, sock, data)
-
- -Expand source code - -
async def sock_sendall(self, sock, data):
-    raise NotImplementedError
-
async def sock_sendfile(self, sock, file, offset=0, count=None, *, fallback=None)
-
- -Expand source code - -
async def sock_sendfile(self, sock, file, offset=0, count=None,
-                        *, fallback=None):
-    raise NotImplementedError
-
async def sock_sendto(self, sock, data, address)
-
- -Expand source code - -
async def sock_sendto(self, sock, data, address):
-    raise NotImplementedError
-
-async def start_tls(self, transport, protocol, sslcontext, *, server_side=False, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None) +async def start_tls(self,
transport,
protocol,
sslcontext,
*,
server_side=False,
server_hostname=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None)

Upgrade a transport to TLS.

Return a new transport that protocol should start using immediately.

-
- -Expand source code - -
async def start_tls(self, transport, protocol, sslcontext, *,
-                    server_side=False,
-                    server_hostname=None,
-                    ssl_handshake_timeout=None,
-                    ssl_shutdown_timeout=None):
-    raise NotImplementedError
-
def stop(self) @@ -1552,60 +727,24 @@

Methods

Stop the event loop as soon as reasonable.

Exactly how soon that is may depend on the implementation, but no more I/O callbacks should be scheduled.

-
- -Expand source code - -
def stop(self):
-    self.running = False
-
async def subprocess_exec(self, protocol_factory, *args, stdin=-1, stdout=-1, stderr=-1, **kwargs)
-
- -Expand source code - -
async def subprocess_exec(self, protocol_factory, *args,
-                          stdin=subprocess.PIPE,
-                          stdout=subprocess.PIPE,
-                          stderr=subprocess.PIPE,
-                          **kwargs):
-    raise NotImplementedError
-
async def subprocess_shell(self, protocol_factory, cmd, *, stdin=-1, stdout=-1, stderr=-1, **kwargs)
-
- -Expand source code - -
async def subprocess_shell(self, protocol_factory, cmd, *,
-                           stdin=subprocess.PIPE,
-                           stdout=subprocess.PIPE,
-                           stderr=subprocess.PIPE,
-                           **kwargs):
-    raise NotImplementedError
-
def time(self)
-
- -Expand source code - -
def time(self):
-    raise NotImplementedError
-
@@ -1650,7 +789,7 @@

Methods

def close(self): """Close the stream, indicating no further data will be written.""" - self.stream.__exit__() + self.stream.__exit__(None, None, None) self.stream = None OutgoingBody.finish(self.body, None) self.body = None
@@ -1662,18 +801,6 @@

Methods

Close the stream, indicating no further data will be written.

-
- -Expand source code - -
def close(self):
-    """Close the stream, indicating no further data will be written."""
-
-    self.stream.__exit__()
-    self.stream = None
-    OutgoingBody.finish(self.body, None)
-    self.body = None
-
async def send(self, chunk: bytes) @@ -1681,32 +808,6 @@

Methods

Write the specified bytes to the sink.

This may need to yield according to the backpressure requirements of the sink.

-
- -Expand source code - -
async def send(self, chunk: bytes):
-    """Write the specified bytes to the sink.
-
-    This may need to yield according to the backpressure requirements of the sink.
-    """
-    offset = 0
-    flushing = False
-    while True:
-        count = self.stream.check_write()
-        if count == 0:
-            await register(cast(PollLoop, asyncio.get_event_loop()), self.stream.subscribe())
-        elif offset == len(chunk):
-            if flushing:
-                return
-            else:
-                self.stream.flush()
-                flushing = True
-        else:
-            count = min(count, len(chunk) - offset)
-            self.stream.write(chunk[offset:offset+count])
-            offset += count
-
@@ -1744,7 +845,7 @@

Methods

except Err as e: if isinstance(e.value, StreamError_Closed): if self.stream is not None: - self.stream.__exit__() + self.stream.__exit__(None, None, None) self.stream = None if self.body is not None: IncomingBody.finish(self.body) @@ -1755,41 +856,11 @@

Methods

Methods

-async def next(self) ‑> Optional[bytes] +async def next(self) ‑> bytes | None

Wait for the next chunk of data to arrive on the stream.

This will return None when the end of the stream has been reached.

-
- -Expand source code - -
async def next(self) -> Optional[bytes]:
-    """Wait for the next chunk of data to arrive on the stream.
-
-    This will return `None` when the end of the stream has been reached.
-    """
-    while True:
-        try:
-            if self.stream is None:
-                return None
-            else:
-                buffer = self.stream.read(READ_SIZE)
-                if len(buffer) == 0:
-                    await register(cast(PollLoop, asyncio.get_event_loop()), self.stream.subscribe())
-                else:
-                    return buffer
-        except Err as e:
-            if isinstance(e.value, StreamError_Closed):
-                if self.stream is not None:
-                    self.stream.__exit__()
-                    self.stream = None
-                if self.body is not None:
-                    IncomingBody.finish(self.body)
-                    self.body = None
-            else:
-                raise e
-
@@ -1797,7 +868,6 @@

Methods