mirror of
https://github.com/asciinema/asciinema.git
synced 2025-12-16 19:58:03 +01:00
Merge pull request #472 from asciinema/recorder-refa
Recorder refactoring
This commit is contained in:
@@ -17,7 +17,7 @@ def record_asciicast( # pylint: disable=too-many-arguments
|
||||
command: Any = None,
|
||||
append: bool = False,
|
||||
idle_time_limit: Optional[int] = None,
|
||||
rec_stdin: bool = False,
|
||||
record_stdin: bool = False,
|
||||
title: Optional[str] = None,
|
||||
metadata: Any = None,
|
||||
command_env: Any = None,
|
||||
@@ -28,7 +28,7 @@ def record_asciicast( # pylint: disable=too-many-arguments
|
||||
command=command,
|
||||
append=append,
|
||||
idle_time_limit=idle_time_limit,
|
||||
rec_stdin=rec_stdin,
|
||||
record_stdin=record_stdin,
|
||||
title=title,
|
||||
metadata=metadata,
|
||||
command_env=command_env,
|
||||
|
||||
@@ -15,7 +15,7 @@ class RecordCommand(Command): # pylint: disable=too-many-instance-attributes
|
||||
Command.__init__(self, args, config, env)
|
||||
self.quiet = args.quiet
|
||||
self.filename = args.filename
|
||||
self.rec_stdin = args.stdin
|
||||
self.record_stdin = args.stdin
|
||||
self.command = args.command
|
||||
self.env_whitelist = args.env
|
||||
self.title = args.title
|
||||
@@ -107,9 +107,9 @@ class RecordCommand(Command): # pylint: disable=too-many-instance-attributes
|
||||
idle_time_limit=self.idle_time_limit,
|
||||
command_env=self.env,
|
||||
capture_env=vars_,
|
||||
rec_stdin=self.rec_stdin,
|
||||
record_stdin=self.record_stdin,
|
||||
writer=self.writer,
|
||||
notifier=self.notifier,
|
||||
notify=self.notifier.notify,
|
||||
key_bindings=self.key_bindings,
|
||||
cols_override=self.cols_override,
|
||||
rows_override=self.rows_override,
|
||||
|
||||
@@ -102,7 +102,7 @@ class CustomCommandNotifier(Notifier):
|
||||
|
||||
|
||||
class NoopNotifier: # pylint: disable=too-few-public-methods
|
||||
def notify(self) -> None:
|
||||
def notify(self, text: str) -> None:
|
||||
pass
|
||||
|
||||
|
||||
|
||||
@@ -16,20 +16,14 @@ from .term import raw
|
||||
# pylint: disable=too-many-arguments,too-many-locals,too-many-statements
|
||||
def record(
|
||||
command: Any,
|
||||
env: Dict[str, str],
|
||||
writer: Any,
|
||||
get_tty_size: Callable[[], Tuple[int, int]],
|
||||
env: Any = None,
|
||||
rec_stdin: bool = False,
|
||||
time_offset: float = 0,
|
||||
notifier: Any = None,
|
||||
key_bindings: Optional[Dict[str, Any]] = None,
|
||||
notify: Callable[[str], None],
|
||||
key_bindings: Dict[str, Any],
|
||||
tty_stdin_fd: int = pty.STDIN_FILENO,
|
||||
tty_stdout_fd: int = pty.STDOUT_FILENO,
|
||||
) -> None:
|
||||
if env is None:
|
||||
env = os.environ
|
||||
if key_bindings is None:
|
||||
key_bindings = {}
|
||||
master_fd: Any = None
|
||||
start_time: Optional[float] = None
|
||||
pause_time: Optional[float] = None
|
||||
@@ -37,10 +31,6 @@ def record(
|
||||
prefix_key = key_bindings.get("prefix")
|
||||
pause_key = key_bindings.get("pause")
|
||||
|
||||
def _notify(text: str) -> None:
|
||||
if notifier:
|
||||
notifier.notify(text)
|
||||
|
||||
def _set_pty_size() -> None:
|
||||
"""
|
||||
Sets the window size of the child pty based on the window size
|
||||
@@ -94,16 +84,16 @@ def record(
|
||||
assert start_time is not None
|
||||
start_time += time.time() - pause_time
|
||||
pause_time = None
|
||||
_notify("Resumed recording")
|
||||
notify("Resumed recording")
|
||||
else:
|
||||
pause_time = time.time()
|
||||
_notify("Paused recording")
|
||||
notify("Paused recording")
|
||||
|
||||
return
|
||||
|
||||
_write_master(data)
|
||||
|
||||
if rec_stdin and not pause_time:
|
||||
if not pause_time:
|
||||
assert start_time is not None
|
||||
writer.write_stdin(time.time() - start_time, data)
|
||||
|
||||
@@ -186,7 +176,7 @@ def record(
|
||||
|
||||
_set_pty_size()
|
||||
|
||||
start_time = time.time() - time_offset
|
||||
start_time = time.time()
|
||||
|
||||
with raw(tty_stdin_fd):
|
||||
try:
|
||||
|
||||
@@ -13,14 +13,14 @@ def record( # pylint: disable=too-many-arguments,too-many-locals
|
||||
command: Any = None,
|
||||
append: bool = False,
|
||||
idle_time_limit: Optional[int] = None,
|
||||
rec_stdin: bool = False,
|
||||
record_stdin: bool = False,
|
||||
title: Optional[str] = None,
|
||||
metadata: Any = None,
|
||||
command_env: Optional[Dict[Any, Any]] = None,
|
||||
capture_env: Any = None,
|
||||
writer: Type[w2] = v2.writer,
|
||||
record_: Callable[..., None] = pty.record,
|
||||
notifier: Any = None,
|
||||
notify: Callable[[str], None] = lambda _: None,
|
||||
key_bindings: Optional[Dict[str, Any]] = None,
|
||||
cols_override: Optional[int] = None,
|
||||
rows_override: Optional[int] = None,
|
||||
@@ -70,22 +70,18 @@ def record( # pylint: disable=too-many-arguments,too-many-locals
|
||||
if append and os.stat(path_).st_size > 0:
|
||||
time_offset = v2.get_duration(path_)
|
||||
|
||||
with async_notifier(notifier) as _notifier:
|
||||
with async_writer(
|
||||
writer,
|
||||
path_,
|
||||
full_metadata,
|
||||
append,
|
||||
_notifier.queue,
|
||||
) as _writer:
|
||||
with async_notifier(notify) as _notifier:
|
||||
sync_writer = writer(
|
||||
path_, full_metadata, append, on_error=_notifier.notify
|
||||
)
|
||||
|
||||
with async_writer(sync_writer, time_offset, record_stdin) as _writer:
|
||||
record_(
|
||||
["sh", "-c", command],
|
||||
command_env,
|
||||
_writer,
|
||||
get_tty_size,
|
||||
command_env,
|
||||
rec_stdin,
|
||||
time_offset,
|
||||
_notifier,
|
||||
_notifier.notify,
|
||||
key_bindings,
|
||||
tty_stdin_fd=tty_stdin_fd,
|
||||
tty_stdout_fd=tty_stdout_fd,
|
||||
@@ -94,60 +90,44 @@ def record( # pylint: disable=too-many-arguments,too-many-locals
|
||||
|
||||
class async_writer(async_worker):
|
||||
def __init__(
|
||||
self,
|
||||
writer: Type[w2],
|
||||
path_: str,
|
||||
metadata: Any,
|
||||
append: bool = False,
|
||||
notifier_q: Any = None,
|
||||
self, writer: w2, time_offset: float, record_stdin: bool
|
||||
) -> None:
|
||||
async_worker.__init__(self)
|
||||
self.writer = writer
|
||||
self.path = path_
|
||||
self.metadata = metadata
|
||||
self.append = append
|
||||
self.notifier_q = notifier_q
|
||||
self.time_offset = time_offset
|
||||
self.record_stdin = record_stdin
|
||||
|
||||
def write_stdin(self, ts: float, data: Any) -> None:
|
||||
self.enqueue([ts, "i", data])
|
||||
if self.record_stdin:
|
||||
self.enqueue([ts, "i", data])
|
||||
|
||||
def write_stdout(self, ts: float, data: Any) -> None:
|
||||
self.enqueue([ts, "o", data])
|
||||
|
||||
def run(self) -> None:
|
||||
with self.writer(
|
||||
self.path,
|
||||
metadata=self.metadata,
|
||||
append=self.append,
|
||||
on_error=self.__on_error,
|
||||
) as w:
|
||||
with self.writer as w:
|
||||
event: Tuple[float, str, Any]
|
||||
for event in iter(self.queue.get, None):
|
||||
assert event is not None
|
||||
ts, etype, data = event
|
||||
|
||||
if etype == "o":
|
||||
w.write_stdout(ts, data)
|
||||
w.write_stdout(self.time_offset + ts, data)
|
||||
elif etype == "i":
|
||||
w.write_stdin(ts, data)
|
||||
|
||||
def __on_error(self, reason: str) -> None:
|
||||
if self.notifier_q:
|
||||
self.notifier_q.put(reason)
|
||||
w.write_stdin(self.time_offset + ts, data)
|
||||
|
||||
|
||||
class async_notifier(async_worker):
|
||||
def __init__(self, notifier: Any) -> None:
|
||||
def __init__(self, notify: Callable[[str], None]) -> None:
|
||||
async_worker.__init__(self)
|
||||
self.notifier = notifier
|
||||
self._notify = notify
|
||||
|
||||
def notify(self, text: str) -> None:
|
||||
self.enqueue(text)
|
||||
|
||||
def perform(self, text: str) -> None:
|
||||
try:
|
||||
if self.notifier:
|
||||
self.notifier.notify(text)
|
||||
self._notify(text)
|
||||
except: # pylint: disable=bare-except # noqa: E722
|
||||
# we catch *ALL* exceptions here because we don't want failed
|
||||
# notification to crash the recording session
|
||||
|
||||
@@ -6,7 +6,7 @@ import asciinema.pty_
|
||||
from .test_helper import Test
|
||||
|
||||
|
||||
class FakeStdout:
|
||||
class Writer:
|
||||
def __init__(self):
|
||||
self.data = []
|
||||
|
||||
@@ -30,7 +30,7 @@ class TestRecord(Test):
|
||||
self.real_os_write(fd, data)
|
||||
|
||||
def test_record_command_writes_to_stdout(self):
|
||||
output = FakeStdout()
|
||||
writer = Writer()
|
||||
|
||||
command = [
|
||||
"python3",
|
||||
@@ -44,6 +44,9 @@ class TestRecord(Test):
|
||||
"; sys.stdout.write('bar')"
|
||||
),
|
||||
]
|
||||
asciinema.pty_.record(command, output, lambda: (80, 24))
|
||||
|
||||
assert output.data == [b"foo", b"bar"]
|
||||
asciinema.pty_.record(
|
||||
command, {}, writer, lambda: (80, 24), lambda s: None, {}
|
||||
)
|
||||
|
||||
assert writer.data == [b"foo", b"bar"]
|
||||
|
||||
Reference in New Issue
Block a user