unitelabs.sdk.core
Packages
Attributes
- Name
__all__
- Type
- Value
= [ "Service", "Module", "Action", "Property", "Sensor", "Control", "Subscription", "ClientError", "AuthError", "ExecutionError", "ParameterError", "TransportError" ]
- Description
Classes
Action
- Decorators
- dataclasses.dataclass
Methods
__init__(
- self,
- client : Client,
- id : str,
- name : str,
- type : typing.Literal['PROPERTY', 'SENSOR', 'CONTROL', 'DIAGNOSTIC'],
- parameters : dict[str, Parameter],
- output : dict[str, Output],
- responses : dict[str, Response]
Parameters
- Name
self
- Type
- Default
- Description
- Name
client
- Type
- Client
- Default
- Description
- Name
id
- Type
- str
- Default
- = ''
- Description
- Name
name
- Type
- str
- Default
- = ''
- Description
- Name
type
- Type
- typing.Literal['PROPERTY', 'SENSOR', 'CONTROL', 'DIAGNOSTIC']
- Default
- = 'CONTROL'
- Description
- Name
parameters
- Type
- dict[str, Parameter]
- Default
- Description
- Name
output
- Type
- dict[str, Output]
- Default
- Description
- Name
responses
- Type
- dict[str, Response]
- Default
- Description
_parse_parameters(self, parameters : dict) -> dict
Parameters
- Name
self
- Type
- Default
- Description
- Name
parameters
- Type
- dict
- Default
- Description
Response
- Type
- dict
- Description
_parse_output(self, outputs : dict) -> None
Parameters
- Name
self
- Type
- Default
- Description
- Name
outputs
- Type
- dict
- Default
- Description
_parse_responses(self, responses : dict) -> None
Parameters
- Name
self
- Type
- Default
- Description
- Name
responses
- Type
- dict
- Default
- Description
_parse_data_type(self, value : typing.Optional[typing.Any], schema : StructureElementModel) -> typing.Union[str, int, float, bool, bytes, date, unitelabs.sdk.datetime.time, unitelabs.sdk.datetime.datetime, list, dict]
Parameters
- Name
self
- Type
- Default
- Description
- Name
value
- Type
- typing.Optional[typing.Any]
- Default
- Description
- Name
schema
- Type
- StructureElementModel
- Default
- Description
Response
- Type
- typing.Union[str, int, float, bool, bytes, date, unitelabs.sdk.datetime.time, unitelabs.sdk.datetime.datetime, list, dict]
- Description
_to_data_type(self, value : typing.Optional[typing.Any], schema : StructureElementModel) -> typing.Union[str, int, float, bool, bytes, date, unitelabs.sdk.datetime.time, unitelabs.sdk.datetime.datetime, list, dict]
Parameters
- Name
self
- Type
- Default
- Description
- Name
value
- Type
- typing.Optional[typing.Any]
- Default
- Description
- Name
schema
- Type
- StructureElementModel
- Default
- Description
Response
- Type
- typing.Union[str, int, float, bool, bytes, date, unitelabs.sdk.datetime.time, unitelabs.sdk.datetime.datetime, list, dict]
- Description
Attributes
- Name
client
- Type
- Client
- Value
= None
- Description
- Name
id
- Type
- str
- Value
= ''
- Description
- Name
name
- Type
- str
- Value
= ''
- Description
- Name
type
- Type
- typing.Literal['PROPERTY', 'SENSOR', 'CONTROL', 'DIAGNOSTIC']
- Value
= 'CONTROL'
- Description
- Name
parameters
- Type
- dict[str, Parameter]
- Value
= dataclasses.field(repr=False, default_factory=dict)
- Description
- Name
output
- Type
- dict[str, Output]
- Value
= dataclasses.field(repr=False, default_factory=dict)
- Description
- Name
responses
- Type
- dict[str, Response]
- Value
= dataclasses.field(repr=False, default_factory=dict)
- Description
Control
Methods
__call__(self, **kwargs) -> None
Parameters
- Name
self
- Type
- Default
- Description
- Name
**kwargs
- Type
- Default
- = {}
- Description
__init__(
- self,
- client : Client,
- id : str,
- name : str,
- type : typing.Literal['PROPERTY', 'SENSOR', 'CONTROL', 'DIAGNOSTIC'],
- parameters : dict[str, Parameter],
- output : dict[str, Output],
- responses : dict[str, Response]
Parameters
- Name
self
- Type
- Default
- Description
- Name
client
- Type
- Client
- Default
- Description
- Name
id
- Type
- str
- Default
- = ''
- Description
- Name
name
- Type
- str
- Default
- = ''
- Description
- Name
type
- Type
- typing.Literal['PROPERTY', 'SENSOR', 'CONTROL', 'DIAGNOSTIC']
- Default
- = 'CONTROL'
- Description
- Name
parameters
- Type
- dict[str, Parameter]
- Default
- Description
- Name
output
- Type
- dict[str, Output]
- Default
- Description
- Name
responses
- Type
- dict[str, Response]
- Default
- Description
_parse_parameters(self, parameters : dict) -> dict
Parameters
- Name
self
- Type
- Default
- Description
- Name
parameters
- Type
- dict
- Default
- Description
Response
- Type
- dict
- Description
_parse_output(self, outputs : dict) -> None
Parameters
- Name
self
- Type
- Default
- Description
- Name
outputs
- Type
- dict
- Default
- Description
_parse_responses(self, responses : dict) -> None
Parameters
- Name
self
- Type
- Default
- Description
- Name
responses
- Type
- dict
- Default
- Description
_parse_data_type(self, value : typing.Optional[typing.Any], schema : StructureElementModel) -> typing.Union[str, int, float, bool, bytes, date, unitelabs.sdk.datetime.time, unitelabs.sdk.datetime.datetime, list, dict]
Parameters
- Name
self
- Type
- Default
- Description
- Name
value
- Type
- typing.Optional[typing.Any]
- Default
- Description
- Name
schema
- Type
- StructureElementModel
- Default
- Description
Response
- Type
- typing.Union[str, int, float, bool, bytes, date, unitelabs.sdk.datetime.time, unitelabs.sdk.datetime.datetime, list, dict]
- Description
_to_data_type(self, value : typing.Optional[typing.Any], schema : StructureElementModel) -> typing.Union[str, int, float, bool, bytes, date, unitelabs.sdk.datetime.time, unitelabs.sdk.datetime.datetime, list, dict]
Parameters
- Name
self
- Type
- Default
- Description
- Name
value
- Type
- typing.Optional[typing.Any]
- Default
- Description
- Name
schema
- Type
- StructureElementModel
- Default
- Description
Response
- Type
- typing.Union[str, int, float, bool, bytes, date, unitelabs.sdk.datetime.time, unitelabs.sdk.datetime.datetime, list, dict]
- Description
Attributes
- Name
type
- Type
- typing.Literal[]
- Value
= 'CONTROL'
- Description
- Name
client
- Type
- Client
- Value
= None
- Description
- Name
id
- Type
- str
- Value
= ''
- Description
- Name
name
- Type
- str
- Value
= ''
- Description
- Name
parameters
- Type
- dict[str, Parameter]
- Value
= dataclasses.field(repr=False, default_factory=dict)
- Description
- Name
output
- Type
- dict[str, Output]
- Value
= dataclasses.field(repr=False, default_factory=dict)
- Description
- Name
responses
- Type
- dict[str, Response]
- Value
= dataclasses.field(repr=False, default_factory=dict)
- Description
Property
Methods
__call__(self) -> None
__init__(
- self,
- client : Client,
- id : str,
- name : str,
- type : typing.Literal['PROPERTY', 'SENSOR', 'CONTROL', 'DIAGNOSTIC'],
- parameters : dict[str, Parameter],
- output : dict[str, Output],
- responses : dict[str, Response]
Parameters
- Name
self
- Type
- Default
- Description
- Name
client
- Type
- Client
- Default
- Description
- Name
id
- Type
- str
- Default
- = ''
- Description
- Name
name
- Type
- str
- Default
- = ''
- Description
- Name
type
- Type
- typing.Literal['PROPERTY', 'SENSOR', 'CONTROL', 'DIAGNOSTIC']
- Default
- = 'CONTROL'
- Description
- Name
parameters
- Type
- dict[str, Parameter]
- Default
- Description
- Name
output
- Type
- dict[str, Output]
- Default
- Description
- Name
responses
- Type
- dict[str, Response]
- Default
- Description
_parse_parameters(self, parameters : dict) -> dict
Parameters
- Name
self
- Type
- Default
- Description
- Name
parameters
- Type
- dict
- Default
- Description
Response
- Type
- dict
- Description
_parse_output(self, outputs : dict) -> None
Parameters
- Name
self
- Type
- Default
- Description
- Name
outputs
- Type
- dict
- Default
- Description
_parse_responses(self, responses : dict) -> None
Parameters
- Name
self
- Type
- Default
- Description
- Name
responses
- Type
- dict
- Default
- Description
_parse_data_type(self, value : typing.Optional[typing.Any], schema : StructureElementModel) -> typing.Union[str, int, float, bool, bytes, date, unitelabs.sdk.datetime.time, unitelabs.sdk.datetime.datetime, list, dict]
Parameters
- Name
self
- Type
- Default
- Description
- Name
value
- Type
- typing.Optional[typing.Any]
- Default
- Description
- Name
schema
- Type
- StructureElementModel
- Default
- Description
Response
- Type
- typing.Union[str, int, float, bool, bytes, date, unitelabs.sdk.datetime.time, unitelabs.sdk.datetime.datetime, list, dict]
- Description
_to_data_type(self, value : typing.Optional[typing.Any], schema : StructureElementModel) -> typing.Union[str, int, float, bool, bytes, date, unitelabs.sdk.datetime.time, unitelabs.sdk.datetime.datetime, list, dict]
Parameters
- Name
self
- Type
- Default
- Description
- Name
value
- Type
- typing.Optional[typing.Any]
- Default
- Description
- Name
schema
- Type
- StructureElementModel
- Default
- Description
Response
- Type
- typing.Union[str, int, float, bool, bytes, date, unitelabs.sdk.datetime.time, unitelabs.sdk.datetime.datetime, list, dict]
- Description
Attributes
- Name
type
- Type
- typing.Literal[]
- Value
= 'PROPERTY'
- Description
- Name
client
- Type
- Client
- Value
= None
- Description
- Name
id
- Type
- str
- Value
= ''
- Description
- Name
name
- Type
- str
- Value
= ''
- Description
- Name
parameters
- Type
- dict[str, Parameter]
- Value
= dataclasses.field(repr=False, default_factory=dict)
- Description
- Name
output
- Type
- dict[str, Output]
- Value
= dataclasses.field(repr=False, default_factory=dict)
- Description
- Name
responses
- Type
- dict[str, Response]
- Value
= dataclasses.field(repr=False, default_factory=dict)
- Description
Sensor
Methods
__call__(
- self,
- interval : typing.Optional[int],
- **kwargs
Parameters
- Name
self
- Type
- Default
- Description
- Name
interval
- Type
- typing.Optional[int]
- Default
- = None
- Description
- Name
**kwargs
- Type
- Default
- = {}
- Description
__init__(
- self,
- client : Client,
- id : str,
- name : str,
- type : typing.Literal['PROPERTY', 'SENSOR', 'CONTROL', 'DIAGNOSTIC'],
- parameters : dict[str, Parameter],
- output : dict[str, Output],
- responses : dict[str, Response]
Parameters
- Name
self
- Type
- Default
- Description
- Name
client
- Type
- Client
- Default
- Description
- Name
id
- Type
- str
- Default
- = ''
- Description
- Name
name
- Type
- str
- Default
- = ''
- Description
- Name
type
- Type
- typing.Literal['PROPERTY', 'SENSOR', 'CONTROL', 'DIAGNOSTIC']
- Default
- = 'CONTROL'
- Description
- Name
parameters
- Type
- dict[str, Parameter]
- Default
- Description
- Name
output
- Type
- dict[str, Output]
- Default
- Description
- Name
responses
- Type
- dict[str, Response]
- Default
- Description
_parse_parameters(self, parameters : dict) -> dict
Parameters
- Name
self
- Type
- Default
- Description
- Name
parameters
- Type
- dict
- Default
- Description
Response
- Type
- dict
- Description
_parse_output(self, outputs : dict) -> None
Parameters
- Name
self
- Type
- Default
- Description
- Name
outputs
- Type
- dict
- Default
- Description
_parse_responses(self, responses : dict) -> None
Parameters
- Name
self
- Type
- Default
- Description
- Name
responses
- Type
- dict
- Default
- Description
_parse_data_type(self, value : typing.Optional[typing.Any], schema : StructureElementModel) -> typing.Union[str, int, float, bool, bytes, date, unitelabs.sdk.datetime.time, unitelabs.sdk.datetime.datetime, list, dict]
Parameters
- Name
self
- Type
- Default
- Description
- Name
value
- Type
- typing.Optional[typing.Any]
- Default
- Description
- Name
schema
- Type
- StructureElementModel
- Default
- Description
Response
- Type
- typing.Union[str, int, float, bool, bytes, date, unitelabs.sdk.datetime.time, unitelabs.sdk.datetime.datetime, list, dict]
- Description
_to_data_type(self, value : typing.Optional[typing.Any], schema : StructureElementModel) -> typing.Union[str, int, float, bool, bytes, date, unitelabs.sdk.datetime.time, unitelabs.sdk.datetime.datetime, list, dict]
Parameters
- Name
self
- Type
- Default
- Description
- Name
value
- Type
- typing.Optional[typing.Any]
- Default
- Description
- Name
schema
- Type
- StructureElementModel
- Default
- Description
Response
- Type
- typing.Union[str, int, float, bool, bytes, date, unitelabs.sdk.datetime.time, unitelabs.sdk.datetime.datetime, list, dict]
- Description
Attributes
- Name
type
- Type
- typing.Literal[]
- Value
= 'SENSOR'
- Description
- Name
client
- Type
- Client
- Value
= None
- Description
- Name
id
- Type
- str
- Value
= ''
- Description
- Name
name
- Type
- str
- Value
= ''
- Description
- Name
parameters
- Type
- dict[str, Parameter]
- Value
= dataclasses.field(repr=False, default_factory=dict)
- Description
- Name
output
- Type
- dict[str, Output]
- Value
= dataclasses.field(repr=False, default_factory=dict)
- Description
- Name
responses
- Type
- dict[str, Response]
- Value
= dataclasses.field(repr=False, default_factory=dict)
- Description
AuthError
- MRO
Methods
__init__(
- self,
- error : typing.Literal['invalid_request', 'invalid_client', 'invalid_grant', 'invalid_scope', 'unauthorized_client', 'unsupported_grant_type'],
- error_description : typing.Optional[str],
- error_uri : typing.Optional[str]
Parameters
- Name
self
- Type
- Default
- Description
- Name
error
- Type
- typing.Literal['invalid_request', 'invalid_client', 'invalid_grant', 'invalid_scope', 'unauthorized_client', 'unsupported_grant_type']
- Default
- Description
- Name
error_description
- Type
- typing.Optional[str]
- Default
- = None
- Description
- Name
error_uri
- Type
- typing.Optional[str]
- Default
- = None
- Description
Attributes
- Name
error
- Type
- Value
= error
- Description
- Name
error_description
- Type
- Value
= error_description
- Description
- Name
error_uri
- Type
- Value
= error_uri
- Description
ClientError
Base class for all exceptions that may occur when communicating with a connector.
- Bases
- Exception
ExecutionError
Failed to execute an aciton on the connector.
Methods
__init__(
- self,
- identifier : str,
- message : str
Parameters
- Name
self
- Type
- Default
- Description
- Name
identifier
- Type
- str
- Default
- = ''
- Description
- Name
message
- Type
- str
- Default
- = ''
- Description
Attributes
- Name
identifier
- Type
- Value
= identifier
- Description
- Name
message
- Type
- Value
= message
- Description
ParameterError
Failed to validate the given parameters.
Methods
__init__(
- self,
- parameter : str,
- message : str
Parameters
- Name
self
- Type
- Default
- Description
- Name
parameter
- Type
- str
- Default
- Description
- Name
message
- Type
- str
- Default
- Description
Attributes
- Name
parameter
- Type
- Value
= parameter
- Description
- Name
message
- Type
- Value
= message
- Description
TransportError
Base class for all exceptions that occur at the level of the transport.
Module
- Decorators
- dataclasses.dataclass
Methods
Attributes
Service
- Decorators
- dataclasses.dataclass
Methods
Attributes
Subscription
- Bases
- collections.abc.AsyncIterator[T]
- Decorators
- dataclasses.dataclass
Methods
__init__(
- self,
- subscribe : typing.Callable[[], collections.abc.AsyncIterator[tuple[str, T]]],
- retry : bool,
- retry_delay : typing.Union[int, float],
- timeouts : typing.Union[int, float, None],
- pipes : list[typing.Callable],
- queue : asyncio.Queue[T],
- _state : SubscriptionState,
- _current : typing.Union[T, None]
Parameters
- Name
self
- Type
- Default
- Description
- Name
subscribe
- Type
- typing.Callable[[], collections.abc.AsyncIterator[tuple[str, T]]]
- Default
- Description
- Name
retry
- Type
- bool
- Default
- = False
- Description
- Name
retry_delay
- Type
- typing.Union[int, float]
- Default
- = 0
- Description
- Name
timeouts
- Type
- typing.Union[int, float, None]
- Default
- = None
- Description
- Name
pipes
- Type
- list[typing.Callable]
- Default
- Description
- Name
queue
- Type
- asyncio.Queue[T]
- Default
- Description
- Name
_state
- Type
- SubscriptionState
- Default
- Description
- Name
_current
- Type
- typing.Union[T, None]
- Default
- = None
- Description
__post_init__(self) -> None
transition_to(self, state : SubscriptionState) -> None
Parameters
- Name
self
- Type
- Default
- Description
- Name
state
- Type
- SubscriptionState
- Default
- Description
aopen(self) -> None
aclose(self) -> None
pipe(self, pipe : typing.Callable[[], R]) -> Subscription[R]
Parameters
- Name
self
- Type
- Default
- Description
- Name
pipe
- Type
- typing.Callable[[], R]
- Default
- Description
Response
- Type
- Subscription[R]
- Description
__aiter__(self) -> None
__aexit__(
- self,
- exc_type : typing.Optional[typing.Type[BaseException]],
- exc_value : typing.Optional[BaseException],
- traceback : typing.Optional[types.TracebackType]
Parameters
- Name
self
- Type
- Default
- Description
- Name
exc_type
- Type
- typing.Optional[typing.Type[BaseException]]
- Default
- = None
- Description
- Name
exc_value
- Type
- typing.Optional[BaseException]
- Default
- = None
- Description
- Name
traceback
- Type
- typing.Optional[types.TracebackType]
- Default
- = None
- Description
Attributes
- Name
subscribe
- Type
- typing.Callable[[], collections.abc.AsyncIterator[tuple[str, T]]]
- Value
= None
- Description
- Name
retry
- Type
- bool
- Value
= False
- Description
- Name
retry_delay
- Type
- typing.Union[int, float]
- Value
= 0
- Description
- Name
timeouts
- Type
- typing.Union[int, float, None]
- Value
= None
- Description
- Name
pipes
- Type
- list[typing.Callable]
- Value
= dataclasses.field(default_factory=list)
- Description
- Name
queue
- Type
- asyncio.Queue[T]
- Value
= dataclasses.field(default_factory=asyncio.Queue)
- Description
- Name
_state
- Type
- SubscriptionState
- Value
= dataclasses.field(default_factory=Pending)
- Description
- Name
_current
- Type
- typing.Union[T, None]
- Value
= None
- Description
- Name
current
- Type
- T
- Value
= None
- Description