mirror of
https://github.com/infinilabs/coco-app.git
synced 2025-12-15 19:17:42 +01:00
feat: add a heartbeat worker to check Coco server availability (#988)
* feat: add a heartbeat worker to check Coco server availability * relase notes
This commit is contained in:
@@ -13,6 +13,8 @@ Information about release notes of Coco App is provided here.
|
||||
|
||||
### 🚀 Features
|
||||
|
||||
- feat: add a heartbeat worker to check Coco server availability #988
|
||||
|
||||
### 🐛 Bug fix
|
||||
|
||||
- fix: search_extension should not panic when ext is not found #983
|
||||
|
||||
@@ -22,9 +22,11 @@ impl SearchSourceRegistry {
|
||||
sources.clear();
|
||||
}
|
||||
|
||||
pub async fn remove_source(&self, id: &str) {
|
||||
/// Remove the SearchSource specified by `id`, return a boolean indicating
|
||||
/// if it get removed or not.
|
||||
pub async fn remove_source(&self, id: &str) -> bool {
|
||||
let mut sources = self.sources.write().await;
|
||||
sources.remove(id);
|
||||
sources.remove(id).is_some()
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
|
||||
@@ -12,7 +12,9 @@ pub mod util;
|
||||
|
||||
use crate::common::register::SearchSourceRegistry;
|
||||
use crate::common::{CHECK_WINDOW_LABEL, MAIN_WINDOW_LABEL, SETTINGS_WINDOW_LABEL};
|
||||
use crate::server::servers::{load_or_insert_default_server, load_servers_token};
|
||||
use crate::server::servers::{
|
||||
load_or_insert_default_server, load_servers_token, start_bg_heartbeat_worker,
|
||||
};
|
||||
use crate::util::logging::set_up_tauri_logger;
|
||||
use crate::util::prevent_default;
|
||||
use autostart::change_autostart;
|
||||
@@ -285,6 +287,12 @@ pub async fn init(app_handle: &AppHandle) {
|
||||
.await;
|
||||
}
|
||||
|
||||
/*
|
||||
* Start the background heartbeat worker here after setting up Coco server
|
||||
* storage and SearchSourceRegistry.
|
||||
*/
|
||||
start_bg_heartbeat_worker(app_handle.clone());
|
||||
|
||||
extension::built_in::pizza_engine_runtime::start_pizza_engine_runtime().await;
|
||||
}
|
||||
|
||||
|
||||
@@ -13,8 +13,11 @@ use serde_json::Value as JsonValue;
|
||||
use serde_json::from_value;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::LazyLock;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use tauri::{AppHandle, Manager};
|
||||
use tauri_plugin_store::StoreExt;
|
||||
use tokio::runtime;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
/// Coco sever list
|
||||
@@ -312,6 +315,109 @@ pub async fn refresh_all_coco_server_info(app_handle: AppHandle) {
|
||||
}
|
||||
}
|
||||
|
||||
/// Start a background worker that periodically sends heartbeats (`GET /provider/_info`)
|
||||
/// to the connected Coco servers, checks if they are available and updates the
|
||||
/// `SearchSourceRegistry` accordingly.
|
||||
pub(crate) fn start_bg_heartbeat_worker(tauri_app_handle: AppHandle) {
|
||||
const THREAD_NAME: &str = "Coco background heartbeat worker";
|
||||
const SLEEP_DURATION: Duration = Duration::from_secs(15);
|
||||
|
||||
let main_closure = || {
|
||||
let single_thread_rt = runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap_or_else(|e| {
|
||||
panic!(
|
||||
"failed to create a single-threaded Tokio runtime within thread [{}] because [{}]",
|
||||
THREAD_NAME, e
|
||||
);
|
||||
});
|
||||
|
||||
single_thread_rt.block_on(async move {
|
||||
let mut server_removed = Vec::new();
|
||||
let mut server_added = Vec::new();
|
||||
|
||||
let search_sources = tauri_app_handle.state::<SearchSourceRegistry>();
|
||||
loop {
|
||||
log::info!("Coco Server Heartbeat worker is working...");
|
||||
|
||||
refresh_all_coco_server_info(tauri_app_handle.clone()).await;
|
||||
|
||||
/*
|
||||
* For the Coco servers that are included in the SearchSourceRegistry
|
||||
* but unavailable, they should be removed from the registry.
|
||||
*
|
||||
* We do this step first so that there are less search source to
|
||||
* scan.
|
||||
*/
|
||||
for search_source in search_sources.get_sources().await {
|
||||
let query_source = search_source.get_type();
|
||||
let search_source_id = query_source.id;
|
||||
let search_source_name = query_source.name;
|
||||
|
||||
let Some(coco_server) = get_server_by_id(&search_source_id).await else {
|
||||
// This search source may not be a Coco server, try the next one.
|
||||
continue;
|
||||
};
|
||||
|
||||
assert!(
|
||||
coco_server.enabled,
|
||||
"Coco servers stored in search source list should all be enabled"
|
||||
);
|
||||
|
||||
if !coco_server.available {
|
||||
let removed = search_sources.remove_source(&search_source_id).await;
|
||||
if removed {
|
||||
server_removed.push((search_source_id, search_source_name));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Coco servers that are available and enabled should be added to
|
||||
* the SearchSourceRegistry if they are not already included.
|
||||
*/
|
||||
for coco_server in get_all_servers().await {
|
||||
if coco_server.enabled
|
||||
&& coco_server.available
|
||||
&& search_sources.get_source(&coco_server.id).await.is_none()
|
||||
{
|
||||
server_added.push((coco_server.id.clone(), coco_server.name.clone()));
|
||||
|
||||
let source = CocoSearchSource::new(coco_server);
|
||||
search_sources.register_source(source).await;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Log the updates to SearchSourceRegistry
|
||||
*/
|
||||
log::info!(
|
||||
"Coco Server Heartbeat worker: removed {:?} from the SearchSourceRegistry",
|
||||
server_removed
|
||||
);
|
||||
log::info!(
|
||||
"Coco Server Heartbeat worker: added {:?} to the SearchSourceRegistry",
|
||||
server_added
|
||||
);
|
||||
|
||||
// Sleep for a period of time
|
||||
tokio::time::sleep(SLEEP_DURATION).await;
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
thread::Builder::new()
|
||||
.name(THREAD_NAME.into())
|
||||
.spawn(main_closure)
|
||||
.unwrap_or_else(|e| {
|
||||
panic!(
|
||||
"failed to start thread [{}] for reason [{}]",
|
||||
THREAD_NAME, e
|
||||
)
|
||||
});
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
pub async fn refresh_coco_server_info(app_handle: AppHandle, id: String) -> Result<Server, String> {
|
||||
// Retrieve the server from the cache
|
||||
|
||||
@@ -82,6 +82,12 @@ pub(crate) async fn backend_setup(tauri_app_handle: AppHandle, app_lang: String)
|
||||
.expect("global tauri AppHandle already initialized");
|
||||
log::trace!("global Tauri AppHandle set");
|
||||
|
||||
/*
|
||||
* This should be set before Rust code makes any HTTP requests as it is
|
||||
* needed to provider HTTP header: X-APP-LANG
|
||||
*/
|
||||
update_app_lang(app_lang).await;
|
||||
|
||||
let registry = SearchSourceRegistry::default();
|
||||
tauri_app_handle.manage(registry); // Store registry in Tauri's app state
|
||||
|
||||
@@ -110,8 +116,6 @@ pub(crate) async fn backend_setup(tauri_app_handle: AppHandle, app_lang: String)
|
||||
|
||||
autostart::ensure_autostart_state_consistent(&tauri_app_handle).unwrap();
|
||||
|
||||
update_app_lang(app_lang).await;
|
||||
|
||||
// Invoked, now update the state
|
||||
BACKEND_SETUP_COMPLETED.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user