# Signature-only declarations for `Server` and `BaseEventLoop`; every body is `...`.
# `sys.version_info` branches select the signature that applies to each Python version.
import ssl
import sys
from _typeshed import FileDescriptorLike, ReadableBuffer, WriteableBuffer
from asyncio import _AwaitableLike, _CoroutineLike
from asyncio.events import AbstractEventLoop, AbstractServer, Handle, TimerHandle, _TaskFactory
from asyncio.futures import Future
from asyncio.protocols import BaseProtocol
from asyncio.tasks import Task
from asyncio.transports import BaseTransport, DatagramTransport, ReadTransport, SubprocessTransport, Transport, WriteTransport
from collections.abc import Callable, Iterable, Sequence
from contextvars import Context  # type: ignore
from socket import AddressFamily, SocketKind, _Address, _RetAddress, socket
from typing import IO, Any, Literal, TypeVar, overload
from typing_extensions import TypeAlias, TypeVarTuple, Unpack

# `Server` is only listed in `__all__` from 3.9 onwards.
if sys.version_info >= (3, 9):
    __all__ = ("BaseEventLoop", "Server")
else:
    __all__ = ("BaseEventLoop",)

_T = TypeVar("_T")
_Ts = TypeVarTuple("_Ts")
# Bound to BaseProtocol so factory-taking methods can return the concrete protocol type.
_ProtocolT = TypeVar("_ProtocolT", bound=BaseProtocol)
_Context: TypeAlias = dict[str, Any]
_ExceptionHandler: TypeAlias = Callable[[AbstractEventLoop, _Context], object]
_ProtocolFactory: TypeAlias = Callable[[], BaseProtocol]
_SSLContext: TypeAlias = bool | None | ssl.SSLContext

class Server(AbstractServer):
    # 3.11 added `ssl_shutdown_timeout`.
    if sys.version_info >= (3, 11):
        def __init__(
            self,
            loop: AbstractEventLoop,
            sockets: Iterable[socket],
            protocol_factory: _ProtocolFactory,
            ssl_context: _SSLContext,
            backlog: int,
            ssl_handshake_timeout: float | None,
            ssl_shutdown_timeout: float | None = None,
        ) -> None: ...
    else:
        def __init__(
            self,
            loop: AbstractEventLoop,
            sockets: Iterable[socket],
            protocol_factory: _ProtocolFactory,
            ssl_context: _SSLContext,
            backlog: int,
            ssl_handshake_timeout: float | None,
        ) -> None: ...

    # Declared only for 3.13+.
    if sys.version_info >= (3, 13):
        def close_clients(self) -> None: ...
        def abort_clients(self) -> None: ...

    def get_loop(self) -> AbstractEventLoop: ...
    def is_serving(self) -> bool: ...
    async def start_serving(self) -> None: ...
    async def serve_forever(self) -> None: ...
    @property
    def sockets(self) -> tuple[socket, ...]: ...
    def close(self) -> None: ...
    async def wait_closed(self) -> None: ...

class BaseEventLoop(AbstractEventLoop):
    def run_forever(self) -> None: ...
    def run_until_complete(self, future: _AwaitableLike[_T]) -> _T: ...
    def stop(self) -> None: ...
    def is_running(self) -> bool: ...
    def is_closed(self) -> bool: ...
    def close(self) -> None: ...
    async def shutdown_asyncgens(self) -> None: ...
    # Methods scheduling callbacks.  All these return Handles.
    def call_soon(self, callback: Callable[[Unpack[_Ts]], object], *args: Unpack[_Ts], context: Context | None = None) -> Handle: ...  # type: ignore
    def call_later(
        self, delay: float, callback: Callable[[Unpack[_Ts]], object], *args: Unpack[_Ts], context: Context | None = None  # type: ignore
    ) -> TimerHandle: ...
    def call_at(
        self, when: float, callback: Callable[[Unpack[_Ts]], object], *args: Unpack[_Ts], context: Context | None = None  # type: ignore
    ) -> TimerHandle: ...
    def time(self) -> float: ...
    # Future methods
    def create_future(self) -> Future[Any]: ...
    # Tasks methods
    # 3.11 added `context`.
    if sys.version_info >= (3, 11):
        def create_task(self, coro: _CoroutineLike[_T], *, name: object = None, context: Context | None = None) -> Task[_T]: ...  # type: ignore
    else:
        def create_task(self, coro: _CoroutineLike[_T], *, name: object = None) -> Task[_T]: ...

    def set_task_factory(self, factory: _TaskFactory | None) -> None: ...
    def get_task_factory(self) -> _TaskFactory | None: ...
    # Methods for interacting with threads
    def call_soon_threadsafe(
        self, callback: Callable[[Unpack[_Ts]], object], *args: Unpack[_Ts], context: Context | None = None  # type: ignore
    ) -> Handle: ...
    def run_in_executor(self, executor: Any, func: Callable[[Unpack[_Ts]], _T], *args: Unpack[_Ts]) -> Future[_T]: ...
    def set_default_executor(self, executor: Any) -> None: ...
    # Network I/O methods returning Futures.
    async def getaddrinfo(
        self,
        host: bytes | str | None,
        port: bytes | str | int | None,
        *,
        family: int = 0,
        type: int = 0,
        proto: int = 0,
        flags: int = 0,
    ) -> list[tuple[AddressFamily, SocketKind, int, str, tuple[str, int] | tuple[str, int, int, int]]]: ...
    async def getnameinfo(self, sockaddr: tuple[str, int] | tuple[str, int, int, int], flags: int = 0) -> tuple[str, str]: ...
    # In every version branch below the first overload takes `sock: None` (host/port
    # may be given) and the second requires `sock` with host/port/local_addr as None.
    if sys.version_info >= (3, 12):
        # 3.12 added `all_errors`.
        @overload
        async def create_connection(
            self,
            protocol_factory: Callable[[], _ProtocolT],
            host: str = ...,
            port: int = ...,
            *,
            ssl: _SSLContext = None,
            family: int = 0,
            proto: int = 0,
            flags: int = 0,
            sock: None = None,
            local_addr: tuple[str, int] | None = None,
            server_hostname: str | None = None,
            ssl_handshake_timeout: float | None = None,
            ssl_shutdown_timeout: float | None = None,
            happy_eyeballs_delay: float | None = None,
            interleave: int | None = None,
            all_errors: bool = False,
        ) -> tuple[Transport, _ProtocolT]: ...
        @overload
        async def create_connection(
            self,
            protocol_factory: Callable[[], _ProtocolT],
            host: None = None,
            port: None = None,
            *,
            ssl: _SSLContext = None,
            family: int = 0,
            proto: int = 0,
            flags: int = 0,
            sock: socket,
            local_addr: None = None,
            server_hostname: str | None = None,
            ssl_handshake_timeout: float | None = None,
            ssl_shutdown_timeout: float | None = None,
            happy_eyeballs_delay: float | None = None,
            interleave: int | None = None,
            all_errors: bool = False,
        ) -> tuple[Transport, _ProtocolT]: ...
    elif sys.version_info >= (3, 11):
        # 3.11 added `ssl_shutdown_timeout`.
        @overload
        async def create_connection(
            self,
            protocol_factory: Callable[[], _ProtocolT],
            host: str = ...,
            port: int = ...,
            *,
            ssl: _SSLContext = None,
            family: int = 0,
            proto: int = 0,
            flags: int = 0,
            sock: None = None,
            local_addr: tuple[str, int] | None = None,
            server_hostname: str | None = None,
            ssl_handshake_timeout: float | None = None,
            ssl_shutdown_timeout: float | None = None,
            happy_eyeballs_delay: float | None = None,
            interleave: int | None = None,
        ) -> tuple[Transport, _ProtocolT]: ...
        @overload
        async def create_connection(
            self,
            protocol_factory: Callable[[], _ProtocolT],
            host: None = None,
            port: None = None,
            *,
            ssl: _SSLContext = None,
            family: int = 0,
            proto: int = 0,
            flags: int = 0,
            sock: socket,
            local_addr: None = None,
            server_hostname: str | None = None,
            ssl_handshake_timeout: float | None = None,
            ssl_shutdown_timeout: float | None = None,
            happy_eyeballs_delay: float | None = None,
            interleave: int | None = None,
        ) -> tuple[Transport, _ProtocolT]: ...
    else:
        @overload
        async def create_connection(
            self,
            protocol_factory: Callable[[], _ProtocolT],
            host: str = ...,
            port: int = ...,
            *,
            ssl: _SSLContext = None,
            family: int = 0,
            proto: int = 0,
            flags: int = 0,
            sock: None = None,
            local_addr: tuple[str, int] | None = None,
            server_hostname: str | None = None,
            ssl_handshake_timeout: float | None = None,
            happy_eyeballs_delay: float | None = None,
            interleave: int | None = None,
        ) -> tuple[Transport, _ProtocolT]: ...
        @overload
        async def create_connection(
            self,
            protocol_factory: Callable[[], _ProtocolT],
            host: None = None,
            port: None = None,
            *,
            ssl: _SSLContext = None,
            family: int = 0,
            proto: int = 0,
            flags: int = 0,
            sock: socket,
            local_addr: None = None,
            server_hostname: str | None = None,
            ssl_handshake_timeout: float | None = None,
            happy_eyeballs_delay: float | None = None,
            interleave: int | None = None,
        ) -> tuple[Transport, _ProtocolT]: ...

    if sys.version_info >= (3, 13):
        # 3.13 added `keep_alive`.
        @overload
        async def create_server(
            self,
            protocol_factory: _ProtocolFactory,
            host: str | Sequence[str] | None = None,
            port: int = ...,
            *,
            family: int = ...,
            flags: int = ...,
            sock: None = None,
            backlog: int = 100,
            ssl: _SSLContext = None,
            reuse_address: bool | None = None,
            reuse_port: bool | None = None,
            keep_alive: bool | None = None,
            ssl_handshake_timeout: float | None = None,
            ssl_shutdown_timeout: float | None = None,
            start_serving: bool = True,
        ) -> Server: ...
        @overload
        async def create_server(
            self,
            protocol_factory: _ProtocolFactory,
            host: None = None,
            port: None = None,
            *,
            family: int = ...,
            flags: int = ...,
            sock: socket = ...,
            backlog: int = 100,
            ssl: _SSLContext = None,
            reuse_address: bool | None = None,
            reuse_port: bool | None = None,
            keep_alive: bool | None = None,
            ssl_handshake_timeout: float | None = None,
            ssl_shutdown_timeout: float | None = None,
            start_serving: bool = True,
        ) -> Server: ...
    elif sys.version_info >= (3, 11):
        # 3.11 added `ssl_shutdown_timeout`.
        @overload
        async def create_server(
            self,
            protocol_factory: _ProtocolFactory,
            host: str | Sequence[str] | None = None,
            port: int = ...,
            *,
            family: int = ...,
            flags: int = ...,
            sock: None = None,
            backlog: int = 100,
            ssl: _SSLContext = None,
            reuse_address: bool | None = None,
            reuse_port: bool | None = None,
            ssl_handshake_timeout: float | None = None,
            ssl_shutdown_timeout: float | None = None,
            start_serving: bool = True,
        ) -> Server: ...
        @overload
        async def create_server(
            self,
            protocol_factory: _ProtocolFactory,
            host: None = None,
            port: None = None,
            *,
            family: int = ...,
            flags: int = ...,
            sock: socket = ...,
            backlog: int = 100,
            ssl: _SSLContext = None,
            reuse_address: bool | None = None,
            reuse_port: bool | None = None,
            ssl_handshake_timeout: float | None = None,
            ssl_shutdown_timeout: float | None = None,
            start_serving: bool = True,
        ) -> Server: ...
    else:
        @overload
        async def create_server(
            self,
            protocol_factory: _ProtocolFactory,
            host: str | Sequence[str] | None = None,
            port: int = ...,
            *,
            family: int = ...,
            flags: int = ...,
            sock: None = None,
            backlog: int = 100,
            ssl: _SSLContext = None,
            reuse_address: bool | None = None,
            reuse_port: bool | None = None,
            ssl_handshake_timeout: float | None = None,
            start_serving: bool = True,
        ) -> Server: ...
        @overload
        async def create_server(
            self,
            protocol_factory: _ProtocolFactory,
            host: None = None,
            port: None = None,
            *,
            family: int = ...,
            flags: int = ...,
            sock: socket = ...,
            backlog: int = 100,
            ssl: _SSLContext = None,
            reuse_address: bool | None = None,
            reuse_port: bool | None = None,
            ssl_handshake_timeout: float | None = None,
            start_serving: bool = True,
        ) -> Server: ...

    # 3.11 added `ssl_shutdown_timeout` to both methods below.
    if sys.version_info >= (3, 11):
        async def start_tls(
            self,
            transport: BaseTransport,
            protocol: BaseProtocol,
            sslcontext: ssl.SSLContext,
            *,
            server_side: bool = False,
            server_hostname: str | None = None,
            ssl_handshake_timeout: float | None = None,
            ssl_shutdown_timeout: float | None = None,
        ) -> Transport | None: ...
        async def connect_accepted_socket(
            self,
            protocol_factory: Callable[[], _ProtocolT],
            sock: socket,
            *,
            ssl: _SSLContext = None,
            ssl_handshake_timeout: float | None = None,
            ssl_shutdown_timeout: float | None = None,
        ) -> tuple[Transport, _ProtocolT]: ...
    else:
        async def start_tls(
            self,
            transport: BaseTransport,
            protocol: BaseProtocol,
            sslcontext: ssl.SSLContext,
            *,
            server_side: bool = False,
            server_hostname: str | None = None,
            ssl_handshake_timeout: float | None = None,
        ) -> Transport | None: ...
        async def connect_accepted_socket(
            self,
            protocol_factory: Callable[[], _ProtocolT],
            sock: socket,
            *,
            ssl: _SSLContext = None,
            ssl_handshake_timeout: float | None = None,
        ) -> tuple[Transport, _ProtocolT]: ...

    async def sock_sendfile(
        self, sock: socket, file: IO[bytes], offset: int = 0, count: int | None = None, *, fallback: bool | None = True
    ) -> int: ...
    async def sendfile(
        self, transport: WriteTransport, file: IO[bytes], offset: int = 0, count: int | None = None, *, fallback: bool = True
    ) -> int: ...
    # `reuse_address` is only declared for versions before 3.11.
    if sys.version_info >= (3, 11):
        async def create_datagram_endpoint(  # type: ignore[override]
            self,
            protocol_factory: Callable[[], _ProtocolT],
            local_addr: tuple[str, int] | str | None = None,
            remote_addr: tuple[str, int] | str | None = None,
            *,
            family: int = 0,
            proto: int = 0,
            flags: int = 0,
            reuse_port: bool | None = None,
            allow_broadcast: bool | None = None,
            sock: socket | None = None,
        ) -> tuple[DatagramTransport, _ProtocolT]: ...
    else:
        async def create_datagram_endpoint(
            self,
            protocol_factory: Callable[[], _ProtocolT],
            local_addr: tuple[str, int] | str | None = None,
            remote_addr: tuple[str, int] | str | None = None,
            *,
            family: int = 0,
            proto: int = 0,
            flags: int = 0,
            reuse_address: bool | None = ...,
            reuse_port: bool | None = None,
            allow_broadcast: bool | None = None,
            sock: socket | None = None,
        ) -> tuple[DatagramTransport, _ProtocolT]: ...
    # Pipes and subprocesses.
    async def connect_read_pipe(self, protocol_factory: Callable[[], _ProtocolT], pipe: Any) -> tuple[ReadTransport, _ProtocolT]: ...
    async def connect_write_pipe(self, protocol_factory: Callable[[], _ProtocolT], pipe: Any) -> tuple[WriteTransport, _ProtocolT]: ...
    async def subprocess_shell(
        self,
        protocol_factory: Callable[[], _ProtocolT],
        cmd: bytes | str,
        *,
        stdin: int | IO[Any] | None = -1,
        stdout: int | IO[Any] | None = -1,
        stderr: int | IO[Any] | None = -1,
        universal_newlines: Literal[False] = False,
        shell: Literal[True] = True,
        bufsize: Literal[0] = 0,
        encoding: None = None,
        errors: None = None,
        text: Literal[False] | None = None,
        **kwargs: Any,
    ) -> tuple[SubprocessTransport, _ProtocolT]: ...
    async def subprocess_exec(
        self,
        protocol_factory: Callable[[], _ProtocolT],
        program: Any,
        *args: Any,
        stdin: int | IO[Any] | None = -1,
        stdout: int | IO[Any] | None = -1,
        stderr: int | IO[Any] | None = -1,
        universal_newlines: Literal[False] = False,
        shell: Literal[False] = False,
        bufsize: Literal[0] = 0,
        encoding: None = None,
        errors: None = None,
        # Declared explicitly, as in `subprocess_shell`, so that `text=True` is
        # rejected by the type checker instead of being absorbed by `**kwargs`.
        text: Literal[False] | None = None,
        **kwargs: Any,
    ) -> tuple[SubprocessTransport, _ProtocolT]: ...
    def add_reader(self, fd: FileDescriptorLike, callback: Callable[[Unpack[_Ts]], Any], *args: Unpack[_Ts]) -> None: ...
    def remove_reader(self, fd: FileDescriptorLike) -> bool: ...
    def add_writer(self, fd: FileDescriptorLike, callback: Callable[[Unpack[_Ts]], Any], *args: Unpack[_Ts]) -> None: ...
    def remove_writer(self, fd: FileDescriptorLike) -> bool: ...
    # The sock_* methods (and probably some others) are not actually implemented on
    # BaseEventLoop, only on subclasses. We list them here for now for convenience.
    async def sock_recv(self, sock: socket, nbytes: int) -> bytes: ...
    async def sock_recv_into(self, sock: socket, buf: WriteableBuffer) -> int: ...
    async def sock_sendall(self, sock: socket, data: ReadableBuffer) -> None: ...
    async def sock_connect(self, sock: socket, address: _Address) -> None: ...
    async def sock_accept(self, sock: socket) -> tuple[socket, _RetAddress]: ...
    # The datagram-style sock_* helpers are declared only for 3.11+.
    if sys.version_info >= (3, 11):
        async def sock_recvfrom(self, sock: socket, bufsize: int) -> tuple[bytes, _RetAddress]: ...
        async def sock_recvfrom_into(self, sock: socket, buf: WriteableBuffer, nbytes: int = 0) -> tuple[int, _RetAddress]: ...
        async def sock_sendto(self, sock: socket, data: ReadableBuffer, address: _Address) -> int: ...
    # Signal handling.
    def add_signal_handler(self, sig: int, callback: Callable[[Unpack[_Ts]], Any], *args: Unpack[_Ts]) -> None: ...
    def remove_signal_handler(self, sig: int) -> bool: ...
    # Error handlers.
    def set_exception_handler(self, handler: _ExceptionHandler | None) -> None: ...
    def get_exception_handler(self) -> _ExceptionHandler | None: ...
    def default_exception_handler(self, context: _Context) -> None: ...
    def call_exception_handler(self, context: _Context) -> None: ...
    # Debug flag management.
    def get_debug(self) -> bool: ...
    def set_debug(self, enabled: bool) -> None: ...
    # Declared for 3.9+; 3.12 added `timeout`.
    if sys.version_info >= (3, 12):
        async def shutdown_default_executor(self, timeout: float | None = None) -> None: ...
    elif sys.version_info >= (3, 9):
        async def shutdown_default_executor(self) -> None: ...

    def __del__(self) -> None: ...