mirror of
https://github.com/infinilabs/coco-app.git
synced 2025-12-16 19:47:43 +01:00
chore: support multi websocket connections (#314)
* chore: temp commit * chore: add WebSocket session ID to chat message API headers * chore: add param clientId * feat: add websocket id * chore: add debug logs * chore: add log * chore: add connecting * chore: remove partialize * fix: fix to support multi websocket connection * chore: update release notes --------- Co-authored-by: rain <15911122312@163.com>
This commit is contained in:
@@ -16,6 +16,9 @@ Information about release notes of Coco Server is provided here.
|
||||
### Features
|
||||
|
||||
- feat: chat mode support for uploading files #310
|
||||
- feat: support multi websocket connections #314
|
||||
- feat: add support for embeddable web widget #277
|
||||
|
||||
|
||||
### Bug fix
|
||||
|
||||
|
||||
@@ -118,6 +118,7 @@ pub async fn cancel_session_chat<R: Runtime>(
|
||||
pub async fn new_chat<R: Runtime>(
|
||||
_app_handle: AppHandle<R>,
|
||||
server_id: String,
|
||||
websocket_id: String,
|
||||
message: String,
|
||||
query_params: Option<HashMap<String, Value>>, //search,deep_thinking
|
||||
) -> Result<GetResponse, String> {
|
||||
@@ -131,7 +132,10 @@ pub async fn new_chat<R: Runtime>(
|
||||
None
|
||||
};
|
||||
|
||||
let response = HttpClient::post(&server_id, "/chat/_new", query_params, body)
|
||||
let mut headers = HashMap::new();
|
||||
headers.insert("WEBSOCKET-SESSION-ID".to_string(), websocket_id.into());
|
||||
|
||||
let response = HttpClient::advanced_post(&server_id, "/chat/_new", Some(headers), query_params, body)
|
||||
.await
|
||||
.map_err(|e| format!("Error sending message: {}", e))?;
|
||||
|
||||
@@ -156,6 +160,7 @@ pub async fn new_chat<R: Runtime>(
|
||||
pub async fn send_message<R: Runtime>(
|
||||
_app_handle: AppHandle<R>,
|
||||
server_id: String,
|
||||
websocket_id: String,
|
||||
session_id: String,
|
||||
message: String,
|
||||
query_params: Option<HashMap<String, Value>>, //search,deep_thinking
|
||||
@@ -165,9 +170,12 @@ pub async fn send_message<R: Runtime>(
|
||||
message: Some(message),
|
||||
};
|
||||
|
||||
let mut headers = HashMap::new();
|
||||
headers.insert("WEBSOCKET-SESSION-ID".to_string(), websocket_id.into());
|
||||
|
||||
let body = reqwest::Body::from(serde_json::to_string(&msg).unwrap());
|
||||
let response =
|
||||
HttpClient::advanced_post(&server_id, path.as_str(), None, query_params, Some(body))
|
||||
HttpClient::advanced_post(&server_id, path.as_str(), Some(headers), query_params, Some(body))
|
||||
.await
|
||||
.map_err(|e| format!("Error cancel session: {}", e))?;
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ pub fn save_datasource_to_cache(server_id: &str, datasources: Vec<DataSource>) {
|
||||
#[allow(dead_code)]
|
||||
pub fn get_datasources_from_cache(server_id: &str) -> Option<HashMap<String, DataSource>> {
|
||||
let cache = DATASOURCE_CACHE.read().unwrap(); // Acquire read lock
|
||||
// dbg!("cache: {:?}", &cache);
|
||||
// dbg!("cache: {:?}", &cache);
|
||||
let server_cache = cache.get(server_id)?; // Get the server's cache
|
||||
Some(server_cache.clone())
|
||||
}
|
||||
@@ -79,8 +79,6 @@ pub async fn refresh_all_datasources<R: Runtime>(_app_handle: &AppHandle<R>) ->
|
||||
cache.extend(server_map);
|
||||
cache.len()
|
||||
};
|
||||
// dbg!("datasource_map size: {:?}", cache_size);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -90,7 +88,6 @@ pub async fn get_datasources_by_server(id: &str) -> Result<Vec<DataSource>, Stri
|
||||
let resp = HttpClient::get(id, "/datasource/_search", None)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
// dbg!("Error fetching datasource: {}", &e);
|
||||
format!("Error fetching datasource: {}", e)
|
||||
})?;
|
||||
|
||||
|
||||
@@ -1,186 +1,132 @@
|
||||
use crate::server::servers::{get_server_by_id, get_server_token};
|
||||
use futures_util::StreamExt;
|
||||
use http::{HeaderMap, HeaderName, HeaderValue};
|
||||
use futures::StreamExt;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tauri::Emitter;
|
||||
use tauri::{AppHandle, Emitter};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
use tokio_tungstenite::tungstenite::Error;
|
||||
use tokio_tungstenite::tungstenite::Error as WsError;
|
||||
use tokio_tungstenite::{
|
||||
connect_async, tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream,
|
||||
};
|
||||
use tungstenite::handshake::client::generate_key;
|
||||
use tokio_tungstenite::tungstenite::handshake::client::generate_key;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tokio_tungstenite::{connect_async, MaybeTlsStream};
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct WebSocketManager {
|
||||
ws_connection: Arc<Mutex<Option<WebSocketStream<MaybeTlsStream<TcpStream>>>>>,
|
||||
cancel_tx: Arc<Mutex<Option<mpsc::Sender<()>>>>,
|
||||
connections: Arc<Mutex<HashMap<String, Arc<WebSocketInstance>>>>,
|
||||
}
|
||||
|
||||
struct WebSocketInstance {
|
||||
ws_connection: Mutex<WebSocketStream<MaybeTlsStream<TcpStream>>>, // No need to lock the entire map
|
||||
cancel_tx: mpsc::Sender<()>,
|
||||
}
|
||||
|
||||
// Function to convert the HTTP endpoint to WebSocket endpoint
|
||||
fn convert_to_websocket(endpoint: &str) -> Result<String, String> {
|
||||
let url = url::Url::parse(endpoint).map_err(|e| format!("Invalid URL: {}", e))?;
|
||||
let ws_protocol = if url.scheme() == "https" { "wss://" } else { "ws://" };
|
||||
let host = url.host_str().ok_or("No host found in URL")?;
|
||||
let port = url.port_or_known_default().unwrap_or(if url.scheme() == "https" { 443 } else { 80 });
|
||||
|
||||
// Determine WebSocket protocol based on the scheme
|
||||
let ws_protocol = if url.scheme() == "https" {
|
||||
"wss://"
|
||||
} else {
|
||||
"ws://"
|
||||
};
|
||||
|
||||
// Extract host and port (if present)
|
||||
let host = url.host_str().ok_or_else(|| "No host found in URL")?;
|
||||
let port = url
|
||||
.port_or_known_default()
|
||||
.unwrap_or(if url.scheme() == "https" { 443 } else { 80 });
|
||||
|
||||
// Build WebSocket URL, include the port if not the default
|
||||
let ws_endpoint = if port == 80 || port == 443 {
|
||||
format!("{}{}{}", ws_protocol, host, "/ws")
|
||||
} else {
|
||||
format!("{}{}:{}/ws", ws_protocol, host, port)
|
||||
};
|
||||
|
||||
Ok(ws_endpoint)
|
||||
}
|
||||
|
||||
// Function to build a HeaderMap from a vector of key-value pairs
|
||||
#[allow(dead_code)]
|
||||
fn build_header_map(headers: Vec<(String, String)>) -> Result<HeaderMap, String> {
|
||||
let mut header_map = HeaderMap::new();
|
||||
for (key, value) in headers {
|
||||
let header_name = HeaderName::from_bytes(key.as_bytes())
|
||||
.map_err(|e| format!("Invalid header name: {}", e))?;
|
||||
let header_value =
|
||||
HeaderValue::from_str(&value).map_err(|e| format!("Invalid header value: {}", e))?;
|
||||
header_map.insert(header_name, header_value);
|
||||
}
|
||||
Ok(header_map)
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
pub async fn connect_to_server(
|
||||
id: String,
|
||||
client_id: String,
|
||||
state: tauri::State<'_, WebSocketManager>,
|
||||
app_handle: tauri::AppHandle,
|
||||
app_handle: AppHandle,
|
||||
) -> Result<(), String> {
|
||||
// Disconnect any existing connection first
|
||||
disconnect(state.clone()).await?;
|
||||
let connections_clone = state.connections.clone();
|
||||
|
||||
// Retrieve server details
|
||||
let server =
|
||||
get_server_by_id(id.as_str()).ok_or_else(|| format!("Server with ID {} not found", id))?;
|
||||
let endpoint = convert_to_websocket(server.endpoint.as_str())?;
|
||||
// Disconnect old connection first
|
||||
disconnect(client_id.clone(), state.clone()).await.ok();
|
||||
|
||||
// Retrieve the token for the server (token is optional)
|
||||
let token = get_server_token(id.as_str())
|
||||
.await?
|
||||
.map(|t| t.access_token.clone());
|
||||
let server = get_server_by_id(&id).ok_or(format!("Server with ID {} not found", id))?;
|
||||
let endpoint = convert_to_websocket(&server.endpoint)?;
|
||||
let token = get_server_token(&id).await?.map(|t| t.access_token.clone());
|
||||
|
||||
// Create the WebSocket request
|
||||
let mut request =
|
||||
tokio_tungstenite::tungstenite::client::IntoClientRequest::into_client_request(&endpoint)
|
||||
.map_err(|e| format!("Failed to create WebSocket request: {}", e))?;
|
||||
|
||||
// Add necessary headers
|
||||
request
|
||||
.headers_mut()
|
||||
.insert("Connection", "Upgrade".parse().unwrap());
|
||||
request
|
||||
.headers_mut()
|
||||
.insert("Upgrade", "websocket".parse().unwrap());
|
||||
request
|
||||
.headers_mut()
|
||||
.insert("Sec-WebSocket-Version", "13".parse().unwrap());
|
||||
request
|
||||
.headers_mut()
|
||||
.insert("Sec-WebSocket-Key", generate_key().parse().unwrap());
|
||||
request.headers_mut().insert("Connection", "Upgrade".parse().unwrap());
|
||||
request.headers_mut().insert("Upgrade", "websocket".parse().unwrap());
|
||||
request.headers_mut().insert("Sec-WebSocket-Version", "13".parse().unwrap());
|
||||
request.headers_mut().insert("Sec-WebSocket-Key", generate_key().parse().unwrap());
|
||||
|
||||
// If a token exists, add it to the headers
|
||||
if let Some(token) = token {
|
||||
request
|
||||
.headers_mut()
|
||||
.insert("X-API-TOKEN", token.parse().unwrap());
|
||||
request.headers_mut().insert("X-API-TOKEN", token.parse().unwrap());
|
||||
}
|
||||
|
||||
// Establish the WebSocket connection
|
||||
// dbg!(&request);
|
||||
let (ws_remote, _) = connect_async(request).await.map_err(|e| match e {
|
||||
Error::ConnectionClosed => "WebSocket connection was closed".to_string(),
|
||||
Error::Protocol(protocol_error) => format!("Protocol error: {}", protocol_error),
|
||||
Error::Utf8 => "UTF-8 error in WebSocket data".to_string(),
|
||||
_ => format!("Unknown error: {:?}", e),
|
||||
})?;
|
||||
|
||||
// Create cancellation channel
|
||||
let (ws_stream, _) = connect_async(request).await.map_err(|e| format!("WebSocket error: {:?}", e))?;
|
||||
let (cancel_tx, mut cancel_rx) = mpsc::channel(1);
|
||||
|
||||
// Store connection and cancellation sender
|
||||
*state.ws_connection.lock().await = Some(ws_remote);
|
||||
*state.cancel_tx.lock().await = Some(cancel_tx);
|
||||
// Spawn listener task with cancellation
|
||||
let instance = Arc::new(WebSocketInstance {
|
||||
ws_connection: Mutex::new(ws_stream),
|
||||
cancel_tx,
|
||||
});
|
||||
|
||||
// Insert connection into the map (lock is held briefly)
|
||||
{
|
||||
let mut connections = connections_clone.lock().await;
|
||||
connections.insert(client_id.clone(), instance.clone());
|
||||
}
|
||||
|
||||
// Spawn WebSocket handler in a separate task
|
||||
let app_handle_clone = app_handle.clone();
|
||||
let connection_clone = state.ws_connection.clone();
|
||||
let client_id_clone = client_id.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut connection = connection_clone.lock().await;
|
||||
if let Some(ws) = connection.as_mut() {
|
||||
loop {
|
||||
tokio::select! {
|
||||
msg = ws.next() => {
|
||||
match msg {
|
||||
Some(Ok(Message::Text(text))) => {
|
||||
// println!("Received message: {}", text);
|
||||
let _ = app_handle_clone.emit("ws-message", text);
|
||||
},
|
||||
Some(Err(WsError::ConnectionClosed)) => {
|
||||
let _ = app_handle_clone.emit("ws-error", id);
|
||||
eprintln!("WebSocket connection closed by the server.");
|
||||
break;
|
||||
},
|
||||
Some(Err(WsError::Protocol(e))) => {
|
||||
let _ = app_handle_clone.emit("ws-error", id);
|
||||
eprintln!("Protocol error: {}", e);
|
||||
break;
|
||||
},
|
||||
Some(Err(WsError::Utf8)) => {
|
||||
let _ = app_handle_clone.emit("ws-error", id);
|
||||
eprintln!("Received invalid UTF-8 data.");
|
||||
break;
|
||||
},
|
||||
Some(Err(_)) => {
|
||||
let _ = app_handle_clone.emit("ws-error", id);
|
||||
eprintln!("WebSocket error encountered.");
|
||||
break;
|
||||
},
|
||||
_ => continue,
|
||||
let ws = &mut *instance.ws_connection.lock().await;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
msg = ws.next() => {
|
||||
match msg {
|
||||
Some(Ok(Message::Text(text))) => {
|
||||
let _ = app_handle_clone.emit(&format!("ws-message-{}", client_id_clone), text);
|
||||
},
|
||||
Some(Err(_)) | None => {
|
||||
let _ = app_handle_clone.emit(&format!("ws-error-{}", client_id_clone), id.clone());
|
||||
break;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
_ = cancel_rx.recv() => {
|
||||
let _ = app_handle_clone.emit("ws-error", id);
|
||||
dbg!("Cancelling WebSocket connection");
|
||||
break;
|
||||
}
|
||||
}
|
||||
_ = cancel_rx.recv() => {
|
||||
let _ = app_handle_clone.emit(&format!("ws-error-{}", client_id_clone), id.clone());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove connection after it closes
|
||||
let mut connections = connections_clone.lock().await;
|
||||
connections.remove(&client_id_clone);
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
pub async fn disconnect(state: tauri::State<'_, WebSocketManager>) -> Result<(), String> {
|
||||
// Send cancellation signal
|
||||
if let Some(cancel_tx) = state.cancel_tx.lock().await.take() {
|
||||
let _ = cancel_tx.send(()).await;
|
||||
}
|
||||
|
||||
// Close connection
|
||||
let mut connection = state.ws_connection.lock().await;
|
||||
if let Some(mut ws) = connection.take() {
|
||||
#[tauri::command]
|
||||
pub async fn disconnect(client_id: String, state: tauri::State<'_, WebSocketManager>) -> Result<(), String> {
|
||||
let instance = {
|
||||
let mut connections = state.connections.lock().await;
|
||||
connections.remove(&client_id)
|
||||
};
|
||||
|
||||
if let Some(instance) = instance {
|
||||
let _ = instance.cancel_tx.send(()).await;
|
||||
|
||||
// Close WebSocket (lock only the connection, not the whole map)
|
||||
let mut ws = instance.ws_connection.lock().await;
|
||||
let _ = ws.close(None).await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -58,8 +58,12 @@ export function get_datasources_by_server(id: string): Promise<DataSource[]> {
|
||||
return invoke(`get_datasources_by_server`, { id });
|
||||
}
|
||||
|
||||
export function connect_to_server(id: string): Promise<void> {
|
||||
return invoke(`connect_to_server`, { id });
|
||||
export function connect_to_server(id: string, clientId: string): Promise<void> {
|
||||
return invoke(`connect_to_server`, { id, clientId });
|
||||
}
|
||||
|
||||
export function disconnect(clientId: string): Promise<void> {
|
||||
return invoke(`disconnect`, { clientId });
|
||||
}
|
||||
|
||||
export function chat_history({
|
||||
@@ -138,15 +142,18 @@ export function cancel_session_chat({
|
||||
|
||||
export function new_chat({
|
||||
serverId,
|
||||
websocketId,
|
||||
message,
|
||||
queryParams,
|
||||
}: {
|
||||
serverId: string;
|
||||
websocketId?: string;
|
||||
message: string;
|
||||
queryParams?: Record<string, any>;
|
||||
}): Promise<GetResponse> {
|
||||
return invoke(`new_chat`, {
|
||||
serverId,
|
||||
websocketId,
|
||||
message,
|
||||
queryParams,
|
||||
});
|
||||
@@ -154,17 +161,20 @@ export function new_chat({
|
||||
|
||||
export function send_message({
|
||||
serverId,
|
||||
websocketId,
|
||||
sessionId,
|
||||
message,
|
||||
queryParams,
|
||||
}: {
|
||||
serverId: string;
|
||||
websocketId?: string;
|
||||
sessionId: string;
|
||||
message: string;
|
||||
queryParams?: Record<string, any>;
|
||||
}): Promise<string> {
|
||||
return invoke(`send_message`, {
|
||||
serverId,
|
||||
websocketId,
|
||||
sessionId,
|
||||
message,
|
||||
queryParams,
|
||||
|
||||
@@ -89,6 +89,12 @@ const ChatAI = memo(
|
||||
|
||||
const [Question, setQuestion] = useState<string>("");
|
||||
|
||||
const [websocketSessionId, setWebsocketSessionId] = useState('');
|
||||
|
||||
const onWebsocketSessionId = useCallback((sessionId: string) => {
|
||||
setWebsocketSessionId(sessionId);
|
||||
}, []);
|
||||
|
||||
const {
|
||||
data: {
|
||||
query_intent,
|
||||
@@ -113,12 +119,15 @@ const ChatAI = memo(
|
||||
|
||||
const dealMsgRef = useRef<((msg: string) => void) | null>(null);
|
||||
|
||||
const { errorShow, setErrorShow, reconnect, updateDealMsg } =
|
||||
const clientId = isChatPage ? "standalone" : "popup"
|
||||
const { errorShow, setErrorShow, reconnect, disconnectWS, updateDealMsg } =
|
||||
useWebSocket({
|
||||
clientId,
|
||||
connected,
|
||||
setConnected,
|
||||
currentService,
|
||||
dealMsgRef,
|
||||
onWebsocketSessionId,
|
||||
});
|
||||
|
||||
const {
|
||||
@@ -142,7 +151,8 @@ const ChatAI = memo(
|
||||
isSearchActive,
|
||||
isDeepThinkActive,
|
||||
sourceDataIds,
|
||||
changeInput
|
||||
changeInput,
|
||||
websocketSessionId
|
||||
);
|
||||
|
||||
const { dealMsg, messageTimeoutRef } = useMessageHandler(
|
||||
@@ -151,7 +161,7 @@ const ChatAI = memo(
|
||||
setTimedoutShow,
|
||||
(chat) => cancelChat(chat || activeChat),
|
||||
setLoadingStep,
|
||||
handlers
|
||||
handlers,
|
||||
);
|
||||
|
||||
useEffect(() => {
|
||||
@@ -183,12 +193,12 @@ const ChatAI = memo(
|
||||
if (!isLogin) return;
|
||||
if (!curChatEnd) return;
|
||||
if (!activeChat?._id) {
|
||||
createNewChat(value, activeChat);
|
||||
createNewChat(value, activeChat, websocketSessionId);
|
||||
} else {
|
||||
handleSendMessage(value, activeChat);
|
||||
handleSendMessage(value, activeChat, websocketSessionId);
|
||||
}
|
||||
},
|
||||
[isLogin, curChatEnd, activeChat, createNewChat, handleSendMessage]
|
||||
[isLogin, curChatEnd, activeChat, createNewChat, handleSendMessage, websocketSessionId]
|
||||
);
|
||||
|
||||
const { createWin } = useWindows();
|
||||
@@ -204,6 +214,7 @@ const ChatAI = memo(
|
||||
chatClose(activeChat);
|
||||
setActiveChat(undefined);
|
||||
setCurChatEnd(true);
|
||||
disconnectWS();
|
||||
};
|
||||
}, [chatClose, setCurChatEnd]);
|
||||
|
||||
|
||||
@@ -29,7 +29,6 @@ import { useChatStore } from "@/stores/chatStore";
|
||||
import type { Chat } from "./types";
|
||||
import { useConnectStore } from "@/stores/connectStore";
|
||||
import platformAdapter from "@/utils/platformAdapter";
|
||||
|
||||
interface ChatHeaderProps {
|
||||
onCreateNewChat: () => void;
|
||||
onOpenChatAI: () => void;
|
||||
@@ -56,7 +55,7 @@ export function ChatHeader({
|
||||
const isPinned = useAppStore((state) => state.isPinned);
|
||||
const setIsPinned = useAppStore((state) => state.setIsPinned);
|
||||
|
||||
const { connected, setMessages } = useChatStore();
|
||||
const { setMessages } = useChatStore();
|
||||
|
||||
const [serverList, setServerList] = useState<IServer[]>([]);
|
||||
const [isRefreshing, setIsRefreshing] = useState(false);
|
||||
@@ -100,20 +99,11 @@ export function ChatHeader({
|
||||
|
||||
return () => {
|
||||
// Cleanup logic if needed
|
||||
disconnect();
|
||||
unlisten.then((fn) => fn());
|
||||
};
|
||||
}, []);
|
||||
|
||||
const disconnect = async () => {
|
||||
if (!connected) return;
|
||||
try {
|
||||
console.log("disconnect");
|
||||
await platformAdapter.invokeBackend("disconnect");
|
||||
} catch (error) {
|
||||
console.error("Failed to disconnect:", error);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
const switchServer = async (server: IServer) => {
|
||||
if (!server) return;
|
||||
@@ -129,8 +119,9 @@ export function ChatHeader({
|
||||
return;
|
||||
}
|
||||
setIsLogin(true);
|
||||
//
|
||||
await disconnect();
|
||||
// The Rust backend will automatically disconnect,
|
||||
// so we don't need to handle disconnection on the frontend
|
||||
// src-tauri/src/server/websocket.rs
|
||||
reconnect && reconnect(server);
|
||||
} catch (error) {
|
||||
console.error("switchServer:", error);
|
||||
|
||||
@@ -73,7 +73,7 @@ export default function ChatInput({
|
||||
getFileIcon,
|
||||
}: ChatInputProps) {
|
||||
const { t } = useTranslation();
|
||||
|
||||
|
||||
const showTooltip = useAppStore(
|
||||
(state: { showTooltip: boolean }) => state.showTooltip
|
||||
);
|
||||
@@ -103,6 +103,21 @@ export default function ChatInput({
|
||||
|
||||
const { curChatEnd, connected } = useChatStore();
|
||||
|
||||
const [reconnectCountdown, setReconnectCountdown] = useState<number>(0);
|
||||
useEffect(() => {
|
||||
if (!reconnectCountdown || connected) {
|
||||
setReconnectCountdown(0);
|
||||
return;
|
||||
}
|
||||
|
||||
if (reconnectCountdown > 0) {
|
||||
const timer = setTimeout(() => {
|
||||
setReconnectCountdown(reconnectCountdown - 1);
|
||||
}, 1000);
|
||||
return () => clearTimeout(timer);
|
||||
}
|
||||
}, [reconnectCountdown, connected]);
|
||||
|
||||
const [isCommandPressed, setIsCommandPressed] = useState(false);
|
||||
|
||||
const handleToggleFocus = useCallback(() => {
|
||||
@@ -349,10 +364,15 @@ export default function ChatInput({
|
||||
<div className="absolute top-0 right-0 bottom-0 left-0 px-2 py-4 bg-red-500/10 rounded-md font-normal text-xs text-gray-400 flex items-center gap-4">
|
||||
{t("search.input.connectionError")}
|
||||
<div
|
||||
className="w-[96px] h-[24px] bg-[#0061FF] rounded-[12px] font-normal text-xs text-white flex items-center justify-center cursor-pointer"
|
||||
onClick={reconnect}
|
||||
className="h-[24px] px-2 bg-[#0061FF] rounded-[12px] font-normal text-xs text-white flex items-center justify-center cursor-pointer"
|
||||
onClick={() => {
|
||||
reconnect();
|
||||
setReconnectCountdown(10);
|
||||
}}
|
||||
>
|
||||
{t("search.input.reconnect")}
|
||||
{reconnectCountdown > 0
|
||||
? `${t("search.input.connecting")}(${reconnectCountdown}s)`
|
||||
: t("search.input.reconnect")}
|
||||
</div>
|
||||
</div>
|
||||
) : null}
|
||||
|
||||
@@ -17,6 +17,7 @@ export function useChatActions(
|
||||
isDeepThinkActive?: boolean,
|
||||
sourceDataIds?: string[],
|
||||
changeInput?: (val: string) => void,
|
||||
websocketSessionId?: string,
|
||||
) {
|
||||
const chatClose = useCallback(async (activeChat?: Chat) => {
|
||||
if (!activeChat?._id || !currentServiceId) return;
|
||||
@@ -74,7 +75,7 @@ export function useChatActions(
|
||||
}, [currentServiceId, setActiveChat]);
|
||||
|
||||
const createNewChat = useCallback(
|
||||
async (value: string = "", activeChat?: Chat) => {
|
||||
async (value: string = "", activeChat?: Chat, id?: string) => {
|
||||
setTimedoutShow(false);
|
||||
setErrorShow(false);
|
||||
chatClose(activeChat);
|
||||
@@ -82,9 +83,15 @@ export function useChatActions(
|
||||
setQuestion(value);
|
||||
if (!currentServiceId) return;
|
||||
try {
|
||||
console.log("sourceDataIds", sourceDataIds);
|
||||
if (!(websocketSessionId || id)){
|
||||
setErrorShow(true);
|
||||
console.error("websocketSessionId", websocketSessionId, id);
|
||||
return;
|
||||
}
|
||||
console.log("sourceDataIds", sourceDataIds, websocketSessionId, id);
|
||||
let response: any = await new_chat({
|
||||
serverId: currentServiceId,
|
||||
websocketId: websocketSessionId || id,
|
||||
message: value,
|
||||
queryParams: {
|
||||
search: isSearchActive,
|
||||
@@ -112,16 +119,23 @@ export function useChatActions(
|
||||
console.error("createNewChat:", error);
|
||||
}
|
||||
},
|
||||
[currentServiceId, sourceDataIds, isSearchActive, isDeepThinkActive, curIdRef]
|
||||
[currentServiceId, sourceDataIds, isSearchActive, isDeepThinkActive, curIdRef, websocketSessionId]
|
||||
);
|
||||
|
||||
const sendMessage = useCallback(
|
||||
async (content: string, newChat: Chat) => {
|
||||
async (content: string, newChat: Chat, id?: string) => {
|
||||
if (!newChat?._id || !currentServiceId || !content) return;
|
||||
|
||||
clearAllChunkData();
|
||||
try {
|
||||
if (!(websocketSessionId || id)){
|
||||
setErrorShow(true);
|
||||
console.error("websocketSessionId", websocketSessionId, id);
|
||||
return;
|
||||
}
|
||||
let response: any = await send_message({
|
||||
serverId: currentServiceId,
|
||||
websocketId: websocketSessionId || id,
|
||||
sessionId: newChat?._id,
|
||||
queryParams: {
|
||||
search: isSearchActive,
|
||||
@@ -147,18 +161,18 @@ export function useChatActions(
|
||||
console.error("sendMessage:", error);
|
||||
}
|
||||
},
|
||||
[currentServiceId, sourceDataIds, isSearchActive, isDeepThinkActive, curIdRef, setActiveChat, setCurChatEnd, setErrorShow, changeInput]
|
||||
[currentServiceId, sourceDataIds, isSearchActive, isDeepThinkActive, curIdRef, setActiveChat, setCurChatEnd, setErrorShow, changeInput, websocketSessionId]
|
||||
);
|
||||
|
||||
const handleSendMessage = useCallback(
|
||||
async (content: string, activeChat?: Chat) => {
|
||||
async (content: string, activeChat?: Chat, id?: string) => {
|
||||
if (!activeChat?._id || !content) return;
|
||||
setQuestion(content);
|
||||
|
||||
setTimedoutShow(false);
|
||||
setErrorShow(false);
|
||||
|
||||
await chatHistory(activeChat, (chat) => sendMessage(content, chat));
|
||||
await chatHistory(activeChat, (chat) => sendMessage(content, chat, id));
|
||||
},
|
||||
[chatHistory, sendMessage, setQuestion, setTimedoutShow, setErrorShow, clearAllChunkData]
|
||||
);
|
||||
|
||||
@@ -15,7 +15,7 @@ export function useMessageHandler(
|
||||
deal_deep_read: (data: IChunkData) => void;
|
||||
deal_think: (data: IChunkData) => void;
|
||||
deal_response: (data: IChunkData) => void;
|
||||
}
|
||||
},
|
||||
) {
|
||||
const messageTimeoutRef = useRef<NodeJS.Timeout>();
|
||||
|
||||
|
||||
@@ -1,73 +1,104 @@
|
||||
import { useState, useEffect, useCallback } from "react";
|
||||
import { useState, useEffect, useCallback, useRef } from "react";
|
||||
import { listen } from "@tauri-apps/api/event";
|
||||
|
||||
import { IServer } from "@/stores/appStore";
|
||||
import { connect_to_server } from "@/commands"
|
||||
import { connect_to_server, disconnect } from "@/commands"
|
||||
|
||||
interface WebSocketProps {
|
||||
clientId: string;
|
||||
connected: boolean;
|
||||
setConnected: (connected: boolean) => void;
|
||||
currentService: IServer | null;
|
||||
dealMsgRef: React.MutableRefObject<((msg: string) => void) | null>;
|
||||
onWebsocketSessionId?: (sessionId: string) => void;
|
||||
}
|
||||
|
||||
export default function useWebSocket({
|
||||
clientId,
|
||||
connected,
|
||||
setConnected,
|
||||
currentService,
|
||||
dealMsgRef,
|
||||
onWebsocketSessionId,
|
||||
}: WebSocketProps) {
|
||||
const [errorShow, setErrorShow] = useState(false);
|
||||
|
||||
// 1. WebSocket connects when loading or switching services
|
||||
// src/components/Assistant/ChatHeader.tsx
|
||||
// 2. If not connected or disconnected, input box has a connect button, clicking it will connect to WebSocket
|
||||
// src/components/Search/InputBox.tsx
|
||||
const reconnect = useCallback(async (server?: IServer) => {
|
||||
const targetServer = server || currentService;
|
||||
console.log("reconnect_targetServer", targetServer?.id);
|
||||
if (!targetServer?.id) return;
|
||||
try {
|
||||
console.log("reconnect", targetServer.id);
|
||||
await connect_to_server(targetServer.id);
|
||||
setConnected(true);
|
||||
console.log("reconnect", targetServer.id, clientId);
|
||||
await connect_to_server(targetServer.id, clientId);
|
||||
} catch (error) {
|
||||
setConnected(false);
|
||||
console.error("Failed to connect:", error);
|
||||
}
|
||||
}, [currentService]);
|
||||
|
||||
const disconnectWS = async () => {
|
||||
if (!connected) return;
|
||||
try {
|
||||
console.log("disconnect");
|
||||
await disconnect(clientId);
|
||||
setConnected(false);
|
||||
} catch (error) {
|
||||
console.error("Failed to disconnect:", error);
|
||||
}
|
||||
};
|
||||
|
||||
const updateDealMsg = useCallback((newDealMsg: (msg: string) => void) => {
|
||||
dealMsgRef.current = newDealMsg;
|
||||
}, [dealMsgRef]);
|
||||
|
||||
const websocketIdRef = useRef<string>('')
|
||||
|
||||
useEffect(() => {
|
||||
if (!currentService?.id) return;
|
||||
|
||||
let unlisten_error = null;
|
||||
let unlisten_message = null;
|
||||
|
||||
if (connected) {
|
||||
setErrorShow(false);
|
||||
unlisten_error = listen("ws-error", (event) => {
|
||||
// {
|
||||
// "error": {
|
||||
// "reason": "invalid login"
|
||||
// },
|
||||
// "status": 401
|
||||
// }
|
||||
console.log("ws-error", event.payload);
|
||||
console.error("WebSocket error:", event.payload);
|
||||
setConnected(false);
|
||||
setErrorShow(true);
|
||||
});
|
||||
|
||||
unlisten_message = listen("ws-message", (event) => {
|
||||
const msg = event.payload as string;
|
||||
dealMsgRef.current && dealMsgRef.current(msg);
|
||||
});
|
||||
}
|
||||
setErrorShow(false);
|
||||
unlisten_error = listen(`ws-error-${clientId}`, (event) => {
|
||||
// {
|
||||
// "error": {
|
||||
// "reason": "invalid login"
|
||||
// },
|
||||
// "status": 401
|
||||
// }
|
||||
console.error(`ws-error-${clientId}`, event.payload);
|
||||
setConnected(false);
|
||||
setErrorShow(true);
|
||||
});
|
||||
|
||||
unlisten_message = listen(`ws-message-${clientId}`, (event) => {
|
||||
const msg = event.payload as string;
|
||||
console.log(`ws-message-${clientId}`, msg);
|
||||
if (msg.includes("websocket-session-id")) {
|
||||
console.log("websocket-session-id:", msg);
|
||||
const sessionId = msg.split(":")[1].trim();
|
||||
websocketIdRef.current = sessionId;
|
||||
console.log("sessionId:", sessionId);
|
||||
setConnected(true);
|
||||
if (onWebsocketSessionId) {
|
||||
onWebsocketSessionId(sessionId);
|
||||
}
|
||||
return;
|
||||
}
|
||||
dealMsgRef.current && dealMsgRef.current(msg);
|
||||
});
|
||||
|
||||
|
||||
return () => {
|
||||
unlisten_error?.then((fn: any) => fn());
|
||||
unlisten_message?.then((fn: any) => fn());
|
||||
};
|
||||
}, [connected, dealMsgRef]);
|
||||
}, [dealMsgRef]);
|
||||
|
||||
return { errorShow, setErrorShow, reconnect, updateDealMsg };
|
||||
return { errorShow, setErrorShow, reconnect, disconnectWS, updateDealMsg };
|
||||
}
|
||||
@@ -153,6 +153,7 @@
|
||||
"searchPlaceholder": "Search whatever you want ...",
|
||||
"connectionError": "Unable to connect to the server",
|
||||
"reconnect": "Reconnect",
|
||||
"connecting": "Connecting",
|
||||
"deepThink": "Deep Think",
|
||||
"search": "Search",
|
||||
"uploadFile": "Upload File",
|
||||
|
||||
@@ -153,6 +153,7 @@
|
||||
"searchPlaceholder": "搜索任何内容...",
|
||||
"connectionError": "无法连接到服务器",
|
||||
"reconnect": "重新连接",
|
||||
"connecting": "连接中",
|
||||
"deepThink": "深度思考",
|
||||
"search": "联网搜索",
|
||||
"uploadFile": "上传文件",
|
||||
|
||||
@@ -20,7 +20,7 @@ import { useAppStore } from "@/stores/appStore";
|
||||
import { useAuthStore } from "@/stores/authStore";
|
||||
import platformAdapter from "@/utils/platformAdapter";
|
||||
import { useStartupStore } from "@/stores/startupStore";
|
||||
import { DataSource } from "@/types/commands"
|
||||
import { DataSource } from "@/types/commands";
|
||||
|
||||
interface SearchChatProps {
|
||||
querySearch: (input: string) => Promise<any>;
|
||||
@@ -59,6 +59,7 @@ function SearchChat({ querySearch, queryDocuments }: SearchChatProps) {
|
||||
|
||||
const changeMode = useCallback(async (value: boolean) => {
|
||||
dispatch({ type: "SET_CHAT_MODE", payload: value });
|
||||
localStorage.setItem("coco-chat-mode", String(value));
|
||||
}, []);
|
||||
|
||||
const handleSendMessage = useCallback(
|
||||
@@ -174,20 +175,27 @@ function SearchChat({ querySearch, queryDocuments }: SearchChatProps) {
|
||||
const setDefaultStartupWindow = useStartupStore((state) => {
|
||||
return state.setDefaultStartupWindow;
|
||||
});
|
||||
|
||||
|
||||
const showCocoListenRef = useRef<(() => void) | undefined>();
|
||||
|
||||
|
||||
useEffect(() => {
|
||||
let unlistenChangeStartupStore: (() => void) | undefined;
|
||||
|
||||
|
||||
const setupListener = async () => {
|
||||
try {
|
||||
unlistenChangeStartupStore = await platformAdapter.listenEvent(
|
||||
"change-startup-store",
|
||||
({ payload }) => {
|
||||
if (payload && typeof payload === 'object' && 'defaultStartupWindow' in payload) {
|
||||
if (
|
||||
payload &&
|
||||
typeof payload === "object" &&
|
||||
"defaultStartupWindow" in payload
|
||||
) {
|
||||
const startupWindow = payload.defaultStartupWindow;
|
||||
if (startupWindow === "searchMode" || startupWindow === "chatMode") {
|
||||
if (
|
||||
startupWindow === "searchMode" ||
|
||||
startupWindow === "chatMode"
|
||||
) {
|
||||
setDefaultStartupWindow(startupWindow);
|
||||
}
|
||||
}
|
||||
@@ -197,36 +205,39 @@ function SearchChat({ querySearch, queryDocuments }: SearchChatProps) {
|
||||
console.error("Error setting up change-startup-store listener:", error);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
setupListener();
|
||||
|
||||
|
||||
return () => {
|
||||
if (unlistenChangeStartupStore) {
|
||||
unlistenChangeStartupStore();
|
||||
}
|
||||
};
|
||||
}, []);
|
||||
|
||||
|
||||
useEffect(() => {
|
||||
const setupShowCocoListener = async () => {
|
||||
if (showCocoListenRef.current) {
|
||||
showCocoListenRef.current();
|
||||
showCocoListenRef.current = undefined;
|
||||
}
|
||||
|
||||
|
||||
try {
|
||||
const unlisten = await platformAdapter.listenEvent("show-coco", () => {
|
||||
changeMode(defaultStartupWindow === "chatMode");
|
||||
const chatMode = localStorage.getItem("coco-chat-mode");
|
||||
changeMode(
|
||||
chatMode ? chatMode === "true" : defaultStartupWindow === "chatMode"
|
||||
);
|
||||
});
|
||||
|
||||
|
||||
showCocoListenRef.current = unlisten;
|
||||
} catch (error) {
|
||||
console.error("Error setting up show-coco listener:", error);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
setupShowCocoListener();
|
||||
|
||||
|
||||
return () => {
|
||||
if (showCocoListenRef.current) {
|
||||
showCocoListenRef.current();
|
||||
@@ -235,7 +246,6 @@ function SearchChat({ querySearch, queryDocuments }: SearchChatProps) {
|
||||
};
|
||||
}, [defaultStartupWindow, changeMode]);
|
||||
|
||||
|
||||
return (
|
||||
<ErrorBoundary>
|
||||
<div
|
||||
|
||||
@@ -16,10 +16,19 @@ export type AppAction =
|
||||
| { type: 'SET_TYPING'; payload: boolean }
|
||||
| { type: 'SET_LOADING'; payload: boolean };
|
||||
|
||||
const getCachedChatMode = (): boolean => {
|
||||
try {
|
||||
const cached = localStorage.getItem('coco-chat-mode');
|
||||
return cached === 'true';
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
export const initialAppState: AppState = {
|
||||
isChatMode: false,
|
||||
isChatMode: getCachedChatMode(),
|
||||
input: "",
|
||||
isTransitioned: false,
|
||||
isTransitioned: getCachedChatMode(),
|
||||
isSearchActive: false,
|
||||
isDeepThinkActive: false,
|
||||
isTyping: false,
|
||||
|
||||
@@ -48,9 +48,7 @@ export const useChatStore = create<IChatStore>()(
|
||||
{
|
||||
name: "chat-state",
|
||||
// storage: createJSONStorage(() => sessionStorage),
|
||||
partialize: (state) => ({
|
||||
curChatEnd: state.curChatEnd,
|
||||
connected: state.connected,
|
||||
partialize: (_state) => ({
|
||||
}),
|
||||
}
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user