Merge pull request #601 from asciinema/rec-pselect

Replace mio with select in the recorder, add signal handling
This commit is contained in:
Marcin Kulik
2024-01-08 10:10:44 +01:00
committed by GitHub
4 changed files with 174 additions and 169 deletions

14
Cargo.lock generated
View File

@@ -89,14 +89,12 @@ dependencies = [
"anyhow", "anyhow",
"clap", "clap",
"config", "config",
"mio",
"nix", "nix",
"reqwest", "reqwest",
"rustyline", "rustyline",
"serde", "serde",
"serde_json", "serde_json",
"signal-hook", "signal-hook",
"signal-hook-mio",
"termion", "termion",
"uuid", "uuid",
] ]
@@ -666,7 +664,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2"
dependencies = [ dependencies = [
"libc", "libc",
"log",
"wasi", "wasi",
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
@@ -1020,17 +1017,6 @@ dependencies = [
"signal-hook-registry", "signal-hook-registry",
] ]
[[package]]
name = "signal-hook-mio"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af"
dependencies = [
"libc",
"mio",
"signal-hook",
]
[[package]] [[package]]
name = "signal-hook-registry" name = "signal-hook-registry"
version = "1.4.1" version = "1.4.1"

View File

@@ -13,13 +13,11 @@ license = "GPL-3.0"
[dependencies] [dependencies]
anyhow = "1.0.75" anyhow = "1.0.75"
nix = { version = "0.27", features = [ "fs", "term", "process", "signal" ] } nix = { version = "0.27", features = [ "fs", "term", "process", "signal" ] }
mio = { version ="0.8", features = ["os-poll", "os-ext"] }
termion = "2.0.1" termion = "2.0.1"
serde = { version = "1.0.189", features = ["derive"] } serde = { version = "1.0.189", features = ["derive"] }
serde_json = "1.0.107" serde_json = "1.0.107"
clap = { version = "4.4.7", features = ["derive"] } clap = { version = "4.4.7", features = ["derive"] }
signal-hook-mio = { version = "0.2.3", features = ["support-v0_8"] } signal-hook = { version = "0.3.17", default-features = false }
signal-hook = "0.3.17"
uuid = { version = "1.6.1", features = ["v4"] } uuid = { version = "1.6.1", features = ["v4"] }
reqwest = { version = "0.11.23", default-features = false, features = ["blocking", "rustls-tls", "multipart", "gzip", "json"] } reqwest = { version = "0.11.23", default-features = false, features = ["blocking", "rustls-tls", "multipart", "gzip", "json"] }
rustyline = "13.0.0" rustyline = "13.0.0"

View File

@@ -1,14 +1,17 @@
use crate::io::set_non_blocking; use crate::io::set_non_blocking;
use crate::tty::Tty; use crate::tty::Tty;
use anyhow::bail; use anyhow::{bail, Result};
use mio::unix::SourceFd; use nix::errno::Errno;
use nix::sys::select::{select, FdSet};
use nix::unistd::pipe;
use nix::{libc, pty, sys::signal, sys::wait, unistd, unistd::ForkResult}; use nix::{libc, pty, sys::signal, sys::wait, unistd, unistd::ForkResult};
use signal_hook::consts::signal::*; use signal_hook::consts::{SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGWINCH};
use signal_hook_mio::v0_8::Signals; use signal_hook::SigId;
use std::collections::HashMap; use std::collections::HashMap;
use std::ffi::{CString, NulError}; use std::ffi::{CString, NulError};
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
use std::os::fd::RawFd; use std::os::fd::BorrowedFd;
use std::os::fd::{AsFd, RawFd};
use std::os::unix::io::{AsRawFd, FromRawFd}; use std::os::unix::io::{AsRawFd, FromRawFd};
use std::{env, fs}; use std::{env, fs};
@@ -27,7 +30,7 @@ pub fn exec<S: AsRef<str>, R: Recorder>(
tty: Box<dyn Tty>, tty: Box<dyn Tty>,
winsize_override: (Option<u16>, Option<u16>), winsize_override: (Option<u16>, Option<u16>),
recorder: &mut R, recorder: &mut R,
) -> anyhow::Result<i32> { ) -> Result<i32> {
let winsize = get_tty_size(&*tty, winsize_override); let winsize = get_tty_size(&*tty, winsize_override);
recorder.start((winsize.ws_col, winsize.ws_row))?; recorder.start((winsize.ws_col, winsize.ws_row))?;
let result = unsafe { pty::forkpty(Some(&winsize), None) }?; let result = unsafe { pty::forkpty(Some(&winsize), None) }?;
@@ -54,7 +57,7 @@ fn handle_parent<R: Recorder>(
tty: Box<dyn Tty>, tty: Box<dyn Tty>,
winsize_override: (Option<u16>, Option<u16>), winsize_override: (Option<u16>, Option<u16>),
recorder: &mut R, recorder: &mut R,
) -> anyhow::Result<i32> { ) -> Result<i32> {
let copy_result = copy(master_fd, child, tty, winsize_override, recorder); let copy_result = copy(master_fd, child, tty, winsize_override, recorder);
let wait_result = wait::waitpid(child, None); let wait_result = wait::waitpid(child, None);
copy_result?; copy_result?;
@@ -67,157 +70,137 @@ fn handle_parent<R: Recorder>(
} }
} }
const MASTER: mio::Token = mio::Token(0);
const TTY: mio::Token = mio::Token(1);
const SIGNAL: mio::Token = mio::Token(2);
const BUF_SIZE: usize = 128 * 1024; const BUF_SIZE: usize = 128 * 1024;
fn copy<R: Recorder>( fn copy<R: Recorder>(
master_fd: RawFd, master_raw_fd: RawFd,
child: unistd::Pid, child: unistd::Pid,
mut tty: Box<dyn Tty>, mut tty: Box<dyn Tty>,
winsize_override: (Option<u16>, Option<u16>), winsize_override: (Option<u16>, Option<u16>),
recorder: &mut R, recorder: &mut R,
) -> anyhow::Result<()> { ) -> Result<()> {
let mut master = unsafe { fs::File::from_raw_fd(master_fd) }; let mut master = unsafe { fs::File::from_raw_fd(master_raw_fd) };
let mut poll = mio::Poll::new()?;
let mut events = mio::Events::with_capacity(128);
let mut master_source = SourceFd(&master_fd);
let tty_fd = tty.as_raw_fd();
let mut tty_source = SourceFd(&tty_fd);
let mut signals = Signals::new([SIGWINCH, SIGINT, SIGTERM, SIGQUIT, SIGHUP])?;
let mut buf = [0u8; BUF_SIZE]; let mut buf = [0u8; BUF_SIZE];
let mut input: Vec<u8> = Vec::with_capacity(BUF_SIZE); let mut input: Vec<u8> = Vec::with_capacity(BUF_SIZE);
let mut output: Vec<u8> = Vec::with_capacity(BUF_SIZE); let mut output: Vec<u8> = Vec::with_capacity(BUF_SIZE);
let mut flush = false; let mut flush = false;
let sigwinch_fd = SignalFd::open(SIGWINCH)?;
let sigint_fd = SignalFd::open(SIGINT)?;
let sigterm_fd = SignalFd::open(SIGTERM)?;
let sigquit_fd = SignalFd::open(SIGQUIT)?;
let sighup_fd = SignalFd::open(SIGHUP)?;
set_non_blocking(&master_fd)?; set_non_blocking(&master_raw_fd)?;
poll.registry()
.register(&mut master_source, MASTER, mio::Interest::READABLE)?;
poll.registry()
.register(&mut tty_source, TTY, mio::Interest::READABLE)?;
poll.registry()
.register(&mut signals, SIGNAL, mio::Interest::READABLE)?;
loop { loop {
if let Err(e) = poll.poll(&mut events, None) { let master_fd = master.as_fd();
if e.kind() == io::ErrorKind::Interrupted { let tty_fd = tty.as_fd();
let mut rfds = FdSet::new();
let mut wfds = FdSet::new();
rfds.insert(&tty_fd);
rfds.insert(&sigwinch_fd);
rfds.insert(&sigint_fd);
rfds.insert(&sigterm_fd);
rfds.insert(&sigquit_fd);
rfds.insert(&sighup_fd);
if !flush {
rfds.insert(&master_fd);
}
if !input.is_empty() {
wfds.insert(&master_fd);
}
if !output.is_empty() {
wfds.insert(&tty_fd);
}
if let Err(e) = select(None, &mut rfds, &mut wfds, None, None) {
if e == Errno::EINTR {
continue; continue;
} else { } else {
bail!(e); bail!(e);
} }
} }
for event in events.iter() { let master_read = rfds.contains(&master_fd);
match event.token() { let master_write = wfds.contains(&master_fd);
MASTER => { let tty_read = rfds.contains(&tty_fd);
if event.is_readable() { let tty_write = wfds.contains(&tty_fd);
let offset = output.len(); let sigwinch_read = rfds.contains(&sigwinch_fd);
let read = read_all(&mut master, &mut buf, &mut output)?; let sigint_read = rfds.contains(&sigint_fd);
let sigterm_read = rfds.contains(&sigterm_fd);
let sigquit_read = rfds.contains(&sigquit_fd);
let sighup_read = rfds.contains(&sighup_fd);
if read > 0 { if master_read {
recorder.output(&output[offset..]); let offset = output.len();
let read = read_all(&mut master, &mut buf, &mut output)?;
poll.registry().reregister( if read > 0 {
&mut tty_source, recorder.output(&output[offset..]);
TTY, } else if output.is_empty() {
mio::Interest::READABLE | mio::Interest::WRITABLE, return Ok(());
)?; } else {
} flush = true;
}
if event.is_writable() {
let left = write_all(&mut master, &mut input)?;
if left == 0 {
poll.registry().reregister(
&mut master_source,
MASTER,
mio::Interest::READABLE,
)?;
}
}
if event.is_read_closed() {
poll.registry().deregister(&mut master_source)?;
if output.is_empty() {
return Ok(());
}
flush = true;
}
}
TTY => {
if event.is_writable() {
let left = write_all(&mut tty, &mut output)?;
if left == 0 {
if flush {
return Ok(());
}
poll.registry().reregister(
&mut tty_source,
TTY,
mio::Interest::READABLE,
)?;
}
}
if event.is_readable() {
let offset = input.len();
let read = read_all(&mut tty, &mut buf, &mut input)?;
if read > 0 {
recorder.input(&input[offset..]);
poll.registry().reregister(
&mut master_source,
MASTER,
mio::Interest::READABLE | mio::Interest::WRITABLE,
)?;
}
}
if event.is_read_closed() {
poll.registry().deregister(&mut tty_source)?;
return Ok(());
}
}
SIGNAL => {
for signal in signals.pending() {
match signal {
SIGWINCH => {
let winsize = get_tty_size(&*tty, winsize_override);
set_pty_size(master_fd, &winsize);
recorder.resize((winsize.ws_col, winsize.ws_row));
}
SIGINT => (),
SIGTERM | SIGQUIT | SIGHUP => {
unsafe { libc::kill(child.as_raw(), SIGTERM) };
return Ok(());
}
_ => (),
}
}
}
_ => (),
} }
} }
if master_write {
write_all(&mut master, &mut input)?;
}
if tty_write {
let left = write_all(&mut tty, &mut output)?;
if left == 0 && flush {
return Ok(());
}
}
if tty_read {
let offset = input.len();
let read = read_all(&mut tty, &mut buf, &mut input)?;
if read > 0 {
recorder.input(&input[offset..]);
}
}
if sigwinch_read {
sigwinch_fd.flush();
let winsize = get_tty_size(&*tty, winsize_override);
set_pty_size(master_raw_fd, &winsize);
recorder.resize((winsize.ws_col, winsize.ws_row));
}
if sigint_read {
sigint_fd.flush();
}
if sigterm_read || sigquit_read || sighup_read {
if sigterm_read {
sigterm_fd.flush();
}
if sigquit_read {
sigquit_fd.flush();
}
if sighup_read {
sighup_fd.flush();
}
unsafe { libc::kill(child.as_raw(), SIGTERM) };
return Ok(());
}
} }
} }
fn handle_child<S: AsRef<str>>(args: &[S], extra_env: &ExtraEnv) -> anyhow::Result<()> { fn handle_child<S: AsRef<str>>(args: &[S], extra_env: &ExtraEnv) -> Result<()> {
use signal::{SigHandler, Signal}; use signal::{SigHandler, Signal};
let args = args let args = args
@@ -260,16 +243,14 @@ fn read_all<R: Read>(source: &mut R, buf: &mut [u8], out: &mut Vec<u8>) -> io::R
loop { loop {
match source.read(buf) { match source.read(buf) {
Ok(0) => (), Ok(0) => break,
Ok(n) => { Ok(n) => {
out.extend_from_slice(&buf[0..n]); out.extend_from_slice(&buf[0..n]);
read += n; read += n;
} }
Err(_) => { Err(_) => break,
break;
}
} }
} }
@@ -281,7 +262,7 @@ fn write_all<W: Write>(sink: &mut W, data: &mut Vec<u8>) -> io::Result<usize> {
loop { loop {
match sink.write(buf) { match sink.write(buf) {
Ok(0) => (), Ok(0) => break,
Ok(n) => { Ok(n) => {
buf = &buf[n..]; buf = &buf[n..];
@@ -291,9 +272,7 @@ fn write_all<W: Write>(sink: &mut W, data: &mut Vec<u8>) -> io::Result<usize> {
} }
} }
Err(_) => { Err(_) => break,
break;
}
} }
} }
@@ -310,6 +289,49 @@ fn write_all<W: Write>(sink: &mut W, data: &mut Vec<u8>) -> io::Result<usize> {
Ok(left) Ok(left)
} }
struct SignalFd {
sigid: SigId,
rx: i32,
}
impl SignalFd {
fn open(signal: libc::c_int) -> Result<Self> {
let (rx, tx) = pipe()?;
set_non_blocking(&rx)?;
set_non_blocking(&tx)?;
let sigid = unsafe {
signal_hook::low_level::register(signal, move || {
let _ = unistd::write(tx, &[0]);
})
}?;
Ok(Self { sigid, rx })
}
fn flush(&self) {
let mut buf = [0; 256];
while let Ok(n) = unistd::read(self.rx, &mut buf) {
if n == 0 {
break;
};
}
}
}
impl AsFd for SignalFd {
fn as_fd(&self) -> BorrowedFd<'_> {
unsafe { BorrowedFd::borrow_raw(self.rx) }
}
}
impl Drop for SignalFd {
fn drop(&mut self) {
signal_hook::low_level::unregister(self.sigid);
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::pty::ExtraEnv; use crate::pty::ExtraEnv;

View File

@@ -1,13 +1,12 @@
use anyhow::Result; use anyhow::Result;
use mio::unix::pipe; use nix::{libc, pty, unistd};
use nix::{libc, pty};
use std::{ use std::{
fs, io, fs, io,
os::fd::{AsRawFd, RawFd}, os::fd::{AsFd, AsRawFd, BorrowedFd},
}; };
use termion::raw::{IntoRawMode, RawTerminal}; use termion::raw::{IntoRawMode, RawTerminal};
pub trait Tty: io::Write + io::Read + AsRawFd { pub trait Tty: io::Write + io::Read + AsFd {
fn get_size(&self) -> pty::Winsize; fn get_size(&self) -> pty::Winsize;
} }
@@ -60,20 +59,20 @@ impl io::Write for DevTty {
} }
} }
impl AsRawFd for DevTty { impl AsFd for DevTty {
fn as_raw_fd(&self) -> std::os::unix::prelude::RawFd { fn as_fd(&self) -> BorrowedFd<'_> {
self.file.as_raw_fd() self.file.as_fd()
} }
} }
pub struct DevNull { pub struct DevNull {
tx: pipe::Sender, tx: i32,
_rx: pipe::Receiver, _rx: i32,
} }
impl DevNull { impl DevNull {
pub fn open() -> Result<Self> { pub fn open() -> Result<Self> {
let (tx, rx) = pipe::new()?; let (rx, tx) = unistd::pipe()?;
Ok(Self { tx, _rx: rx }) Ok(Self { tx, _rx: rx })
} }
@@ -106,8 +105,8 @@ impl io::Write for DevNull {
} }
} }
impl AsRawFd for DevNull { impl AsFd for DevNull {
fn as_raw_fd(&self) -> RawFd { fn as_fd(&self) -> BorrowedFd<'_> {
self.tx.as_raw_fd() unsafe { BorrowedFd::borrow_raw(self.tx.as_raw_fd()) }
} }
} }