Loading...
# SPDX-License-Identifier: GPL-2.0+
#
"""Common helper code shared by the build-efi/qemu scripts

"""

import configparser
import contextlib
import os
from pathlib import Path
import shutil
import shlex
import subprocess
import sys
import tempfile
import time

# Directory containing this file (with symlinks resolved) and its parent
OUR_PATH = os.path.dirname(os.path.realpath(__file__))
OUR1_PATH = os.path.dirname(OUR_PATH)

# Bring in the patman and test libraries (but don't override the first path in
# PYTHONPATH). Both entries are inserted at index 2, so the entries at index 0
# and 1 are left alone and 'test/py/tests' ends up ahead of 'tools'.
sys.path.insert(2, os.path.join(OUR1_PATH, 'tools'))
sys.path.insert(2, os.path.join(OUR1_PATH, 'test/py/tests'))

from u_boot_pylib import tools
from u_boot_pylib import tout
import fs_helper

# Extra device properties appended to the virtio-scsi-pci controller which
# Helper.add_qemu_args() adds when disks are attached via SCSI
MODERN_PCI = 'disable-legacy=on,disable-modern=off'


class Helper:
    """Shared state and helper methods for scripts which build and run QEMU

    The object holds the parsed program arguments, the user's settings (once
    read_settings() has been called) and the virtiofsd process / socket used
    when a host directory is shared with the guest.
    """

    def __init__(self, args):
        """Set up a new helper

        Args:
            args (argparse.Namespace): Program arguments. The `word_32bit`
                and `arch` members are read here; other members are read by
                the other methods
        """
        self.settings = None    # ConfigParser, set up by read_settings()
        self.imagedir = None    # Path of OS images, set by read_settings()
        self.proc = None        # virtiofsd Popen object, if started
        self.sock = None        # Path of the virtiofsd socket, if started
        self.args = args
        self.mem = '512'
        self.bitness = 32 if args.word_32bit else 64
        self.qemu = None
        if self.args.arch == 'arm':
            self.os_arch = 'arm64' if self.bitness == 64 else 'arm'
        else:  # x86
            self.os_arch = 'amd64' if self.bitness == 64 else 'i386'

    def read_settings(self):
        """Get settings from the settings file

        The file is `.u_boot_qemu` in the user's home directory. If it does not
        exist, it is created with default contents before being read. This
        also sets self.imagedir from the `image_dir` setting.
        """
        self.settings = configparser.ConfigParser()
        # expanduser() uses $HOME when it is set, but still produces a usable
        # path when it is not (rather than 'None/.u_boot_qemu')
        fname = os.path.expanduser('~/.u_boot_qemu')
        if not os.path.exists(fname):
            print(f'No config file found: {fname}\nCreating one...\n')
            tools.write_file(fname, '''# U-Boot QEMU-scripts config

[DEFAULT]
# Set ubdir to the build directory where you build U-Boot out-of-tree
# We avoid in-tree build because it gets confusing trying different builds
# Each board gets a build in a separate subdir
build_dir = /tmp/b

# Image directory (for OS images)
image_dir = ~/dev/os

# Build the kernel with: make O=/tmp/kernel
bzimage = /tmp/kernel/arch/x86/boot/bzImage

# EFI image-output filename
efi_image_file = try.img

# Directory where OVMF-pure-efi.i386.fd etc. are kept
efi_dir = ~/dev/efi

# Directory where SCT image (sct.img) is kept
sct_dir = ~/dev/efi/sct

# Directory where the SCT image is temporarily mounted for modification
sct_mnt = /mnt/sct
''', binary=False)
        self.settings.read(fname)
        self.imagedir = Path(self.get_setting('image_dir', '~/dev'))

    def get_setting(self, name, fallback=None):
        """Get a setting by name

        Args:
            name (str): Name of setting to retrieve
            fallback (str or None): Value to return if the setting is missing

        Returns:
            str or None: Value of the setting with `~` and environment
                variables expanded, or None if the setting is missing and no
                fallback was provided
        """
        raw = self.settings.get('DEFAULT', name, fallback=fallback)
        if raw is None:
            # Nothing to expand; expanduser() would raise TypeError on None
            return None
        return os.path.expandvars(os.path.expanduser(raw))

    @contextlib.contextmanager
    def make_disk(self, fname, size_mb=20, fs_type='ext4', use_part=False):
        """Create a raw disk image with files on it

        The caller populates the yielded directory inside the `with` block;
        the image is built from that directory when the block completes. If
        the block raises, no image is written.

        Args:
            fname (str): Filename to write the images to
            size_mb (int): Size in MiB
            fs_type (str): Filesystem type to create (ext4 or vfat)
            use_part (bool): True to create a partition table, False to use a
                raw disk image

        Yields:
            str: Directory to write the files into
        """
        with tempfile.NamedTemporaryFile() as tmp:
            with tempfile.TemporaryDirectory(prefix='build_helper.') as dirname:
                yield dirname

                # The caller has finished adding files, so build the
                # filesystem before the temporary directory is removed
                fs_helper.mk_fs(None, fs_type, size_mb << 20, None, dirname,
                                fs_img=tmp.name, quiet=True)

            if use_part:
                # NOTE(review): the filesystem is requested with size_mb MiB
                # but is placed at a 1MiB offset and the partition is declared
                # as (size_mb - 1) MiB; confirm the filesystem is meant to
                # fit within that partition
                with open(fname, 'wb') as img:
                    img.truncate(size_mb << 20)
                    img.seek(1 << 20, 0)
                    img.write(tools.read_file(tmp.name))
                subprocess.run(
                    ['sfdisk', fname], text=True, check=True,
                    capture_output=True,
                    input=f'type=c, size={size_mb-1}M, start=1M,bootable')
            else:
                shutil.copy2(tmp.name, fname)

    def add_qemu_args(self, args, cmd, base_hd=0):
        """Add QEMU arguments according to the selected options

        This helps in creating the command-line used to run QEMU.

        Args:
            args (argparse.Namespace): Program arguments
            cmd (list of str): Existing QEMU command line to add to
            base_hd (int): Base number to use for QEMU hd device
        """
        if args.kernel:
            cmd.extend(['-kernel', args.kernel])
        if args.initrd:
            cmd.extend(['-initrd', args.initrd])

        # Build up the Linux command line, passed as a single -append string
        cmdline = []
        if args.enable_console:
            cmdline.append('console=ttyS0,115200,8n1')
        if args.root:
            cmdline.append(f'root={args.root}')
        if args.uuid:
            cmdline.append(f'root=/dev/disk/by-uuid/{args.uuid}')
        if cmdline:
            cmd.extend(['-append', ' '.join(cmdline)])

        if args.os == 'ubuntu':
            img_name = f'{args.os}-{args.release}-desktop-{self.os_arch}.iso'
            os_path = self.imagedir / args.os / img_name
            if not os_path.exists():
                tout.error(f'OS image {os_path} specified but not found')
            else:
                cmd.extend([
                    '-drive',
                    f'if=virtio,file={os_path},format=raw,id=hd{base_hd},readonly=on'])
            # The OS image takes the first hd number, found or not
            base_hd += 1

        if args.disk:
            iface = 'none' if args.scsi else 'virtio'
            have_scsi_ctrl = False
            for i, d in enumerate(args.disk):
                disk = Path(d)
                if not disk.exists():
                    tout.warning(f"Disk image '{disk}' not found")
                    continue

                drive_id = f'hd{base_hd + i}'
                if args.scsi:
                    # All disks share one controller; adding it for every
                    # disk would give QEMU a duplicate 'scsi0' device ID
                    if not have_scsi_ctrl:
                        cmd.extend([
                            '-device',
                            f'virtio-scsi-pci,id=scsi0,{MODERN_PCI}'])
                        have_scsi_ctrl = True
                    cmd.extend([
                        '-device',
                        f'scsi-hd,bus=scsi0.0,drive={drive_id}'])
                cmd.extend([
                    '-drive',
                    f'if={iface},file={disk},format=raw,id={drive_id}'])

        cmd.extend(['-object', 'rng-random,filename=/dev/urandom,id=rng0',
                    '-device', 'virtio-rng-pci,rng=rng0'])

    def setup_share(self, qemu_cmd):
        """Set up sharing of a host directory with the guest using virtiofs

        This does nothing unless the `share_dir` program argument is set. In
        that case the QEMU arguments for a vhost-user-fs device are added and
        virtiofsd is started in the background. On success the process and
        socket path are recorded in self.proc and self.sock so that
        cleanup_share() can tidy up later.

        Failures are reported with tout.fatal(), which is expected to
        terminate the program.

        Args:
            qemu_cmd (list of str): QEMU command line to add to
        """
        sock = Path('/tmp/virtiofs.sock')
        if self.args.share_dir:
            virtfs_dir = Path(self.args.share_dir)
            if not virtfs_dir.is_dir():
                tout.fatal(f'Error: VirtFS share directory {virtfs_dir} '
                           f'is not a valid directory')

            virtiofsd = Path('/usr/libexec/virtiofsd')
            if not virtiofsd.exists():
                tout.fatal(f'Error: virtiofsd not found at {virtiofsd}')

            # Clean up potential old socket file
            if sock.exists():
                try:
                    sock.unlink()
                    tout.info(f'Removed old socket file {sock}')
                except OSError as e:
                    tout.warning(
                        f'Warning: Could not remove old socket file {sock}: '
                        f'{e}')

            # NOTE(review): self.mem is passed verbatim as the backend size;
            # confirm that callers set it to a value with the unit QEMU
            # expects for memory-backend-file
            qemu_cmd.extend([
                '-chardev', f'socket,id=char0,path={sock}',
                '-device',
                'vhost-user-fs-pci,queue-size=1024,chardev=char0,tag=hostshare',
                '-object',
                f'memory-backend-file,id=mem,size={self.mem},mem-path=/dev/shm'
                    ',share=on',
                '-numa', 'node,memdev=mem'])

            virtiofsd_cmd = [
                str(virtiofsd),
                '--socket-path', str(sock),
                '--shared-dir', str(virtfs_dir),
                '--cache', 'auto']
            try:
                # Use Popen to run virtiofsd in the background
                proc = subprocess.Popen(virtiofsd_cmd, stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE)
                # Give virtiofsd a moment to start and create the socket
                time.sleep(0.5)

                # Treat it as a failure only if the process has already
                # exited without creating its socket
                if not sock.exists() and proc.poll() is not None:
                    stdout, stderr = proc.communicate()
                    tout.error('Error starting virtiofsd. Exit code: '
                               f'{proc.returncode}')
                    if stdout:
                        tout.error(f"virtiofsd stdout:\n{stdout.decode()}")
                    if stderr:
                        tout.error(f"virtiofsd stderr:\n{stderr.decode()}")
                    tout.fatal('Failed')
                self.proc = proc
                self.sock = sock

            except (subprocess.SubprocessError, OSError) as exc:
                # Popen reports launch problems (missing file, no permission,
                # etc.) as OSError subclasses
                tout.fatal(f'Failed to start virtiofsd: {exc}')

    def cleanup_share(self):
        """Stop virtiofsd and remove its socket, if setup_share() started it"""
        if self.proc:
            tout.info('Terminating virtiofsd')
            self.proc.terminate()
            try:
                self.proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                tout.warning(
                    'virtiofsd did not terminate gracefully; killing')
                self.proc.kill()
                # Reap the killed process so it is not left as a zombie
                self.proc.wait()
            if self.sock.exists():
                try:
                    self.sock.unlink()
                except OSError as e_os:
                    tout.warning('Warning: Could not remove virtiofs '
                                 f'socket {self.sock}: {e_os}')

    def run(self, qemu_cmd):
        """Show the QEMU command line and run it if requested

        QEMU is only run if the `run` program argument is set. Whether or not
        QEMU runs (and whether or not it succeeds), cleanup_share() is called
        afterwards.

        Args:
            qemu_cmd (list of str): QEMU command line, starting with the
                executable
        """
        tout.info(f'QEMU:\n{shlex.join(qemu_cmd)}\n')
        try:
            if self.args.run:
                subprocess.run(qemu_cmd, check=True)
        except FileNotFoundError:
            # self.qemu may not have been set; fall back to the command name
            qemu = self.qemu or qemu_cmd[0]
            tout.fatal(f"Error: QEMU executable '{qemu}' not found")
        except subprocess.CalledProcessError as e:
            tout.fatal(f'QEMU execution failed with exit code {e.returncode}')
        finally:
            self.cleanup_share()

def add_common_args(parser):
    """Register the options shared by the build-efi/qemu scripts

    The options are added in the order listed in the table below, which is
    also the order in which they appear in the help output.

    Args:
        parser (argparse.ArgumentParser): Parser to modify
    """
    # Each entry is (option strings, keyword arguments for add_argument())
    options = (
        (('-a', '--arch'),
         {'default': 'arm', 'choices': ['arm', 'x86'],
          'help': 'Select architecture (arm, x86) Default: arm'}),
        (('-B', '--no-build'),
         {'action': 'store_true',
          'help': "Don't build; assume a build exists"}),
        (('--build-dir',),
         {'help': 'Directory to use for the build'}),
        (('-C', '--enable-console'),
         {'action': 'store_true',
          'help': "Enable linux console (x86 only)"}),
        (('-d', '--disk'),
         {'nargs': '*',
          'help': 'Root disk image file to use with QEMU'}),
        (('-D', '--share-dir'),
         {'metavar': 'DIR',
          'help': 'Directory to share into the guest via virtiofs'}),
        (('-I', '--initrd'),
         {'help': 'Initial ramdisk to run using -initrd'}),
        (('-k', '--kvm'),
         {'action': 'store_true',
          'help': 'Use KVM (Kernel-based Virtual Machine) for acceleration'}),
        (('-K', '--kernel'),
         {'help': 'Kernel to run using -kernel'}),
        (('-n', '--no-pager'),
         {'action': 'store_true',
          'help': "Disable the pager (for testing)"}),
        (('-o', '--os'),
         {'metavar': 'NAME', 'choices': ['ubuntu'],
          'help': 'Run a specified Operating System'}),
        (('-r', '--run'),
         {'action': 'store_true',
          'help': 'Run QEMU with the image'}),
        (('-R', '--release'),
         {'default': '24.04.1',
          'help': 'Select OS release version (e.g, 24.04) Default: 24.04.1'}),
        (('-s', '--serial-only'),
         {'action': 'store_true',
          'help': 'Run QEMU with serial only (no display)'}),
        (('-S', '--scsi'),
         {'action': 'store_true',
          'help': 'Attach root disk using virtio-scsi instead of virtio-blk'}),
        (('-t', '--root'),
         {'help': 'Pass the given root device to linux via root=xxx'}),
        (('-U', '--uuid'),
         {'help':
          'Pass the given root device to linux via root=/dev/disk/by-uuid/'}),
        (('-w', '--word-32bit'),
         {'action': 'store_true',
          'help': 'Use 32-bit version for the build/architecture'}),
    )
    for names, kwargs in options:
        parser.add_argument(*names, **kwargs)