patman: Move library functions into a library directory

The patman directory has a number of modules which are used by other tools
in U-Boot. This makes it hard to package the tools using pypi since the
common files must be copied along with the tool that uses them.

To address this, move these files into a new u_boot_pylib library. This
can be packaged separately and listed as a dependency of each tool.

Signed-off-by: Simon Glass <sjg@chromium.org>
This commit is contained in:
Simon Glass
2023-02-23 18:18:04 -07:00
parent 00290d6a5b
commit 4583c00236
87 changed files with 182 additions and 151 deletions

View File

@@ -0,0 +1,4 @@
# SPDX-License-Identifier: GPL-2.0+
__all__ = ['command', 'cros_subprocess','terminal', 'test_util', 'tools',
'tout']

23
tools/u_boot_pylib/__main__.py Executable file
View File

@@ -0,0 +1,23 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0+
#
# Copyright 2023 Google LLC
#
import os
import sys
if __name__ == "__main__":
# Allow 'from u_boot_pylib import xxx to work'
our_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(our_path, '..'))
# Run tests
from u_boot_pylib import terminal
from u_boot_pylib import test_util
result = test_util.run_test_suites(
'u_boot_pylib', False, False, False, None, None, None,
['terminal'])
sys.exit(0 if result.wasSuccessful() else 1)

View File

@@ -0,0 +1,139 @@
# SPDX-License-Identifier: GPL-2.0+
# Copyright (c) 2011 The Chromium OS Authors.
#
import os
from u_boot_pylib import cros_subprocess
"""Shell command ease-ups for Python."""
class CommandResult:
"""A class which captures the result of executing a command.
Members:
stdout: stdout obtained from command, as a string
stderr: stderr obtained from command, as a string
return_code: Return code from command
exception: Exception received, or None if all ok
"""
def __init__(self, stdout='', stderr='', combined='', return_code=0,
exception=None):
self.stdout = stdout
self.stderr = stderr
self.combined = combined
self.return_code = return_code
self.exception = exception
def to_output(self, binary):
if not binary:
self.stdout = self.stdout.decode('utf-8')
self.stderr = self.stderr.decode('utf-8')
self.combined = self.combined.decode('utf-8')
return self
# This permits interception of RunPipe for test purposes. If it is set to
# a function, then that function is called with the pipe list being
# executed. Otherwise, it is assumed to be a CommandResult object, and is
# returned as the result for every run_pipe() call.
# When this value is None, commands are executed as normal.
test_result = None
def run_pipe(pipe_list, infile=None, outfile=None,
capture=False, capture_stderr=False, oneline=False,
raise_on_error=True, cwd=None, binary=False,
output_func=None, **kwargs):
"""
Perform a command pipeline, with optional input/output filenames.
Args:
pipe_list: List of command lines to execute. Each command line is
piped into the next, and is itself a list of strings. For
example [ ['ls', '.git'] ['wc'] ] will pipe the output of
'ls .git' into 'wc'.
infile: File to provide stdin to the pipeline
outfile: File to store stdout
capture: True to capture output
capture_stderr: True to capture stderr
oneline: True to strip newline chars from output
output_func: Output function to call with each output fragment
(if it returns True the function terminates)
kwargs: Additional keyword arguments to cros_subprocess.Popen()
Returns:
CommandResult object
"""
if test_result:
if hasattr(test_result, '__call__'):
# pylint: disable=E1102
result = test_result(pipe_list=pipe_list)
if result:
return result
else:
return test_result
# No result: fall through to normal processing
result = CommandResult(b'', b'', b'')
last_pipe = None
pipeline = list(pipe_list)
user_pipestr = '|'.join([' '.join(pipe) for pipe in pipe_list])
kwargs['stdout'] = None
kwargs['stderr'] = None
while pipeline:
cmd = pipeline.pop(0)
if last_pipe is not None:
kwargs['stdin'] = last_pipe.stdout
elif infile:
kwargs['stdin'] = open(infile, 'rb')
if pipeline or capture:
kwargs['stdout'] = cros_subprocess.PIPE
elif outfile:
kwargs['stdout'] = open(outfile, 'wb')
if capture_stderr:
kwargs['stderr'] = cros_subprocess.PIPE
try:
last_pipe = cros_subprocess.Popen(cmd, cwd=cwd, **kwargs)
except Exception as err:
result.exception = err
if raise_on_error:
raise Exception("Error running '%s': %s" % (user_pipestr, str))
result.return_code = 255
return result.to_output(binary)
if capture:
result.stdout, result.stderr, result.combined = (
last_pipe.communicate_filter(output_func))
if result.stdout and oneline:
result.output = result.stdout.rstrip(b'\r\n')
result.return_code = last_pipe.wait()
else:
result.return_code = os.waitpid(last_pipe.pid, 0)[1]
if raise_on_error and result.return_code:
raise Exception("Error running '%s'" % user_pipestr)
return result.to_output(binary)
def output(*cmd, **kwargs):
kwargs['raise_on_error'] = kwargs.get('raise_on_error', True)
return run_pipe([cmd], capture=True, **kwargs).stdout
def output_one_line(*cmd, **kwargs):
"""Run a command and output it as a single-line string
The command us expected to produce a single line of output
Returns:
String containing output of command
"""
raise_on_error = kwargs.pop('raise_on_error', True)
result = run_pipe([cmd], capture=True, oneline=True,
raise_on_error=raise_on_error, **kwargs).stdout.strip()
return result
def run(*cmd, **kwargs):
return run_pipe([cmd], **kwargs).stdout
def run_list(cmd):
return run_pipe([cmd], capture=True).stdout
def stop_all():
cros_subprocess.stay_alive = False

View File

@@ -0,0 +1,401 @@
# Copyright (c) 2012 The Chromium OS Authors.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#
# Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se>
# Licensed to PSF under a Contributor Agreement.
# See http://www.python.org/2.4/license for licensing details.
"""Subprocess execution
This module holds a subclass of subprocess.Popen with our own required
features, mainly that we get access to the subprocess output while it
is running rather than just at the end. This makes it easier to show
progress information and filter output in real time.
"""
import errno
import os
import pty
import select
import subprocess
import sys
import unittest
# Import these here so the caller does not need to import subprocess also.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
PIPE_PTY = -3 # Pipe output through a pty
stay_alive = True
class Popen(subprocess.Popen):
"""Like subprocess.Popen with ptys and incremental output
This class deals with running a child process and filtering its output on
both stdout and stderr while it is running. We do this so we can monitor
progress, and possibly relay the output to the user if requested.
The class is similar to subprocess.Popen, the equivalent is something like:
Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
But this class has many fewer features, and two enhancement:
1. Rather than getting the output data only at the end, this class sends it
to a provided operation as it arrives.
2. We use pseudo terminals so that the child will hopefully flush its output
to us as soon as it is produced, rather than waiting for the end of a
line.
Use communicate_filter() to handle output from the subprocess.
"""
def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY,
shell=False, cwd=None, env=None, **kwargs):
"""Cut-down constructor
Args:
args: Program and arguments for subprocess to execute.
stdin: See subprocess.Popen()
stdout: See subprocess.Popen(), except that we support the sentinel
value of cros_subprocess.PIPE_PTY.
stderr: See subprocess.Popen(), except that we support the sentinel
value of cros_subprocess.PIPE_PTY.
shell: See subprocess.Popen()
cwd: Working directory to change to for subprocess, or None if none.
env: Environment to use for this subprocess, or None to inherit parent.
kwargs: No other arguments are supported at the moment. Passing other
arguments will cause a ValueError to be raised.
"""
stdout_pty = None
stderr_pty = None
if stdout == PIPE_PTY:
stdout_pty = pty.openpty()
stdout = os.fdopen(stdout_pty[1])
if stderr == PIPE_PTY:
stderr_pty = pty.openpty()
stderr = os.fdopen(stderr_pty[1])
super(Popen, self).__init__(args, stdin=stdin,
stdout=stdout, stderr=stderr, shell=shell, cwd=cwd, env=env,
**kwargs)
# If we're on a PTY, we passed the slave half of the PTY to the subprocess.
# We want to use the master half on our end from now on. Setting this here
# does make some assumptions about the implementation of subprocess, but
# those assumptions are pretty minor.
# Note that if stderr is STDOUT, then self.stderr will be set to None by
# this constructor.
if stdout_pty is not None:
self.stdout = os.fdopen(stdout_pty[0])
if stderr_pty is not None:
self.stderr = os.fdopen(stderr_pty[0])
# Insist that unit tests exist for other arguments we don't support.
if kwargs:
raise ValueError("Unit tests do not test extra args - please add tests")
def convert_data(self, data):
"""Convert stdout/stderr data to the correct format for output
Args:
data: Data to convert, or None for ''
Returns:
Converted data, as bytes
"""
if data is None:
return b''
return data
def communicate_filter(self, output, input_buf=''):
"""Interact with process: Read data from stdout and stderr.
This method runs until end-of-file is reached, then waits for the
subprocess to terminate.
The output function is sent all output from the subprocess and must be
defined like this:
def output([self,] stream, data)
Args:
stream: the stream the output was received on, which will be
sys.stdout or sys.stderr.
data: a string containing the data
Returns:
True to terminate the process
Note: The data read is buffered in memory, so do not use this
method if the data size is large or unlimited.
Args:
output: Function to call with each fragment of output.
Returns:
A tuple (stdout, stderr, combined) which is the data received on
stdout, stderr and the combined data (interleaved stdout and stderr).
Note that the interleaved output will only be sensible if you have
set both stdout and stderr to PIPE or PIPE_PTY. Even then it depends on
the timing of the output in the subprocess. If a subprocess flips
between stdout and stderr quickly in succession, by the time we come to
read the output from each we may see several lines in each, and will read
all the stdout lines, then all the stderr lines. So the interleaving
may not be correct. In this case you might want to pass
stderr=cros_subprocess.STDOUT to the constructor.
This feature is still useful for subprocesses where stderr is
rarely used and indicates an error.
Note also that if you set stderr to STDOUT, then stderr will be empty
and the combined output will just be the same as stdout.
"""
read_set = []
write_set = []
stdout = None # Return
stderr = None # Return
if self.stdin:
# Flush stdio buffer. This might block, if the user has
# been writing to .stdin in an uncontrolled fashion.
self.stdin.flush()
if input_buf:
write_set.append(self.stdin)
else:
self.stdin.close()
if self.stdout:
read_set.append(self.stdout)
stdout = bytearray()
if self.stderr and self.stderr != self.stdout:
read_set.append(self.stderr)
stderr = bytearray()
combined = bytearray()
stop_now = False
input_offset = 0
while read_set or write_set:
try:
rlist, wlist, _ = select.select(read_set, write_set, [], 0.2)
except select.error as e:
if e.args[0] == errno.EINTR:
continue
raise
if not stay_alive:
self.terminate()
if self.stdin in wlist:
# When select has indicated that the file is writable,
# we can write up to PIPE_BUF bytes without risk
# blocking. POSIX defines PIPE_BUF >= 512
chunk = input_buf[input_offset : input_offset + 512]
bytes_written = os.write(self.stdin.fileno(), chunk)
input_offset += bytes_written
if input_offset >= len(input_buf):
self.stdin.close()
write_set.remove(self.stdin)
if self.stdout in rlist:
data = b''
# We will get an error on read if the pty is closed
try:
data = os.read(self.stdout.fileno(), 1024)
except OSError:
pass
if not len(data):
self.stdout.close()
read_set.remove(self.stdout)
else:
stdout += data
combined += data
if output:
stop_now = output(sys.stdout, data)
if self.stderr in rlist:
data = b''
# We will get an error on read if the pty is closed
try:
data = os.read(self.stderr.fileno(), 1024)
except OSError:
pass
if not len(data):
self.stderr.close()
read_set.remove(self.stderr)
else:
stderr += data
combined += data
if output:
stop_now = output(sys.stderr, data)
if stop_now:
self.terminate()
# All data exchanged. Translate lists into strings.
stdout = self.convert_data(stdout)
stderr = self.convert_data(stderr)
combined = self.convert_data(combined)
self.wait()
return (stdout, stderr, combined)
# Just being a unittest.TestCase gives us 14 public methods. Unless we
# disable this, we can only have 6 tests in a TestCase. That's not enough.
#
# pylint: disable=R0904
class TestSubprocess(unittest.TestCase):
"""Our simple unit test for this module"""
class MyOperation:
"""Provides a operation that we can pass to Popen"""
def __init__(self, input_to_send=None):
"""Constructor to set up the operation and possible input.
Args:
input_to_send: a text string to send when we first get input. We will
add \r\n to the string.
"""
self.stdout_data = ''
self.stderr_data = ''
self.combined_data = ''
self.stdin_pipe = None
self._input_to_send = input_to_send
if input_to_send:
pipe = os.pipe()
self.stdin_read_pipe = pipe[0]
self._stdin_write_pipe = os.fdopen(pipe[1], 'w')
def output(self, stream, data):
"""Output handler for Popen. Stores the data for later comparison"""
if stream == sys.stdout:
self.stdout_data += data
if stream == sys.stderr:
self.stderr_data += data
self.combined_data += data
# Output the input string if we have one.
if self._input_to_send:
self._stdin_write_pipe.write(self._input_to_send + '\r\n')
self._stdin_write_pipe.flush()
def _basic_check(self, plist, oper):
"""Basic checks that the output looks sane."""
self.assertEqual(plist[0], oper.stdout_data)
self.assertEqual(plist[1], oper.stderr_data)
self.assertEqual(plist[2], oper.combined_data)
# The total length of stdout and stderr should equal the combined length
self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2]))
def test_simple(self):
"""Simple redirection: Get process list"""
oper = TestSubprocess.MyOperation()
plist = Popen(['ps']).communicate_filter(oper.output)
self._basic_check(plist, oper)
def test_stderr(self):
"""Check stdout and stderr"""
oper = TestSubprocess.MyOperation()
cmd = 'echo fred >/dev/stderr && false || echo bad'
plist = Popen([cmd], shell=True).communicate_filter(oper.output)
self._basic_check(plist, oper)
self.assertEqual(plist [0], 'bad\r\n')
self.assertEqual(plist [1], 'fred\r\n')
def test_shell(self):
"""Check with and without shell works"""
oper = TestSubprocess.MyOperation()
cmd = 'echo test >/dev/stderr'
self.assertRaises(OSError, Popen, [cmd], shell=False)
plist = Popen([cmd], shell=True).communicate_filter(oper.output)
self._basic_check(plist, oper)
self.assertEqual(len(plist [0]), 0)
self.assertEqual(plist [1], 'test\r\n')
def test_list_args(self):
"""Check with and without shell works using list arguments"""
oper = TestSubprocess.MyOperation()
cmd = ['echo', 'test', '>/dev/stderr']
plist = Popen(cmd, shell=False).communicate_filter(oper.output)
self._basic_check(plist, oper)
self.assertEqual(plist [0], ' '.join(cmd[1:]) + '\r\n')
self.assertEqual(len(plist [1]), 0)
oper = TestSubprocess.MyOperation()
# this should be interpreted as 'echo' with the other args dropped
cmd = ['echo', 'test', '>/dev/stderr']
plist = Popen(cmd, shell=True).communicate_filter(oper.output)
self._basic_check(plist, oper)
self.assertEqual(plist [0], '\r\n')
def test_cwd(self):
"""Check we can change directory"""
for shell in (False, True):
oper = TestSubprocess.MyOperation()
plist = Popen('pwd', shell=shell, cwd='/tmp').communicate_filter(
oper.output)
self._basic_check(plist, oper)
self.assertEqual(plist [0], '/tmp\r\n')
def test_env(self):
"""Check we can change environment"""
for add in (False, True):
oper = TestSubprocess.MyOperation()
env = os.environ
if add:
env ['FRED'] = 'fred'
cmd = 'echo $FRED'
plist = Popen(cmd, shell=True, env=env).communicate_filter(oper.output)
self._basic_check(plist, oper)
self.assertEqual(plist [0], add and 'fred\r\n' or '\r\n')
def test_extra_args(self):
"""Check we can't add extra arguments"""
self.assertRaises(ValueError, Popen, 'true', close_fds=False)
def test_basic_input(self):
"""Check that incremental input works
We set up a subprocess which will prompt for name. When we see this prompt
we send the name as input to the process. It should then print the name
properly to stdout.
"""
oper = TestSubprocess.MyOperation('Flash')
prompt = 'What is your name?: '
cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt
plist = Popen([cmd], stdin=oper.stdin_read_pipe,
shell=True).communicate_filter(oper.output)
self._basic_check(plist, oper)
self.assertEqual(len(plist [1]), 0)
self.assertEqual(plist [0], prompt + 'Hello Flash\r\r\n')
def test_isatty(self):
"""Check that ptys appear as terminals to the subprocess"""
oper = TestSubprocess.MyOperation()
cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; '
'else echo "not %d" >&%d; fi;')
both_cmds = ''
for fd in (1, 2):
both_cmds += cmd % (fd, fd, fd, fd, fd)
plist = Popen(both_cmds, shell=True).communicate_filter(oper.output)
self._basic_check(plist, oper)
self.assertEqual(plist [0], 'terminal 1\r\n')
self.assertEqual(plist [1], 'terminal 2\r\n')
# Now try with PIPE and make sure it is not a terminal
oper = TestSubprocess.MyOperation()
plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
shell=True).communicate_filter(oper.output)
self._basic_check(plist, oper)
self.assertEqual(plist [0], 'not 1\n')
self.assertEqual(plist [1], 'not 2\n')
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,270 @@
# SPDX-License-Identifier: GPL-2.0+
# Copyright (c) 2011 The Chromium OS Authors.
#
"""Terminal utilities
This module handles terminal interaction including ANSI color codes.
"""
import os
import re
import shutil
import sys
# Selection of when we want our output to be colored
COLOR_IF_TERMINAL, COLOR_ALWAYS, COLOR_NEVER = range(3)
# Initially, we are set up to print to the terminal
print_test_mode = False
print_test_list = []
# The length of the last line printed without a newline
last_print_len = None
# credit:
# stackoverflow.com/questions/14693701/how-can-i-remove-the-ansi-escape-sequences-from-a-string-in-python
ansi_escape = re.compile(r'\x1b(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
class PrintLine:
"""A line of text output
Members:
text: Text line that was printed
newline: True to output a newline after the text
colour: Text colour to use
"""
def __init__(self, text, colour, newline=True, bright=True):
self.text = text
self.newline = newline
self.colour = colour
self.bright = bright
def __eq__(self, other):
return (self.text == other.text and
self.newline == other.newline and
self.colour == other.colour and
self.bright == other.bright)
def __str__(self):
return ("newline=%s, colour=%s, bright=%d, text='%s'" %
(self.newline, self.colour, self.bright, self.text))
def calc_ascii_len(text):
"""Calculate the length of a string, ignoring any ANSI sequences
When displayed on a terminal, ANSI sequences don't take any space, so we
need to ignore them when calculating the length of a string.
Args:
text: Text to check
Returns:
Length of text, after skipping ANSI sequences
>>> col = Color(COLOR_ALWAYS)
>>> text = col.build(Color.RED, 'abc')
>>> len(text)
14
>>> calc_ascii_len(text)
3
>>>
>>> text += 'def'
>>> calc_ascii_len(text)
6
>>> text += col.build(Color.RED, 'abc')
>>> calc_ascii_len(text)
9
"""
result = ansi_escape.sub('', text)
return len(result)
def trim_ascii_len(text, size):
"""Trim a string containing ANSI sequences to the given ASCII length
The string is trimmed with ANSI sequences being ignored for the length
calculation.
>>> col = Color(COLOR_ALWAYS)
>>> text = col.build(Color.RED, 'abc')
>>> len(text)
14
>>> calc_ascii_len(trim_ascii_len(text, 4))
3
>>> calc_ascii_len(trim_ascii_len(text, 2))
2
>>> text += 'def'
>>> calc_ascii_len(trim_ascii_len(text, 4))
4
>>> text += col.build(Color.RED, 'ghi')
>>> calc_ascii_len(trim_ascii_len(text, 7))
7
"""
if calc_ascii_len(text) < size:
return text
pos = 0
out = ''
left = size
# Work through each ANSI sequence in turn
for m in ansi_escape.finditer(text):
# Find the text before the sequence and add it to our string, making
# sure it doesn't overflow
before = text[pos:m.start()]
toadd = before[:left]
out += toadd
# Figure out how much non-ANSI space we have left
left -= len(toadd)
# Add the ANSI sequence and move to the position immediately after it
out += m.group()
pos = m.start() + len(m.group())
# Deal with text after the last ANSI sequence
after = text[pos:]
toadd = after[:left]
out += toadd
return out
def tprint(text='', newline=True, colour=None, limit_to_line=False, bright=True):
"""Handle a line of output to the terminal.
In test mode this is recorded in a list. Otherwise it is output to the
terminal.
Args:
text: Text to print
newline: True to add a new line at the end of the text
colour: Colour to use for the text
"""
global last_print_len
if print_test_mode:
print_test_list.append(PrintLine(text, colour, newline, bright))
else:
if colour:
col = Color()
text = col.build(colour, text, bright=bright)
if newline:
print(text)
last_print_len = None
else:
if limit_to_line:
cols = shutil.get_terminal_size().columns
text = trim_ascii_len(text, cols)
print(text, end='', flush=True)
last_print_len = calc_ascii_len(text)
def print_clear():
"""Clear a previously line that was printed with no newline"""
global last_print_len
if last_print_len:
print('\r%s\r' % (' '* last_print_len), end='', flush=True)
last_print_len = None
def set_print_test_mode(enable=True):
"""Go into test mode, where all printing is recorded"""
global print_test_mode
print_test_mode = enable
get_print_test_lines()
def get_print_test_lines():
"""Get a list of all lines output through tprint()
Returns:
A list of PrintLine objects
"""
global print_test_list
ret = print_test_list
print_test_list = []
return ret
def echo_print_test_lines():
"""Print out the text lines collected"""
for line in print_test_list:
if line.colour:
col = Color()
print(col.build(line.colour, line.text), end='')
else:
print(line.text, end='')
if line.newline:
print()
class Color(object):
"""Conditionally wraps text in ANSI color escape sequences."""
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
BOLD = -1
BRIGHT_START = '\033[1;%dm'
NORMAL_START = '\033[22;%dm'
BOLD_START = '\033[1m'
RESET = '\033[0m'
def __init__(self, colored=COLOR_IF_TERMINAL):
"""Create a new Color object, optionally disabling color output.
Args:
enabled: True if color output should be enabled. If False then this
class will not add color codes at all.
"""
try:
self._enabled = (colored == COLOR_ALWAYS or
(colored == COLOR_IF_TERMINAL and
os.isatty(sys.stdout.fileno())))
except:
self._enabled = False
def start(self, color, bright=True):
"""Returns a start color code.
Args:
color: Color to use, .e.g BLACK, RED, etc.
Returns:
If color is enabled, returns an ANSI sequence to start the given
color, otherwise returns empty string
"""
if self._enabled:
base = self.BRIGHT_START if bright else self.NORMAL_START
return base % (color + 30)
return ''
def stop(self):
"""Returns a stop color code.
Returns:
If color is enabled, returns an ANSI color reset sequence,
otherwise returns empty string
"""
if self._enabled:
return self.RESET
return ''
def build(self, color, text, bright=True):
"""Returns text with conditionally added color escape sequences.
Keyword arguments:
color: Text color -- one of the color constants defined in this
class.
text: The text to color.
Returns:
If self._enabled is False, returns the original text. If it's True,
returns text with color escape sequences based on the value of
color.
"""
if not self._enabled:
return text
if color == self.BOLD:
start = self.BOLD_START
else:
base = self.BRIGHT_START if bright else self.NORMAL_START
start = base % (color + 30)
return start + text + self.RESET

View File

@@ -0,0 +1,216 @@
# SPDX-License-Identifier: GPL-2.0+
#
# Copyright (c) 2016 Google, Inc
#
from contextlib import contextmanager
import doctest
import glob
import multiprocessing
import os
import sys
import unittest
from u_boot_pylib import command
from io import StringIO
use_concurrent = True
try:
from concurrencytest import ConcurrentTestSuite
from concurrencytest import fork_for_tests
except:
use_concurrent = False
def run_test_coverage(prog, filter_fname, exclude_list, build_dir, required=None,
extra_args=None):
"""Run tests and check that we get 100% coverage
Args:
prog: Program to run (with be passed a '-t' argument to run tests
filter_fname: Normally all *.py files in the program's directory will
be included. If this is not None, then it is used to filter the
list so that only filenames that don't contain filter_fname are
included.
exclude_list: List of file patterns to exclude from the coverage
calculation
build_dir: Build directory, used to locate libfdt.py
required: List of modules which must be in the coverage report
extra_args (str): Extra arguments to pass to the tool before the -t/test
arg
Raises:
ValueError if the code coverage is not 100%
"""
# This uses the build output from sandbox_spl to get _libfdt.so
path = os.path.dirname(prog)
if filter_fname:
glob_list = glob.glob(os.path.join(path, '*.py'))
glob_list = [fname for fname in glob_list if filter_fname in fname]
else:
glob_list = []
glob_list += exclude_list
glob_list += ['*libfdt.py', '*site-packages*', '*dist-packages*']
glob_list += ['*concurrencytest*']
test_cmd = 'test' if 'binman' in prog or 'patman' in prog else '-t'
prefix = ''
if build_dir:
prefix = 'PYTHONPATH=$PYTHONPATH:%s/sandbox_spl/tools ' % build_dir
cmd = ('%spython3-coverage run '
'--omit "%s" %s %s %s -P1' % (prefix, ','.join(glob_list),
prog, extra_args or '', test_cmd))
os.system(cmd)
stdout = command.output('python3-coverage', 'report')
lines = stdout.splitlines()
if required:
# Convert '/path/to/name.py' just the module name 'name'
test_set = set([os.path.splitext(os.path.basename(line.split()[0]))[0]
for line in lines if '/etype/' in line])
missing_list = required
missing_list.discard('__init__')
missing_list.difference_update(test_set)
if missing_list:
print('Missing tests for %s' % (', '.join(missing_list)))
print(stdout)
ok = False
coverage = lines[-1].split(' ')[-1]
ok = True
print(coverage)
if coverage != '100%':
print(stdout)
print("To get a report in 'htmlcov/index.html', type: python3-coverage html")
print('Coverage error: %s, but should be 100%%' % coverage)
ok = False
if not ok:
raise ValueError('Test coverage failure')
# Use this to suppress stdout/stderr output:
# with capture_sys_output() as (stdout, stderr)
# ...do something...
@contextmanager
def capture_sys_output():
capture_out, capture_err = StringIO(), StringIO()
old_out, old_err = sys.stdout, sys.stderr
try:
sys.stdout, sys.stderr = capture_out, capture_err
yield capture_out, capture_err
finally:
sys.stdout, sys.stderr = old_out, old_err
class FullTextTestResult(unittest.TextTestResult):
"""A test result class that can print extended text results to a stream
This is meant to be used by a TestRunner as a result class. Like
TextTestResult, this prints out the names of tests as they are run,
errors as they occur, and a summary of the results at the end of the
test run. Beyond those, this prints information about skipped tests,
expected failures and unexpected successes.
Args:
stream: A file-like object to write results to
descriptions (bool): True to print descriptions with test names
verbosity (int): Detail of printed output per test as they run
Test stdout and stderr always get printed when buffering
them is disabled by the test runner. In addition to that,
0: Print nothing
1: Print a dot per test
2: Print test names
"""
def __init__(self, stream, descriptions, verbosity):
self.verbosity = verbosity
super().__init__(stream, descriptions, verbosity)
def printErrors(self):
"Called by TestRunner after test run to summarize the tests"
# The parent class doesn't keep unexpected successes in the same
# format as the rest. Adapt it to what printErrorList expects.
unexpected_successes = [
(test, 'Test was expected to fail, but succeeded.\n')
for test in self.unexpectedSuccesses
]
super().printErrors() # FAIL and ERROR
self.printErrorList('SKIP', self.skipped)
self.printErrorList('XFAIL', self.expectedFailures)
self.printErrorList('XPASS', unexpected_successes)
def addSkip(self, test, reason):
"""Called when a test is skipped."""
# Add empty line to keep spacing consistent with other results
if not reason.endswith('\n'):
reason += '\n'
super().addSkip(test, reason)
def run_test_suites(toolname, debug, verbosity, test_preserve_dirs, processes,
test_name, toolpath, class_and_module_list):
"""Run a series of test suites and collect the results
Args:
toolname: Name of the tool that ran the tests
debug: True to enable debugging, which shows a full stack trace on error
verbosity: Verbosity level to use (0-4)
test_preserve_dirs: True to preserve the input directory used by tests
so that it can be examined afterwards (only useful for debugging
tests). If a single test is selected (in args[0]) it also preserves
the output directory for this test. Both directories are displayed
on the command line.
processes: Number of processes to use to run tests (None=same as #CPUs)
test_name: Name of test to run, or None for all
toolpath: List of paths to use for tools
class_and_module_list: List of test classes (type class) and module
names (type str) to run
"""
sys.argv = [sys.argv[0]]
if debug:
sys.argv.append('-D')
if verbosity:
sys.argv.append('-v%d' % verbosity)
if toolpath:
for path in toolpath:
sys.argv += ['--toolpath', path]
suite = unittest.TestSuite()
loader = unittest.TestLoader()
runner = unittest.TextTestRunner(
stream=sys.stdout,
verbosity=(1 if verbosity is None else verbosity),
resultclass=FullTextTestResult,
)
if use_concurrent and processes != 1:
suite = ConcurrentTestSuite(suite,
fork_for_tests(processes or multiprocessing.cpu_count()))
for module in class_and_module_list:
if isinstance(module, str) and (not test_name or test_name == module):
suite.addTests(doctest.DocTestSuite(module))
for module in class_and_module_list:
if isinstance(module, str):
continue
# Test the test module about our arguments, if it is interested
if hasattr(module, 'setup_test_args'):
setup_test_args = getattr(module, 'setup_test_args')
setup_test_args(preserve_indir=test_preserve_dirs,
preserve_outdirs=test_preserve_dirs and test_name is not None,
toolpath=toolpath, verbosity=verbosity)
if test_name:
# Since Python v3.5 If an ImportError or AttributeError occurs
# while traversing a name then a synthetic test that raises that
# error when run will be returned. Check that the requested test
# exists, otherwise these errors are included in the results.
if test_name in loader.getTestCaseNames(module):
suite.addTests(loader.loadTestsFromName(test_name, module))
else:
suite.addTests(loader.loadTestsFromTestCase(module))
print(f" Running {toolname} tests ".center(70, "="))
result = runner.run(suite)
print()
return result

596
tools/u_boot_pylib/tools.py Normal file
View File

@@ -0,0 +1,596 @@
# SPDX-License-Identifier: GPL-2.0+
#
# Copyright (c) 2016 Google, Inc
#
import glob
import os
import shlex
import shutil
import sys
import tempfile
import urllib.request
from u_boot_pylib import command
from u_boot_pylib import tout
# Output directly (generally this is temporary)
outdir = None
# True to keep the output directory around after exiting
preserve_outdir = False
# Path to the Chrome OS chroot, if we know it
chroot_path = None
# Search paths to use for filename(), used to find files
search_paths = []
tool_search_paths = []
# Tools and the packages that contain them, on debian
packages = {
'lz4': 'liblz4-tool',
}
# List of paths to use when looking for an input file
indir = []
def prepare_output_dir(dirname, preserve=False):
"""Select an output directory, ensuring it exists.
This either creates a temporary directory or checks that the one supplied
by the user is valid. For a temporary directory, it makes a note to
remove it later if required.
Args:
dirname: a string, name of the output directory to use to store
intermediate and output files. If is None - create a temporary
directory.
preserve: a Boolean. If outdir above is None and preserve is False, the
created temporary directory will be destroyed on exit.
Raises:
OSError: If it cannot create the output directory.
"""
global outdir, preserve_outdir
preserve_outdir = dirname or preserve
if dirname:
outdir = dirname
if not os.path.isdir(outdir):
try:
os.makedirs(outdir)
except OSError as err:
raise ValueError(
f"Cannot make output directory 'outdir': 'err.strerror'")
tout.debug("Using output directory '%s'" % outdir)
else:
outdir = tempfile.mkdtemp(prefix='binman.')
tout.debug("Using temporary directory '%s'" % outdir)
def _remove_output_dir():
global outdir
shutil.rmtree(outdir)
tout.debug("Deleted temporary directory '%s'" % outdir)
outdir = None
def finalise_output_dir():
global outdir, preserve_outdir
"""Tidy up: delete output directory if temporary and not preserved."""
if outdir and not preserve_outdir:
_remove_output_dir()
outdir = None
def get_output_filename(fname):
"""Return a filename within the output directory.
Args:
fname: Filename to use for new file
Returns:
The full path of the filename, within the output directory
"""
return os.path.join(outdir, fname)
def get_output_dir():
"""Return the current output directory
Returns:
str: The output directory
"""
return outdir
def _finalise_for_test():
"""Remove the output directory (for use by tests)"""
global outdir
if outdir:
_remove_output_dir()
outdir = None
def set_input_dirs(dirname):
"""Add a list of input directories, where input files are kept.
Args:
dirname: a list of paths to input directories to use for obtaining
files needed by binman to place in the image.
"""
global indir
indir = dirname
tout.debug("Using input directories %s" % indir)
def get_input_filename(fname, allow_missing=False):
"""Return a filename for use as input.
Args:
fname: Filename to use for new file
allow_missing: True if the filename can be missing
Returns:
fname, if indir is None;
full path of the filename, within the input directory;
None, if file is missing and allow_missing is True
Raises:
ValueError if file is missing and allow_missing is False
"""
if not indir or fname[:1] == '/':
return fname
for dirname in indir:
pathname = os.path.join(dirname, fname)
if os.path.exists(pathname):
return pathname
if allow_missing:
return None
raise ValueError("Filename '%s' not found in input path (%s) (cwd='%s')" %
(fname, ','.join(indir), os.getcwd()))
def get_input_filename_glob(pattern):
"""Return a list of filenames for use as input.
Args:
pattern: Filename pattern to search for
Returns:
A list of matching files in all input directories
"""
if not indir:
return glob.glob(pattern)
files = []
for dirname in indir:
pathname = os.path.join(dirname, pattern)
files += glob.glob(pathname)
return sorted(files)
def align(pos, align):
if align:
mask = align - 1
pos = (pos + mask) & ~mask
return pos
def not_power_of_two(num):
return num and (num & (num - 1))
def set_tool_paths(toolpaths):
"""Set the path to search for tools
Args:
toolpaths: List of paths to search for tools executed by run()
"""
global tool_search_paths
tool_search_paths = toolpaths
def path_has_file(path_spec, fname):
"""Check if a given filename is in the PATH
Args:
path_spec: Value of PATH variable to check
fname: Filename to check
Returns:
True if found, False if not
"""
for dir in path_spec.split(':'):
if os.path.exists(os.path.join(dir, fname)):
return True
return False
def get_host_compile_tool(env, name):
"""Get the host-specific version for a compile tool
This checks the environment variables that specify which version of
the tool should be used (e.g. ${HOSTCC}).
The following table lists the host-specific versions of the tools
this function resolves to:
Compile Tool | Host version
--------------+----------------
as | ${HOSTAS}
ld | ${HOSTLD}
cc | ${HOSTCC}
cpp | ${HOSTCPP}
c++ | ${HOSTCXX}
ar | ${HOSTAR}
nm | ${HOSTNM}
ldr | ${HOSTLDR}
strip | ${HOSTSTRIP}
objcopy | ${HOSTOBJCOPY}
objdump | ${HOSTOBJDUMP}
dtc | ${HOSTDTC}
Args:
name: Command name to run
Returns:
host_name: Exact command name to run instead
extra_args: List of extra arguments to pass
"""
host_name = None
extra_args = []
if name in ('as', 'ld', 'cc', 'cpp', 'ar', 'nm', 'ldr', 'strip',
'objcopy', 'objdump', 'dtc'):
host_name, *host_args = env.get('HOST' + name.upper(), '').split(' ')
elif name == 'c++':
host_name, *host_args = env.get('HOSTCXX', '').split(' ')
if host_name:
return host_name, extra_args
return name, []
def get_target_compile_tool(name, cross_compile=None):
"""Get the target-specific version for a compile tool
This first checks the environment variables that specify which
version of the tool should be used (e.g. ${CC}). If those aren't
specified, it checks the CROSS_COMPILE variable as a prefix for the
tool with some substitutions (e.g. "${CROSS_COMPILE}gcc" for cc).
The following table lists the target-specific versions of the tools
this function resolves to:
Compile Tool | First choice | Second choice
--------------+----------------+----------------------------
as | ${AS} | ${CROSS_COMPILE}as
ld | ${LD} | ${CROSS_COMPILE}ld.bfd
| | or ${CROSS_COMPILE}ld
cc | ${CC} | ${CROSS_COMPILE}gcc
cpp | ${CPP} | ${CROSS_COMPILE}gcc -E
c++ | ${CXX} | ${CROSS_COMPILE}g++
ar | ${AR} | ${CROSS_COMPILE}ar
nm | ${NM} | ${CROSS_COMPILE}nm
ldr | ${LDR} | ${CROSS_COMPILE}ldr
strip | ${STRIP} | ${CROSS_COMPILE}strip
objcopy | ${OBJCOPY} | ${CROSS_COMPILE}objcopy
objdump | ${OBJDUMP} | ${CROSS_COMPILE}objdump
dtc | ${DTC} | (no CROSS_COMPILE version)
Args:
name: Command name to run
Returns:
target_name: Exact command name to run instead
extra_args: List of extra arguments to pass
"""
env = dict(os.environ)
target_name = None
extra_args = []
if name in ('as', 'ld', 'cc', 'cpp', 'ar', 'nm', 'ldr', 'strip',
'objcopy', 'objdump', 'dtc'):
target_name, *extra_args = env.get(name.upper(), '').split(' ')
elif name == 'c++':
target_name, *extra_args = env.get('CXX', '').split(' ')
if target_name:
return target_name, extra_args
if cross_compile is None:
cross_compile = env.get('CROSS_COMPILE', '')
if name in ('as', 'ar', 'nm', 'ldr', 'strip', 'objcopy', 'objdump'):
target_name = cross_compile + name
elif name == 'ld':
try:
if run(cross_compile + 'ld.bfd', '-v'):
target_name = cross_compile + 'ld.bfd'
except:
target_name = cross_compile + 'ld'
elif name == 'cc':
target_name = cross_compile + 'gcc'
elif name == 'cpp':
target_name = cross_compile + 'gcc'
extra_args = ['-E']
elif name == 'c++':
target_name = cross_compile + 'g++'
else:
target_name = name
return target_name, extra_args
def get_env_with_path():
"""Get an updated environment with the PATH variable set correctly
If there are any search paths set, these need to come first in the PATH so
that these override any other version of the tools.
Returns:
dict: New environment with PATH updated, or None if there are not search
paths
"""
if tool_search_paths:
env = dict(os.environ)
env['PATH'] = ':'.join(tool_search_paths) + ':' + env['PATH']
return env
def run_result(name, *args, **kwargs):
"""Run a tool with some arguments
This runs a 'tool', which is a program used by binman to process files and
perhaps produce some output. Tools can be located on the PATH or in a
search path.
Args:
name: Command name to run
args: Arguments to the tool
for_host: True to resolve the command to the version for the host
for_target: False to run the command as-is, without resolving it
to the version for the compile target
raise_on_error: Raise an error if the command fails (True by default)
Returns:
CommandResult object
"""
try:
binary = kwargs.get('binary')
for_host = kwargs.get('for_host', False)
for_target = kwargs.get('for_target', not for_host)
raise_on_error = kwargs.get('raise_on_error', True)
env = get_env_with_path()
if for_target:
name, extra_args = get_target_compile_tool(name)
args = tuple(extra_args) + args
elif for_host:
name, extra_args = get_host_compile_tool(env, name)
args = tuple(extra_args) + args
name = os.path.expanduser(name) # Expand paths containing ~
all_args = (name,) + args
result = command.run_pipe([all_args], capture=True, capture_stderr=True,
env=env, raise_on_error=False, binary=binary)
if result.return_code:
if raise_on_error:
raise ValueError("Error %d running '%s': %s" %
(result.return_code,' '.join(all_args),
result.stderr or result.stdout))
return result
except ValueError:
if env and not path_has_file(env['PATH'], name):
msg = "Please install tool '%s'" % name
package = packages.get(name)
if package:
msg += " (e.g. from package '%s')" % package
raise ValueError(msg)
raise
def tool_find(name):
"""Search the current path for a tool
This uses both PATH and any value from set_tool_paths() to search for a tool
Args:
name (str): Name of tool to locate
Returns:
str: Full path to tool if found, else None
"""
name = os.path.expanduser(name) # Expand paths containing ~
paths = []
pathvar = os.environ.get('PATH')
if pathvar:
paths = pathvar.split(':')
if tool_search_paths:
paths += tool_search_paths
for path in paths:
fname = os.path.join(path, name)
if os.path.isfile(fname) and os.access(fname, os.X_OK):
return fname
def run(name, *args, **kwargs):
"""Run a tool with some arguments
This runs a 'tool', which is a program used by binman to process files and
perhaps produce some output. Tools can be located on the PATH or in a
search path.
Args:
name: Command name to run
args: Arguments to the tool
for_host: True to resolve the command to the version for the host
for_target: False to run the command as-is, without resolving it
to the version for the compile target
Returns:
CommandResult object
"""
result = run_result(name, *args, **kwargs)
if result is not None:
return result.stdout
def filename(fname):
"""Resolve a file path to an absolute path.
If fname starts with ##/ and chroot is available, ##/ gets replaced with
the chroot path. If chroot is not available, this file name can not be
resolved, `None' is returned.
If fname is not prepended with the above prefix, and is not an existing
file, the actual file name is retrieved from the passed in string and the
search_paths directories (if any) are searched to for the file. If found -
the path to the found file is returned, `None' is returned otherwise.
Args:
fname: a string, the path to resolve.
Returns:
Absolute path to the file or None if not found.
"""
if fname.startswith('##/'):
if chroot_path:
fname = os.path.join(chroot_path, fname[3:])
else:
return None
# Search for a pathname that exists, and return it if found
if fname and not os.path.exists(fname):
for path in search_paths:
pathname = os.path.join(path, os.path.basename(fname))
if os.path.exists(pathname):
return pathname
# If not found, just return the standard, unchanged path
return fname
def read_file(fname, binary=True):
"""Read and return the contents of a file.
Args:
fname: path to filename to read, where ## signifiies the chroot.
Returns:
data read from file, as a string.
"""
with open(filename(fname), binary and 'rb' or 'r') as fd:
data = fd.read()
#self._out.Info("Read file '%s' size %d (%#0x)" %
#(fname, len(data), len(data)))
return data
def write_file(fname, data, binary=True):
"""Write data into a file.
Args:
fname: path to filename to write
data: data to write to file, as a string
"""
#self._out.Info("Write file '%s' size %d (%#0x)" %
#(fname, len(data), len(data)))
with open(filename(fname), binary and 'wb' or 'w') as fd:
fd.write(data)
def get_bytes(byte, size):
"""Get a string of bytes of a given size
Args:
byte: Numeric byte value to use
size: Size of bytes/string to return
Returns:
A bytes type with 'byte' repeated 'size' times
"""
return bytes([byte]) * size
def to_bytes(string):
"""Convert a str type into a bytes type
Args:
string: string to convert
Returns:
A bytes type
"""
return string.encode('utf-8')
def to_string(bval):
"""Convert a bytes type into a str type
Args:
bval: bytes value to convert
Returns:
Python 3: A bytes type
Python 2: A string type
"""
return bval.decode('utf-8')
def to_hex(val):
"""Convert an integer value (or None) to a string
Returns:
hex value, or 'None' if the value is None
"""
return 'None' if val is None else '%#x' % val
def to_hex_size(val):
"""Return the size of an object in hex
Returns:
hex value of size, or 'None' if the value is None
"""
return 'None' if val is None else '%#x' % len(val)
def print_full_help(fname):
"""Print the full help message for a tool using an appropriate pager.
Args:
fname: Path to a file containing the full help message
"""
pager = shlex.split(os.getenv('PAGER', ''))
if not pager:
lesspath = shutil.which('less')
pager = [lesspath] if lesspath else None
if not pager:
pager = ['more']
command.run(*pager, fname)
def download(url, tmpdir_pattern='.patman'):
"""Download a file to a temporary directory
Args:
url (str): URL to download
tmpdir_pattern (str): pattern to use for the temporary directory
Returns:
Tuple:
Full path to the downloaded archive file in that directory,
or None if there was an error while downloading
Temporary directory name
"""
print('- downloading: %s' % url)
leaf = url.split('/')[-1]
tmpdir = tempfile.mkdtemp(tmpdir_pattern)
response = urllib.request.urlopen(url)
fname = os.path.join(tmpdir, leaf)
fd = open(fname, 'wb')
meta = response.info()
size = int(meta.get('Content-Length'))
done = 0
block_size = 1 << 16
status = ''
# Read the file in chunks and show progress as we go
while True:
buffer = response.read(block_size)
if not buffer:
print(chr(8) * (len(status) + 1), '\r', end=' ')
break
done += len(buffer)
fd.write(buffer)
status = r'%10d MiB [%3d%%]' % (done // 1024 // 1024,
done * 100 // size)
status = status + chr(8) * (len(status) + 1)
print(status, end=' ')
sys.stdout.flush()
print('\r', end='')
sys.stdout.flush()
fd.close()
if done != size:
print('Error, failed to download')
os.remove(fname)
fname = None
return fname, tmpdir

179
tools/u_boot_pylib/tout.py Normal file
View File

@@ -0,0 +1,179 @@
# SPDX-License-Identifier: GPL-2.0+
# Copyright (c) 2016 Google, Inc
#
# Terminal output logging.
#
import sys
from u_boot_pylib import terminal
# Output verbosity levels that we support
ERROR, WARNING, NOTICE, INFO, DETAIL, DEBUG = range(6)
in_progress = False
"""
This class handles output of progress and other useful information
to the user. It provides for simple verbosity level control and can
output nothing but errors at verbosity zero.
The idea is that modules set up an Output object early in their years and pass
it around to other modules that need it. This keeps the output under control
of a single class.
Public properties:
verbose: Verbosity level: 0=silent, 1=progress, 3=full, 4=debug
"""
def __enter__():
return
def __exit__(unused1, unused2, unused3):
"""Clean up and remove any progress message."""
clear_progress()
return False
def user_is_present():
"""This returns True if it is likely that a user is present.
Sometimes we want to prompt the user, but if no one is there then this
is a waste of time, and may lock a script which should otherwise fail.
Returns:
True if it thinks the user is there, and False otherwise
"""
return stdout_is_tty and verbose > 0
def clear_progress():
"""Clear any active progress message on the terminal."""
global in_progress
if verbose > 0 and stdout_is_tty and in_progress:
_stdout.write('\r%s\r' % (" " * len (_progress)))
_stdout.flush()
in_progress = False
def progress(msg, warning=False, trailer='...'):
"""Display progress information.
Args:
msg: Message to display.
warning: True if this is a warning."""
global in_progress
clear_progress()
if verbose > 0:
_progress = msg + trailer
if stdout_is_tty:
col = _color.YELLOW if warning else _color.GREEN
_stdout.write('\r' + _color.build(col, _progress))
_stdout.flush()
in_progress = True
else:
_stdout.write(_progress + '\n')
def _output(level, msg, color=None):
"""Output a message to the terminal.
Args:
level: Verbosity level for this message. It will only be displayed if
this as high as the currently selected level.
msg; Message to display.
error: True if this is an error message, else False.
"""
if verbose >= level:
clear_progress()
if color:
msg = _color.build(color, msg)
if level < NOTICE:
print(msg, file=sys.stderr)
else:
print(msg)
def do_output(level, msg):
"""Output a message to the terminal.
Args:
level: Verbosity level for this message. It will only be displayed if
this as high as the currently selected level.
msg; Message to display.
"""
_output(level, msg)
def error(msg):
"""Display an error message
Args:
msg; Message to display.
"""
_output(ERROR, msg, _color.RED)
def warning(msg):
"""Display a warning message
Args:
msg; Message to display.
"""
_output(WARNING, msg, _color.YELLOW)
def notice(msg):
"""Display an important infomation message
Args:
msg; Message to display.
"""
_output(NOTICE, msg)
def info(msg):
"""Display an infomation message
Args:
msg; Message to display.
"""
_output(INFO, msg)
def detail(msg):
"""Display a detailed message
Args:
msg; Message to display.
"""
_output(DETAIL, msg)
def debug(msg):
"""Display a debug message
Args:
msg; Message to display.
"""
_output(DEBUG, msg)
def user_output(msg):
"""Display a message regardless of the current output level.
This is used when the output was specifically requested by the user.
Args:
msg; Message to display.
"""
_output(0, msg)
def init(_verbose=WARNING, stdout=sys.stdout):
"""Initialize a new output object.
Args:
verbose: Verbosity level (0-4).
stdout: File to use for stdout.
"""
global verbose, _progress, _color, _stdout, stdout_is_tty
verbose = _verbose
_progress = '' # Our last progress message
_color = terminal.Color()
_stdout = stdout
# TODO(sjg): Move this into Chromite libraries when we have them
stdout_is_tty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
stderr_is_tty = hasattr(sys.stderr, 'isatty') and sys.stderr.isatty()
def uninit():
clear_progress()
init()

View File

@@ -0,0 +1 @@
__main__.py