Add session command

This commit is contained in:
Marcin Kulik
2025-03-12 13:15:02 +01:00
parent c6155f62a0
commit 8660a74e80
21 changed files with 1077 additions and 1244 deletions

View File

@@ -4,37 +4,36 @@
// TODO document the protocol when it's final
use super::session;
use crate::leb128;
use std::future;
use anyhow::Result;
use futures_util::{stream, Stream, StreamExt};
use std::future;
use tokio::sync::mpsc;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use crate::leb128;
use crate::stream::Event;
static MAGIC_STRING: &str = "ALiS\x01";
pub async fn stream(
clients_tx: &mpsc::Sender<session::Client>,
pub async fn stream<S: Stream<Item = Result<Event, BroadcastStreamRecvError>>>(
stream: S,
) -> Result<impl Stream<Item = Result<Vec<u8>, BroadcastStreamRecvError>>> {
let header = stream::once(future::ready(Ok(MAGIC_STRING.into())));
let events = session::stream(clients_tx)
.await?
.scan(0u64, |prev_event_time, event| {
future::ready(Some(event.map(|event| {
let (bytes, time) = serialize_event(event, *prev_event_time);
*prev_event_time = time;
let events = stream.scan(0u64, |prev_event_time, event| {
future::ready(Some(event.map(|event| {
let (bytes, time) = serialize_event(event, *prev_event_time);
*prev_event_time = time;
bytes
})))
});
bytes
})))
});
Ok(header.chain(events))
}
fn serialize_event(event: session::Event, prev_event_time: u64) -> (Vec<u8>, u64) {
use session::Event::*;
fn serialize_event(event: Event, prev_event_time: u64) -> (Vec<u8>, u64) {
use Event::*;
match event {
Init(last_id, time, size, theme, init) => {

View File

@@ -1,9 +1,9 @@
use clap::{Args, ValueEnum};
use clap::{Parser, Subcommand};
use std::net::SocketAddr;
use std::num::ParseIntError;
use std::path::PathBuf;
use clap::{Args, Parser, Subcommand, ValueEnum};
pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:8080";
#[derive(Debug, Parser)]
@@ -33,6 +33,9 @@ pub enum Commands {
/// Stream a terminal session
Stream(Stream),
/// Record and/or stream a terminal session
Session(Session),
/// Concatenate multiple recordings
Cat(Cat),
@@ -70,7 +73,7 @@ pub struct Record {
#[arg(long, conflicts_with = "append")]
pub overwrite: bool,
/// Command to record [default: $SHELL]
/// Command to start in the session [default: $SHELL]
#[arg(short, long)]
pub command: Option<String>,
@@ -94,7 +97,7 @@ pub struct Record {
#[arg(long)]
pub headless: bool,
/// Override terminal size for the recorded command
/// Override terminal size for the session
#[arg(long, value_name = "COLSxROWS", value_parser = parse_tty_size)]
pub tty_size: Option<(Option<u16>, Option<u16>)>,
@@ -162,6 +165,69 @@ pub struct Stream {
pub log_file: Option<PathBuf>,
}
#[derive(Debug, Args)]
pub struct Session {
/// Output path - either a file or a directory path
#[arg(short, long)]
pub output: Option<String>,
/// Enable input recording
#[arg(long, short = 'I', alias = "stdin")]
pub input: bool,
/// Append to an existing recording file
#[arg(short, long)]
pub append: bool,
/// Recording file format [default: asciicast]
#[arg(short, long, value_enum)]
pub format: Option<Format>,
/// Overwrite target file if it already exists
#[arg(long, conflicts_with = "append")]
pub overwrite: bool,
/// Command to start in the session [default: $SHELL]
#[arg(short, long)]
pub command: Option<String>,
/// Filename template, used when recording to a directory
#[arg(long, value_name = "TEMPLATE")]
pub filename: Option<String>,
/// List of env vars to save [default: TERM,SHELL]
#[arg(long)]
pub env: Option<String>,
/// Title of the recording
#[arg(short, long)]
pub title: Option<String>,
/// Limit idle time to a given number of seconds
#[arg(short, long, value_name = "SECS")]
pub idle_time_limit: Option<f64>,
/// Use headless mode - don't use TTY for input/output
#[arg(long)]
pub headless: bool,
/// Override terminal size for the session
#[arg(long, value_name = "COLSxROWS", value_parser = parse_tty_size)]
pub tty_size: Option<(Option<u16>, Option<u16>)>,
/// Stream the session with the built-in HTTP server
#[arg(short, long, value_name = "IP:PORT", default_missing_value = DEFAULT_LISTEN_ADDR, num_args = 0..=1)]
pub serve: Option<SocketAddr>,
/// Stream the session via an asciinema server
#[arg(short, long, value_name = "STREAM-ID|WS-URL", default_missing_value = "", num_args = 0..=1, value_parser = validate_forward_target)]
pub relay: Option<RelayTarget>,
/// Log file path
#[arg(long)]
pub log_file: Option<PathBuf>,
}
#[derive(Debug, Args)]
pub struct Cat {
#[arg(required = true)]

View File

@@ -1,11 +1,10 @@
use super::Command;
use crate::api;
use crate::cli;
use crate::config::Config;
use anyhow::Result;
impl Command for cli::Auth {
fn run(self, config: &Config) -> Result<()> {
impl cli::Auth {
pub fn run(self, config: &Config) -> Result<()> {
let server_url = config.get_server_url()?;
let server_hostname = server_url.host().unwrap();
let auth_url = api::get_auth_url(config)?;

View File

@@ -1,4 +1,3 @@
use super::Command;
use crate::asciicast;
use crate::cli;
use crate::config::Config;
@@ -6,8 +5,8 @@ use anyhow::Result;
use std::io;
use std::io::Write;
impl Command for cli::Cat {
fn run(self, _config: &Config) -> Result<()> {
impl cli::Cat {
pub fn run(self, _config: &Config) -> Result<()> {
let mut encoder = asciicast::Encoder::new(0);
let mut stdout = io::stdout();
let mut time_offset: u64 = 0;

View File

@@ -1,4 +1,3 @@
use super::Command;
use crate::asciicast;
use crate::cli::{self, Format};
use crate::config::Config;
@@ -8,8 +7,8 @@ use anyhow::{bail, Result};
use std::fs;
use std::path::Path;
impl Command for cli::Convert {
fn run(self, _config: &Config) -> Result<()> {
impl cli::Convert {
pub fn run(self, _config: &Config) -> Result<()> {
let path = util::get_local_path(&self.input_filename)?;
let cast = asciicast::open_from_path(&*path)?;
let mut encoder = self.get_encoder();
@@ -17,9 +16,7 @@ impl Command for cli::Convert {
encoder.encode_to_file(cast, &mut file)
}
}
impl cli::Convert {
fn get_encoder(&self) -> Box<dyn encoder::Encoder> {
let format = self.format.unwrap_or_else(|| {
if self.output_filename.to_lowercase().ends_with(".txt") {

View File

@@ -2,42 +2,5 @@ pub mod auth;
pub mod cat;
pub mod convert;
pub mod play;
pub mod rec;
pub mod stream;
pub mod session;
pub mod upload;
use crate::config::Config;
use crate::notifier;
use std::collections::HashMap;
use std::env;
pub trait Command {
fn run(self, config: &Config) -> anyhow::Result<()>;
}
fn get_notifier(config: &Config) -> Box<dyn notifier::Notifier> {
if config.notifications.enabled {
notifier::get_notifier(config.notifications.command.clone())
} else {
Box::new(notifier::NullNotifier)
}
}
fn build_exec_command(command: Option<String>) -> Vec<String> {
let command = command
.or(env::var("SHELL").ok())
.unwrap_or("/bin/sh".to_owned());
vec!["/bin/sh".to_owned(), "-c".to_owned(), command]
}
fn build_exec_extra_env(vars: &[(String, String)]) -> HashMap<String, String> {
let mut env = HashMap::new();
env.insert("ASCIINEMA_REC".to_owned(), "1".to_owned());
for (k, v) in vars {
env.insert(k.clone(), v.clone());
}
env
}

View File

@@ -1,22 +1,22 @@
use super::Command;
use crate::asciicast;
use crate::cli;
use crate::config::Config;
use crate::config::{self, Config};
use crate::logger;
use crate::player::{self, KeyBindings};
use crate::tty;
use crate::util;
use anyhow::Result;
impl Command for cli::Play {
fn run(self, config: &Config) -> Result<()> {
let speed = self.speed.or(config.cmd_play_speed()).unwrap_or(1.0);
let idle_time_limit = self.idle_time_limit.or(config.cmd_play_idle_time_limit());
impl cli::Play {
pub fn run(self, config: &Config) -> Result<()> {
let cmd_config = config.cmd_play();
let speed = self.speed.or(cmd_config.speed).unwrap_or(1.0);
let idle_time_limit = self.idle_time_limit.or(cmd_config.idle_time_limit);
logger::info!("Replaying session from {}", self.filename);
let path = util::get_local_path(&self.filename)?;
let keys = get_key_bindings(config)?;
let keys = get_key_bindings(&cmd_config)?;
let ended = loop {
let recording = asciicast::open_from_path(&*path)?;
@@ -46,18 +46,18 @@ impl Command for cli::Play {
}
}
fn get_key_bindings(config: &Config) -> Result<KeyBindings> {
fn get_key_bindings(config: &config::Play) -> Result<KeyBindings> {
let mut keys = KeyBindings::default();
if let Some(key) = config.cmd_play_pause_key()? {
if let Some(key) = config.pause_key()? {
keys.pause = key;
}
if let Some(key) = config.cmd_play_step_key()? {
if let Some(key) = config.step_key()? {
keys.step = key;
}
if let Some(key) = config.cmd_play_next_marker_key()? {
if let Some(key) = config.next_marker_key()? {
keys.next_marker = key;
}

View File

@@ -1,299 +0,0 @@
use super::Command;
use crate::asciicast;
use crate::asciicast::Header;
use crate::cli;
use crate::config::Config;
use crate::encoder::{AsciicastEncoder, Encoder, RawEncoder, TextEncoder};
use crate::locale;
use crate::logger;
use crate::notifier;
use crate::pty;
use crate::recorder::Output;
use crate::recorder::{self, KeyBindings};
use crate::tty::{self, FixedSizeTty};
use anyhow::{bail, Result};
use cli::Format;
use std::collections::{HashMap, HashSet};
use std::env;
use std::fs;
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::process;
use std::time::{SystemTime, UNIX_EPOCH};
impl Command for cli::Record {
fn run(mut self, config: &Config) -> Result<()> {
locale::check_utf8_locale()?;
self.ensure_filename(config)?;
let format = self.get_format();
let (append, overwrite) = self.get_mode()?;
let file = self.open_file(append, overwrite)?;
let time_offset = self.get_time_offset(append, format)?;
let command = self.get_command(config);
let keys = get_key_bindings(config)?;
let notifier = super::get_notifier(config);
let record_input = self.input || config.cmd_rec_input();
let exec_command = super::build_exec_command(command.as_ref().cloned());
let exec_extra_env = super::build_exec_extra_env(&[]);
let output = self.get_output(file, format, append, time_offset, config);
logger::info!("Recording session started, writing to {}", self.path);
if command.is_none() {
logger::info!("Press <ctrl+d> or type 'exit' to end");
}
let notifier = notifier::threaded(notifier);
{
let mut tty = self.get_tty()?;
let mut recorder = recorder::Recorder::new(output, record_input, keys, notifier);
pty::exec(&exec_command, &exec_extra_env, &mut tty, &mut recorder)?;
}
logger::info!("Recording session ended");
Ok(())
}
}
impl cli::Record {
fn ensure_filename(&mut self, config: &Config) -> Result<()> {
let mut path = PathBuf::from(&self.path);
if path.exists() && fs::metadata(&path)?.is_dir() {
let mut tpl = self.filename.clone().unwrap_or(config.cmd_rec_filename());
if tpl.contains("{pid}") {
let pid = process::id().to_string();
tpl = tpl.replace("{pid}", &pid);
}
if tpl.contains("{user}") {
let user = env::var("USER").ok().unwrap_or("unknown".to_owned());
tpl = tpl.replace("{user}", &user);
}
if tpl.contains("{hostname}") {
let hostname = hostname::get()
.ok()
.and_then(|h| h.into_string().ok())
.unwrap_or("unknown".to_owned());
tpl = tpl.replace("{hostname}", &hostname);
}
let filename = chrono::Local::now().format(&tpl).to_string();
path.push(Path::new(&filename));
if let Some(dir) = path.parent() {
fs::create_dir_all(dir)?;
}
self.path = path.to_string_lossy().to_string();
}
Ok(())
}
fn get_mode(&self) -> Result<(bool, bool)> {
let mut overwrite = self.overwrite;
let mut append = self.append;
let path = Path::new(&self.path);
if path.exists() {
let metadata = fs::metadata(path)?;
if metadata.len() == 0 {
overwrite = true;
append = false;
}
if !append && !overwrite {
bail!("file exists, use --overwrite or --append");
}
} else {
append = false;
}
Ok((append, overwrite))
}
fn open_file(&self, append: bool, overwrite: bool) -> Result<fs::File> {
let file = fs::OpenOptions::new()
.write(true)
.append(append)
.create(overwrite)
.create_new(!overwrite && !append)
.truncate(overwrite)
.open(&self.path)?;
Ok(file)
}
fn get_format(&self) -> Format {
self.format.unwrap_or_else(|| {
if self.raw {
Format::Raw
} else if self.path.to_lowercase().ends_with(".txt") {
Format::Txt
} else {
Format::Asciicast
}
})
}
fn get_time_offset(&self, append: bool, format: Format) -> Result<u64> {
if append && format == Format::Asciicast {
asciicast::get_duration(&self.path)
} else {
Ok(0)
}
}
fn get_tty(&self) -> Result<FixedSizeTty> {
let (cols, rows) = self.tty_size.unwrap_or((None, None));
let cols = cols.or(self.cols);
let rows = rows.or(self.rows);
if self.headless {
Ok(FixedSizeTty::new(tty::NullTty::open()?, cols, rows))
} else if let Ok(dev_tty) = tty::DevTty::open() {
Ok(FixedSizeTty::new(dev_tty, cols, rows))
} else {
logger::info!("TTY not available, recording in headless mode");
Ok(FixedSizeTty::new(tty::NullTty::open()?, cols, rows))
}
}
fn get_output(
&self,
file: fs::File,
format: Format,
append: bool,
time_offset: u64,
config: &Config,
) -> Box<dyn recorder::Output + Send> {
let metadata = self.build_asciicast_metadata(config);
match format {
Format::Asciicast => {
let writer = io::LineWriter::new(file);
let encoder = AsciicastEncoder::new(append, time_offset);
Box::new(FileOutput {
writer,
encoder,
metadata,
})
}
Format::Raw => Box::new(FileOutput {
writer: file,
encoder: RawEncoder::new(append),
metadata,
}),
Format::Txt => Box::new(FileOutput {
writer: file,
encoder: TextEncoder::new(),
metadata,
}),
}
}
fn get_command(&self, config: &Config) -> Option<String> {
self.command.as_ref().cloned().or(config.cmd_rec_command())
}
fn build_asciicast_metadata(&self, config: &Config) -> Metadata {
let idle_time_limit = self.idle_time_limit.or(config.cmd_rec_idle_time_limit());
let command = self.get_command(config);
let env = self
.env
.as_ref()
.cloned()
.or(config.cmd_rec_env())
.unwrap_or(String::from("TERM,SHELL"));
Metadata {
idle_time_limit,
command,
title: self.title.clone(),
env: Some(capture_env(&env)),
}
}
}
struct FileOutput<W: Write, E: Encoder> {
writer: W,
encoder: E,
metadata: Metadata,
}
pub struct Metadata {
pub idle_time_limit: Option<f64>,
pub command: Option<String>,
pub title: Option<String>,
pub env: Option<HashMap<String, String>>,
}
impl<W: Write, E: Encoder> Output for FileOutput<W, E> {
fn header(
&mut self,
time: SystemTime,
tty_size: tty::TtySize,
theme: Option<tty::Theme>,
) -> io::Result<()> {
let timestamp = time.duration_since(UNIX_EPOCH).unwrap().as_secs();
let header = Header {
cols: tty_size.0,
rows: tty_size.1,
timestamp: Some(timestamp),
theme,
idle_time_limit: self.metadata.idle_time_limit,
command: self.metadata.command.as_ref().cloned(),
title: self.metadata.title.as_ref().cloned(),
env: self.metadata.env.as_ref().cloned(),
};
self.writer.write_all(&self.encoder.header(&header))
}
fn event(&mut self, event: asciicast::Event) -> io::Result<()> {
self.writer.write_all(&self.encoder.event(event))
}
fn flush(&mut self) -> io::Result<()> {
self.writer.write_all(&self.encoder.flush())
}
}
fn get_key_bindings(config: &Config) -> Result<KeyBindings> {
let mut keys = KeyBindings::default();
if let Some(key) = config.cmd_rec_prefix_key()? {
keys.prefix = key;
}
if let Some(key) = config.cmd_rec_pause_key()? {
keys.pause = key;
}
if let Some(key) = config.cmd_rec_add_marker_key()? {
keys.add_marker = key;
}
Ok(keys)
}
fn capture_env(vars: &str) -> HashMap<String, String> {
let vars = vars.split(',').collect::<HashSet<_>>();
env::vars()
.filter(|(k, _v)| vars.contains(&k.as_str()))
.collect::<HashMap<_, _>>()
}

467
src/cmd/session.rs Normal file
View File

@@ -0,0 +1,467 @@
use std::collections::{HashMap, HashSet};
use std::env;
use std::fs::{self, OpenOptions};
use std::io::LineWriter;
use std::net::TcpListener;
use std::path::{Path, PathBuf};
use std::process;
use std::time::Duration;
use anyhow::{anyhow, bail, Context, Result};
use chrono::Local;
use tokio::runtime::Runtime;
use tokio::time;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;
use url::{form_urlencoded, Url};
use crate::api;
use crate::asciicast;
use crate::cli::{self, Format, RelayTarget};
use crate::config::{self, Config};
use crate::encoder::{AsciicastEncoder, RawEncoder, TextEncoder};
use crate::file_writer::{FileWriter, Metadata};
use crate::forwarder;
use crate::locale;
use crate::logger;
use crate::notifier::{self, Notifier, NullNotifier};
use crate::pty;
use crate::server;
use crate::session::{self, KeyBindings, Session};
use crate::stream::Stream;
use crate::tty::{DevTty, FixedSizeTty, NullTty};
use crate::util;
impl cli::Session {
pub fn run(mut self, config: &Config, cmd_config: &config::Session) -> Result<()> {
locale::check_utf8_locale()?;
let runtime = Runtime::new()?;
let command = self.get_command(&cmd_config);
let keys = get_key_bindings(&cmd_config)?;
let notifier = notifier::threaded(get_notifier(config));
let record_input = self.input || cmd_config.input;
let path = self
.output
.take()
.map(|path| self.ensure_filename(path, &cmd_config))
.transpose()?;
let file_writer = path
.as_ref()
.map(|path| self.get_file_writer(path, &cmd_config))
.transpose()?;
let mut listener = self
.serve
.take()
.map(TcpListener::bind)
.transpose()
.context("cannot start listener")?;
let mut relay = self
.relay
.take()
.map(|target| get_relay(target, config, self.env.as_deref()))
.transpose()?;
let relay_id = relay.as_ref().map(|r| r.id());
let parent_session_relay_id = get_parent_session_relay_id();
if relay_id.is_some()
&& parent_session_relay_id.is_some()
&& relay_id == parent_session_relay_id
{
if let Some(Relay { url: Some(url), .. }) = relay {
bail!("This shell is already being streamed at {url}");
} else {
bail!("This shell is already being streamed");
}
}
if listener.is_some() || relay.is_some() {
self.init_logging()?;
}
logger::info!("asciinema session started");
if let Some(path) = path {
logger::info!("Recording to {}", path);
}
if let Some(listener) = &listener {
logger::info!(
"Live streaming at http://{}",
listener.local_addr().unwrap()
);
}
if let Some(Relay { url: Some(url), .. }) = &relay {
logger::info!("Live streaming at {}", url);
}
if command.is_none() {
logger::info!("Press <ctrl+d> or type 'exit' to end");
}
let stream = Stream::new();
let shutdown_token = CancellationToken::new();
let server = listener.take().map(|listener| {
runtime.spawn(server::serve(
listener,
stream.subscriber(),
shutdown_token.clone(),
))
});
let forwarder = relay.take().map(|relay| {
runtime.spawn(forwarder::forward(
relay.ws_producer_url,
stream.subscriber(),
notifier.clone(),
shutdown_token.clone(),
))
});
let mut outputs: Vec<Box<dyn session::Output + Send>> = Vec::new();
if server.is_some() || forwarder.is_some() {
let output = stream.start(&runtime);
outputs.push(Box::new(output));
}
if let Some(output) = file_writer {
outputs.push(Box::new(output));
}
let exec_command = build_exec_command(command.as_ref().cloned());
let exec_extra_env = build_exec_extra_env(relay_id.as_ref());
{
let mut session = Session::new(outputs, record_input, keys, notifier);
let mut tty = self.get_tty()?;
pty::exec(&exec_command, &exec_extra_env, &mut tty, &mut session)?;
}
runtime.block_on(async {
debug!("session shutting down...");
shutdown_token.cancel();
if let Some(task) = server {
debug!("waiting for server shutdown...");
let _ = time::timeout(Duration::from_secs(5), task).await;
}
if let Some(task) = forwarder {
debug!("waiting for forwarder shutdown...");
let _ = time::timeout(Duration::from_secs(5), task).await;
}
debug!("shutdown complete");
});
logger::info!("asciinema session ended");
Ok(())
}
fn ensure_filename(&mut self, path_: String, config: &config::Session) -> Result<String> {
let mut path = PathBuf::from(&path_);
if path.exists() && fs::metadata(&path)?.is_dir() {
let mut tpl = self.filename.clone().unwrap_or(config.filename.clone());
if tpl.contains("{pid}") {
let pid = process::id().to_string();
tpl = tpl.replace("{pid}", &pid);
}
if tpl.contains("{user}") {
let user = env::var("USER").ok().unwrap_or("unknown".to_owned());
tpl = tpl.replace("{user}", &user);
}
if tpl.contains("{hostname}") {
let hostname = hostname::get()
.ok()
.and_then(|h| h.into_string().ok())
.unwrap_or("unknown".to_owned());
tpl = tpl.replace("{hostname}", &hostname);
}
let filename = Local::now().format(&tpl).to_string();
path.push(Path::new(&filename));
if let Some(dir) = path.parent() {
fs::create_dir_all(dir)?;
}
Ok(path.to_string_lossy().to_string())
} else {
Ok(path_)
}
}
fn get_file_writer(&self, path: &str, config: &config::Session) -> Result<FileWriter> {
let format = self.format.unwrap_or_else(|| {
if path.to_lowercase().ends_with(".txt") {
Format::Txt
} else {
Format::Asciicast
}
});
let mut overwrite = self.overwrite;
let mut append = self.append;
let path = Path::new(path);
if path.exists() {
let metadata = fs::metadata(path)?;
if metadata.len() == 0 {
overwrite = true;
append = false;
}
if !append && !overwrite {
bail!("file exists, use --overwrite or --append");
}
} else {
append = false;
}
let file = OpenOptions::new()
.write(true)
.append(append)
.create(overwrite)
.create_new(!overwrite && !append)
.truncate(overwrite)
.open(path)?;
let time_offset = if append && format == Format::Asciicast {
asciicast::get_duration(path)?
} else {
0
};
let metadata = self.build_asciicast_metadata(config);
let writer = match format {
Format::Asciicast => {
let writer = Box::new(LineWriter::new(file));
let encoder = Box::new(AsciicastEncoder::new(append, time_offset));
FileWriter {
writer,
encoder,
metadata,
}
}
Format::Raw => {
let writer = Box::new(file);
let encoder = Box::new(RawEncoder::new(append));
FileWriter {
writer,
encoder,
metadata,
}
}
Format::Txt => {
let writer = Box::new(file);
let encoder = Box::new(TextEncoder::new());
FileWriter {
writer,
encoder,
metadata,
}
}
};
Ok(writer)
}
fn get_command(&self, config: &config::Session) -> Option<String> {
self.command.as_ref().cloned().or(config.command.clone())
}
fn build_asciicast_metadata(&self, config: &config::Session) -> Metadata {
let idle_time_limit = self.idle_time_limit.or(config.idle_time_limit);
let command = self.get_command(config);
let env = self
.env
.as_ref()
.cloned()
.or(config.env.clone())
.unwrap_or(String::from("TERM,SHELL"));
Metadata {
idle_time_limit,
command,
title: self.title.clone(),
env: Some(capture_env(&env)),
}
}
fn get_tty(&self) -> Result<FixedSizeTty> {
let (cols, rows) = self.tty_size.unwrap_or((None, None));
if self.headless {
Ok(FixedSizeTty::new(NullTty::open()?, cols, rows))
} else if let Ok(dev_tty) = DevTty::open() {
Ok(FixedSizeTty::new(dev_tty, cols, rows))
} else {
logger::info!("TTY not available, recording in headless mode");
Ok(FixedSizeTty::new(NullTty::open()?, cols, rows))
}
}
fn init_logging(&self) -> Result<()> {
let log_file = self.log_file.as_ref().cloned();
if let Some(path) = &log_file {
let file = OpenOptions::new()
.create(true)
.append(true)
.open(path)
.map_err(|e| anyhow!("cannot open log file {}: {}", path.to_string_lossy(), e))?;
let filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy();
tracing_subscriber::fmt()
.with_ansi(false)
.with_env_filter(filter)
.with_writer(file)
.init();
}
Ok(())
}
}
#[derive(Debug)]
struct Relay {
ws_producer_url: Url,
url: Option<Url>,
}
impl Relay {
fn id(&self) -> String {
util::sha2_digest(self.ws_producer_url.as_ref())
}
}
fn get_relay(target: RelayTarget, config: &Config, env_vars: Option<&str>) -> Result<Relay> {
match target {
RelayTarget::StreamId(id) => {
let stream = api::create_user_stream(id, config)?;
let ws_producer_url = build_producer_url(&stream.ws_producer_url, env_vars)?;
Ok(Relay {
ws_producer_url,
url: Some(stream.url.parse()?),
})
}
RelayTarget::WsProducerUrl(url) => Ok(Relay {
ws_producer_url: url,
url: None,
}),
}
}
fn build_producer_url(url: &str, env_vars: Option<&str>) -> Result<Url> {
let mut url: Url = url.parse()?;
let term = env::var("TERM").ok().unwrap_or_default();
let shell = env::var("SHELL").ok().unwrap_or_default();
let mut params = vec![
("term[type]".to_string(), term.clone()),
("shell".to_string(), shell.clone()),
];
if let Some(env_vars) = env_vars {
for (k, v) in capture_env(env_vars) {
params.push((format!("env[{k}]"), v));
}
}
let params = params.into_iter().filter(|(_k, v)| !v.is_empty());
let query = form_urlencoded::Serializer::new(String::new())
.extend_pairs(params)
.finish();
url.set_query(Some(&query));
Ok(url)
}
fn get_key_bindings(config: &config::Session) -> Result<KeyBindings> {
let mut keys = KeyBindings::default();
if let Some(key) = config.prefix_key()? {
keys.prefix = key;
}
if let Some(key) = config.pause_key()? {
keys.pause = key;
}
if let Some(key) = config.add_marker_key()? {
keys.add_marker = key;
}
Ok(keys)
}
fn capture_env(vars: &str) -> HashMap<String, String> {
let vars = vars.split(',').collect::<HashSet<_>>();
env::vars()
.filter(|(k, _v)| vars.contains(&k.as_str()))
.collect::<HashMap<_, _>>()
}
fn get_notifier(config: &Config) -> Box<dyn Notifier> {
if config.notifications.enabled {
notifier::get_notifier(config.notifications.command.clone())
} else {
Box::new(NullNotifier)
}
}
fn build_exec_command(command: Option<String>) -> Vec<String> {
let command = command
.or(env::var("SHELL").ok())
.unwrap_or("/bin/sh".to_owned());
vec!["/bin/sh".to_owned(), "-c".to_owned(), command]
}
fn build_exec_extra_env(relay_id: Option<&String>) -> HashMap<String, String> {
let mut env = HashMap::new();
env.insert("ASCIINEMA_REC".to_owned(), "1".to_owned());
if let Some(id) = relay_id {
env.insert("ASCIINEMA_RELAY_ID".to_owned(), id.clone());
}
env
}
fn get_parent_session_relay_id() -> Option<String> {
env::var("ASCIINEMA_RELAY_ID").ok()
}

View File

@@ -1,243 +0,0 @@
use super::Command;
use crate::api;
use crate::cli;
use crate::config::Config;
use crate::locale;
use crate::logger;
use crate::notifier;
use crate::pty;
use crate::streamer::{self, KeyBindings};
use crate::tty::{self, FixedSizeTty};
use crate::util;
use anyhow::bail;
use anyhow::{anyhow, Context, Result};
use cli::{RelayTarget, DEFAULT_LISTEN_ADDR};
use std::collections::{HashMap, HashSet};
use std::env;
use std::fmt::Debug;
use std::fs;
use std::net::TcpListener;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;
use url::{form_urlencoded, Url};
#[derive(Debug)]
struct Relay {
ws_producer_url: Url,
url: Option<Url>,
}
impl Command for cli::Stream {
fn run(mut self, config: &Config) -> Result<()> {
locale::check_utf8_locale()?;
if self.serve.is_none() && self.relay.is_none() {
self.serve = Some(DEFAULT_LISTEN_ADDR.parse().unwrap());
}
let command = self.get_command(config);
let keys = get_key_bindings(config)?;
let notifier = super::get_notifier(config);
let record_input = self.input || config.cmd_stream_input();
let exec_command = super::build_exec_command(command.as_ref().cloned());
let listener = self.get_listener()?;
let relay = self.get_relay(config, self.env.clone())?;
let relay_id = relay.as_ref().map(|r| r.id());
let exec_extra_env = build_exec_extra_env(relay_id.as_ref());
if let (Some(id), Some(parent_id)) = (relay_id, parent_session_relay_id()) {
if id == parent_id {
if let Some(Relay { url: Some(url), .. }) = relay {
bail!("This shell is already being streamed at {url}");
} else {
bail!("This shell is already being streamed");
}
}
}
logger::info!("Streaming session started");
if let Some(listener) = &listener {
logger::info!(
"Live stream available at http://{}",
listener.local_addr().unwrap()
);
}
if let Some(Relay { url: Some(url), .. }) = &relay {
logger::info!("Live stream available at {}", url);
}
if command.is_none() {
logger::info!("Press <ctrl+d> or type 'exit' to end");
}
let notifier = notifier::threaded(notifier);
{
let mut tty = self.get_tty()?;
let mut streamer = streamer::Streamer::new(
listener,
relay.map(|e| e.ws_producer_url),
record_input,
keys,
notifier,
);
self.init_logging()?;
pty::exec(&exec_command, &exec_extra_env, &mut tty, &mut streamer)?;
}
logger::info!("Streaming session ended");
Ok(())
}
}
impl cli::Stream {
fn get_command(&self, config: &Config) -> Option<String> {
self.command
.as_ref()
.cloned()
.or(config.cmd_stream_command())
}
fn get_relay(&mut self, config: &Config, env_vars: Option<String>) -> Result<Option<Relay>> {
match self.relay.take() {
Some(RelayTarget::StreamId(id)) => {
let stream = api::create_user_stream(id, config)?;
let ws_producer_url = self.build_producer_url(&stream.ws_producer_url, env_vars)?;
Ok(Some(Relay {
ws_producer_url,
url: Some(stream.url.parse()?),
}))
}
Some(RelayTarget::WsProducerUrl(url)) => Ok(Some(Relay {
ws_producer_url: url,
url: None,
})),
None => Ok(None),
}
}
fn build_producer_url(&self, url: &str, env_vars: Option<String>) -> Result<Url> {
let mut url: Url = url.parse()?;
let term = env::var("TERM").ok().unwrap_or_default();
let shell = env::var("SHELL").ok().unwrap_or_default();
let mut params = vec![
("term[type]".to_string(), term.clone()),
("shell".to_string(), shell.clone()),
];
if let Some(env_vars) = env_vars {
for (k, v) in capture_env(&env_vars) {
params.push((format!("env[{k}]"), v));
}
}
let params = params.into_iter().filter(|(_k, v)| v != "");
let query = form_urlencoded::Serializer::new(String::new())
.extend_pairs(params)
.finish();
url.set_query(Some(&query));
Ok(url)
}
fn get_listener(&self) -> Result<Option<TcpListener>> {
if let Some(addr) = self.serve {
return Ok(Some(
TcpListener::bind(addr).context("cannot start listener")?,
));
}
Ok(None)
}
fn get_tty(&self) -> Result<FixedSizeTty> {
let (cols, rows) = self.tty_size.unwrap_or((None, None));
if self.headless {
Ok(FixedSizeTty::new(tty::NullTty::open()?, cols, rows))
} else if let Ok(dev_tty) = tty::DevTty::open() {
Ok(FixedSizeTty::new(dev_tty, cols, rows))
} else {
logger::info!("TTY not available, streaming in headless mode");
Ok(FixedSizeTty::new(tty::NullTty::open()?, cols, rows))
}
}
fn init_logging(&self) -> Result<()> {
let log_file = self.log_file.as_ref().cloned();
if let Some(path) = &log_file {
let file = fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)
.map_err(|e| anyhow!("cannot open log file {}: {}", path.to_string_lossy(), e))?;
let filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy();
tracing_subscriber::fmt()
.with_ansi(false)
.with_env_filter(filter)
.with_writer(file)
.init();
}
Ok(())
}
}
impl Relay {
fn id(&self) -> String {
util::sha2_digest(self.ws_producer_url.as_ref())
}
}
fn get_key_bindings(config: &Config) -> Result<KeyBindings> {
let mut keys = KeyBindings::default();
if let Some(key) = config.cmd_stream_prefix_key()? {
keys.prefix = key;
}
if let Some(key) = config.cmd_stream_pause_key()? {
keys.pause = key;
}
if let Some(key) = config.cmd_stream_add_marker_key()? {
keys.add_marker = key;
}
Ok(keys)
}
fn build_exec_extra_env(relay_id: Option<&String>) -> HashMap<String, String> {
match relay_id {
Some(id) => super::build_exec_extra_env(&[("ASCIINEMA_RELAY_ID".to_string(), id.clone())]),
None => super::build_exec_extra_env(&[]),
}
}
fn parent_session_relay_id() -> Option<String> {
env::var("ASCIINEMA_RELAY_ID").ok()
}
fn capture_env(vars: &str) -> HashMap<String, String> {
let vars = vars.split(',').collect::<HashSet<_>>();
env::vars()
.filter(|(k, _v)| vars.contains(&k.as_str()))
.collect::<HashMap<_, _>>()
}

View File

@@ -1,12 +1,11 @@
use super::Command;
use crate::api;
use crate::asciicast;
use crate::cli;
use crate::config::Config;
use anyhow::Result;
impl Command for cli::Upload {
fn run(self, config: &Config) -> Result<()> {
impl cli::Upload {
pub fn run(self, config: &Config) -> Result<()> {
let _ = asciicast::open_from_path(&self.filename)?;
let response = api::upload_asciicast(&self.filename, config)?;
println!("{}", response.message.unwrap_or(response.url));

View File

@@ -1,13 +1,15 @@
use anyhow::{anyhow, bail, Result};
use reqwest::Url;
use serde::Deserialize;
use std::env;
use std::fs;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use anyhow::{anyhow, bail, Result};
use reqwest::Url;
use serde::Deserialize;
use uuid::Uuid;
const DEFAULT_SERVER_URL: &str = "https://asciinema.org";
const DEFAULT_REC_FILENAME: &str = "%Y-%m-%d-%H-%M-%S-{pid}.cast";
const INSTALL_ID_FILENAME: &str = "install-id";
pub type Key = Option<Vec<u8>>;
@@ -32,9 +34,23 @@ pub struct Cmd {
rec: Rec,
play: Play,
stream: Stream,
session: Session,
}
#[derive(Debug, Deserialize, Default)]
#[derive(Debug, Clone, Deserialize, Default)]
#[allow(unused)]
pub struct Session {
pub command: Option<String>,
pub filename: String,
pub input: bool,
pub env: Option<String>,
pub idle_time_limit: Option<f64>,
pub prefix_key: Option<String>,
pub pause_key: Option<String>,
pub add_marker_key: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Default)]
#[allow(unused)]
pub struct Rec {
pub command: Option<String>,
@@ -47,7 +63,7 @@ pub struct Rec {
pub add_marker_key: Option<String>,
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Clone, Deserialize)]
#[allow(unused)]
pub struct Play {
pub speed: Option<f64>,
@@ -57,7 +73,7 @@ pub struct Play {
pub next_marker_key: Option<String>,
}
#[derive(Debug, Deserialize, Default)]
#[derive(Debug, Clone, Deserialize, Default)]
#[allow(unused)]
pub struct Stream {
pub command: Option<String>,
@@ -80,9 +96,11 @@ impl Config {
let mut config = config::Config::builder()
.set_default("server.url", None::<Option<String>>)?
.set_default("cmd.rec.input", false)?
.set_default("cmd.rec.filename", "%Y-%m-%d-%H-%M-%S-{pid}.cast")?
.set_default("cmd.rec.filename", DEFAULT_REC_FILENAME)?
.set_default("cmd.play.speed", None::<Option<f64>>)?
.set_default("cmd.stream.input", false)?
.set_default("cmd.session.input", false)?
.set_default("cmd.session.filename", DEFAULT_REC_FILENAME)?
.set_default("notifications.enabled", true)?
.add_source(config::File::with_name("/etc/asciinema/config.toml").required(false))
.add_source(
@@ -133,101 +151,66 @@ impl Config {
}
}
pub fn cmd_rec_command(&self) -> Option<String> {
self.cmd.rec.command.as_ref().cloned()
pub fn cmd_rec(&self) -> Session {
Session {
command: self.cmd.rec.command.clone(),
filename: self.cmd.rec.filename.clone(),
input: self.cmd.rec.input,
env: self.cmd.rec.env.clone(),
idle_time_limit: self.cmd.rec.idle_time_limit,
prefix_key: self.cmd.rec.prefix_key.clone(),
pause_key: self.cmd.rec.pause_key.clone(),
add_marker_key: self.cmd.rec.add_marker_key.clone(),
}
}
pub fn cmd_rec_filename(&self) -> String {
self.cmd.rec.filename.clone()
pub fn cmd_stream(&self) -> Session {
Session {
command: self.cmd.stream.command.clone(),
filename: "".to_string(),
input: self.cmd.stream.input,
env: self.cmd.stream.env.clone(),
idle_time_limit: None,
prefix_key: self.cmd.stream.prefix_key.clone(),
pause_key: self.cmd.stream.pause_key.clone(),
add_marker_key: self.cmd.stream.add_marker_key.clone(),
}
}
pub fn cmd_rec_input(&self) -> bool {
self.cmd.rec.input
pub fn cmd_session(&self) -> Session {
self.cmd.session.clone()
}
pub fn cmd_rec_idle_time_limit(&self) -> Option<f64> {
self.cmd.rec.idle_time_limit
pub fn cmd_play(&self) -> Play {
self.cmd.play.clone()
}
}
impl Session {
pub fn prefix_key(&self) -> Result<Option<Key>> {
self.prefix_key.as_ref().map(parse_key).transpose()
}
pub fn cmd_rec_env(&self) -> Option<String> {
self.cmd.rec.env.as_ref().cloned()
pub fn pause_key(&self) -> Result<Option<Key>> {
self.pause_key.as_ref().map(parse_key).transpose()
}
pub fn cmd_rec_prefix_key(&self) -> Result<Option<Key>> {
self.cmd.rec.prefix_key.as_ref().map(parse_key).transpose()
pub fn add_marker_key(&self) -> Result<Option<Key>> {
self.add_marker_key.as_ref().map(parse_key).transpose()
}
}
impl Play {
pub fn pause_key(&self) -> Result<Option<Key>> {
self.pause_key.as_ref().map(parse_key).transpose()
}
pub fn cmd_rec_pause_key(&self) -> Result<Option<Key>> {
self.cmd.rec.pause_key.as_ref().map(parse_key).transpose()
pub fn step_key(&self) -> Result<Option<Key>> {
self.step_key.as_ref().map(parse_key).transpose()
}
pub fn cmd_rec_add_marker_key(&self) -> Result<Option<Key>> {
self.cmd
.rec
.add_marker_key
.as_ref()
.map(parse_key)
.transpose()
}
pub fn cmd_play_speed(&self) -> Option<f64> {
self.cmd.play.speed
}
pub fn cmd_play_idle_time_limit(&self) -> Option<f64> {
self.cmd.play.idle_time_limit
}
pub fn cmd_play_pause_key(&self) -> Result<Option<Key>> {
self.cmd.play.pause_key.as_ref().map(parse_key).transpose()
}
pub fn cmd_play_step_key(&self) -> Result<Option<Key>> {
self.cmd.play.step_key.as_ref().map(parse_key).transpose()
}
pub fn cmd_play_next_marker_key(&self) -> Result<Option<Key>> {
self.cmd
.play
.next_marker_key
.as_ref()
.map(parse_key)
.transpose()
}
pub fn cmd_stream_command(&self) -> Option<String> {
self.cmd.stream.command.as_ref().cloned()
}
pub fn cmd_stream_input(&self) -> bool {
self.cmd.stream.input
}
pub fn cmd_stream_prefix_key(&self) -> Result<Option<Key>> {
self.cmd
.stream
.prefix_key
.as_ref()
.map(parse_key)
.transpose()
}
pub fn cmd_stream_pause_key(&self) -> Result<Option<Key>> {
self.cmd
.stream
.pause_key
.as_ref()
.map(parse_key)
.transpose()
}
pub fn cmd_stream_add_marker_key(&self) -> Result<Option<Key>> {
self.cmd
.stream
.add_marker_key
.as_ref()
.map(parse_key)
.transpose()
pub fn next_marker_key(&self) -> Result<Option<Key>> {
self.next_marker_key.as_ref().map(parse_key).transpose()
}
}

66
src/file_writer.rs Normal file
View File

@@ -0,0 +1,66 @@
use std::collections::HashMap;
use std::io::{self, Write};
use std::time::{SystemTime, UNIX_EPOCH};
use crate::asciicast;
use crate::encoder;
use crate::session;
use crate::tty;
pub struct FileWriter {
pub writer: Box<dyn Write + Send>,
pub encoder: Box<dyn encoder::Encoder + Send>,
pub metadata: Metadata,
}
pub struct Metadata {
pub idle_time_limit: Option<f64>,
pub command: Option<String>,
pub title: Option<String>,
pub env: Option<HashMap<String, String>>,
}
impl session::Output for FileWriter {
fn start(
&mut self,
time: SystemTime,
tty_size: tty::TtySize,
theme: Option<tty::Theme>,
) -> io::Result<()> {
let timestamp = time.duration_since(UNIX_EPOCH).unwrap().as_secs();
let header = asciicast::Header {
cols: tty_size.0,
rows: tty_size.1,
timestamp: Some(timestamp),
theme,
idle_time_limit: self.metadata.idle_time_limit,
command: self.metadata.command.as_ref().cloned(),
title: self.metadata.title.as_ref().cloned(),
env: self.metadata.env.as_ref().cloned(),
};
self.writer.write_all(&self.encoder.header(&header))
}
fn event(&mut self, event: session::Event) -> io::Result<()> {
self.writer.write_all(&self.encoder.event(event.into()))
}
fn flush(&mut self) -> io::Result<()> {
self.writer.write_all(&self.encoder.flush())
}
}
impl From<session::Event> for asciicast::Event {
fn from(event: session::Event) -> Self {
match event {
session::Event::Output(time, text) => asciicast::Event::output(time, text),
session::Event::Input(time, text) => asciicast::Event::input(time, text),
session::Event::Resize(time, tty_size) => {
asciicast::Event::resize(time, tty_size.into())
}
session::Event::Marker(time, label) => asciicast::Event::marker(time, label),
}
}
}

View File

@@ -1,16 +1,11 @@
use crate::notifier::Notifier;
use super::alis;
use super::session;
use crate::api;
use anyhow::anyhow;
use anyhow::bail;
use axum::http::Uri;
use core::future::{self, Future};
use futures_util::{stream, SinkExt, Stream, StreamExt};
use std::borrow::Cow;
use std::pin::Pin;
use std::time::Duration;
use anyhow::{anyhow, bail};
use axum::http::Uri;
use futures_util::{SinkExt, Stream, StreamExt};
use tokio::net::TcpStream;
use tokio::time::{interval, sleep, timeout};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
@@ -21,6 +16,11 @@ use tokio_tungstenite::tungstenite::{self, ClientRequestBuilder, Message};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tracing::{debug, error, info};
use crate::alis;
use crate::api;
use crate::notifier::Notifier;
use crate::stream::Subscriber;
const PING_INTERVAL: u64 = 15;
const PING_TIMEOUT: u64 = 10;
const SEND_TIMEOUT: u64 = 10;
@@ -28,7 +28,7 @@ const MAX_RECONNECT_DELAY: u64 = 5000;
pub async fn forward<N: Notifier>(
url: url::Url,
clients_tx: tokio::sync::mpsc::Sender<session::Client>,
subscriber: Subscriber,
mut notifier: N,
shutdown_token: tokio_util::sync::CancellationToken,
) {
@@ -37,7 +37,7 @@ pub async fn forward<N: Notifier>(
let mut connection_count: u64 = 0;
loop {
let conn = connect_and_forward(&url, &clients_tx);
let conn = connect_and_forward(&url, &subscriber);
tokio::pin!(conn);
let result = tokio::select! {
@@ -122,10 +122,7 @@ pub async fn forward<N: Notifier>(
}
}
async fn connect_and_forward(
url: &url::Url,
clients_tx: &tokio::sync::mpsc::Sender<session::Client>,
) -> anyhow::Result<bool> {
async fn connect_and_forward(url: &url::Url, subscriber: &Subscriber) -> anyhow::Result<bool> {
let uri: Uri = url.to_string().parse()?;
let builder = ClientRequestBuilder::new(uri)
@@ -134,18 +131,22 @@ async fn connect_and_forward(
let (ws, _) = tokio_tungstenite::connect_async_with_config(builder, None, true).await?;
info!("connected to the endpoint");
let events = event_stream(clients_tx).await?;
let events = event_stream(subscriber).await?;
handle_socket(ws, events).await
}
async fn event_stream(
clients_tx: &tokio::sync::mpsc::Sender<session::Client>,
subscriber: &Subscriber,
) -> anyhow::Result<impl Stream<Item = anyhow::Result<Message>>> {
let stream = alis::stream(clients_tx)
let stream = subscriber.subscribe().await?;
let stream = alis::stream(stream)
.await?
.map(ws_result)
.chain(stream::once(future::ready(Ok(close_message()))));
.chain(futures_util::stream::once(future::ready(Ok(
close_message(),
))));
Ok(stream)
}

View File

@@ -1,9 +1,12 @@
mod alis;
mod api;
mod asciicast;
mod cli;
mod cmd;
mod config;
mod encoder;
mod file_writer;
mod forwarder;
mod io;
mod leb128;
mod locale;
@@ -11,14 +14,16 @@ mod logger;
mod notifier;
mod player;
mod pty;
mod recorder;
mod streamer;
mod server;
mod session;
mod stream;
mod tty;
mod util;
use crate::cli::{Cli, Commands};
use crate::config::Config;
use clap::Parser;
use cmd::Command;
use self::cli::{Cli, Commands, Session};
use self::config::Config;
fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
@@ -31,12 +36,55 @@ fn main() -> anyhow::Result<()> {
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
match cli.command {
Commands::Rec(record) => record.run(&config),
Commands::Play(play) => play.run(&config),
Commands::Stream(stream) => stream.run(&config),
Commands::Cat(cat) => cat.run(&config),
Commands::Convert(convert) => convert.run(&config),
Commands::Upload(upload) => upload.run(&config),
Commands::Auth(auth) => auth.run(&config),
Commands::Rec(cmd) => {
let cmd = Session {
output: Some(cmd.path),
input: cmd.input,
append: cmd.append,
format: cmd.format,
overwrite: cmd.overwrite,
command: cmd.command,
filename: cmd.filename,
env: cmd.env,
title: cmd.title,
idle_time_limit: cmd.idle_time_limit,
headless: cmd.headless,
tty_size: cmd.tty_size,
serve: None,
relay: None,
log_file: None,
};
cmd.run(&config, &config.cmd_rec())
}
Commands::Stream(stream) => {
let cmd = Session {
output: None,
input: stream.input,
append: false,
format: None,
overwrite: false,
command: stream.command,
filename: None,
env: stream.env,
title: None,
idle_time_limit: None,
headless: stream.headless,
tty_size: stream.tty_size,
serve: stream.serve,
relay: stream.relay,
log_file: stream.log_file,
};
cmd.run(&config, &config.cmd_stream())
}
Commands::Session(cmd) => cmd.run(&config, &config.cmd_session()),
Commands::Play(cmd) => cmd.run(&config),
Commands::Cat(cmd) => cmd.run(&config),
Commands::Convert(cmd) => cmd.run(&config),
Commands::Upload(cmd) => cmd.run(&config),
Commands::Auth(cmd) => cmd.run(&config),
}
}

View File

@@ -1,44 +1,43 @@
use super::alis;
use super::session;
use axum::{
extract::connect_info::ConnectInfo,
extract::ws,
extract::State,
http::{header, StatusCode, Uri},
response::IntoResponse,
routing::get,
Router,
};
use futures_util::sink;
use futures_util::{stream, StreamExt};
use rust_embed::RustEmbed;
use std::borrow::Cow;
use std::future;
use std::io;
use std::net::SocketAddr;
use tokio::sync::mpsc;
use axum::extract::connect_info::ConnectInfo;
use axum::extract::ws::{self, CloseCode, CloseFrame, Message, WebSocket, WebSocketUpgrade};
use axum::extract::State;
use axum::http::{header, StatusCode, Uri};
use axum::response::IntoResponse;
use axum::routing::get;
use axum::Router;
use futures_util::{sink, StreamExt};
use rust_embed::RustEmbed;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tower_http::trace;
use tokio_util::sync::CancellationToken;
use tower_http::trace::{DefaultMakeSpan, TraceLayer};
use tracing::info;
use crate::alis;
use crate::stream::Subscriber;
#[derive(RustEmbed)]
#[folder = "assets/"]
struct Assets;
pub async fn serve(
listener: std::net::TcpListener,
clients_tx: mpsc::Sender<session::Client>,
shutdown_token: tokio_util::sync::CancellationToken,
subscriber: Subscriber,
shutdown_token: CancellationToken,
) -> io::Result<()> {
listener.set_nonblocking(true)?;
let listener = tokio::net::TcpListener::from_std(listener)?;
let trace = trace::TraceLayer::new_for_http()
.make_span_with(trace::DefaultMakeSpan::default().include_headers(true));
let trace =
TraceLayer::new_for_http().make_span_with(DefaultMakeSpan::default().include_headers(true));
let app = Router::new()
.route("/ws", get(ws_handler))
.with_state(clients_tx)
.with_state(subscriber)
.fallback(static_handler)
.layer(trace);
@@ -79,16 +78,16 @@ async fn static_handler(uri: Uri) -> impl IntoResponse {
}
async fn ws_handler(
ws: ws::WebSocketUpgrade,
ws: WebSocketUpgrade,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
State(clients_tx): State<mpsc::Sender<session::Client>>,
State(subscriber): State<Subscriber>,
) -> impl IntoResponse {
ws.protocols(["v1.alis"])
.on_upgrade(move |socket| async move {
info!("websocket client {addr} connected");
if socket.protocol().is_some() {
let _ = handle_socket(socket, clients_tx).await;
let _ = handle_socket(socket, subscriber).await;
info!("websocket client {addr} disconnected");
} else {
info!("subprotocol negotiation failed, closing connection");
@@ -97,18 +96,16 @@ async fn ws_handler(
})
}
async fn handle_socket(
socket: ws::WebSocket,
clients_tx: mpsc::Sender<session::Client>,
) -> anyhow::Result<()> {
async fn handle_socket(socket: WebSocket, subscriber: Subscriber) -> anyhow::Result<()> {
let (sink, stream) = socket.split();
let drainer = tokio::spawn(stream.map(Ok).forward(sink::drain()));
let close_msg = close_message(ws::close_code::NORMAL, "Stream ended");
let stream = subscriber.subscribe().await?;
let result = alis::stream(&clients_tx)
let result = alis::stream(stream)
.await?
.map(ws_result)
.chain(stream::once(future::ready(Ok(close_msg))))
.chain(futures_util::stream::once(future::ready(Ok(close_msg))))
.forward(sink)
.await;
@@ -118,21 +115,21 @@ async fn handle_socket(
Ok(())
}
async fn close_socket(mut socket: ws::WebSocket) {
async fn close_socket(mut socket: WebSocket) {
let msg = close_message(ws::close_code::PROTOCOL, "Subprotocol negotiation failed");
let _ = socket.send(msg).await;
}
fn close_message(code: ws::CloseCode, reason: &'static str) -> ws::Message {
ws::Message::Close(Some(ws::CloseFrame {
fn close_message(code: CloseCode, reason: &'static str) -> Message {
Message::Close(Some(CloseFrame {
code,
reason: Cow::from(reason),
}))
}
fn ws_result(m: Result<Vec<u8>, BroadcastStreamRecvError>) -> Result<ws::Message, axum::Error> {
fn ws_result(m: Result<Vec<u8>, BroadcastStreamRecvError>) -> Result<Message, axum::Error> {
match m {
Ok(bytes) => Ok(ws::Message::Binary(bytes)),
Ok(bytes) => Ok(Message::Binary(bytes)),
Err(e) => Err(axum::Error::new(e)),
}
}

View File

@@ -1,29 +1,32 @@
use crate::asciicast::Event;
use std::io;
use std::sync::mpsc::{self, Receiver};
use std::thread;
use std::time::{Duration, SystemTime};
use crate::config::Key;
use crate::notifier::Notifier;
use crate::pty;
use crate::tty;
use crate::util;
use std::io;
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, SystemTime};
use crate::util::{JoinHandle, Utf8Decoder};
pub struct Recorder<N> {
output: Option<Box<dyn Output + Send>>,
pub struct Session<N> {
outputs: Vec<Box<dyn Output + Send>>,
input_decoder: Utf8Decoder,
output_decoder: Utf8Decoder,
tty_size: tty::TtySize,
record_input: bool,
keys: KeyBindings,
notifier: N,
sender: mpsc::Sender<Message>,
receiver: Option<mpsc::Receiver<Message>>,
handle: Option<util::JoinHandle>,
sender: mpsc::Sender<Event>,
receiver: Option<Receiver<Event>>,
handle: Option<JoinHandle>,
time_offset: u64,
pause_time: Option<u64>,
prefix_mode: bool,
}
pub trait Output {
fn header(
fn start(
&mut self,
time: SystemTime,
tty_size: tty::TtySize,
@@ -33,24 +36,28 @@ pub trait Output {
fn flush(&mut self) -> io::Result<()>;
}
enum Message {
Output(u64, Vec<u8>),
Input(u64, Vec<u8>),
#[derive(Clone)]
pub enum Event {
Output(u64, String),
Input(u64, String),
Resize(u64, tty::TtySize),
Marker(u64),
Marker(u64, String),
}
impl<N: Notifier> Recorder<N> {
impl<N: Notifier> Session<N> {
pub fn new(
output: Box<dyn Output + Send>,
outputs: Vec<Box<dyn Output + Send>>,
record_input: bool,
keys: KeyBindings,
notifier: N,
) -> Self {
let (sender, receiver) = mpsc::channel();
Recorder {
output: Some(output),
Session {
outputs,
input_decoder: Utf8Decoder::new(),
output_decoder: Utf8Decoder::new(),
tty_size: tty::TtySize::default(),
record_input,
keys,
notifier,
@@ -74,63 +81,39 @@ impl<N: Notifier> Recorder<N> {
fn notify<S: ToString>(&mut self, text: S) {
self.notifier
.notify(text.to_string())
.expect("notification send should succeed");
.expect("notification should succeed");
}
}
impl<N: Notifier> pty::Handler for Recorder<N> {
fn start(&mut self, tty_size: tty::TtySize, theme: Option<tty::Theme>) {
let mut output = self.output.take().unwrap();
let _ = output.header(SystemTime::now(), tty_size, theme);
impl<N: Notifier> pty::Handler for Session<N> {
fn start(&mut self, tty_size: tty::TtySize, tty_theme: Option<tty::Theme>) {
let mut outputs = std::mem::take(&mut self.outputs);
let time = SystemTime::now();
let receiver = self.receiver.take().unwrap();
let handle = thread::spawn(move || {
use Message::*;
let mut last_tty_size = tty_size;
let mut input_decoder = util::Utf8Decoder::new();
let mut output_decoder = util::Utf8Decoder::new();
outputs.retain_mut(|output| output.start(time, tty_size, tty_theme.clone()).is_ok());
for msg in receiver {
match msg {
Output(time, data) => {
let text = output_decoder.feed(&data);
if !text.is_empty() {
let _ = output.event(Event::output(time, text));
}
}
Input(time, data) => {
let text = input_decoder.feed(&data);
if !text.is_empty() {
let _ = output.event(Event::input(time, text));
}
}
Resize(time, new_tty_size) => {
if new_tty_size != last_tty_size {
let _ = output.event(Event::resize(time, new_tty_size.into()));
last_tty_size = new_tty_size;
}
}
Marker(time) => {
let _ = output.event(Event::marker(time, String::new()));
}
}
for event in receiver {
outputs.retain_mut(|output| output.event(event.clone()).is_ok())
}
let _ = output.flush();
for mut output in outputs {
let _ = output.flush();
}
});
self.handle = Some(util::JoinHandle::new(handle));
self.handle = Some(JoinHandle::new(handle));
}
fn output(&mut self, time: Duration, data: &[u8]) -> bool {
if self.pause_time.is_none() {
let msg = Message::Output(self.elapsed_time(time), data.into());
self.sender.send(msg).expect("output send should succeed");
let text = self.output_decoder.feed(data);
if !text.is_empty() {
let msg = Event::Output(self.elapsed_time(time), text);
self.sender.send(msg).expect("output send should succeed");
}
}
true
@@ -161,7 +144,7 @@ impl<N: Notifier> pty::Handler for Recorder<N> {
return false;
} else if add_marker_key.is_some_and(|key| data == key) {
let msg = Message::Marker(self.elapsed_time(time));
let msg = Event::Marker(self.elapsed_time(time), "".to_owned());
self.sender.send(msg).expect("marker send should succeed");
self.notify("Marker added");
return false;
@@ -169,16 +152,24 @@ impl<N: Notifier> pty::Handler for Recorder<N> {
}
if self.record_input && self.pause_time.is_none() {
let msg = Message::Input(self.elapsed_time(time), data.into());
self.sender.send(msg).expect("input send should succeed");
let text = self.input_decoder.feed(data);
if !text.is_empty() {
let msg = Event::Input(self.elapsed_time(time), text);
self.sender.send(msg).expect("input send should succeed");
}
}
true
}
fn resize(&mut self, time: Duration, tty_size: tty::TtySize) -> bool {
let msg = Message::Resize(self.elapsed_time(time), tty_size);
self.sender.send(msg).expect("resize send should succeed");
if tty_size != self.tty_size {
let msg = Event::Resize(self.elapsed_time(time), tty_size);
self.sender.send(msg).expect("resize send should succeed");
self.tty_size = tty_size;
}
true
}

191
src/stream.rs Normal file
View File

@@ -0,0 +1,191 @@
use std::future;
use std::io;
use std::time::{Duration, Instant, SystemTime};
use anyhow::Result;
use avt::Vt;
use futures_util::{stream, StreamExt};
use tokio::runtime::Runtime;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::time;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tokio_stream::wrappers::BroadcastStream;
use tracing::info;
use crate::session;
use crate::tty;
pub struct Stream {
tx: mpsc::Sender<Request>,
rx: mpsc::Receiver<Request>,
}
type Request = oneshot::Sender<Subscription>;
struct Subscription {
init: Event,
events_rx: broadcast::Receiver<Event>,
}
#[derive(Clone)]
pub struct Subscriber(mpsc::Sender<Request>);
pub struct Output(mpsc::UnboundedSender<Message>);
enum Message {
Start(tty::TtySize, Option<tty::Theme>),
SessionEvent(session::Event),
}
#[derive(Clone)]
pub enum Event {
Init(u64, u64, tty::TtySize, Option<tty::Theme>, String),
Output(u64, u64, String),
Input(u64, u64, String),
Resize(u64, u64, tty::TtySize),
Marker(u64, u64, String),
}
impl Stream {
pub fn new() -> Self {
let (tx, rx) = mpsc::channel(1);
Stream { tx, rx }
}
pub fn subscriber(&self) -> Subscriber {
Subscriber(self.tx.clone())
}
pub fn start(self, runtime: &Runtime) -> Output {
let (stream_tx, stream_rx) = mpsc::unbounded_channel();
runtime.spawn(async move { run(stream_rx, self.rx).await });
Output(stream_tx)
}
}
async fn run(
mut stream_rx: mpsc::UnboundedReceiver<Message>,
mut request_rx: mpsc::Receiver<Request>,
) {
let (broadcast_tx, _) = broadcast::channel(1024);
let mut vt = build_vt(tty::TtySize::default());
let mut stream_time = 0;
let mut last_event_id = 0;
let mut last_event_time = Instant::now();
let mut tty_theme = None;
loop {
tokio::select! {
event = stream_rx.recv() => {
match event {
Some(Message::Start(tty_size_, tty_theme_)) => {
tty_theme = tty_theme_;
vt = build_vt(tty_size_);
}
Some(Message::SessionEvent(event)) => {
last_event_time = Instant::now();
last_event_id += 1;
match event {
session::Event::Output(time, text) => {
vt.feed_str(&text);
let _ = broadcast_tx.send(Event::Output(last_event_id, time, text));
stream_time = time;
}
session::Event::Input(time, text) => {
let _ = broadcast_tx.send(Event::Input(last_event_id, time, text));
stream_time = time;
}
session::Event::Resize(time, tty_size) => {
vt.resize(tty_size.0.into(), tty_size.1.into());
let _ = broadcast_tx.send(Event::Resize(last_event_id, time, tty_size));
stream_time = time;
}
session::Event::Marker(time, label) => {
let _ = broadcast_tx.send(Event::Marker(last_event_id, time, label));
stream_time = time;
}
}
}
None => break,
}
}
request = request_rx.recv() => {
match request {
Some(request) => {
let elapsed_time = stream_time + last_event_time.elapsed().as_micros() as u64;
let init = Event::Init(
last_event_id,
elapsed_time,
vt.size().into(),
tty_theme.clone(),
vt.dump(),
);
let events_rx = broadcast_tx.subscribe();
let _ = request.send(Subscription { init, events_rx });
info!("subscriber count: {}", broadcast_tx.receiver_count());
}
None => break,
}
}
}
}
}
impl Subscriber {
pub async fn subscribe(
&self,
) -> Result<impl futures_util::Stream<Item = Result<Event, BroadcastStreamRecvError>>> {
let (tx, rx) = oneshot::channel();
self.0.send(tx).await?;
let subscription = time::timeout(Duration::from_secs(5), rx).await??;
let init = stream::once(future::ready(Ok(subscription.init)));
let events = BroadcastStream::new(subscription.events_rx);
Ok(init.chain(events))
}
}
fn build_vt(tty_size: tty::TtySize) -> Vt {
Vt::builder()
.size(tty_size.0 as usize, tty_size.1 as usize)
.build()
}
impl session::Output for Output {
fn start(
&mut self,
_time: SystemTime,
tty_size: tty::TtySize,
theme: Option<tty::Theme>,
) -> io::Result<()> {
self.0
.send(Message::Start(tty_size, theme))
.expect("send should succeed");
Ok(())
}
fn event(&mut self, event: session::Event) -> io::Result<()> {
self.0
.send(Message::SessionEvent(event))
.expect("send should succeed");
Ok(())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}

View File

@@ -1,244 +0,0 @@
mod alis;
mod forwarder;
mod server;
mod session;
use crate::config::Key;
use crate::notifier::Notifier;
use crate::pty;
use crate::tty;
use crate::util;
use std::net;
use std::thread;
use std::time::Duration;
use tracing::info;
pub struct Streamer<N> {
record_input: bool,
keys: KeyBindings,
notifier: N,
pty_rx: Option<tokio::sync::mpsc::UnboundedReceiver<Event>>,
paused: bool,
prefix_mode: bool,
listener: Option<net::TcpListener>,
forward_url: Option<url::Url>,
// XXX: field (drop) order below is crucial for correct shutdown
pty_tx: tokio::sync::mpsc::UnboundedSender<Event>,
event_loop_handle: Option<util::JoinHandle>,
}
enum Event {
Output(u64, Vec<u8>),
Input(u64, Vec<u8>),
Resize(u64, tty::TtySize),
Marker(u64),
}
impl<N: Notifier> Streamer<N> {
pub fn new(
listener: Option<net::TcpListener>,
forward_url: Option<url::Url>,
record_input: bool,
keys: KeyBindings,
notifier: N,
) -> Self {
let (pty_tx, pty_rx) = tokio::sync::mpsc::unbounded_channel();
Self {
record_input,
keys,
notifier,
pty_tx,
pty_rx: Some(pty_rx),
event_loop_handle: None,
paused: false,
prefix_mode: false,
listener,
forward_url,
}
}
fn elapsed_time(&self, time: Duration) -> u64 {
time.as_micros() as u64
}
fn notify<S: ToString>(&mut self, message: S) {
let message = message.to_string();
info!(message);
self.notifier
.notify(message)
.expect("notification send should succeed");
}
}
impl<N: Notifier + Clone + 'static> pty::Handler for Streamer<N> {
fn start(&mut self, tty_size: tty::TtySize, theme: Option<tty::Theme>) {
let pty_rx = self.pty_rx.take().unwrap();
let (clients_tx, mut clients_rx) = tokio::sync::mpsc::channel(1);
let shutdown_token = tokio_util::sync::CancellationToken::new();
let runtime = build_tokio_runtime();
let server = self.listener.take().map(|listener| {
runtime.spawn(server::serve(
listener,
clients_tx.clone(),
shutdown_token.clone(),
))
});
let forwarder = self.forward_url.take().map(|url| {
runtime.spawn(forwarder::forward(
url,
clients_tx,
self.notifier.clone(),
shutdown_token.clone(),
))
});
self.event_loop_handle = wrap_thread_handle(thread::spawn(move || {
runtime.block_on(async move {
event_loop(pty_rx, &mut clients_rx, tty_size, theme).await;
info!("shutting down");
shutdown_token.cancel();
if let Some(task) = server {
let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
}
if let Some(task) = forwarder {
let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
}
let _ = clients_rx.recv().await;
});
}));
}
fn output(&mut self, time: Duration, data: &[u8]) -> bool {
if !self.paused {
let event = Event::Output(self.elapsed_time(time), data.into());
let _ = self.pty_tx.send(event);
}
true
}
fn input(&mut self, time: Duration, data: &[u8]) -> bool {
let prefix_key = self.keys.prefix.as_ref();
let pause_key = self.keys.pause.as_ref();
let add_marker_key = self.keys.add_marker.as_ref();
if !self.prefix_mode && prefix_key.is_some_and(|key| data == key) {
self.prefix_mode = true;
return false;
}
if self.prefix_mode || prefix_key.is_none() {
self.prefix_mode = false;
if pause_key.is_some_and(|key| data == key) {
if self.paused {
self.paused = false;
self.notify("Resumed streaming");
} else {
self.paused = true;
self.notify("Paused streaming");
}
return false;
} else if add_marker_key.is_some_and(|key| data == key) {
let event = Event::Marker(self.elapsed_time(time));
let _ = self.pty_tx.send(event);
self.notify("Marker added");
return false;
}
}
if self.record_input && !self.paused {
let event = Event::Input(self.elapsed_time(time), data.into());
let _ = self.pty_tx.send(event);
}
true
}
fn resize(&mut self, time: Duration, tty_size: tty::TtySize) -> bool {
let event = Event::Resize(self.elapsed_time(time), tty_size);
let _ = self.pty_tx.send(event);
true
}
}
async fn event_loop(
mut events: tokio::sync::mpsc::UnboundedReceiver<Event>,
clients: &mut tokio::sync::mpsc::Receiver<session::Client>,
tty_size: tty::TtySize,
theme: Option<tty::Theme>,
) {
let mut session = session::Session::new(tty_size, theme);
loop {
tokio::select! {
event = events.recv() => {
match event {
Some(Event::Output(time, data)) => {
session.output(time, &data);
}
Some(Event::Input(time, data)) => {
session.input(time, &data);
}
Some(Event::Resize(time, new_tty_size)) => {
session.resize(time, new_tty_size);
}
Some(Event::Marker(time)) => {
session.marker(time);
}
None => break,
}
}
client = clients.recv() => {
match client {
Some(client) => {
client.accept(session.subscribe());
info!("client count: {}", session.subscriber_count());
}
None => break,
}
}
}
}
}
fn build_tokio_runtime() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
}
fn wrap_thread_handle(handle: thread::JoinHandle<()>) -> Option<util::JoinHandle> {
Some(util::JoinHandle::new(handle))
}
pub struct KeyBindings {
pub prefix: Key,
pub pause: Key,
pub add_marker: Key,
}
impl Default for KeyBindings {
fn default() -> Self {
Self {
prefix: None,
pause: Some(vec![0x1c]), // ^\
add_marker: None,
}
}
}

View File

@@ -1,153 +0,0 @@
use crate::tty;
use crate::util;
use anyhow::Result;
use futures_util::{stream, Stream, StreamExt};
use std::{
future,
time::{Duration, Instant},
};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
pub struct Session {
vt: avt::Vt,
broadcast_tx: broadcast::Sender<Event>,
stream_time: u64,
last_event_id: u64,
last_event_time: Instant,
theme: Option<tty::Theme>,
output_decoder: util::Utf8Decoder,
input_decoder: util::Utf8Decoder,
}
#[derive(Clone)]
pub enum Event {
Init(u64, u64, tty::TtySize, Option<tty::Theme>, String),
Output(u64, u64, String),
Input(u64, u64, String),
Resize(u64, u64, tty::TtySize),
Marker(u64, u64, String),
}
pub struct Client(oneshot::Sender<Subscription>);
pub struct Subscription {
init: Event,
broadcast_rx: broadcast::Receiver<Event>,
}
impl Session {
pub fn new(tty_size: tty::TtySize, theme: Option<tty::Theme>) -> Self {
let (broadcast_tx, _) = broadcast::channel(1024);
Self {
vt: build_vt(tty_size),
broadcast_tx,
stream_time: 0,
last_event_id: 0,
last_event_time: Instant::now(),
theme,
output_decoder: util::Utf8Decoder::new(),
input_decoder: util::Utf8Decoder::new(),
}
}
pub fn output(&mut self, time: u64, data: &[u8]) {
let text = self.output_decoder.feed(data);
if !text.is_empty() {
self.vt.feed_str(&text);
let id = self.get_next_event_id();
let _ = self.broadcast_tx.send(Event::Output(id, time, text));
}
self.stream_time = time;
self.last_event_time = Instant::now();
}
pub fn input(&mut self, time: u64, data: &[u8]) {
let text = self.input_decoder.feed(data);
if !text.is_empty() {
let id = self.get_next_event_id();
let _ = self.broadcast_tx.send(Event::Input(id, time, text));
}
self.stream_time = time;
self.last_event_time = Instant::now();
}
pub fn resize(&mut self, time: u64, tty_size: tty::TtySize) {
if tty_size != self.vt.size().into() {
self.vt.resize(tty_size.0.into(), tty_size.1.into());
let id = self.get_next_event_id();
let _ = self.broadcast_tx.send(Event::Resize(id, time, tty_size));
}
self.stream_time = time;
self.last_event_time = Instant::now();
}
pub fn marker(&mut self, time: u64) {
let id = self.get_next_event_id();
let _ = self
.broadcast_tx
.send(Event::Marker(id, time, String::new()));
self.stream_time = time;
self.last_event_time = Instant::now();
}
pub fn subscribe(&self) -> Subscription {
let init = Event::Init(
self.last_event_id,
self.elapsed_time(),
self.vt.size().into(),
self.theme.clone(),
self.vt.dump(),
);
let broadcast_rx = self.broadcast_tx.subscribe();
Subscription { init, broadcast_rx }
}
pub fn subscriber_count(&self) -> usize {
self.broadcast_tx.receiver_count()
}
fn get_next_event_id(&mut self) -> u64 {
self.last_event_id += 1;
self.last_event_id
}
fn elapsed_time(&self) -> u64 {
self.stream_time + self.last_event_time.elapsed().as_micros() as u64
}
}
fn build_vt(tty_size: tty::TtySize) -> avt::Vt {
avt::Vt::builder()
.size(tty_size.0 as usize, tty_size.1 as usize)
.build()
}
impl Client {
pub fn accept(self, subscription: Subscription) {
let _ = self.0.send(subscription);
}
}
pub async fn stream(
clients_tx: &mpsc::Sender<Client>,
) -> Result<impl Stream<Item = Result<Event, BroadcastStreamRecvError>>> {
let (sub_tx, sub_rx) = oneshot::channel();
clients_tx.send(Client(sub_tx)).await?;
let sub = tokio::time::timeout(Duration::from_secs(5), sub_rx).await??;
let init = stream::once(future::ready(Ok(sub.init)));
let events = BroadcastStream::new(sub.broadcast_rx);
Ok(init.chain(events))
}

View File

@@ -17,6 +17,12 @@ use termion::raw::{IntoRawMode, RawTerminal};
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct TtySize(pub u16, pub u16);
impl Default for TtySize {
fn default() -> Self {
TtySize(80, 24)
}
}
impl From<pty::Winsize> for TtySize {
fn from(winsize: pty::Winsize) -> Self {
TtySize(winsize.ws_col, winsize.ws_row)