fix: several issues around search (#502)

* fix: several issues around search

* chore: update release notes
This commit is contained in:
Medcl
2025-05-13 18:12:57 +08:00
committed by GitHub
parent d52ce481f9
commit 95ccbaec3e
11 changed files with 65 additions and 77 deletions

View File

@@ -24,6 +24,8 @@ Information about release notes of Coco Server is provided here.
- feat: the search input box supports multi-line input #501 - feat: the search input box supports multi-line input #501
### 🐛 Bug fix ### 🐛 Bug fix
- fix: several issues around search #502
### ✈️ Improvements ### ✈️ Improvements

View File

@@ -24,7 +24,9 @@ pub async fn get_response_body_text(response: Response) -> Result<String, String
let body = response let body = response
.text() .text()
.await .await
.map_err(|e| format!("Failed to read response body: {}", e))?; .map_err(|e| format!("Failed to read response body: {}, code: {}", e, status))?;
log::debug!("Response status: {}, body: {}", status, &body);
if status < 200 || status >= 400 { if status < 200 || status >= 400 {
// Try to parse the error body // Try to parse the error body

View File

@@ -215,7 +215,7 @@ pub fn run() {
}) })
.build(ctx) .build(ctx)
.expect("error while running tauri application"); .expect("error while running tauri application");
app.run(|app_handle, event| match event { app.run(|app_handle, event| match event {
#[cfg(target_os = "macos")] #[cfg(target_os = "macos")]
tauri::RunEvent::Reopen { tauri::RunEvent::Reopen {

View File

@@ -52,7 +52,7 @@ pub async fn query_coco_fusion<R: Runtime>(
timeout(timeout_duration, async { timeout(timeout_duration, async {
query_source_clone.search(query).await query_source_clone.search(query).await
}) })
.await .await
})); }));
} }
@@ -82,6 +82,18 @@ pub async fn query_coco_fusion<R: Runtime>(
.push((query_hit, score)); .push((query_hit, score));
} }
} }
Ok(Ok(Err(err))) => {
failed_requests.push(FailedRequest {
source: QuerySource {
r#type: "N/A".into(),
name: "N/A".into(),
id: "N/A".into(),
},
status: 0,
error: Some(err.to_string()),
reason: None,
});
}
Ok(Err(err)) => { Ok(Err(err)) => {
failed_requests.push(FailedRequest { failed_requests.push(FailedRequest {
source: QuerySource { source: QuerySource {
@@ -95,7 +107,7 @@ pub async fn query_coco_fusion<R: Runtime>(
}); });
} }
// Timeout reached, skip this request // Timeout reached, skip this request
Ok(_) => { _ => {
failed_requests.push(FailedRequest { failed_requests.push(FailedRequest {
source: QuerySource { source: QuerySource {
r#type: "N/A".into(), r#type: "N/A".into(),
@@ -103,19 +115,7 @@ pub async fn query_coco_fusion<R: Runtime>(
id: "N/A".into(), id: "N/A".into(),
}, },
status: 0, status: 0,
error: Some("Query source timed out".to_string()), error: Some(format!("{:?}", &result)),
reason: None,
});
}
Err(_) => {
failed_requests.push(FailedRequest {
source: QuerySource {
r#type: "N/A".into(),
name: "N/A".into(),
id: "N/A".into(),
},
status: 0,
error: Some("Task panicked".to_string()),
reason: None, reason: None,
}); });
} }

View File

@@ -34,6 +34,10 @@ pub async fn refresh_all_connectors<R: Runtime>(app_handle: &AppHandle<R>) -> Re
// Collect all the tasks for fetching and refreshing connectors // Collect all the tasks for fetching and refreshing connectors
let mut server_map = HashMap::new(); let mut server_map = HashMap::new();
for server in servers { for server in servers {
if !server.enabled {
continue;
}
// dbg!("start fetch connectors for server: {}", &server.id); // dbg!("start fetch connectors for server: {}", &server.id);
let connectors = match get_connectors_by_server(app_handle.clone(), server.id.clone()).await let connectors = match get_connectors_by_server(app_handle.clone(), server.id.clone()).await
{ {

View File

@@ -32,7 +32,7 @@ pub fn save_datasource_to_cache(server_id: &str, datasources: Vec<DataSource>) {
#[allow(dead_code)] #[allow(dead_code)]
pub fn get_datasources_from_cache(server_id: &str) -> Option<HashMap<String, DataSource>> { pub fn get_datasources_from_cache(server_id: &str) -> Option<HashMap<String, DataSource>> {
let cache = DATASOURCE_CACHE.read().unwrap(); // Acquire read lock let cache = DATASOURCE_CACHE.read().unwrap(); // Acquire read lock
// dbg!("cache: {:?}", &cache); // dbg!("cache: {:?}", &cache);
let server_cache = cache.get(server_id)?; // Get the server's cache let server_cache = cache.get(server_id)?; // Get the server's cache
Some(server_cache.clone()) Some(server_cache.clone())
} }
@@ -47,6 +47,10 @@ pub async fn refresh_all_datasources<R: Runtime>(_app_handle: &AppHandle<R>) ->
for server in servers { for server in servers {
// dbg!("fetch datasources for server: {}", &server.id); // dbg!("fetch datasources for server: {}", &server.id);
if !server.enabled {
continue;
}
// Attempt to get datasources by server, and continue even if it fails // Attempt to get datasources by server, and continue even if it fails
let connectors = match datasource_search(server.id.as_str(), None).await { let connectors = match datasource_search(server.id.as_str(), None).await {
Ok(connectors) => { Ok(connectors) => {
@@ -130,8 +134,8 @@ pub async fn datasource_search(
None, None,
Some(reqwest::Body::from(body.to_string())), Some(reqwest::Body::from(body.to_string())),
) )
.await .await
.map_err(|e| format!("Error fetching datasource: {}", e))?; .map_err(|e| format!("Error fetching datasource: {}", e))?;
// Parse the search results from the response // Parse the search results from the response
let datasources: Vec<DataSource> = parse_search_results(resp).await.map_err(|e| { let datasources: Vec<DataSource> = parse_search_results(resp).await.map_err(|e| {
@@ -186,8 +190,8 @@ pub async fn mcp_server_search(
None, None,
Some(reqwest::Body::from(body.to_string())), Some(reqwest::Body::from(body.to_string())),
) )
.await .await
.map_err(|e| format!("Error fetching datasource: {}", e))?; .map_err(|e| format!("Error fetching datasource: {}", e))?;
// Parse the search results from the response // Parse the search results from the response
let mcp_server: Vec<DataSource> = parse_search_results(resp).await.map_err(|e| { let mcp_server: Vec<DataSource> = parse_search_results(resp).await.map_err(|e| {

View File

@@ -12,7 +12,7 @@ pub static HTTP_CLIENT: Lazy<Mutex<Client>> = Lazy::new(|| {
.read_timeout(Duration::from_secs(3)) // Set a timeout of 3 second .read_timeout(Duration::from_secs(3)) // Set a timeout of 3 second
.connect_timeout(Duration::from_secs(3)) // Set a timeout of 3 second .connect_timeout(Duration::from_secs(3)) // Set a timeout of 3 second
.timeout(Duration::from_secs(10)) // Set a timeout of 10 seconds .timeout(Duration::from_secs(10)) // Set a timeout of 10 seconds
.danger_accept_invalid_certs(true) // example for self-signed certificates .danger_accept_invalid_certs(true) // allow self-signed certificates
.build() .build()
.expect("Failed to build client"); .expect("Failed to build client");
Mutex::new(client) Mutex::new(client)
@@ -35,6 +35,9 @@ impl HttpClient {
headers: Option<HashMap<String, String>>, headers: Option<HashMap<String, String>>,
body: Option<reqwest::Body>, body: Option<reqwest::Body>,
) -> Result<reqwest::Response, String> { ) -> Result<reqwest::Response, String> {
log::debug!("Sending Request: {}, query_params: {:?}, header: {:?}, body: {:?}",&url, &query_params, &headers, &body);
let request_builder = let request_builder =
Self::get_request_builder(method, url, headers, query_params, body).await; Self::get_request_builder(method, url, headers, query_params, body).await;
@@ -42,6 +45,9 @@ impl HttpClient {
dbg!("Failed to send request: {}", &e); dbg!("Failed to send request: {}", &e);
format!("Failed to send request: {}", e) format!("Failed to send request: {}", e)
})?; })?;
log::debug!("Request: {}, Response status: {:?}, header: {:?}",&url, &response.status(),&response.headers());
Ok(response) Ok(response)
} }
@@ -140,9 +146,7 @@ impl HttpClient {
headers.insert("X-API-TOKEN".to_string(), t); headers.insert("X-API-TOKEN".to_string(), t);
} }
// dbg!(&server_id); log::debug!("Sending request to server: {}, url: {}, headers: {:?}", &server_id, &url,&headers);
// dbg!(&url);
// dbg!(&headers);
Self::send_raw_request(method, &url, query_params, Some(headers), body).await Self::send_raw_request(method, &url, query_params, Some(headers), body).await
} else { } else {

View File

@@ -5,12 +5,11 @@ use crate::common::search::{QueryHits, QueryResponse, QuerySource, SearchQuery,
use crate::common::server::Server; use crate::common::server::Server;
use crate::common::traits::SearchSource; use crate::common::traits::SearchSource;
use crate::server::http_client::HttpClient; use crate::server::http_client::HttpClient;
use crate::server::servers::get_server_token;
use async_trait::async_trait; use async_trait::async_trait;
// use futures::stream::StreamExt; // use futures::stream::StreamExt;
use ordered_float::OrderedFloat; use ordered_float::OrderedFloat;
use reqwest::{Client, Method, RequestBuilder};
use std::collections::HashMap; use std::collections::HashMap;
use tauri_plugin_store::JsonValue;
// use std::hash::Hash; // use std::hash::Hash;
#[allow(dead_code)] #[allow(dead_code)]
@@ -74,45 +73,11 @@ const COCO_SERVERS: &str = "coco-servers";
pub struct CocoSearchSource { pub struct CocoSearchSource {
server: Server, server: Server,
client: Client,
} }
impl CocoSearchSource { impl CocoSearchSource {
pub fn new(server: Server, client: Client) -> Self { pub fn new(server: Server) -> Self {
CocoSearchSource { server, client } CocoSearchSource { server }
}
async fn build_request_from_query(
&self,
query: &SearchQuery,
) -> Result<RequestBuilder, String> {
self.build_request(query.from, query.size, &query.query_strings)
.await
}
async fn build_request(
&self,
from: u64,
size: u64,
query_strings: &HashMap<String, String>,
) -> Result<RequestBuilder, String> {
let url = HttpClient::join_url(&self.server.endpoint, "/query/_search");
let mut request_builder = self.client.request(Method::GET, url);
if !self.server.public {
if let Some(token) = get_server_token(&self.server.id)
.await?
.map(|t| t.access_token)
{
request_builder = request_builder.header("X-API-TOKEN", token);
}
}
let result = request_builder
.query(&[("from", &from.to_string()), ("size", &size.to_string())])
.query(query_strings);
Ok(result)
} }
} }
@@ -127,17 +92,22 @@ impl SearchSource for CocoSearchSource {
} }
async fn search(&self, query: SearchQuery) -> Result<QueryResponse, SearchError> { async fn search(&self, query: SearchQuery) -> Result<QueryResponse, SearchError> {
// Build the request from the provided query let url = "/query/_search";
let request_builder = self
.build_request_from_query(&query)
.await
.map_err(|e| SearchError::InternalError(e.to_string()))?;
// Send the HTTP request and handle errors let mut query_args: HashMap<String, JsonValue> = HashMap::new();
let response = request_builder query_args.insert("from".into(), JsonValue::Number(query.from.into()));
.send() query_args.insert("size".into(), JsonValue::Number(query.size.into()));
for (key, value) in query.query_strings {
query_args.insert(key, JsonValue::String(value));
}
let response = HttpClient::get(
&self.server.id,
&url,
Some(query_args),
)
.await .await
.map_err(|e| SearchError::HttpError(format!("Failed to send search request: {}", e)))?; .map_err(|e| SearchError::HttpError(format!("Error to send search request: {}", e)))?;
// Use the helper function to parse the response body // Use the helper function to parse the response body
let response_body = get_response_body_text(response) let response_body = get_response_body_text(response)

View File

@@ -7,7 +7,7 @@ use crate::server::http_client::HttpClient;
use crate::server::search::CocoSearchSource; use crate::server::search::CocoSearchSource;
use crate::COCO_TAURI_STORE; use crate::COCO_TAURI_STORE;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use reqwest::{Client, Method}; use reqwest::Method;
use serde_json::from_value; use serde_json::from_value;
use serde_json::Value as JsonValue; use serde_json::Value as JsonValue;
use std::collections::HashMap; use std::collections::HashMap;
@@ -447,7 +447,7 @@ pub async fn try_register_server_to_search_source(
) { ) {
if server.enabled { if server.enabled {
let registry = app_handle.state::<SearchSourceRegistry>(); let registry = app_handle.state::<SearchSourceRegistry>();
let source = CocoSearchSource::new(server.clone(), Client::new()); let source = CocoSearchSource::new(server.clone());
registry.register_source(source).await; registry.register_source(source).await;
} }
} }

View File

@@ -30,7 +30,9 @@ async function invokeWithErrorHandler<T>(
const failedResult = result as any; const failedResult = result as any;
if (failedResult.failed?.length > 0) { if (failedResult.failed?.length > 0) {
failedResult.failed.forEach((error: any) => { failedResult.failed.forEach((error: any) => {
// addError(error.error, 'error'); if (result?.hits?.length == 0) {
addError(error.error, 'error');
}
console.error(error.error); console.error(error.error);
}); });
} }

View File

@@ -137,7 +137,7 @@ function Search({
<div ref={mainWindowRef} className={`h-full pb-10 w-full relative`}> <div ref={mainWindowRef} className={`h-full pb-10 w-full relative`}>
{/* Search Results Panel */} {/* Search Results Panel */}
{suggests.length > 0 ? ( {suggests.length > 0 ? (
sourceData ? ( sourceData ? (
<SearchResults input={input} isChatMode={isChatMode} /> <SearchResults input={input} isChatMode={isChatMode} />
) : ( ) : (
<DropdownList <DropdownList