Spaces:
Paused
Paused
| from __future__ import annotations | |
| import collections.abc as cabc | |
| import contextlib | |
| import io | |
| import os | |
| import shlex | |
| import shutil | |
| import sys | |
| import tempfile | |
| import typing as t | |
| from types import TracebackType | |
| from . import _compat | |
| from . import formatting | |
| from . import termui | |
| from . import utils | |
| from ._compat import _find_binary_reader | |
| if t.TYPE_CHECKING: | |
| from _typeshed import ReadableBuffer | |
| from .core import Command | |
| class EchoingStdin: | |
| def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None: | |
| self._input = input | |
| self._output = output | |
| self._paused = False | |
| def __getattr__(self, x: str) -> t.Any: | |
| return getattr(self._input, x) | |
| def _echo(self, rv: bytes) -> bytes: | |
| if not self._paused: | |
| self._output.write(rv) | |
| return rv | |
| def read(self, n: int = -1) -> bytes: | |
| return self._echo(self._input.read(n)) | |
| def read1(self, n: int = -1) -> bytes: | |
| return self._echo(self._input.read1(n)) # type: ignore | |
| def readline(self, n: int = -1) -> bytes: | |
| return self._echo(self._input.readline(n)) | |
| def readlines(self) -> list[bytes]: | |
| return [self._echo(x) for x in self._input.readlines()] | |
| def __iter__(self) -> cabc.Iterator[bytes]: | |
| return iter(self._echo(x) for x in self._input) | |
| def __repr__(self) -> str: | |
| return repr(self._input) | |
| def _pause_echo(stream: EchoingStdin | None) -> cabc.Iterator[None]: | |
| if stream is None: | |
| yield | |
| else: | |
| stream._paused = True | |
| yield | |
| stream._paused = False | |
| class BytesIOCopy(io.BytesIO): | |
| """Patch ``io.BytesIO`` to let the written stream be copied to another. | |
| .. versionadded:: 8.2 | |
| """ | |
| def __init__(self, copy_to: io.BytesIO) -> None: | |
| super().__init__() | |
| self.copy_to = copy_to | |
| def flush(self) -> None: | |
| super().flush() | |
| self.copy_to.flush() | |
| def write(self, b: ReadableBuffer) -> int: | |
| self.copy_to.write(b) | |
| return super().write(b) | |
| class StreamMixer: | |
| """Mixes `<stdout>` and `<stderr>` streams. | |
| The result is available in the ``output`` attribute. | |
| .. versionadded:: 8.2 | |
| """ | |
| def __init__(self) -> None: | |
| self.output: io.BytesIO = io.BytesIO() | |
| self.stdout: io.BytesIO = BytesIOCopy(copy_to=self.output) | |
| self.stderr: io.BytesIO = BytesIOCopy(copy_to=self.output) | |
| class _NamedTextIOWrapper(io.TextIOWrapper): | |
| def __init__( | |
| self, buffer: t.BinaryIO, name: str, mode: str, **kwargs: t.Any | |
| ) -> None: | |
| super().__init__(buffer, **kwargs) | |
| self._name = name | |
| self._mode = mode | |
| def name(self) -> str: | |
| return self._name | |
| def mode(self) -> str: | |
| return self._mode | |
| def __next__(self) -> str: # type: ignore | |
| try: | |
| line = super().__next__() | |
| except StopIteration as e: | |
| raise EOFError() from e | |
| return line | |
| def make_input_stream( | |
| input: str | bytes | t.IO[t.Any] | None, charset: str | |
| ) -> t.BinaryIO: | |
| # Is already an input stream. | |
| if hasattr(input, "read"): | |
| rv = _find_binary_reader(t.cast("t.IO[t.Any]", input)) | |
| if rv is not None: | |
| return rv | |
| raise TypeError("Could not find binary reader for input stream.") | |
| if input is None: | |
| input = b"" | |
| elif isinstance(input, str): | |
| input = input.encode(charset) | |
| return io.BytesIO(input) | |
| class Result: | |
| """Holds the captured result of an invoked CLI script. | |
| :param runner: The runner that created the result | |
| :param stdout_bytes: The standard output as bytes. | |
| :param stderr_bytes: The standard error as bytes. | |
| :param output_bytes: A mix of ``stdout_bytes`` and ``stderr_bytes``, as the | |
| user would see it in its terminal. | |
| :param return_value: The value returned from the invoked command. | |
| :param exit_code: The exit code as integer. | |
| :param exception: The exception that happened if one did. | |
| :param exc_info: Exception information (exception type, exception instance, | |
| traceback type). | |
| .. versionchanged:: 8.2 | |
| ``stderr_bytes`` no longer optional, ``output_bytes`` introduced and | |
| ``mix_stderr`` has been removed. | |
| .. versionadded:: 8.0 | |
| Added ``return_value``. | |
| """ | |
| def __init__( | |
| self, | |
| runner: CliRunner, | |
| stdout_bytes: bytes, | |
| stderr_bytes: bytes, | |
| output_bytes: bytes, | |
| return_value: t.Any, | |
| exit_code: int, | |
| exception: BaseException | None, | |
| exc_info: tuple[type[BaseException], BaseException, TracebackType] | |
| | None = None, | |
| ): | |
| self.runner = runner | |
| self.stdout_bytes = stdout_bytes | |
| self.stderr_bytes = stderr_bytes | |
| self.output_bytes = output_bytes | |
| self.return_value = return_value | |
| self.exit_code = exit_code | |
| self.exception = exception | |
| self.exc_info = exc_info | |
| def output(self) -> str: | |
| """The terminal output as unicode string, as the user would see it. | |
| .. versionchanged:: 8.2 | |
| No longer a proxy for ``self.stdout``. Now has its own independent stream | |
| that is mixing `<stdout>` and `<stderr>`, in the order they were written. | |
| """ | |
| return self.output_bytes.decode(self.runner.charset, "replace").replace( | |
| "\r\n", "\n" | |
| ) | |
| def stdout(self) -> str: | |
| """The standard output as unicode string.""" | |
| return self.stdout_bytes.decode(self.runner.charset, "replace").replace( | |
| "\r\n", "\n" | |
| ) | |
| def stderr(self) -> str: | |
| """The standard error as unicode string. | |
| .. versionchanged:: 8.2 | |
| No longer raise an exception, always returns the `<stderr>` string. | |
| """ | |
| return self.stderr_bytes.decode(self.runner.charset, "replace").replace( | |
| "\r\n", "\n" | |
| ) | |
| def __repr__(self) -> str: | |
| exc_str = repr(self.exception) if self.exception else "okay" | |
| return f"<{type(self).__name__} {exc_str}>" | |
| class CliRunner: | |
| """The CLI runner provides functionality to invoke a Click command line | |
| script for unittesting purposes in a isolated environment. This only | |
| works in single-threaded systems without any concurrency as it changes the | |
| global interpreter state. | |
| :param charset: the character set for the input and output data. | |
| :param env: a dictionary with environment variables for overriding. | |
| :param echo_stdin: if this is set to `True`, then reading from `<stdin>` writes | |
| to `<stdout>`. This is useful for showing examples in | |
| some circumstances. Note that regular prompts | |
| will automatically echo the input. | |
| :param catch_exceptions: Whether to catch any exceptions other than | |
| ``SystemExit`` when running :meth:`~CliRunner.invoke`. | |
| .. versionchanged:: 8.2 | |
| Added the ``catch_exceptions`` parameter. | |
| .. versionchanged:: 8.2 | |
| ``mix_stderr`` parameter has been removed. | |
| """ | |
| def __init__( | |
| self, | |
| charset: str = "utf-8", | |
| env: cabc.Mapping[str, str | None] | None = None, | |
| echo_stdin: bool = False, | |
| catch_exceptions: bool = True, | |
| ) -> None: | |
| self.charset = charset | |
| self.env: cabc.Mapping[str, str | None] = env or {} | |
| self.echo_stdin = echo_stdin | |
| self.catch_exceptions = catch_exceptions | |
| def get_default_prog_name(self, cli: Command) -> str: | |
| """Given a command object it will return the default program name | |
| for it. The default is the `name` attribute or ``"root"`` if not | |
| set. | |
| """ | |
| return cli.name or "root" | |
| def make_env( | |
| self, overrides: cabc.Mapping[str, str | None] | None = None | |
| ) -> cabc.Mapping[str, str | None]: | |
| """Returns the environment overrides for invoking a script.""" | |
| rv = dict(self.env) | |
| if overrides: | |
| rv.update(overrides) | |
| return rv | |
| def isolation( | |
| self, | |
| input: str | bytes | t.IO[t.Any] | None = None, | |
| env: cabc.Mapping[str, str | None] | None = None, | |
| color: bool = False, | |
| ) -> cabc.Iterator[tuple[io.BytesIO, io.BytesIO, io.BytesIO]]: | |
| """A context manager that sets up the isolation for invoking of a | |
| command line tool. This sets up `<stdin>` with the given input data | |
| and `os.environ` with the overrides from the given dictionary. | |
| This also rebinds some internals in Click to be mocked (like the | |
| prompt functionality). | |
| This is automatically done in the :meth:`invoke` method. | |
| :param input: the input stream to put into `sys.stdin`. | |
| :param env: the environment overrides as dictionary. | |
| :param color: whether the output should contain color codes. The | |
| application can still override this explicitly. | |
| .. versionadded:: 8.2 | |
| An additional output stream is returned, which is a mix of | |
| `<stdout>` and `<stderr>` streams. | |
| .. versionchanged:: 8.2 | |
| Always returns the `<stderr>` stream. | |
| .. versionchanged:: 8.0 | |
| `<stderr>` is opened with ``errors="backslashreplace"`` | |
| instead of the default ``"strict"``. | |
| .. versionchanged:: 4.0 | |
| Added the ``color`` parameter. | |
| """ | |
| bytes_input = make_input_stream(input, self.charset) | |
| echo_input = None | |
| old_stdin = sys.stdin | |
| old_stdout = sys.stdout | |
| old_stderr = sys.stderr | |
| old_forced_width = formatting.FORCED_WIDTH | |
| formatting.FORCED_WIDTH = 80 | |
| env = self.make_env(env) | |
| stream_mixer = StreamMixer() | |
| if self.echo_stdin: | |
| bytes_input = echo_input = t.cast( | |
| t.BinaryIO, EchoingStdin(bytes_input, stream_mixer.stdout) | |
| ) | |
| sys.stdin = text_input = _NamedTextIOWrapper( | |
| bytes_input, encoding=self.charset, name="<stdin>", mode="r" | |
| ) | |
| if self.echo_stdin: | |
| # Force unbuffered reads, otherwise TextIOWrapper reads a | |
| # large chunk which is echoed early. | |
| text_input._CHUNK_SIZE = 1 # type: ignore | |
| sys.stdout = _NamedTextIOWrapper( | |
| stream_mixer.stdout, encoding=self.charset, name="<stdout>", mode="w" | |
| ) | |
| sys.stderr = _NamedTextIOWrapper( | |
| stream_mixer.stderr, | |
| encoding=self.charset, | |
| name="<stderr>", | |
| mode="w", | |
| errors="backslashreplace", | |
| ) | |
| # type: ignore | |
| def visible_input(prompt: str | None = None) -> str: | |
| sys.stdout.write(prompt or "") | |
| val = next(text_input).rstrip("\r\n") | |
| sys.stdout.write(f"{val}\n") | |
| sys.stdout.flush() | |
| return val | |
| # type: ignore | |
| def hidden_input(prompt: str | None = None) -> str: | |
| sys.stdout.write(f"{prompt or ''}\n") | |
| sys.stdout.flush() | |
| return next(text_input).rstrip("\r\n") | |
| # type: ignore | |
| def _getchar(echo: bool) -> str: | |
| char = sys.stdin.read(1) | |
| if echo: | |
| sys.stdout.write(char) | |
| sys.stdout.flush() | |
| return char | |
| default_color = color | |
| def should_strip_ansi( | |
| stream: t.IO[t.Any] | None = None, color: bool | None = None | |
| ) -> bool: | |
| if color is None: | |
| return not default_color | |
| return not color | |
| old_visible_prompt_func = termui.visible_prompt_func | |
| old_hidden_prompt_func = termui.hidden_prompt_func | |
| old__getchar_func = termui._getchar | |
| old_should_strip_ansi = utils.should_strip_ansi # type: ignore | |
| old__compat_should_strip_ansi = _compat.should_strip_ansi | |
| termui.visible_prompt_func = visible_input | |
| termui.hidden_prompt_func = hidden_input | |
| termui._getchar = _getchar | |
| utils.should_strip_ansi = should_strip_ansi # type: ignore | |
| _compat.should_strip_ansi = should_strip_ansi | |
| old_env = {} | |
| try: | |
| for key, value in env.items(): | |
| old_env[key] = os.environ.get(key) | |
| if value is None: | |
| try: | |
| del os.environ[key] | |
| except Exception: | |
| pass | |
| else: | |
| os.environ[key] = value | |
| yield (stream_mixer.stdout, stream_mixer.stderr, stream_mixer.output) | |
| finally: | |
| for key, value in old_env.items(): | |
| if value is None: | |
| try: | |
| del os.environ[key] | |
| except Exception: | |
| pass | |
| else: | |
| os.environ[key] = value | |
| sys.stdout = old_stdout | |
| sys.stderr = old_stderr | |
| sys.stdin = old_stdin | |
| termui.visible_prompt_func = old_visible_prompt_func | |
| termui.hidden_prompt_func = old_hidden_prompt_func | |
| termui._getchar = old__getchar_func | |
| utils.should_strip_ansi = old_should_strip_ansi # type: ignore | |
| _compat.should_strip_ansi = old__compat_should_strip_ansi | |
| formatting.FORCED_WIDTH = old_forced_width | |
| def invoke( | |
| self, | |
| cli: Command, | |
| args: str | cabc.Sequence[str] | None = None, | |
| input: str | bytes | t.IO[t.Any] | None = None, | |
| env: cabc.Mapping[str, str | None] | None = None, | |
| catch_exceptions: bool | None = None, | |
| color: bool = False, | |
| **extra: t.Any, | |
| ) -> Result: | |
| """Invokes a command in an isolated environment. The arguments are | |
| forwarded directly to the command line script, the `extra` keyword | |
| arguments are passed to the :meth:`~clickpkg.Command.main` function of | |
| the command. | |
| This returns a :class:`Result` object. | |
| :param cli: the command to invoke | |
| :param args: the arguments to invoke. It may be given as an iterable | |
| or a string. When given as string it will be interpreted | |
| as a Unix shell command. More details at | |
| :func:`shlex.split`. | |
| :param input: the input data for `sys.stdin`. | |
| :param env: the environment overrides. | |
| :param catch_exceptions: Whether to catch any other exceptions than | |
| ``SystemExit``. If :data:`None`, the value | |
| from :class:`CliRunner` is used. | |
| :param extra: the keyword arguments to pass to :meth:`main`. | |
| :param color: whether the output should contain color codes. The | |
| application can still override this explicitly. | |
| .. versionadded:: 8.2 | |
| The result object has the ``output_bytes`` attribute with | |
| the mix of ``stdout_bytes`` and ``stderr_bytes``, as the user would | |
| see it in its terminal. | |
| .. versionchanged:: 8.2 | |
| The result object always returns the ``stderr_bytes`` stream. | |
| .. versionchanged:: 8.0 | |
| The result object has the ``return_value`` attribute with | |
| the value returned from the invoked command. | |
| .. versionchanged:: 4.0 | |
| Added the ``color`` parameter. | |
| .. versionchanged:: 3.0 | |
| Added the ``catch_exceptions`` parameter. | |
| .. versionchanged:: 3.0 | |
| The result object has the ``exc_info`` attribute with the | |
| traceback if available. | |
| """ | |
| exc_info = None | |
| if catch_exceptions is None: | |
| catch_exceptions = self.catch_exceptions | |
| with self.isolation(input=input, env=env, color=color) as outstreams: | |
| return_value = None | |
| exception: BaseException | None = None | |
| exit_code = 0 | |
| if isinstance(args, str): | |
| args = shlex.split(args) | |
| try: | |
| prog_name = extra.pop("prog_name") | |
| except KeyError: | |
| prog_name = self.get_default_prog_name(cli) | |
| try: | |
| return_value = cli.main(args=args or (), prog_name=prog_name, **extra) | |
| except SystemExit as e: | |
| exc_info = sys.exc_info() | |
| e_code = t.cast("int | t.Any | None", e.code) | |
| if e_code is None: | |
| e_code = 0 | |
| if e_code != 0: | |
| exception = e | |
| if not isinstance(e_code, int): | |
| sys.stdout.write(str(e_code)) | |
| sys.stdout.write("\n") | |
| e_code = 1 | |
| exit_code = e_code | |
| except Exception as e: | |
| if not catch_exceptions: | |
| raise | |
| exception = e | |
| exit_code = 1 | |
| exc_info = sys.exc_info() | |
| finally: | |
| sys.stdout.flush() | |
| sys.stderr.flush() | |
| stdout = outstreams[0].getvalue() | |
| stderr = outstreams[1].getvalue() | |
| output = outstreams[2].getvalue() | |
| return Result( | |
| runner=self, | |
| stdout_bytes=stdout, | |
| stderr_bytes=stderr, | |
| output_bytes=output, | |
| return_value=return_value, | |
| exit_code=exit_code, | |
| exception=exception, | |
| exc_info=exc_info, # type: ignore | |
| ) | |
| def isolated_filesystem( | |
| self, temp_dir: str | os.PathLike[str] | None = None | |
| ) -> cabc.Iterator[str]: | |
| """A context manager that creates a temporary directory and | |
| changes the current working directory to it. This isolates tests | |
| that affect the contents of the CWD to prevent them from | |
| interfering with each other. | |
| :param temp_dir: Create the temporary directory under this | |
| directory. If given, the created directory is not removed | |
| when exiting. | |
| .. versionchanged:: 8.0 | |
| Added the ``temp_dir`` parameter. | |
| """ | |
| cwd = os.getcwd() | |
| dt = tempfile.mkdtemp(dir=temp_dir) | |
| os.chdir(dt) | |
| try: | |
| yield dt | |
| finally: | |
| os.chdir(cwd) | |
| if temp_dir is None: | |
| try: | |
| shutil.rmtree(dt) | |
| except OSError: | |
| pass | |