Merge pull request #453 from asciinema/feature/fifo-reopen

Improved handling of named pipes (FIFO)
This commit is contained in:
Marcin Kulik
2022-02-15 20:39:55 +01:00
committed by GitHub
5 changed files with 113 additions and 42 deletions

View File

@@ -1,40 +1,36 @@
from os import path, stat import os
from typing import IO, Any, Optional from os import path
from typing import Any, Callable, Optional
from ..file_writer import file_writer
class writer: class writer(file_writer):
def __init__( def __init__(
self, self,
path_: str, path_: str,
metadata: Any = None, metadata: Any = None,
append: bool = False, append: bool = False,
buffering: int = 0, buffering: int = 0,
on_error: Optional[Callable[[str], None]] = None,
) -> None: ) -> None:
super().__init__(path_, on_error)
if ( if (
append and path.exists(path_) and stat(path_).st_size == 0 append and path.exists(path_) and os.stat(path_).st_size == 0
): # true for pipes ): # true for pipes
append = False append = False
self.path = path_
self.buffering = buffering self.buffering = buffering
self.mode: str = "ab" if append else "wb" self.mode: str = "ab" if append else "wb"
self.file: Optional[IO[Any]] = None
self.metadata = metadata self.metadata = metadata
def __enter__(self) -> Any:
self.file = open(self.path, mode=self.mode, buffering=self.buffering)
return self
def __exit__(
self, exc_type: str, exc_value: str, exc_traceback: str
) -> None:
assert self.file is not None
self.file.close()
def write_stdout(self, _ts: float, data: Any) -> None: def write_stdout(self, _ts: float, data: Any) -> None:
assert self.file is not None self._write(data)
self.file.write(data)
# pylint: disable=no-self-use # pylint: disable=no-self-use
def write_stdin(self, ts: float, data: Any) -> None: def write_stdin(self, ts: float, data: Any) -> None:
pass pass
def _open_file(self) -> None:
self.file = open(self.path, mode=self.mode, buffering=self.buffering)

View File

@@ -1,9 +1,19 @@
import codecs import codecs
import json import json
from codecs import StreamReader from codecs import StreamReader
from io import IOBase
from json.decoder import JSONDecodeError from json.decoder import JSONDecodeError
from typing import IO, Any, Dict, Generator, List, Optional, TextIO, Union from typing import (
Any,
Callable,
Dict,
Generator,
List,
Optional,
TextIO,
Union,
)
from ..file_writer import file_writer
class LoadError(Exception): class LoadError(Exception):
@@ -86,21 +96,22 @@ def build_header(
return header return header
class writer: class writer(file_writer):
def __init__( # pylint: disable=too-many-arguments def __init__( # pylint: disable=too-many-arguments
self, self,
path: str, path_: str,
metadata: Any = None, metadata: Any = None,
append: bool = False, append: bool = False,
buffering: int = 1, buffering: int = 1,
width: Optional[int] = None, width: Optional[int] = None,
height: Optional[int] = None, height: Optional[int] = None,
on_error: Optional[Callable[[str], None]] = None,
) -> None: ) -> None:
self.path = path super().__init__(path_, on_error)
self.buffering = buffering self.buffering = buffering
self.stdin_decoder = codecs.getincrementaldecoder("UTF-8")("replace") self.stdin_decoder = codecs.getincrementaldecoder("UTF-8")("replace")
self.stdout_decoder = codecs.getincrementaldecoder("UTF-8")("replace") self.stdout_decoder = codecs.getincrementaldecoder("UTF-8")("replace")
self.file: Optional[IO[Any]] = None
if append: if append:
self.mode = "a" self.mode = "a"
@@ -110,24 +121,13 @@ class writer:
self.header = build_header(width, height, metadata or {}) self.header = build_header(width, height, metadata or {})
def __enter__(self) -> Any: def __enter__(self) -> Any:
self.file = open( self._open_file()
self.path,
mode=self.mode,
buffering=self.buffering,
encoding="utf-8",
)
if self.header: if self.header:
self.__write_line(self.header) self.__write_line(self.header)
return self return self
def __exit__(
self, exc_type: str, exc_value: str, exc_traceback: str
) -> None:
assert isinstance(self.file, IOBase)
self.file.close()
def write_stdout(self, ts: float, data: Union[str, bytes]) -> None: def write_stdout(self, ts: float, data: Union[str, bytes]) -> None:
if isinstance(data, str): if isinstance(data, str):
data = data.encode(encoding="utf-8", errors="strict") data = data.encode(encoding="utf-8", errors="strict")
@@ -140,6 +140,14 @@ class writer:
data = self.stdin_decoder.decode(data) data = self.stdin_decoder.decode(data)
self.__write_event(ts, "i", data) self.__write_event(ts, "i", data)
def _open_file(self) -> None:
self.file = open(
self.path,
mode=self.mode,
buffering=self.buffering,
encoding="utf-8",
)
def __write_event(self, ts: float, etype: str, data: str) -> None: def __write_event(self, ts: float, etype: str, data: str) -> None:
self.__write_line([round(ts, 6), etype, data]) self.__write_line([round(ts, 6), etype, data])
@@ -147,5 +155,5 @@ class writer:
line = json.dumps( line = json.dumps(
obj, ensure_ascii=False, indent=None, separators=(", ", ": ") obj, ensure_ascii=False, indent=None, separators=(", ", ": ")
) )
assert isinstance(self.file, IOBase)
self.file.write(f"{line}\n") self._write(f"{line}\n")

View File

@@ -31,6 +31,11 @@ class async_worker:
assert isinstance(self.process, Process) assert isinstance(self.process, Process)
self.process.join() self.process.join()
if self.process.exitcode != 0:
raise RuntimeError(
f"worker process exited with code {self.process.exitcode}"
)
def enqueue(self, payload: Any) -> None: def enqueue(self, payload: Any) -> None:
self.queue.put(payload) self.queue.put(payload)

43
asciinema/file_writer.py Normal file
View File

@@ -0,0 +1,43 @@
import os
import stat
from typing import IO, Any, Callable, Optional
class file_writer:
def __init__(
self,
path: str,
on_error: Optional[Callable[[str], None]] = None,
) -> None:
self.path = path
self.file: Optional[IO[Any]] = None
self.on_error = on_error
def __enter__(self) -> Any:
self._open_file()
return self
def __exit__(
self, exc_type: str, exc_value: str, exc_traceback: str
) -> None:
assert self.file is not None
self.file.close()
def _open_file(self) -> None:
raise NotImplementedError
def _write(self, data: Any) -> None:
try:
self.file.write(data) # type: ignore
except BrokenPipeError as e:
if stat.S_ISFIFO(os.stat(self.path).st_mode):
if self.on_error:
self.on_error("Broken pipe, reopening...")
self._open_file()
self.on_error("Output pipe reopened successfully")
else:
self._open_file()
self.file.write(data) # type: ignore
else:
raise e

View File

@@ -64,8 +64,14 @@ def record( # pylint: disable=too-many-arguments,too-many-locals
if append and os.stat(path_).st_size > 0: if append and os.stat(path_).st_size > 0:
time_offset = v2.get_duration(path_) time_offset = v2.get_duration(path_)
with async_writer(writer, path_, full_metadata, append) as _writer: with async_notifier(notifier) as _notifier:
with async_notifier(notifier) as _notifier: with async_writer(
writer,
path_,
full_metadata,
append,
_notifier.queue,
) as _writer:
record_( record_(
["sh", "-c", command], ["sh", "-c", command],
_writer, _writer,
@@ -79,13 +85,19 @@ def record( # pylint: disable=too-many-arguments,too-many-locals
class async_writer(async_worker): class async_writer(async_worker):
def __init__( def __init__(
self, writer: Type[w2], path_: str, metadata: Any, append: bool = False self,
writer: Type[w2],
path_: str,
metadata: Any,
append: bool = False,
notifier_q: Any = None,
) -> None: ) -> None:
async_worker.__init__(self) async_worker.__init__(self)
self.writer = writer self.writer = writer
self.path = path_ self.path = path_
self.metadata = metadata self.metadata = metadata
self.append = append self.append = append
self.notifier_q = notifier_q
def write_stdin(self, ts: float, data: Any) -> None: def write_stdin(self, ts: float, data: Any) -> None:
self.enqueue([ts, "i", data]) self.enqueue([ts, "i", data])
@@ -95,7 +107,10 @@ class async_writer(async_worker):
def run(self) -> None: def run(self) -> None:
with self.writer( with self.writer(
self.path, metadata=self.metadata, append=self.append self.path,
metadata=self.metadata,
append=self.append,
on_error=self.__on_error,
) as w: ) as w:
event: Tuple[float, str, Any] event: Tuple[float, str, Any]
for event in iter(self.queue.get, None): for event in iter(self.queue.get, None):
@@ -107,6 +122,10 @@ class async_writer(async_worker):
elif etype == "i": elif etype == "i":
w.write_stdin(ts, data) w.write_stdin(ts, data)
def __on_error(self, reason: str) -> None:
if self.notifier_q:
self.notifier_q.put(reason)
class async_notifier(async_worker): class async_notifier(async_worker):
def __init__(self, notifier: Any) -> None: def __init__(self, notifier: Any) -> None: