unitelabs.sdk.connect
Packages
Attributes
- Name
__all__
- Type
- list[str]
- Value
= None
- Description
Functions
Classes
Client
- MRO
- ⎿⎽⎽⎽ModulesClient
- ⎿⎽⎽⎽⎽ServicesClient
- ⎿⎽⎽⎽⎽⎽Client
Methods
list_subscriptions(self) -> list[SubscriptionResponse]
Response
- Type
- list[SubscriptionResponse]
- Description
create_subscription(
- self,
- action_id : str,
- parameters : dict,
- retry : bool,
- retry_delay : float,
- timeouts : int[][],
- interval : int[][]
) -> Subscription
Parameters
- Name
self
- Type
- Default
- Description
- Name
action_id
- Type
- str
- Default
- Description
- Name
parameters
- Type
- dict
- Default
- Description
- Name
retry
- Type
- bool
- Default
- = False
- Description
- Name
retry_delay
- Type
- float
- Default
- = 1.0
- Description
- Name
timeouts
- Type
- int[][]
- Default
- = None
- Description
- Name
interval
- Type
- int[][]
- Default
- = None
- Description
Response
- Type
- Subscription
- Description
delete_subscription(self, subscription_id : str) -> None
Parameters
- Name
self
- Type
- Default
- Description
- Name
subscription_id
- Type
- str
- Default
- Description
__init__(
- self,
- base_url : str[],
- auth_url : str[],
- client_id : str[],
- client_secret : str[],
- http_client : typing.Optional[httpx.AsyncClient],
- **kwargs
)
Parameters
- Name
self
- Type
- Default
- Description
- Name
base_url
- Type
- str[]
- Default
- = None
- Description
- Name
auth_url
- Type
- str[]
- Default
- = None
- Description
- Name
client_id
- Type
- str[]
- Default
- = None
- Description
- Name
client_secret
- Type
- str[]
- Default
- = None
- Description
- Name
http_client
- Type
- typing.Optional[httpx.AsyncClient]
- Default
- = None
- Description
- Name
**kwargs
- Type
- Default
- = {}
- Description
health(self) -> None
get(
- self,
- url,
- *args,
- **kwargs
)
Parameters
- Name
self
- Type
- Default
- Description
- Name
url
- Type
- Default
- Description
- Name
*args
- Type
- Default
- = ()
- Description
- Name
**kwargs
- Type
- Default
- = {}
- Description
post(
- self,
- url,
- *args,
- **kwargs
)
Parameters
- Name
self
- Type
- Default
- Description
- Name
url
- Type
- Default
- Description
- Name
*args
- Type
- Default
- = ()
- Description
- Name
**kwargs
- Type
- Default
- = {}
- Description
delete(
- self,
- url,
- *args,
- **kwargs
)
Parameters
- Name
self
- Type
- Default
- Description
- Name
url
- Type
- Default
- Description
- Name
*args
- Type
- Default
- = ()
- Description
- Name
**kwargs
- Type
- Default
- = {}
- Description
stream(
- self,
- url,
- cancel : asyncio.Event,
- **kwargs
) -> collections.abc.AsyncIterator[tuple[str, typing.Any]]
Parameters
- Name
self
- Type
- Default
- Description
- Name
url
- Type
- Default
- Description
- Name
cancel
- Type
- asyncio.Event
- Default
- Description
- Name
**kwargs
- Type
- Default
- = {}
- Description
Response
- Type
- collections.abc.AsyncIterator[tuple[str, typing.Any]]
- Description
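The `stream` signature above pairs an `asyncio.Event` named `cancel` with an async iterator of `(str, Any)` tuples. The sketch below shows one way such an iterator could be consumed and stopped. `fake_stream` is a stand-in generator, not the SDK method, and the event name and payload shape are invented for illustration; how the real method reacts to `cancel` is an assumption.

```python
import asyncio
from collections.abc import AsyncIterator
from typing import Any


async def fake_stream(cancel: asyncio.Event) -> AsyncIterator[tuple[str, Any]]:
    """Stand-in producer; event names and payloads are invented for this sketch."""
    for i in range(100):
        # Assumption: the producer stops once the cancel event is set.
        if cancel.is_set():
            return
        yield "message", {"count": i}
        await asyncio.sleep(0)  # give other tasks a chance to run


async def collect(limit: int) -> list[tuple[str, Any]]:
    """Read (event, data) pairs and request cancellation after `limit` items."""
    cancel = asyncio.Event()
    received: list[tuple[str, Any]] = []
    async for event, data in fake_stream(cancel):
        received.append((event, data))
        if len(received) >= limit:
            cancel.set()  # ask the producer to stop
    return received
```

Calling `asyncio.run(collect(3))` drives the loop to completion; with the real client, `fake_stream(cancel)` would presumably be replaced by a call to `stream` with a URL and the same event.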
aclose(self) -> None
Close the underlying connection.
__aenter__(self) -> None
__aexit__(
- self,
- exc_type : typing.Optional[typing.Type[BaseException]],
- exc_value : typing.Optional[BaseException],
- traceback : typing.Optional[types.TracebackType]
)
Parameters
- Name
self
- Type
- Default
- Description
- Name
exc_type
- Type
- typing.Optional[typing.Type[BaseException]]
- Default
- = None
- Description
- Name
exc_value
- Type
- typing.Optional[BaseException]
- Default
- = None
- Description
- Name
traceback
- Type
- typing.Optional[types.TracebackType]
- Default
- = None
- Description
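Together, `aclose`, `__aenter__`, and `__aexit__` suggest the client is meant to be used in an `async with` block. The sketch below illustrates that protocol with a stand-in class, `DemoClient`, which is not the SDK's `Client`. That `__aexit__` calls `aclose` is an assumption; the listing above does not say so.

```python
import asyncio
from types import TracebackType
from typing import Optional, Type


class DemoClient:
    """Stand-in for illustration only; it is not the SDK's Client."""

    def __init__(self) -> None:
        self.is_closed = False

    async def aclose(self) -> None:
        # A real client would close its underlying HTTP connection here.
        self.is_closed = True

    async def __aenter__(self) -> None:
        # Mirrors the documented `-> None` annotation, so `async with ... as x`
        # would bind None; keep your own reference to the client instead.
        return None

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        # Assumption: leaving the block closes the client, even after an error.
        await self.aclose()


async def demo() -> bool:
    client = DemoClient()
    async with client:
        assert not client.is_closed  # still open inside the block
    return client.is_closed
```

Because the documented `__aenter__` is annotated as returning `None`, the sketch keeps a reference to the client before entering the block rather than relying on `as`.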
Attributes
- Name
base_url
- Type
- Value
= base_url.rstrip('/') + '/v1'
- Description
- Name
auth_url
- Type
- Value
= auth_url.rstrip('/')
- Description
- Name
client_id
- Type
- Value
= client_id
- Description
- Name
client_secret
- Type
- Value
= client_secret
- Description
- Name
_session
- Type
- httpx.AsyncClient
- Value
= http_client or httpx.AsyncClient(base_url=self.base_url, auth=OAuth2(f'{self.auth_url}/token', client_id=self.client_id, client_secret=self.client_secret), **kwargs)
- Description
- Name
kwargs
- Type
- Value
= kwargs
- Description
- Name
is_closed
- Type
- bool
- Value
= None
- Description
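The attribute values above show how the constructor normalizes its URL arguments. A minimal sketch of that normalization follows; `normalize_urls` is a hypothetical helper rather than part of the SDK, and the example URLs are placeholders.

```python
def normalize_urls(base_url: str, auth_url: str) -> tuple[str, str]:
    """Mirror the attribute expressions listed above (hypothetical helper)."""
    # Trailing slashes are stripped; the version segment is appended to base_url only.
    return base_url.rstrip("/") + "/v1", auth_url.rstrip("/")


api_url, token_base = normalize_urls(
    "https://api.example.test/", "https://auth.example.test/realm/"
)
# The listing derives the OAuth2 token endpoint as f"{auth_url}/token".
token_url = f"{token_base}/token"
```

Passing URLs with or without a trailing slash therefore yields the same `base_url` and token endpoint.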
ClientError
Base class for all exceptions that may occur when communicating with a connector.
- Bases
- Exception
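Since `ClientError` is described as the base class for connector communication errors, catching it should cover any more specific error derived from it. The sketch below defines a local stand-in with the documented base class; `SubscriptionNotFound` and `describe` are hypothetical names used only to demonstrate the pattern.

```python
class ClientError(Exception):
    """Local stand-in with the documented base; import the SDK's class in real code."""


class SubscriptionNotFound(ClientError):
    """Hypothetical subclass used only for this example."""


def describe(error: Exception) -> str:
    """Classify an error by whether it derives from ClientError."""
    try:
        raise error
    except ClientError as exc:
        # Catching the base class also covers every subclass.
        return f"connector problem: {exc}"
    except Exception as exc:
        return f"other problem: {exc}"
```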
Service
- Decorators
- dataclasses.dataclass
Methods
Attributes
ConnectModule
Connect to the devices in your lab.
- Bases
- types.ModuleType
Methods
__init__(self, name : str) -> None
Parameters
- Name
self
- Type
- Default
- Description
- Name
name
- Type
- str
- Default
- Description
init(self) -> None
__dir__(self) -> typing.Iterable[str]
Response
- Type
- typing.Iterable[str]
- Description
Attributes
- Name
__cache
- Type
- dict[str, Service[]]
- Value
= {}
- Description
- Name
logger
- Type
- logging.Logger
- Value
= None
- Description
- Name
__all__
- Type
- typing.Iterable[str]
- Value
= None
- Description
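`ConnectModule` subclasses `types.ModuleType` and overrides `__dir__`, with a cache keyed by name. The sketch below shows how a module subclass can advertise cached entries through `dir()`. `DemoModule` and its `_cache` attribute are stand-ins; the SDK's actual cache handling is not documented here and may differ.

```python
import types
import typing


class DemoModule(types.ModuleType):
    """Stand-in module type; the cache layout is assumed, not taken from the SDK."""

    def __init__(self, name: str) -> None:
        super().__init__(name)
        self._cache: dict[str, object] = {}

    def __dir__(self) -> typing.Iterable[str]:
        # Advertise cached entries alongside the normal module attributes.
        return sorted(set(super().__dir__()) | set(self._cache))


module = DemoModule("demo_connect")
module._cache["pump"] = object()  # hypothetical cached entry
```

With this override, `dir(module)` lists `pump` even though it is not a regular module attribute, which is what makes such names visible to tab completion.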
UniteLabsExample
A UniteLabs SiLA Python Example Server.
Methods
__init__(self) -> None
Attributes
- Name
sila_service
- Type
- Module
- Value
= None
- Description
- Name
observable_command_test
- Type
- ObservableCommandTest
- Value
= None
- Description
- Name
observable_property_test
- Type
- ObservablePropertyTest
- Value
= None
- Description
- Name
unobservable_command_test
- Type
- UnobservableCommandTest
- Value
= None
- Description
- Name
unobservable_property_test
- Type
- UnobservablePropertyTest
- Value
= None
- Description
- Name
greeting_provider
- Type
- Module
- Value
= None
- Description
- Name
client
- Type
- Client
- Value
= None
- Description
- Name
id
- Type
- str
- Value
= ''
- Description
- Name
name
- Type
- str
- Value
= ''
- Description
- Name
modules
- Type
- dict[str, Module]
- Value
= dataclasses.field(repr=False, default_factory=dict)
- Description
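The `modules` default above uses `dataclasses.field(repr=False, default_factory=dict)`. The sketch below shows what those two options do, using a stand-in dataclass `DemoService` that copies only the three defaults listed (`id`, `name`, `modules`); it is not the SDK class.

```python
import dataclasses


@dataclasses.dataclass
class DemoService:
    """Stand-in with the attribute defaults listed above; not the SDK class."""

    id: str = ""
    name: str = ""
    # repr=False keeps the mapping out of repr(); default_factory gives
    # every instance its own fresh dict instead of a shared one.
    modules: dict = dataclasses.field(repr=False, default_factory=dict)


first = DemoService(id="a1", name="pump")
second = DemoService()
first.modules["greeting_provider"] = object()  # does not affect `second`
```

Using `default_factory` avoids the shared mutable default problem, and `repr=False` keeps log output short when a service holds many modules.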