# tbot, Embedded Automation Tool
# Copyright (C) 2019 Harald Seiler
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import contextlib
import re
import shlex
import shutil
import typing
import tbot
import tbot.error
from .. import channel
from . import linux_shell, util, special, path
TBOT_PROMPT = b"TBOT-VEJPVC1QUk9NUFQK$ "
Self = typing.TypeVar("Self", bound="Bash")
[docs]class Bash(linux_shell.LinuxShell):
"""Bourne-again shell."""
@contextlib.contextmanager
def _init_shell(self) -> typing.Iterator:
try:
# Wait for shell to appear
util.wait_for_shell(self.ch)
# Set a blacklist of control characters. These characters are
# known to mess up the state of the shell. They are:
self.ch._write_blacklist = [
0x03, # ETX | End of Text / Interrupt
0x04, # EOT | End of Transmission
0x11, # DC1 | Device Control One (XON)
0x12, # DC2 | Device Control Two
0x13, # DC3 | Device Control Three (XOFF)
0x14, # DC4 | Device Control Four
0x15, # NAK | Negative Acknowledge
0x16, # SYN | Synchronous Idle
0x17, # ETB | End of Transmission Block
0x1A, # SUB | Substitute / Suspend Process
0x1C, # FS | File Separator
0x7F, # DEL | Delete
]
# Set prompt to a known string
#
# The prompt is mangled in a way which will be unfolded by the
# shell. This will ensure tbot won't accidentally read the prompt
# back early if the connection is slow.
self.ch.sendline(
b"PROMPT_COMMAND=''; PS1='"
+ TBOT_PROMPT[:6]
+ b"''"
+ TBOT_PROMPT[6:]
+ b"'",
)
self.ch.prompt = TBOT_PROMPT
self.ch.read_until_prompt()
# Disable history
self.ch.sendline("unset HISTFILE")
self.ch.read_until_prompt()
# Disable line editing
self.ch.sendline("set +o emacs; set +o vi")
self.ch.read_until_prompt()
# Set secondary prompt to ""
self.ch.sendline("PS2=''")
self.ch.read_until_prompt()
# Disable history expansion because it is not always affected by
# quoting rules and thus can mess with parameter values. For
# example, m.exec0("echo", "\n^") triggers the 'quick substitution'
# feature and will return "\n!!:s^\n" instead of the expected
# "\n^\n". As it is not really useful for tbot tests anyway,
# disable all history expansion 'magic characters' entirely.
self.ch.sendline("histchars=''")
self.ch.read_until_prompt()
# Set terminal size
termsize = shutil.get_terminal_size()
self.ch.sendline(f"stty cols {max(80, termsize.columns - 48)}")
self.ch.read_until_prompt()
self.ch.sendline(f"stty rows {termsize.lines}")
self.ch.read_until_prompt()
# Do a sanity check to assert that shell interaction is working
# exactly as expected
util.shell_sanity_check(self)
yield None
finally:
pass
def escape(
self: Self, *args: typing.Union[str, special.Special[Self], path.Path[Self]]
) -> str:
string_args = []
for arg in args:
if isinstance(arg, str):
string_args.append(shlex.quote(arg))
elif isinstance(arg, linux_shell.Special):
string_args.append(arg._to_string(self))
elif isinstance(arg, path.Path):
string_args.append(shlex.quote(arg.at_host(self)))
else:
raise TypeError(f"{type(arg)!r} is not a supported argument type!")
return " ".join(string_args)
def exec(
self: Self, *args: typing.Union[str, special.Special[Self], path.Path[Self]]
) -> typing.Tuple[int, str]:
cmd = self.escape(*args)
with tbot.log_event.command(self.name, cmd) as ev:
self.ch.sendline(cmd, read_back=True)
with self.ch.with_stream(ev, show_prompt=False):
out = self.ch.read_until_prompt()
ev.data["stdout"] = out
retcode = util.posix_fetch_return_code(self.ch, self)
return (retcode, out)
def exec0(
self: Self, *args: typing.Union[str, special.Special[Self], path.Path[Self]]
) -> str:
retcode, out = self.exec(*args)
if retcode != 0:
raise tbot.error.CommandFailure(self, args, repr=self.escape(*args))
return out
def test(
self: Self, *args: typing.Union[str, special.Special[Self], path.Path[Self]]
) -> bool:
retcode, _ = self.exec(*args)
return retcode == 0
def env(
self: Self, var: str, value: typing.Union[str, path.Path[Self], None] = None
) -> str:
return util.posix_environment(self, var, value)
@contextlib.contextmanager
def run(
self: Self, *args: typing.Union[str, special.Special[Self], path.Path[Self]]
) -> typing.Iterator[util.RunCommandProxy]:
def cmd_context(
proxy_ch: util.RunCommandProxy,
) -> typing.Generator[str, None, typing.Tuple[int, str]]:
cmd = self.escape(*args)
with contextlib.ExitStack() as cx:
ev = cx.enter_context(tbot.log_event.command(self.name, cmd))
proxy_ch.sendline(cmd, read_back=True)
cx.enter_context(proxy_ch.with_stream(ev, show_prompt=False))
assert proxy_ch.prompt is not None, "prompt is missing!"
# During the context (before calling terminate), the prompt
# string may never appear in the command output. If it does
# anyway, raise an Exception.
#
# The exception type is dynamically created here to capture
# some variables from the context. This way, the context knows
# of an early exit happening and can behave differently because
# of it.
early_exit = False
class CommandEndedException(util.CommandEndedException):
def __init__(self, string: bytes):
nonlocal early_exit
early_exit = True
proxy_ch._pre_terminate()
super().__init__(string)
def __str__(self) -> str:
return f"The interactive command {cmd!r} exited prematurely."
with proxy_ch.with_death_string(proxy_ch.prompt, CommandEndedException):
yield cmd
output = ""
if not early_exit:
output = proxy_ch.read_until_prompt()
ev.data["stdout"] = ev.getvalue()
retcode = util.posix_fetch_return_code(proxy_ch, self)
return (retcode, output)
yield from util.RunCommandProxy._ctx(
channel=self.ch, context=cmd_context, host=self, args=args
)
def open_channel(
self: Self, *args: typing.Union[str, special.Special[Self], path.Path[Self]]
) -> channel.Channel:
cmd = self.escape(*args)
# Disable the interrupt key in the outer shell
self.ch.sendline("stty -isig", read_back=True)
self.ch.read_until_prompt()
with tbot.log_event.command(self.name, cmd):
# Append `; exit` to ensure the channel won't live past the command
# exiting
self.ch.sendline(cmd + "; exit", read_back=True)
return self.ch.take()
@contextlib.contextmanager
def subshell(
self: Self, *args: typing.Union[str, special.Special[Self], path.Path[Self]]
) -> "typing.Iterator[Bash]":
if args == ():
cmd = "bash --norc --noprofile"
else:
cmd = self.escape(*args)
tbot.log_event.command(self.name, cmd)
self.ch.sendline(cmd)
try:
with self._init_shell():
yield self
finally:
self.ch.sendline("exit")
self.ch.read_until_prompt()
def interactive(self) -> None:
# Generate the endstring instead of having it as a constant
# so opening this files won't trigger an exit
endstr = (
"INTERACTIVE-END-"
+ hex(165_380_656_580_165_943_945_649_390_069_628_824_191)[2:]
)
termsize = shutil.get_terminal_size()
self.ch.sendline(self.escape("stty", "cols", str(termsize.columns)))
self.ch.sendline(self.escape("stty", "rows", str(termsize.lines)))
# Outer shell which is used to detect the end of the interactive session
self.ch.sendline(f"bash --norc --noprofile")
self.ch.sendline(f"PS1={endstr}")
self.ch.read_until_prompt(prompt=endstr)
# Inner shell which will be used by the user
self.ch.sendline("bash --norc --noprofile")
self.ch.sendline("set -o emacs")
prompt = self.escape(
f"\\[\\033[36m\\]{self.name}: \\[\\033[32m\\]\\w\\[\\033[0m\\]> "
)
self.ch.sendline(f"PS1={prompt}")
self.ch.read_until_prompt(prompt=re.compile(b"> (\x1B\\[.{0,10})?"))
self.ch.sendline()
tbot.log.message("Entering interactive shell ...")
self.ch.attach_interactive(end_magic=endstr)
tbot.log.message("Exiting interactive shell ...")
try:
self.ch.sendline("exit")
try:
self.ch.read_until_prompt(timeout=0.5)
except TimeoutError:
# we might still be in the inner shell so let's try exiting again
self.ch.sendline("exit")
self.ch.read_until_prompt(timeout=0.5)
except TimeoutError:
raise Exception("Failed to reacquire shell after interactive session!")