Add WS subprotocol negotiation in live stream forwarder

This commit is contained in:
Marcin Kulik
2025-01-18 21:19:08 +01:00
parent 0203522dbc
commit cec993a143

View File

@@ -2,6 +2,7 @@ use super::alis;
use super::session; use super::session;
use anyhow::anyhow; use anyhow::anyhow;
use anyhow::bail; use anyhow::bail;
use axum::http::Uri;
use core::future::{self, Future}; use core::future::{self, Future};
use futures_util::{stream, SinkExt, Stream, StreamExt}; use futures_util::{stream, SinkExt, Stream, StreamExt};
use std::borrow::Cow; use std::borrow::Cow;
@@ -14,7 +15,7 @@ use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tokio_stream::wrappers::IntervalStream; use tokio_stream::wrappers::IntervalStream;
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
use tokio_tungstenite::tungstenite::protocol::CloseFrame; use tokio_tungstenite::tungstenite::protocol::CloseFrame;
use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::{self, ClientRequestBuilder, Message};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tracing::{debug, error, info}; use tracing::{debug, error, info};
@@ -65,6 +66,16 @@ pub async fn forward(
} }
Err(e) => { Err(e) => {
if let Some(tungstenite::error::Error::Protocol(
tungstenite::error::ProtocolError::SecWebSocketSubProtocolError(_),
)) = e.downcast_ref::<tungstenite::error::Error>()
{
let _ = notifier_tx
.send("CLI not compatible with the server, forwarding failed".to_string());
break;
}
error!("connection error: {e}"); error!("connection error: {e}");
if reconnect_attempt == 0 { if reconnect_attempt == 0 {
@@ -94,7 +105,9 @@ async fn connect_and_forward(
url: &url::Url, url: &url::Url,
clients_tx: &mpsc::Sender<session::Client>, clients_tx: &mpsc::Sender<session::Client>,
) -> anyhow::Result<bool> { ) -> anyhow::Result<bool> {
let (ws, _) = tokio_tungstenite::connect_async_with_config(url.to_string(), None, true).await?; let uri: Uri = url.to_string().parse()?;
let builder = ClientRequestBuilder::new(uri).with_sub_protocol("v1.alis");
let (ws, _) = tokio_tungstenite::connect_async_with_config(builder, None, true).await?;
info!("connected to the endpoint"); info!("connected to the endpoint");
let events = event_stream(clients_tx).await?; let events = event_stream(clients_tx).await?;