Loading...
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
# SPDX-License-Identifier: GPL-2.0
#
# Copyright 2025 Canonical Ltd
#
"""Unifdef-based line-level analysis for source code.

This module provides functionality to analyse which lines in source files
are active vs inactive based on CONFIG_* settings, using the unifdef tool.
"""

import multiprocessing
import os
import re
import shutil
import subprocess
import tempfile
import time

from buildman import kconfiglib
from u_boot_pylib import tout
from analyser import Analyser, FileResult


def load_config(config_file, srcdir='.'):
    """Load CONFIG_* symbols from a .config file and Kconfig.

    The .config file is parsed first: each 'CONFIG_X=value' line is stored
    with the raw text after the first '=' as its value, and each
    '# CONFIG_X is not set' comment is stored with the value None. If srcdir
    contains a top-level Kconfig file, every symbol known to kconfiglib that
    is not already present is then added with the value None.

    Args:
        config_file (str): Path to .config file
        srcdir (str): Path to source directory (for Kconfig loading)

    Returns:
        tuple: (config_dict, error_message) where config_dict is a dictionary
            mapping CONFIG_* symbol names to values, and error_message is None
            on success or an error string on failure (in which case
            config_dict is None)

    Raises:
        OSError: If config_file cannot be opened (the read is not guarded)
    """
    config = {}

    # First, load from .config file
    with open(config_file, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue

            if line.startswith('#'):
                # The only comments of interest have the form
                # '# CONFIG_FOO is not set'
                if ' is not set' in line:
                    parts = line.split()
                    if len(parts) >= 2 and parts[1].startswith('CONFIG_'):
                        config[parts[1]] = None
                continue

            # Parse CONFIG_* assignments; split once so that values which
            # themselves contain '=' are kept intact
            if '=' in line:
                name, value = line.split('=', 1)
                if name.startswith('CONFIG_'):
                    config[name] = value

    # Then, load all Kconfig symbols and set undefined ones to None
    # Only do this if we have a Kconfig file (i.e., in a real U-Boot tree)
    kconfig_path = os.path.join(srcdir, 'Kconfig')
    if not os.path.exists(kconfig_path):
        # No Kconfig - probably a test environment, just use .config values
        return config, None

    # Environment variables needed by kconfiglib. Remember the previous
    # values (None means 'was not set') so they can be put back afterwards
    overrides = {
        'srctree': srcdir,
        'UBOOTVERSION': 'dummy',
        'KCONFIG_OBJDIR': '',
    }
    saved = {var: os.environ.get(var) for var in overrides}

    try:
        os.environ.update(overrides)

        # Load Kconfig
        kconf = kconfiglib.Kconfig(warn=False)

        # Add all defined symbols that aren't already in config as None
        # kconfiglib provides names without CONFIG_ prefix
        for name in kconf.syms:
            config.setdefault(f'CONFIG_{name}', None)

        tout.progress(f'Loaded {len(kconf.syms)} Kconfig symbols')
    except (OSError, IOError, ValueError, ImportError) as e:
        # Return error if kconfiglib fails - we need all symbols for accurate
        # analysis
        return None, f'Failed to load Kconfig symbols: {e}'
    finally:
        # Restore the environment on every exit path, including failure, so
        # that a Kconfig error does not leave the overrides in place
        for var, old_value in saved.items():
            if old_value is not None:
                os.environ[var] = old_value
            else:
                os.environ.pop(var, None)

    return config, None


def match_lines(orig_lines, processed_output, source_file):
    """Match original and processed lines to determine which are active.

    Parses #line directives from unifdef -n output to determine exactly which
    lines from the original source are active vs inactive.

    Args:
        orig_lines (list): List of original source lines
        processed_output (str): Processed output from unifdef -n
        source_file (str): Path to source file (for matching #line directives)

    Returns:
        dict: Mapping of line numbers (1-indexed) to 'active'/'inactive' status
    """
    total_lines = len(orig_lines)

    # Every line starts out inactive; lines seen in the output become active
    line_status = dict.fromkeys(range(1, total_lines + 1), 'inactive')

    # Parse #line directives to find which lines are active
    # Format: #line <number> "<file>"
    # When we see a #line directive, all following non-directive lines
    # come from that line number onward in the original file
    # If no #line directive appears at start, output starts at line 1
    line_pattern = re.compile(r'^#line (\d+) "(.+)"$')
    source_basename = os.path.basename(source_file)
    current_line = 1  # None means 'currently inside some other file'

    for output_line in processed_output.splitlines():
        match = line_pattern.match(output_line)
        if match:
            file_path = match.group(2)
            # Only track lines from our source file. Compare whole basenames
            # rather than using a suffix test, so that 'xfoo.c' is not
            # mistaken for 'foo.c'
            if (file_path == source_file or
                    os.path.basename(file_path) == source_basename):
                current_line = int(match.group(1))
            else:
                # This is a #line for a different file (e.g., header)
                # Stop tracking until we see our file again
                current_line = None
            continue

        if current_line is None:
            continue

        # This is a real line from the source file; ignore positions that
        # fall outside the original file
        if 1 <= current_line <= total_lines:
            line_status[current_line] = 'active'
        current_line += 1

    return line_status


def worker(args):
    """Run unifdef on one source file and count its active/inactive lines.

    The file is passed through unifdef with the -n flag. When line tracking
    is requested, the #line directives in the output are handed to
    match_lines() to work out the status of each original line; otherwise
    only the number of non-#line output lines is counted.

    Args:
        args (tuple): Tuple of (source_file, defs_file, unifdef_path,
            track_lines)

    Returns:
        Tuple of (source_file, total_lines, active_lines, inactive_lines,
            line_status, error_msg)
        line_status is a dict mapping line numbers to 'active'/'inactive', or
            {} if not tracked
        error_msg is None on success, or an error string on failure; on
            failure the three counts are 0 and line_status is {}
    """
    source_file, defs_file, unifdef_path, track_lines = args

    try:
        with open(source_file, 'r', encoding='utf-8', errors='ignore') as f:
            orig_lines = f.readlines()
        total_lines = len(orig_lines)

        # unifdef options:
        # -n: add #line directives for tracking original line numbers
        # -E: error on unterminated conditionals
        # -f: use defs file
        cmd = [unifdef_path, '-n', '-E', '-f', defs_file, source_file]
        result = subprocess.run(cmd, capture_output=True, text=True,
                                encoding='utf-8', errors='ignore',
                                check=False)

        # Return codes above 1 are treated as failures, except for
        # 'Obfuscated' complaints: those are expected for complex macros and
        # unifdef still produces output, so processing continues
        if result.returncode > 1 and 'Obfuscated' not in result.stderr:
            error_msg = (f'unifdef failed on {source_file} with return '
                         f'code {result.returncode}\nstderr: '
                         f'{result.stderr}')
            return (source_file, 0, 0, 0, {}, error_msg)

        if track_lines:
            # Per-line status derived from the #line directives
            line_status = match_lines(orig_lines, result.stdout, source_file)
            active_lines = sum(1 for status in line_status.values()
                               if status == 'active')
        else:
            # No per-line data wanted: every output line that is not a
            # #line directive counts as active
            line_status = {}
            active_lines = sum(1 for out_line in result.stdout.splitlines()
                               if not out_line.startswith('#line'))

        return (source_file, total_lines, active_lines,
                total_lines - active_lines, line_status, None)
    except (OSError, IOError) as e:
        # Failed to execute unifdef or read source file
        return (source_file, 0, 0, 0, {},
                f'Failed to process {source_file}: {e}')


class UnifdefAnalyser(Analyser):
    """Analyser that uses unifdef to determine active lines.

    This analyser handles the creation of a unifdef configuration file from
    CONFIG_* symbols and provides methods to analyse source files.

    Attributes:
        config_file (str): Path to .config file
        used_sources (set): Set of source files that are compiled
        unifdef_path (str): Path to (or command name of) the unifdef
            executable; process() replaces a command name with the full path
            found in PATH
        include_headers (bool): True to analyse all used sources, False to
            analyse only .c and .S files
        unifdef_cfg (str): Path to temporary unifdef configuration file, or
            None if it has not been created yet
    """

    def __init__(self, config_file, srcdir, used_sources, unifdef_path,
                 include_headers, keep_temps=False):
        """Set up the analyser with config file path.

        Args:
            config_file (str): Path to .config file
            srcdir (str): Path to source root directory
            used_sources (set): Set of source files that are compiled
            unifdef_path (str): Path to unifdef executable
            include_headers (bool): If True, include header files; otherwise
                only .c and .S
            keep_temps (bool): If True, keep temporary files for debugging
        """
        super().__init__(srcdir, keep_temps)
        self.config_file = config_file
        self.used_sources = used_sources
        self.unifdef_path = unifdef_path
        self.include_headers = include_headers
        self.unifdef_cfg = None

    def _create_unifdef_config(self, config):
        """Create a temporary unifdef configuration file.

        Writes one '#define' or '#undef' line per CONFIG_* symbol, in sorted
        name order, to a new temporary file whose path is stored in
        self.unifdef_cfg. The file is later passed to unifdef via its -f
        flag.

        Args:
            config (dict): Dictionary mapping CONFIG_* names to values
        """
        # Create temporary file for unifdef directives
        fd, self.unifdef_cfg = tempfile.mkstemp(prefix='unifdef_',
                                                suffix='.cfg')

        # Write as UTF-8 explicitly, since process() reads the file back
        # with that encoding
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            for name, value in sorted(config.items()):
                if value is None or value == '' or value == 'n':
                    # Symbol is not set - undefine it
                    f.write(f'#undef {name}\n')
                elif value is True or value == 'y':
                    # Boolean CONFIG - define it as 1
                    f.write(f'#define {name} 1\n')
                elif value == 'm':
                    # Module - treat as not set for U-Boot
                    f.write(f'#undef {name}\n')
                elif (isinstance(value, str) and value.startswith('"') and
                      value.endswith('"')):
                    # String value with quotes - use as-is
                    f.write(f'#define {name} {value}\n')
                else:
                    # Numeric or other value
                    try:
                        # Base 0 accepts decimal, 0x..., 0o... and 0b...
                        # forms; the value is written out in decimal
                        int_val = int(value, 0)
                        f.write(f'#define {name} {int_val}\n')
                    except (ValueError, TypeError):
                        # Not an integer - escape and quote it
                        escaped_value = (str(value).replace('\\', '\\\\')
                                         .replace('"', '\\"'))
                        f.write(f'#define {name} "{escaped_value}"\n')

    def __del__(self):
        """Clean up temporary unifdef config file"""
        # Use getattr() since __del__ can run on an object whose __init__
        # did not complete, in which case the attributes may be missing
        cfg = getattr(self, 'unifdef_cfg', None)
        if not cfg or not os.path.exists(cfg):
            return

        # Keep the file if requested
        if getattr(self, 'keep_temps', False):
            tout.debug(f'Keeping unifdef config file: {cfg}')
            return
        try:
            os.unlink(cfg)
        except OSError:
            # Best-effort cleanup; nothing useful can be done here
            pass

    def process(self, jobs=None):
        """Perform line-level analysis on used source files.

        Loads the configuration, writes the unifdef directives file, runs
        worker() on every selected source file (in parallel unless jobs is
        1) and collects the per-file results.

        Args:
            jobs (int): Number of parallel jobs (None = use all CPUs)

        Returns:
            Dictionary mapping source files to FileResult objects, or None
                if the config file does not exist
        """
        # Validate config file exists
        if not os.path.exists(self.config_file):
            tout.error(f'Config file not found: {self.config_file}')
            return None

        # Check if unifdef exists (check both absolute path and PATH)
        if os.path.isabs(self.unifdef_path):
            # Absolute path - check if it exists
            if not os.path.exists(self.unifdef_path):
                tout.fatal(f'unifdef not found at: {self.unifdef_path}')
        else:
            # Relative path or command name - check PATH
            unifdef_full = shutil.which(self.unifdef_path)
            if not unifdef_full:
                tout.fatal(f'unifdef not found in PATH: {self.unifdef_path}')
            self.unifdef_path = unifdef_full

        # Load configuration
        tout.progress('Loading configuration...')
        config, error = load_config(self.config_file, self.srcdir)
        if error:
            tout.fatal(error)
        tout.progress(f'Loaded {len(config)} config symbols')

        # Create unifdef config file
        self._create_unifdef_config(config)

        tout.progress('Analysing preprocessor conditionals...')
        file_results = {}

        # Filter sources to only .c and .S files unless include_headers is set
        used_sources = self.used_sources
        if not self.include_headers:
            filtered_sources = {s for s in used_sources
                                if s.endswith(('.c', '.S'))}
            excluded_count = len(used_sources) - len(filtered_sources)
            if excluded_count > 0:
                tout.progress(f'Excluding {excluded_count} header files ' +
                              '(use -i to include them)')
            used_sources = filtered_sources

        # Count lines in defs file (reported in the summary below)
        with open(self.unifdef_cfg, 'r', encoding='utf-8') as f:
            defs_lines = sum(1 for _ in f)

        # Prepare arguments for parallel processing; sorting gives a stable
        # order and line tracking is always requested
        source_list = sorted(used_sources)
        worker_args = [(source_file, self.unifdef_cfg, self.unifdef_path, True)
                       for source_file in source_list]

        tout.progress(f'Running unifdef on {len(source_list)} files...')
        start_time = time.time()

        # If jobs=1, run directly without multiprocessing for easier debugging
        if jobs == 1:
            results = [worker(args) for args in worker_args]
        else:
            with multiprocessing.Pool(processes=jobs) as pool:
                results = list(pool.imap(worker, worker_args, chunksize=10))
        elapsed_time = time.time() - start_time

        # Convert results to file_results dict and calculate totals,
        # collecting any per-file errors along the way
        total_source_lines = 0
        errors = []
        for (source_file, total_lines, active_lines, inactive_lines,
             line_status, error_msg) in results:
            if error_msg:
                errors.append(error_msg)
                continue
            file_results[source_file] = FileResult(
                total_lines=total_lines,
                active_lines=active_lines,
                inactive_lines=inactive_lines,
                line_status=line_status
            )
            total_source_lines += total_lines

        # Report any errors
        if errors:
            for error in errors:
                tout.error(error)
            tout.fatal(f'unifdef failed on {len(errors)} file(s)')

        kloc = total_source_lines // 1000
        tout.info(f'Analysed {len(file_results)} files ({kloc} kLOC, ' +
                  f'{defs_lines} defs) in {elapsed_time:.1f} seconds')
        tout.info(f'Unifdef directives file: {self.unifdef_cfg}')

        # Clean up temporary unifdef config file, unless the caller asked to
        # keep temporary files or we are in debug mode
        if self.keep_temps or tout.verbose >= tout.DEBUG:
            tout.debug(f'Keeping unifdef directives file: {self.unifdef_cfg}')
        else:
            try:
                os.unlink(self.unifdef_cfg)
                tout.debug(f'Cleaned up {self.unifdef_cfg}')
            except OSError as e:
                tout.debug(f'Failed to clean up {self.unifdef_cfg}: {e}')

        return file_results