# SPDX-License-Identifier: GPL-2.0
# Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved.

"""
Logic to spawn a sub-process and interact with its stdio.

This is used by console_board and console_sandbox:

- console_board (for real hardware): Spawns 'u-boot-test-console' and provides
    access to the console input/output
- console_sandbox (for sandbox): Spawns 'u-boot' and provides access to the
    console input/output

In both cases, Spawn provides a way to send console commands and receive the
response from U-Boot. An expect() function helps to simplify things for the
higher levels.

The code in this file should be generic, i.e. not specific to sandbox or real
hardware.

Within the console_*.py files, self.p is used to refer to the Spawn() object,
perhaps short for 'process'.
"""

import io
import os
import pty
import signal
import select
import sys
import termios
import time
import traceback

# Character to send (twice) to exit the terminal
EXIT_CHAR = 0x1d    # GS (Ctrl + ])


class Spawn:
    """Represents the stdio of a freshly created sub-process. Commands may be
    sent to the process, and responses waited for.

    The sub-process is attached to a pseudo-terminal created with pty.fork();
    all communication goes through the PTY file descriptor held in self.fd.

    Attributes:
        decode_signal (bool): True to have receive() report how the process
            exited when reading fails with an I/O error
        waited (bool): True once the process exit status has been collected
        exit_code (int): exit status of the process, or the negated signal
            number if it was killed by a signal; 0 until the process exits
        exit_info (str): description of how the process exited; '' until then
        pid (int): process ID of the sub-process
        fd (int): file descriptor of the PTY connected to the sub-process
        poll (select.poll): poll object with self.fd registered
    """

    def __init__(self, args, cwd=None, decode_signal=False):
        """Spawn (fork/exec) the sub-process.

        Args:
            args (list of str): processs arguments. argv[0] is the command to
                execute.
            cwd (str or None): the directory to run the process in, or None for
                no change.
            decode_signal (bool): True to make receive() raise a ValueError
                describing the exit status / signal when the process dies
                unexpectedly

        Returns:
            Nothing.
        """
        self.decode_signal = decode_signal
        self.waited = False
        self.exit_code = 0
        self.exit_info = ''

        (self.pid, self.fd) = pty.fork()
        if self.pid == 0:
            # This is the child. It must never return into the caller's code,
            # so whatever happens it ends with os._exit()
            try:
                # For some reason, SIGHUP is set to SIG_IGN at this point when
                # run under "go" (www.go.cd). Perhaps this happens under any
                # background (non-interactive) system?
                signal.signal(signal.SIGHUP, signal.SIG_DFL)
                if cwd:
                    os.chdir(cwd)
                os.execvp(args[0], args)
            except BaseException:
                print('CHILD EXECEPTION:')
                traceback.print_exc()
            finally:
                os._exit(255)

        old = None
        try:
            isatty = False
            try:
                isatty = os.isatty(sys.stdout.fileno())

            # with --capture=tee-sys we cannot call fileno()
            except io.UnsupportedOperation:
                pass
            if isatty:
                # Read the attributes twice so that 'old' is an independent
                # copy; the result holds a nested list, so sharing one object
                # (or a shallow copy) would let the edits below leak into the
                # values restored on failure
                old = termios.tcgetattr(self.fd)
                new = termios.tcgetattr(self.fd)

                # Index 3 is the local-modes field: turn off canonical mode,
                # signal generation and echo
                new[3] = new[3] & ~(termios.ICANON | termios.ISIG)
                new[3] = new[3] & ~termios.ECHO

                # Index 6 is the control-character array
                new[6][termios.VMIN] = 0
                new[6][termios.VTIME] = 0
                termios.tcsetattr(self.fd, termios.TCSANOW, new)

            self.poll = select.poll()
            self.poll.register(self.fd, select.POLLIN | select.POLLPRI | select.POLLERR |
                               select.POLLHUP | select.POLLNVAL)
        except BaseException:
            # Undo the terminal changes and shut the child down before passing
            # the error on to the caller
            if old:
                termios.tcsetattr(self.fd, termios.TCSANOW, old)
            self.close()
            raise

    def kill(self, sig):
        """Send unix signal "sig" to the child process.

        Args:
            sig (int): The signal number to send
        """
        os.kill(self.pid, sig)

    def checkalive(self):
        """Determine whether the child process is still running.

        Once the process is found to have exited, the result is cached and
        returned by later calls without querying the OS again.

        Returns:
            tuple:
                True if process is alive, else False
                0 if process is alive, else exit code of process (the negated
                    signal number if it was killed by a signal)
                string describing what happened ('running', 'status n' or
                    'signal n (name)')
        """
        if self.waited:
            return False, self.exit_code, self.exit_info

        # WNOHANG makes waitpid() return a pid of 0 if the child is running
        w = os.waitpid(self.pid, os.WNOHANG)
        if w[0] == 0:
            return True, 0, 'running'
        status = w[1]

        if os.WIFEXITED(status):
            self.exit_code = os.WEXITSTATUS(status)
            self.exit_info = f'status {self.exit_code}'
        elif os.WIFSIGNALED(status):
            signum = os.WTERMSIG(status)
            self.exit_code = -signum
            try:
                signame = signal.Signals(signum).name
            except ValueError:
                # Not every signal number has an entry in signal.Signals;
                # don't let a status query fail because of that
                signame = 'unknown'
            self.exit_info = f'signal {signum} ({signame})'
        self.waited = True
        return False, self.exit_code, self.exit_info

    def isalive(self):
        """Determine whether the child process is still running.

        Returns:
            bool: indicating whether process is alive
        """
        return self.checkalive()[0]

    def send(self, data):
        """Send data to the sub-process's stdin.

        The string is encoded (characters which cannot be encoded are
        replaced) and written in full to the PTY.

        Args:
            data (str): The data to send to the process.
        """
        remaining = memoryview(data.encode(errors='replace'))

        # os.write() may accept only part of the buffer, so keep going until
        # everything has been written
        while remaining:
            remaining = remaining[os.write(self.fd, remaining):]

    def receive(self, num_bytes):
        """Receive data from the sub-process's output.

        Args:
            num_bytes (int): Maximum number of bytes to read

        Returns:
            str: The data received, with undecodable bytes replaced

        Raises:
            ValueError if U-Boot died (only when decode_signal is True)
            OSError if reading from the PTY fails for any other reason
        """
        try:
            c = os.read(self.fd, num_bytes).decode(errors='replace')
        except OSError as err:
            # With sandbox, try to detect when U-Boot exits when it
            # shouldn't and explain why. This is much more friendly than
            # just dying with an I/O error
            if self.decode_signal and err.errno == 5:  # I/O error
                alive, _, info = self.checkalive()
                if alive:
                    raise
                raise ValueError(f'U-Boot exited with {info}') from err
            raise
        return c

    def _wait_for_exit(self, attempts=100, interval=0.1):
        """Poll until the sub-process has exited or the attempts run out.

        Args:
            attempts (int): Number of times to check the process
            interval (float): Time to sleep after each failed check, in
                seconds

        Returns:
            bool: True if the process exited, False if it is still running
        """
        for _ in range(attempts):
            if not self.isalive():
                return True
            time.sleep(interval)
        return False

    def close(self):
        """Close the stdio connection to the sub-process.

        This also waits a reasonable time for the sub-process to stop running.

        Args:
            None.

        Returns:
            str: Type of closure completed:
                'normal': the process exited after being asked to (only
                    attempted when USE_LABGRID_SJG is set in the environment)
                'break': the process exited after the PTY was closed
                'timeout': the process was still running when we gave up
        """
        # For Labgrid-sjg, ask it to exit gracefully, so it can transition the
        # board to the final state (like 'off') before exiting.
        if os.environ.get('USE_LABGRID_SJG'):
            try:
                self.send(chr(EXIT_CHAR) * 2)
            except OSError:
                # The PTY cannot be written to, so there is nothing to wait
                # for; go straight to closing it rather than leaking the fd
                pass
            else:
                # Wait about 10 seconds for Labgrid to close and power off the
                # board
                if self._wait_for_exit():
                    # Release the PTY on this path too, so it is not leaked
                    os.close(self.fd)
                    return 'normal'

        # That didn't work, so try closing the PTY
        os.close(self.fd)
        if self._wait_for_exit():
            return 'break'

        return 'timeout'