Fix race condition in forwarder (remote streaming) for very short sessions

This commit is contained in:
Marcin Kulik
2025-06-06 16:03:38 +02:00
parent 3419d03041
commit 0a5dbc2bf5
3 changed files with 20 additions and 19 deletions

View File

@@ -17,7 +17,7 @@ static MAGIC_STRING: &str = "ALiS\x01";
struct EventSerializer(u64);
pub async fn stream<S: Stream<Item = Result<Event, BroadcastStreamRecvError>>>(
pub fn stream<S: Stream<Item = Result<Event, BroadcastStreamRecvError>>>(
stream: S,
) -> impl Stream<Item = Result<Vec<u8>, BroadcastStreamRecvError>> {
let header = stream::once(future::ready(Ok(MAGIC_STRING.into())));

View File

@@ -19,7 +19,7 @@ use tracing::{debug, error, info};
use crate::alis;
use crate::api;
use crate::notifier::Notifier;
use crate::stream::Subscriber;
use crate::stream::{Event, Subscriber};
const PING_INTERVAL: u64 = 15;
const PING_TIMEOUT: u64 = 10;
@@ -38,7 +38,12 @@ pub async fn forward<N: Notifier>(
let mut connection_count: u64 = 0;
loop {
let conn = connect_and_forward(&url, &subscriber);
let stream = subscriber
.subscribe()
.await
.expect("stream should be alive");
let conn = connect_and_forward(&url, stream);
tokio::pin!(conn);
let result = tokio::select! {
@@ -61,7 +66,9 @@ pub async fn forward<N: Notifier>(
};
match result {
Ok(true) => break,
Ok(true) => {
break;
}
Ok(false) => {
let _ = notifier.notify("Stream halted by the server".to_string());
@@ -123,7 +130,10 @@ pub async fn forward<N: Notifier>(
}
}
async fn connect_and_forward(url: &url::Url, subscriber: &Subscriber) -> anyhow::Result<bool> {
async fn connect_and_forward(
url: &url::Url,
stream: impl Stream<Item = Result<Event, BroadcastStreamRecvError>> + Unpin,
) -> anyhow::Result<bool> {
let uri: Uri = url.to_string().parse()?;
let builder = ClientRequestBuilder::new(uri)
@@ -132,24 +142,14 @@ async fn connect_and_forward(url: &url::Url, subscriber: &Subscriber) -> anyhow:
let (ws, _) = tokio_tungstenite::connect_async_with_config(builder, None, true).await?;
info!("connected to the endpoint");
let events = event_stream(subscriber).await?;
handle_socket(ws, events).await
}
async fn event_stream(
subscriber: &Subscriber,
) -> anyhow::Result<impl Stream<Item = anyhow::Result<Message>>> {
let stream = subscriber.subscribe().await?;
let stream = alis::stream(stream)
.await
let events = alis::stream(stream)
.map(ws_result)
.chain(futures_util::stream::once(future::ready(Ok(
close_message(),
))));
Ok(stream)
handle_socket(ws, events).await
}
async fn handle_socket<T>(
@@ -172,7 +172,9 @@ where
timeout(Duration::from_secs(SEND_TIMEOUT), sink.send(event?)).await.map_err(|_| anyhow!("send timeout"))??;
},
None => return Ok(true)
None => {
return Ok(true);
}
}
},

View File

@@ -122,7 +122,6 @@ async fn handle_socket(socket: WebSocket, subscriber: Subscriber) -> anyhow::Res
let stream = subscriber.subscribe().await?;
let result = alis::stream(stream)
.await
.map(ws_result)
.chain(futures_util::stream::once(future::ready(Ok(close_msg))))
.forward(sink)