unitelabs.sdk.utils.asyncio
Attributes
- Name
P
- Type
- Value
= typing_extensions.ParamSpec('P')
- Description
- Name
R
- Type
- Value
= typing.TypeVar('R')
- Description
- Name
Iter
- Type
- Value
= typing.TypeVar('Iter')
- Description
Functions
call_async(
- *args : unitelabs.sdk.utils.asyncio.P.args,
- **kwargs : unitelabs.sdk.utils.asyncio.P.kwargs
Calls an async function synchronously and blocks until it finishes.
Parameters
- Name
*args
- Type
- unitelabs.sdk.utils.asyncio.P.args
- Default
- = ()
- Description
- Name
**kwargs
- Type
- unitelabs.sdk.utils.asyncio.P.kwargs
- Default
- = {}
- Description
Response
- Type
- R
- Description
cancellable(aiter : collections.abc.AsyncIterable[Iter], cancel : asyncio.Event) -> collections.abc.AsyncIterable[Iter]
Wraps async iterators with a cancellable event.
Parameters
- Name
aiter
- Type
- collections.abc.AsyncIterable[Iter]
- Default
- Description
- Name
cancel
- Type
- asyncio.Event
- Default
- Description
Response
- Type
- collections.abc.AsyncIterable[Iter]
- Description
coroutine(function : typing.Callable) -> typing.Callable
Wraps click cli commands to run asynchronously.
Parameters
- Name
function
- Type
- typing.Callable
- Default
- Description
Response
- Type
- typing.Callable
- Description