import ssl import sys from _typeshed import ReadableBuffer, StrPath, Incomplete from collections.abc import Awaitable, Callable, Iterable, Sequence, Sized from types import ModuleType from typing import Any, Protocol, SupportsIndex from typing_extensions import Self, TypeAlias from . import events, protocols, transports from .base_events import Server if sys.platform == "win32": __all__ = ( "StreamReader", "StreamWriter", "StreamReaderProtocol", "open_connection", "start_server", ) else: __all__ = ( "StreamReader", "StreamWriter", "StreamReaderProtocol", "open_connection", "start_server", "open_unix_connection", "start_unix_server", ) _ClientConnectedCallback: TypeAlias = Callable[[StreamReader, StreamWriter], Awaitable[None] | None] class _ReaduntilBuffer(ReadableBuffer, Sized, Protocol): ... if sys.version_info >= (3, 10): async def open_connection( host: str | None = None, port: int | str | None = None, *, limit: int = 65536, ssl_handshake_timeout: float | None = ..., **kwds: Any, ) -> tuple[StreamReader, StreamWriter]: ... async def start_server( client_connected_cb: _ClientConnectedCallback, host: str | Sequence[str] | None = None, port: int | str | None = None, backlog: int = 5, # MicroPython backlog argument *, ssl: Incomplete | None = ..., # limit: int = 65536, # ssl_handshake_timeout: float | None = ..., # **kwds: Any, ) -> Server: ... else: async def open_connection( host: str | None = None, port: int | str | None = None, *, loop: events.AbstractEventLoop | None = None, limit: int = 65536, ssl_handshake_timeout: float | None = ..., **kwds: Any, ) -> tuple[StreamReader, StreamWriter]: ... async def start_server( client_connected_cb: _ClientConnectedCallback, host: str | None = None, port: int | str | None = None, *, loop: events.AbstractEventLoop | None = None, limit: int = 65536, ssl_handshake_timeout: float | None = ..., **kwds: Any, ) -> Server: ... if sys.platform != "win32": if sys.version_info >= (3, 10): async def open_unix_connection( path: StrPath | None = None, *, limit: int = 65536, **kwds: Any ) -> tuple[StreamReader, StreamWriter]: ... async def start_unix_server( client_connected_cb: _ClientConnectedCallback, path: StrPath | None = None, *, limit: int = 65536, **kwds: Any, ) -> Server: ... else: async def open_unix_connection( path: StrPath | None = None, *, loop: events.AbstractEventLoop | None = None, limit: int = 65536, **kwds: Any, ) -> tuple[StreamReader, StreamWriter]: ... async def start_unix_server( client_connected_cb: _ClientConnectedCallback, path: StrPath | None = None, *, loop: events.AbstractEventLoop | None = None, limit: int = 65536, **kwds: Any, ) -> Server: ... class FlowControlMixin(protocols.Protocol): def __init__(self, loop: events.AbstractEventLoop | None = None) -> None: ... class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): def __init__( self, stream_reader: StreamReader, # client_connected_cb: _ClientConnectedCallback | None = None, # loop: events.AbstractEventLoop | None = None, ) -> None: ... def __del__(self) -> None: ... class StreamWriter: def __init__( self, transport: transports.WriteTransport | Incomplete, protocol: protocols.BaseProtocol | Incomplete, # MicroPython doesn't support reader and loop arguments # reader: StreamReader | None, # loop: events.AbstractEventLoop, ) -> None: ... @property def transport(self) -> transports.WriteTransport: ... def write(self, data: bytes | bytearray | memoryview | str) -> None: ... def awrite(self, data: bytes | bytearray | memoryview | str) -> Awaitable: ... def writelines(self, data: Iterable[bytes | bytearray | memoryview]) -> None: ... def write_eof(self) -> None: ... def can_write_eof(self) -> bool: ... def close(self) -> None: ... def is_closing(self) -> bool: ... async def wait_closed(self) -> None: ... def get_extra_info(self, name: str, default: Any = None) -> Any: ... async def drain(self) -> None: ... if sys.version_info >= (3, 12): async def start_tls( self, sslcontext: ssl.SSLContext, *, server_hostname: str | None = None, ssl_handshake_timeout: float | None = None, ssl_shutdown_timeout: float | None = None, ) -> None: ... elif sys.version_info >= (3, 11): async def start_tls( self, sslcontext: ssl.SSLContext, *, server_hostname: str | None = None, ssl_handshake_timeout: float | None = None, ) -> None: ... if sys.version_info >= (3, 13): def __del__(self, warnings: ModuleType = ...) -> None: ... elif sys.version_info >= (3, 11): def __del__(self) -> None: ... # StreamReader: TypeAlias = StreamWriter class StreamReader: def __init__( self, transport: transports.WriteTransport | Incomplete, protocol: protocols.BaseProtocol | Incomplete = None, # limit: int = 65536, # loop: events.AbstractEventLoop | None = None, ) -> None: ... def exception(self) -> Exception: ... def set_exception(self, exc: Exception) -> None: ... def set_transport(self, transport: transports.BaseTransport) -> None: ... def feed_eof(self) -> None: ... def at_eof(self) -> bool: ... def feed_data(self, data: Iterable[SupportsIndex]) -> None: ... async def readline(self) -> bytes: ... if sys.version_info >= (3, 13): async def readuntil(self, separator: _ReaduntilBuffer | tuple[_ReaduntilBuffer, ...] = b"\n") -> bytes: ... else: async def readuntil(self, separator: _ReaduntilBuffer = b"\n") -> bytes: ... async def read(self, n: int = -1) -> bytes: ... async def readexactly(self, n: int) -> bytes: ... def __aiter__(self) -> Self: ... async def __anext__(self) -> bytes: ...