diff --git a/src/streamer/forwarder.rs b/src/streamer/forwarder.rs index 5e43a9d..0578208 100644 --- a/src/streamer/forwarder.rs +++ b/src/streamer/forwarder.rs @@ -2,6 +2,7 @@ use super::alis; use super::session; use anyhow::anyhow; use anyhow::bail; +use axum::http::Uri; use core::future::{self, Future}; use futures_util::{stream, SinkExt, Stream, StreamExt}; use std::borrow::Cow; @@ -14,7 +15,7 @@ use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use tokio_stream::wrappers::IntervalStream; use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; use tokio_tungstenite::tungstenite::protocol::CloseFrame; -use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::tungstenite::{self, ClientRequestBuilder, Message}; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; use tracing::{debug, error, info}; @@ -65,6 +66,16 @@ pub async fn forward( } Err(e) => { + if let Some(tungstenite::error::Error::Protocol( + tungstenite::error::ProtocolError::SecWebSocketSubProtocolError(_), + )) = e.downcast_ref::() + { + let _ = notifier_tx + .send("CLI not compatible with the server, forwarding failed".to_string()); + + break; + } + error!("connection error: {e}"); if reconnect_attempt == 0 { @@ -94,7 +105,9 @@ async fn connect_and_forward( url: &url::Url, clients_tx: &mpsc::Sender, ) -> anyhow::Result { - let (ws, _) = tokio_tungstenite::connect_async_with_config(url.to_string(), None, true).await?; + let uri: Uri = url.to_string().parse()?; + let builder = ClientRequestBuilder::new(uri).with_sub_protocol("v1.alis"); + let (ws, _) = tokio_tungstenite::connect_async_with_config(builder, None, true).await?; info!("connected to the endpoint"); let events = event_stream(clients_tx).await?;