Bye bye Python implementation!

This commit is contained in:
Marcin Kulik
2023-12-20 16:55:38 +01:00
parent 037f751372
commit 06e2f3278f
35 changed files with 0 additions and 2915 deletions

View File

@@ -1,34 +0,0 @@
import sys
__author__ = "Marcin Kulik"
__version__ = "2.4.0"
if sys.version_info < (3, 7):
raise ImportError("Python < 3.7 is unsupported.")
# pylint: disable=wrong-import-position
from typing import Any, Optional
from .recorder import record
def record_asciicast( # pylint: disable=too-many-arguments
path_: str,
command: Any = None,
append: bool = False,
idle_time_limit: Optional[int] = None,
record_stdin: bool = False,
title: Optional[str] = None,
command_env: Any = None,
capture_env: Any = None,
) -> None:
record(
path_,
command=command,
append=append,
idle_time_limit=idle_time_limit,
record_stdin=record_stdin,
title=title,
command_env=command_env,
capture_env=capture_env,
)

View File

@@ -1,262 +0,0 @@
import argparse
import locale
import os
import sys
from typing import Any, Optional
from . import __version__, config
from .commands.auth import AuthCommand
from .commands.cat import CatCommand
from .commands.play import PlayCommand
from .commands.record import RecordCommand
from .commands.upload import UploadCommand
def valid_encoding() -> bool:
def _locales() -> Optional[str]:
try:
return locale.nl_langinfo(locale.CODESET)
except AttributeError:
return locale.getlocale()[-1]
loc = _locales()
if loc is None:
return False
else:
return loc.upper() in ("US-ASCII", "UTF-8", "UTF8")
def positive_int(value: str) -> int:
_value = int(value)
if _value <= 0:
raise argparse.ArgumentTypeError("must be positive")
return _value
def positive_float(value: str) -> float:
_value = float(value)
if _value <= 0.0:
raise argparse.ArgumentTypeError("must be positive")
return _value
def maybe_str(v: Any) -> Optional[str]:
if v is not None:
return str(v)
return None
def main() -> Any:
if not valid_encoding():
sys.stderr.write(
"asciinema needs an ASCII or UTF-8 character encoding to run. "
"Check the output of `locale` command.\n"
)
return 1
try:
cfg = config.load()
except config.ConfigError as e:
sys.stderr.write(f"{e}\n")
return 1
# create the top-level parser
parser = argparse.ArgumentParser(
description="Record and share your terminal sessions, the right way.",
epilog="""example usage:
Record terminal and upload it to asciinema.org:
\x1b[1masciinema rec\x1b[0m
Record terminal to local file:
\x1b[1masciinema rec demo.cast\x1b[0m
Record terminal and upload it to asciinema.org, specifying title:
\x1b[1masciinema rec -t "My git tutorial"\x1b[0m
Record terminal to local file, limiting idle time to max 2.5 sec:
\x1b[1masciinema rec -i 2.5 demo.cast\x1b[0m
Replay terminal recording from local file:
\x1b[1masciinema play demo.cast\x1b[0m
Replay terminal recording hosted on asciinema.org:
\x1b[1masciinema play https://asciinema.org/a/difqlgx86ym6emrmd8u62yqu8\x1b[0m
Print full output of recorded session:
\x1b[1masciinema cat demo.cast\x1b[0m
For help on a specific command run:
\x1b[1masciinema <command> -h\x1b[0m""", # noqa: E501
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"--version", action="version", version=f"asciinema {__version__}"
)
subparsers = parser.add_subparsers()
# create the parser for the `rec` command
parser_rec = subparsers.add_parser("rec", help="Record terminal session")
parser_rec.add_argument(
"--stdin",
help="enable stdin recording, disabled by default",
action="store_true",
default=cfg.record_stdin,
)
parser_rec.add_argument(
"--append",
help="append to existing recording",
action="store_true",
default=False,
)
parser_rec.add_argument(
"--raw",
help="save only raw stdout output",
action="store_true",
default=False,
)
parser_rec.add_argument(
"--overwrite",
help="overwrite the file if it already exists",
action="store_true",
default=False,
)
parser_rec.add_argument(
"-c",
"--command",
help="command to record, defaults to $SHELL",
default=cfg.record_command,
)
parser_rec.add_argument(
"-e",
"--env",
help="list of environment variables to capture, defaults to "
+ config.DEFAULT_RECORD_ENV,
default=cfg.record_env,
)
parser_rec.add_argument("-t", "--title", help="title of the asciicast")
parser_rec.add_argument(
"-i",
"--idle-time-limit",
help="limit recorded idle time to given number of seconds",
type=positive_float,
default=maybe_str(cfg.record_idle_time_limit),
)
parser_rec.add_argument(
"--cols",
help="override terminal columns for recorded process",
type=positive_int,
default=None,
)
parser_rec.add_argument(
"--rows",
help="override terminal rows for recorded process",
type=positive_int,
default=None,
)
parser_rec.add_argument(
"-y",
"--yes",
help='answer "yes" to all prompts (e.g. upload confirmation)',
action="store_true",
default=cfg.record_yes,
)
parser_rec.add_argument(
"-q",
"--quiet",
help="be quiet, suppress all notices/warnings (implies -y)",
action="store_true",
default=cfg.record_quiet,
)
parser_rec.add_argument(
"filename",
nargs="?",
default="",
help="filename/path to save the recording to",
)
parser_rec.set_defaults(cmd=RecordCommand)
# create the parser for the `play` command
parser_play = subparsers.add_parser("play", help="Replay terminal session")
parser_play.add_argument(
"-i",
"--idle-time-limit",
help="limit idle time during playback to given number of seconds",
type=positive_float,
default=maybe_str(cfg.play_idle_time_limit),
)
parser_play.add_argument(
"-s",
"--speed",
help="set playback speed (can be fractional)",
type=positive_float,
default=cfg.play_speed,
)
parser_play.add_argument(
"-l",
"--loop",
help="loop loop loop loop",
action="store_true",
default=False,
)
parser_play.add_argument(
"-m",
"--pause-on-markers",
help="automatically pause on markers",
action="store_true",
default=False,
)
parser_play.add_argument(
"--out-fmt",
help="select output format",
choices=["raw", "asciicast"],
default="raw",
)
parser_play.add_argument(
"--stream",
help="select stream to play",
choices=["o", "i"],
default=None,
)
parser_play.add_argument(
"filename", help='local path, http/ipfs URL or "-" (read from stdin)'
)
parser_play.set_defaults(cmd=PlayCommand)
# create the parser for the `cat` command
parser_cat = subparsers.add_parser(
"cat", help="Print full output of terminal sessions"
)
parser_cat.add_argument(
"filename",
nargs="+",
help='local path, http/ipfs URL or "-" (read from stdin)',
)
parser_cat.set_defaults(cmd=CatCommand)
# create the parser for the `upload` command
parser_upload = subparsers.add_parser(
"upload", help="Upload locally saved terminal session to asciinema.org"
)
parser_upload.add_argument(
"filename", help="filename or path of local recording"
)
parser_upload.set_defaults(cmd=UploadCommand)
# create the parser for the `auth` command
parser_auth = subparsers.add_parser(
"auth", help="Manage recordings on asciinema.org account"
)
parser_auth.set_defaults(cmd=AuthCommand)
# parse the args and call whatever function was selected
args = parser.parse_args()
if hasattr(args, "cmd"):
command = args.cmd(args, cfg, os.environ)
code = command.execute()
return code
parser.print_help()
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,103 +0,0 @@
import json
import platform
import re
from typing import Any, Callable, Dict, Optional, Tuple, Union
from urllib.parse import urlparse
from . import __version__
from .http_adapter import HTTPConnectionError
from .urllib_http_adapter import URLLibHttpAdapter
class APIError(Exception):
def __init__(self, e: str, retryable: bool):
super().__init__(e)
self.retryable = retryable
class Api:
def __init__(
self,
url: str,
user: Optional[str],
install_id: str,
http_adapter: Any = None,
) -> None:
self.url = url
self.user = user
self.install_id = install_id
self.http_adapter = (
http_adapter if http_adapter is not None else URLLibHttpAdapter()
)
def hostname(self) -> Optional[str]:
return urlparse(self.url).hostname
def auth_url(self) -> str:
return f"{self.url}/connect/{self.install_id}"
def upload_url(self) -> str:
return f"{self.url}/api/asciicasts"
def upload_asciicast(self, path_: str) -> Tuple[Any, Any]:
with open(path_, "rb") as f:
try:
status, headers, body = self.http_adapter.post(
self.upload_url(),
files={"asciicast": ("ascii.cast", f)},
headers=self._headers(),
username=self.user,
password=self.install_id,
)
except HTTPConnectionError as e:
raise APIError(str(e), True) from e
if status not in (200, 201):
self._handle_error(status, body)
if (headers.get("content-type") or "")[0:16] == "application/json":
result = json.loads(body)
else:
result = {"url": body}
return result, headers.get("Warning")
def _headers(self) -> Dict[str, Union[Callable[[], str], str]]:
return {"user-agent": self._user_agent(), "accept": "application/json"}
@staticmethod
def _user_agent() -> str:
os = re.sub("([^-]+)-(.*)", "\\1/\\2", platform.platform())
return (
f"asciinema/{__version__} {platform.python_implementation()}"
f"/{platform.python_version()} {os}"
)
@staticmethod
def _handle_error(status: int, body: bytes) -> None:
errors = {
400: f"Invalid request: {body.decode('utf-8', 'replace')}",
401: "Invalid or revoked install ID",
404: (
"API endpoint not found. "
"This asciinema version may no longer be supported. "
"Please upgrade to the latest version."
),
413: "Sorry, the size of your recording exceeds the server-configured limit.",
422: f"Invalid asciicast: {body.decode('utf-8', 'replace')}",
503: "The server is down for maintenance.",
}
error = errors.get(status)
if not error:
if status >= 500:
error = (
"The server is having temporary problems. "
"Try again in a minute."
)
else:
error = f"HTTP status: {status}"
raise APIError(error, status >= 500)

View File

@@ -1,139 +0,0 @@
import codecs
import gzip
import os
import sys
import urllib.error
from codecs import StreamReader
from html.parser import HTMLParser
from io import BytesIO
from typing import Any, List, TextIO, Union
from urllib.parse import urlparse, urlunparse
from urllib.request import Request, urlopen
from . import v1, v2
class LoadError(Exception):
pass
class Parser(HTMLParser):
def __init__(self) -> None:
HTMLParser.__init__(self)
self.url = None
def error(self, message: str) -> None:
raise NotImplementedError(
"subclasses of ParserBase must override error()"
", but HTMLParser does not"
)
def handle_starttag(self, tag: str, attrs: List[Any]) -> None:
# look for <link rel="alternate"
# type="application/x-asciicast"
# href="https://...cast">
if tag == "link":
# avoid modifying function signature keyword args from base class
_attrs = {}
for k, v in attrs:
_attrs[k] = v
if _attrs.get("rel") == "alternate":
type_ = _attrs.get("type")
if type_ in (
"application/asciicast+json",
"application/x-asciicast",
):
self.url = _attrs.get("href")
def open_url(url: str) -> Union[StreamReader, TextIO]:
if url == "-":
return sys.stdin
if url.startswith("ipfs://"):
url = f"https://ipfs.io/ipfs/{url[7:]}"
elif url.startswith("dweb:/ipfs/"):
url = f"https://ipfs.io/{url[5:]}"
if url.startswith("http:") or url.startswith("https:"):
req = Request(url)
req.add_header("Accept-Encoding", "gzip")
body = None
content_type = None
utf8_reader = codecs.getreader("utf-8")
with urlopen(req) as response:
body = response
content_type = response.headers["Content-Type"]
url = response.geturl() # final URL after redirects
if response.headers["Content-Encoding"] == "gzip":
body = gzip.open(body)
body = BytesIO(body.read())
if content_type and content_type.startswith("text/html"):
html = utf8_reader(body, errors="replace").read()
parser = Parser()
parser.feed(html)
new_url = parser.url
if not new_url:
raise LoadError(
'<link rel="alternate" '
'type="application/x-asciicast" '
'href="..."> '
"not found in fetched HTML document"
)
if "://" not in new_url:
base_url = urlparse(url)
if new_url.startswith("/"):
new_url = urlunparse(
(base_url[0], base_url[1], new_url, "", "", "")
)
else:
path = f"{os.path.dirname(base_url[2])}/{new_url}"
new_url = urlunparse(
(base_url[0], base_url[1], path, "", "", "")
)
return open_url(new_url)
return utf8_reader(body, errors="strict")
return open(url, mode="rt", encoding="utf-8")
class open_from_url:
FORMAT_ERROR = "only asciicast v1 and v2 formats can be opened"
def __init__(self, url: str) -> None:
self.url = url
self.file: Union[StreamReader, TextIO, None] = None
self.context: Any = None
def __enter__(self) -> Any:
try:
self.file = open_url(self.url)
first_line = self.file.readline()
try: # try v2 first
self.context = v2.open_from_file(first_line, self.file)
return self.context.__enter__()
except v2.LoadError:
try: # try v1 next
self.context = v1.open_from_file(first_line, self.file)
return self.context.__enter__()
except v1.LoadError as e:
raise LoadError(self.FORMAT_ERROR) from e
except (OSError, urllib.error.HTTPError) as e:
raise LoadError(str(e)) from e
def __exit__(
self, exc_type: str, exc_value: str, exc_traceback: str
) -> None:
self.context.__exit__(exc_type, exc_value, exc_traceback)

View File

@@ -1,41 +0,0 @@
from typing import Any, Generator, Iterable, List, Optional
def to_relative_time(
events: Iterable[Any],
) -> Generator[List[Any], None, None]:
prev_time = 0
for frame in events:
time, type_, data = frame
delay = time - prev_time
prev_time = time
yield [delay, type_, data]
def to_absolute_time(
events: Iterable[Any],
) -> Generator[List[Any], None, None]:
time = 0
for frame in events:
delay, type_, data = frame
time = time + delay
yield [time, type_, data]
def cap_relative_time(
events: Iterable[Any], time_limit: Optional[float]
) -> Iterable[Any]:
if time_limit:
return (
[min(delay, time_limit), type_, data]
for delay, type_, data in events
)
return events
def adjust_speed(
events: Iterable[Any], speed: Any
) -> Generator[List[Any], None, None]:
return ([delay / speed, type_, data] for delay, type_, data in events)

View File

@@ -1,69 +0,0 @@
import os
import sys
from os import path
from typing import Any, Callable, Optional, Tuple
from ..file_writer import file_writer
class writer(file_writer):
def __init__( # pylint: disable=too-many-arguments
self,
path_: str,
metadata: Any = None,
append: bool = False,
buffering: int = 0,
on_error: Optional[Callable[[str], None]] = None,
) -> None:
super().__init__(path_, on_error)
if (
append and path.exists(path_) and os.stat(path_).st_size == 0
): # true for pipes
append = False
self.buffering = buffering
if append:
self.mode = "ab"
self.header = None
else:
self.mode = "wb"
width = metadata["width"]
height = metadata["height"]
self.header = f"\x1b[8;{height};{width}t".encode("utf-8")
def __enter__(self) -> Any:
super().__enter__()
if self.header:
self._write(self.header)
return self
def write_stdout(self, _ts: float, data: Any) -> None:
self._write(data)
def write_stdin(self, ts: float, data: Any) -> None:
pass
def write_marker(self, ts: float) -> None:
pass
def write_resize(self, ts: float, size: Tuple[int, int]) -> None:
cols, rows = size
self._write(f"\x1b[8;{rows};{cols}t".encode("utf-8"))
# pylint: disable=consider-using-with
def _open_file(self) -> None:
if self.path == "-":
self.file = os.fdopen(
sys.stdout.fileno(),
mode=self.mode,
buffering=self.buffering,
closefd=False,
)
else:
self.file = open(
self.path, mode=self.mode, buffering=self.buffering
)

View File

@@ -1,71 +0,0 @@
import json
from codecs import StreamReader
from json.decoder import JSONDecodeError
from typing import (
Any,
Dict,
Generator,
Iterable,
List,
Optional,
TextIO,
Union,
)
from .events import to_absolute_time
class LoadError(Exception):
pass
class Asciicast:
def __init__(self, attrs: Dict[str, Any]) -> None:
self.version: int = 1
self.__attrs = attrs
self.idle_time_limit = None # v1 doesn't store it
@property
def v2_header(self) -> Dict[str, Any]:
keys = ["width", "height", "duration", "command", "title", "env"]
header = {
k: v
for k, v in self.__attrs.items()
if k in keys and v is not None
}
return header
def events(self, type_: Optional[str] = None) -> Iterable[List[Any]]:
if type_ in [None, "o"]:
return to_absolute_time(self.__stdout_events())
else:
return []
def __stdout_events(self) -> Generator[List[Any], None, None]:
for time, data in self.__attrs["stdout"]:
yield [time, "o", data]
class open_from_file:
FORMAT_ERROR: str = "only asciicast v1 format can be opened"
def __init__(
self, first_line: str, file: Union[TextIO, StreamReader]
) -> None:
self.first_line = first_line
self.file = file
def __enter__(self) -> Optional[Asciicast]:
try:
attrs = json.loads(self.first_line + self.file.read())
if attrs.get("version") == 1:
return Asciicast(attrs)
raise LoadError(self.FORMAT_ERROR)
except JSONDecodeError as e:
raise LoadError(self.FORMAT_ERROR) from e
def __exit__(
self, exc_type: str, exc_value: str, exc_traceback: str
) -> None:
self.file.close()

View File

@@ -1,186 +0,0 @@
import codecs
import json
import os
import sys
from codecs import StreamReader
from json.decoder import JSONDecodeError
from typing import (
Any,
Callable,
Dict,
Generator,
List,
Optional,
TextIO,
Tuple,
Union,
)
from ..file_writer import file_writer
class LoadError(Exception):
pass
class Asciicast:
def __init__(
self, f: Union[TextIO, StreamReader], header: Dict[str, Any]
) -> None:
self.version: int = 2
self.__file = f
self.v2_header = header
self.idle_time_limit = header.get("idle_time_limit")
def events(
self, type_: Optional[str] = None
) -> Generator[List[Any], None, None]:
if type_ is None:
for line in self.__file:
if line == "\n":
break
yield json.loads(line)
else:
for line in self.__file:
if line == "\n":
break
event = json.loads(line)
if event[1] == type_:
yield event
def build_from_header_and_file(
header: Dict[str, Any], f: Union[StreamReader, TextIO]
) -> Asciicast:
return Asciicast(f, header)
class open_from_file:
FORMAT_ERROR = "only asciicast v2 format can be opened"
def __init__(
self, first_line: str, file: Union[StreamReader, TextIO]
) -> None:
self.first_line = first_line
self.file = file
def __enter__(self) -> Asciicast:
try:
v2_header: Dict[str, Any] = json.loads(self.first_line)
if v2_header.get("version") == 2:
return build_from_header_and_file(v2_header, self.file)
raise LoadError(self.FORMAT_ERROR)
except JSONDecodeError as e:
raise LoadError(self.FORMAT_ERROR) from e
def __exit__(
self, exc_type: str, exc_value: str, exc_traceback: str
) -> None:
self.file.close()
def get_duration(path_: str) -> Any:
with open(path_, mode="rt", encoding="utf-8") as f:
first_line = f.readline()
with open_from_file(first_line, f) as a:
last_frame = None
for last_frame in a.events("o"):
pass
return last_frame[0]
def build_header(
width: Optional[int], height: Optional[int], metadata: Any
) -> Dict[str, Any]:
header = {"version": 2, "width": width, "height": height}
header.update(metadata)
assert "width" in header, "width missing in metadata"
assert "height" in header, "height missing in metadata"
assert isinstance(header["width"], int)
assert isinstance(header["height"], int)
if "timestamp" in header:
assert isinstance(header["timestamp"], (int, float))
return header
class writer(file_writer):
def __init__( # pylint: disable=too-many-arguments
self,
path_: str,
metadata: Any = None,
append: bool = False,
buffering: int = 1,
width: Optional[int] = None,
height: Optional[int] = None,
on_error: Optional[Callable[[str], None]] = None,
) -> None:
super().__init__(path_, on_error)
self.buffering = buffering
self.stdin_decoder = codecs.getincrementaldecoder("UTF-8")("replace")
self.stdout_decoder = codecs.getincrementaldecoder("UTF-8")("replace")
if append:
self.mode = "a"
self.header = None
else:
self.mode = "w"
self.header = build_header(width, height, metadata or {})
def __enter__(self) -> Any:
self._open_file()
if self.header:
self.__write_line(self.header)
return self
def write_stdout(self, ts: float, data: Union[str, bytes]) -> None:
if isinstance(data, str):
data = data.encode(encoding="utf-8", errors="strict")
data = self.stdout_decoder.decode(data)
self.__write_event(ts, "o", data)
def write_stdin(self, ts: float, data: Union[str, bytes]) -> None:
if isinstance(data, str):
data = data.encode(encoding="utf-8", errors="strict")
data = self.stdin_decoder.decode(data)
self.__write_event(ts, "i", data)
def write_marker(self, ts: float) -> None:
self.__write_event(ts, "m", "")
def write_resize(self, ts: float, size: Tuple[int, int]) -> None:
cols, rows = size
self.__write_event(ts, "r", f"{cols}x{rows}")
# pylint: disable=consider-using-with
def _open_file(self) -> None:
if self.path == "-":
self.file = os.fdopen(
sys.stdout.fileno(),
mode=self.mode,
buffering=self.buffering,
encoding="utf-8",
closefd=False,
)
else:
self.file = open(
self.path,
mode=self.mode,
buffering=self.buffering,
encoding="utf-8",
)
def __write_event(self, ts: float, etype: str, data: str) -> None:
self.__write_line([round(ts, 6), etype, data])
def __write_line(self, obj: Any) -> None:
line = json.dumps(
obj, ensure_ascii=False, indent=None, separators=(", ", ": ")
)
self._write(f"{line}\n")

View File

@@ -1,52 +0,0 @@
from typing import Any, Optional
try:
# Importing synchronize is to detect platforms where
# multiprocessing does not work (python issue 3770)
# and cause an ImportError. Otherwise it will happen
# later when trying to use Queue().
from multiprocessing import Process, Queue, synchronize
# pylint: disable=pointless-statement
lambda _=synchronize: None # avoid pruning import
except ImportError:
from queue import Queue # type: ignore
from threading import Thread as Process # type: ignore
class async_worker:
def __init__(self) -> None:
self.queue: Queue[Any] = Queue()
self.process: Optional[Process] = None
def __enter__(self) -> Any:
self.process = Process(target=self._run)
self.process.start()
return self
def _run(self) -> None:
try:
self.run()
except KeyboardInterrupt:
pass
def __exit__(
self, exc_type: str, exc_value: str, exc_traceback: str
) -> None:
self.queue.put(None)
assert isinstance(self.process, Process)
self.process.join()
if self.process.exitcode != 0:
raise RuntimeError(
f"worker process exited with code {self.process.exitcode}"
)
def enqueue(self, payload: Any) -> None:
self.queue.put(payload)
def run(self) -> None:
payload: Any
for payload in iter(self.queue.get, None):
# pylint: disable=no-member
self.perform(payload) # type: ignore[attr-defined]

View File

@@ -1,20 +0,0 @@
from typing import Any, Dict
from ..config import Config
from .command import Command
class AuthCommand(Command):
def __init__(self, args: Any, config: Config, env: Dict[str, str]) -> None:
Command.__init__(self, args, config, env)
def execute(self) -> None:
self.print(
f"Open the following URL in a web browser to link your install ID "
f"with your {self.api.hostname()} user account:\n\n"
f"{self.api.auth_url()}\n\n"
"This will associate all recordings uploaded from this machine "
"(past and future ones) to your account"
", and allow you to manage them (change title/theme, delete) at "
f"{self.api.hostname()}."
)

View File

@@ -1,35 +0,0 @@
import sys
from typing import Any, Dict
from .. import asciicast
from ..config import Config
from ..tty_ import raw
from .command import Command
class CatCommand(Command):
def __init__(self, args: Any, config: Config, env: Dict[str, str]):
Command.__init__(self, args, config, env)
self.filenames = args.filename
def execute(self) -> int:
try:
with open("/dev/tty", "rt", encoding="utf-8") as stdin:
with raw(stdin.fileno()):
return self.cat()
except OSError:
return self.cat()
def cat(self) -> int:
try:
for filename in self.filenames:
with asciicast.open_from_url(filename) as a:
for _, _type, text in a.events("o"):
sys.stdout.write(text)
sys.stdout.flush()
except asciicast.LoadError as e:
self.print_error(f"printing failed: {str(e)}")
return 1
return 0

View File

@@ -1,38 +0,0 @@
import os
import sys
from typing import Any, Dict, Optional
from ..api import Api
from ..config import Config
class Command:
def __init__(self, _args: Any, config: Config, env: Dict[str, str]):
self.quiet: bool = False
self.api = Api(config.api_url, env.get("USER"), config.install_id)
def print(
self,
text: str,
end: str = "\r\n",
color: Optional[int] = None,
force: bool = False,
flush: bool = False,
) -> None:
if not self.quiet or force:
if color is not None and os.isatty(sys.stderr.fileno()):
text = f"\x1b[0;3{color}m{text}\x1b[0m"
print(text, file=sys.stderr, end=end)
if flush:
sys.stderr.flush()
def print_info(self, text: str) -> None:
self.print(f"asciinema: {text}", color=2)
def print_warning(self, text: str) -> None:
self.print(f"asciinema: {text}", color=3)
def print_error(self, text: str) -> None:
self.print(f"asciinema: {text}", color=1, force=True)

View File

@@ -1,60 +0,0 @@
from typing import Any, Dict, Optional
from .. import asciicast
from ..commands.command import Command
from ..config import Config
from ..player import Player
class PlayCommand(Command):
def __init__(
self,
args: Any,
config: Config,
env: Dict[str, str],
player: Optional[Player] = None,
) -> None:
Command.__init__(self, args, config, env)
self.filename = args.filename
self.idle_time_limit = args.idle_time_limit
self.speed = args.speed
self.loop = args.loop
self.out_fmt = args.out_fmt
self.stream = args.stream
self.pause_on_markers = args.pause_on_markers
self.player = player if player is not None else Player()
self.key_bindings = {
"pause": config.play_pause_key,
"step": config.play_step_key,
"next_marker": config.play_next_marker_key,
}
def execute(self) -> int:
code = self.play()
if self.loop:
while code == 0:
code = self.play()
return code
def play(self) -> int:
try:
with asciicast.open_from_url(self.filename) as a:
self.player.play(
a,
idle_time_limit=self.idle_time_limit,
speed=self.speed,
key_bindings=self.key_bindings,
out_fmt=self.out_fmt,
stream=self.stream,
pause_on_markers=self.pause_on_markers,
)
except asciicast.LoadError as e:
self.print_error(f"playback failed: {str(e)}")
return 1
except KeyboardInterrupt:
return 1
return 0

View File

@@ -1,204 +0,0 @@
import os
import sys
from tempfile import NamedTemporaryFile
from typing import Any, Dict, Optional
from .. import notifier, recorder
from ..api import APIError
from ..asciicast import raw, v2
from ..commands.command import Command
from ..config import Config
class RecordCommand(Command): # pylint: disable=too-many-instance-attributes
def __init__(self, args: Any, config: Config, env: Dict[str, str]) -> None:
Command.__init__(self, args, config, env)
self.quiet = args.quiet
self.filename = args.filename
self.record_stdin = args.stdin
self.command = args.command
self.env_whitelist = args.env
self.title = args.title
self.assume_yes = args.yes or args.quiet
self.idle_time_limit = args.idle_time_limit
self.cols_override = args.cols
self.rows_override = args.rows
self.append = args.append
self.overwrite = args.overwrite
self.raw = args.raw
self.writer = raw.writer if args.raw else v2.writer
self.notifier = notifier.get_notifier(
config.notifications_enabled, config.notifications_command
)
self.env = env
self.key_bindings = {
"prefix": config.record_prefix_key,
"pause": config.record_pause_key,
"add_marker": config.record_add_marker_key,
}
# pylint: disable=too-many-branches
# pylint: disable=too-many-return-statements
# pylint: disable=too-many-statements
def execute(self) -> int:
interactive = False
append = self.append
if self.filename == "":
if self.raw:
self.print_error(
"filename required when recording in raw mode"
)
return 1
self.filename = _tmp_path()
interactive = True
if self.filename == "-":
if sys.stdout.isatty():
self.print_error(
f"when recording to stdout it must not be TTY - forgot to pipe?"
)
return 1
append = False
elif os.path.exists(self.filename):
if not os.access(self.filename, os.W_OK):
self.print_error(f"can't write to {self.filename}")
return 1
if os.stat(self.filename).st_size > 0 and self.overwrite:
os.remove(self.filename)
append = False
elif os.stat(self.filename).st_size > 0 and not append:
self.print_error(f"{self.filename} already exists, aborting")
self.print_error(
"use --overwrite option "
"if you want to overwrite existing recording"
)
self.print_error(
"use --append option "
"if you want to append to existing recording"
)
return 1
else:
dir_path = os.path.dirname(os.path.abspath(self.filename))
if not os.path.exists(dir_path):
self.print_error(f"directory {dir_path} doesn't exist")
return 1
if not os.access(dir_path, os.W_OK):
self.print_error(f"directory {dir_path} is not writable")
return 1
if append:
self.print_warning(
f"{self.filename} does not exist, not appending"
)
append = False
if append:
self.print_info(f"appending to asciicast at {self.filename}")
else:
if self.filename == "-":
self.print_info(f"recording asciicast to stdout")
else:
self.print_info(f"recording asciicast to {self.filename}")
if self.command:
self.print_info("""exit opened program when you're done""")
else:
self.print_info(
"""press <ctrl-d> or type "exit" when you're done"""
)
vars_: Any = filter(
None,
map(
(lambda var: var.strip()), # type: ignore
self.env_whitelist.split(","),
),
)
try:
recorder.record(
self.filename,
command=self.command,
append=append,
title=self.title,
idle_time_limit=self.idle_time_limit,
command_env=self.env,
capture_env=vars_,
record_stdin=self.record_stdin,
writer=self.writer,
notify=self.notifier.notify,
key_bindings=self.key_bindings,
cols_override=self.cols_override,
rows_override=self.rows_override,
)
except IOError as e:
self.print_error(f"I/O error: {str(e)}")
return 1
except v2.LoadError:
self.print_error(
"can only append to asciicast v2 format recordings"
)
return 1
self.print_info("recording finished")
if interactive:
if not self.assume_yes:
while True:
self.print(
f"(\x1b[1ms\x1b[0m)ave locally, (\x1b[1mu\x1b[0m)pload to {self.api.hostname()}, (\x1b[1md\x1b[0m)iscard\r\n[s,u,d]? ",
end="",
force=True,
flush=True,
)
try:
answer = sys.stdin.readline().strip().lower()
except KeyboardInterrupt:
self.print("")
answer = "s"
if answer == "s" or answer == "save":
self.print_info(f"asciicast saved to {self.filename}")
return 0
elif answer == "u" or answer == "upload":
break
elif answer == "d" or answer == "discard":
os.remove(self.filename)
self.print_info(f"asciicast discarded")
return 0
try:
result, warn = self.api.upload_asciicast(self.filename)
if warn:
self.print_warning(warn)
os.remove(self.filename)
self.print(result.get("message") or result["url"])
except APIError as e:
self.print("\r\x1b[A", end="")
self.print_error(f"upload failed: {str(e)}")
self.print_error(
f"retry later by running: asciinema upload {self.filename}"
)
return 1
elif self.filename != "-":
self.print_info(f"asciicast saved to {self.filename}")
return 0
def _tmp_path() -> Optional[str]:
return NamedTemporaryFile(suffix="-ascii.cast", delete=False).name

View File

@@ -1,36 +0,0 @@
from typing import Any
from ..api import APIError
from ..config import Config
from .command import Command
class UploadCommand(Command):
def __init__(self, args: Any, config: Config, env: Any) -> None:
Command.__init__(self, args, config, env)
self.filename = args.filename
def execute(self) -> int:
try:
result, warn = self.api.upload_asciicast(self.filename)
if warn:
self.print_warning(warn)
self.print(result.get("message") or result["url"])
except OSError as e:
self.print_error(f"upload failed: {str(e)}")
return 1
except APIError as e:
self.print_error(f"upload failed: {str(e)}")
if e.retryable:
self.print_error(
f"retry later by running: asciinema upload {self.filename}"
)
return 1
return 0

View File

@@ -1,228 +0,0 @@
import configparser
import os
from os import path
from typing import Any, Dict, Optional
from uuid import uuid4
DEFAULT_API_URL: str = "https://asciinema.org"
DEFAULT_RECORD_ENV: str = "SHELL,TERM"
class ConfigError(Exception):
pass
class Config:
def __init__(
self,
config_home: Any,
env: Optional[Dict[str, str]] = None,
) -> None:
self.config_home = config_home
self.config_file_path = path.join(config_home, "config")
self.install_id_path = path.join(self.config_home, "install-id")
self.config = configparser.ConfigParser()
self.config.read(self.config_file_path)
self.env = env if env is not None else os.environ
def upgrade(self) -> None:
try:
self.install_id
except ConfigError:
id_ = (
self.__api_token()
or self.__user_token()
or self.__gen_install_id()
)
self.__save_install_id(id_)
items = {
name: dict(section) for (name, section) in self.config.items()
}
if items in (
{"DEFAULT": {}, "api": {"token": id_}},
{"DEFAULT": {}, "user": {"token": id_}},
):
os.remove(self.config_file_path)
if self.env.get("ASCIINEMA_API_TOKEN"):
raise ConfigError(
"ASCIINEMA_API_TOKEN variable is no longer supported"
", please use ASCIINEMA_INSTALL_ID instead"
)
def __read_install_id(self) -> Optional[str]:
p = self.install_id_path
if path.isfile(p):
with open(p, "r", encoding="utf-8") as f:
return f.read().strip()
return None
@staticmethod
def __gen_install_id() -> str:
return f"{uuid4()}"
def __save_install_id(self, id_: str) -> None:
self.__create_config_home()
with open(self.install_id_path, "w", encoding="utf-8") as f:
f.write(id_)
def __create_config_home(self) -> None:
if not path.exists(self.config_home):
os.makedirs(self.config_home)
def __api_token(self) -> Optional[str]:
try:
return self.config.get("api", "token")
except (configparser.NoOptionError, configparser.NoSectionError):
return None
def __user_token(self) -> Optional[str]:
try:
return self.config.get("user", "token")
except (configparser.NoOptionError, configparser.NoSectionError):
return None
@property
def install_id(self) -> str:
id_ = self.env.get("ASCIINEMA_INSTALL_ID") or self.__read_install_id()
if id_:
return id_
raise ConfigError("no install ID found")
@property
def api_url(self) -> str:
return self.env.get(
"ASCIINEMA_API_URL",
self.config.get("api", "url", fallback=DEFAULT_API_URL),
)
@property
def record_stdin(self) -> bool:
return self.config.getboolean("record", "stdin", fallback=False)
@property
def record_command(self) -> Optional[str]:
return self.config.get("record", "command", fallback=None)
@property
def record_env(self) -> str:
return self.config.get("record", "env", fallback=DEFAULT_RECORD_ENV)
@property
def record_idle_time_limit(self) -> Optional[float]:
fallback = self.config.getfloat(
"record", "maxwait", fallback=None
) # pre 2.0
return self.config.getfloat(
"record", "idle_time_limit", fallback=fallback
)
@property
def record_yes(self) -> bool:
return self.config.getboolean("record", "yes", fallback=False)
@property
def record_quiet(self) -> bool:
return self.config.getboolean("record", "quiet", fallback=False)
@property
def record_prefix_key(self) -> Any:
return self.__get_key("record", "prefix")
@property
def record_pause_key(self) -> Any:
return self.__get_key("record", "pause", "C-\\")
@property
def record_add_marker_key(self) -> Any:
return self.__get_key("record", "add_marker")
@property
def play_idle_time_limit(self) -> Optional[float]:
fallback = self.config.getfloat(
"play", "maxwait", fallback=None
) # pre 2.0
return self.config.getfloat(
"play", "idle_time_limit", fallback=fallback
)
@property
def play_speed(self) -> float:
return self.config.getfloat("play", "speed", fallback=1.0)
@property
def play_pause_key(self) -> Any:
return self.__get_key("play", "pause", " ")
@property
def play_step_key(self) -> Any:
return self.__get_key("play", "step", ".")
@property
def play_next_marker_key(self) -> Any:
return self.__get_key("play", "next_marker", "]")
@property
def notifications_enabled(self) -> bool:
return self.config.getboolean(
"notifications", "enabled", fallback=True
)
@property
def notifications_command(self) -> Optional[str]:
return self.config.get("notifications", "command", fallback=None)
def __get_key(self, section: str, name: str, default: Any = None) -> Any:
key = self.config.get(section, f"{name}_key", fallback=default)
if key:
if len(key) == 3:
upper_key = key.upper()
if upper_key[0] == "C" and upper_key[1] == "-":
return bytes([ord(upper_key[2]) - 0x40])
raise ConfigError(
f"invalid {name} key definition '{key}' - use"
f": {name}_key = C-x (with control key modifier)"
f", or {name}_key = x (with no modifier)"
)
return key.encode("utf-8")
return None
def get_config_home(env: Any = None) -> Any:
if env is None:
env = os.environ
env_asciinema_config_home = env.get("ASCIINEMA_CONFIG_HOME")
env_xdg_config_home = env.get("XDG_CONFIG_HOME")
env_home = env.get("HOME")
config_home: Optional[str] = None
if env_asciinema_config_home:
config_home = env_asciinema_config_home
elif env_xdg_config_home:
config_home = path.join(env_xdg_config_home, "asciinema")
elif env_home:
if path.isfile(path.join(env_home, ".asciinema", "config")):
# location for versions < 1.1
config_home = path.join(env_home, ".asciinema")
else:
config_home = path.join(env_home, ".config", "asciinema")
else:
raise Exception(
"need $HOME or $XDG_CONFIG_HOME or $ASCIINEMA_CONFIG_HOME"
)
return config_home
def load(env: Any = None) -> Config:
if env is None:
env = os.environ
config = Config(get_config_home(env), env)
config.upgrade()
return config

Binary file not shown.

Before

Width:  |  Height:  |  Size: 25 KiB

View File

@@ -1,36 +0,0 @@
from typing import IO, Any, Callable, Optional
class file_writer:
def __init__(
self,
path: str,
on_error: Optional[Callable[[str], None]] = None,
) -> None:
self.path = path
self.file: Optional[IO[Any]] = None
self.on_error = on_error or noop
def __enter__(self) -> Any:
self._open_file()
return self
def __exit__(
self, exc_type: str, exc_value: str, exc_traceback: str
) -> None:
assert self.file is not None
self.file.close()
def _open_file(self) -> None:
raise NotImplementedError
def _write(self, data: Any) -> None:
try:
self.file.write(data) # type: ignore
except IOError as e:
self.on_error("Write error, recording suspended")
raise e
def noop(_: Any) -> None:
return None

View File

@@ -1,2 +0,0 @@
class HTTPConnectionError(Exception):
pass

View File

@@ -1,121 +0,0 @@
import shutil
import subprocess
from os import environ, path
from typing import Dict, List, Optional, Union
class Notifier:
def __init__(self, cmd: str) -> None:
self.cmd = cmd
@staticmethod
def get_icon_path() -> Optional[str]:
path_ = path.join(
path.dirname(path.realpath(__file__)),
"data/icon-256x256.png",
)
if path.exists(path_):
return path_
return None
def args(self, _text: str) -> List[str]:
return ["/bin/sh", "-c", self.cmd]
def is_available(self) -> bool:
return shutil.which(self.cmd) is not None
def notify(self, text: str) -> None:
# We do not want to raise a `CalledProcessError` on command failure.
# pylint: disable=subprocess-run-check
# We do not want to print *ANYTHING* to the terminal
# so we capture and ignore all output
subprocess.run(self.args(text), capture_output=True)
class AppleScriptNotifier(Notifier):
def __init__(self) -> None:
super().__init__("osascript")
def args(self, text: str) -> List[str]:
text = text.replace('"', '\\"')
return [
self.cmd,
"-e",
f'display notification "{text}" with title "asciinema"',
]
class LibNotifyNotifier(Notifier):
def __init__(self) -> None:
super().__init__("notify-send")
def args(self, text: str) -> List[str]:
icon_path = self.get_icon_path()
if icon_path is not None:
return [self.cmd, "-i", icon_path, "asciinema", text]
return [self.cmd, "asciinema", text]
class TerminalNotifier(Notifier):
def __init__(self) -> None:
super().__init__("terminal-notifier")
def args(self, text: str) -> List[str]:
icon_path = self.get_icon_path()
if icon_path is not None:
return [
"terminal-notifier",
"-title",
"asciinema",
"-message",
text,
"-appIcon",
icon_path,
]
return [
"terminal-notifier",
"-title",
"asciinema",
"-message",
text,
]
class CustomCommandNotifier(Notifier):
def env(self, text: str) -> Dict[str, str]:
icon_path = self.get_icon_path()
env = environ.copy()
env["TEXT"] = text
if icon_path is not None:
env["ICON_PATH"] = icon_path
return env
def notify(self, text: str) -> None:
# We do not want to raise a `CalledProcessError` on command failure.
# pylint: disable=subprocess-run-check
subprocess.run(
self.args(text), env=self.env(text), capture_output=True
)
class NoopNotifier: # pylint: disable=too-few-public-methods
def notify(self, text: str) -> None:
pass
def get_notifier(
enabled: bool = True, command: Optional[str] = None
) -> Union[Notifier, NoopNotifier]:
if enabled:
if command:
return CustomCommandNotifier(command)
for c in [TerminalNotifier, AppleScriptNotifier, LibNotifyNotifier]:
n = c()
if n.is_available():
return n
return NoopNotifier()

View File

@@ -1,190 +0,0 @@
import json
import sys
import time
from typing import Any, Dict, Optional, TextIO, Union
from .asciicast import events as ev
from .asciicast.v1 import Asciicast as v1
from .asciicast.v2 import Asciicast as v2
from .tty_ import raw, read_blocking
Header = Dict[str, Any]
class RawOutput:
def __init__(self, stream: Optional[str]) -> None:
self.stream = stream or "o"
def start(self, _header: Header) -> None:
pass
def write(self, _time: float, event_type: str, data: str) -> None:
if event_type == self.stream:
sys.stdout.write(data)
sys.stdout.flush()
class AsciicastOutput:
def __init__(self, stream: Optional[str]) -> None:
self.stream = stream
def start(self, header: Header) -> None:
self.__write_line(header)
def write(self, time: float, event_type: str, data: str) -> None:
if self.stream in [None, event_type]:
self.__write_line([time, event_type, data])
def __write_line(self, obj: Any) -> None:
line = json.dumps(
obj, ensure_ascii=False, indent=None, separators=(", ", ": ")
)
sys.stdout.write(f"{line}\r\n")
sys.stdout.flush()
Output = Union[RawOutput, AsciicastOutput]
class Player: # pylint: disable=too-few-public-methods
def play(
self,
asciicast: Union[v1, v2],
idle_time_limit: Optional[int] = None,
speed: float = 1.0,
key_bindings: Optional[Dict[str, Any]] = None,
out_fmt: str = "raw",
stream: Optional[str] = None,
pause_on_markers: bool = False,
) -> None:
if key_bindings is None:
key_bindings = {}
output: Output = (
RawOutput(stream) if out_fmt == "raw" else AsciicastOutput(stream)
)
try:
with open("/dev/tty", "rt", encoding="utf-8") as stdin:
with raw(stdin.fileno()):
self._play(
asciicast,
idle_time_limit,
speed,
stdin,
key_bindings,
output,
pause_on_markers,
)
except IOError:
self._play(
asciicast,
idle_time_limit,
speed,
None,
key_bindings,
output,
False,
)
@staticmethod
def _play( # pylint: disable=too-many-locals
asciicast: Union[v1, v2],
idle_time_limit: Optional[int],
speed: float,
stdin: Optional[TextIO],
key_bindings: Dict[str, Any],
output: Output,
pause_on_markers: bool,
) -> None:
idle_time_limit = idle_time_limit or asciicast.idle_time_limit
pause_key = key_bindings.get("pause")
step_key = key_bindings.get("step")
next_marker_key = key_bindings.get("next_marker")
events = asciicast.events()
events = ev.to_relative_time(events)
events = ev.cap_relative_time(events, idle_time_limit)
events = ev.to_absolute_time(events)
events = ev.adjust_speed(events, speed)
output.start(asciicast.v2_header)
ctrl_c = False
pause_elapsed_time: Optional[float] = None
events_iter = iter(events)
start_time = time.perf_counter()
def wait(timeout: int) -> bytes:
if stdin is not None:
return read_blocking(stdin.fileno(), timeout)
return b""
def next_event() -> Any:
try:
return events_iter.__next__()
except StopIteration:
return (None, None, None)
time_, event_type, text = next_event()
while time_ is not None and not ctrl_c:
if pause_elapsed_time:
while time_ is not None:
key = wait(1000)
if 0x03 in key: # ctrl-c
ctrl_c = True
break
if key == pause_key:
assert pause_elapsed_time is not None
start_time = time.perf_counter() - pause_elapsed_time
pause_elapsed_time = None
break
if key == step_key:
pause_elapsed_time = time_
output.write(time_, event_type, text)
time_, event_type, text = next_event()
elif key == next_marker_key:
while time_ is not None and event_type != "m":
output.write(time_, event_type, text)
time_, event_type, text = next_event()
if time_ is not None:
output.write(time_, event_type, text)
pause_elapsed_time = time_
time_, event_type, text = next_event()
else:
while time_ is not None:
elapsed_wall_time = time.perf_counter() - start_time
delay = time_ - elapsed_wall_time
key = b""
if delay > 0:
key = wait(delay)
if 0x03 in key: # ctrl-c
ctrl_c = True
break
elif key == pause_key:
pause_elapsed_time = time.perf_counter() - start_time
break
else:
output.write(time_, event_type, text)
if event_type == "m" and pause_on_markers:
pause_elapsed_time = time_
time_, event_type, text = next_event()
break
time_, event_type, text = next_event()
if ctrl_c:
raise KeyboardInterrupt()

View File

@@ -1,218 +0,0 @@
import array
import fcntl
import os
import pty
import select
import signal
import struct
import termios
import time
from typing import Any, Callable, Dict, List, Optional, Tuple
from .tty_ import raw
EXIT_SIGNALS = [
signal.SIGCHLD,
signal.SIGHUP,
signal.SIGTERM,
signal.SIGQUIT,
]
READ_LEN = 256 * 1024
# pylint: disable=too-many-arguments,too-many-locals,too-many-statements
def record(
command: Any,
env: Dict[str, str],
writer: Any,
get_tty_size: Callable[[], Tuple[int, int]],
notify: Callable[[str], None],
key_bindings: Dict[str, Any],
tty_stdin_fd: int = pty.STDIN_FILENO,
tty_stdout_fd: int = pty.STDOUT_FILENO,
) -> None:
pty_fd: Any = None
start_time: Optional[float] = None
pause_time: Optional[float] = None
prefix_mode: bool = False
prefix_key = key_bindings.get("prefix")
pause_key = key_bindings.get("pause")
add_marker_key = key_bindings.get("add_marker")
input_data = bytes()
def handle_resize() -> None:
size = get_tty_size()
set_pty_size(size)
assert start_time is not None
writer.write_resize(time.perf_counter() - start_time, size)
def set_pty_size(size: Tuple[int, int]) -> None:
cols, rows = size
buf = array.array("h", [rows, cols, 0, 0])
fcntl.ioctl(pty_fd, termios.TIOCSWINSZ, buf)
def handle_master_read(data: Any) -> None:
remaining_data = memoryview(data)
while remaining_data:
n = os.write(tty_stdout_fd, remaining_data)
remaining_data = remaining_data[n:]
if not pause_time:
assert start_time is not None
writer.write_stdout(time.perf_counter() - start_time, data)
def handle_stdin_read(data: Any) -> None:
nonlocal input_data
nonlocal pause_time
nonlocal start_time
nonlocal prefix_mode
if not prefix_mode and prefix_key and data == prefix_key:
prefix_mode = True
return
if prefix_mode or (
not prefix_key and data in [pause_key, add_marker_key]
):
prefix_mode = False
if data == pause_key:
if pause_time:
assert start_time is not None
start_time += time.perf_counter() - pause_time
pause_time = None
notify("Resumed recording")
else:
pause_time = time.perf_counter()
notify("Paused recording")
elif data == add_marker_key:
assert start_time is not None
writer.write_marker(time.perf_counter() - start_time)
notify("Marker added")
return
input_data += data
# save stdin unless paused or data is OSC response (e.g. \x1b]11;?\x07)
if not pause_time and not (
len(data) > 2
and data[0] == 0x1B
and data[1] == 0x5D
and data[-1] == 0x07
):
assert start_time is not None
writer.write_stdin(time.perf_counter() - start_time, data)
def copy(signal_fd: int) -> None: # pylint: disable=too-many-branches
nonlocal input_data
crfds = [pty_fd, tty_stdin_fd, signal_fd]
while True:
if len(input_data) > 0:
cwfds = [pty_fd]
else:
cwfds = []
try:
rfds, wfds, _ = select.select(crfds, cwfds, [])
except KeyboardInterrupt:
if tty_stdin_fd in crfds:
crfds.remove(tty_stdin_fd)
break
if pty_fd in rfds:
try:
data = os.read(pty_fd, READ_LEN)
except OSError as e:
data = b""
if not data: # Reached EOF.
break
else:
handle_master_read(data)
if tty_stdin_fd in rfds:
data = os.read(tty_stdin_fd, READ_LEN)
if not data:
if tty_stdin_fd in crfds:
crfds.remove(tty_stdin_fd)
else:
handle_stdin_read(data)
if signal_fd in rfds:
data = os.read(signal_fd, READ_LEN)
if data:
signals = struct.unpack(f"{len(data)}B", data)
for sig in signals:
if sig in EXIT_SIGNALS:
crfds.remove(signal_fd)
if sig == signal.SIGWINCH:
handle_resize()
if pty_fd in wfds:
try:
n = os.write(pty_fd, input_data)
input_data = input_data[n:]
except BlockingIOError:
pass
pid, pty_fd = pty.fork()
if pid == pty.CHILD:
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
os.execvpe(command[0], command, env)
flags = fcntl.fcntl(pty_fd, fcntl.F_GETFL, 0) | os.O_NONBLOCK
fcntl.fcntl(pty_fd, fcntl.F_SETFL, flags)
start_time = time.perf_counter()
set_pty_size(get_tty_size())
with SignalFD(EXIT_SIGNALS + [signal.SIGWINCH]) as sig_fd:
with raw(tty_stdin_fd):
try:
copy(sig_fd)
os.close(pty_fd)
except (IOError, OSError):
pass
os.waitpid(pid, 0)
class SignalFD:
def __init__(self, signals: List[signal.Signals]) -> None:
self.signals = signals
self.orig_handlers: List[Tuple[signal.Signals, Any]] = []
self.orig_wakeup_fd: Optional[int] = None
def __enter__(self) -> int:
r, w = os.pipe()
flags = fcntl.fcntl(w, fcntl.F_GETFL, 0) | os.O_NONBLOCK
fcntl.fcntl(w, fcntl.F_SETFL, flags)
self.orig_wakeup_fd = signal.set_wakeup_fd(w)
for sig, handler in self._noop_handlers(self.signals):
self.orig_handlers.append((sig, signal.signal(sig, handler)))
return r
def __exit__(self, type_: str, value: str, traceback: str) -> None:
assert self.orig_wakeup_fd is not None
signal.set_wakeup_fd(self.orig_wakeup_fd)
for sig, handler in self.orig_handlers:
signal.signal(sig, handler)
@staticmethod
def _noop_handlers(
signals: List[signal.Signals],
) -> List[Tuple[signal.Signals, Any]]:
return list(map(lambda s: (s, lambda signal, frame: None), signals))

View File

View File

@@ -1,199 +0,0 @@
import os
import sys
import time
from typing import Any, Callable, Dict, List, Optional, TextIO, Tuple, Type
from . import pty_ as pty # avoid collisions with standard library `pty`
from .asciicast import v2
from .asciicast.v2 import writer as w2
from .async_worker import async_worker
def record( # pylint: disable=too-many-arguments,too-many-locals
path_: str,
command: Optional[str] = None,
append: bool = False,
idle_time_limit: Optional[float] = None,
record_stdin: bool = False,
title: Optional[str] = None,
command_env: Optional[Dict[str, str]] = None,
capture_env: Optional[List[str]] = None,
writer: Type[w2] = v2.writer,
record_: Callable[..., None] = pty.record,
notify: Callable[[str], None] = lambda _: None,
key_bindings: Optional[Dict[str, Any]] = None,
cols_override: Optional[int] = None,
rows_override: Optional[int] = None,
) -> None:
if command is None:
command = os.environ.get("SHELL", "sh")
if command_env is None:
command_env = os.environ.copy()
if key_bindings is None:
key_bindings = {}
command_env["ASCIINEMA_REC"] = "1"
if capture_env is None:
capture_env = ["SHELL", "TERM"]
time_offset: float = 0
if append and os.stat(path_).st_size > 0:
time_offset = v2.get_duration(path_)
with tty_fds() as (tty_stdin_fd, tty_stdout_fd), async_notifier(
notify
) as _notifier:
get_tty_size = _get_tty_size(
tty_stdout_fd, cols_override, rows_override
)
cols, rows = get_tty_size()
metadata = build_metadata(
cols, rows, idle_time_limit, capture_env, command_env, title
)
sync_writer = writer(
path_, metadata, append, on_error=_notifier.queue.put
)
with async_writer(sync_writer, time_offset, record_stdin) as _writer:
record_(
["sh", "-c", command],
command_env,
_writer,
get_tty_size,
_notifier.notify,
key_bindings,
tty_stdin_fd=tty_stdin_fd,
tty_stdout_fd=tty_stdout_fd,
)
class tty_fds:
def __init__(self) -> None:
self.stdin_file: Optional[TextIO] = None
self.stdout_file: Optional[TextIO] = None
def __enter__(self) -> Tuple[int, int]:
try:
self.stdout_file = open("/dev/tty", "wt", encoding="utf_8")
except OSError:
self.stdout_file = open("/dev/null", "wt", encoding="utf_8")
return (sys.stdin.fileno(), self.stdout_file.fileno())
def __exit__(self, type_: str, value: str, traceback: str) -> None:
assert self.stdout_file is not None
self.stdout_file.close()
def build_metadata( # pylint: disable=too-many-arguments
cols: int,
rows: int,
idle_time_limit: Optional[float],
capture_env: List[str],
env: Dict[str, str],
title: Optional[str],
) -> Dict[str, Any]:
metadata: Dict[str, Any] = {
"width": cols,
"height": rows,
"timestamp": int(time.time()),
}
if idle_time_limit is not None:
metadata["idle_time_limit"] = idle_time_limit
metadata["env"] = {var: env.get(var) for var in capture_env}
if title:
metadata["title"] = title
return metadata
class async_writer(async_worker):
def __init__(
self, writer: w2, time_offset: float, record_stdin: bool
) -> None:
async_worker.__init__(self)
self.writer = writer
self.time_offset = time_offset
self.record_stdin = record_stdin
def write_stdin(self, ts: float, data: Any) -> None:
if self.record_stdin:
self.enqueue([ts, "i", data])
def write_stdout(self, ts: float, data: Any) -> None:
self.enqueue([ts, "o", data])
def write_marker(self, ts: float) -> None:
self.enqueue([ts, "m", None])
def write_resize(self, ts: float, size: Tuple[int, int]) -> None:
self.enqueue([ts, "r", size])
def run(self) -> None:
try:
with self.writer as w:
event: Tuple[float, str, Any]
for event in iter(self.queue.get, None):
assert event is not None
ts, etype, data = event
if etype == "o":
w.write_stdout(self.time_offset + ts, data)
elif etype == "i":
w.write_stdin(self.time_offset + ts, data)
elif etype == "m":
w.write_marker(self.time_offset + ts)
elif etype == "r":
w.write_resize(self.time_offset + ts, data)
except IOError:
for event in iter(self.queue.get, None):
pass
class async_notifier(async_worker):
def __init__(self, notify: Callable[[str], None]) -> None:
async_worker.__init__(self)
self._notify = notify
def notify(self, text: str) -> None:
self.enqueue(text)
def perform(self, text: str) -> None:
try:
self._notify(text)
except: # pylint: disable=bare-except # noqa: E722
# we catch *ALL* exceptions here because we don't want failed
# notification to crash the recording session
pass
def _get_tty_size(
fd: int, cols_override: Optional[int], rows_override: Optional[int]
) -> Callable[[], Tuple[int, int]]:
if cols_override is not None and rows_override is not None:
def fixed_size() -> Tuple[int, int]:
return (cols_override, rows_override) # type: ignore
return fixed_size
if not os.isatty(fd):
def fallback_size() -> Tuple[int, int]:
return (cols_override or 80, rows_override or 24)
return fallback_size
def size() -> Tuple[int, int]:
cols, rows = os.get_terminal_size(fd)
return (cols_override or cols, rows_override or rows)
return size

View File

@@ -1,34 +0,0 @@
import os
import select
import termios as tty # avoid `Module "tty" has no attribute ...` errors
from time import sleep
from tty import setraw
from typing import IO, Any, List, Optional, Union
class raw:
def __init__(self, fd: Union[IO[str], int]) -> None:
self.fd = fd
self.restore: bool = False
self.mode: Optional[List[Any]] = None
def __enter__(self) -> None:
try:
self.mode = tty.tcgetattr(self.fd)
setraw(self.fd)
self.restore = True
except tty.error: # this is `termios.error`
pass
def __exit__(self, type_: str, value: str, traceback: str) -> None:
if self.restore:
sleep(0.01) # give the terminal time to send answerbacks
assert isinstance(self.mode, list)
tty.tcsetattr(self.fd, tty.TCSAFLUSH, self.mode)
def read_blocking(fd: int, timeout: Any) -> bytes:
if fd in select.select([fd], [], [], timeout)[0]:
return os.read(fd, 1024)
return b""

View File

@@ -1,124 +0,0 @@
import codecs
import http
import io
import sys
from base64 import b64encode
from http.client import HTTPResponse
from typing import Any, Dict, Generator, Optional, Tuple
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from uuid import uuid4
from .http_adapter import HTTPConnectionError
class MultipartFormdataEncoder:
def __init__(self) -> None:
self.boundary = uuid4().hex
self.content_type = f"multipart/form-data; boundary={self.boundary}"
@classmethod
def u(cls, s: Any) -> Any:
if sys.hexversion >= 0x03000000 and isinstance(s, bytes):
s = s.decode("utf-8")
return s
def iter(
self, fields: Dict[str, Any], files: Dict[str, Tuple[str, Any]]
) -> Generator[Tuple[bytes, int], None, None]:
"""
fields: {name: value} for regular form fields.
files: {name: (filename, file-type)} for data to be uploaded as files
yield body's chunk as bytes
"""
encoder = codecs.getencoder("utf-8")
for key, value in fields.items():
key = self.u(key)
yield encoder(f"--{self.boundary}\r\n")
yield encoder(
self.u(f'content-disposition: form-data; name="{key}"\r\n')
)
yield encoder("\r\n")
if isinstance(value, (int, float)):
value = str(value)
yield encoder(self.u(value))
yield encoder("\r\n")
for key, filename_and_f in files.items():
filename, f = filename_and_f
key = self.u(key)
filename = self.u(filename)
yield encoder(f"--{self.boundary}\r\n")
yield encoder(
self.u(
"content-disposition: form-data"
f'; name="{key}"'
f'; filename="{filename}"\r\n'
)
)
yield encoder("content-type: application/octet-stream\r\n")
yield encoder("\r\n")
data = f.read()
yield (data, len(data))
yield encoder("\r\n")
yield encoder(f"--{self.boundary}--\r\n")
def encode(
self, fields: Dict[str, Any], files: Dict[str, Tuple[str, Any]]
) -> Tuple[str, bytes]:
body = io.BytesIO()
for chunk, _ in self.iter(fields, files):
body.write(chunk)
return self.content_type, body.getvalue()
class URLLibHttpAdapter: # pylint: disable=too-few-public-methods
def post( # pylint: disable=too-many-arguments,too-many-locals
self,
url: str,
fields: Optional[Dict[str, Any]] = None,
files: Optional[Dict[str, Tuple[str, Any]]] = None,
headers: Optional[Dict[str, str]] = None,
username: Optional[str] = None,
password: Optional[str] = None,
) -> Tuple[Any, Optional[Dict[str, str]], bytes]:
# avoid dangerous mutable default arguments
if fields is None:
fields = {}
if files is None:
files = {}
if headers is None:
headers = {}
content_type, body = MultipartFormdataEncoder().encode(fields, files)
headers = headers.copy()
headers["content-type"] = content_type
if password:
encoded_auth = b64encode(
f"{username}:{password}".encode("utf_8")
).decode("utf_8")
headers["authorization"] = f"Basic {encoded_auth}"
request = Request(url, data=body, headers=headers, method="POST")
try:
with urlopen(request) as response:
status = response.status
headers = self._parse_headers(response)
body = response.read().decode("utf-8")
except HTTPError as e:
status = e.code
headers = {}
body = e.read()
except (http.client.RemoteDisconnected, URLError) as e:
raise HTTPConnectionError(str(e)) from e
return (status, headers, body)
@staticmethod
def _parse_headers(response: HTTPResponse) -> Dict[str, str]:
headers = {k.lower(): v for k, v in response.getheaders()}
return headers

View File

@@ -1,38 +0,0 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
[tool.black]
line-length = 79
target-version = ["py38"]
[tool.isort]
line_length = 79
profile = "black"
multi_line_output = 3
[tool.mypy]
check_untyped_defs = true
disallow_any_generics = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
no_implicit_optional = true
no_implicit_reexport = true
show_error_context = true
warn_redundant_casts = true
warn_return_any = true
warn_unused_configs = true
warn_unused_ignores = true
exclude = []
[tool.pylint."MESSAGES CONTROL"]
disable = [
"invalid-name",
"missing-class-docstring",
"missing-function-docstring",
"missing-module-docstring",
]
min-similarity-lines = 7

View File

@@ -1,59 +0,0 @@
[metadata]
name = asciinema
version = 2.4.0
author = Marcin Kulik
author_email = m@ku1ik.com
url = https://asciinema.org
download_url =
https://github.com/asciinema/asciinema/archive/v%(version)s.tar.gz
description = Terminal session recorder
description_file = README.md
license = GNU GPLv3
license_file = LICENSE
long_description = file: README.md
long_description_content_type = text/markdown; charset=UTF-8
classifiers =
Development Status :: 5 - Production/Stable
Environment :: Console
Intended Audience :: Developers
Intended Audience :: System Administrators
License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)
Natural Language :: English
Programming Language :: Python
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Programming Language :: Python :: 3.10
Programming Language :: Python :: 3.11
Topic :: System :: Shells
Topic :: Terminals
Topic :: Utilities
[options]
include_package_data = True
packages =
asciinema
asciinema.asciicast
asciinema.commands
install_requires =
[options.package_data]
asciinema = data/*.png
[options.entry_points]
console_scripts =
asciinema = asciinema.__main__:main
[options.data_files]
share/doc/asciinema =
CHANGELOG.md
CODE_OF_CONDUCT.md
CONTRIBUTING.md
README.md
doc/asciicast-v1.md
doc/asciicast-v2.md
share/man/man1 =
man/asciinema.1
[pycodestyle]
ignore = E501,E402,E722

View File

View File

@@ -1,28 +0,0 @@
import json
import tempfile
from asciinema.asciicast import v2
from ..test_helper import Test
class TestWriter(Test):
@staticmethod
def test_writing() -> None:
_file, path = tempfile.mkstemp()
with v2.writer(path, width=80, height=24) as w:
w.write_stdout(1, "x") # ensure it supports both str and bytes
w.write_stdout(2, bytes.fromhex("78 c5 bc c3 b3 c5"))
w.write_stdout(3, bytes.fromhex("82 c4 87"))
w.write_stdout(4, bytes.fromhex("78 78"))
with open(path, "rt", encoding="utf_8") as f:
lines = list(map(json.loads, f.read().strip().split("\n")))
assert lines == [
{"version": 2, "width": 80, "height": 24},
[1, "o", "x"],
[2, "o", "xżó"],
[3, "o", "łć"],
[4, "o", "xx"],
], f"got:\n\n{lines}"

View File

@@ -1,218 +0,0 @@
import re
import tempfile
from os import path
from typing import Dict, Optional
import asciinema.config as cfg
from asciinema.config import Config
def create_config(
content: Optional[str] = None, env: Optional[Dict[str, str]] = None
) -> Config:
# avoid redefining `dir` builtin
dir_ = tempfile.mkdtemp()
if content:
# avoid redefining `os.path`
path_ = f"{dir_}/config"
with open(path_, "wt", encoding="utf_8") as f:
f.write(content)
return cfg.Config(dir_, env)
def read_install_id(install_id_path: str) -> str:
with open(install_id_path, "rt", encoding="utf_8") as f:
return f.read().strip()
def test_upgrade_no_config_file() -> None:
config = create_config()
config.upgrade()
install_id = read_install_id(config.install_id_path)
assert re.match("^\\w{8}-\\w{4}-\\w{4}-\\w{4}-\\w{12}", install_id)
assert install_id == config.install_id
assert not path.exists(config.config_file_path)
# it must not change after another upgrade
config.upgrade()
assert read_install_id(config.install_id_path) == install_id
def test_upgrade_config_file_with_api_token() -> None:
config = create_config("[api]\ntoken = foo-bar-baz")
config.upgrade()
assert read_install_id(config.install_id_path) == "foo-bar-baz"
assert config.install_id == "foo-bar-baz"
assert not path.exists(config.config_file_path)
config.upgrade()
assert read_install_id(config.install_id_path) == "foo-bar-baz"
def test_upgrade_config_file_with_api_token_and_more() -> None:
config = create_config(
"[api]\ntoken = foo-bar-baz\nurl = http://example.com"
)
config.upgrade()
assert read_install_id(config.install_id_path) == "foo-bar-baz"
assert config.install_id == "foo-bar-baz"
assert config.api_url == "http://example.com"
assert path.exists(config.config_file_path)
config.upgrade()
assert read_install_id(config.install_id_path) == "foo-bar-baz"
def test_upgrade_config_file_with_user_token() -> None:
config = create_config("[user]\ntoken = foo-bar-baz")
config.upgrade()
assert read_install_id(config.install_id_path) == "foo-bar-baz"
assert config.install_id == "foo-bar-baz"
assert not path.exists(config.config_file_path)
config.upgrade()
assert read_install_id(config.install_id_path) == "foo-bar-baz"
def test_upgrade_config_file_with_user_token_and_more() -> None:
config = create_config(
"[user]\ntoken = foo-bar-baz\n[api]\nurl = http://example.com"
)
config.upgrade()
assert read_install_id(config.install_id_path) == "foo-bar-baz"
assert config.install_id == "foo-bar-baz"
assert config.api_url == "http://example.com"
assert path.exists(config.config_file_path)
config.upgrade()
assert read_install_id(config.install_id_path) == "foo-bar-baz"
def test_default_api_url() -> None:
config = create_config("")
assert config.api_url == "https://asciinema.org"
def test_default_record_stdin() -> None:
config = create_config("")
assert config.record_stdin is False
def test_default_record_command() -> None:
config = create_config("")
assert config.record_command is None
def test_default_record_env() -> None:
config = create_config("")
assert config.record_env == "SHELL,TERM"
def test_default_record_idle_time_limit() -> None:
config = create_config("")
assert config.record_idle_time_limit is None
def test_default_record_yes() -> None:
config = create_config("")
assert config.record_yes is False
def test_default_record_quiet() -> None:
config = create_config("")
assert config.record_quiet is False
def test_default_play_idle_time_limit() -> None:
config = create_config("")
assert config.play_idle_time_limit is None
def test_api_url() -> None:
config = create_config("[api]\nurl = http://the/url")
assert config.api_url == "http://the/url"
def test_api_url_when_override_set() -> None:
config = create_config(
"[api]\nurl = http://the/url", {"ASCIINEMA_API_URL": "http://the/url2"}
)
assert config.api_url == "http://the/url2"
def test_record_command() -> None:
command = "bash -l"
config = create_config(f"[record]\ncommand = {command}")
assert config.record_command == command
def test_record_stdin() -> None:
config = create_config("[record]\nstdin = yes")
assert config.record_stdin is True
def test_record_env() -> None:
config = create_config("[record]\nenv = FOO,BAR")
assert config.record_env == "FOO,BAR"
def test_record_idle_time_limit() -> None:
config = create_config("[record]\nidle_time_limit = 2.35")
assert config.record_idle_time_limit == 2.35
config = create_config("[record]\nmaxwait = 2.35")
assert config.record_idle_time_limit == 2.35
def test_record_yes() -> None:
yes = "yes"
config = create_config(f"[record]\nyes = {yes}")
assert config.record_yes is True
def test_record_quiet() -> None:
quiet = "yes"
config = create_config(f"[record]\nquiet = {quiet}")
assert config.record_quiet is True
def test_play_idle_time_limit() -> None:
config = create_config("[play]\nidle_time_limit = 2.35")
assert config.play_idle_time_limit == 2.35
config = create_config("[play]\nmaxwait = 2.35")
assert config.play_idle_time_limit == 2.35
def test_notifications_enabled() -> None:
config = create_config("")
assert config.notifications_enabled is True
config = create_config("[notifications]\nenabled = yes")
assert config.notifications_enabled is True
config = create_config("[notifications]\nenabled = no")
assert config.notifications_enabled is False
def test_notifications_command() -> None:
config = create_config("")
assert config.notifications_command is None
config = create_config(
'[notifications]\ncommand = tmux display-message "$TEXT"'
)
assert config.notifications_command == 'tmux display-message "$TEXT"'

View File

@@ -1,54 +0,0 @@
import os
import pty
from typing import Any, List, Union
import asciinema.pty_
from .test_helper import Test
class Writer:
def __init__(self) -> None:
self.data: List[Union[float, str]] = []
def write_stdout(self, _ts: float, data: Any) -> None:
self.data.append(data)
def write_stdin(self, ts: float, data: Any) -> None:
raise NotImplementedError
class TestRecord(Test):
def setUp(self) -> None:
self.real_os_write = os.write
os.write = self.os_write # type: ignore
def tearDown(self) -> None:
os.write = self.real_os_write
def os_write(self, fd: int, data: Any) -> None:
if fd != pty.STDOUT_FILENO:
self.real_os_write(fd, data)
@staticmethod
def test_record_command_writes_to_stdout() -> None:
writer = Writer()
command = [
"python3",
"-c",
(
"import sys"
"; import time"
"; sys.stdout.write('foo')"
"; sys.stdout.flush()"
"; time.sleep(0.01)"
"; sys.stdout.write('bar')"
),
]
asciinema.pty_.record(
command, {}, writer, lambda: (80, 24), lambda s: None, {}
)
assert writer.data == [b"foo", b"bar"]

View File

@@ -1,16 +0,0 @@
import sys
from codecs import StreamReader
from io import StringIO
from typing import Optional, TextIO, Union
stdout: Optional[Union[TextIO, StreamReader]] = None
class Test:
def setUp(self) -> None:
global stdout # pylint: disable=global-statement
self.real_stdout = sys.stdout
sys.stdout = stdout = StringIO()
def tearDown(self) -> None:
sys.stdout = self.real_stdout