Loading...
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 | # SPDX-License-Identifier: GPL-2.0 # Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved. """ Logic to spawn a sub-process and interact with its stdio. This is used by console_board and console_sandbox - console_board (for real hardware): Spawns 'u-boot-test-console' and provides access to the console input/output - console_sandbox (for sandbox): Spawns 'u-boot' and provides access to the console input/output In both cases, Spawn provides a way to send console commands and receive the response from U-Boot. An expect() function helps to simplify things for the higher levels. The code in this file should be generic, i.e. not specific to sandbox or real hardware. Within the console_*py files, self.p is used to refer to the Spawn() object, perhaps short for 'process'. """ import io import os import pty import signal import select import sys import termios import time import traceback # Character to send (twice) to exit the terminal EXIT_CHAR = 0x1d # FS (Ctrl + ]) class Spawn: """Represents the stdio of a freshly created sub-process. Commands may be sent to the process, and responses waited for. """ def __init__(self, args, cwd=None, decode_signal=False): """Spawn (fork/exec) the sub-process. Args: args (list of str): processs arguments. argv[0] is the command to execute. cwd (str or None): the directory to run the process in, or None for no change. decode_signal (bool): True to indicate the exception number when something goes wrong Returns: Nothing. """ self.decode_signal = decode_signal self.waited = False self.exit_code = 0 self.exit_info = '' (self.pid, self.fd) = pty.fork() if self.pid == 0: try: # For some reason, SIGHUP is set to SIG_IGN at this point when # run under "go" (www.go.cd). Perhaps this happens under any # background (non-interactive) system? signal.signal(signal.SIGHUP, signal.SIG_DFL) if cwd: os.chdir(cwd) os.execvp(args[0], args) except: print('CHILD EXECEPTION:') traceback.print_exc() finally: os._exit(255) old = None try: isatty = False try: isatty = os.isatty(sys.stdout.fileno()) # with --capture=tee-sys we cannot call fileno() except io.UnsupportedOperation: pass if isatty: new = termios.tcgetattr(self.fd) old = new new[3] = new[3] & ~(termios.ICANON | termios.ISIG) new[3] = new[3] & ~termios.ECHO new[6][termios.VMIN] = 0 new[6][termios.VTIME] = 0 termios.tcsetattr(self.fd, termios.TCSANOW, new) self.poll = select.poll() self.poll.register(self.fd, select.POLLIN | select.POLLPRI | select.POLLERR | select.POLLHUP | select.POLLNVAL) except: if old: termios.tcsetattr(self.fd, termios.TCSANOW, old) self.close() raise def kill(self, sig): """Send unix signal "sig" to the child process. Args: sig (int): The signal number to send """ os.kill(self.pid, sig) def checkalive(self): """Determine whether the child process is still running. Returns: tuple: True if process is alive, else False 0 if process is alive, else exit code of process string describing what happened ('' or 'status/signal n') """ if self.waited: return False, self.exit_code, self.exit_info w = os.waitpid(self.pid, os.WNOHANG) if w[0] == 0: return True, 0, 'running' status = w[1] if os.WIFEXITED(status): self.exit_code = os.WEXITSTATUS(status) self.exit_info = f'status {self.exit_code}' elif os.WIFSIGNALED(status): signum = os.WTERMSIG(status) self.exit_code = -signum self.exit_info = f'signal {signum} ({signal.Signals(signum).name})' self.waited = True return False, self.exit_code, self.exit_info def isalive(self): """Determine whether the child process is still running. Returns: bool: indicating whether process is alive """ return self.checkalive()[0] def send(self, data): """Send data to the sub-process's stdin. Args: data (str): The data to send to the process. """ os.write(self.fd, data.encode(errors='replace')) def receive(self, num_bytes): """Receive data from the sub-process's stdin. Args: num_bytes (int): Maximum number of bytes to read Returns: str: The data received Raises: ValueError if U-Boot died """ try: c = os.read(self.fd, num_bytes).decode(errors='replace') except OSError as err: # With sandbox, try to detect when U-Boot exits when it # shouldn't and explain why. This is much more friendly than # just dying with an I/O error if self.decode_signal and err.errno == 5: # I/O error alive, _, info = self.checkalive() if alive: raise err raise ValueError(f'U-Boot exited with {info}') from err raise return c def close(self): """Close the stdio connection to the sub-process. This also waits a reasonable time for the sub-process to stop running. Args: None. Returns: str: Type of closure completed """ # For Labgrid-sjg, ask it is exit gracefully, so it can transition the # board to the final state (like 'off') before exiting. if os.environ.get('USE_LABGRID_SJG'): self.send(chr(EXIT_CHAR) * 2) # Wait about 10 seconds for Labgrid to close and power off the board for _ in range(100): if not self.isalive(): return 'normal' time.sleep(0.1) # That didn't work, so try closing the PTY os.close(self.fd) for _ in range(100): if not self.isalive(): return 'break' time.sleep(0.1) return 'timeout' |