tbot.machine.channel
¶
The channel
module provides the low-level
implementation if interaction with machine. A channel is analogous to a serial
connection: You can send and receive bytes. This ‘raw’ interface is abstracted
as a ChannelIO
, for which there are multiple
Implementations.
Channel
class¶
The channel class has multiple different interfaces at different levels of abstraction which allow interacting with the underlying ‘raw’ channel. These are:
Byte-level interface: The
write()
andread()
methods allow the most basic interaction. For a bit more ergonomic use, there is also aread_iter()
.Log-stream interface: For interaction with the
tbot.log
module and to allow capturing the entire received data in parallel to interfacing with the remote, channels provide thewith_stream()
context manager.Death-strings: ‘Death-strings’ are strings (or regular expressions), which should never appear on the channel and when they do, an exception will be raised. In an embedded setting, it might make sense to add the kernel-panic header as a death-string, for example. For this purpose, the
with_death_string()
context-manager is provided.pexpect interface: tbot’s channel implementation also has methods mimicking the pexpect interface. These are:
Prompt handling: The
read_until_prompt()
method allows waiting for a prompt string to appear. A global prompt-string can be configured withwith_prompt()
.Borrowing & taking: To model ownership of the channel, the
borrow()
context-handler allows creating a copy of the channel which temporarily holds exclusive access to it. This ensures that all references to the old channel are blocked for the duration of the borrow.take()
does the same thing but permanently moves ownership.Interactive Mode: Using
attach_interactive()
, an interactive session with this channel can be started.
- class tbot.machine.channel.Channel(channel_io: ChannelIO)[source]¶
Bases:
AbstractContextManager
- slow_send_delay: float | None¶
There are unfortunately some serial consoles which cannot process large amounts of data at once. The receive buffer on the other end overflows and characters get dropped.
To mitigate this, a
Channel
can be configured to send chunks of data with a delay. To do this, set theslow_send_delay
to a (fractional) number of seconds to wait between chunks andslow_send_chunksize
to the maximum size of each chunk to send.Example:
# Configuration for a machine which needs slow sending class BoardWithSlowSend(connector.ConsoleConnector, board.Board): def connect(self, mach): ch = mach.open_channel("picocom", "-b", "115200", "-q", "/dev/ttyUSB0") ch.slow_send_delay = 0.01 ch.slow_send_chunksize = 32 return ch
New in version 0.9.3.
- slow_send_chunksize: int¶
Maximum size of each chunk to send, when
slow_send_delay
is set. Check its documentation for details.New in version 0.9.3.
- write(buf: bytes, _ignore_blacklist: bool = False) None [source]¶
Write some bytes to this channel.
write()
ensures the whole buffer was written. If this was not possible, it will throw an exception.- Parameters:
buf (bytes) – Buffer with bytes to be written.
- Raises:
ChannelClosedException – If the channel was closed previous to, or during writing.
- read(n: int = -1, timeout: float | None = None) bytes [source]¶
Receive some bytes from this channel.
If
n
is-1
,read()
will wait until at least one byte is available and will then return all available bytes. Otherwise it will wait until exactlyn
bytes could be read. If timeout is not None and expires before this is the case,read()
will raise an exception.Warning
read()
does not ensure that the returned bytes end with the end of a Unicode character boundary. This means, decoding as Unicode can fail and code usingread()
should be prepared to deal with this case.
- read_iter(max: int = 9223372036854775807, timeout: float | None = None) Iterator[bytes] [source]¶
Iterate over chunks of bytes read from the channel.
read_iter
reads at mostmax
bytes from the channel before the iterator is exhausted. Iftimeout
is notNone
and expires beforemax
bytes could be read, the next iteration attempt will raise an exception.
- with_stream(stream: TextIO, show_prompt: bool = True) Iterator[Channel] [source]¶
Attach a stream to this channel.
All data read from the channel will also be sent to the stream. This can be used, for example, to capture the entire boot-log of a board.
with_stream
should be used as a context-manager:import tbot with tbot.log.message("Output: ") as ev, chan.with_stream(ev): # During this context block, output is captured into `ev` ...
- Parameters:
stream (io.TextIOBase) – The stream to attach.
show_prompt (bool) – Whether the currently configured prompt should also be sent to the stream if detected.
- close() None [source]¶
Close this channel.
The following is always true:
channel.close() assert channel.closed
- property closed: bool¶
Whether this channel was already closed.
Warning
A
channel.write()
immediately after checkingchannel.closed
might still fail in the unlucky case where the remote end closed the channel just in between the two calls.
- send(s: str | bytes, read_back: bool = False, timeout: float | None = None, _ignore_blacklist: bool = False) None [source]¶
Send data to this channel.
Send
s
to this channel and optionally read it back (to not clobber the next read).
- sendline(s: str | bytes = '', read_back: bool = False, timeout: float | None = None) None [source]¶
Send data to this channel and terminate with a newline.
Send
s
and a newline (\r
) to this channel and optionally read it back (to not clobber the next read).
- sendcontrol(c: str) None [source]¶
Send a control-character to this terminal.
c
is the keyboard key which would need to be pressed (for exampleC
forCTRL-C
). See C0 and C1 control codes for more info.- Parameters:
c (str) – Control character to send.
- readline(timeout: float | None = None, lineending: str | bytes = '\r\n') str [source]¶
Read until the next line ending.
Example:
ch.sendline("echo Hello; echo World", read_back=True) assert ch.readline() == "Hello\n" assert ch.readline() == "World\n"
- expect(patterns: bytes | BoundedPattern | Pattern | str | List[bytes | BoundedPattern | Pattern | str], timeout: float | None = None) ExpectResult [source]¶
Wait for a pattern to appear in the incoming data.
This method is similar to pexpect’s
expect()
although there are a few important differences.expect()
will read ahead in the input stream until one of the patterns inpatterns
matches or, if notNone
, thetimeout
expires. It might read further than the given pattern, if the input contains follow-up bytes in the same chunk of data.Different to pexpect, the results are available as an Expect Result (
ExpectResult
) which is returned on match.- Parameters:
patterns – Pattern(s) to wait for. Can be either a single pattern or a list of patterns. See Search Strings for more info about which types can be passed as patterns.
timeout (None,float) – Optional timeout.
- with_prompt(prompt_in: bytes | BoundedPattern | Pattern | str) Iterator[Channel] [source]¶
Set the prompt for this channel during a context.
with_prompt
is a context-manager that sets the prompt for this channel for the duration of a context:with chan.with_prompt("=> "): chan.sendline("echo Foo", read_back=True) # Waits for `=> ` chan.read_until_prompt()
- Parameters:
prompt (ConvenientSearchString) – The new prompt pattern/string. See Search Strings for more info.
- read_until_prompt(prompt: bytes | BoundedPattern | Pattern | str | None = None, timeout: float | None = None) str [source]¶
Read until prompt is detected.
Read from the channel until the configured prompt string is detected. All data captured up until the prompt is returned, decoded as UTF-8. If
prompt
isNone
, the prompt which was set usingtbot.machine.channel.Channel.with_prompt()
is used.- Parameters:
prompt (ConvenientSearchString) – The prompt to read up to. It must appear as the very last readable data in the channel’s data stream. See Search Strings for more info about which types can be passed for this parameter.
timeout (float) – Optional timeout. If
timeout
is set and expires before the prompt was detected,read_until_prompt
raises an exception.
- Return type:
- Returns:
UTF-8 decoded string of all bytes read up to the prompt.
- read_until_timeout(timeout: float | None) str [source]¶
Read until the given timeout expires.
This method will only return after the given timeout expires. This can be useful when waiting for something to start up, but when all console output should still become immediately visible on stdout.
Another use-case is waiting for a death-string to appear or a run-command to exit. In those cases, pass
None
as timeout to make this method wait indefinitely.
- borrow() Iterator[Channel] [source]¶
Temporarily borrow this channel for the duration of a context.
Example:
with chan.borrow() as chan2: # `chan` cannot be accessed inside this context chan2.sendline("Hello World") # `chan` can be accessed again here chan.sendintr()
- take() Channel [source]¶
Move ownership of this channel.
All existing references to this channel will no longer be accessible after calling
take()
. Use this to mark transitions of a channel into a new (irreversible) context. For example, when a board boots from U-Boot to Linux, U-Boot is no longer accessible.
- attach_interactive(end_magic: str | bytes | None = None, ctrld_exit: bool = False) None [source]¶
Connect tbot’s terminal to this channel.
Allows the user to interact directly with whatever this channel is connected to.
The interactive session can be exited at any point by pressing
CTRL+]
three times within 1 second.- Parameters:
Changed in version 0.10.1:
The escape sequence is now “Press
CTRL-]
three times within 1 second”.The
ctrld_exit
parameter was added to restore the old “CTRL-D
to exit” behavior.
Search Strings¶
A lot of channel methods are marked to take a ConvenientSearchString
:
- class tbot.machine.channel.ConvenientSearchString¶
This is just an ‘umbrella’ above a few different possible types which can be passed in here:
bytes
: Will be matched literally.str
: Will be encoded as UTF-8 and the resulting bytes are matched literally.Compiled Regex Pattern: An
re
byte-pattern with a bounded length can also be passed in. The bounded length is required for efficient searching. This means, instead of.*
you need to match a maximum number of chars like.{0,80}
.
Example:
import re
# A `str` or `bytes` object will be matched literally
with chan.with_death_string("Kernel panic - not syncing: "):
...
# A regexp pattern (byte-pattern!)
pat = re.compile(b"[a-zA-Z0-9-]{1,80} login: ")
chan.read_until_prompt(pat)
Expect Result¶
- class tbot.machine.channel.channel.ExpectResult(i: int, match: str | Match[bytes], before: str, after: str)[source]¶
Bases:
tuple
Result from a call to
expect()
.Create new instance of ExpectResult(i, match, before, after)
Implementations¶
ChannelIO
interface¶
This is the interface each channel implementation needs to implement. The actual channel class is just a wrapper ontop of it.
- class tbot.machine.channel.ChannelIO(*args, **kwds)[source]¶
Bases:
AbstractContextManager
- abstract write(buf: bytes) int [source]¶
Write some bytes to this channel.
write()
returns the number of bytes written. This number might be lower thanlen(buf)
.- Parameters:
buf (bytes) – Buffer with bytes to be written.
- Raises:
ChannelClosedException – If the channel was closed previous to, or during writing.
- abstract read(n: int, timeout: float | None = None) bytes [source]¶
Receive some bytes from this channel.
Return at most
n
bytes, but at least 1 (ifn
is not0
). Raise an exception iftimeout
is notNone
and expires before data was received.
- abstract close() None [source]¶
Close this channel.
The following invariant must be upheld by an implementation:
channel.close() assert channel.closed