diff --git a/README.md b/README.md index 0a910ce..d333749 100644 --- a/README.md +++ b/README.md @@ -228,6 +228,8 @@ Available options: to `SHELL,TERM` - `-t, --title=` - Specify the title of the asciicast - `-i, --idle-time-limit=<sec>` - Limit recorded terminal inactivity to max `<sec>` seconds +- `--cols=<n>` - Override terminal columns for recorded process +- `--rows=<n>` - Override terminal rows for recorded process - `-y, --yes` - Answer "yes" to all prompts (e.g. upload confirmation) - `-q, --quiet` - Be quiet, suppress all notices/warnings (implies -y) diff --git a/asciinema/__main__.py b/asciinema/__main__.py index 64905f1..7c11928 100644 --- a/asciinema/__main__.py +++ b/asciinema/__main__.py @@ -12,6 +12,14 @@ from .commands.record import RecordCommand from .commands.upload import UploadCommand +def positive_int(value: str) -> int: + _value = int(value) + if _value <= 0: + raise argparse.ArgumentTypeError("must be positive") + + return _value + + def positive_float(value: str) -> float: _value = float(value) if _value <= 0.0: @@ -120,6 +128,18 @@ For help on a specific command run: type=positive_float, default=maybe_str(cfg.record_idle_time_limit), ) + parser_rec.add_argument( + "--cols", + help="override terminal columns for recorded process", + type=positive_int, + default=None, + ) + parser_rec.add_argument( + "--rows", + help="override terminal rows for recorded process", + type=positive_int, + default=None, + ) parser_rec.add_argument( "-y", "--yes", diff --git a/asciinema/commands/record.py b/asciinema/commands/record.py index 18b26c5..cadc13b 100644 --- a/asciinema/commands/record.py +++ b/asciinema/commands/record.py @@ -21,6 +21,8 @@ class RecordCommand(Command): # pylint: disable=too-many-instance-attributes self.title = args.title self.assume_yes = args.yes or args.quiet self.idle_time_limit = args.idle_time_limit + self.cols_override = args.cols + self.rows_override = args.rows self.append = args.append self.overwrite = args.overwrite self.raw = args.raw @@ -109,6 +111,8 @@ class RecordCommand(Command): # pylint: disable=too-many-instance-attributes writer=self.writer, notifier=self.notifier, key_bindings=self.key_bindings, + cols_override=self.cols_override, + rows_override=self.rows_override, ) except v2.LoadError: self.print_error( diff --git a/asciinema/pty_.py b/asciinema/pty_.py index 03dac97..ee0008f 100644 --- a/asciinema/pty_.py +++ b/asciinema/pty_.py @@ -8,7 +8,7 @@ import signal import struct import termios import time -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Callable, Dict, List, Optional, Tuple from .term import raw @@ -17,11 +17,14 @@ from .term import raw def record( command: Any, writer: Any, + get_tty_size: Callable[[], Tuple[int, int]], env: Any = None, rec_stdin: bool = False, time_offset: float = 0, notifier: Any = None, key_bindings: Optional[Dict[str, Any]] = None, + tty_stdin_fd: int = pty.STDIN_FILENO, + tty_stdout_fd: int = pty.STDOUT_FILENO, ) -> None: if env is None: env = os.environ @@ -46,18 +49,15 @@ def record( # 1. Get the terminal size of the real terminal. # 2. Set the same size on the pseudoterminal. - if os.isatty(pty.STDOUT_FILENO): - buf = array.array("h", [0, 0, 0, 0]) - fcntl.ioctl(pty.STDOUT_FILENO, termios.TIOCGWINSZ, buf, True) - else: - buf = array.array("h", [24, 80, 0, 0]) + cols, rows = get_tty_size() + buf = array.array("h", [rows, cols, 0, 0]) fcntl.ioctl(master_fd, termios.TIOCSWINSZ, buf) def _write_stdout(data: Any) -> None: """Writes to stdout as if the child process had written the data.""" - os.write(pty.STDOUT_FILENO, data) + os.write(tty_stdout_fd, data) def _handle_master_read(data: Any) -> None: """Handles new data on child process stdout.""" @@ -120,7 +120,7 @@ def record( when new data arrives. """ - fds = [master_fd, pty.STDIN_FILENO, signal_fd] + fds = [master_fd, tty_stdin_fd, signal_fd] while True: try: @@ -136,10 +136,10 @@ def record( else: _handle_master_read(data) - if pty.STDIN_FILENO in rfds: - data = os.read(pty.STDIN_FILENO, 1024) + if tty_stdin_fd in rfds: + data = os.read(tty_stdin_fd, 1024) if not data: - fds.remove(pty.STDIN_FILENO) + fds.remove(tty_stdin_fd) else: _handle_stdin_read(data) @@ -188,7 +188,7 @@ def record( start_time = time.time() - time_offset - with raw(pty.STDIN_FILENO): + with raw(tty_stdin_fd): try: _copy(pipe_r) except (IOError, OSError): diff --git a/asciinema/recorder.py b/asciinema/recorder.py index 117f4f4..0d08493 100644 --- a/asciinema/recorder.py +++ b/asciinema/recorder.py @@ -3,7 +3,6 @@ import time from typing import Any, Callable, Dict, Optional, Tuple, Type from . import pty_ as pty # avoid collisions with standard library `pty` -from . import term from .asciicast import v2 from .asciicast.v2 import writer as w2 from .async_worker import async_worker @@ -23,6 +22,8 @@ def record( # pylint: disable=too-many-arguments,too-many-locals record_: Callable[..., None] = pty.record, notifier: Any = None, key_bindings: Optional[Dict[str, Any]] = None, + cols_override: Optional[int] = None, + rows_override: Optional[int] = None, ) -> None: if command is None: command = os.environ.get("SHELL", "sh") @@ -38,11 +39,16 @@ def record( # pylint: disable=too-many-arguments,too-many-locals if capture_env is None: capture_env = ["SHELL", "TERM"] - w, h = term.get_size() + tty_stdin_fd = 0 + tty_stdout_fd = 1 + + get_tty_size = _get_tty_size(tty_stdout_fd, cols_override, rows_override) + + cols, rows = get_tty_size() full_metadata: Dict[str, Any] = { - "width": w, - "height": h, + "width": cols, + "height": rows, "timestamp": int(time.time()), } @@ -75,11 +81,14 @@ def record( # pylint: disable=too-many-arguments,too-many-locals record_( ["sh", "-c", command], _writer, + get_tty_size, command_env, rec_stdin, time_offset, _notifier, key_bindings, + tty_stdin_fd=tty_stdin_fd, + tty_stdout_fd=tty_stdout_fd, ) @@ -143,3 +152,27 @@ class async_notifier(async_worker): # we catch *ALL* exceptions here because we don't want failed # notification to crash the recording session pass + + +def _get_tty_size( + fd: int, cols_override: Optional[int], rows_override: Optional[int] +) -> Callable[[], Tuple[int, int]]: + if cols_override is not None and rows_override is not None: + + def fixed_size() -> Tuple[int, int]: + return (cols_override, rows_override) # type: ignore + + return fixed_size + + if not os.isatty(fd): + + def fallback_size() -> Tuple[int, int]: + return (cols_override or 80, rows_override or 24) + + return fallback_size + + def size() -> Tuple[int, int]: + cols, rows = os.get_terminal_size(fd) + return (cols_override or cols, rows_override or rows) + + return size diff --git a/asciinema/term.py b/asciinema/term.py index 7303675..0e851cd 100644 --- a/asciinema/term.py +++ b/asciinema/term.py @@ -1,10 +1,9 @@ import os import select -import subprocess import termios as tty # avoid `Module "tty" has no attribute ...` errors from time import sleep from tty import setraw -from typing import IO, Any, List, Optional, Tuple, Union +from typing import IO, Any, List, Optional, Union class raw: @@ -33,13 +32,3 @@ def read_blocking(fd: int, timeout: Any) -> bytes: return os.read(fd, 1024) return b"" - - -def get_size() -> Tuple[int, int]: - try: - return os.get_terminal_size() - except: # pylint: disable=bare-except # noqa: E722 - return ( - int(subprocess.check_output(["tput", "cols"])), - int(subprocess.check_output(["tput", "lines"])), - ) diff --git a/man/asciinema.1.md b/man/asciinema.1.md index f16edd0..39ad08c 100644 --- a/man/asciinema.1.md +++ b/man/asciinema.1.md @@ -93,6 +93,12 @@ Available options: `-i, --idle-time-limit=<sec>` : Limit recorded terminal inactivity to max `<sec>` seconds + `--cols=<n>` + : Override terminal columns for recorded process + + `--rows=<n>` + : Override terminal rows for recorded process + `-y, --yes` : Answer "yes" to all prompts (e.g. upload confirmation) diff --git a/tests/pty_test.py b/tests/pty_test.py index df7aa62..49fb93c 100644 --- a/tests/pty_test.py +++ b/tests/pty_test.py @@ -44,6 +44,6 @@ class TestRecord(Test): "; sys.stdout.write('bar')" ), ] - asciinema.pty_.record(command, output) + asciinema.pty_.record(command, output, lambda: (80, 24)) assert output.data == [b"foo", b"bar"]