Workaround kqueue+tty bug on macOS

Fixes #598
This commit is contained in:
Marcin Kulik
2025-06-22 16:33:08 +02:00
parent abbd2a7ac6
commit 8f9253ef2f
4 changed files with 393 additions and 132 deletions

View File

@@ -52,3 +52,6 @@ url = "2.5"
[profile.release]
strip = true
[features]
macos-tty = []

View File

@@ -1,21 +1,29 @@
use std::fs::File;
use std::io::{Read, Write};
use std::os::fd::{AsFd, AsRawFd};
use std::os::unix::fs::OpenOptionsExt;
use std::os::fd::AsFd;
use async_trait::async_trait;
use nix::libc;
use nix::pty::Winsize;
use nix::sys::termios::{self, SetArg, Termios};
use nix::sys::termios::{self, SetArg};
use rgb::RGB8;
use tokio::io::unix::AsyncFd;
use tokio::io::{self, Interest};
use tokio::io;
use tokio::time::{self, Duration};
const QUERY_READ_TIMEOUT: u64 = 500;
const COLORS_QUERY: &str = "\x1b]10;?\x07\x1b]11;?\x07\x1b]4;0;?\x07\x1b]4;1;?\x07\x1b]4;2;?\x07\x1b]4;3;?\x07\x1b]4;4;?\x07\x1b]4;5;?\x07\x1b]4;6;?\x07\x1b]4;7;?\x07\x1b]4;8;?\x07\x1b]4;9;?\x07\x1b]4;10;?\x07\x1b]4;11;?\x07\x1b]4;12;?\x07\x1b]4;13;?\x07\x1b]4;14;?\x07\x1b]4;15;?\x07";
const THEME_QUERY: &str = "\x1b]10;?\x07\x1b]11;?\x07\x1b]4;0;?\x07\x1b]4;1;?\x07\x1b]4;2;?\x07\x1b]4;3;?\x07\x1b]4;4;?\x07\x1b]4;5;?\x07\x1b]4;6;?\x07\x1b]4;7;?\x07\x1b]4;8;?\x07\x1b]4;9;?\x07\x1b]4;10;?\x07\x1b]4;11;?\x07\x1b]4;12;?\x07\x1b]4;13;?\x07\x1b]4;14;?\x07\x1b]4;15;?\x07";
const XTVERSION_QUERY: &str = "\x1b[>0q";
#[cfg(all(not(target_os = "macos"), not(feature = "macos-tty")))]
mod default;
#[cfg(any(target_os = "macos", feature = "macos-tty"))]
mod macos;
#[cfg(all(not(target_os = "macos"), not(feature = "macos-tty")))]
pub use default::DevTty;
#[cfg(any(target_os = "macos", feature = "macos-tty"))]
pub use macos::DevTty;
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct TtySize(pub u16, pub u16);
@@ -26,11 +34,6 @@ pub struct TtyTheme {
pub palette: Vec<RGB8>,
}
pub struct DevTty {
file: AsyncFd<File>,
settings: libc::termios,
}
pub struct NullTty;
pub struct FixedSizeTty<T> {
@@ -92,125 +95,6 @@ impl From<TtySize> for (u16, u16) {
}
}
impl DevTty {
pub async fn open() -> anyhow::Result<Self> {
let file = File::options()
.read(true)
.write(true)
.custom_flags(libc::O_NONBLOCK)
.open("/dev/tty")?;
let file = AsyncFd::new(file)?;
let settings = make_raw(&file)?;
Ok(Self { file, settings })
}
async fn query(&self, query: &str) -> anyhow::Result<Vec<u8>> {
let mut query = query.to_string().into_bytes();
query.extend_from_slice(b"\x1b[c");
let mut query = &query[..];
let mut response = Vec::new();
let mut buf = [0u8; 1024];
loop {
tokio::select! {
result = self.read(&mut buf) => {
let n = result?;
response.extend_from_slice(&buf[..n]);
if let Some(len) = complete_da_response_len(&response) {
response.truncate(len);
break;
}
}
result = self.write(query), if !query.is_empty() => {
let n = result?;
query = &query[n..];
}
_ = time::sleep(Duration::from_millis(QUERY_READ_TIMEOUT)) => {
break;
}
}
}
Ok(response)
}
pub async fn resize(&mut self, size: TtySize) -> io::Result<()> {
let xtwinops_seq = format!("\x1b[8;{};{}t", size.1, size.0);
self.write_all(xtwinops_seq.as_bytes()).await?;
Ok(())
}
}
impl Drop for DevTty {
fn drop(&mut self) {
let termios = Termios::from(self.settings);
let _ = termios::tcsetattr(self.file.as_fd(), SetArg::TCSANOW, &termios);
}
}
#[async_trait(?Send)]
impl Tty for DevTty {
fn get_size(&self) -> Winsize {
let mut winsize = Winsize {
ws_row: 24,
ws_col: 80,
ws_xpixel: 0,
ws_ypixel: 0,
};
unsafe { libc::ioctl(self.file.as_raw_fd(), libc::TIOCGWINSZ, &mut winsize) };
winsize
}
async fn get_theme(&mut self) -> Option<TtyTheme> {
let response = self.query(COLORS_QUERY).await.ok()?;
let response = String::from_utf8_lossy(response.as_slice());
let mut colors = response.match_indices("rgb:");
let (idx, _) = colors.next()?;
let fg = parse_color(&response[idx + 4..])?;
let (idx, _) = colors.next()?;
let bg = parse_color(&response[idx + 4..])?;
let mut palette = Vec::new();
for _ in 0..16 {
let (idx, _) = colors.next()?;
let color = parse_color(&response[idx + 4..])?;
palette.push(color);
}
Some(TtyTheme { fg, bg, palette })
}
async fn get_version(&mut self) -> Option<String> {
let response = self.query(XTVERSION_QUERY).await.ok()?;
if let [b'\x1b', b'P', b'>', b'|', version @ .., b'\x1b', b'\\'] = &response[..] {
Some(String::from_utf8_lossy(version).to_string())
} else {
None
}
}
async fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
self.file
.async_io(Interest::READABLE, |mut file| file.read(buf))
.await
}
async fn write(&self, buf: &[u8]) -> io::Result<usize> {
self.file
.async_io(Interest::WRITABLE, |mut file| file.write(buf))
.await
}
}
impl<T: Tty> FixedSizeTty<T> {
pub fn new(inner: T, cols: Option<u16>, rows: Option<u16>) -> Self {
Self { inner, cols, rows }
@@ -287,6 +171,47 @@ fn make_raw<F: AsFd>(fd: F) -> anyhow::Result<libc::termios> {
Ok(termios.into())
}
async fn get_theme<T: Tty>(tty: &T) -> Option<TtyTheme> {
parse_theme_response(&query(tty, THEME_QUERY).await.ok()?)
}
async fn get_version<T: Tty>(tty: &T) -> Option<String> {
parse_version_response(&query(tty, XTVERSION_QUERY).await.ok()?)
}
async fn query<T: Tty>(tty: &T, query: &str) -> anyhow::Result<Vec<u8>> {
let mut query = query.to_string().into_bytes();
query.extend_from_slice(b"\x1b[c");
let mut query = &query[..];
let mut response = Vec::new();
let mut buf = [0u8; 1024];
loop {
tokio::select! {
result = tty.read(&mut buf) => {
let n = result?;
response.extend_from_slice(&buf[..n]);
if let Some(len) = complete_da_response_len(&response) {
response.truncate(len);
break;
}
}
result = tty.write(query), if !query.is_empty() => {
let n = result?;
query = &query[n..];
}
_ = time::sleep(Duration::from_millis(QUERY_READ_TIMEOUT)) => {
break;
}
}
}
Ok(response)
}
fn complete_da_response_len(response: &[u8]) -> Option<usize> {
let mut reversed = response.iter().rev();
let mut includes_da_response = false;
@@ -316,6 +241,24 @@ fn complete_da_response_len(response: &[u8]) -> Option<usize> {
}
}
fn parse_theme_response(response: &[u8]) -> Option<TtyTheme> {
let response = String::from_utf8_lossy(response);
let mut colors = response.match_indices("rgb:");
let (idx, _) = colors.next()?;
let fg = parse_color(&response[idx + 4..])?;
let (idx, _) = colors.next()?;
let bg = parse_color(&response[idx + 4..])?;
let mut palette = Vec::new();
for _ in 0..16 {
let (idx, _) = colors.next()?;
let color = parse_color(&response[idx + 4..])?;
palette.push(color);
}
Some(TtyTheme { fg, bg, palette })
}
fn parse_color(rgb: &str) -> Option<RGB8> {
let mut components = rgb.split('/');
let r_hex = components.next()?;
@@ -333,6 +276,14 @@ fn parse_color(rgb: &str) -> Option<RGB8> {
Some(RGB8::new(r, g, b))
}
fn parse_version_response(response: &[u8]) -> Option<String> {
if let [b'\x1b', b'P', b'>', b'|', version @ .., b'\x1b', b'\\'] = response {
Some(String::from_utf8_lossy(version).to_string())
} else {
None
}
}
#[cfg(test)]
mod tests {
use super::{FixedSizeTty, NullTty, Tty};

83
src/tty/default.rs Normal file
View File

@@ -0,0 +1,83 @@
use std::fs::File;
use std::io::{Read, Write};
use std::os::fd::{AsFd, AsRawFd};
use std::os::unix::fs::OpenOptionsExt;
use async_trait::async_trait;
use nix::pty::Winsize;
use nix::sys::termios::{self, SetArg, Termios};
use nix::libc;
use tokio::io::unix::AsyncFd;
use tokio::io::{self, Interest};
use super::{TtySize, Tty, TtyTheme};
pub struct DevTty {
file: AsyncFd<File>,
settings: libc::termios,
}
impl DevTty {
pub async fn open() -> anyhow::Result<Self> {
let file = File::options()
.read(true)
.write(true)
.custom_flags(libc::O_NONBLOCK)
.open("/dev/tty")?;
let file = AsyncFd::new(file)?;
let settings = super::make_raw(&file)?;
Ok(Self { file, settings })
}
pub async fn resize(&mut self, size: TtySize) -> io::Result<()> {
let xtwinops_seq = format!("\x1b[8;{};{}t", size.1, size.0);
self.write_all(xtwinops_seq.as_bytes()).await?;
Ok(())
}
}
impl Drop for DevTty {
fn drop(&mut self) {
let termios = Termios::from(self.settings);
let _ = termios::tcsetattr(self.file.as_fd(), SetArg::TCSANOW, &termios);
}
}
#[async_trait(?Send)]
impl Tty for DevTty {
fn get_size(&self) -> Winsize {
let mut winsize = Winsize {
ws_row: 24,
ws_col: 80,
ws_xpixel: 0,
ws_ypixel: 0,
};
unsafe { libc::ioctl(self.file.as_raw_fd(), libc::TIOCGWINSZ, &mut winsize) };
winsize
}
async fn get_theme(&mut self) -> Option<TtyTheme> {
super::get_theme(self).await
}
async fn get_version(&mut self) -> Option<String> {
super::get_version(self).await
}
async fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
self.file
.async_io(Interest::READABLE, |mut file| file.read(buf))
.await
}
async fn write(&self, buf: &[u8]) -> io::Result<usize> {
self.file
.async_io(Interest::WRITABLE, |mut file| file.write(buf))
.await
}
}

224
src/tty/macos.rs Normal file
View File

@@ -0,0 +1,224 @@
/// This is an alternative implementation of DevTty that we use on macOS due to a bug in macOS's
/// kqueue implementation when polling /dev/tty.
///
/// See below links for more about the problem:
///
/// https://code.saghul.net/2016/05/libuv-internals-the-osx-select2-trick/
/// https://nathancraddock.com/blog/macos-dev-tty-polling/
///
use std::fs::File;
use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd};
use std::os::unix::fs::OpenOptionsExt;
use std::thread;
use async_trait::async_trait;
use nix::errno::Errno;
use nix::pty::Winsize;
use nix::sys::select;
use nix::sys::termios::{self, SetArg, Termios};
use nix::{libc, unistd};
use tokio::io::unix::AsyncFd;
use tokio::io::{self, Interest};
use super::{Tty, TtySize, TtyTheme};
use crate::fd::FdExt;
const BUF_SIZE: usize = 128 * 1024;
pub struct DevTty {
file: File,
read_r_fd: AsyncFd<OwnedFd>,
write_w_fd: AsyncFd<OwnedFd>,
settings: libc::termios,
}
impl DevTty {
pub async fn open() -> anyhow::Result<Self> {
let file = File::options()
.read(true)
.write(true)
.custom_flags(libc::O_NONBLOCK)
.open("/dev/tty")?;
let settings = super::make_raw(&file)?;
let (read_r_fd, read_w_fd) = unistd::pipe()?;
read_r_fd.set_nonblocking()?;
let read_r_fd = AsyncFd::new(read_r_fd)?;
let (write_r_fd, write_w_fd) = unistd::pipe()?;
write_w_fd.set_nonblocking()?;
let write_w_fd = AsyncFd::new(write_w_fd)?;
// Note about unsafe borrow below: This is on purpose. We can't move proper BorrowedFd to a
// thread (does not live long enough), and we also don't want to use Arc because the
// threads would prevent closing of the file when DevTty is dropped. Use of borrow_raw here
// lets us rely on the fact that dropping of DevTty will close the file and cause EOF or
// I/O error in the background threads, which is what lets us shut down those threads.
let tty_fd = unsafe { BorrowedFd::borrow_raw(file.as_raw_fd()) };
thread::spawn(move || {
copy(tty_fd, read_w_fd);
});
let tty_fd = unsafe { BorrowedFd::borrow_raw(file.as_raw_fd()) };
thread::spawn(move || {
copy(write_r_fd, tty_fd);
});
Ok(Self {
file,
read_r_fd,
write_w_fd,
settings,
})
}
pub async fn resize(&mut self, size: TtySize) -> io::Result<()> {
let xtwinops_seq = format!("\x1b[8;{};{}t", size.1, size.0);
self.write_all(xtwinops_seq.as_bytes()).await?;
Ok(())
}
}
impl Drop for DevTty {
fn drop(&mut self) {
let termios = Termios::from(self.settings);
let _ = termios::tcsetattr(self.file.as_fd(), SetArg::TCSANOW, &termios);
}
}
#[async_trait(?Send)]
impl Tty for DevTty {
fn get_size(&self) -> Winsize {
let mut winsize = Winsize {
ws_row: 24,
ws_col: 80,
ws_xpixel: 0,
ws_ypixel: 0,
};
unsafe { libc::ioctl(self.file.as_raw_fd(), libc::TIOCGWINSZ, &mut winsize) };
winsize
}
async fn get_theme(&mut self) -> Option<TtyTheme> {
super::get_theme(self).await
}
async fn get_version(&mut self) -> Option<String> {
super::get_version(self).await
}
async fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
self.read_r_fd
.async_io(Interest::READABLE, |fd| {
unistd::read(fd, buf).map_err(|e| e.into())
})
.await
}
async fn write(&self, buf: &[u8]) -> io::Result<usize> {
self.write_w_fd
.async_io(Interest::WRITABLE, |fd| {
unistd::write(fd, buf).map_err(|e| e.into())
})
.await
}
}
fn copy<F: AsFd, G: AsFd>(src_fd: F, dst_fd: G) {
let src_fd = src_fd.as_fd();
let dst_fd = dst_fd.as_fd();
let mut buf = [0u8; BUF_SIZE];
let mut data = Vec::with_capacity(BUF_SIZE);
loop {
let mut read_fds = select::FdSet::new();
let mut write_fds = select::FdSet::new();
read_fds.insert(src_fd);
if !data.is_empty() {
write_fds.insert(dst_fd);
}
match select::select(None, Some(&mut read_fds), Some(&mut write_fds), None, None) {
Ok(0) | Err(Errno::EINTR) => {
continue;
}
Ok(_) => {}
Err(_) => {
break;
}
}
if read_fds.contains(src_fd) {
match unistd::read(src_fd, &mut buf) {
Ok(0) => break,
Ok(n) => {
data.extend_from_slice(&buf[..n]);
}
Err(Errno::EWOULDBLOCK) => {}
Err(_) => {
break;
}
}
}
if write_fds.contains(dst_fd) {
match unistd::write(dst_fd, &data) {
Ok(n) => {
data.drain(..n);
}
Err(Errno::EWOULDBLOCK) => {}
Err(_) => {
break;
}
}
}
}
while !data.is_empty() {
let mut write_fds = select::FdSet::new();
write_fds.insert(dst_fd);
match select::select(None, None, Some(&mut write_fds), None, None) {
Ok(1) => {}
Ok(0) | Err(Errno::EINTR) => {
continue;
}
Ok(_) => {
unreachable!();
}
Err(_) => {
break;
}
}
match unistd::write(dst_fd, &data) {
Ok(n) => {
data.drain(..n);
}
Err(Errno::EWOULDBLOCK) => {}
Err(_) => {
break;
}
}
}
}