ServicesClient

Methods

  • list_services(self) -> list[Service]

    Response

    Type
    list[Service]
    Description

  • get_service(self, service_id : str) -> Service

    Parameters

    • Name
      self
      Type
      Default
      Description

    • Name
      service_id
      Type
      str
      Default
      Description

    Response

    Type
    Service
    Description

  • get_service_by_name(self, name : str) -> Service[]

    Parameters

    • Name
      self
      Type
      Default
      Description

    • Name
      name
      Type
      str
      Default
      Description

    Response

    Type
    Service[]
    Description

  • __init__(
      self,
      base_url : str[],
      auth_url : str[],
      client_id : str[],
      client_secret : str[],
      http_client : typing.Optional[httpx.AsyncClient],
      **kwargs
    ) -> None

    Parameters

    • Name
      self
      Type
      Default
      Description

    • Name
      base_url
      Type
      str[]
      Default
      = None
      Description

    • Name
      auth_url
      Type
      str[]
      Default
      = None
      Description

    • Name
      client_id
      Type
      str[]
      Default
      = None
      Description

    • Name
      client_secret
      Type
      str[]
      Default
      = None
      Description

    • Name
      http_client
      Type
      typing.Optional[httpx.AsyncClient]
      Default
      = None
      Description

    • Name
      **kwargs
      Type
      Default
      = {}
      Description

  • health(self) -> None

  • get(
      self,
      url,
      *args,
      **kwargs
    ) -> None

    Parameters

    • Name
      self
      Type
      Default
      Description

    • Name
      url
      Type
      Default
      Description

    • Name
      *args
      Type
      Default
      = ()
      Description

    • Name
      **kwargs
      Type
      Default
      = {}
      Description

  • post(
      self,
      url,
      *args,
      **kwargs
    ) -> None

    Parameters

    • Name
      self
      Type
      Default
      Description

    • Name
      url
      Type
      Default
      Description

    • Name
      *args
      Type
      Default
      = ()
      Description

    • Name
      **kwargs
      Type
      Default
      = {}
      Description

  • delete(
      self,
      url,
      *args,
      **kwargs
    ) -> None

    Parameters

    • Name
      self
      Type
      Default
      Description

    • Name
      url
      Type
      Default
      Description

    • Name
      *args
      Type
      Default
      = ()
      Description

    • Name
      **kwargs
      Type
      Default
      = {}
      Description

  • stream(
      self,
      url,
      cancel : asyncio.Event,
      **kwargs
    ) -> collections.abc.AsyncIterator[tuple[str, typing.Any]]

    Parameters

    • Name
      self
      Type
      Default
      Description

    • Name
      url
      Type
      Default
      Description

    • Name
      cancel
      Type
      asyncio.Event
      Default
      Description

    • Name
      **kwargs
      Type
      Default
      = {}
      Description

    Response

    Type
    collections.abc.AsyncIterator[tuple[str, typing.Any]]
    Description

  • aclose(self) -> None

    Close underlying connection.

  • __aenter__(self) -> None

  • __aexit__(
      self,
      exc_type : typing.Optional[typing.Type[BaseException]],
      exc_value : typing.Optional[BaseException],
      traceback : typing.Optional[types.TracebackType]
    ) -> None

    Parameters

    • Name
      self
      Type
      Default
      Description

    • Name
      exc_type
      Type
      typing.Optional[typing.Type[BaseException]]
      Default
      = None
      Description

    • Name
      exc_value
      Type
      typing.Optional[BaseException]
      Default
      = None
      Description

    • Name
      traceback
      Type
      typing.Optional[types.TracebackType]
      Default
      = None
      Description

Attributes

  • Name
    base_url
    Type
    Value

    = base_url.rstrip('/') + '/v1'

    Description

  • Name
    auth_url
    Type
    Value

    = auth_url.rstrip('/')

    Description

  • Name
    client_id
    Type
    Value

    = client_id

    Description

  • Name
    client_secret
    Type
    Value

    = client_secret

    Description

  • Name
    _session
    Type
    httpx.AsyncClient
    Value

    = http_client or httpx.AsyncClient(base_url=self.base_url, auth=OAuth2(f'{self.auth_url}/token', client_id=self.client_id, client_secret=self.client_secret), **kwargs)

    Description

  • Name
    kwargs
    Type
    Value

    = kwargs

    Description

  • Name
    is_closed
    Type
    bool
    Value

    = None

    Description

Copyright © 2024