unitelabs.sdk.client.base_client

Attributes

  • Name
    T
    Type
    Value

    = typing.TypeVar('T')

    Description

Functions

  • parse_error(response : dict) -> ClientError

    Parameters

    • Name
      response
      Type
      dict
      Default
      Description

    Response

    Type
    ClientError
    Description

  • cancellable(aiter : collections.abc.AsyncIterable[Iter], cancel : asyncio.Event) -> collections.abc.AsyncIterable[Iter]

    Wraps an async iterable so that iteration over it can be cancelled via an `asyncio.Event`.

    Parameters

    • Name
      aiter
      Type
      collections.abc.AsyncIterable[Iter]
      Default
      Description

    • Name
      cancel
      Type
      asyncio.Event
      Default
      Description

    Response

    Type
    collections.abc.AsyncIterable[Iter]
    Description

Classes

  • BaseClient

    Methods

    • __init__(
        self,
        base_url : str,
        auth_url : str,
        client_id : str,
        client_secret : str,
        http_client : typing.Optional[httpx.AsyncClient],
        **kwargs
      ) -> None

      Parameters

      • Name
        self
        Type
        Default
        Description

      • Name
        base_url
        Type
        str
        Default
        = None
        Description

      • Name
        auth_url
        Type
        str
        Default
        = None
        Description

      • Name
        client_id
        Type
        str
        Default
        = None
        Description

      • Name
        client_secret
        Type
        str
        Default
        = None
        Description

      • Name
        http_client
        Type
        typing.Optional[httpx.AsyncClient]
        Default
        = None
        Description

      • Name
        **kwargs
        Type
        Default
        = {}
        Description

    • health(self) -> None

    • get(
        self,
        url,
        *args,
        **kwargs
      ) -> None

      Parameters

      • Name
        self
        Type
        Default
        Description

      • Name
        url
        Type
        Default
        Description

      • Name
        *args
        Type
        Default
        = ()
        Description

      • Name
        **kwargs
        Type
        Default
        = {}
        Description

    • post(
        self,
        url,
        *args,
        **kwargs
      ) -> None

      Parameters

      • Name
        self
        Type
        Default
        Description

      • Name
        url
        Type
        Default
        Description

      • Name
        *args
        Type
        Default
        = ()
        Description

      • Name
        **kwargs
        Type
        Default
        = {}
        Description

    • delete(
        self,
        url,
        *args,
        **kwargs
      ) -> None

      Parameters

      • Name
        self
        Type
        Default
        Description

      • Name
        url
        Type
        Default
        Description

      • Name
        *args
        Type
        Default
        = ()
        Description

      • Name
        **kwargs
        Type
        Default
        = {}
        Description

    • stream(
        self,
        url,
        cancel : asyncio.Event,
        **kwargs
      ) -> collections.abc.AsyncIterator[tuple[str, typing.Any]]

      Parameters

      • Name
        self
        Type
        Default
        Description

      • Name
        url
        Type
        Default
        Description

      • Name
        cancel
        Type
        asyncio.Event
        Default
        Description

      • Name
        **kwargs
        Type
        Default
        = {}
        Description

      Response

      Type
      collections.abc.AsyncIterator[tuple[str, typing.Any]]
      Description

    • aclose(self) -> None

      Close the underlying connection.

    • __aenter__(self) -> None

    • __aexit__(
        self,
        exc_type : typing.Optional[typing.Type[BaseException]],
        exc_value : typing.Optional[BaseException],
        traceback : typing.Optional[types.TracebackType]
      ) -> None

      Parameters

      • Name
        self
        Type
        Default
        Description

      • Name
        exc_type
        Type
        typing.Optional[typing.Type[BaseException]]
        Default
        = None
        Description

      • Name
        exc_value
        Type
        typing.Optional[BaseException]
        Default
        = None
        Description

      • Name
        traceback
        Type
        typing.Optional[types.TracebackType]
        Default
        = None
        Description

    Attributes

    • Name
      base_url
      Type
      Value

      = base_url.rstrip('/') + '/v1'

      Description

    • Name
      auth_url
      Type
      Value

      = auth_url.rstrip('/')

      Description

    • Name
      client_id
      Type
      Value

      = client_id

      Description

    • Name
      client_secret
      Type
      Value

      = client_secret

      Description

    • Name
      _session
      Type
      httpx.AsyncClient
      Value

      = http_client or httpx.AsyncClient(base_url=self.base_url, auth=OAuth2(f'{self.auth_url}/token', client_id=self.client_id, client_secret=self.client_secret), **kwargs)

      Description

    • Name
      kwargs
      Type
      Value

      = kwargs

      Description

    • Name
      is_closed
      Type
      bool
      Value

      = None

      Description

Copyright © 2024