Source code for tbot_contrib.connector.pyserial

import abc
import os
import typing
import contextlib

import tbot.error
from tbot.machine import channel, connector, linux

    import serial
except ImportError:
    raise tbot.error.TbotException(
The PyserialConnector requires pyserial to be installed:

    pip3 install pyserial"""

__all__ = ("PyserialConnector",)


_AnyPath = typing.Union[str, os.PathLike]

class PyserialChannelIO(channel.ChannelIO):
    def __init__(self, port: _AnyPath, baudrate: int) -> None:
        self.serial = serial.Serial(os.fspath(port), baudrate=baudrate, exclusive=True)
        self.serial.timeout = 0

    def write(self, buf: bytes) -> int:
        if self.closed:
            raise channel.ChannelClosedException(), buf, True)
        return len(buf)

    def read(self, n: int, timeout: typing.Optional[float] = None) -> bytes:
        if self.closed:
            raise channel.ChannelClosedException()

            # Block for the first byte only
            self.serial.timeout = timeout
            first =

            if first == b"":
                raise TimeoutError()

            assert len(first) == 1, f"Result is longer than expected ({first!r})!"
            self.serial.timeout = 0

        remaining = b""
        if n > 1:
            # If there is more, read it now (non-blocking)
            remaining =, READ_CHUNK_SIZE) - 1)

        return, first + remaining, False)

    def close(self) -> None:
        if self.closed:
            raise channel.ChannelClosedException()


    def fileno(self) -> int:
        return self.serial.fileno()

    def closed(self) -> bool:
        return not self.serial.is_open

    def update_pty(self, columns: int, lines: int) -> None:
        tbot.log.warning("Cannot update pty for pyserial connections")

class PyserialChannel(channel.Channel):
    def __init__(self, port: _AnyPath, baudrate: int) -> None:
        super().__init__(PyserialChannelIO(port, baudrate))

M = typing.TypeVar("M", bound="PyserialConnector")

[docs]class PyserialConnector(connector.Connector): """ Connect to a console connected to **localhost** (i.e. the host tbot is running on) using `pyserial`_. **Example**: .. code-block:: python from tbot_contrib.connector import pyserial class MyBoard(pyserial.PyserialConnector, board.Board): serial_port = "/dev/ttyUSB0" baudrate = 57600 """ @property @abc.abstractmethod def serial_port(self) -> _AnyPath: """ Serial port to connect to. Keep in mind that this path is **not** on the lab-host but on the localhost. """ raise tbot.error.AbstractMethodError() baudrate = 115200 """ Baudrate of the serial line. """ def __init__(self, host: typing.Optional[linux.LinuxShell] = None) -> None: if not isinstance(host, connector.SubprocessConnector): raise tbot.error.TbotException( f"PyserialConnector can only use a localhost host (got {host!r})!" ) = host @classmethod @contextlib.contextmanager def from_context( cls: typing.Type[M], context: "tbot.Context" ) -> typing.Iterator[M]: with context() as ctx: lh = ctx.request(tbot.role.LabHost) m: M = ctx.enter_context(cls(lh)) # type: ignore yield m def _connect(self) -> channel.Channel: return PyserialChannel(self.serial_port, self.baudrate) def clone(self) -> typing.NoReturn: raise tbot.error.TbotException("Can't clone a (py)serial connection")