Merge pull request #445 from djds/develop

Add PEP 518 build; annotate types; drop Python < 3.6 support
This commit is contained in:
Marcin Kulik
2022-02-13 21:22:37 +01:00
committed by GitHub
44 changed files with 1803 additions and 988 deletions

7
.github/dependabot.yml vendored Normal file
View File

@@ -0,0 +1,7 @@
---
version: 2
updates:
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "daily"

View File

@@ -1,44 +1,110 @@
---
name: build
on: [push, pull_request]
on:
- push
- pull_request
jobs:
# Code style checks
health:
name: Code health check
name: code health check
runs-on: ubuntu-latest
steps:
- name: Checkout Asciinema
- name: checkout asciinema
uses: actions/checkout@v2
- name: Setup Python
- name: setup Python
uses: actions/setup-python@v2
with:
python-version: 3.9
- name: Install dependencies
run: pip install pycodestyle twine setuptools>=38.6.0 cmarkgfm
python-version: "3.9"
- name: install dependencies
run: pip install build cmarkgfm pycodestyle twine
- name: Run pycodestyle
run: find . -name \*.py -exec pycodestyle --ignore=E501,E402,E722 {} +
run: >
find .
-name '*\.py'
-exec pycodestyle --ignore=E402,E501,E722,W503 "{}" \+
- name: Run twine
run: |
python setup.py --quiet sdist
python3 -m build
twine check dist/*
# Asciinema checks
asciinema:
name: Asciinema - py${{ matrix.python }}
name: Asciinema
runs-on: ubuntu-latest
strategy:
matrix:
python: [3.6, 3.7, 3.8, 3.9]
python:
- "3.6"
- "3.7"
- "3.8"
- "3.9"
- "3.10"
env:
TERM: dumb
steps:
- name: Checkout Asciinema
- name: checkout Asciinema
uses: actions/checkout@v2
- name: Setup Python
- name: setup Python
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python }}
- name: Install dependencies
run: pip install nose
- name: Run Asciinema tests
- name: install dependencies
run: pip install pytest
- name: run Asciinema tests
run: script -e -c make test
build_distros:
name: build distro images
strategy:
matrix:
distros:
- alpine
- arch
- centos
- debian
- fedora
- ubuntu
runs-on: ubuntu-latest
steps:
- name: Set up Docker buildx
id: buildx
uses: docker/setup-buildx-action@v1
- name: Authenticate to GHCR
uses: docker/login-action@v1
with:
registry: ghcr.io
username: "${{ github.actor }}"
password: "${{ secrets.GITHUB_TOKEN }}"
- name: "Build ${{ matrix.distros }} image"
uses: docker/build-push-action@v2
with:
file: "tests/distros/Dockerfile.${{ matrix.distros }}"
tags: |
"ghcr.io/${{ github.repository }}:${{ matrix.distros }}"
push: true
test_distros:
name: integration test distro images
needs: build_distros
strategy:
matrix:
distros:
- alpine
- arch
- centos
- debian
- fedora
- ubuntu
runs-on: ubuntu-latest
container:
image: "ghcr.io/${{ github.repository }}:${{ matrix.distros }}"
credentials:
username: "${{ github.actor }}"
password: "${{ secrets.GITHUB_TOKEN }}"
# https://github.community/t/permission-problems-when-checking-out-code-as-part-of-github-action/202263
options: "--interactive --tty --user=1001:121"
steps:
- name: checkout Asciinema
uses: actions/checkout@v2
- name: run integration tests
env:
TERM: dumb
shell: 'script --return --quiet --command "bash {0}"'
run: make test.integration

40
.pre-commit-config.yaml Normal file
View File

@@ -0,0 +1,40 @@
---
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.1.0
hooks:
- id: check-ast
- id: check-json
- id: check-toml
- id: check-yaml
- id: end-of-file-fixer
- id: requirements-txt-fixer
- id: trailing-whitespace
- repo: https://github.com/PyCQA/isort
rev: 5.10.1
hooks:
- id: isort
- repo: https://github.com/psf/black
rev: 22.1.0
hooks:
- id: black
- repo: https://github.com/adrienverge/yamllint
rev: v1.26.3
hooks:
- id: yamllint
- repo: https://github.com/myint/autoflake
rev: v1.4
hooks:
- id: autoflake
args:
- --in-place
- --recursive
- --expand-star-imports
- --remove-all-unused-imports
- --remove-duplicate-keys
- --remove-unused-variables
- repo: https://github.com/pre-commit/mirrors-mypy
rev: "v0.931"
hooks:
- id: mypy
exclude: tests

View File

@@ -9,27 +9,28 @@ RUN apt-get update \
ca-certificates \
locales \
python3 \
python3-setuptools \
python3-pip \
&& localedef \
-i en_US \
-c \
-f UTF-8 \
-A /usr/share/locale/locale.alias en_US.UTF-8
-A /usr/share/locale/locale.alias \
en_US.UTF-8
COPY setup.cfg setup.py *.md /usr/src/app/
COPY pyproject.toml setup.cfg *.md /usr/src/app/
COPY doc/*.md /usr/src/app/doc/
COPY man/asciinema.1 /usr/src/app/man/
COPY asciinema/ /usr/src/app/asciinema/
COPY README.md LICENSE /usr/src/app/
WORKDIR /usr/src/app
RUN python3 setup.py install
RUN pip3 install .
WORKDIR /root
ENV LANG="en_US.utf8"
ENV SHELL="/bin/bash"
ENV USER="docker"
WORKDIR /root
ENTRYPOINT ["/usr/local/bin/asciinema"]
CMD ["--help"]

View File

@@ -1,31 +1,75 @@
NAME=asciinema
VERSION=`python3 -c "import asciinema; print(asciinema.__version__)"`
NAME := asciinema
VERSION := $(shell python3 -c "import asciinema; print(asciinema.__version__)")
test: test-unit test-integration
VIRTUAL_ENV ?= .venv
test-unit:
nosetests
.PHONY: test
test: test.unit test.integration
test-integration:
.PHONY: test.unit
test.unit:
pytest
.PHONY: test.integration
test.integration:
tests/integration.sh
.PHONY: test.distros
test.distros:
tests/distros.sh
.PHONY: release
release: test tag push
release-test: test push-test
.PHONY: release.test
release.test: test push.test
tag:
git tag | grep "v$(VERSION)" && echo "Tag v$(VERSION) exists" && exit 1 || true
.PHONY: .tag.exists
.tag.exists:
@git tag \
| grep -q "v$(VERSION)" \
&& echo "Tag v$(VERSION) exists" \
&& exit 1
.PHONY: tag
tag: .tag.exists
git tag -s -m "Releasing $(VERSION)" v$(VERSION)
git push origin v$(VERSION)
push:
python3 -m pip install --user --upgrade --quiet twine
python3 setup.py sdist bdist_wheel
.PHONY: .venv
.venv:
python3 -m venv $(VIRTUAL_ENV)
.PHONY: .pip
.pip: .venv
. $(VIRTUAL_ENV)/bin/activate \
&& python3 -m pip install --upgrade build twine
build: .pip
. $(VIRTUAL_ENV)/bin/activate \
&& python3 -m build .
install: build
. $(VIRTUAL_ENV)/bin/activate \
&& python3 -m pip install .
.PHONY: push
push: .pip build
python3 -m twine upload dist/*
push-test:
python3 -m pip install --user --upgrade --quiet twine
python3 setup.py sdist bdist_wheel
.PHONY: push.test
push.test: .pip build
python3 -m twine upload --repository testpypi dist/*
.PHONY: test test-unit test-integration release release-test tag push push-test
.PHONY: clean
clean:
rm -rf dist *.egg-info
clean.all: clean
find . \
-type d \
-name __pycache__ \
-o -name .pytest_cache \
-o -name .mypy_cache \
-exec rm -r "{}" +

169
README.md
View File

@@ -19,23 +19,33 @@ them in a terminal as well as in a web browser.
Install latest version ([other installation options](#installation))
using [pipx](https://pypa.github.io/pipx/) (if you have it):
pipx install asciinema
```sh
pipx install asciinema
```
If you don't have pipx, install using pip with your preferred Python version:
python3 -m pip install asciinema
```sh
python3 -m pip install asciinema
```
Record your first session:
asciinema rec first.cast
```sh
asciinema rec first.cast
```
Now replay it with double speed:
asciinema play -s 2 first.cast
```sh
asciinema play -s 2 first.cast
```
Or with normal speed but with idle time limited to 2 seconds:
asciinema play -i 2 first.cast
```sh
asciinema play -i 2 first.cast
```
You can pass `-i 2` to `asciinema rec` as well, to set it permanently on a
recording. Idle time limiting makes the recordings much more interesting to
@@ -43,7 +53,9 @@ watch. Try it.
If you want to watch and share it on the web, upload it:
asciinema upload first.cast
```sh
asciinema upload first.cast
```
The above uploads it to [asciinema.org](https://asciinema.org), which is a
default [asciinema-server](https://github.com/asciinema/asciinema-server)
@@ -52,7 +64,9 @@ browser.
You can record and upload in one step by omitting the filename:
asciinema rec
```sh
asciinema rec
```
You'll be asked to confirm the upload when the recording is done. Nothing is
sent anywhere without your consent.
@@ -64,17 +78,21 @@ cover installation, usage and hosting of the recordings in more detail.
### Python package from PyPI
[PyPI]: https://pypi.python.org/pypi/asciinema
[pypi]: https://pypi.python.org/pypi/asciinema
asciinema is available on [PyPI] and can
be installed with [pipx](https://pypa.github.io/pipx/) (if you have it)
or with pip (Python 3 with setuptools required):
asciinema is available on [PyPI] and can be installed with
[pipx](https://pypa.github.io/pipx/) (if you have it) or with pip (Python 3
with setuptools required):
pipx install asciinema
```sh
pipx install asciinema
```
Or with pip (using your preferred Python version):
python3 -m pip install asciinema
```sh
python3 -m pip install asciinema
```
Installing from [PyPI] is the recommended way of installation, which gives you the latest released version.
@@ -91,32 +109,45 @@ can clone the repo and run asciinema straight from the checkout.
Clone the repo:
git clone https://github.com/asciinema/asciinema.git
cd asciinema
```sh
git clone https://github.com/asciinema/asciinema.git
cd asciinema
```
If you want latest stable version:
git checkout master
```sh
git checkout master
```
If you want current development version:
git checkout develop
```sh
git checkout develop
```
Then run it with:
python3 -m asciinema --version
```sh
python3 -m asciinema --version
```
### Docker image
asciinema Docker image is based on Ubuntu 20.04 and has the latest version of
asciinema Docker image is based on [Ubuntu
20.04](https://releases.ubuntu.com/20.04/) and has the latest version of
asciinema recorder pre-installed.
docker pull asciinema/asciinema
```sh
docker pull docker.io/asciinema/asciinema
```
When running it don't forget to allocate a pseudo-TTY (`-t`), keep STDIN open
(`-i`) and mount config directory volume (`-v`):
docker run --rm -ti -v "$HOME/.config/asciinema":/root/.config/asciinema asciinema/asciinema rec
```sh
docker run --rm -it -v "${HOME}/.config/asciinema:/root/.config/asciinema" docker.io/asciinema/asciinema rec
```
Container's entrypoint is set to `/usr/local/bin/asciinema` so you can run the
container with any arguments you would normally pass to `asciinema` binary (see
@@ -124,13 +155,28 @@ Usage section for commands and options).
There's not much software installed in this image though. In most cases you may
want to install extra programs before recording. One option is to derive new
image from this one (start your custom Dockerfile with `FROM
asciinema/asciinema`). Another option is to start the container with `/bin/bash`
image from this one (start your custom Dockerfile with `FROM asciinema/asciinema`). Another option is to start the container with `/bin/bash`
as the entrypoint, install extra packages and manually start `asciinema rec`:
docker run --rm -ti -v "$HOME/.config/asciinema":/root/.config/asciinema --entrypoint=/bin/bash asciinema/asciinema
root@6689517d99a1:~# apt-get install foobar
root@6689517d99a1:~# asciinema rec
```console
docker run --rm -it -v "${HOME}/.config/asciinema:/root/.config/asciinema" --entrypoint=/bin/bash docker.io/asciinema/asciinema rec
root@6689517d99a1:~# apt-get install foobar
root@6689517d99a1:~# asciinema rec
```
It is also possible to run the docker container as a non-root user, which has
security benefits. You can specify a user and group id at runtime to give the
application permission similar to the calling user on your host.
```sh
docker run --rm -it \
--env=ASCIINEMA_CONFIG_HOME="/run/user/$(id -u)/.config/asciinema" \
--user="$(id -u):$(id -g)" \
--volume="${HOME}/.config/asciinema:/run/user/$(id -u)/.config/asciinema:rw" \
--volume="${PWD}:/data:rw" \
--workdir='/data' \
docker.io/asciinema/asciinema rec
```
## Usage
@@ -142,7 +188,7 @@ all available commands with their options.
### `rec [filename]`
__Record terminal session.__
**Record terminal session.**
By running `asciinema rec [filename]` you start a new recording session. The
command (process) that is recorded can be specified with `-c` option (see
@@ -173,17 +219,17 @@ prompt or play a sound when the shell is being recorded.
Available options:
* `--stdin` - Enable stdin (keyboard) recording (see below)
* `--append` - Append to existing recording
* `--raw` - Save raw STDOUT output, without timing information or other metadata
* `--overwrite` - Overwrite the recording if it already exists
* `-c, --command=<command>` - Specify command to record, defaults to $SHELL
* `-e, --env=<var-names>` - List of environment variables to capture, defaults
- `--stdin` - Enable stdin (keyboard) recording (see below)
- `--append` - Append to existing recording
- `--raw` - Save raw STDOUT output, without timing information or other metadata
- `--overwrite` - Overwrite the recording if it already exists
- `-c, --command=<command>` - Specify command to record, defaults to $SHELL
- `-e, --env=<var-names>` - List of environment variables to capture, defaults
to `SHELL,TERM`
* `-t, --title=<title>` - Specify the title of the asciicast
* `-i, --idle-time-limit=<sec>` - Limit recorded terminal inactivity to max `<sec>` seconds
* `-y, --yes` - Answer "yes" to all prompts (e.g. upload confirmation)
* `-q, --quiet` - Be quiet, suppress all notices/warnings (implies -y)
- `-t, --title=<title>` - Specify the title of the asciicast
- `-i, --idle-time-limit=<sec>` - Limit recorded terminal inactivity to max `<sec>` seconds
- `-y, --yes` - Answer "yes" to all prompts (e.g. upload confirmation)
- `-q, --quiet` - Be quiet, suppress all notices/warnings (implies -y)
Stdin recording allows for capturing of all characters typed in by the user in
the currently recorded shell. This may be used by a player (e.g.
@@ -194,7 +240,7 @@ instance), it's disabled by default, and has to be explicitly enabled via
### `play <filename>`
__Replay recorded asciicast in a terminal.__
**Replay recorded asciicast in a terminal.**
This command replays given asciicast (as recorded by `rec` command) directly in
your terminal.
@@ -207,32 +253,41 @@ Following keyboard shortcuts are available:
Playing from a local file:
asciinema play /path/to/asciicast.cast
```sh
asciinema play /path/to/asciicast.cast
```
Playing from HTTP(S) URL:
asciinema play https://asciinema.org/a/22124.cast
asciinema play http://example.com/demo.cast
```sh
asciinema play https://asciinema.org/a/22124.cast
asciinema play http://example.com/demo.cast
```
Playing from asciicast page URL (requires `<link rel="alternate"
type="application/x-asciicast" href="/my/ascii.cast">` in page's HTML):
Playing from asciicast page URL (requires `<link rel="alternate" type="application/x-asciicast" href="/my/ascii.cast">` in page's HTML):
asciinema play https://asciinema.org/a/22124
asciinema play http://example.com/blog/post.html
```sh
asciinema play https://asciinema.org/a/22124
asciinema play http://example.com/blog/post.html
```
Playing from stdin:
cat /path/to/asciicast.cast | asciinema play -
ssh user@host cat asciicast.cast | asciinema play -
```sh
cat /path/to/asciicast.cast | asciinema play -
ssh user@host cat asciicast.cast | asciinema play -
```
Playing from IPFS:
asciinema play dweb:/ipfs/QmNe7FsYaHc9SaDEAEXbaagAzNw9cH7YbzN4xV7jV1MCzK/ascii.cast
```sh
asciinema play dweb:/ipfs/QmNe7FsYaHc9SaDEAEXbaagAzNw9cH7YbzN4xV7jV1MCzK/ascii.cast
```
Available options:
* `-i, --idle-time-limit=<sec>` - Limit replayed terminal inactivity to max `<sec>` seconds
* `-s, --speed=<factor>` - Playback speed (can be fractional)
- `-i, --idle-time-limit=<sec>` - Limit replayed terminal inactivity to max `<sec>` seconds
- `-s, --speed=<factor>` - Playback speed (can be fractional)
> For the best playback experience it is recommended to run `asciinema play` in
> a terminal of dimensions not smaller than the one used for recording, as
@@ -240,7 +295,7 @@ Available options:
### `cat <filename>`
__Print full output of recorded asciicast to a terminal.__
**Print full output of recorded asciicast to a terminal.**
While `asciinema play <filename>` replays the recorded session using timing
information saved in the asciicast, `asciinema cat <filename>` dumps the full
@@ -251,18 +306,17 @@ output (including all escape sequences) to a terminal immediately.
### `upload <filename>`
__Upload recorded asciicast to asciinema.org site.__
**Upload recorded asciicast to asciinema.org site.**
This command uploads given asciicast (recorded by `rec` command) to
asciinema.org, where it can be watched and shared.
`asciinema rec demo.cast` + `asciinema play demo.cast` + `asciinema upload
demo.cast` is a nice combo if you want to review an asciicast before
`asciinema rec demo.cast` + `asciinema play demo.cast` + `asciinema upload demo.cast` is a nice combo if you want to review an asciicast before
publishing it on asciinema.org.
### `auth`
__Link your install ID with your asciinema.org user account.__
**Link your install ID with your asciinema.org user account.**
If you want to manage your recordings (change title/theme, delete) at
asciinema.org you need to link your "install ID" with asciinema.org user
@@ -402,6 +456,7 @@ source [contributors](https://github.com/asciinema/asciinema/contributors).
## License
Copyright &copy; 20112019 Marcin Kulik.
Copyright &copy; 20112021 Marcin Kulik.
All code is licensed under the GPL, v3 or later. See LICENSE file for details.
All code is licensed under the GPL, v3 or later. See [LICENSE](./LICENSE) file
for details.

View File

@@ -1,19 +1,30 @@
import sys
__author__ = 'Marcin Kulik'
__version__ = '2.0.2'
__author__ = "Marcin Kulik"
__version__ = "2.2.0"
if sys.version_info[0] < 3:
raise ImportError('Python < 3 is unsupported.')
if sys.version_info < (3, 6):
raise ImportError("Python < 3.6 is unsupported.")
import asciinema.recorder
# pylint: disable=wrong-import-position
from typing import Any, Optional
from .recorder import record
def record_asciicast(path, command=None, append=False, idle_time_limit=None,
rec_stdin=False, title=None, metadata=None,
command_env=None, capture_env=None):
asciinema.recorder.record(
path,
def record_asciicast( # pylint: disable=too-many-arguments
path_: str,
command: Any = None,
append: bool = False,
idle_time_limit: Optional[int] = None,
rec_stdin: bool = False,
title: Optional[str] = None,
metadata: Any = None,
command_env: Any = None,
capture_env: Any = None,
) -> None:
record(
path_,
command=command,
append=append,
idle_time_limit=idle_time_limit,
@@ -21,5 +32,5 @@ def record_asciicast(path, command=None, append=False, idle_time_limit=None,
title=title,
metadata=metadata,
command_env=command_env,
capture_env=capture_env
capture_env=capture_env,
)

View File

@@ -1,40 +1,48 @@
import locale
import argparse
import locale
import os
import sys
from typing import Any, Optional
from asciinema import __version__
import asciinema.config as config
from asciinema.commands.auth import AuthCommand
from asciinema.commands.record import RecordCommand
from asciinema.commands.play import PlayCommand
from asciinema.commands.cat import CatCommand
from asciinema.commands.upload import UploadCommand
from . import __version__, config
from .commands.auth import AuthCommand
from .commands.cat import CatCommand
from .commands.play import PlayCommand
from .commands.record import RecordCommand
from .commands.upload import UploadCommand
def positive_float(value):
value = float(value)
if value <= 0.0:
def positive_float(value: str) -> float:
_value = float(value)
if _value <= 0.0:
raise argparse.ArgumentTypeError("must be positive")
return value
return _value
def maybe_str(v):
def maybe_str(v: Any) -> Optional[str]:
if v is not None:
return str(v)
return None
def main():
if locale.nl_langinfo(locale.CODESET).upper() not in ['US-ASCII', 'UTF-8', 'UTF8']:
print("asciinema needs an ASCII or UTF-8 character encoding to run. Check the output of `locale` command.")
sys.exit(1)
def main() -> Any:
if locale.nl_langinfo(locale.CODESET).upper() not in [
"US-ASCII",
"UTF-8",
"UTF8",
]:
sys.stderr.write(
"asciinema needs an ASCII or UTF-8 character encoding to run. "
"Check the output of `locale` command.\n"
)
return 1
try:
cfg = config.load()
except config.ConfigError as e:
sys.stderr.write(str(e) + '\n')
sys.exit(1)
sys.stderr.write(f"{e}\n")
return 1
# create the top-level parser
parser = argparse.ArgumentParser(
@@ -56,60 +64,140 @@ def main():
\x1b[1masciinema cat demo.cast\x1b[0m
For help on a specific command run:
\x1b[1masciinema <command> -h\x1b[0m""",
formatter_class=argparse.RawDescriptionHelpFormatter
\x1b[1masciinema <command> -h\x1b[0m""", # noqa: E501
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"--version", action="version", version=f"asciinema {__version__}"
)
parser.add_argument('--version', action='version', version='asciinema %s' % __version__)
subparsers = parser.add_subparsers()
# create the parser for the "rec" command
parser_rec = subparsers.add_parser('rec', help='Record terminal session')
parser_rec.add_argument('--stdin', help='enable stdin recording, disabled by default', action='store_true', default=cfg.record_stdin)
parser_rec.add_argument('--append', help='append to existing recording', action='store_true', default=False)
parser_rec.add_argument('--raw', help='save only raw stdout output', action='store_true', default=False)
parser_rec.add_argument('--overwrite', help='overwrite the file if it already exists', action='store_true', default=False)
parser_rec.add_argument('-c', '--command', help='command to record, defaults to $SHELL', default=cfg.record_command)
parser_rec.add_argument('-e', '--env', help='list of environment variables to capture, defaults to ' + config.DEFAULT_RECORD_ENV, default=cfg.record_env)
parser_rec.add_argument('-t', '--title', help='title of the asciicast')
parser_rec.add_argument('-i', '--idle-time-limit', help='limit recorded idle time to given number of seconds', type=positive_float, default=maybe_str(cfg.record_idle_time_limit))
parser_rec.add_argument('-y', '--yes', help='answer "yes" to all prompts (e.g. upload confirmation)', action='store_true', default=cfg.record_yes)
parser_rec.add_argument('-q', '--quiet', help='be quiet, suppress all notices/warnings (implies -y)', action='store_true', default=cfg.record_quiet)
parser_rec.add_argument('filename', nargs='?', default='', help='filename/path to save the recording to')
# create the parser for the `rec` command
parser_rec = subparsers.add_parser("rec", help="Record terminal session")
parser_rec.add_argument(
"--stdin",
help="enable stdin recording, disabled by default",
action="store_true",
default=cfg.record_stdin,
)
parser_rec.add_argument(
"--append",
help="append to existing recording",
action="store_true",
default=False,
)
parser_rec.add_argument(
"--raw",
help="save only raw stdout output",
action="store_true",
default=False,
)
parser_rec.add_argument(
"--overwrite",
help="overwrite the file if it already exists",
action="store_true",
default=False,
)
parser_rec.add_argument(
"-c",
"--command",
help="command to record, defaults to $SHELL",
default=cfg.record_command,
)
parser_rec.add_argument(
"-e",
"--env",
help="list of environment variables to capture, defaults to "
+ config.DEFAULT_RECORD_ENV,
default=cfg.record_env,
)
parser_rec.add_argument("-t", "--title", help="title of the asciicast")
parser_rec.add_argument(
"-i",
"--idle-time-limit",
help="limit recorded idle time to given number of seconds",
type=positive_float,
default=maybe_str(cfg.record_idle_time_limit),
)
parser_rec.add_argument(
"-y",
"--yes",
help='answer "yes" to all prompts (e.g. upload confirmation)',
action="store_true",
default=cfg.record_yes,
)
parser_rec.add_argument(
"-q",
"--quiet",
help="be quiet, suppress all notices/warnings (implies -y)",
action="store_true",
default=cfg.record_quiet,
)
parser_rec.add_argument(
"filename",
nargs="?",
default="",
help="filename/path to save the recording to",
)
parser_rec.set_defaults(cmd=RecordCommand)
# create the parser for the "play" command
parser_play = subparsers.add_parser('play', help='Replay terminal session')
parser_play.add_argument('-i', '--idle-time-limit', help='limit idle time during playback to given number of seconds', type=positive_float, default=maybe_str(cfg.play_idle_time_limit))
parser_play.add_argument('-s', '--speed', help='playback speedup (can be fractional)', type=positive_float, default=cfg.play_speed)
parser_play.add_argument('filename', help='local path, http/ipfs URL or "-" (read from stdin)')
# create the parser for the `play` command
parser_play = subparsers.add_parser("play", help="Replay terminal session")
parser_play.add_argument(
"-i",
"--idle-time-limit",
help="limit idle time during playback to given number of seconds",
type=positive_float,
default=maybe_str(cfg.play_idle_time_limit),
)
parser_play.add_argument(
"-s",
"--speed",
help="playback speedup (can be fractional)",
type=positive_float,
default=cfg.play_speed,
)
parser_play.add_argument(
"filename", help='local path, http/ipfs URL or "-" (read from stdin)'
)
parser_play.set_defaults(cmd=PlayCommand)
# create the parser for the "cat" command
parser_cat = subparsers.add_parser('cat', help='Print full output of terminal session')
parser_cat.add_argument('filename', help='local path, http/ipfs URL or "-" (read from stdin)')
# create the parser for the `cat` command
parser_cat = subparsers.add_parser(
"cat", help="Print full output of terminal session"
)
parser_cat.add_argument(
"filename", help='local path, http/ipfs URL or "-" (read from stdin)'
)
parser_cat.set_defaults(cmd=CatCommand)
# create the parser for the "upload" command
parser_upload = subparsers.add_parser('upload', help='Upload locally saved terminal session to asciinema.org')
parser_upload.add_argument('filename', help='filename or path of local recording')
# create the parser for the `upload` command
parser_upload = subparsers.add_parser(
"upload", help="Upload locally saved terminal session to asciinema.org"
)
parser_upload.add_argument(
"filename", help="filename or path of local recording"
)
parser_upload.set_defaults(cmd=UploadCommand)
# create the parser for the "auth" command
parser_auth = subparsers.add_parser('auth', help='Manage recordings on asciinema.org account')
# create the parser for the `auth` command
parser_auth = subparsers.add_parser(
"auth", help="Manage recordings on asciinema.org account"
)
parser_auth.set_defaults(cmd=AuthCommand)
# parse the args and call whatever function was selected
args = parser.parse_args()
if hasattr(args, 'cmd'):
if hasattr(args, "cmd"):
command = args.cmd(args, cfg, os.environ)
code = command.execute()
sys.exit(code)
else:
parser.print_help()
sys.exit(1)
return code
parser.print_help()
return 1
if __name__ == '__main__':
main()
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,11 +1,12 @@
import json
import platform
import re
import json
from typing import Any, Callable, Dict, Optional, Tuple, Union
from urllib.parse import urlparse
from asciinema import __version__
from asciinema.urllib_http_adapter import URLLibHttpAdapter
from asciinema.http_adapter import HTTPConnectionError
from . import __version__
from .http_adapter import HTTPConnectionError
from .urllib_http_adapter import URLLibHttpAdapter
class APIError(Exception):
@@ -13,73 +14,89 @@ class APIError(Exception):
class Api:
def __init__(self, url, user, install_id, http_adapter=None):
def __init__(
self,
url: str,
user: Optional[str],
install_id: str,
http_adapter: Any = None,
) -> None:
self.url = url
self.user = user
self.install_id = install_id
self.http_adapter = http_adapter if http_adapter is not None else URLLibHttpAdapter()
self.http_adapter = (
http_adapter if http_adapter is not None else URLLibHttpAdapter()
)
def hostname(self):
def hostname(self) -> Optional[str]:
return urlparse(self.url).hostname
def auth_url(self):
return "{}/connect/{}".format(self.url, self.install_id)
def auth_url(self) -> str:
return f"{self.url}/connect/{self.install_id}"
def upload_url(self):
return "{}/api/asciicasts".format(self.url)
def upload_url(self) -> str:
return f"{self.url}/api/asciicasts"
def upload_asciicast(self, path):
with open(path, 'rb') as f:
def upload_asciicast(self, path_: str) -> Tuple[Any, Any]:
with open(path_, "rb") as f:
try:
status, headers, body = self.http_adapter.post(
self.upload_url(),
files={"asciicast": ("ascii.cast", f)},
headers=self._headers(),
username=self.user,
password=self.install_id
password=self.install_id,
)
except HTTPConnectionError as e:
raise APIError(str(e))
raise APIError(str(e)) from e
if status != 200 and status != 201:
if status not in (200, 201):
self._handle_error(status, body)
if (headers.get('content-type') or '')[0:16] == 'application/json':
if (headers.get("content-type") or "")[0:16] == "application/json":
result = json.loads(body)
else:
result = {'url': body}
result = {"url": body}
return result, headers.get('Warning')
return result, headers.get("Warning")
def _headers(self):
return {'User-Agent': self._user_agent(), 'Accept': 'application/json'}
def _headers(self) -> Dict[str, Union[Callable[[], str], str]]:
return {"user-agent": self._user_agent, "accept": "application/json"}
def _user_agent(self):
os = re.sub('([^-]+)-(.*)', '\\1/\\2', platform.platform())
@property
@staticmethod
def _user_agent() -> str:
os = re.sub("([^-]+)-(.*)", "\\1/\\2", platform.platform())
return 'asciinema/%s %s/%s %s' % (__version__,
platform.python_implementation(),
platform.python_version(),
os
)
return (
f"asciinema/{__version__} {platform.python_implementation()}"
f"/{platform.python_version()} {os}"
)
def _handle_error(self, status, body):
@staticmethod
def _handle_error(status: int, body: str) -> None:
errors = {
400: "Invalid request: %s" % body,
400: f"Invalid request: {body}",
401: "Invalid or revoked install ID",
404: "API endpoint not found. This asciinema version may no longer be supported. Please upgrade to the latest version.",
404: (
"API endpoint not found. "
"This asciinema version may no longer be supported. "
"Please upgrade to the latest version."
),
413: "Sorry, your asciicast is too big.",
422: "Invalid asciicast: %s" % body,
503: "The server is down for maintenance. Try again in a minute."
422: f"Invalid asciicast: {body}",
503: "The server is down for maintenance. Try again in a minute.",
}
error = errors.get(status)
if not error:
if status >= 500:
error = "The server is having temporary problems. Try again in a minute."
error = (
"The server is having temporary problems. "
"Try again in a minute."
)
else:
error = "HTTP status: %i" % status
error = f"HTTP status: {status}"
raise APIError(error)

View File

@@ -1,92 +1,116 @@
import sys
import os
from urllib.request import Request, urlopen
from urllib.parse import urlparse, urlunparse
import urllib.error
import html.parser
import gzip
import codecs
import gzip
import os
import sys
import urllib.error
from codecs import StreamReader
from html.parser import HTMLParser
from typing import Any, List, TextIO, Union
from urllib.parse import urlparse, urlunparse
from urllib.request import Request, urlopen
from . import v1
from . import v2
from . import v1, v2
class LoadError(Exception):
pass
class Parser(html.parser.HTMLParser):
def __init__(self):
html.parser.HTMLParser.__init__(self)
class Parser(HTMLParser):
def __init__(self) -> None:
HTMLParser.__init__(self)
self.url = None
def handle_starttag(self, tag, attrs_list):
# look for <link rel="alternate" type="application/x-asciicast" href="https://...cast">
if tag == 'link':
attrs = {}
for k, v in attrs_list:
attrs[k] = v
def error(self, message: str) -> None:
raise NotImplementedError(
"subclasses of ParserBase must override error()"
", but HTMLParser does not"
)
if attrs.get('rel') == 'alternate':
type = attrs.get('type')
if type == 'application/asciicast+json' or type == 'application/x-asciicast':
self.url = attrs.get('href')
def handle_starttag(self, tag: str, attrs: List[Any]) -> None:
# look for <link rel="alternate"
# type="application/x-asciicast"
# href="https://...cast">
if tag == "link":
# avoid modifying function signature keyword args from base class
_attrs = {}
for k, v in attrs:
_attrs[k] = v
if _attrs.get("rel") == "alternate":
type_ = _attrs.get("type")
if type_ in (
"application/asciicast+json",
"application/x-asciicast",
):
self.url = _attrs.get("href")
def open_url(url):
def open_url(url: str) -> Union[StreamReader, TextIO]:
if url == "-":
return sys.stdin
if url.startswith("ipfs://"):
url = "https://ipfs.io/ipfs/%s" % url[7:]
url = f"https://ipfs.io/ipfs/{url[7:]}"
elif url.startswith("dweb:/ipfs/"):
url = "https://ipfs.io/%s" % url[5:]
url = f"https://ipfs.io/{url[5:]}"
if url.startswith("http:") or url.startswith("https:"):
req = Request(url)
req.add_header('Accept-Encoding', 'gzip')
response = urlopen(req)
body = response
url = response.geturl() # final URL after redirects
req.add_header("Accept-Encoding", "gzip")
with urlopen(req) as response:
body = response
url = response.geturl() # final URL after redirects
if response.headers['Content-Encoding'] == 'gzip':
body = gzip.open(body)
if response.headers["Content-Encoding"] == "gzip":
body = gzip.open(body)
utf8_reader = codecs.getreader('utf-8')
content_type = response.headers['Content-Type']
utf8_reader = codecs.getreader("utf-8")
content_type = response.headers["Content-Type"]
if content_type and content_type.startswith('text/html'):
html = utf8_reader(body, errors='replace').read()
parser = Parser()
parser.feed(html)
new_url = parser.url
if content_type and content_type.startswith("text/html"):
html = utf8_reader(body, errors="replace").read()
parser = Parser()
parser.feed(html)
new_url = parser.url
if not new_url:
raise LoadError("""<link rel="alternate" type="application/x-asciicast" href="..."> not found in fetched HTML document""")
if not new_url:
raise LoadError(
'<link rel="alternate" '
'type="application/x-asciicast" '
'href="..."> '
"not found in fetched HTML document"
)
if "://" not in new_url:
base_url = urlparse(url)
if "://" not in new_url:
base_url = urlparse(url)
if new_url.startswith("/"):
new_url = urlunparse((base_url[0], base_url[1], new_url, '', '', ''))
else:
path = os.path.dirname(base_url[2]) + '/' + new_url
new_url = urlunparse((base_url[0], base_url[1], path, '', '', ''))
if new_url.startswith("/"):
new_url = urlunparse(
(base_url[0], base_url[1], new_url, "", "", "")
)
else:
path = f"{os.path.dirname(base_url[2])}/{new_url}"
new_url = urlunparse(
(base_url[0], base_url[1], path, "", "", "")
)
return open_url(new_url)
return open_url(new_url)
return utf8_reader(body, errors='strict')
return utf8_reader(body, errors="strict")
return open(url, mode='rt', encoding='utf-8')
return open(url, mode="rt", encoding="utf-8")
class open_from_url():
class open_from_url:
FORMAT_ERROR = "only asciicast v1 and v2 formats can be opened"
def __init__(self, url):
def __init__(self, url: str) -> None:
self.url = url
self.file: Union[StreamReader, TextIO, None] = None
self.context: Any = None
def __enter__(self):
def __enter__(self) -> Any:
try:
self.file = open_url(self.url)
first_line = self.file.readline()
@@ -98,11 +122,13 @@ class open_from_url():
try: # try v1 next
self.context = v1.open_from_file(first_line, self.file)
return self.context.__enter__()
except v1.LoadError:
raise LoadError(self.FORMAT_ERROR)
except v1.LoadError as e:
raise LoadError(self.FORMAT_ERROR) from e
except (OSError, urllib.error.HTTPError) as e:
raise LoadError(str(e))
raise LoadError(str(e)) from e
def __exit__(self, exc_type, exc_value, exc_traceback):
def __exit__(
self, exc_type: str, exc_value: str, exc_traceback: str
) -> None:
self.context.__exit__(exc_type, exc_value, exc_traceback)

View File

@@ -1,28 +1,41 @@
def to_relative_time(events):
from typing import Any, Generator, List, Optional
def to_relative_time(
events: Generator[List[Any], None, None]
) -> Generator[List[Any], None, None]:
prev_time = 0
for frame in events:
time, type, data = frame
time, type_, data = frame
delay = time - prev_time
prev_time = time
yield [delay, type, data]
yield [delay, type_, data]
def to_absolute_time(events):
def to_absolute_time(
events: Generator[List[Any], None, None]
) -> Generator[List[Any], None, None]:
time = 0
for frame in events:
delay, type, data = frame
delay, type_, data = frame
time = time + delay
yield [time, type, data]
yield [time, type_, data]
def cap_relative_time(events, time_limit):
def cap_relative_time(
events: Generator[List[Any], None, None], time_limit: Optional[float]
) -> Generator[List[Any], None, None]:
if time_limit:
return ([min(delay, time_limit), type, data] for delay, type, data in events)
else:
return events
return (
[min(delay, time_limit), type_, data]
for delay, type_, data in events
)
return events
def adjust_speed(events, speed):
return ([delay / speed, type, data] for delay, type, data in events)
def adjust_speed(
events: Generator[List[Any], None, None], speed: Any
) -> Generator[List[Any], None, None]:
return ([delay / speed, type_, data] for delay, type_, data in events)

View File

@@ -1,25 +1,40 @@
import os
from os import path, stat
from typing import IO, Any, Optional
class writer():
def __init__(self, path, metadata=None, append=False, buffering=0):
if append and os.path.exists(path) and os.stat(path).st_size == 0: # true for pipes
class writer:
def __init__(
self,
path_: str,
metadata: Any = None,
append: bool = False,
buffering: int = 0,
) -> None:
if (
append and path.exists(path_) and stat(path_).st_size == 0
): # true for pipes
append = False
self.path = path
self.path = path_
self.buffering = buffering
self.mode = 'ab' if append else 'wb'
self.mode: str = "ab" if append else "wb"
self.file: Optional[IO[Any]] = None
self.metadata = metadata
def __enter__(self):
def __enter__(self) -> Any:
self.file = open(self.path, mode=self.mode, buffering=self.buffering)
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
def __exit__(
self, exc_type: str, exc_value: str, exc_traceback: str
) -> None:
assert self.file is not None
self.file.close()
def write_stdout(self, ts, data):
def write_stdout(self, _ts: float, data: Any) -> None:
assert self.file is not None
self.file.write(data)
def write_stdin(self, ts, data):
# pylint: disable=no-self-use
def write_stdin(self, ts: float, data: Any) -> None:
pass

View File

@@ -1,13 +1,9 @@
import json
import json.decoder
from codecs import StreamReader
from json.decoder import JSONDecodeError
from typing import Any, Dict, Generator, List, Optional, TextIO, Union
from asciinema.asciicast.events import to_absolute_time
try:
JSONDecodeError = json.decoder.JSONDecodeError
except AttributeError:
JSONDecodeError = ValueError
from .events import to_absolute_time
class LoadError(Exception):
@@ -15,46 +11,52 @@ class LoadError(Exception):
class Asciicast:
def __init__(self, attrs):
self.version = 1
def __init__(self, attrs: Dict[str, Any]) -> None:
self.version: int = 1
self.__attrs = attrs
self.idle_time_limit = None # v1 doesn't store it
@property
def v2_header(self):
keys = ['width', 'height', 'duration', 'command', 'title', 'env']
header = {k: v for k, v in self.__attrs.items() if k in keys and v is not None}
def v2_header(self) -> Dict[str, Any]:
keys = ["width", "height", "duration", "command", "title", "env"]
header = {
k: v
for k, v in self.__attrs.items()
if k in keys and v is not None
}
return header
def __stdout_events(self):
for time, data in self.__attrs['stdout']:
yield [time, 'o', data]
def __stdout_events(self) -> Generator[List[Any], None, None]:
for time, data in self.__attrs["stdout"]:
yield [time, "o", data]
def events(self):
def events(self) -> Any:
return self.stdout_events()
def stdout_events(self):
def stdout_events(self) -> Generator[List[Any], None, None]:
return to_absolute_time(self.__stdout_events())
class open_from_file():
FORMAT_ERROR = "only asciicast v1 format can be opened"
class open_from_file:
FORMAT_ERROR: str = "only asciicast v1 format can be opened"
def __init__(self, first_line, file):
def __init__(
self, first_line: str, file: Union[TextIO, StreamReader]
) -> None:
self.first_line = first_line
self.file = file
def __enter__(self):
def __enter__(self) -> Optional[Asciicast]:
try:
attrs = json.loads(self.first_line + self.file.read())
if attrs.get('version') == 1:
if attrs.get("version") == 1:
return Asciicast(attrs)
else:
raise LoadError(self.FORMAT_ERROR)
except JSONDecodeError as e:
raise LoadError(self.FORMAT_ERROR)
except JSONDecodeError as e:
raise LoadError(self.FORMAT_ERROR) from e
def __exit__(self, exc_type, exc_value, exc_traceback):
def __exit__(
self, exc_type: str, exc_value: str, exc_traceback: str
) -> None:
self.file.close()

View File

@@ -1,12 +1,9 @@
import json
import json.decoder
import time
import codecs
try:
JSONDecodeError = json.decoder.JSONDecodeError
except AttributeError:
JSONDecodeError = ValueError
import json
from codecs import StreamReader
from io import IOBase
from json.decoder import JSONDecodeError
from typing import IO, Any, Dict, Generator, List, Optional, TextIO, Union
class LoadError(Exception):
@@ -14,113 +11,141 @@ class LoadError(Exception):
class Asciicast:
def __init__(self, f, header):
self.version = 2
def __init__(
self, f: Union[TextIO, StreamReader], header: Dict[str, Any]
) -> None:
self.version: int = 2
self.__file = f
self.v2_header = header
self.idle_time_limit = header.get('idle_time_limit')
self.idle_time_limit = header.get("idle_time_limit")
def events(self):
def events(self) -> Generator[Any, None, None]:
for line in self.__file:
yield json.loads(line)
def stdout_events(self):
for time, type, data in self.events():
if type == 'o':
yield [time, type, data]
def stdout_events(self) -> Generator[List[Any], None, None]:
for time, type_, data in self.events():
if type_ == "o":
yield [time, type_, data]
def build_from_header_and_file(header, f):
def build_from_header_and_file(
header: Dict[str, Any], f: Union[StreamReader, TextIO]
) -> Asciicast:
return Asciicast(f, header)
class open_from_file():
class open_from_file:
FORMAT_ERROR = "only asciicast v2 format can be opened"
def __init__(self, first_line, file):
def __init__(
self, first_line: str, file: Union[StreamReader, TextIO]
) -> None:
self.first_line = first_line
self.file = file
def __enter__(self):
def __enter__(self) -> Asciicast:
try:
v2_header = json.loads(self.first_line)
if v2_header.get('version') == 2:
v2_header: Dict[str, Any] = json.loads(self.first_line)
if v2_header.get("version") == 2:
return build_from_header_and_file(v2_header, self.file)
else:
raise LoadError(self.FORMAT_ERROR)
except JSONDecodeError as e:
raise LoadError(self.FORMAT_ERROR)
except JSONDecodeError as e:
raise LoadError(self.FORMAT_ERROR) from e
def __exit__(self, exc_type, exc_value, exc_traceback):
def __exit__(
self, exc_type: str, exc_value: str, exc_traceback: str
) -> None:
self.file.close()
def get_duration(path):
with open(path, mode='rt', encoding='utf-8') as f:
def get_duration(path_: str) -> Any:
with open(path_, mode="rt", encoding="utf-8") as f:
first_line = f.readline()
with open_from_file(first_line, f) as a:
last_frame = None
for last_frame in a.stdout_events():
pass
return last_frame[0]
def build_header(width, height, metadata):
header = {'version': 2, 'width': width, 'height': height}
def build_header(
width: Optional[int], height: Optional[int], metadata: Any
) -> Dict[str, Any]:
header = {"version": 2, "width": width, "height": height}
header.update(metadata)
assert 'width' in header, 'width missing in metadata'
assert 'height' in header, 'height missing in metadata'
assert type(header['width']) == int
assert type(header['height']) == int
assert "width" in header, "width missing in metadata"
assert "height" in header, "height missing in metadata"
assert isinstance(header["width"], int)
assert isinstance(header["height"], int)
if 'timestamp' in header:
assert type(header['timestamp']) == int or type(header['timestamp']) == float
if "timestamp" in header:
assert isinstance(header["timestamp"], (int, float))
return header
class writer():
def __init__(self, path, metadata=None, append=False, buffering=1, width=None, height=None):
class writer:
def __init__( # pylint: disable=too-many-arguments
self,
path: str,
metadata: Any = None,
append: bool = False,
buffering: int = 1,
width: Optional[int] = None,
height: Optional[int] = None,
) -> None:
self.path = path
self.buffering = buffering
self.stdin_decoder = codecs.getincrementaldecoder('UTF-8')('replace')
self.stdout_decoder = codecs.getincrementaldecoder('UTF-8')('replace')
self.stdin_decoder = codecs.getincrementaldecoder("UTF-8")("replace")
self.stdout_decoder = codecs.getincrementaldecoder("UTF-8")("replace")
self.file: Optional[IO[Any]] = None
if append:
self.mode = 'a'
self.mode = "a"
self.header = None
else:
self.mode = 'w'
self.mode = "w"
self.header = build_header(width, height, metadata or {})
def __enter__(self):
self.file = open(self.path, mode=self.mode, buffering=self.buffering)
def __enter__(self) -> Any:
self.file = open(
self.path,
mode=self.mode,
buffering=self.buffering,
encoding="utf-8",
)
if self.header:
self.__write_line(self.header)
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
def __exit__(
self, exc_type: str, exc_value: str, exc_traceback: str
) -> None:
assert isinstance(self.file, IOBase)
self.file.close()
def write_stdout(self, ts, data):
if type(data) == str:
data = data.encode(encoding='utf-8', errors='strict')
def write_stdout(self, ts: float, data: Union[str, bytes]) -> None:
if isinstance(data, str):
data = data.encode(encoding="utf-8", errors="strict")
data = self.stdout_decoder.decode(data)
self.__write_event(ts, 'o', data)
self.__write_event(ts, "o", data)
def write_stdin(self, ts, data):
if type(data) == str:
data = data.encode(encoding='utf-8', errors='strict')
def write_stdin(self, ts: float, data: Union[str, bytes]) -> None:
if isinstance(data, str):
data = data.encode(encoding="utf-8", errors="strict")
data = self.stdin_decoder.decode(data)
self.__write_event(ts, 'i', data)
self.__write_event(ts, "i", data)
def __write_event(self, ts, etype, data):
def __write_event(self, ts: float, etype: str, data: str) -> None:
self.__write_line([round(ts, 6), etype, data])
def __write_line(self, obj):
line = json.dumps(obj, ensure_ascii=False, indent=None, separators=(', ', ': '))
self.file.write(line + '\n')
def __write_line(self, obj: Any) -> None:
line = json.dumps(
obj, ensure_ascii=False, indent=None, separators=(", ", ": ")
)
assert isinstance(self.file, IOBase)
self.file.write(f"{line}\n")

View File

@@ -1,31 +1,41 @@
from typing import Any, Optional
try:
# Importing synchronize is to detect platforms where
# multiprocessing does not work (python issue 3770)
# and cause an ImportError. Otherwise it will happen
# later when trying to use Queue().
from multiprocessing import synchronize, Process, Queue
from multiprocessing import Process, Queue, synchronize
# pylint: disable=pointless-statement
lambda _=synchronize: None # avoid pruning import
except ImportError:
from threading import Thread as Process
from queue import Queue
from queue import Queue # type: ignore
from threading import Thread as Process # type: ignore
class async_worker():
class async_worker:
def __init__(self) -> None:
self.queue: Queue[Any] = Queue()
self.process: Optional[Process] = None
def __init__(self):
self.queue = Queue()
def __enter__(self):
def __enter__(self) -> Any:
self.process = Process(target=self.run)
self.process.start()
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
def __exit__(
self, exc_type: str, exc_value: str, exc_traceback: str
) -> None:
self.queue.put(None)
assert isinstance(self.process, Process)
self.process.join()
def enqueue(self, payload):
def enqueue(self, payload: Any) -> None:
self.queue.put(payload)
def run(self):
def run(self) -> None:
payload: Any
for payload in iter(self.queue.get, None):
self.perform(payload)
# pylint: disable=no-member
self.perform(payload) # type: ignore[attr-defined]

View File

@@ -1,16 +1,20 @@
from asciinema.commands.command import Command
from typing import Any, Dict
from ..config import Config
from .command import Command
class AuthCommand(Command):
def __init__(self, args, config, env):
def __init__(self, args: Any, config: Config, env: Dict[str, str]) -> None:
Command.__init__(self, args, config, env)
def execute(self):
self.print('Open the following URL in a web browser to link your '
'install ID with your %s user account:\n\n'
'%s\n\n'
'This will associate all recordings uploaded from this machine '
'(past and future ones) to your account, '
'and allow you to manage them (change title/theme, delete) at %s.'
% (self.api.hostname(), self.api.auth_url(), self.api.hostname()))
def execute(self) -> None:
self.print(
f"Open the following URL in a web browser to link your install ID "
f"with your {self.api.hostname()} user account:\n\n"
f"{self.api.auth_url()}\n\n"
"This will associate all recordings uploaded from this machine "
"(past and future ones) to your account"
", and allow you to manage them (change title/theme, delete) at "
f"{self.api.hostname()}."
)

View File

@@ -1,27 +1,28 @@
import sys
from typing import Any, Dict
from asciinema.commands.command import Command
from asciinema.term import raw
import asciinema.asciicast as asciicast
from .. import asciicast
from ..config import Config
from ..term import raw
from .command import Command
class CatCommand(Command):
def __init__(self, args, config, env):
def __init__(self, args: Any, config: Config, env: Dict[str, str]):
Command.__init__(self, args, config, env)
self.filename = args.filename
def execute(self):
def execute(self) -> int:
try:
stdin = open('/dev/tty')
with raw(stdin.fileno()):
with asciicast.open_from_url(self.filename) as a:
for t, _type, text in a.stdout_events():
sys.stdout.write(text)
sys.stdout.flush()
with open("/dev/tty", "rt", encoding="utf-8") as stdin:
with raw(stdin.fileno()):
with asciicast.open_from_url(self.filename) as a:
for _, _type, text in a.stdout_events():
sys.stdout.write(text)
sys.stdout.flush()
except asciicast.LoadError as e:
self.print_error("printing failed: %s" % str(e))
self.print_error(f"printing failed: {str(e)}")
return 1
return 0

View File

@@ -1,23 +1,32 @@
import sys
from typing import Any, Dict, TextIO
from asciinema.api import Api
from ..api import Api
from ..config import Config
class Command:
def __init__(self, args, config, env):
self.quiet = False
def __init__(self, _args: Any, config: Config, env: Dict[str, str]):
self.quiet: bool = False
self.api = Api(config.api_url, env.get("USER"), config.install_id)
def print(self, text, file=sys.stdout, end="\n", force=False):
def print(
self,
text: str,
file_: TextIO = sys.stdout,
end: str = "\n",
force: bool = False,
) -> None:
if not self.quiet or force:
print(text, file=file, end=end)
print(text, file=file_, end=end)
def print_info(self, text):
self.print("\x1b[0;32masciinema: %s\x1b[0m" % text)
def print_info(self, text: str) -> None:
self.print(f"\x1b[0;32masciinema: {text}\x1b[0m")
def print_warning(self, text):
self.print("\x1b[0;33masciinema: %s\x1b[0m" % text)
def print_warning(self, text: str) -> None:
self.print(f"\x1b[0;33masciinema: {text}\x1b[0m")
def print_error(self, text):
self.print("\x1b[0;31masciinema: %s\x1b[0m" % text, file=sys.stderr, force=True)
def print_error(self, text: str) -> None:
self.print(
f"\x1b[0;31masciinema: {text}\x1b[0m", file_=sys.stderr, force=True
)

View File

@@ -1,28 +1,38 @@
from asciinema.commands.command import Command
from asciinema.player import Player
import asciinema.asciicast as asciicast
from typing import Any, Dict, Optional
from .. import asciicast
from ..commands.command import Command
from ..config import Config
from ..player import Player
class PlayCommand(Command):
def __init__(self, args, config, env, player=None):
def __init__(
self,
args: Any,
config: Config,
env: Dict[str, str],
player: Optional[Player] = None,
) -> None:
Command.__init__(self, args, config, env)
self.filename = args.filename
self.idle_time_limit = args.idle_time_limit
self.speed = args.speed
self.player = player if player is not None else Player()
self.key_bindings = {
'pause': config.play_pause_key,
'step': config.play_step_key
"pause": config.play_pause_key,
"step": config.play_step_key,
}
def execute(self):
def execute(self) -> int:
try:
with asciicast.open_from_url(self.filename) as a:
self.player.play(a, self.idle_time_limit, self.speed, self.key_bindings)
self.player.play(
a, self.idle_time_limit, self.speed, self.key_bindings
)
except asciicast.LoadError as e:
self.print_error("playback failed: %s" % str(e))
self.print_error(f"playback failed: {str(e)}")
return 1
except KeyboardInterrupt:
return 1

View File

@@ -1,18 +1,17 @@
import os
import sys
import tempfile
from tempfile import NamedTemporaryFile
from typing import Any, Dict, Optional
import asciinema.recorder as recorder
import asciinema.asciicast.raw as raw
import asciinema.asciicast.v2 as v2
import asciinema.notifier as notifier
from asciinema.api import APIError
from asciinema.commands.command import Command
from .. import notifier, recorder
from ..api import APIError
from ..asciicast import raw, v2
from ..commands.command import Command
from ..config import Config
class RecordCommand(Command):
def __init__(self, args, config, env):
class RecordCommand(Command): # pylint: disable=too-many-instance-attributes
def __init__(self, args: Any, config: Config, env: Dict[str, str]) -> None:
Command.__init__(self, args, config, env)
self.quiet = args.quiet
self.filename = args.filename
@@ -26,28 +25,34 @@ class RecordCommand(Command):
self.overwrite = args.overwrite
self.raw = args.raw
self.writer = raw.writer if args.raw else v2.writer
self.notifier = notifier.get_notifier(config.notifications_enabled, config.notifications_command)
self.notifier = notifier.get_notifier(
config.notifications_enabled, config.notifications_command
)
self.env = env
self.key_bindings = {
'prefix': config.record_prefix_key,
'pause': config.record_pause_key
"prefix": config.record_prefix_key,
"pause": config.record_pause_key,
}
def execute(self):
# pylint: disable=too-many-branches
# pylint: disable=too-many-return-statements
# pylint: disable=too-many-statements
def execute(self) -> int:
upload = False
append = self.append
if self.filename == "":
if self.raw:
self.print_error("filename required when recording in raw mode")
self.print_error(
"filename required when recording in raw mode"
)
return 1
else:
self.filename = _tmp_path()
upload = True
self.filename = _tmp_path()
upload = True
if os.path.exists(self.filename):
if not os.access(self.filename, os.W_OK):
self.print_error("can't write to %s" % self.filename)
self.print_error(f"can't write to {self.filename}")
return 1
if os.stat(self.filename).st_size > 0 and self.overwrite:
@@ -55,25 +60,41 @@ class RecordCommand(Command):
append = False
elif os.stat(self.filename).st_size > 0 and not append:
self.print_error("%s already exists, aborting" % self.filename)
self.print_error("use --overwrite option if you want to overwrite existing recording")
self.print_error("use --append option if you want to append to existing recording")
self.print_error(f"{self.filename} already exists, aborting")
self.print_error(
"use --overwrite option "
"if you want to overwrite existing recording"
)
self.print_error(
"use --append option "
"if you want to append to existing recording"
)
return 1
elif append:
self.print_warning("%s does not exist, not appending" % self.filename)
self.print_warning(
f"{self.filename} does not exist, not appending"
)
append = False
if append:
self.print_info("appending to asciicast at %s" % self.filename)
self.print_info(f"appending to asciicast at {self.filename}")
else:
self.print_info("recording asciicast to %s" % self.filename)
self.print_info(f"recording asciicast to {self.filename}")
if self.command:
self.print_info("""exit opened program when you're done""")
else:
self.print_info("""press <ctrl-d> or type "exit" when you're done""")
self.print_info(
"""press <ctrl-d> or type "exit" when you're done"""
)
vars = filter(None, map((lambda var: var.strip()), self.env_whitelist.split(',')))
vars_: Any = filter(
None,
map(
(lambda var: var.strip()), # type: ignore
self.env_whitelist.split(","),
),
)
try:
recorder.record(
@@ -83,27 +104,31 @@ class RecordCommand(Command):
title=self.title,
idle_time_limit=self.idle_time_limit,
command_env=self.env,
capture_env=vars,
capture_env=vars_,
rec_stdin=self.rec_stdin,
writer=self.writer,
notifier=self.notifier,
key_bindings=self.key_bindings
key_bindings=self.key_bindings,
)
except v2.LoadError:
self.print_error("can only append to asciicast v2 format recordings")
self.print_error(
"can only append to asciicast v2 format recordings"
)
return 1
self.print_info("recording finished")
if upload:
if not self.assume_yes:
self.print_info("press <enter> to upload to %s, <ctrl-c> to save locally"
% self.api.hostname())
self.print_info(
f"press <enter> to upload to {self.api.hostname()}"
", <ctrl-c> to save locally"
)
try:
sys.stdin.readline()
except KeyboardInterrupt:
self.print("\r", end="")
self.print_info("asciicast saved to %s" % self.filename)
self.print_info(f"asciicast saved to {self.filename}")
return 0
try:
@@ -113,20 +138,20 @@ class RecordCommand(Command):
self.print_warning(warn)
os.remove(self.filename)
self.print(result.get('message') or result['url'])
self.print(result.get("message") or result["url"])
except APIError as e:
self.print("\r\x1b[A", end="")
self.print_error("upload failed: %s" % str(e))
self.print_error("retry later by running: asciinema upload %s" % self.filename)
self.print_error(f"upload failed: {str(e)}")
self.print_error(
f"retry later by running: asciinema upload {self.filename}"
)
return 1
else:
self.print_info("asciicast saved to %s" % self.filename)
self.print_info(f"asciicast saved to {self.filename}")
return 0
def _tmp_path():
fd, path = tempfile.mkstemp(suffix='-ascii.cast')
os.close(fd)
return path
def _tmp_path() -> Optional[str]:
return NamedTemporaryFile(suffix="-ascii.cast", delete=False).name

View File

@@ -1,29 +1,33 @@
from asciinema.commands.command import Command
from asciinema.api import APIError
from typing import Any
from ..api import APIError
from ..config import Config
from .command import Command
class UploadCommand(Command):
def __init__(self, args, config, env):
def __init__(self, args: Any, config: Config, env: Any) -> None:
Command.__init__(self, args, config, env)
self.filename = args.filename
def execute(self):
def execute(self) -> int:
try:
result, warn = self.api.upload_asciicast(self.filename)
if warn:
self.print_warning(warn)
self.print(result.get('message') or result['url'])
self.print(result.get("message") or result["url"])
except OSError as e:
self.print_error("upload failed: %s" % str(e))
self.print_error(f"upload failed: {str(e)}")
return 1
except APIError as e:
self.print_error("upload failed: %s" % str(e))
self.print_error("retry later by running: asciinema upload %s" % self.filename)
self.print_error(f"upload failed: {str(e)}")
self.print_error(
f"retry later by running: asciinema upload {self.filename}"
)
return 1
return 0

View File

@@ -1,168 +1,198 @@
import os
import os.path as path
import sys
import uuid
import configparser
import os
from os import path
from typing import Any, Dict, Optional
from uuid import uuid4
DEFAULT_API_URL: str = "https://asciinema.org"
DEFAULT_RECORD_ENV: str = "SHELL,TERM"
class ConfigError(Exception):
pass
DEFAULT_API_URL = 'https://asciinema.org'
DEFAULT_RECORD_ENV = 'SHELL,TERM'
class Config:
def __init__(self, config_home, env=None):
def __init__(
self,
config_home: Any,
env: Optional[Dict[str, str]] = None,
) -> None:
self.config_home = config_home
self.config_file_path = path.join(config_home, "config")
self.install_id_path = path.join(self.config_home, 'install-id')
self.install_id_path = path.join(self.config_home, "install-id")
self.config = configparser.ConfigParser()
self.config.read(self.config_file_path)
self.env = env if env is not None else os.environ
def upgrade(self):
def upgrade(self) -> None:
try:
self.install_id
except ConfigError:
id = self.__api_token() or self.__user_token() or self.__gen_install_id()
self.__save_install_id(id)
id_ = (
self.__api_token()
or self.__user_token()
or self.__gen_install_id()
)
self.__save_install_id(id_)
items = {name: dict(section) for (name, section) in self.config.items()}
if items == {'DEFAULT': {}, 'api': {'token': id}} or items == {'DEFAULT': {}, 'user': {'token': id}}:
items = {
name: dict(section) for (name, section) in self.config.items()
}
if items in (
{"DEFAULT": {}, "api": {"token": id_}},
{"DEFAULT": {}, "user": {"token": id_}},
):
os.remove(self.config_file_path)
if self.env.get('ASCIINEMA_API_TOKEN'):
raise ConfigError('ASCIINEMA_API_TOKEN variable is no longer supported, please use ASCIINEMA_INSTALL_ID instead')
if self.env.get("ASCIINEMA_API_TOKEN"):
raise ConfigError(
"ASCIINEMA_API_TOKEN variable is no longer supported"
", please use ASCIINEMA_INSTALL_ID instead"
)
def __read_install_id(self):
def __read_install_id(self) -> Optional[str]:
p = self.install_id_path
if path.isfile(p):
with open(p, 'r') as f:
with open(p, "r", encoding="utf-8") as f:
return f.read().strip()
return None
def __gen_install_id(self):
return str(uuid.uuid4())
@staticmethod
def __gen_install_id() -> str:
return f"{uuid4()}"
def __save_install_id(self, id):
def __save_install_id(self, id_: str) -> None:
self.__create_config_home()
with open(self.install_id_path, 'w') as f:
f.write(id)
with open(self.install_id_path, "w", encoding="utf-8") as f:
f.write(id_)
def __create_config_home(self):
def __create_config_home(self) -> None:
if not path.exists(self.config_home):
os.makedirs(self.config_home)
def __api_token(self):
def __api_token(self) -> Optional[str]:
try:
return self.config.get('api', 'token')
return self.config.get("api", "token")
except (configparser.NoOptionError, configparser.NoSectionError):
pass
return None
def __user_token(self):
def __user_token(self) -> Optional[str]:
try:
return self.config.get('user', 'token')
return self.config.get("user", "token")
except (configparser.NoOptionError, configparser.NoSectionError):
pass
return None
@property
def install_id(self):
id = self.env.get('ASCIINEMA_INSTALL_ID') or self.__read_install_id()
def install_id(self) -> str:
id_ = self.env.get("ASCIINEMA_INSTALL_ID") or self.__read_install_id()
if id:
return id
else:
raise ConfigError('no install ID found')
if id_:
return id_
raise ConfigError("no install ID found")
@property
def api_url(self):
def api_url(self) -> str:
return self.env.get(
'ASCIINEMA_API_URL',
self.config.get('api', 'url', fallback=DEFAULT_API_URL)
"ASCIINEMA_API_URL",
self.config.get("api", "url", fallback=DEFAULT_API_URL),
)
@property
def record_stdin(self):
return self.config.getboolean('record', 'stdin', fallback=False)
def record_stdin(self) -> bool:
return self.config.getboolean("record", "stdin", fallback=False)
@property
def record_command(self):
return self.config.get('record', 'command', fallback=None)
def record_command(self) -> Optional[str]:
return self.config.get("record", "command", fallback=None)
@property
def record_env(self):
return self.config.get('record', 'env', fallback=DEFAULT_RECORD_ENV)
def record_env(self) -> str:
return self.config.get("record", "env", fallback=DEFAULT_RECORD_ENV)
@property
def record_idle_time_limit(self):
fallback = self.config.getfloat('record', 'maxwait', fallback=None) # pre 2.0
return self.config.getfloat('record', 'idle_time_limit', fallback=fallback)
def record_idle_time_limit(self) -> Optional[float]:
fallback = self.config.getfloat(
"record", "maxwait", fallback=None
) # pre 2.0
return self.config.getfloat(
"record", "idle_time_limit", fallback=fallback
)
@property
def record_yes(self):
return self.config.getboolean('record', 'yes', fallback=False)
def record_yes(self) -> bool:
return self.config.getboolean("record", "yes", fallback=False)
@property
def record_quiet(self):
return self.config.getboolean('record', 'quiet', fallback=False)
def record_quiet(self) -> bool:
return self.config.getboolean("record", "quiet", fallback=False)
@property
def record_prefix_key(self):
return self.__get_key('record', 'prefix')
def record_prefix_key(self) -> Any:
return self.__get_key("record", "prefix")
@property
def record_pause_key(self):
return self.__get_key('record', 'pause', 'C-\\')
def record_pause_key(self) -> Any:
return self.__get_key("record", "pause", "C-\\")
@property
def play_idle_time_limit(self):
fallback = self.config.getfloat('play', 'maxwait', fallback=None) # pre 2.0
return self.config.getfloat('play', 'idle_time_limit', fallback=fallback)
def play_idle_time_limit(self) -> Optional[float]:
fallback = self.config.getfloat(
"play", "maxwait", fallback=None
) # pre 2.0
return self.config.getfloat(
"play", "idle_time_limit", fallback=fallback
)
@property
def play_speed(self):
return self.config.getfloat('play', 'speed', fallback=1.0)
def play_speed(self) -> float:
return self.config.getfloat("play", "speed", fallback=1.0)
@property
def play_pause_key(self):
return self.__get_key('play', 'pause', ' ')
def play_pause_key(self) -> Any:
return self.__get_key("play", "pause", " ")
@property
def play_step_key(self):
return self.__get_key('play', 'step', '.')
def play_step_key(self) -> Any:
return self.__get_key("play", "step", ".")
@property
def notifications_enabled(self):
return self.config.getboolean('notifications', 'enabled', fallback=True)
def notifications_enabled(self) -> bool:
return self.config.getboolean(
"notifications", "enabled", fallback=True
)
@property
def notifications_command(self):
return self.config.get('notifications', 'command', fallback=None)
def notifications_command(self) -> Optional[str]:
return self.config.get("notifications", "command", fallback=None)
def __get_key(self, section, name, default=None):
key = self.config.get(section, name + '_key', fallback=default)
def __get_key(self, section: str, name: str, default: Any = None) -> Any:
key = self.config.get(section, f"{name}_key", fallback=default)
if key:
if len(key) == 3:
upper_key = key.upper()
if upper_key[0] == 'C' and upper_key[1] == '-':
if upper_key[0] == "C" and upper_key[1] == "-":
return bytes([ord(upper_key[2]) - 0x40])
else:
raise ConfigError('invalid {name} key definition \'{key}\' - use: {name}_key = C-x (with control key modifier), or {name}_key = x (with no modifier)'.format(name=name, key=key))
else:
return key.encode('utf-8')
raise ConfigError(
f"invalid {name} key definition '{key}' - use"
f": {name}_key = C-x (with control key modifier)"
f", or {name}_key = x (with no modifier)"
)
return key.encode("utf-8")
return None
def get_config_home(env=os.environ):
def get_config_home(env: Any = None) -> Any:
if env is None:
env = os.environ
env_asciinema_config_home = env.get("ASCIINEMA_CONFIG_HOME")
env_xdg_config_home = env.get("XDG_CONFIG_HOME")
env_home = env.get("HOME")
config_home = None
config_home: Optional[str] = None
if env_asciinema_config_home:
config_home = env_asciinema_config_home
@@ -175,12 +205,16 @@ def get_config_home(env=os.environ):
else:
config_home = path.join(env_home, ".config", "asciinema")
else:
raise Exception("need $HOME or $XDG_CONFIG_HOME or $ASCIINEMA_CONFIG_HOME")
raise Exception(
"need $HOME or $XDG_CONFIG_HOME or $ASCIINEMA_CONFIG_HOME"
)
return config_home
def load(env=os.environ):
def load(env: Any = None) -> Config:
if env is None:
env = os.environ
config = Config(get_config_home(env), env)
config.upgrade()
return config

View File

@@ -1,83 +1,121 @@
import os.path
import shutil
import subprocess
from os import environ, path
from typing import Dict, List, Optional, Union
class Notifier():
def is_available(self):
class Notifier:
def __init__(self, cmd: str) -> None:
self.cmd = cmd
@staticmethod
def get_icon_path() -> Optional[str]:
path_ = path.join(
path.dirname(path.realpath(__file__)),
"data/icon-256x256.png",
)
if path.exists(path_):
return path_
return None
def args(self, _text: str) -> List[str]:
return ["/bin/sh", "-c", self.cmd]
def is_available(self) -> bool:
return shutil.which(self.cmd) is not None
def notify(self, text):
subprocess.run(self.args(text), capture_output=True)
# we don't want to print *ANYTHING* to the terminal
def notify(self, text: str) -> None:
# We do not want to raise a `CalledProcessError` on command failure.
# pylint: disable=subprocess-run-check
# We do not want to print *ANYTHING* to the terminal
# so we capture and ignore all output
def get_icon_path(self):
path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "data/icon-256x256.png")
if os.path.exists(path):
return path
subprocess.run(self.args(text), capture_output=True)
class AppleScriptNotifier(Notifier):
cmd = "osascript"
def __init__(self) -> None:
super().__init__("osascript")
def args(self, text):
def args(self, text: str) -> List[str]:
text = text.replace('"', '\\"')
return ['osascript', '-e', 'display notification "{}" with title "asciinema"'.format(text)]
return [
self.cmd,
"-e",
f'display notification "{text}" with title "asciinema"',
]
class LibNotifyNotifier(Notifier):
cmd = "notify-send"
def __init__(self) -> None:
super().__init__("notify-send")
def args(self, text):
def args(self, text: str) -> List[str]:
icon_path = self.get_icon_path()
if icon_path is not None:
return ['notify-send', '-i', icon_path, 'asciinema', text]
else:
return ['notify-send', 'asciinema', text]
return [self.cmd, "-i", icon_path, "asciinema", text]
return [self.cmd, "asciinema", text]
class TerminalNotifier(Notifier):
cmd = "terminal-notifier"
def __init__(self) -> None:
super().__init__("terminal-notifier")
def args(self, text):
def args(self, text: str) -> List[str]:
icon_path = self.get_icon_path()
if icon_path is not None:
return ['terminal-notifier', '-title', 'asciinema', '-message', text, '-appIcon', icon_path]
else:
return ['terminal-notifier', '-title', 'asciinema', '-message', text]
return [
"terminal-notifier",
"-title",
"asciinema",
"-message",
text,
"-appIcon",
icon_path,
]
return [
"terminal-notifier",
"-title",
"asciinema",
"-message",
text,
]
class CustomCommandNotifier(Notifier):
def __init__(self, command):
Notifier.__init__(self)
self.command = command
def env(self, text: str) -> Dict[str, str]:
icon_path = self.get_icon_path()
env = environ.copy()
env["TEXT"] = text
if icon_path is not None:
env["ICON_PATH"] = icon_path
return env
def notify(self, text):
args = ['/bin/sh', '-c', self.command]
env = os.environ.copy()
env['TEXT'] = text
env['ICON_PATH'] = self.get_icon_path()
subprocess.run(args, env=env, capture_output=True)
def notify(self, text: str) -> None:
# We do not want to raise a `CalledProcessError` on command failure.
# pylint: disable=subprocess-run-check
subprocess.run(
self.args(text), env=self.env(text), capture_output=True
)
class NoopNotifier():
def notify(self, text):
class NoopNotifier: # pylint: disable=too-few-public-methods
def notify(self) -> None:
pass
def get_notifier(enabled=True, command=None):
def get_notifier(
enabled: bool = True, command: Optional[str] = None
) -> Union[Notifier, NoopNotifier]:
if enabled:
if command:
return CustomCommandNotifier(command)
else:
for c in [TerminalNotifier, AppleScriptNotifier, LibNotifyNotifier]:
n = c()
for c in [TerminalNotifier, AppleScriptNotifier, LibNotifyNotifier]:
n = c()
if n.is_available():
return n
if n.is_available():
return n
return NoopNotifier()

View File

@@ -1,25 +1,43 @@
import os
import sys
import time
from typing import Any, Dict, Optional, TextIO, Union
import asciinema.asciicast.events as ev
from asciinema.term import raw, read_blocking
from .asciicast import events as ev
from .asciicast.v1 import Asciicast as v1
from .asciicast.v2 import Asciicast as v2
from .term import raw, read_blocking
class Player:
def play(self, asciicast, idle_time_limit=None, speed=1.0, key_bindings={}):
class Player: # pylint: disable=too-few-public-methods
def play(
self,
asciicast: Union[v1, v2],
idle_time_limit: Optional[int] = None,
speed: float = 1.0,
key_bindings: Optional[Dict[str, Any]] = None,
) -> None:
if key_bindings is None:
key_bindings = {}
try:
stdin = open('/dev/tty')
with raw(stdin.fileno()):
self._play(asciicast, idle_time_limit, speed, stdin, key_bindings)
except Exception:
with open("/dev/tty", "rt", encoding="utf-8") as stdin:
with raw(stdin.fileno()):
self._play(
asciicast, idle_time_limit, speed, stdin, key_bindings
)
except Exception: # pylint: disable=broad-except
self._play(asciicast, idle_time_limit, speed, None, key_bindings)
def _play(self, asciicast, idle_time_limit, speed, stdin, key_bindings):
@staticmethod
def _play( # pylint: disable=too-many-locals
asciicast: Union[v1, v2],
idle_time_limit: Optional[int],
speed: float,
stdin: Optional[TextIO],
key_bindings: Dict[str, Any],
) -> None:
idle_time_limit = idle_time_limit or asciicast.idle_time_limit
pause_key = key_bindings.get('pause')
step_key = key_bindings.get('step')
pause_key = key_bindings.get("pause")
step_key = key_bindings.get("step")
stdout = asciicast.stdout_events()
stdout = ev.to_relative_time(stdout)
@@ -30,7 +48,7 @@ class Player:
base_time = time.time()
ctrl_c = False
paused = False
pause_time = None
pause_time: Optional[float] = None
for t, _type, text in stdout:
delay = t - (time.time() - base_time)
@@ -46,7 +64,8 @@ class Player:
if data == pause_key:
paused = False
base_time = base_time + (time.time() - pause_time)
assert pause_time is not None
base_time += time.time() - pause_time
break
if data == step_key:

View File

@@ -1,69 +1,82 @@
import array
import errno
import fcntl
import io
import os
import pty
import select
import shlex
import signal
import struct
import sys
import termios
import time
from typing import Any, Dict, List, Optional, Tuple
from asciinema.term import raw
from .term import raw
def record(command, writer, env=os.environ, rec_stdin=False, time_offset=0, notifier=None, key_bindings={}):
master_fd = None
start_time = None
pause_time = None
prefix_mode = False
prefix_key = key_bindings.get('prefix')
pause_key = key_bindings.get('pause')
# pylint: disable=too-many-arguments,too-many-locals,too-many-statements
def record(
command: Any,
writer: Any,
env: Any = None,
rec_stdin: bool = False,
time_offset: float = 0,
notifier: Any = None,
key_bindings: Optional[Dict[str, Any]] = None,
) -> None:
if env is None:
env = os.environ
if key_bindings is None:
key_bindings = {}
master_fd: Any = None
start_time: Optional[float] = None
pause_time: Optional[float] = None
prefix_mode: bool = False
prefix_key = key_bindings.get("prefix")
pause_key = key_bindings.get("pause")
def _notify(text):
def _notify(text: str) -> None:
if notifier:
notifier.notify(text)
def _set_pty_size():
'''
def _set_pty_size() -> None:
"""
Sets the window size of the child pty based on the window size
of our own controlling terminal.
'''
"""
# Get the terminal size of the real terminal, set it on the pseudoterminal.
# 1. Get the terminal size of the real terminal.
# 2. Set the same size on the pseudoterminal.
if os.isatty(pty.STDOUT_FILENO):
buf = array.array('h', [0, 0, 0, 0])
buf = array.array("h", [0, 0, 0, 0])
fcntl.ioctl(pty.STDOUT_FILENO, termios.TIOCGWINSZ, buf, True)
else:
buf = array.array('h', [24, 80, 0, 0])
buf = array.array("h", [24, 80, 0, 0])
fcntl.ioctl(master_fd, termios.TIOCSWINSZ, buf)
def _write_stdout(data):
'''Writes to stdout as if the child process had written the data.'''
def _write_stdout(data: Any) -> None:
"""Writes to stdout as if the child process had written the data."""
os.write(pty.STDOUT_FILENO, data)
def _handle_master_read(data):
'''Handles new data on child process stdout.'''
def _handle_master_read(data: Any) -> None:
"""Handles new data on child process stdout."""
if not pause_time:
assert start_time is not None
writer.write_stdout(time.time() - start_time, data)
_write_stdout(data)
def _write_master(data):
'''Writes to the child process from its controlling terminal.'''
def _write_master(data: Any) -> None:
"""Writes to the child process from its controlling terminal."""
while data:
n = os.write(master_fd, data)
data = data[n:]
def _handle_stdin_read(data):
'''Handles new data on child process stdin.'''
def _handle_stdin_read(data: Any) -> None:
"""Handles new data on child process stdin."""
nonlocal pause_time
nonlocal start_time
@@ -78,44 +91,43 @@ def record(command, writer, env=os.environ, rec_stdin=False, time_offset=0, noti
if data == pause_key:
if pause_time:
start_time = start_time + (time.time() - pause_time)
assert start_time is not None
start_time += time.time() - pause_time
pause_time = None
_notify('Resumed recording')
_notify("Resumed recording")
else:
pause_time = time.time()
_notify('Paused recording')
_notify("Paused recording")
return
_write_master(data)
if rec_stdin and not pause_time:
assert start_time is not None
writer.write_stdin(time.time() - start_time, data)
def _signals(signal_list):
def _signals(signal_list: Any) -> List[Tuple[Any, Any]]:
old_handlers = []
for sig, handler in signal_list:
old_handlers.append((sig, signal.signal(sig, handler)))
return old_handlers
def _copy(signal_fd):
'''Main select loop.
def _copy(signal_fd: int) -> None: # pylint: disable=too-many-branches
"""Main select loop.
Passes control to _master_read() or _stdin_read()
when new data arrives.
'''
"""
fds = [master_fd, pty.STDIN_FILENO, signal_fd]
while True:
try:
rfds, wfds, xfds = select.select(fds, [], [])
rfds, _, _ = select.select(fds, [], [])
except OSError as e: # Python >= 3.3
if e.errno == errno.EINTR:
continue
except select.error as e: # Python < 3.3
if e.args[0] == 4:
continue
if master_fd in rfds:
data = os.read(master_fd, 1024)
@@ -134,12 +146,17 @@ def record(command, writer, env=os.environ, rec_stdin=False, time_offset=0, noti
if signal_fd in rfds:
data = os.read(signal_fd, 1024)
if data:
signals = struct.unpack('%uB' % len(data), data)
signals = struct.unpack(f"{len(data)}B", data)
for sig in signals:
if sig in [signal.SIGCHLD, signal.SIGHUP, signal.SIGTERM, signal.SIGQUIT]:
if sig in [
signal.SIGCHLD,
signal.SIGHUP,
signal.SIGTERM,
signal.SIGQUIT,
]:
os.close(master_fd)
return
elif sig == signal.SIGWINCH:
if sig == signal.SIGWINCH:
_set_pty_size()
pid, master_fd = pty.fork()
@@ -154,12 +171,18 @@ def record(command, writer, env=os.environ, rec_stdin=False, time_offset=0, noti
signal.set_wakeup_fd(pipe_w)
old_handlers = _signals(map(lambda s: (s, lambda signal, frame: None),
[signal.SIGWINCH,
signal.SIGCHLD,
signal.SIGHUP,
signal.SIGTERM,
signal.SIGQUIT]))
old_handlers = _signals(
map(
lambda s: (s, lambda signal, frame: None),
[
signal.SIGWINCH,
signal.SIGCHLD,
signal.SIGHUP,
signal.SIGTERM,
signal.SIGQUIT,
],
)
)
_set_pty_size()

0
asciinema/py.typed Normal file
View File

View File

@@ -1,102 +1,126 @@
import os
import time
from typing import Any, Callable, Dict, Optional, Tuple, Type
import asciinema.asciicast.v2 as v2
import asciinema.pty as pty
import asciinema.term as term
from asciinema.async_worker import async_worker
from . import pty_ as pty # avoid collisions with standard library `pty`
from . import term
from .asciicast import v2
from .asciicast.v2 import writer as w2
from .async_worker import async_worker
def record(path, command=None, append=False, idle_time_limit=None,
rec_stdin=False, title=None, metadata=None, command_env=None,
capture_env=None, writer=v2.writer, record=pty.record, notifier=None,
key_bindings={}):
def record( # pylint: disable=too-many-arguments,too-many-locals
path_: str,
command: Any = None,
append: bool = False,
idle_time_limit: Optional[int] = None,
rec_stdin: bool = False,
title: Optional[str] = None,
metadata: Any = None,
command_env: Optional[Dict[Any, Any]] = None,
capture_env: Any = None,
writer: Type[w2] = v2.writer,
record_: Callable[..., None] = pty.record,
notifier: Any = None,
key_bindings: Optional[Dict[str, Any]] = None,
) -> None:
if command is None:
command = os.environ.get('SHELL') or 'sh'
command = os.environ.get("SHELL", "sh")
if command_env is None:
command_env = os.environ.copy()
command_env['ASCIINEMA_REC'] = '1'
if key_bindings is None:
key_bindings = {}
command_env["ASCIINEMA_REC"] = "1"
if capture_env is None:
capture_env = ['SHELL', 'TERM']
capture_env = ["SHELL", "TERM"]
w, h = term.get_size()
full_metadata = {
'width': w,
'height': h,
'timestamp': int(time.time())
full_metadata: Dict[str, Any] = {
"width": w,
"height": h,
"timestamp": int(time.time()),
}
full_metadata.update(metadata or {})
if idle_time_limit is not None:
full_metadata['idle_time_limit'] = idle_time_limit
full_metadata["idle_time_limit"] = idle_time_limit
if capture_env:
full_metadata['env'] = {var: command_env.get(var) for var in capture_env}
full_metadata["env"] = {
var: command_env.get(var) for var in capture_env
}
if title:
full_metadata['title'] = title
full_metadata["title"] = title
time_offset = 0
time_offset: float = 0
if append and os.stat(path).st_size > 0:
time_offset = v2.get_duration(path)
if append and os.stat(path_).st_size > 0:
time_offset = v2.get_duration(path_)
with async_writer(writer, path, full_metadata, append) as w:
with async_notifier(notifier) as n:
record(
['sh', '-c', command],
w,
with async_writer(writer, path_, full_metadata, append) as _writer:
with async_notifier(notifier) as _notifier:
record_(
["sh", "-c", command],
_writer,
command_env,
rec_stdin,
time_offset,
n,
key_bindings
_notifier,
key_bindings,
)
class async_writer(async_worker):
def __init__(self, writer, path, metadata, append=False):
def __init__(
self, writer: Type[w2], path_: str, metadata: Any, append: bool = False
) -> None:
async_worker.__init__(self)
self.writer = writer
self.path = path
self.path = path_
self.metadata = metadata
self.append = append
def write_stdin(self, ts, data):
self.enqueue([ts, 'i', data])
def write_stdin(self, ts: float, data: Any) -> None:
self.enqueue([ts, "i", data])
def write_stdout(self, ts, data):
self.enqueue([ts, 'o', data])
def write_stdout(self, ts: float, data: Any) -> None:
self.enqueue([ts, "o", data])
def run(self):
with self.writer(self.path, metadata=self.metadata, append=self.append) as w:
def run(self) -> None:
with self.writer(
self.path, metadata=self.metadata, append=self.append
) as w:
event: Tuple[float, str, Any]
for event in iter(self.queue.get, None):
assert event is not None
ts, etype, data = event
if etype == 'o':
if etype == "o":
w.write_stdout(ts, data)
elif etype == 'i':
elif etype == "i":
w.write_stdin(ts, data)
class async_notifier(async_worker):
def __init__(self, notifier):
def __init__(self, notifier: Any) -> None:
async_worker.__init__(self)
self.notifier = notifier
def notify(self, text):
def notify(self, text: str) -> None:
self.enqueue(text)
def perform(self, text):
def perform(self, text: str) -> None:
try:
if self.notifier:
self.notifier.notify(text)
except:
except: # pylint: disable=bare-except # noqa: E722
# we catch *ALL* exceptions here because we don't want failed
# notification to crash the recording session
pass

View File

@@ -1,42 +1,45 @@
import os
import select
import subprocess
import time
import tty
import termios as tty # avoid `Module "tty" has no attribute ...` errors
from time import sleep
from tty import setraw
from typing import IO, Any, List, Optional, Tuple, Union
class raw():
def __init__(self, fd):
class raw:
def __init__(self, fd: Union[IO[str], int]) -> None:
self.fd = fd
self.restore = False
self.restore: bool = False
self.mode: Optional[List[Any]] = None
def __enter__(self):
def __enter__(self) -> None:
try:
self.mode = tty.tcgetattr(self.fd)
tty.setraw(self.fd)
setraw(self.fd)
self.restore = True
except tty.error: # This is the same as termios.error
except tty.error: # this is `termios.error`
pass
def __exit__(self, type, value, traceback):
def __exit__(self, type_: str, value: str, traceback: str) -> None:
if self.restore:
# Give the terminal time to send answerbacks
time.sleep(0.01)
sleep(0.01) # give the terminal time to send answerbacks
assert isinstance(self.mode, list)
tty.tcsetattr(self.fd, tty.TCSAFLUSH, self.mode)
def read_blocking(fd, timeout):
def read_blocking(fd: int, timeout: Any) -> bytes:
if fd in select.select([fd], [], [], timeout)[0]:
return os.read(fd, 1024)
return b''
return b""
def get_size():
def get_size() -> Tuple[int, int]:
try:
return os.get_terminal_size()
except:
except: # pylint: disable=bare-except # noqa: E722
return (
int(subprocess.check_output(['tput', 'cols'])),
int(subprocess.check_output(['tput', 'lines']))
int(subprocess.check_output(["tput", "cols"])),
int(subprocess.check_output(["tput", "lines"])),
)

View File

@@ -1,94 +1,124 @@
import codecs
import sys
import uuid
import io
import base64
import http
from urllib.request import Request, urlopen
import io
import sys
from base64 import b64encode
from http.client import HTTPResponse
from typing import Any, Dict, Generator, Optional, Tuple
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from uuid import uuid4
from .http_adapter import HTTPConnectionError
class MultipartFormdataEncoder:
def __init__(self):
self.boundary = uuid.uuid4().hex
self.content_type = 'multipart/form-data; boundary={}'.format(self.boundary)
def __init__(self) -> None:
self.boundary = uuid4().hex
self.content_type = f"multipart/form-data; boundary={self.boundary}"
@classmethod
def u(cls, s):
def u(cls, s: Any) -> Any:
if sys.hexversion >= 0x03000000 and isinstance(s, bytes):
s = s.decode('utf-8')
s = s.decode("utf-8")
return s
def iter(self, fields, files):
def iter(
self, fields: Dict[str, Any], files: Dict[str, Tuple[str, Any]]
) -> Generator[Tuple[bytes, int], None, None]:
"""
fields is a dict of {name: value} for regular form fields.
files is a dict of {name: (filename, file-type)} for data to be uploaded as files
Yield body's chunk as bytes
fields: {name: value} for regular form fields.
files: {name: (filename, file-type)} for data to be uploaded as files
yield body's chunk as bytes
"""
encoder = codecs.getencoder('utf-8')
encoder = codecs.getencoder("utf-8")
for (key, value) in fields.items():
key = self.u(key)
yield encoder('--{}\r\n'.format(self.boundary))
yield encoder(self.u('Content-Disposition: form-data; name="{}"\r\n').format(key))
yield encoder('\r\n')
if isinstance(value, int) or isinstance(value, float):
yield encoder(f"--{self.boundary}\r\n")
yield encoder(
self.u(f'content-disposition: form-data; name="{key}"\r\n')
)
yield encoder("\r\n")
if isinstance(value, (int, float)):
value = str(value)
yield encoder(self.u(value))
yield encoder('\r\n')
yield encoder("\r\n")
for (key, filename_and_f) in files.items():
filename, f = filename_and_f
key = self.u(key)
filename = self.u(filename)
yield encoder('--{}\r\n'.format(self.boundary))
yield encoder(self.u('Content-Disposition: form-data; name="{}"; filename="{}"\r\n').format(key, filename))
yield encoder('Content-Type: application/octet-stream\r\n')
yield encoder('\r\n')
yield encoder(f"--{self.boundary}\r\n")
yield encoder(
self.u(
"content-disposition: form-data"
f'; name="{key}"'
f'; filename="{filename}"\r\n'
)
)
yield encoder("content-type: application/octet-stream\r\n")
yield encoder("\r\n")
data = f.read()
yield (data, len(data))
yield encoder('\r\n')
yield encoder('--{}--\r\n'.format(self.boundary))
yield encoder("\r\n")
yield encoder(f"--{self.boundary}--\r\n")
def encode(self, fields, files):
def encode(
self, fields: Dict[str, Any], files: Dict[str, Tuple[str, Any]]
) -> Tuple[str, bytes]:
body = io.BytesIO()
for chunk, chunk_len in self.iter(fields, files):
for chunk, _ in self.iter(fields, files):
body.write(chunk)
return self.content_type, body.getvalue()
class URLLibHttpAdapter:
class URLLibHttpAdapter: # pylint: disable=too-few-public-methods
def post( # pylint: disable=too-many-arguments,too-many-locals
self,
url: str,
fields: Optional[Dict[str, Any]] = None,
files: Optional[Dict[str, Tuple[str, Any]]] = None,
headers: Optional[Dict[str, str]] = None,
username: Optional[str] = None,
password: Optional[str] = None,
) -> Tuple[Any, Optional[Dict[str, str]], bytes]:
# avoid dangerous mutable default arguments
if fields is None:
fields = {}
if files is None:
files = {}
if headers is None:
headers = {}
def post(self, url, fields={}, files={}, headers={}, username=None, password=None):
content_type, body = MultipartFormdataEncoder().encode(fields, files)
headers = headers.copy()
headers["Content-Type"] = content_type
headers["content-type"] = content_type
if password:
auth = "%s:%s" % (username, password)
encoded_auth = base64.b64encode(bytes(auth, "utf-8"))
headers["Authorization"] = b"Basic " + encoded_auth
encoded_auth = b64encode(
f"{username}:{password}".encode("utf_8")
).decode("utf_8")
headers["authorization"] = f"Basic {encoded_auth}"
request = Request(url, data=body, headers=headers, method="POST")
try:
response = urlopen(request)
status = response.status
headers = self._parse_headers(response)
body = response.read().decode('utf-8')
with urlopen(request) as response:
status = response.status
headers = self._parse_headers(response)
body = response.read().decode("utf-8")
except HTTPError as e:
status = e.code
headers = {}
body = e.read().decode('utf-8')
body = e.read()
except (http.client.RemoteDisconnected, URLError) as e:
raise HTTPConnectionError(str(e))
raise HTTPConnectionError(str(e)) from e
return (status, headers, body)
def _parse_headers(self, response):
headers = {}
for k, v in response.getheaders():
headers[k.lower()] = v
@staticmethod
def _parse_headers(response: HTTPResponse) -> Dict[str, str]:
headers = {k.lower(): v for k, v in response.getheaders()}
return headers

37
pyproject.toml Normal file
View File

@@ -0,0 +1,37 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
[tool.black]
line-length = 79
target-version = ["py38"]
[tool.isort]
line_length = 79
profile = "black"
multi_line_output = 3
[tool.mypy]
check_untyped_defs = true
disallow_any_generics = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
no_implicit_optional = true
no_implicit_reexport = true
show_error_context = true
warn_redundant_casts = true
warn_return_any = true
warn_unused_configs = true
warn_unused_ignores = true
exclude = []
[tool.pylint."MESSAGES CONTROL"]
disable = [
"invalid-name",
"missing-class-docstring",
"missing-function-docstring",
"missing-module-docstring",
]

View File

@@ -1,6 +1,59 @@
[metadata]
name = asciinema
version = 2.2.0
author = Marcin Kulik
author_email = m@ku1ik.com
url = https://asciinema.org
download_url =
https://github.com/asciinema/asciinema/archive/v%(version)s.tar.gz
description = Terminal session recorder
description_file = README.md
license = GNU GPLv3
license_file = LICENSE
long_description = file: README.md
long_description_content_type = text/markdown; charset=UTF-8
classifiers =
Development Status :: 5 - Production/Stable
Environment :: Console
Intended Audience :: Developers
Intended Audience :: System Administrators
License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)
Natural Language :: English
Programming Language :: Python
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Programming Language :: Python :: 3.10
Topic :: System :: Shells
Topic :: Terminals
Topic :: Utilities
[options]
include_package_data = True
packages =
asciinema
asciinema.asciicast
asciinema.commands
install_requires =
[options.package_data]
asciinema = data/*.png
[options.entry_points]
console_scripts =
asciinema = asciinema.__main__:main
[options.data_files]
share/doc/asciinema =
CHANGELOG.md
CODE_OF_CONDUCT.md
CONTRIBUTING.md
README.md
doc/asciicast-v1.md
doc/asciicast-v2.md
share/man/man1 =
man/asciinema.1
[pycodestyle]
ignore = E501,E402,E722

View File

@@ -1,58 +0,0 @@
import asciinema
import sys
from setuptools import setup
if sys.version_info.major < 3:
sys.exit('Python < 3 is unsupported.')
url_template = 'https://github.com/asciinema/asciinema/archive/v%s.tar.gz'
requirements = []
with open('README.md', encoding='utf8') as file:
long_description = file.read()
setup(
name='asciinema',
version=asciinema.__version__,
packages=['asciinema', 'asciinema.commands', 'asciinema.asciicast'],
license='GNU GPLv3',
description='Terminal session recorder',
long_description=long_description,
long_description_content_type='text/markdown',
author=asciinema.__author__,
author_email='m@ku1ik.com',
url='https://asciinema.org',
download_url=(url_template % asciinema.__version__),
entry_points={
'console_scripts': [
'asciinema = asciinema.__main__:main',
],
},
package_data={'asciinema': ['data/*.png']},
data_files=[('share/doc/asciinema', ['CHANGELOG.md',
'CODE_OF_CONDUCT.md',
'CONTRIBUTING.md',
'README.md',
'doc/asciicast-v1.md',
'doc/asciicast-v2.md']),
('share/man/man1', ['man/asciinema.1'])],
install_requires=requirements,
classifiers=[
'Development Status :: 5 - Production/Stable',
'Environment :: Console',
'Intended Audience :: Developers',
'Intended Audience :: System Administrators',
'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)',
'Natural Language :: English',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Topic :: System :: Shells',
'Topic :: Terminals',
'Topic :: Utilities'
],
)

View File

@@ -1,24 +1,29 @@
from ..test_helper import Test
import asciinema.asciicast.v2 as v2
import tempfile
import json
import tempfile
import asciinema.asciicast.v2 as v2
from ..test_helper import Test
class TestWriter(Test):
def test_writing(self):
_file, path = tempfile.mkstemp()
with v2.writer(path, width=80, height=24) as w:
w.write_stdout(1, 'x') # ensure it supports both str and bytes
w.write_stdout(2, bytes.fromhex('78 c5 bc c3 b3 c5'))
w.write_stdout(3, bytes.fromhex('82 c4 87'))
w.write_stdout(4, bytes.fromhex('78 78'))
w.write_stdout(1, "x") # ensure it supports both str and bytes
w.write_stdout(2, bytes.fromhex("78 c5 bc c3 b3 c5"))
w.write_stdout(3, bytes.fromhex("82 c4 87"))
w.write_stdout(4, bytes.fromhex("78 78"))
with open(path, 'r') as f:
lines = list(map(json.loads, f.read().strip().split('\n')))
assert lines == [{"version": 2, "width": 80, "height": 24},
[1, "o", "x"],
[2, "o", "xżó"],
[3, "o", "łć"],
[4, "o", "xx"]], 'got:\n\n%s' % lines
with open(path, "r") as f:
lines = list(map(json.loads, f.read().strip().split("\n")))
assert lines == [
{"version": 2, "width": 80, "height": 24},
[1, "o", "x"],
[2, "o", "xżó"],
[3, "o", "łć"],
[4, "o", "xx"],
], (
"got:\n\n%s" % lines
)

View File

@@ -1,26 +1,25 @@
from nose.tools import assert_equal, assert_raises
import os
import os.path as path
import tempfile
import re
import tempfile
from os import path
import asciinema.config as cfg
def create_config(content=None, env={}):
dir = tempfile.mkdtemp()
# avoid redefining `dir` builtin
dir_ = tempfile.mkdtemp()
if content:
path = dir + '/config'
with open(path, 'w') as f:
# avoid redefining `os.path`
path_ = f"{dir_}/config"
with open(path_, "wt", encoding="utf_8") as f:
f.write(content)
return cfg.Config(dir, env)
return cfg.Config(dir_, env)
def read_install_id(install_id_path):
with open(install_id_path, 'r') as f:
with open(install_id_path, "rt", encoding="utf_8") as f:
return f.read().strip()
@@ -29,180 +28,187 @@ def test_upgrade_no_config_file():
config.upgrade()
install_id = read_install_id(config.install_id_path)
assert re.match('^\\w{8}-\\w{4}-\\w{4}-\\w{4}-\\w{12}', install_id)
assert_equal(install_id, config.install_id)
assert re.match("^\\w{8}-\\w{4}-\\w{4}-\\w{4}-\\w{12}", install_id)
assert install_id == config.install_id
assert not path.exists(config.config_file_path)
# it must not change after another upgrade
config.upgrade()
assert_equal(read_install_id(config.install_id_path), install_id)
assert read_install_id(config.install_id_path) == install_id
def test_upgrade_config_file_with_api_token():
config = create_config("[api]\ntoken = foo-bar-baz")
config.upgrade()
assert_equal(read_install_id(config.install_id_path), 'foo-bar-baz')
assert_equal(config.install_id, 'foo-bar-baz')
assert read_install_id(config.install_id_path) == "foo-bar-baz"
assert config.install_id == "foo-bar-baz"
assert not path.exists(config.config_file_path)
config.upgrade()
assert_equal(read_install_id(config.install_id_path), 'foo-bar-baz')
assert read_install_id(config.install_id_path) == "foo-bar-baz"
def test_upgrade_config_file_with_api_token_and_more():
config = create_config("[api]\ntoken = foo-bar-baz\nurl = http://example.com")
config = create_config(
"[api]\ntoken = foo-bar-baz\nurl = http://example.com"
)
config.upgrade()
assert_equal(read_install_id(config.install_id_path), 'foo-bar-baz')
assert_equal(config.install_id, 'foo-bar-baz')
assert_equal(config.api_url, 'http://example.com')
assert read_install_id(config.install_id_path) == "foo-bar-baz"
assert config.install_id == "foo-bar-baz"
assert config.api_url == "http://example.com"
assert path.exists(config.config_file_path)
config.upgrade()
assert_equal(read_install_id(config.install_id_path), 'foo-bar-baz')
assert read_install_id(config.install_id_path) == "foo-bar-baz"
def test_upgrade_config_file_with_user_token():
config = create_config("[user]\ntoken = foo-bar-baz")
config.upgrade()
assert_equal(read_install_id(config.install_id_path), 'foo-bar-baz')
assert_equal(config.install_id, 'foo-bar-baz')
assert read_install_id(config.install_id_path) == "foo-bar-baz"
assert config.install_id == "foo-bar-baz"
assert not path.exists(config.config_file_path)
config.upgrade()
assert_equal(read_install_id(config.install_id_path), 'foo-bar-baz')
assert read_install_id(config.install_id_path) == "foo-bar-baz"
def test_upgrade_config_file_with_user_token_and_more():
config = create_config("[user]\ntoken = foo-bar-baz\n[api]\nurl = http://example.com")
config = create_config(
"[user]\ntoken = foo-bar-baz\n[api]\nurl = http://example.com"
)
config.upgrade()
assert_equal(read_install_id(config.install_id_path), 'foo-bar-baz')
assert_equal(config.install_id, 'foo-bar-baz')
assert_equal(config.api_url, 'http://example.com')
assert read_install_id(config.install_id_path) == "foo-bar-baz"
assert config.install_id == "foo-bar-baz"
assert config.api_url == "http://example.com"
assert path.exists(config.config_file_path)
config.upgrade()
assert_equal(read_install_id(config.install_id_path), 'foo-bar-baz')
assert read_install_id(config.install_id_path) == "foo-bar-baz"
def test_default_api_url():
config = create_config('')
assert_equal('https://asciinema.org', config.api_url)
config = create_config("")
assert config.api_url == "https://asciinema.org"
def test_default_record_stdin():
config = create_config('')
assert_equal(False, config.record_stdin)
config = create_config("")
assert config.record_stdin is False
def test_default_record_command():
config = create_config('')
assert_equal(None, config.record_command)
config = create_config("")
assert config.record_command is None
def test_default_record_env():
config = create_config('')
assert_equal('SHELL,TERM', config.record_env)
config = create_config("")
assert config.record_env == "SHELL,TERM"
def test_default_record_idle_time_limit():
config = create_config('')
assert_equal(None, config.record_idle_time_limit)
config = create_config("")
assert config.record_idle_time_limit is None
def test_default_record_yes():
config = create_config('')
assert_equal(False, config.record_yes)
config = create_config("")
assert config.record_yes is False
def test_default_record_quiet():
config = create_config('')
assert_equal(False, config.record_quiet)
config = create_config("")
assert config.record_quiet is False
def test_default_play_idle_time_limit():
config = create_config('')
assert_equal(None, config.play_idle_time_limit)
config = create_config("")
assert config.play_idle_time_limit is None
def test_api_url():
config = create_config("[api]\nurl = http://the/url")
assert_equal('http://the/url', config.api_url)
assert config.api_url == "http://the/url"
def test_api_url_when_override_set():
config = create_config("[api]\nurl = http://the/url", {
'ASCIINEMA_API_URL': 'http://the/url2'})
assert_equal('http://the/url2', config.api_url)
config = create_config(
"[api]\nurl = http://the/url", {"ASCIINEMA_API_URL": "http://the/url2"}
)
assert config.api_url == "http://the/url2"
def test_record_command():
command = 'bash -l'
command = "bash -l"
config = create_config("[record]\ncommand = %s" % command)
assert_equal(command, config.record_command)
assert config.record_command == command
def test_record_stdin():
config = create_config("[record]\nstdin = yes")
assert_equal(True, config.record_stdin)
assert config.record_stdin is True
def test_record_env():
config = create_config("[record]\nenv = FOO,BAR")
assert_equal('FOO,BAR', config.record_env)
assert config.record_env == "FOO,BAR"
def test_record_idle_time_limit():
config = create_config("[record]\nidle_time_limit = 2.35")
assert_equal(2.35, config.record_idle_time_limit)
assert config.record_idle_time_limit == 2.35
config = create_config("[record]\nmaxwait = 2.35")
assert_equal(2.35, config.record_idle_time_limit)
assert config.record_idle_time_limit == 2.35
def test_record_yes():
yes = 'yes'
yes = "yes"
config = create_config("[record]\nyes = %s" % yes)
assert_equal(True, config.record_yes)
assert config.record_yes is True
def test_record_quiet():
quiet = 'yes'
quiet = "yes"
config = create_config("[record]\nquiet = %s" % quiet)
assert_equal(True, config.record_quiet)
assert config.record_quiet is True
def test_play_idle_time_limit():
config = create_config("[play]\nidle_time_limit = 2.35")
assert_equal(2.35, config.play_idle_time_limit)
assert config.play_idle_time_limit == 2.35
config = create_config("[play]\nmaxwait = 2.35")
assert_equal(2.35, config.play_idle_time_limit)
assert config.play_idle_time_limit == 2.35
def test_notifications_enabled():
config = create_config('')
assert_equal(True, config.notifications_enabled)
config = create_config("")
assert config.notifications_enabled is True
config = create_config("[notifications]\nenabled = yes")
assert_equal(True, config.notifications_enabled)
assert config.notifications_enabled is True
config = create_config("[notifications]\nenabled = no")
assert_equal(False, config.notifications_enabled)
assert config.notifications_enabled is False
def test_notifications_command():
config = create_config('')
assert_equal(None, config.notifications_command)
config = create_config("")
assert config.notifications_command is None
config = create_config('[notifications]\ncommand = tmux display-message "$TEXT"')
assert_equal('tmux display-message "$TEXT"', config.notifications_command)
config = create_config(
'[notifications]\ncommand = tmux display-message "$TEXT"'
)
assert config.notifications_command == 'tmux display-message "$TEXT"'

View File

@@ -1,22 +1,38 @@
#!/usr/bin/env bash
set -e
set -euo pipefail
path_to_self="${BASH_SOURCE[0]}"
tests_dir="$(cd "$(dirname "$path_to_self")" && pwd)"
readonly DISTROS=(
'arch'
'alpine'
'centos'
'debian'
'fedora'
'ubuntu'
)
test() {
printf "\e[1;32mTesting on $1...\e[0m\n"
echo
readonly DOCKER='docker'
docker build -t asciinema/asciinema:$1 -f tests/distros/Dockerfile.$1 .
docker run --rm -ti asciinema/asciinema:$1 tests/integration.sh
# do not redefine builtin `test`
test_() {
local -r tag="${1}"
local -ra docker_opts=(
"--tag=asciinema/asciinema:${tag}"
"--file=tests/distros/Dockerfile.${tag}"
)
printf "\e[1;32mTesting on %s...\e[0m\n\n" "${tag}"
# shellcheck disable=SC2068
"${DOCKER}" build ${docker_opts[@]} .
"${DOCKER}" run --rm -it "asciinema/asciinema:${tag}" tests/integration.sh
}
test ubuntu
test debian
test fedora
test centos
echo
printf "\e[1;32mAll tests passed.\e[0m\n"
for distro in "${DISTROS[@]}"; do
test_ "${distro}"
done
printf "\n\e[1;32mAll tests passed.\e[0m\n"

View File

@@ -0,0 +1,19 @@
# syntax=docker/dockerfile:1.3
FROM docker.io/library/alpine:3.15
# https://github.com/actions/runner/issues/241
RUN apk --no-cache add bash ca-certificates make python3 util-linux
WORKDIR /usr/src/app
COPY asciinema/ asciinema/
COPY tests/ tests/
ENV LANG="en_US.utf8"
USER nobody
ENTRYPOINT ["/bin/bash"]
# vim:ft=dockerfile

View File

@@ -0,0 +1,22 @@
# syntax=docker/dockerfile:1.3
FROM docker.io/library/archlinux:latest
RUN pacman-key --init \
&& pacman --sync --refresh --sysupgrade --noconfirm make python3 \
&& printf "LANG=en_US.UTF-8\n" > /etc/locale.conf \
&& locale-gen \
&& pacman --sync --clean --clean --noconfirm
WORKDIR /usr/src/app
COPY asciinema/ asciinema/
COPY tests/ tests/
ENV LANG="en_US.utf8"
USER nobody
ENTRYPOINT ["/bin/bash"]
# vim:ft=dockerfile

View File

@@ -1,10 +1,18 @@
FROM centos:7
# syntax=docker/dockerfile:1.3
FROM docker.io/library/centos:7
RUN yum install -y epel-release && yum install -y make python36 && yum clean all
RUN yum install -y epel-release
RUN yum install -y python34
WORKDIR /usr/src/app
COPY asciinema asciinema
COPY tests tests
ENV LANG en_US.utf8
ENV SHELL /bin/bash
ENV USER docker
COPY asciinema/ asciinema/
COPY tests/ tests/
ENV LANG="en_US.utf8"
USER nobody
ENTRYPOINT ["/bin/bash"]
# vim:ft=dockerfile

View File

@@ -1,13 +1,33 @@
FROM debian:jessie
# syntax=docker/dockerfile:1.3
FROM docker.io/library/debian:bullseye
ENV DEBIAN_FRONTENT="noninteractive"
RUN apt-get update \
&& apt-get install -y \
ca-certificates \
locales \
make \
procps \
python3 \
&& localedef \
-i en_US \
-c \
-f UTF-8 \
-A /usr/share/locale/locale.alias \
en_US.UTF-8 \
&& rm -rf /var/lib/apt/lists/*
RUN apt-get update && apt-get install -y \
ca-certificates \
locales \
python3
RUN localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
WORKDIR /usr/src/app
COPY asciinema asciinema
COPY tests tests
ENV LANG en_US.utf8
ENV SHELL /bin/bash
ENV USER docker
COPY asciinema/ asciinema/
COPY tests/ tests/
ENV LANG="en_US.utf8"
USER nobody
ENV SHELL="/bin/bash"
# vim:ft=dockerfile

View File

@@ -1,9 +1,18 @@
FROM fedora:26
# syntax=docker/dockerfile:1.3
FROM registry.fedoraproject.org/fedora:35
RUN dnf install -y make python3 procps && dnf clean all
RUN dnf install -y python3 procps
WORKDIR /usr/src/app
COPY asciinema asciinema
COPY tests tests
ENV LANG en_US.utf8
ENV SHELL /bin/bash
ENV USER docker
COPY asciinema/ asciinema/
COPY tests/ tests/
ENV LANG="en_US.utf8"
ENV SHELL="/bin/bash"
USER nobody
ENTRYPOINT ["/bin/bash"]
# vim:ft=dockerfile

View File

@@ -1,13 +1,32 @@
FROM ubuntu:16.04
# syntax=docker/dockerfile:1.3
FROM docker.io/library/ubuntu:20.04
ENV DEBIAN_FRONTENT="noninteractive"
RUN apt-get update \
&& apt-get install -y \
ca-certificates \
locales \
make \
python3 \
&& localedef \
-i en_US \
-c \
-f UTF-8 \
-A /usr/share/locale/locale.alias \
en_US.UTF-8 \
&& rm -rf /var/lib/apt/lists/*
RUN apt-get update && apt-get install -y \
ca-certificates \
locales \
python3
RUN localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
WORKDIR /usr/src/app
COPY asciinema asciinema
COPY tests tests
ENV LANG en_US.utf8
ENV SHELL /bin/bash
ENV USER docker
COPY asciinema/ asciinema/
COPY tests/ tests/
ENV LANG="en_US.utf8"
USER nobody
ENTRYPOINT ["/bin/bash"]
# vim:ft=dockerfile

View File

@@ -1,21 +1,26 @@
#!/usr/bin/env bash
set -e
set -x
set -eExuo pipefail
if ! type "pkill" >/dev/null 2>&1; then
echo "error: pkill not installed"
if ! command -v "pkill" >/dev/null 2>&1; then
printf "error: pkill not installed\n"
exit 1
fi
python3 -V
export ASCIINEMA_CONFIG_HOME=`mktemp -d 2>/dev/null || mktemp -d -t asciinema-config-home`
TMP_DATA_DIR=`mktemp -d 2>/dev/null || mktemp -d -t asciinema-data-dir`
trap "rm -rf $ASCIINEMA_CONFIG_HOME $TMP_DATA_DIR" EXIT
ASCIINEMA_CONFIG_HOME="$(
mktemp -d 2>/dev/null || mktemp -d -t asciinema-config-home
)"
function asciinema() {
python3 -m asciinema "$@"
export ASCIINEMA_CONFIG_HOME
TMP_DATA_DIR="$(mktemp -d 2>/dev/null || mktemp -d -t asciinema-data-dir)"
trap 'rm -rf ${ASCIINEMA_CONFIG_HOME} ${TMP_DATA_DIR}' EXIT
asciinema() {
python3 -m asciinema "${@}"
}
## test help message
@@ -35,50 +40,54 @@ asciinema auth
# asciicast v1
asciinema play -s 5 tests/demo.json
asciinema play -s 5 -i 0.2 tests/demo.json
# shellcheck disable=SC2002
cat tests/demo.json | asciinema play -s 5 -
# asciicast v2
asciinema play -s 5 tests/demo.cast
asciinema play -s 5 -i 0.2 tests/demo.cast
# shellcheck disable=SC2002
cat tests/demo.cast | asciinema play -s 5 -
## test cat command
# asciicast v1
asciinema cat tests/demo.json
# shellcheck disable=SC2002
cat tests/demo.json | asciinema cat -
# asciicast v2
asciinema cat tests/demo.cast
# shellcheck disable=SC2002
cat tests/demo.cast | asciinema cat -
## test rec command
# normal program
asciinema rec -c 'bash -c "echo t3st; sleep 2; echo ok"' "$TMP_DATA_DIR/1a.cast"
grep '"o",' "$TMP_DATA_DIR/1a.cast"
asciinema rec -c 'bash -c "echo t3st; sleep 2; echo ok"' "${TMP_DATA_DIR}/1a.cast"
grep '"o",' "${TMP_DATA_DIR}/1a.cast"
# very quickly exiting program
# https://github.com/asciinema/asciinema/issues/246
# asciinema rec -c who "$TMP_DATA_DIR/1b.cast"
# grep '"o",' "$TMP_DATA_DIR/1b.cast"
# asciinema rec -c who "${TMP_DATA_DIR}/1b.cast"
# grep '"o",' "${TMP_DATA_DIR}/1b.cast"
# signal handling
bash -c "sleep 1; pkill -28 -n -f 'm asciinema'" &
asciinema rec -c 'bash -c "echo t3st; sleep 2; echo ok"' "$TMP_DATA_DIR/2.cast"
asciinema rec -c 'bash -c "echo t3st; sleep 2; echo ok"' "${TMP_DATA_DIR}/2.cast"
bash -c "sleep 1; pkill -n -f 'bash -c echo t3st'" &
asciinema rec -c 'bash -c "echo t3st; sleep 2; echo ok"' "$TMP_DATA_DIR/3.cast"
asciinema rec -c 'bash -c "echo t3st; sleep 2; echo ok"' "${TMP_DATA_DIR}/3.cast"
bash -c "sleep 1; pkill -9 -n -f 'bash -c echo t3st'" &
asciinema rec -c 'bash -c "echo t3st; sleep 2; echo ok"' "$TMP_DATA_DIR/4.cast"
asciinema rec -c 'bash -c "echo t3st; sleep 2; echo ok"' "${TMP_DATA_DIR}/4.cast"
# with stdin recording
asciinema rec --stdin -c 'bash -c "echo t3st; sleep 1; echo ok"' "$TMP_DATA_DIR/5.cast"
asciinema rec --stdin -c 'bash -c "echo t3st; sleep 1; echo ok"' "${TMP_DATA_DIR}/5.cast"
# raw output recording
asciinema rec --raw -c 'bash -c "echo t3st; sleep 1; echo ok"' "$TMP_DATA_DIR/6.raw"
asciinema rec --raw -c 'bash -c "echo t3st; sleep 1; echo ok"' "${TMP_DATA_DIR}/6.raw"
# appending to existing recording
asciinema rec -c 'echo allright!; sleep 0.1' "$TMP_DATA_DIR/7.cast"
asciinema rec --append -c uptime "$TMP_DATA_DIR/7.cast"
asciinema rec -c 'echo allright!; sleep 0.1' "${TMP_DATA_DIR}/7.cast"
asciinema rec --append -c uptime "${TMP_DATA_DIR}/7.cast"

View File

@@ -1,14 +1,12 @@
import os
import pty
from nose.tools import assert_equal
from .test_helper import Test
import asciinema.pty_
import asciinema.pty
from .test_helper import Test
class FakeStdout:
def __init__(self):
self.data = []
@@ -20,7 +18,6 @@ class FakeStdout:
class TestRecord(Test):
def setUp(self):
self.real_os_write = os.write
os.write = self.os_write
@@ -35,7 +32,18 @@ class TestRecord(Test):
def test_record_command_writes_to_stdout(self):
output = FakeStdout()
command = ['python3', '-c', "import sys; import time; sys.stdout.write(\'foo\'); sys.stdout.flush(); time.sleep(0.01); sys.stdout.write(\'bar\')"]
asciinema.pty.record(command, output)
command = [
"python3",
"-c",
(
"import sys"
"; import time"
"; sys.stdout.write('foo')"
"; sys.stdout.flush()"
"; time.sleep(0.01)"
"; sys.stdout.write('bar')"
),
]
asciinema.pty_.record(command, output)
assert_equal([b'foo', b'bar'], output.data)
assert output.data == [b"foo", b"bar"]

View File

@@ -1,4 +1,5 @@
import sys
try:
from StringIO import StringIO
except ImportError:
@@ -19,7 +20,6 @@ def assert_not_printed(expected):
class Test:
def setUp(self):
global stdout
self.real_stdout = sys.stdout
@@ -30,7 +30,6 @@ class Test:
class FakeClock:
def __init__(self, values):
self.values = values
self.n = 0
@@ -43,7 +42,6 @@ class FakeClock:
class FakeAsciicast:
def __init__(self, cmd=None, title=None, stdout=None, meta_data=None):
self.cmd = cmd
self.title = title