unitelabs.sdk.utils.asyncio

Attributes

  • Name
    P
    Type
    Value

    = typing_extensions.ParamSpec('P')

    Description

  • Name
    R
    Type
    Value

    = typing.TypeVar('R')

    Description

  • Name
    Iter
    Type
    Value

    = typing.TypeVar('Iter')

    Description

Functions

  • call_async(
      function : typing.Callable[P, typing.Coroutine[typing.Any, typing.Any, R]],
      *args : unitelabs.sdk.utils.asyncio.P.args,
      **kwargs : unitelabs.sdk.utils.asyncio.P.kwargs
    ) -> R

    Calls an async function synchronously and blocks until it finishes.

    Parameters

    • Name
      function
      Type
      typing.Callable[P, typing.Coroutine[typing.Any, typing.Any, R]]
      Default
      Description

    • Name
      *args
      Type
      unitelabs.sdk.utils.asyncio.P.args
      Default
      = ()
      Description

    • Name
      **kwargs
      Type
      unitelabs.sdk.utils.asyncio.P.kwargs
      Default
      = {}
      Description

    Response

    Type
    R
    Description

  • cancellable(aiter : collections.abc.AsyncIterable[Iter], cancel : asyncio.Event) -> collections.abc.AsyncIterable[Iter]

    Wraps async iterators with a cancellable event.

    Parameters

    • Name
      aiter
      Type
      collections.abc.AsyncIterable[Iter]
      Default
      Description

    • Name
      cancel
      Type
      asyncio.Event
      Default
      Description

    Response

    Type
    collections.abc.AsyncIterable[Iter]
    Description

  • coroutine(function : typing.Callable) -> typing.Callable

    Wraps click cli commands to run asynchronously.

    Parameters

    • Name
      function
      Type
      typing.Callable
      Default
      Description

    Response

    Type
    typing.Callable
    Description

Copyright © 2024