501 lines
17 KiB
Python
501 lines
17 KiB
Python
import concurrent.futures
|
|
import sys
|
|
from _asyncio import (
|
|
Task as Task,
|
|
_enter_task as _enter_task,
|
|
_leave_task as _leave_task,
|
|
_register_task as _register_task,
|
|
_unregister_task as _unregister_task,
|
|
)
|
|
from collections.abc import AsyncIterator, Awaitable, Coroutine, Generator, Iterable, Iterator
|
|
from typing import Any, Literal, Protocol, TypeVar, overload
|
|
from typing_extensions import TypeAlias
|
|
|
|
from . import _CoroutineLike
|
|
from .events import AbstractEventLoop
|
|
from .futures import Future
|
|
|
|
if sys.version_info >= (3, 11):
|
|
from contextvars import Context
|
|
|
|
# Names exported by this module.  Each branch lists the public names that are
# declared for the matching interpreter versions further down in this file.
if sys.version_info >= (3, 12):
    __all__ = (
        "Task",
        "create_task",
        "FIRST_COMPLETED",
        "FIRST_EXCEPTION",
        "ALL_COMPLETED",
        "wait",
        "wait_for",
        "as_completed",
        "sleep",
        "sleep_ms",
        "gather",
        "shield",
        "ensure_future",
        "run_coroutine_threadsafe",
        "current_task",
        "all_tasks",
        # The eager task factory helpers are only declared for 3.12+.
        "create_eager_task_factory",
        "eager_task_factory",
        "_register_task",
        "_unregister_task",
        "_enter_task",
        "_leave_task",
    )
else:
    __all__ = (
        "Task",
        "create_task",
        "FIRST_COMPLETED",
        "FIRST_EXCEPTION",
        "ALL_COMPLETED",
        "wait",
        "wait_for",
        "as_completed",
        "sleep",
        # MicroPython addition: sleep_ms is declared for every version in this
        # file, so it has to be exported here as well, not only on 3.12+.
        "sleep_ms",
        "gather",
        "shield",
        "ensure_future",
        "run_coroutine_threadsafe",
        "current_task",
        "all_tasks",
        "_register_task",
        "_unregister_task",
        "_enter_task",
        "_leave_task",
    )
|
|
|
|
# Generic result type shared by most signatures in this module.
_T = TypeVar("_T")
# Covariant result type, used by the protocol and coroutine alias declarations.
_T_co = TypeVar("_T_co", covariant=True)
# One independent result type per positional argument of the fixed-arity
# gather() overloads.
_T1 = TypeVar("_T1")
_T2 = TypeVar("_T2")
_T3 = TypeVar("_T3")
_T4 = TypeVar("_T4")
_T5 = TypeVar("_T5")
_T6 = TypeVar("_T6")
# Any Future (sub)class; lets ensure_future() and wait() hand back the exact
# future type they were given.
_FT = TypeVar("_FT", bound=Future[Any])
if sys.version_info >= (3, 12):
    # From 3.12 on only futures and awaitables are accepted.
    _FutureLike: TypeAlias = Future[_T] | Awaitable[_T]
else:
    # Older versions additionally accept generators returning _T
    # (presumably generator-based coroutines -- confirm against upstream).
    _FutureLike: TypeAlias = Future[_T] | Generator[Any, None, _T] | Awaitable[_T]
# Yield type of the Generator variant of _TaskCompatibleCoro: a future or None.
_TaskYieldType: TypeAlias = Future[object] | None
|
|
|
|
# Re-export the three constants of the same name from concurrent.futures
# (presumably the values accepted by wait()'s `return_when` -- confirm).
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
|
|
|
|
# as_completed() has three version-dependent signatures; all take an iterable
# of future-likes and a keyword-only `timeout`.
if sys.version_info >= (3, 13):
    # Return type for 3.13+: a single object that satisfies both the
    # synchronous and the asynchronous iterator protocols.
    class _SyncAndAsyncIterator(Iterator[_T_co], AsyncIterator[_T_co], Protocol[_T_co]): ...

    def as_completed(fs: Iterable[_FutureLike[_T]], *, timeout: float | None = None) -> _SyncAndAsyncIterator[Future[_T]]: ...

elif sys.version_info >= (3, 10):
    # 3.10 - 3.12: plain synchronous iterator of futures, no `loop` parameter.
    def as_completed(fs: Iterable[_FutureLike[_T]], *, timeout: float | None = None) -> Iterator[Future[_T]]: ...

else:
    # Before 3.10 an event loop may additionally be passed via the
    # keyword-only `loop` parameter.
    def as_completed(
        fs: Iterable[_FutureLike[_T]],
        *,
        loop: AbstractEventLoop | None = None,
        timeout: float | None = None,
    ) -> Iterator[Future[_T]]: ...
|
|
|
|
# Overload 1: a Future (or subclass) argument comes back with its exact type.
@overload
def ensure_future(coro_or_future: _FT, *, loop: AbstractEventLoop | None = None) -> _FT: ...  # type: ignore[overload-overlap]
# Overload 2: any other awaitable yields a Task carrying the same result type.
@overload
def ensure_future(coro_or_future: Awaitable[_T], *, loop: AbstractEventLoop | None = None) -> Task[_T]: ...
|
|
|
|
# `gather()` actually returns a list with length equal to the number
|
|
# of tasks passed; however, Tuple is used similar to the annotation for
|
|
# zip() because typing does not support variadic type variables. See
|
|
# typing PR #1550 for discussion.
|
|
#
|
|
# N.B. Having overlapping overloads is the only way to get acceptable type inference in all edge cases.
|
|
# Overload layout (identical in both version branches):
#   * one to six positional awaitables with return_exceptions=False
#     -> Future of a tuple with one precisely typed slot per argument;
#   * any number of awaitables with return_exceptions=False -> Future[list[_T]];
#   * the same two groups again with `return_exceptions: bool`, where every
#     slot is widened to `<result> | BaseException`.
# The pre-3.10 branch additionally accepts a keyword-only `loop` argument.
if sys.version_info >= (3, 10):
    # --- return_exceptions=False: results keep their exact types -------------
    @overload
    def gather(coro_or_future1: _FutureLike[_T1], /, *, return_exceptions: Literal[False] = False) -> Future[tuple[_T1]]: ...  # type: ignore[overload-overlap]
    @overload
    def gather(  # type: ignore[overload-overlap]
        coro_or_future1: _FutureLike[_T1],
        coro_or_future2: _FutureLike[_T2],
        /,
        *,
        return_exceptions: Literal[False] = False,
    ) -> Future[tuple[_T1, _T2]]: ...
    @overload
    def gather(  # type: ignore[overload-overlap]
        coro_or_future1: _FutureLike[_T1],
        coro_or_future2: _FutureLike[_T2],
        coro_or_future3: _FutureLike[_T3],
        /,
        *,
        return_exceptions: Literal[False] = False,
    ) -> Future[tuple[_T1, _T2, _T3]]: ...
    @overload
    def gather(  # type: ignore[overload-overlap]
        coro_or_future1: _FutureLike[_T1],
        coro_or_future2: _FutureLike[_T2],
        coro_or_future3: _FutureLike[_T3],
        coro_or_future4: _FutureLike[_T4],
        /,
        *,
        return_exceptions: Literal[False] = False,
    ) -> Future[tuple[_T1, _T2, _T3, _T4]]: ...
    @overload
    def gather(  # type: ignore[overload-overlap]
        coro_or_future1: _FutureLike[_T1],
        coro_or_future2: _FutureLike[_T2],
        coro_or_future3: _FutureLike[_T3],
        coro_or_future4: _FutureLike[_T4],
        coro_or_future5: _FutureLike[_T5],
        /,
        *,
        return_exceptions: Literal[False] = False,
    ) -> Future[tuple[_T1, _T2, _T3, _T4, _T5]]: ...
    @overload
    def gather(  # type: ignore[overload-overlap]
        coro_or_future1: _FutureLike[_T1],
        coro_or_future2: _FutureLike[_T2],
        coro_or_future3: _FutureLike[_T3],
        coro_or_future4: _FutureLike[_T4],
        coro_or_future5: _FutureLike[_T5],
        coro_or_future6: _FutureLike[_T6],
        /,
        *,
        return_exceptions: Literal[False] = False,
    ) -> Future[tuple[_T1, _T2, _T3, _T4, _T5, _T6]]: ...
    # Variadic fallback: homogeneous awaitables collapse to a list.
    @overload
    def gather(*coros_or_futures: _FutureLike[_T], return_exceptions: Literal[False] = False) -> Future[list[_T]]: ...  # type: ignore[overload-overlap]
    # --- return_exceptions: bool: every slot may also hold an exception ------
    @overload
    def gather(coro_or_future1: _FutureLike[_T1], /, *, return_exceptions: bool) -> Future[tuple[_T1 | BaseException]]: ...
    @overload
    def gather(
        coro_or_future1: _FutureLike[_T1],
        coro_or_future2: _FutureLike[_T2],
        /,
        *,
        return_exceptions: bool,
    ) -> Future[tuple[_T1 | BaseException, _T2 | BaseException]]: ...
    @overload
    def gather(
        coro_or_future1: _FutureLike[_T1],
        coro_or_future2: _FutureLike[_T2],
        coro_or_future3: _FutureLike[_T3],
        /,
        *,
        return_exceptions: bool,
    ) -> Future[tuple[_T1 | BaseException, _T2 | BaseException, _T3 | BaseException]]: ...
    @overload
    def gather(
        coro_or_future1: _FutureLike[_T1],
        coro_or_future2: _FutureLike[_T2],
        coro_or_future3: _FutureLike[_T3],
        coro_or_future4: _FutureLike[_T4],
        /,
        *,
        return_exceptions: bool,
    ) -> Future[tuple[_T1 | BaseException, _T2 | BaseException, _T3 | BaseException, _T4 | BaseException]]: ...
    @overload
    def gather(
        coro_or_future1: _FutureLike[_T1],
        coro_or_future2: _FutureLike[_T2],
        coro_or_future3: _FutureLike[_T3],
        coro_or_future4: _FutureLike[_T4],
        coro_or_future5: _FutureLike[_T5],
        /,
        *,
        return_exceptions: bool,
    ) -> Future[
        tuple[
            _T1 | BaseException,
            _T2 | BaseException,
            _T3 | BaseException,
            _T4 | BaseException,
            _T5 | BaseException,
        ]
    ]: ...
    @overload
    def gather(
        coro_or_future1: _FutureLike[_T1],
        coro_or_future2: _FutureLike[_T2],
        coro_or_future3: _FutureLike[_T3],
        coro_or_future4: _FutureLike[_T4],
        coro_or_future5: _FutureLike[_T5],
        coro_or_future6: _FutureLike[_T6],
        /,
        *,
        return_exceptions: bool,
    ) -> Future[
        tuple[
            _T1 | BaseException,
            _T2 | BaseException,
            _T3 | BaseException,
            _T4 | BaseException,
            _T5 | BaseException,
            _T6 | BaseException,
        ]
    ]: ...
    @overload
    def gather(*coros_or_futures: _FutureLike[_T], return_exceptions: bool) -> Future[list[_T | BaseException]]: ...

else:
    # --- return_exceptions=False: results keep their exact types -------------
    @overload
    def gather(  # type: ignore[overload-overlap]
        coro_or_future1: _FutureLike[_T1],
        /,
        *,
        loop: AbstractEventLoop | None = None,
        return_exceptions: Literal[False] = False,
    ) -> Future[tuple[_T1]]: ...
    @overload
    def gather(  # type: ignore[overload-overlap]
        coro_or_future1: _FutureLike[_T1],
        coro_or_future2: _FutureLike[_T2],
        /,
        *,
        loop: AbstractEventLoop | None = None,
        return_exceptions: Literal[False] = False,
    ) -> Future[tuple[_T1, _T2]]: ...
    @overload
    def gather(  # type: ignore[overload-overlap]
        coro_or_future1: _FutureLike[_T1],
        coro_or_future2: _FutureLike[_T2],
        coro_or_future3: _FutureLike[_T3],
        /,
        *,
        loop: AbstractEventLoop | None = None,
        return_exceptions: Literal[False] = False,
    ) -> Future[tuple[_T1, _T2, _T3]]: ...
    @overload
    def gather(  # type: ignore[overload-overlap]
        coro_or_future1: _FutureLike[_T1],
        coro_or_future2: _FutureLike[_T2],
        coro_or_future3: _FutureLike[_T3],
        coro_or_future4: _FutureLike[_T4],
        /,
        *,
        loop: AbstractEventLoop | None = None,
        return_exceptions: Literal[False] = False,
    ) -> Future[tuple[_T1, _T2, _T3, _T4]]: ...
    @overload
    def gather(  # type: ignore[overload-overlap]
        coro_or_future1: _FutureLike[_T1],
        coro_or_future2: _FutureLike[_T2],
        coro_or_future3: _FutureLike[_T3],
        coro_or_future4: _FutureLike[_T4],
        coro_or_future5: _FutureLike[_T5],
        /,
        *,
        loop: AbstractEventLoop | None = None,
        return_exceptions: Literal[False] = False,
    ) -> Future[tuple[_T1, _T2, _T3, _T4, _T5]]: ...
    @overload
    def gather(  # type: ignore[overload-overlap]
        coro_or_future1: _FutureLike[_T1],
        coro_or_future2: _FutureLike[_T2],
        coro_or_future3: _FutureLike[_T3],
        coro_or_future4: _FutureLike[_T4],
        coro_or_future5: _FutureLike[_T5],
        coro_or_future6: _FutureLike[_T6],
        /,
        *,
        loop: AbstractEventLoop | None = None,
        return_exceptions: Literal[False] = False,
    ) -> Future[tuple[_T1, _T2, _T3, _T4, _T5, _T6]]: ...
    # Variadic fallback: homogeneous awaitables collapse to a list.
    @overload
    def gather(  # type: ignore[overload-overlap]
        *coros_or_futures: _FutureLike[_T],
        loop: AbstractEventLoop | None = None,
        return_exceptions: Literal[False] = False,
    ) -> Future[list[_T]]: ...
    # --- return_exceptions: bool: every slot may also hold an exception ------
    @overload
    def gather(  # type: ignore[overload-overlap]
        coro_or_future1: _FutureLike[_T1],
        /,
        *,
        loop: AbstractEventLoop | None = None,
        return_exceptions: bool,
    ) -> Future[tuple[_T1 | BaseException]]: ...
    @overload
    def gather(  # type: ignore[overload-overlap]
        coro_or_future1: _FutureLike[_T1],
        coro_or_future2: _FutureLike[_T2],
        /,
        *,
        loop: AbstractEventLoop | None = None,
        return_exceptions: bool,
    ) -> Future[tuple[_T1 | BaseException, _T2 | BaseException]]: ...
    @overload
    def gather(  # type: ignore[overload-overlap]
        coro_or_future1: _FutureLike[_T1],
        coro_or_future2: _FutureLike[_T2],
        coro_or_future3: _FutureLike[_T3],
        /,
        *,
        loop: AbstractEventLoop | None = None,
        return_exceptions: bool,
    ) -> Future[tuple[_T1 | BaseException, _T2 | BaseException, _T3 | BaseException]]: ...
    @overload
    def gather(  # type: ignore[overload-overlap]
        coro_or_future1: _FutureLike[_T1],
        coro_or_future2: _FutureLike[_T2],
        coro_or_future3: _FutureLike[_T3],
        coro_or_future4: _FutureLike[_T4],
        /,
        *,
        loop: AbstractEventLoop | None = None,
        return_exceptions: bool,
    ) -> Future[tuple[_T1 | BaseException, _T2 | BaseException, _T3 | BaseException, _T4 | BaseException]]: ...
    # Five-argument form: previously absent from this branch, which made such
    # calls fall through to the variadic overload and lose per-slot typing.
    @overload
    def gather(  # type: ignore[overload-overlap]
        coro_or_future1: _FutureLike[_T1],
        coro_or_future2: _FutureLike[_T2],
        coro_or_future3: _FutureLike[_T3],
        coro_or_future4: _FutureLike[_T4],
        coro_or_future5: _FutureLike[_T5],
        /,
        *,
        loop: AbstractEventLoop | None = None,
        return_exceptions: bool,
    ) -> Future[
        tuple[
            _T1 | BaseException,
            _T2 | BaseException,
            _T3 | BaseException,
            _T4 | BaseException,
            _T5 | BaseException,
        ]
    ]: ...
    @overload
    def gather(  # type: ignore[overload-overlap]
        coro_or_future1: _FutureLike[_T1],
        coro_or_future2: _FutureLike[_T2],
        coro_or_future3: _FutureLike[_T3],
        coro_or_future4: _FutureLike[_T4],
        coro_or_future5: _FutureLike[_T5],
        coro_or_future6: _FutureLike[_T6],
        /,
        *,
        loop: AbstractEventLoop | None = None,
        return_exceptions: bool,
    ) -> Future[
        tuple[
            _T1 | BaseException,
            _T2 | BaseException,
            _T3 | BaseException,
            _T4 | BaseException,
            _T5 | BaseException,
            _T6 | BaseException,
        ]
    ]: ...
    @overload
    def gather(
        *coros_or_futures: _FutureLike[_T],
        loop: AbstractEventLoop | None = None,
        return_exceptions: bool,
    ) -> Future[list[_T | BaseException]]: ...
|
|
|
|
# Takes a future-like and a mandatory event loop; the return value is a
# concurrent.futures.Future (not an asyncio Future) with the same result type.
def run_coroutine_threadsafe(coro: _FutureLike[_T], loop: AbstractEventLoop) -> concurrent.futures.Future[_T]: ...
|
|
|
|
# shield(), sleep(), wait_for() and the MicroPython-specific sleep_ms().
# The pre-3.10 branch differs only by the extra keyword-only `loop` parameter
# on shield(), sleep() and wait_for().
if sys.version_info >= (3, 10):
    def shield(arg: _FutureLike[_T]) -> Future[_T]: ...
    # sleep() resolves to None unless `result` is supplied, in which case the
    # awaited value has the type of `result`.
    @overload
    async def sleep(delay: float) -> None: ...
    @overload
    async def sleep(delay: float, result: _T) -> _T: ...
    async def wait_for(fut: _FutureLike[_T], timeout: float | None) -> _T: ...
    # MicroPython addition
    # A single signature replaces the former float/int overload pair: both
    # returned None and the int overload could never be selected, because an
    # int argument already matches `float`.  The accepted calls are unchanged.
    # NOTE(review): confirm whether MicroPython really accepts a float here.
    async def sleep_ms(delay: float) -> None: ...

else:
    def shield(arg: _FutureLike[_T], *, loop: AbstractEventLoop | None = None) -> Future[_T]: ...
    # sleep() resolves to None unless `result` is supplied, in which case the
    # awaited value has the type of `result`.
    @overload
    async def sleep(delay: float, *, loop: AbstractEventLoop | None = None) -> None: ...
    @overload
    async def sleep(delay: float, result: _T, *, loop: AbstractEventLoop | None = None) -> _T: ...
    async def wait_for(fut: _FutureLike[_T], timeout: float | None, *, loop: AbstractEventLoop | None = None) -> _T: ...
    # MicroPython addition
    # A single signature replaces the former float/int overload pair: both
    # returned None and the int overload could never be selected, because an
    # int argument already matches `float`.  The accepted calls are unchanged.
    # NOTE(review): confirm whether MicroPython really accepts a float here.
    async def sleep_ms(delay: float) -> None: ...
|
|
|
|
# wait() returns a pair of sets (presumably "done" and "pending" -- confirm
# against the asyncio documentation).  In every branch the first overload
# preserves the caller's exact Future subclass via _FT.
if sys.version_info >= (3, 11):
    @overload
    async def wait(fs: Iterable[_FT], *, timeout: float | None = None, return_when: str = "ALL_COMPLETED") -> tuple[set[_FT], set[_FT]]: ...
    # From 3.11 on the second overload accepts Task objects only, not
    # arbitrary awaitables.
    @overload
    async def wait(
        fs: Iterable[Task[_T]], *, timeout: float | None = None, return_when: str = "ALL_COMPLETED"
    ) -> tuple[set[Task[_T]], set[Task[_T]]]: ...

elif sys.version_info >= (3, 10):
    @overload
    async def wait(  # type: ignore[overload-overlap]
        fs: Iterable[_FT], *, timeout: float | None = None, return_when: str = "ALL_COMPLETED"
    ) -> tuple[set[_FT], set[_FT]]: ...
    # 3.10: arbitrary awaitables are accepted and reported back as Tasks.
    @overload
    async def wait(
        fs: Iterable[Awaitable[_T]],
        *,
        timeout: float | None = None,
        return_when: str = "ALL_COMPLETED",
    ) -> tuple[set[Task[_T]], set[Task[_T]]]: ...

else:
    # Before 3.10 both overloads also take a keyword-only `loop`.
    @overload
    async def wait(  # type: ignore[overload-overlap]
        fs: Iterable[_FT],
        *,
        loop: AbstractEventLoop | None = None,
        timeout: float | None = None,
        return_when: str = "ALL_COMPLETED",
    ) -> tuple[set[_FT], set[_FT]]: ...
    @overload
    async def wait(
        fs: Iterable[Awaitable[_T]],
        *,
        loop: AbstractEventLoop | None = None,
        timeout: float | None = None,
        return_when: str = "ALL_COMPLETED",
    ) -> tuple[set[Task[_T]], set[Task[_T]]]: ...
|
|
|
|
# Alias for the coroutine-like objects named by the `coro` parameters of the
# task constructor / eager task factory declarations in this file.
if sys.version_info >= (3, 12):
    # 3.12+: native coroutines only.
    _TaskCompatibleCoro: TypeAlias = Coroutine[Any, Any, _T_co]
elif sys.version_info >= (3, 9):
    # 3.9 - 3.11: generators yielding _TaskYieldType are accepted as well.
    _TaskCompatibleCoro: TypeAlias = Generator[_TaskYieldType, None, _T_co] | Coroutine[Any, Any, _T_co]
else:
    # Before 3.9: such generators or any awaitable.
    _TaskCompatibleCoro: TypeAlias = Generator[_TaskYieldType, None, _T_co] | Awaitable[_T_co]
|
|
|
|
# Returns a set of tasks; `loop` is optional and defaults to None.
def all_tasks(loop: AbstractEventLoop | None = None) -> set[Task[Any]]: ...
|
|
|
|
# create_task() wraps a coroutine-like in a Task with the same result type.
if sys.version_info >= (3, 11):
    # 3.11+ adds the keyword-only `context` parameter.
    def create_task(coro: _CoroutineLike[_T], *, name: str | None = None, context: Context | None = None) -> Task[_T]: ...

else:
    def create_task(coro: _CoroutineLike[_T], *, name: str | None = None) -> Task[_T]: ...
|
|
|
|
if sys.version_info >= (3, 12):
    # From 3.12 on the declaration is re-exported from the _asyncio module.
    from _asyncio import current_task as current_task
else:
    # Returns a Task or None; `loop` is optional and defaults to None.
    def current_task(loop: AbstractEventLoop | None = None) -> Task[Any] | None: ...
|
|
|
|
# Eager task factory declarations, only present for 3.12+.
if sys.version_info >= (3, 12):
    # Concrete Task (sub)class produced by a custom constructor / factory.
    _TaskT_co = TypeVar("_TaskT_co", bound=Task[Any], covariant=True)

    # Callable shape that create_eager_task_factory() takes as its argument:
    # the coroutine is positional-only, and `loop`, `name`, `context` and
    # `eager_start` are keyword-only with no defaults.
    class _CustomTaskConstructor(Protocol[_TaskT_co]):
        def __call__(
            self,
            coro: _TaskCompatibleCoro[Any],
            /,
            *,
            loop: AbstractEventLoop,
            name: str | None,
            context: Context | None,
            eager_start: bool,
        ) -> _TaskT_co: ...

    # Callable shape that create_eager_task_factory() returns: called with
    # (loop, coro) plus optional keyword-only `name` and `context`.
    class _EagerTaskFactoryType(Protocol[_TaskT_co]):
        def __call__(
            self,
            loop: AbstractEventLoop,
            coro: _TaskCompatibleCoro[Any],
            *,
            name: str | None = None,
            context: Context | None = None,
        ) -> _TaskT_co: ...

    # Turns a custom task constructor into a factory that produces the same
    # task type.
    def create_eager_task_factory(
        custom_task_constructor: _CustomTaskConstructor[_TaskT_co],
    ) -> _EagerTaskFactoryType[_TaskT_co]: ...
    # Factory returning a plain Task; unlike _EagerTaskFactoryType.__call__,
    # `loop` may be None here.
    def eager_task_factory(
        loop: AbstractEventLoop | None,
        coro: _TaskCompatibleCoro[_T_co],
        *,
        name: str | None = None,
        context: Context | None = None,
    ) -> Task[_T_co]: ...
|