Getting Started with the UniteLabs Omnibus Library for Hardware Communication

The Omnibus package is a Python library that simplifies hardware and software communication by providing an abstraction layer over various communication protocols (RS-232/485, USB, TCP/IP, and more). With the Omnibus, developers can seamlessly interact with devices without needing to delve into the complexities of specific communication methods.

This tutorial will walk you through the process of setting up the Omnibus library, establishing a connection with your hardware device, and performing basic read/write operations. By the end of this tutorial, you'll be ready to integrate the Omnibus into your projects and start communicating with your hardware devices efficiently.

Prerequisites

  • A hardware device capable of serial communication.
  • The communication specification of that device.
  • Python 3 installed on your system.

Step 1: Installing the Omnibus Library

First, install the Omnibus library. Any Python package manager works; the instructions below cover Poetry and pip.

With Poetry: open your terminal or command prompt and run the following commands. If you are starting from scratch, create a new project first.

Terminal
poetry new my-app --src
cd my-app

Then set up the package source and install the dependency in your current project.

Terminal
poetry source add --priority=supplemental unitelabs \
  https://gitlab.com/api/v4/groups/1009252/-/packages/pypi/simple
poetry add --source unitelabs "unitelabs-bus[serial]"
poetry install

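With pip: if you prefer pip and a virtual environment, run the following commands instead. If you are starting from scratch, create a new project directory first. On Windows, the executables live in .venv\Scripts rather than .venv/bin.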

Terminal
mkdir my-app
cd my-app
python -m venv .venv
.venv/bin/pip install -U pip

Then install the dependency into the virtual environment.

Terminal
.venv/bin/pip install "unitelabs-bus[serial]" \
  --index-url https://gitlab.com/api/v4/groups/1009252/-/packages/pypi/simple

This will download and install the Omnibus library, making it available for use in your Python scripts.

Note: If your device supports protocols other than serial, install the respective extra with the Omnibus library, e.g. unitelabs-bus[usb].
Coming Soon: We are currently in the process of renaming the unitelabs-bus package to unitelabs-omnibus. Additionally, we will distribute the package on PyPI to make it easier to install.
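
To confirm that the installation succeeded before moving on, a quick check like the following can help. It is a minimal sketch using only the standard library (importlib.metadata, available since Python 3.8). The helper name installed_version is ours, and the distribution name unitelabs-bus is taken from the install commands above, so it will need adjusting once the package is renamed.

```python
from importlib import metadata
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is missing."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # "unitelabs-bus" is the distribution name used in the install commands above.
    version = installed_version("unitelabs-bus")
    if version is None:
        print("unitelabs-bus is not installed in this environment")
    else:
        print(f"unitelabs-bus {version} is installed")
```

Run it with the same interpreter you installed into (for example via poetry run python or .venv/bin/python), otherwise the check looks at a different environment.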

Step 2: Connecting Your Hardware

Connect your hardware device to your computer using the appropriate serial connection (e.g., USB-to-serial adapter). Note the COM port (on Windows) or the device path (on Linux/macOS) that the hardware is connected to. You’ll need this information to establish a connection in your Python code.

To list all available ports, you can use our device manager. If you installed with pip instead of Poetry, replace "poetry run python" with ".venv/bin/python":

poetry run python -m unitelabs.bus.utils.device_manager
Coming Soon: We will provide a fully fledged Command Line Interface to easily identify connected devices and even interact with them.
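
As a rough cross-check of what the device manager reports, the sketch below looks for device paths that USB serial adapters commonly use on Linux and macOS. It relies only on the standard library. The patterns are assumptions and not exhaustive, the function names are hypothetical, and Windows COM ports are not file paths, so the helper returns nothing there.

```python
import glob
import sys
from typing import List

# Common device-path patterns for USB serial adapters (assumptions, not exhaustive).
PORT_PATTERNS = {
    "linux": ["/dev/ttyUSB*", "/dev/ttyACM*"],
    "darwin": ["/dev/cu.*"],
}


def port_patterns(platform: str = sys.platform) -> List[str]:
    """Return glob patterns for the given platform, or an empty list if unknown."""
    for prefix, patterns in PORT_PATTERNS.items():
        if platform.startswith(prefix):
            return list(patterns)
    return []


def candidate_ports(platform: str = sys.platform) -> List[str]:
    """List existing device paths that match the platform's patterns."""
    found: List[str] = []
    for pattern in port_patterns(platform):
        found.extend(glob.glob(pattern))
    return sorted(found)


if __name__ == "__main__":
    ports = candidate_ports()
    print(ports if ports else "No candidates found; use the device manager instead.")
```

A path showing up here only means the device node exists; it does not tell you which device is behind it or whether the baud rate matches.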

Step 3: Writing Your First Python Script

Now, let's write a Python script to open a serial connection and communicate with your hardware device.

  1. Import the Omnibus Library
    Start by importing the necessary classes and methods from the Omnibus library:
    from unitelabs.bus import Protocol, create_serial_connection, ByteCommand
    
  2. Initialize the Connection
    Next, configure the serial connection. You need to specify the COM port (or device path), baud rate, and other parameters like timeout.
    protocol = Protocol(
        create_serial_connection,
        port='/dev/ttyUSB0',    # Replace with your port name, e.g. '/dev/cu.usbmodem3210' on macOS, 'COM3' on Windows
        baudrate=9600,          # Set the baud rate to match your device
        timeout=1,              # Set a timeout for read operations
    )
    await protocol.open()
    

    In order for the protocol to establish a connection to the device, you need to call the open() method.
  3. Send Data to the Device
    To transmit data to your hardware device, use the execute() method. It is asynchronous, so it has to be awaited:
    await protocol.execute(ByteCommand(b"Hello, Device!\r\n"))
    

    This line sends the binary data b"Hello, Device!\r\n" to your serial device.
  4. Receive Data from the Device
    The execute() method also returns the response sent back by the device. To read it, await execute() and keep the return value. This reads data from the serial port until a newline character is encountered:
    response = await protocol.execute(ByteCommand(b"Hello, Device!\r\n"))
    print('Received:', response)
    

    The execute() call now waits until it receives a response or the timeout expires. Once a line of data arrives from the device, the script prints it to the console.
  5. Close the Connection
    After communication is complete, it’s good practice to close the connection to free the resources:
    protocol.close()
    

Step 4: Running the Script

To run asynchronous code, wrap it in an async function and pass that to asyncio.run():

src/my_app/__main__.py
import asyncio

from unitelabs.bus import Protocol, create_serial_connection, ByteCommand


async def main():
    protocol = Protocol(
        create_serial_connection,
        port='/dev/ttyUSB0',    # Replace with your port name, e.g. '/dev/cu.usbmodem3210' on macOS, 'COM3' on Windows
        baudrate=9600,          # Set the baud rate to match your device
        timeout=1,              # Set a timeout for read operations
    )
    await protocol.open()

    response = await protocol.execute(ByteCommand(b"Hello, Device!\r\n"))
    print('Received:', response)

    protocol.close()


if __name__ == "__main__":
    asyncio.run(main())

Save your script (e.g., as src/my_app/__main__.py) and run it from your terminal:

poetry run python -m my_app

If everything is set up correctly, the script will open the serial connection, send a message to your hardware device, receive a response, and print it to the console.
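
Note that the script never reaches close() if execute() raises an exception. A common way to guard against that is a try/finally block. The sketch below shows the pattern in isolation: FakeProtocol is a hypothetical stand-in that only mimics the open(), execute(), and close() calls used in this tutorial, so the example runs without hardware or the Omnibus library. The TimeoutError it raises is likewise just an illustration, not a statement about which exception Omnibus uses.

```python
import asyncio


class FakeProtocol:
    """Hypothetical stand-in for Protocol; it echoes whatever it is sent."""

    def __init__(self, fail: bool = False) -> None:
        self.is_open = False
        self._fail = fail

    async def open(self) -> None:
        self.is_open = True

    async def execute(self, payload: bytes) -> bytes:
        if self._fail:
            # Simulated failure, used to show that close() still runs.
            raise TimeoutError("no response from device")
        return payload

    def close(self) -> None:
        # Called without await, mirroring the tutorial script above.
        self.is_open = False


async def talk(protocol: FakeProtocol, payload: bytes) -> bytes:
    """Open the connection, send one payload, and always close afterwards."""
    await protocol.open()
    try:
        return await protocol.execute(payload)
    finally:
        protocol.close()


if __name__ == "__main__":
    print("Received:", asyncio.run(talk(FakeProtocol(), b"Hello, Device!\r\n")))
```

In the real script, the same shape applies: call open(), put the execute() calls inside try, and move close() into finally.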

Step 5: Troubleshooting Common Issues

Here are a few tips if you run into issues:

  • Port not found: Double-check the port name and ensure your device is properly connected.
  • Incorrect baud rate: Ensure that the baud rate in your script matches the baud rate configured on your hardware.
  • Timeouts: If you’re not receiving data, increase the timeout parameter or check the connection.

To get more verbose output, set the log level to DEBUG. Add the following at the top of your script:

import logging

logging.basicConfig(level=logging.DEBUG)
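
Setting DEBUG on the root logger also turns on debug output from every other library in your project. If that is too noisy, one option is to leave the root logger at WARNING and raise verbosity for a single logger hierarchy only. The sketch below assumes that the Omnibus library logs under a logger named "unitelabs". This follows the usual convention of naming loggers after the module path, but it is not confirmed by this tutorial, so adjust the name if needed. The helper name enable_debug_for is hypothetical.

```python
import logging


def enable_debug_for(logger_name: str) -> logging.Logger:
    """Keep the root logger at WARNING but emit DEBUG records for one hierarchy."""
    # basicConfig installs a console handler on the root logger (if none exists yet).
    logging.basicConfig(
        level=logging.WARNING,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    return logger


# "unitelabs" is an assumed logger name; adjust it to what the library actually uses.
logger = enable_debug_for("unitelabs")
```

Child loggers such as "unitelabs.bus" inherit the DEBUG level, and their records still reach the root handler through propagation.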

Step 6: Expanding Your Communication

Now that you've mastered the basics of using the Omnibus library to establish a connection and perform simple read/write operations, it's time to explore more advanced use cases. The Omnibus library is designed with flexibility in mind, allowing you to extend its functionality to suit specific communication requirements or device protocols.

1. Creating Custom Protocols

In many cases, you will want to create a custom protocol that encapsulates specific communication patterns or device commands. A protocol in Omnibus bundles all the functions your hardware device offers and presents them in a human-readable, easy-to-use way.

Example: Creating a Custom Protocol

Let’s start by creating a custom protocol class that inherits from Protocol. This class can encapsulate common commands and responses your device supports:

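from unitelabs.bus import Protocol, create_serial_connection


# SerialCommand is the custom command defined in the next section.
class MyCustomProtocol(Protocol):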
    def __init__(self, port: str) -> None:
        super().__init__(create_serial_connection, port=port, baudrate=9600, timeout=1)

    async def hello(self) -> str:
        return await self.execute(SerialCommand("HELLO"))

    async def ping(self) -> str:
        return await self.execute(SerialCommand("PING"))

In this example, MyCustomProtocol defines two methods, hello and ping, that communicate with the device using specific commands. This setup makes it easy to call these methods without worrying about the underlying serial communication details.

2. Creating Custom Commands

When communicating with hardware, you might encounter devices that require custom message formats or specific error-handling routines. In such cases, you can create a custom command by subclassing the Command class.

Example: Defining a Custom Command

Let’s create a custom command that handles a specific message format and centralizes error handling:

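import typing

from unitelabs.bus import Command  # assumed import path, alongside Protocol and ByteCommand


class SerialCommand(Command[str, str]):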
    def __init__(
        self,
        message: str,
        read_terminator: bytes = b"\r\n",
        write_terminator: bytes = b"\r\n",
        encoding: str = "ascii",
        **kwargs,
    ) -> None:
        super().__init__(message, **kwargs)
        self._read_terminator = read_terminator
        self._write_terminator = write_terminator
        self._encoding = encoding

    def serialize(self, message: typing.Optional[str] = None) -> bytes:
        message = message or self.message
        msg = str.encode(message, encoding=self._encoding)
        if not msg.endswith(self._write_terminator):
            msg += self._write_terminator

        return msg

    def deserialize(self, response: typing.Optional[bytes]) -> str:
        if response is None:
            return self.result()

        return response.strip(self._read_terminator).decode(encoding=self._encoding)

    def _validate_response(self, data: bytes) -> bool:
        return data.endswith(self._read_terminator)

In this custom SerialCommand class, the serialize method formats outgoing messages according to the device's expectations, appending a terminator if needed. The deserialize method processes incoming responses, stripping any terminators and decoding the message. The _validate_response method determines when a message coming from the device is considered finished; here we expect all responses from our serial device to end with the _read_terminator and thus define a valid response by its presence.
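
The framing rules inside SerialCommand can be tried out without a device. The sketch below re-implements them as three standalone functions using only the standard library; frame, is_complete, and unframe are hypothetical names and not part of the Omnibus API. There is one deliberate difference: unframe removes exactly one trailing terminator, whereas bytes.strip() treats its argument as a set of byte values and removes any of them from both ends of the response.

```python
def frame(message: str, terminator: bytes = b"\r\n", encoding: str = "ascii") -> bytes:
    """Encode a message and append the terminator unless it is already there."""
    data = message.encode(encoding)
    if not data.endswith(terminator):
        data += terminator
    return data


def is_complete(data: bytes, terminator: bytes = b"\r\n") -> bool:
    """A response counts as finished once it ends with the terminator."""
    return data.endswith(terminator)


def unframe(data: bytes, terminator: bytes = b"\r\n", encoding: str = "ascii") -> str:
    """Remove a single trailing terminator, then decode the payload."""
    if terminator and data.endswith(terminator):
        data = data[: -len(terminator)]
    return data.decode(encoding)


if __name__ == "__main__":
    print(frame("PING"))            # b'PING\r\n'
    print(is_complete(b"PO"))       # False
    print(unframe(b"PONG\r\n"))     # PONG
```

Keeping this logic in small pure functions also makes it easy to cover with unit tests before any hardware is attached.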

Next Steps

Coming Soon: We are already working on guides about automatic detection of the correct serial port, reconnect and retry behaviour, and other edge cases.

Conclusion

This tutorial introduced you to the basics of using the Omnibus library for hardware communication in Python. With these foundational steps, you can start building sophisticated applications that interact with various serial-enabled devices. Whether you're working on IoT projects, embedded systems, or data acquisition, Omnibus provides a flexible and robust framework to meet your needs. Happy coding!


Copyright © 2024