unitelabs.sdk.client
Packages
Attributes
- Name
__all__
- Type
- Value
= [ "Client" ]
- Description
Classes
Client
- MRO
- ⎿⎽⎽⎽ModulesClient
- ⎿⎽⎽⎽⎽ServicesClient
- ⎿⎽⎽⎽⎽⎽Client
Methods
list_subscriptions(self) -> list[SubscriptionResponse]
Response
- Type
- list[SubscriptionResponse]
- Description
create_subscription(
- self,
- action_id : str,
- parameters : dict,
- retry : bool,
- retry_delay : float,
- timeouts : int[][],
- interval : int[][]
Parameters
- Name
self
- Type
- Default
- Description
- Name
action_id
- Type
- str
- Default
- Description
- Name
parameters
- Type
- dict
- Default
- Description
- Name
retry
- Type
- bool
- Default
- = False
- Description
- Name
retry_delay
- Type
- float
- Default
- = 1.0
- Description
- Name
timeouts
- Type
- int[][]
- Default
- = None
- Description
- Name
interval
- Type
- int[][]
- Default
- = None
- Description
Response
- Type
- Subscription
- Description
delete_subscription(self, subscription_id : str) -> None
Parameters
- Name
self
- Type
- Default
- Description
- Name
subscription_id
- Type
- str
- Default
- Description
__init__(
- self,
- base_url : str[],
- auth_url : str[],
- client_id : str[],
- client_secret : str[],
- http_client : typing.Optional[httpx.AsyncClient],
- **kwargs
Parameters
- Name
self
- Type
- Default
- Description
- Name
base_url
- Type
- str[]
- Default
- = None
- Description
- Name
auth_url
- Type
- str[]
- Default
- = None
- Description
- Name
client_id
- Type
- str[]
- Default
- = None
- Description
- Name
client_secret
- Type
- str[]
- Default
- = None
- Description
- Name
http_client
- Type
- typing.Optional[httpx.AsyncClient]
- Default
- = None
- Description
- Name
**kwargs
- Type
- Default
- = {}
- Description
health(self) -> None
get(
- self,
- url,
- *args,
- **kwargs
Parameters
- Name
self
- Type
- Default
- Description
- Name
url
- Type
- Default
- Description
- Name
*args
- Type
- Default
- = ()
- Description
- Name
**kwargs
- Type
- Default
- = {}
- Description
post(
- self,
- url,
- *args,
- **kwargs
Parameters
- Name
self
- Type
- Default
- Description
- Name
url
- Type
- Default
- Description
- Name
*args
- Type
- Default
- = ()
- Description
- Name
**kwargs
- Type
- Default
- = {}
- Description
delete(
- self,
- url,
- *args,
- **kwargs
Parameters
- Name
self
- Type
- Default
- Description
- Name
url
- Type
- Default
- Description
- Name
*args
- Type
- Default
- = ()
- Description
- Name
**kwargs
- Type
- Default
- = {}
- Description
stream(
- self,
- url,
- cancel : asyncio.Event,
- **kwargs
Parameters
- Name
self
- Type
- Default
- Description
- Name
url
- Type
- Default
- Description
- Name
cancel
- Type
- asyncio.Event
- Default
- Description
- Name
**kwargs
- Type
- Default
- = {}
- Description
Response
- Type
- collections.abc.AsyncIterator[tuple[str, typing.Any]]
- Description
aclose(self) -> None
Close underlying connection.
__aenter__(self) -> None
__aexit__(
- self,
- exc_type : typing.Optional[typing.Type[BaseException]],
- exc_value : typing.Optional[BaseException],
- traceback : typing.Optional[types.TracebackType]
Parameters
- Name
self
- Type
- Default
- Description
- Name
exc_type
- Type
- typing.Optional[typing.Type[BaseException]]
- Default
- = None
- Description
- Name
exc_value
- Type
- typing.Optional[BaseException]
- Default
- = None
- Description
- Name
traceback
- Type
- typing.Optional[types.TracebackType]
- Default
- = None
- Description
Attributes
- Name
base_url
- Type
- Value
= base_url.rstrip('/') + '/v1'
- Description
- Name
auth_url
- Type
- Value
= auth_url.rstrip('/')
- Description
- Name
client_id
- Type
- Value
= client_id
- Description
- Name
client_secret
- Type
- Value
= client_secret
- Description
- Name
_session
- Type
- httpx.AsyncClient
- Value
= http_client or httpx.AsyncClient(base_url=self.base_url, auth=OAuth2(f'{self.auth_url}/token', client_id=self.client_id, client_secret=self.client_secret), **kwargs)
- Description
- Name
kwargs
- Type
- Value
= kwargs
- Description
- Name
is_closed
- Type
- bool
- Value
= None
- Description