Fix race condition wrt terminal size, simplify session outputs code

This commit is contained in:
Marcin Kulik
2025-06-05 12:26:48 +02:00
parent 0676b54033
commit ca665b29da
6 changed files with 249 additions and 325 deletions

View File

@@ -5,7 +5,7 @@ use std::io::LineWriter;
use std::net::TcpListener; use std::net::TcpListener;
use std::path::Path; use std::path::Path;
use std::process::ExitCode; use std::process::ExitCode;
use std::time::Duration; use std::time::{Duration, SystemTime};
use anyhow::{anyhow, bail, Context, Result}; use anyhow::{anyhow, bail, Context, Result};
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
@@ -21,14 +21,14 @@ use crate::asciicast::{self, Version};
use crate::cli::{self, Format, RelayTarget}; use crate::cli::{self, Format, RelayTarget};
use crate::config::{self, Config}; use crate::config::{self, Config};
use crate::encoder::{AsciicastV2Encoder, AsciicastV3Encoder, RawEncoder, TextEncoder}; use crate::encoder::{AsciicastV2Encoder, AsciicastV3Encoder, RawEncoder, TextEncoder};
use crate::file_writer::{FileWriterStarter, Metadata}; use crate::file_writer::FileWriter;
use crate::forwarder; use crate::forwarder;
use crate::hash; use crate::hash;
use crate::locale; use crate::locale;
use crate::notifier::{self, Notifier, NullNotifier}; use crate::notifier::{self, Notifier, NullNotifier};
use crate::pty; use crate::pty;
use crate::server; use crate::server;
use crate::session::{self, KeyBindings, SessionStarter}; use crate::session::{self, KeyBindings, Metadata, Session, TermInfo};
use crate::status; use crate::status;
use crate::stream::Stream; use crate::stream::Stream;
use crate::tty::{DevTty, FixedSizeTty, NullTty, Tty}; use crate::tty::{DevTty, FixedSizeTty, NullTty, Tty};
@@ -43,23 +43,13 @@ impl cli::Session {
let keys = get_key_bindings(&config.recording)?; let keys = get_key_bindings(&config.recording)?;
let notifier = notifier::threaded(get_notifier(&config)); let notifier = notifier::threaded(get_notifier(&config));
let record_input = self.rec_input || config.recording.rec_input; let record_input = self.rec_input || config.recording.rec_input;
let term_type = self.get_term_type(); let signal_fd = pty::open_signal_fd()?;
let term_version = self.get_term_version()?; let metadata = self.get_session_metadata(&config.recording)?;
let env = capture_env(self.rec_env.take(), &config.recording);
let file_writer = self let file_writer = self
.output_file .output_file
.as_ref() .as_ref()
.map(|path| { .map(|path| self.get_file_writer(path, &metadata, notifier.clone()))
self.get_file_writer(
path,
&config.recording,
term_type.clone(),
term_version.clone(),
&env,
notifier.clone(),
)
})
.transpose()?; .transpose()?;
let mut listener = self let mut listener = self
@@ -72,16 +62,7 @@ impl cli::Session {
let mut relay = self let mut relay = self
.stream_remote .stream_remote
.take() .take()
.map(|target| { .map(|target| get_relay(target, &metadata, &config))
get_relay(
target,
&config,
term_type,
term_version,
self.title.take(),
&env,
)
})
.transpose()?; .transpose()?;
let relay_id = relay.as_ref().map(|r| r.id()); let relay_id = relay.as_ref().map(|r| r.id());
@@ -104,8 +85,11 @@ impl cli::Session {
status::info!("asciinema session started"); status::info!("asciinema session started");
let mut output_count = 0;
if let Some(path) = self.output_file.as_ref() { if let Some(path) = self.output_file.as_ref() {
status::info!("Recording to {}", path); status::info!("Recording to {}", path);
output_count += 1;
} }
if let Some(listener) = &listener { if let Some(listener) = &listener {
@@ -113,14 +97,31 @@ impl cli::Session {
"Live streaming at http://{}", "Live streaming at http://{}",
listener.local_addr().unwrap() listener.local_addr().unwrap()
); );
output_count += 1;
} }
if let Some(Relay { url: Some(url), .. }) = &relay { if let Some(Relay { url: Some(url), .. }) = &relay {
status::info!("Live streaming at {}", url); status::info!("Live streaming at {}", url);
output_count += 1;
}
if output_count == 0 {
status::warning!("No outputs enabled, consider using -o, -l, or -r");
}
if command.is_none() {
status::info!("Press <ctrl+d> or type 'exit' to end");
} }
let stream = Stream::new(); let stream = Stream::new();
let shutdown_token = CancellationToken::new(); let shutdown_token = CancellationToken::new();
let mut outputs: Vec<Box<dyn session::Output>> = Vec::new();
if let Some(writer) = file_writer {
let output = writer.start()?;
outputs.push(Box::new(output));
}
let server = listener.take().map(|listener| { let server = listener.take().map(|listener| {
runtime.spawn(server::serve( runtime.spawn(server::serve(
@@ -139,32 +140,28 @@ impl cli::Session {
)) ))
}); });
let mut outputs: Vec<Box<dyn session::OutputStarter>> = Vec::new();
if server.is_some() || forwarder.is_some() { if server.is_some() || forwarder.is_some() {
let output = stream.start(runtime.handle().clone()); let output = stream.start(runtime.handle().clone(), &metadata);
outputs.push(Box::new(output)); outputs.push(Box::new(output));
} }
if let Some(output) = file_writer {
outputs.push(Box::new(output));
}
if outputs.is_empty() {
status::warning!("No outputs enabled, consider using -o, -l, or -r");
}
if command.is_none() {
status::info!("Press <ctrl+d> or type 'exit' to end");
}
let exec_command = build_exec_command(command.as_ref().cloned()); let exec_command = build_exec_command(command.as_ref().cloned());
let exec_extra_env = build_exec_extra_env(relay_id.as_ref()); let exec_extra_env = build_exec_extra_env(relay_id.as_ref());
let (exit_status, _) = { let exit_status = {
let starter = SessionStarter::new(outputs, record_input, keys, notifier);
let mut tty = self.get_tty(true)?; let mut tty = self.get_tty(true)?;
pty::exec(&exec_command, &exec_extra_env, &mut tty, starter)?
let mut session =
Session::new(outputs, metadata.term.size, record_input, keys, notifier);
pty::exec(
&exec_command,
&exec_extra_env,
metadata.term.size,
&mut tty,
&mut session,
signal_fd,
)?
}; };
runtime.block_on(async { runtime.block_on(async {
@@ -195,15 +192,34 @@ impl cli::Session {
} }
} }
fn get_session_metadata(&self, config: &config::Recording) -> Result<Metadata> {
Ok(Metadata {
time: SystemTime::now(),
term: self.get_term_info()?,
idle_time_limit: self.idle_time_limit.or(config.idle_time_limit),
command: self.get_command(config),
title: self.title.clone(),
env: capture_env(self.rec_env.clone(), config),
})
}
fn get_term_info(&self) -> Result<TermInfo> {
let tty = self.get_tty(false)?;
Ok(TermInfo {
type_: env::var("TERM").ok(),
version: tty.get_version(),
size: tty.get_size().into(),
theme: tty.get_theme(),
})
}
fn get_file_writer<N: Notifier + 'static>( fn get_file_writer<N: Notifier + 'static>(
&self, &self,
path: &str, path: &str,
config: &config::Recording, metadata: &Metadata,
term_type: Option<String>,
term_version: Option<String>,
env: &HashMap<String, String>,
notifier: N, notifier: N,
) -> Result<FileWriterStarter> { ) -> Result<FileWriter> {
let mut overwrite = self.overwrite; let mut overwrite = self.overwrite;
let mut append = self.append; let mut append = self.append;
let path = Path::new(path); let path = Path::new(path);
@@ -253,20 +269,14 @@ impl cli::Session {
.truncate(overwrite) .truncate(overwrite)
.open(path)?; .open(path)?;
let metadata = self.build_asciicast_metadata(term_type, term_version, env, config);
let notifier = Box::new(notifier); let notifier = Box::new(notifier);
let writer = match format { let file_writer = match format {
Format::AsciicastV3 => { Format::AsciicastV3 => {
let writer = Box::new(LineWriter::new(file)); let writer = Box::new(LineWriter::new(file));
let encoder = Box::new(AsciicastV3Encoder::new(append)); let encoder = Box::new(AsciicastV3Encoder::new(append));
FileWriterStarter { FileWriter::new(writer, encoder, notifier, metadata.clone())
writer,
encoder,
metadata,
notifier,
}
} }
Format::AsciicastV2 => { Format::AsciicastV2 => {
@@ -279,74 +289,31 @@ impl cli::Session {
let writer = Box::new(LineWriter::new(file)); let writer = Box::new(LineWriter::new(file));
let encoder = Box::new(AsciicastV2Encoder::new(append, time_offset)); let encoder = Box::new(AsciicastV2Encoder::new(append, time_offset));
FileWriterStarter { FileWriter::new(writer, encoder, notifier, metadata.clone())
writer,
encoder,
metadata,
notifier,
}
} }
Format::Raw => { Format::Raw => {
let writer = Box::new(file); let writer = Box::new(file);
let encoder = Box::new(RawEncoder::new()); let encoder = Box::new(RawEncoder::new());
FileWriterStarter { FileWriter::new(writer, encoder, notifier, metadata.clone())
writer,
encoder,
metadata,
notifier,
}
} }
Format::Txt => { Format::Txt => {
let writer = Box::new(file); let writer = Box::new(file);
let encoder = Box::new(TextEncoder::new()); let encoder = Box::new(TextEncoder::new());
FileWriterStarter { FileWriter::new(writer, encoder, notifier, metadata.clone())
writer,
encoder,
metadata,
notifier,
}
} }
}; };
Ok(writer) Ok(file_writer)
}
fn get_term_type(&self) -> Option<String> {
env::var("TERM").ok()
}
fn get_term_version(&self) -> Result<Option<String>> {
self.get_tty(false).map(|tty| tty.get_version())
} }
fn get_command(&self, config: &config::Recording) -> Option<String> { fn get_command(&self, config: &config::Recording) -> Option<String> {
self.command.as_ref().cloned().or(config.command.clone()) self.command.as_ref().cloned().or(config.command.clone())
} }
fn build_asciicast_metadata(
&self,
term_type: Option<String>,
term_version: Option<String>,
env: &HashMap<String, String>,
config: &config::Recording,
) -> Metadata {
let idle_time_limit = self.idle_time_limit.or(config.idle_time_limit);
let command = self.get_command(config);
Metadata {
term_type,
term_version,
idle_time_limit,
command,
title: self.title.clone(),
env: Some(env.clone()),
}
}
fn get_tty(&self, quiet: bool) -> Result<impl Tty> { fn get_tty(&self, quiet: bool) -> Result<impl Tty> {
let (cols, rows) = self.window_size.unwrap_or((None, None)); let (cols, rows) = self.window_size.unwrap_or((None, None));
@@ -400,19 +367,11 @@ impl Relay {
} }
} }
fn get_relay( fn get_relay(target: RelayTarget, metadata: &Metadata, config: &Config) -> Result<Relay> {
target: RelayTarget,
config: &Config,
term_type: Option<String>,
term_version: Option<String>,
title: Option<String>,
env: &HashMap<String, String>,
) -> Result<Relay> {
match target { match target {
RelayTarget::StreamId(id) => { RelayTarget::StreamId(id) => {
let stream = api::create_user_stream(id, config)?; let stream = api::create_user_stream(id, config)?;
let ws_producer_url = let ws_producer_url = build_producer_url(&stream.ws_producer_url, metadata)?;
build_producer_url(&stream.ws_producer_url, term_type, term_version, title, env)?;
Ok(Relay { Ok(Relay {
ws_producer_url, ws_producer_url,
@@ -427,33 +386,27 @@ fn get_relay(
} }
} }
fn build_producer_url( fn build_producer_url(url: &str, metadata: &Metadata) -> Result<Url> {
url: &str,
term_type: Option<String>,
term_version: Option<String>,
title: Option<String>,
env: &HashMap<String, String>,
) -> Result<Url> {
let mut url: Url = url.parse()?; let mut url: Url = url.parse()?;
let mut params = Vec::new(); let mut params = Vec::new();
if let Some(type_) = term_type { if let Some(type_) = &metadata.term.type_ {
params.push(("term[type]".to_string(), type_)); params.push(("term[type]".to_string(), type_.clone()));
} }
if let Some(version) = term_version { if let Some(version) = &metadata.term.version {
params.push(("term[version]".to_string(), version)); params.push(("term[version]".to_string(), version.clone()));
} }
if let Ok(shell) = env::var("SHELL") { if let Ok(shell) = env::var("SHELL") {
params.push(("shell".to_string(), shell)); params.push(("shell".to_string(), shell));
} }
if let Some(title) = title { if let Some(title) = &metadata.title {
params.push(("title".to_string(), title)); params.push(("title".to_string(), title.clone()));
} }
for (k, v) in env { for (k, v) in &metadata.env {
params.push((format!("env[{k}]"), v.to_string())); params.push((format!("env[{k}]"), v.to_string()));
} }

View File

@@ -1,55 +1,58 @@
use std::collections::HashMap;
use std::io::{self, Write}; use std::io::{self, Write};
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::UNIX_EPOCH;
use crate::asciicast; use crate::asciicast;
use crate::encoder; use crate::encoder;
use crate::notifier::Notifier; use crate::notifier::Notifier;
use crate::session; use crate::session::{self, Metadata};
use crate::tty::{TtySize, TtyTheme};
pub struct FileWriterStarter {
pub writer: Box<dyn Write + Send>,
pub encoder: Box<dyn encoder::Encoder + Send>,
pub metadata: Metadata,
pub notifier: Box<dyn Notifier>,
}
pub struct FileWriter { pub struct FileWriter {
pub writer: Box<dyn Write + Send>, writer: Box<dyn Write + Send>,
pub encoder: Box<dyn encoder::Encoder + Send>, encoder: Box<dyn encoder::Encoder + Send>,
pub notifier: Box<dyn Notifier>, notifier: Box<dyn Notifier>,
metadata: Metadata,
} }
pub struct Metadata { pub struct LiveFileWriter {
pub term_type: Option<String>, writer: Box<dyn Write + Send>,
pub term_version: Option<String>, encoder: Box<dyn encoder::Encoder + Send>,
pub idle_time_limit: Option<f64>, notifier: Box<dyn Notifier>,
pub command: Option<String>,
pub title: Option<String>,
pub env: Option<HashMap<String, String>>,
} }
impl session::OutputStarter for FileWriterStarter { impl FileWriter {
fn start( pub fn new(
mut self: Box<Self>, writer: Box<dyn Write + Send>,
time: SystemTime, encoder: Box<dyn encoder::Encoder + Send>,
tty_size: TtySize, notifier: Box<dyn Notifier>,
tty_theme: Option<TtyTheme>, metadata: Metadata,
) -> io::Result<Box<dyn session::Output>> { ) -> Self {
let timestamp = time.duration_since(UNIX_EPOCH).unwrap().as_secs(); FileWriter {
writer,
encoder,
notifier,
metadata,
}
}
pub fn start(mut self) -> io::Result<LiveFileWriter> {
let timestamp = self
.metadata
.time
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let header = asciicast::Header { let header = asciicast::Header {
term_cols: tty_size.0, term_cols: self.metadata.term.size.0,
term_rows: tty_size.1, term_rows: self.metadata.term.size.1,
term_type: self.metadata.term_type, term_type: self.metadata.term.type_.clone(),
term_version: self.metadata.term_version, term_version: self.metadata.term.version.clone(),
term_theme: tty_theme, term_theme: self.metadata.term.theme.clone(),
timestamp: Some(timestamp), timestamp: Some(timestamp),
idle_time_limit: self.metadata.idle_time_limit, idle_time_limit: self.metadata.idle_time_limit,
command: self.metadata.command.as_ref().cloned(), command: self.metadata.command.as_ref().cloned(),
title: self.metadata.title.as_ref().cloned(), title: self.metadata.title.as_ref().cloned(),
env: self.metadata.env.as_ref().cloned(), env: Some(self.metadata.env.clone()),
}; };
if let Err(e) = self.writer.write_all(&self.encoder.header(&header)) { if let Err(e) = self.writer.write_all(&self.encoder.header(&header)) {
@@ -60,15 +63,15 @@ impl session::OutputStarter for FileWriterStarter {
return Err(e); return Err(e);
} }
Ok(Box::new(FileWriter { Ok(LiveFileWriter {
writer: self.writer, writer: self.writer,
encoder: self.encoder, encoder: self.encoder,
notifier: self.notifier, notifier: self.notifier,
})) })
} }
} }
impl session::Output for FileWriter { impl session::Output for LiveFileWriter {
fn event(&mut self, event: session::Event) -> io::Result<()> { fn event(&mut self, event: session::Event) -> io::Result<()> {
match self.writer.write_all(&self.encoder.event(event.into())) { match self.writer.write_all(&self.encoder.event(event.into())) {
Ok(_) => Ok(()), Ok(_) => Ok(()),

View File

@@ -21,36 +21,39 @@ use signal_hook::consts::{SIGALRM, SIGCHLD, SIGHUP, SIGINT, SIGQUIT, SIGTERM, SI
use signal_hook::SigId; use signal_hook::SigId;
use crate::io::set_non_blocking; use crate::io::set_non_blocking;
use crate::tty::{Tty, TtySize, TtyTheme}; use crate::tty::{Tty, TtySize};
type ExtraEnv = HashMap<String, String>; type ExtraEnv = HashMap<String, String>;
pub trait HandlerStarter<H: Handler> {
fn start(self, tty_size: TtySize, tty_theme: Option<TtyTheme>) -> H;
}
pub trait Handler { pub trait Handler {
fn output(&mut self, time: Duration, data: &[u8]) -> bool; fn output(&mut self, time: Duration, data: &[u8]) -> bool;
fn input(&mut self, time: Duration, data: &[u8]) -> bool; fn input(&mut self, time: Duration, data: &[u8]) -> bool;
fn resize(&mut self, time: Duration, tty_size: TtySize) -> bool; fn resize(&mut self, time: Duration, tty_size: TtySize) -> bool;
fn stop(self, time: Duration, exit_status: i32) -> Self; fn stop(&mut self, time: Duration, exit_status: i32);
} }
pub fn exec<S: AsRef<str>, T: Tty, H: Handler, R: HandlerStarter<H>>( pub fn open_signal_fd() -> anyhow::Result<SignalFd> {
SignalFd::open(&[SIGWINCH, SIGINT, SIGTERM, SIGQUIT, SIGHUP, SIGALRM, SIGCHLD])
}
pub fn exec<S: AsRef<str>, T: Tty, H: Handler>(
command: &[S], command: &[S],
extra_env: &ExtraEnv, extra_env: &ExtraEnv,
initial_tty_size: TtySize,
tty: &mut T, tty: &mut T,
handler_starter: R, handler: &mut H,
) -> anyhow::Result<(i32, H)> { signal_fd: SignalFd,
let winsize = tty.get_size(); ) -> anyhow::Result<i32> {
let winsize = initial_tty_size.into();
let epoch = Instant::now(); let epoch = Instant::now();
let mut handler = handler_starter.start(winsize.into(), tty.get_theme());
let result = unsafe { pty::forkpty(Some(&winsize), None) }?; let result = unsafe { pty::forkpty(Some(&winsize), None) }?;
match result { match result {
pty::ForkptyResult::Parent { child, master } => { pty::ForkptyResult::Parent { child, master } => {
handle_parent(master, child, tty, &mut handler, epoch) let code = handle_parent(master, child, tty, handler, epoch, signal_fd)?;
.map(|code| (code, handler.stop(epoch.elapsed(), code))) handler.stop(epoch.elapsed(), code);
Ok(code)
} }
pty::ForkptyResult::Child => { pty::ForkptyResult::Child => {
@@ -66,8 +69,9 @@ fn handle_parent<T: Tty, H: Handler>(
tty: &mut T, tty: &mut T,
handler: &mut H, handler: &mut H,
epoch: Instant, epoch: Instant,
signal_fd: SignalFd,
) -> anyhow::Result<i32> { ) -> anyhow::Result<i32> {
let wait_result = match copy(master_fd, child, tty, handler, epoch) { let wait_result = match copy(master_fd, child, tty, handler, epoch, signal_fd) {
Ok(Some(status)) => Ok(status), Ok(Some(status)) => Ok(status),
Ok(None) => wait::waitpid(child, None), Ok(None) => wait::waitpid(child, None),
@@ -93,6 +97,7 @@ fn copy<T: Tty, H: Handler>(
tty: &mut T, tty: &mut T,
handler: &mut H, handler: &mut H,
epoch: Instant, epoch: Instant,
mut signal_fd: SignalFd,
) -> anyhow::Result<Option<WaitStatus>> { ) -> anyhow::Result<Option<WaitStatus>> {
let mut master = File::from(master_fd); let mut master = File::from(master_fd);
let master_raw_fd = master.as_raw_fd(); let master_raw_fd = master.as_raw_fd();
@@ -101,9 +106,6 @@ fn copy<T: Tty, H: Handler>(
let mut output: Vec<u8> = Vec::with_capacity(BUF_SIZE); let mut output: Vec<u8> = Vec::with_capacity(BUF_SIZE);
let mut master_closed = false; let mut master_closed = false;
let mut signal_fd =
SignalFd::open(&[SIGWINCH, SIGINT, SIGTERM, SIGQUIT, SIGHUP, SIGALRM, SIGCHLD])?;
set_non_blocking(&master)?; set_non_blocking(&master)?;
loop { loop {
@@ -306,7 +308,7 @@ fn write_non_blocking<W: Write + ?Sized>(sink: &mut W, buf: &[u8]) -> io::Result
} }
} }
struct SignalFd { pub struct SignalFd {
sigids: Vec<SigId>, sigids: Vec<SigId>,
rx: OwnedFd, rx: OwnedFd,
} }
@@ -371,28 +373,17 @@ impl Drop for SignalFd {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::{Handler, HandlerStarter}; use super::{Handler, SignalFd};
use crate::pty::ExtraEnv; use crate::pty::ExtraEnv;
use crate::tty::{FixedSizeTty, NullTty, TtySize, TtyTheme}; use crate::tty::{NullTty, TtySize};
use std::time::Duration; use std::time::Duration;
struct TestHandlerStarter;
#[derive(Default)] #[derive(Default)]
struct TestHandler { struct TestHandler {
tty_size: TtySize, tty_size: TtySize,
output: Vec<Vec<u8>>, output: Vec<Vec<u8>>,
} }
impl HandlerStarter<TestHandler> for TestHandlerStarter {
fn start(self, tty_size: TtySize, _tty_theme: Option<TtyTheme>) -> TestHandler {
TestHandler {
tty_size,
output: Vec::new(),
}
}
}
impl Handler for TestHandler { impl Handler for TestHandler {
fn output(&mut self, _time: Duration, data: &[u8]) -> bool { fn output(&mut self, _time: Duration, data: &[u8]) -> bool {
self.output.push(data.into()); self.output.push(data.into());
@@ -408,12 +399,17 @@ mod tests {
true true
} }
fn stop(self, _time: Duration, _exit_status: i32) -> Self { fn stop(&mut self, _time: Duration, _exit_status: i32) {}
self
}
} }
impl TestHandler { impl TestHandler {
fn new() -> Self {
Self {
tty_size: Default::default(),
output: Vec::new(),
}
}
fn output(&self) -> Vec<String> { fn output(&self) -> Vec<String> {
self.output self.output
.iter() .iter()
@@ -422,9 +418,16 @@ mod tests {
} }
} }
fn setup() -> (TestHandler, SignalFd) {
let handler = TestHandler::new();
let signal_fd = super::open_signal_fd().unwrap();
(handler, signal_fd)
}
#[test] #[test]
fn exec_basic() { fn exec_basic() {
let starter = TestHandlerStarter; let (mut handler, signal_fd) = setup();
let code = r#" let code = r#"
import sys; import sys;
@@ -435,11 +438,13 @@ time.sleep(0.1);
sys.stdout.write('bar'); sys.stdout.write('bar');
"#; "#;
let (_code, handler) = super::exec( let _code = super::exec(
&["python3", "-c", code], &["python3", "-c", code],
&ExtraEnv::new(), &ExtraEnv::new(),
TtySize::default(),
&mut NullTty::open().unwrap(), &mut NullTty::open().unwrap(),
starter, &mut handler,
signal_fd,
) )
.unwrap(); .unwrap();
@@ -449,13 +454,15 @@ sys.stdout.write('bar');
#[test] #[test]
fn exec_no_output() { fn exec_no_output() {
let starter = TestHandlerStarter; let (mut handler, signal_fd) = setup();
let (_code, handler) = super::exec( let _code = super::exec(
&["true"], &["true"],
&ExtraEnv::new(), &ExtraEnv::new(),
TtySize::default(),
&mut NullTty::open().unwrap(), &mut NullTty::open().unwrap(),
starter, &mut handler,
signal_fd,
) )
.unwrap(); .unwrap();
@@ -464,13 +471,15 @@ sys.stdout.write('bar');
#[test] #[test]
fn exec_quick() { fn exec_quick() {
let starter = TestHandlerStarter; let (mut handler, signal_fd) = setup();
let (_code, handler) = super::exec( let _code = super::exec(
&["printf", "hello world\n"], &["printf", "hello world\n"],
&ExtraEnv::new(), &ExtraEnv::new(),
TtySize::default(),
&mut NullTty::open().unwrap(), &mut NullTty::open().unwrap(),
starter, &mut handler,
signal_fd,
) )
.unwrap(); .unwrap();
@@ -479,34 +488,21 @@ sys.stdout.write('bar');
#[test] #[test]
fn exec_extra_env() { fn exec_extra_env() {
let starter = TestHandlerStarter; let (mut handler, signal_fd) = setup();
let mut env = ExtraEnv::new(); let mut env = ExtraEnv::new();
env.insert("ASCIINEMA_TEST_FOO".to_owned(), "bar".to_owned()); env.insert("ASCIINEMA_TEST_FOO".to_owned(), "bar".to_owned());
let (_code, handler) = super::exec( let _code = super::exec(
&["sh", "-c", "echo -n $ASCIINEMA_TEST_FOO"], &["sh", "-c", "echo -n $ASCIINEMA_TEST_FOO"],
&env, &env,
TtySize::default(),
&mut NullTty::open().unwrap(), &mut NullTty::open().unwrap(),
starter, &mut handler,
signal_fd,
) )
.unwrap(); .unwrap();
assert_eq!(handler.output(), vec!["bar"]); assert_eq!(handler.output(), vec!["bar"]);
} }
#[test]
fn exec_winsize_override() {
let starter = TestHandlerStarter;
let (_code, handler) = super::exec(
&["true"],
&ExtraEnv::new(),
&mut FixedSizeTty::new(NullTty::open().unwrap(), Some(100), Some(50)),
starter,
)
.unwrap();
assert_eq!(handler.tty_size, TtySize(100, 50));
}
} }

View File

@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::io; use std::io;
use std::sync::mpsc; use std::sync::mpsc;
use std::thread; use std::thread;
@@ -11,22 +12,6 @@ use crate::pty;
use crate::tty::{TtySize, TtyTheme}; use crate::tty::{TtySize, TtyTheme};
use crate::util::{JoinHandle, Utf8Decoder}; use crate::util::{JoinHandle, Utf8Decoder};
pub struct SessionStarter<N> {
starters: Vec<Box<dyn OutputStarter>>,
record_input: bool,
keys: KeyBindings,
notifier: N,
}
pub trait OutputStarter {
fn start(
self: Box<Self>,
time: SystemTime,
tty_size: TtySize,
tty_theme: Option<TtyTheme>,
) -> io::Result<Box<dyn Output>>;
}
pub trait Output: Send { pub trait Output: Send {
fn event(&mut self, event: Event) -> io::Result<()>; fn event(&mut self, event: Event) -> io::Result<()>;
fn flush(&mut self) -> io::Result<()>; fn flush(&mut self) -> io::Result<()>;
@@ -41,39 +26,46 @@ pub enum Event {
Exit(u64, i32), Exit(u64, i32),
} }
impl<N: Notifier> SessionStarter<N> { pub struct Session<N> {
notifier: N,
input_decoder: Utf8Decoder,
output_decoder: Utf8Decoder,
tty_size: TtySize,
record_input: bool,
keys: KeyBindings,
sender: mpsc::Sender<Event>,
time_offset: u64,
pause_time: Option<u64>,
prefix_mode: bool,
_handle: JoinHandle,
}
#[derive(Clone)]
pub struct Metadata {
pub time: SystemTime,
pub term: TermInfo,
pub idle_time_limit: Option<f64>,
pub command: Option<String>,
pub title: Option<String>,
pub env: HashMap<String, String>,
}
#[derive(Clone)]
pub struct TermInfo {
pub type_: Option<String>,
pub version: Option<String>,
pub size: TtySize,
pub theme: Option<TtyTheme>,
}
impl<N: Notifier> Session<N> {
pub fn new( pub fn new(
starters: Vec<Box<dyn OutputStarter>>, mut outputs: Vec<Box<dyn Output>>,
tty_size: TtySize,
record_input: bool, record_input: bool,
keys: KeyBindings, keys: KeyBindings,
notifier: N, notifier: N,
) -> Self { ) -> Self {
SessionStarter {
starters,
record_input,
keys,
notifier,
}
}
}
impl<N: Notifier> pty::HandlerStarter<Session<N>> for SessionStarter<N> {
fn start(self, tty_size: TtySize, tty_theme: Option<TtyTheme>) -> Session<N> {
let time = SystemTime::now();
let mut outputs = Vec::new();
for starter in self.starters {
match starter.start(time, tty_size, tty_theme.clone()) {
Ok(output) => {
outputs.push(output);
}
Err(e) => {
error!("output startup failed: {e:?}");
}
}
}
let (sender, receiver) = mpsc::channel::<Event>(); let (sender, receiver) = mpsc::channel::<Event>();
let handle = thread::spawn(move || { let handle = thread::spawn(move || {
@@ -101,11 +93,11 @@ impl<N: Notifier> pty::HandlerStarter<Session<N>> for SessionStarter<N> {
}); });
Session { Session {
notifier: self.notifier, notifier,
input_decoder: Utf8Decoder::new(), input_decoder: Utf8Decoder::new(),
output_decoder: Utf8Decoder::new(), output_decoder: Utf8Decoder::new(),
record_input: self.record_input, record_input,
keys: self.keys, keys,
tty_size, tty_size,
sender, sender,
time_offset: 0, time_offset: 0,
@@ -114,23 +106,7 @@ impl<N: Notifier> pty::HandlerStarter<Session<N>> for SessionStarter<N> {
_handle: JoinHandle::new(handle), _handle: JoinHandle::new(handle),
} }
} }
}
pub struct Session<N> {
notifier: N,
input_decoder: Utf8Decoder,
output_decoder: Utf8Decoder,
tty_size: TtySize,
record_input: bool,
keys: KeyBindings,
sender: mpsc::Sender<Event>,
time_offset: u64,
pause_time: Option<u64>,
prefix_mode: bool,
_handle: JoinHandle,
}
impl<N: Notifier> Session<N> {
fn elapsed_time(&self, time: Duration) -> u64 { fn elapsed_time(&self, time: Duration) -> u64 {
if let Some(pause_time) = self.pause_time { if let Some(pause_time) = self.pause_time {
pause_time pause_time
@@ -215,11 +191,9 @@ impl<N: Notifier> pty::Handler for Session<N> {
true true
} }
fn stop(self, time: Duration, exit_status: i32) -> Self { fn stop(&mut self, time: Duration, exit_status: i32) {
let msg = Event::Exit(self.elapsed_time(time), exit_status); let msg = Event::Exit(self.elapsed_time(time), exit_status);
self.sender.send(msg).expect("exit send should succeed"); self.sender.send(msg).expect("exit send should succeed");
self
} }
} }

View File

@@ -1,6 +1,6 @@
use std::future; use std::future;
use std::io; use std::io;
use std::time::{Duration, Instant, SystemTime}; use std::time::{Duration, Instant};
use anyhow::Result; use anyhow::Result;
use avt::Vt; use avt::Vt;
@@ -12,7 +12,7 @@ use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tokio_stream::wrappers::BroadcastStream; use tokio_stream::wrappers::BroadcastStream;
use tracing::info; use tracing::info;
use crate::session; use crate::session::{self, Metadata};
use crate::tty::TtySize; use crate::tty::TtySize;
use crate::tty::TtyTheme; use crate::tty::TtyTheme;
@@ -21,6 +21,8 @@ pub struct Stream {
request_rx: mpsc::Receiver<Request>, request_rx: mpsc::Receiver<Request>,
} }
pub struct LiveStream(mpsc::UnboundedSender<session::Event>);
type Request = oneshot::Sender<Subscription>; type Request = oneshot::Sender<Subscription>;
struct Subscription { struct Subscription {
@@ -31,13 +33,6 @@ struct Subscription {
#[derive(Clone)] #[derive(Clone)]
pub struct Subscriber(mpsc::Sender<Request>); pub struct Subscriber(mpsc::Sender<Request>);
pub struct OutputStarter {
handle: Handle,
request_rx: mpsc::Receiver<Request>,
}
struct Output(mpsc::UnboundedSender<session::Event>);
#[derive(Clone)] #[derive(Clone)]
pub enum Event { pub enum Event {
Init(u64, u64, TtySize, Option<TtyTheme>, String), Init(u64, u64, TtySize, Option<TtyTheme>, String),
@@ -62,11 +57,20 @@ impl Stream {
Subscriber(self.request_tx.clone()) Subscriber(self.request_tx.clone())
} }
pub fn start(self, handle: Handle) -> OutputStarter { pub fn start(self, handle: Handle, metadata: &Metadata) -> LiveStream {
OutputStarter { let (stream_tx, stream_rx) = mpsc::unbounded_channel();
handle, let request_rx = self.request_rx;
request_rx: self.request_rx,
} let fut = run(
metadata.term.size,
metadata.term.theme.clone(),
stream_rx,
request_rx,
);
handle.spawn(fut);
LiveStream(stream_tx)
} }
} }
@@ -175,24 +179,7 @@ fn build_vt(tty_size: TtySize) -> Vt {
.build() .build()
} }
impl session::OutputStarter for OutputStarter { impl session::Output for LiveStream {
fn start(
self: Box<Self>,
_time: SystemTime,
tty_size: TtySize,
tty_theme: Option<TtyTheme>,
) -> io::Result<Box<dyn session::Output>> {
let (stream_tx, stream_rx) = mpsc::unbounded_channel();
let request_rx = self.request_rx;
self.handle
.spawn(async move { run(tty_size, tty_theme, stream_rx, request_rx).await });
Ok(Box::new(Output(stream_tx)))
}
}
impl session::Output for Output {
fn event(&mut self, event: session::Event) -> io::Result<()> { fn event(&mut self, event: session::Event) -> io::Result<()> {
self.0.send(event).map_err(io::Error::other) self.0.send(event).map_err(io::Error::other)
} }

View File

@@ -25,6 +25,17 @@ impl From<pty::Winsize> for TtySize {
} }
} }
impl From<TtySize> for pty::Winsize {
fn from(tty_size: TtySize) -> Self {
pty::Winsize {
ws_col: tty_size.0,
ws_row: tty_size.1,
ws_xpixel: 0,
ws_ypixel: 0,
}
}
}
impl From<(usize, usize)> for TtySize { impl From<(usize, usize)> for TtySize {
fn from((cols, rows): (usize, usize)) -> Self { fn from((cols, rows): (usize, usize)) -> Self {
TtySize(cols as u16, rows as u16) TtySize(cols as u16, rows as u16)