Hardware Communication
Getting Started with the UniteLabs Omnibus Library for Hardware Communication
The Omnibus package is a Python library that simplifies hardware and software communication by providing an abstraction layer over various communication protocols (RS-232/485, USB, TCP/IP, and more). With the Omnibus, developers can seamlessly interact with devices without needing to delve into the complexities of specific communication methods.
This tutorial will walk you through the process of setting up the Omnibus library, establishing a connection with your hardware device, and performing basic read/write operations. By the end of this tutorial, you'll be ready to integrate the Omnibus into your projects and start communicating with your hardware devices efficiently.
Prerequisites
- A hardware device capable of serial communication.
- The communication specification of that device.
- Python 3 installed on your system.
Step 1: Installing the Omnibus Library
First, you need to install the Omnibus
library. You can use any Python package manager of your choice.
Open your terminal or command prompt and run the following commands. If you start from scratch, create a new project first.
poetry new my-app --src
cd my-app
Then setup and install the dependency in your current project.
poetry source add --priority=supplemental unitelabs \
https://gitlab.com/api/v4/groups/1009252/-/packages/pypi/simple
poetry add --source unitelabs "unitelabs-bus[serial]"
poetry install
This will download and install the Omnibus
library, making it available for use in your Python scripts.
Omnibus
library, e.g. unitelabs-bus[usb]
.unitelabs-bus
package to unitelabs-omnibus
. Additionally, we will distribute the package to PyPi to make it easier to install.Step 2: Connecting Your Hardware
Connect your hardware device to your computer using the appropriate serial connection (e.g., USB-to-serial adapter). Note the COM port (on Windows) or the device path (on Linux/Mac) that the hardware is connected to. You’ll need this information to establish a connection in your Python code.
To read out all available ports, you can use our device manager with:
poetry run python -m unitelabs.bus.utils.device_manager
Step 3: Writing Your First Python Script
Now, let's write a Python script to open a serial connection and communicate with your hardware device.
- Import the Omnibus Library
Start by importing the necessary classes and methods from the Omnibus library:from unitelabs.bus import Protocol, create_serial_connection, ByteCommand
- Initialize the Connection
Next, configure the serial connection. You need to specify the COM port (or device path), baud rate, and other parameters like timeout.protocol = Protocol( create_serial_connection, port='/dev/ttyUSB0', # Replace with your port name, e.g. '/dev/cu.usbmodem3210' on macOS, 'COM3' on Windows baudrate=9600, # Set the baud rate to match your device timeout=1, # Set a timeout for read operations ) await protocol.open()
In order for the protocol to establish a connection to the device, you need to call theopen()
method. - Send Data to the Device
To transmit data to your hardware device, you can use theexecute()
method.protocol.execute(ByteCommand(b"Hello, Device!\r\n"))
This line sends the binary data b"Hello, Device!\r\n" to your serial device. - Receive Data from the Device
Theexecute()
method is also responsible for returning the corresponding response sent from the device. To read data sent from the device, await the return of theexecute()
method. This reads data from the serial port until a newline character is encountered:response = await protocol.execute(ByteCommand(b"Hello, Device!\r\n")) print('Received:', response)
Theexecute()
method now waits until it receives a response or until it times out. If it reads a line of data from the device then it prints it to the console. - Close the Connection
After communication is complete, it’s good practice to close the connection to free the resources:protocol.close()
Step 4: Running the Script
In order to run asynchronous code, you need to wrap it in an async
method and call that via asyncio.run()
:
import asyncio
from unitelabs.bus import Protocol, create_serial_connection, ByteCommand
async def main():
protocol = Protocol(
create_serial_connection,
port='/dev/ttyUSB0', # Replace with your port name, e.g. '/dev/cu.usbmodem3210' on macOS, 'COM3' on Windows
baudrate=9600, # Set the baud rate to match your device
timeout=1, # Set a timeout for read operations
)
await protocol.open()
response = await protocol.execute(ByteCommand(b"Hello, Device!\r\n"))
print('Received:', response)
protocol.close()
if __name__ == "__main__":
asyncio.run(main())
Save your script (e.g., as src/my_app/__main__.py
) and run it from your terminal:
poetry run python -m my_app
If everything is set up correctly, the script will open the serial connection, send a message to your hardware device, receive a response, and print it to the console.
Step 5: Troubleshooting Common Issues
Here are a few tips if you run into issues:
- Port not found: Double-check the port name and ensure your device is properly connected.
- Incorrect baud rate: Ensure that the baud rate in your script matches the baud rate configured on your hardware.
- Timeouts: If you’re not receiving data, increase the timeout parameter or check the connection.
To receive a more verbose output, you can increase the log level. Add the following at the top of your script:
import logging
logging.basicConfig(level=logging.DEBUG)
Step 6: Expanding Your Communication
Now that you've mastered the basics of using the Omnibus library to establish a connection and perform simple read/write operations, it's time to explore more advanced use cases. The Omnibus library is designed with flexibility in mind, allowing you to extend its functionality to suit specific communication requirements or device protocols.
1. Creating Custom Protocols
In many cases, you want to create a custom protocol to encapsulate specific communication patterns or device commands. A protocol in Omnibus bundles all the functions your hardware device offers, presenting them in a human-readable and easy-to-use way.
Example: Creating a Custom Protocol
Let’s start by creating a custom protocol class that inherits from Protocol. This class can encapsulate common commands and responses your device supports:
class MyCustomProtocol(Protocol):
def __init__(self, port: str) -> None:
super().__init__(create_serial_connection, port=port, baudrate=9600, timeout=1)
async def hello(self) -> str:
return await self.execute(SerialCommand("HELLO"))
async def ping(self) -> str:
return await self.execute(SerialCommand("PING"))
In this example, MyCustomProtocol
defines two methods, hello
and ping
, that communicate with the device using specific commands. This setup makes it easy to call these methods without worrying about the underlying serial communication details.
2. Creating Custom Commands
When communicating with hardware, you might encounter devices that require custom message formats or specific error-handling routines. In such cases, you can create a custom command by subclassing the Command
class.
Example: Defining a Custom Command
Let’s create a custom command that handles a specific message format and centralizes error handling:
class SerialCommand(Command[str, str]):
def __init__(
self,
message: str,
read_terminator: bytes = b"\r\n",
write_terminator: bytes = b"\r\n",
encoding: str = "ascii",
**kwargs,
) -> None:
super().__init__(message, **kwargs)
self._read_terminator = read_terminator
self._write_terminator = write_terminator
self._encoding = encoding
def serialize(self, message: typing.Optional[str] = None) -> bytes:
message = message or self.message
msg = str.encode(message, encoding=self._encoding)
if not msg.endswith(self._write_terminator):
msg += self._write_terminator
return msg
def deserialize(self, response: typing.Optional[bytes]) -> str:
if response is None:
return self.result()
return response.strip(self._read_terminator).decode(encoding=self._encoding)
def _validate_response(self, data: bytes) -> bool:
return data.endswith(self._read_terminator)
In this custom SerialCommand
class, the serialize
method formats outgoing messages according to the device's expectations, appending a terminator if needed. The deserialize
method processes incoming responses, stripping any terminators and decoding the message. The _validate_response
method determines when a message coming from the device is considered finished; here we expect all responses from our serial device to end with the _read_terminator
and thus define a valid response by its presence.
Next Steps
Conclusion
This tutorial introduced you to the basics of using the Omnibus library for hardware communication in Python. With these foundational steps, you can start building sophisticated applications that interact with various serial-enabled devices. Whether you're working on IoT projects, embedded systems, or data acquisition, Omnibus provides a flexible and robust framework to meet your needs. Happy coding!