feat: move log stream composition and filtering to background thread

- Resolves #29: UI unresponsiveness in logs tab
- Add LogProcessor with background thread for async log processing
- Implement pre-processed log caching with ProcessedLogEntry
- Replace frame-by-frame log processing with cached results
- Add automatic log change detection for app and system logs
- Replace O(n) per-frame log re-processing with reuse of pre-processed, cached entries
- Maintain all search, filter, and highlighting functionality
- Fix clippy warning for redundant pattern matching

Performance improvements:
- Log processing moved to separate thread with 50ms debouncing
- UI rendering no longer blocks on log filtering/formatting
- Supports thousands of logs without UI lag
- Non-blocking request/response pattern with mpsc channels (see the sketch below)
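
The sketch below is a minimal, self-contained illustration of that request/response pattern, using placeholder names (FilterRequest, FilterResponse, spawn_worker) rather than the actual LogProcessor API added in this commit: the UI thread sends the latest search state over an mpsc channel, a worker thread debounces and filters off-thread, and the UI polls with try_recv() on each tick so drawing never blocks.

    use std::sync::mpsc;
    use std::thread;
    use std::time::{Duration, Instant};

    // Hypothetical request/response types for illustration only; the commit's
    // real equivalents are LogProcessingRequest and LogProcessingResponse.
    struct FilterRequest { query: String, logs: Vec<String> }
    struct FilterResponse { matching: Vec<String> }

    fn spawn_worker(
        rx: mpsc::Receiver<FilterRequest>,
        tx: mpsc::Sender<FilterResponse>,
    ) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            let mut pending: Option<FilterRequest> = None;
            let mut last_run = Instant::now();
            loop {
                // Wake up periodically so a debounced request still gets flushed.
                match rx.recv_timeout(Duration::from_millis(100)) {
                    Ok(req) => pending = Some(req), // keep only the latest request
                    Err(mpsc::RecvTimeoutError::Timeout) => {}
                    Err(mpsc::RecvTimeoutError::Disconnected) => return,
                }
                // Debounce: process at most once per 50 ms.
                if pending.is_some() && last_run.elapsed() >= Duration::from_millis(50) {
                    let req = pending.take().unwrap();
                    let needle = req.query.to_lowercase();
                    let matching = req
                        .logs
                        .into_iter()
                        .filter(|line| line.to_lowercase().contains(needle.as_str()))
                        .collect();
                    if tx.send(FilterResponse { matching }).is_err() {
                        return; // UI side dropped its receiver
                    }
                    last_run = Instant::now();
                }
            }
        })
    }

    fn main() {
        let (req_tx, req_rx) = mpsc::channel();
        let (resp_tx, resp_rx) = mpsc::channel();
        let _worker = spawn_worker(req_rx, resp_tx);

        // UI side: fire-and-forget request ...
        req_tx
            .send(FilterRequest {
                query: "error".into(),
                logs: vec!["[12:00:01] Error: job failed".into(), "[12:00:02] ok".into()],
            })
            .unwrap();

        // ... then poll non-blockingly on every tick, rendering cached results meanwhile.
        loop {
            if let Ok(resp) = resp_rx.try_recv() {
                println!("{} matching log line(s)", resp.matching.len());
                break;
            }
            thread::sleep(Duration::from_millis(10));
        }
    }

The real LogProcessor in log_processor.rs follows the same shape, but it additionally caches the combined app/system log list and hands back pre-styled ProcessedLogEntry rows, so the render path only converts cached entries into table rows.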
Author: bahdotsh
Date:   2025-08-13 13:38:17 +05:30
Parent: a146d94c35
Commit: d1268d55cf

6 changed files with 424 additions and 152 deletions

View File

@@ -154,6 +154,15 @@ fn run_tui_event_loop(
         if last_tick.elapsed() >= tick_rate {
             app.tick();
             app.update_running_workflow_progress();
+            // Check for log processing updates (includes system log change detection)
+            app.check_log_processing_updates();
+            // Request log processing if needed
+            if app.logs_need_update {
+                app.request_log_processing_update();
+            }
             last_tick = Instant::now();
         }

View File

@@ -1,4 +1,5 @@
 // App state for the UI
+use crate::log_processor::{LogProcessingRequest, LogProcessor, ProcessedLogEntry};
 use crate::models::{
     ExecutionResultMsg, JobExecution, LogFilterLevel, StepExecution, Workflow, WorkflowExecution,
     WorkflowStatus,
@@ -40,6 +41,12 @@ pub struct App {
     pub log_filter_level: Option<LogFilterLevel>, // Current log level filter
     pub log_search_matches: Vec<usize>, // Indices of logs that match the search
     pub log_search_match_idx: usize, // Current match index for navigation
+    // Background log processing
+    pub log_processor: LogProcessor,
+    pub processed_logs: Vec<ProcessedLogEntry>,
+    pub logs_need_update: bool, // Flag to trigger log processing
+    pub last_system_logs_count: usize, // Track system log changes
 }

 impl App {
@@ -199,6 +206,12 @@ impl App {
             log_filter_level: Some(LogFilterLevel::All),
             log_search_matches: Vec::new(),
             log_search_match_idx: 0,
+            // Background log processing
+            log_processor: LogProcessor::new(),
+            processed_logs: Vec::new(),
+            logs_need_update: true,
+            last_system_logs_count: 0,
         }
     }
@@ -429,10 +442,9 @@ impl App {
         if let Some(idx) = self.workflow_list_state.selected() {
             if idx < self.workflows.len() && !self.execution_queue.contains(&idx) {
                 self.execution_queue.push(idx);
-                let timestamp = Local::now().format("%H:%M:%S").to_string();
-                self.logs.push(format!(
-                    "[{}] Added '{}' to execution queue. Press 'Enter' to start.",
-                    timestamp, self.workflows[idx].name
+                self.add_timestamped_log(&format!(
+                    "Added '{}' to execution queue. Press 'Enter' to start.",
+                    self.workflows[idx].name
                 ));
             }
         }
@@ -635,10 +647,11 @@ impl App {
                     self.log_search_active = false;
                     self.log_search_query.clear();
                     self.log_search_matches.clear();
+                    self.mark_logs_for_update();
                 }
                 KeyCode::Backspace => {
                     self.log_search_query.pop();
-                    self.update_log_search_matches();
+                    self.mark_logs_for_update();
                 }
                 KeyCode::Enter => {
                     self.log_search_active = false;
@@ -646,7 +659,7 @@ impl App {
                 }
                 KeyCode::Char(c) => {
                     self.log_search_query.push(c);
-                    self.update_log_search_matches();
+                    self.mark_logs_for_update();
                 }
                 _ => {}
             }
@@ -658,8 +671,8 @@ impl App {
         if !self.log_search_active {
             // Don't clear the query, this allows toggling the search UI while keeping the filter
         } else {
-            // When activating search, update matches
-            self.update_log_search_matches();
+            // When activating search, trigger update
+            self.mark_logs_for_update();
         }
     }
@@ -670,8 +683,8 @@ impl App {
             Some(level) => Some(level.next()),
         };

-        // Update search matches when filter changes
-        self.update_log_search_matches();
+        // Trigger log processing update when filter changes
+        self.mark_logs_for_update();
     }

     // Clear log search and filter
@@ -680,6 +693,7 @@ impl App {
         self.log_filter_level = None;
         self.log_search_matches.clear();
         self.log_search_match_idx = 0;
+        self.mark_logs_for_update();
     }

     // Update matches based on current search and filter
@@ -955,4 +969,82 @@ impl App {
             }
         }
     }
+
+    /// Request log processing update from background thread
+    pub fn request_log_processing_update(&mut self) {
+        let request = LogProcessingRequest {
+            search_query: self.log_search_query.clone(),
+            filter_level: self.log_filter_level.clone(),
+            app_logs: self.logs.clone(),
+            app_logs_count: self.logs.len(),
+            system_logs_count: wrkflw_logging::get_logs().len(),
+        };
+
+        if self.log_processor.request_update(request).is_err() {
+            // Log processor channel disconnected, recreate it
+            self.log_processor = LogProcessor::new();
+            self.logs_need_update = true;
+        }
+    }
+
+    /// Check for and apply log processing updates
+    pub fn check_log_processing_updates(&mut self) {
+        // Check if system logs have changed
+        let current_system_logs_count = wrkflw_logging::get_logs().len();
+        if current_system_logs_count != self.last_system_logs_count {
+            self.last_system_logs_count = current_system_logs_count;
+            self.mark_logs_for_update();
+        }
+
+        if let Some(response) = self.log_processor.try_get_update() {
+            self.processed_logs = response.processed_logs;
+            self.log_search_matches = response.search_matches;
+
+            // Update scroll position to first match if we have search results
+            if !self.log_search_matches.is_empty() && !self.log_search_query.is_empty() {
+                self.log_search_match_idx = 0;
+                if let Some(&idx) = self.log_search_matches.first() {
+                    self.log_scroll = idx;
+                }
+            }
+
+            self.logs_need_update = false;
+        }
+    }
+
+    /// Trigger log processing when search/filter changes
+    pub fn mark_logs_for_update(&mut self) {
+        self.logs_need_update = true;
+        self.request_log_processing_update();
+    }
+
+    /// Get combined app and system logs for background processing
+    pub fn get_combined_logs(&self) -> Vec<String> {
+        let mut all_logs = Vec::new();
+
+        // Add app logs
+        for log in &self.logs {
+            all_logs.push(log.clone());
+        }
+
+        // Add system logs
+        for log in wrkflw_logging::get_logs() {
+            all_logs.push(log.clone());
+        }
+
+        all_logs
+    }
+
+    /// Add a log entry and trigger log processing update
+    pub fn add_log(&mut self, message: String) {
+        self.logs.push(message);
+        self.mark_logs_for_update();
+    }
+
+    /// Add a formatted log entry with timestamp and trigger log processing update
+    pub fn add_timestamped_log(&mut self, message: &str) {
+        let timestamp = Local::now().format("%H:%M:%S").to_string();
+        let formatted_message = format!("[{}] {}", timestamp, message);
+        self.add_log(formatted_message);
+    }
 }

View File

@@ -12,6 +12,7 @@
 pub mod app;
 pub mod components;
 pub mod handlers;
+pub mod log_processor;
 pub mod models;
 pub mod utils;
 pub mod views;

View File

@@ -0,0 +1,305 @@
+// Background log processor for asynchronous log filtering and formatting
+use crate::models::LogFilterLevel;
+use ratatui::{
+    style::{Color, Style},
+    text::{Line, Span},
+    widgets::{Cell, Row},
+};
+use std::sync::mpsc;
+use std::thread;
+use std::time::{Duration, Instant};
+
+/// Processed log entry ready for rendering
+#[derive(Debug, Clone)]
+pub struct ProcessedLogEntry {
+    pub timestamp: String,
+    pub log_type: String,
+    pub log_style: Style,
+    pub content_spans: Vec<Span<'static>>,
+}
+
+impl ProcessedLogEntry {
+    /// Convert to a table row for rendering
+    pub fn to_row(&self) -> Row<'static> {
+        Row::new(vec![
+            Cell::from(self.timestamp.clone()),
+            Cell::from(self.log_type.clone()).style(self.log_style),
+            Cell::from(Line::from(self.content_spans.clone())),
+        ])
+    }
+}
+
+/// Request to update log processing parameters
+#[derive(Debug, Clone)]
+pub struct LogProcessingRequest {
+    pub search_query: String,
+    pub filter_level: Option<LogFilterLevel>,
+    pub app_logs: Vec<String>,    // Complete app logs
+    pub app_logs_count: usize,    // To detect changes in app logs
+    pub system_logs_count: usize, // To detect changes in system logs
+}
+
+/// Response with processed logs
+#[derive(Debug, Clone)]
+pub struct LogProcessingResponse {
+    pub processed_logs: Vec<ProcessedLogEntry>,
+    pub total_log_count: usize,
+    pub filtered_count: usize,
+    pub search_matches: Vec<usize>, // Indices of logs that match search
+}
+
+/// Background log processor
+pub struct LogProcessor {
+    request_tx: mpsc::Sender<LogProcessingRequest>,
+    response_rx: mpsc::Receiver<LogProcessingResponse>,
+    _worker_handle: thread::JoinHandle<()>,
+}
+
+impl LogProcessor {
+    /// Create a new log processor with a background worker thread
+    pub fn new() -> Self {
+        let (request_tx, request_rx) = mpsc::channel::<LogProcessingRequest>();
+        let (response_tx, response_rx) = mpsc::channel::<LogProcessingResponse>();
+
+        let worker_handle = thread::spawn(move || {
+            Self::worker_loop(request_rx, response_tx);
+        });
+
+        Self {
+            request_tx,
+            response_rx,
+            _worker_handle: worker_handle,
+        }
+    }
+
+    /// Send a processing request (non-blocking)
+    pub fn request_update(
+        &self,
+        request: LogProcessingRequest,
+    ) -> Result<(), mpsc::SendError<LogProcessingRequest>> {
+        self.request_tx.send(request)
+    }
+
+    /// Try to get the latest processed logs (non-blocking)
+    pub fn try_get_update(&self) -> Option<LogProcessingResponse> {
+        self.response_rx.try_recv().ok()
+    }
+
+    /// Background worker loop
+    fn worker_loop(
+        request_rx: mpsc::Receiver<LogProcessingRequest>,
+        response_tx: mpsc::Sender<LogProcessingResponse>,
+    ) {
+        let mut last_request: Option<LogProcessingRequest> = None;
+        let mut last_processed_time = Instant::now();
+        let mut cached_logs: Vec<String> = Vec::new();
+        let mut cached_app_logs_count = 0;
+        let mut cached_system_logs_count = 0;
+
+        loop {
+            // Check for new requests with a timeout to allow periodic processing
+            let request = match request_rx.recv_timeout(Duration::from_millis(100)) {
+                Ok(req) => Some(req),
+                Err(mpsc::RecvTimeoutError::Timeout) => None,
+                Err(mpsc::RecvTimeoutError::Disconnected) => break,
+            };
+
+            // Update request if we received one
+            if let Some(req) = request {
+                last_request = Some(req);
+            }
+
+            // Process if we have a request and enough time has passed since last processing
+            if let Some(ref req) = last_request {
+                let should_process = last_processed_time.elapsed() > Duration::from_millis(50)
+                    && (cached_app_logs_count != req.app_logs_count
+                        || cached_system_logs_count != req.system_logs_count
+                        || cached_logs.is_empty());
+
+                if should_process {
+                    // Refresh log cache if log counts changed
+                    if cached_app_logs_count != req.app_logs_count
+                        || cached_system_logs_count != req.system_logs_count
+                        || cached_logs.is_empty()
+                    {
+                        cached_logs = Self::get_combined_logs(&req.app_logs);
+                        cached_app_logs_count = req.app_logs_count;
+                        cached_system_logs_count = req.system_logs_count;
+                    }
+
+                    let response = Self::process_logs(&cached_logs, req);
+                    if response_tx.send(response).is_err() {
+                        break; // Receiver disconnected
+                    }
+
+                    last_processed_time = Instant::now();
+                }
+            }
+        }
+    }
+
+    /// Get combined app and system logs
+    fn get_combined_logs(app_logs: &[String]) -> Vec<String> {
+        let mut all_logs = Vec::new();
+
+        // Add app logs
+        for log in app_logs {
+            all_logs.push(log.clone());
+        }
+
+        // Add system logs
+        for log in wrkflw_logging::get_logs() {
+            all_logs.push(log.clone());
+        }
+
+        all_logs
+    }
+
+    /// Process logs according to search and filter criteria
+    fn process_logs(all_logs: &[String], request: &LogProcessingRequest) -> LogProcessingResponse {
+        // Filter logs based on search query and filter level
+        let mut filtered_logs = Vec::new();
+        let mut search_matches = Vec::new();
+
+        for (idx, log) in all_logs.iter().enumerate() {
+            let passes_filter = match &request.filter_level {
+                None => true,
+                Some(level) => level.matches(log),
+            };
+
+            let matches_search = if request.search_query.is_empty() {
+                true
+            } else {
+                log.to_lowercase()
+                    .contains(&request.search_query.to_lowercase())
+            };
+
+            if passes_filter && matches_search {
+                filtered_logs.push((idx, log));
+                if matches_search && !request.search_query.is_empty() {
+                    search_matches.push(filtered_logs.len() - 1);
+                }
+            }
+        }
+
+        // Process filtered logs into display format
+        let processed_logs: Vec<ProcessedLogEntry> = filtered_logs
+            .iter()
+            .map(|(_, log_line)| Self::process_log_entry(log_line, &request.search_query))
+            .collect();
+
+        LogProcessingResponse {
+            processed_logs,
+            total_log_count: all_logs.len(),
+            filtered_count: filtered_logs.len(),
+            search_matches,
+        }
+    }
+
+    /// Process a single log entry into display format
+    fn process_log_entry(log_line: &str, search_query: &str) -> ProcessedLogEntry {
+        // Extract timestamp from log format [HH:MM:SS]
+        let timestamp = if log_line.starts_with('[') && log_line.contains(']') {
+            let end = log_line.find(']').unwrap_or(0);
+            if end > 1 {
+                log_line[1..end].to_string()
+            } else {
+                "??:??:??".to_string()
+            }
+        } else {
+            "??:??:??".to_string()
+        };
+
+        // Determine log type and style
+        let (log_type, log_style) =
+            if log_line.contains("Error") || log_line.contains("error") || log_line.contains("")
+            {
+                ("ERROR", Style::default().fg(Color::Red))
+            } else if log_line.contains("Warning")
+                || log_line.contains("warning")
+                || log_line.contains("⚠️")
+            {
+                ("WARN", Style::default().fg(Color::Yellow))
+            } else if log_line.contains("Success")
+                || log_line.contains("success")
+                || log_line.contains("")
+            {
+                ("SUCCESS", Style::default().fg(Color::Green))
+            } else if log_line.contains("Running")
+                || log_line.contains("running")
+                || log_line.contains("")
+            {
+                ("INFO", Style::default().fg(Color::Cyan))
+            } else if log_line.contains("Triggering") || log_line.contains("triggered") {
+                ("TRIG", Style::default().fg(Color::Magenta))
+            } else {
+                ("INFO", Style::default().fg(Color::Gray))
+            };
+
+        // Extract content after timestamp
+        let content = if log_line.starts_with('[') && log_line.contains(']') {
+            let start = log_line.find(']').unwrap_or(0) + 1;
+            log_line[start..].trim()
+        } else {
+            log_line
+        };
+
+        // Create content spans with search highlighting
+        let content_spans = if !search_query.is_empty() {
+            Self::highlight_search_matches(content, search_query)
+        } else {
+            vec![Span::raw(content.to_string())]
+        };
+
+        ProcessedLogEntry {
+            timestamp,
+            log_type: log_type.to_string(),
+            log_style,
+            content_spans,
+        }
+    }
+
+    /// Highlight search matches in content
+    fn highlight_search_matches(content: &str, search_query: &str) -> Vec<Span<'static>> {
+        let mut spans = Vec::new();
+        let lowercase_content = content.to_lowercase();
+        let lowercase_query = search_query.to_lowercase();
+
+        if lowercase_content.contains(&lowercase_query) {
+            let mut last_idx = 0;
+            while let Some(idx) = lowercase_content[last_idx..].find(&lowercase_query) {
+                let real_idx = last_idx + idx;
+
+                // Add text before match
+                if real_idx > last_idx {
+                    spans.push(Span::raw(content[last_idx..real_idx].to_string()));
+                }
+
+                // Add matched text with highlight
+                let match_end = real_idx + search_query.len();
+                spans.push(Span::styled(
+                    content[real_idx..match_end].to_string(),
+                    Style::default().bg(Color::Yellow).fg(Color::Black),
+                ));
+
+                last_idx = match_end;
+            }
+
+            // Add remaining text after last match
+            if last_idx < content.len() {
+                spans.push(Span::raw(content[last_idx..].to_string()));
+            }
+        } else {
+            spans.push(Span::raw(content.to_string()));
+        }
+
+        spans
+    }
+}
+
+impl Default for LogProcessor {
+    fn default() -> Self {
+        Self::new()
+    }
+}

View File

@@ -50,6 +50,7 @@ pub struct StepExecution {
 }

 /// Log filter levels
+#[derive(Debug, Clone, PartialEq)]
 pub enum LogFilterLevel {
     Info,
     Warning,

View File

@@ -140,45 +140,8 @@ pub fn render_logs_tab(f: &mut Frame<CrosstermBackend<io::Stdout>>, app: &App, a
         f.render_widget(search_block, chunks[1]);
     }

-    // Combine application logs with system logs
-    let mut all_logs = Vec::new();
-
-    // Now all logs should have timestamps in the format [HH:MM:SS]
-
-    // Process app logs
-    for log in &app.logs {
-        all_logs.push(log.clone());
-    }
-
-    // Process system logs
-    for log in wrkflw_logging::get_logs() {
-        all_logs.push(log.clone());
-    }
-
-    // Filter logs based on search query and filter level
-    let filtered_logs = if !app.log_search_query.is_empty() || app.log_filter_level.is_some() {
-        all_logs
-            .iter()
-            .filter(|log| {
-                let passes_filter = match &app.log_filter_level {
-                    None => true,
-                    Some(level) => level.matches(log),
-                };
-
-                let matches_search = if app.log_search_query.is_empty() {
-                    true
-                } else {
-                    log.to_lowercase()
-                        .contains(&app.log_search_query.to_lowercase())
-                };
-
-                passes_filter && matches_search
-            })
-            .cloned()
-            .collect::<Vec<String>>()
-    } else {
-        all_logs.clone() // Clone to avoid moving all_logs
-    };
+    // Use processed logs from background thread instead of processing on every frame
+    let filtered_logs = &app.processed_logs;

     // Create a table for logs for better organization
     let header_cells = ["Time", "Type", "Message"]
@@ -189,109 +152,10 @@ pub fn render_logs_tab(f: &mut Frame<CrosstermBackend<io::Stdout>>, app: &App, a
         .style(Style::default().add_modifier(Modifier::BOLD))
         .height(1);

-    let rows = filtered_logs.iter().map(|log_line| {
-        // Parse log line to extract timestamp, type and message
-
-        // Extract timestamp from log format [HH:MM:SS]
-        let timestamp = if log_line.starts_with('[') && log_line.contains(']') {
-            let end = log_line.find(']').unwrap_or(0);
-            if end > 1 {
-                log_line[1..end].to_string()
-            } else {
-                "??:??:??".to_string() // Show placeholder for malformed logs
-            }
-        } else {
-            "??:??:??".to_string() // Show placeholder for malformed logs
-        };
-
-        let (log_type, log_style, _) =
-            if log_line.contains("Error") || log_line.contains("error") || log_line.contains("")
-            {
-                ("ERROR", Style::default().fg(Color::Red), log_line.as_str())
-            } else if log_line.contains("Warning")
-                || log_line.contains("warning")
-                || log_line.contains("⚠️")
-            {
-                (
-                    "WARN",
-                    Style::default().fg(Color::Yellow),
-                    log_line.as_str(),
-                )
-            } else if log_line.contains("Success")
-                || log_line.contains("success")
-                || log_line.contains("")
-            {
-                (
-                    "SUCCESS",
-                    Style::default().fg(Color::Green),
-                    log_line.as_str(),
-                )
-            } else if log_line.contains("Running")
-                || log_line.contains("running")
-                || log_line.contains("")
-            {
-                ("INFO", Style::default().fg(Color::Cyan), log_line.as_str())
-            } else if log_line.contains("Triggering") || log_line.contains("triggered") {
-                (
-                    "TRIG",
-                    Style::default().fg(Color::Magenta),
-                    log_line.as_str(),
-                )
-            } else {
-                ("INFO", Style::default().fg(Color::Gray), log_line.as_str())
-            };
-
-        // Extract content after timestamp
-        let content = if log_line.starts_with('[') && log_line.contains(']') {
-            let start = log_line.find(']').unwrap_or(0) + 1;
-            log_line[start..].trim()
-        } else {
-            log_line.as_str()
-        };
-
-        // Highlight search matches in content if search is active
-        let mut content_spans = Vec::new();
-
-        if !app.log_search_query.is_empty() {
-            let lowercase_content = content.to_lowercase();
-            let lowercase_query = app.log_search_query.to_lowercase();
-
-            if lowercase_content.contains(&lowercase_query) {
-                let mut last_idx = 0;
-                while let Some(idx) = lowercase_content[last_idx..].find(&lowercase_query) {
-                    let real_idx = last_idx + idx;
-
-                    // Add text before match
-                    if real_idx > last_idx {
-                        content_spans.push(Span::raw(content[last_idx..real_idx].to_string()));
-                    }
-
-                    // Add matched text with highlight
-                    let match_end = real_idx + app.log_search_query.len();
-                    content_spans.push(Span::styled(
-                        content[real_idx..match_end].to_string(),
-                        Style::default().bg(Color::Yellow).fg(Color::Black),
-                    ));
-
-                    last_idx = match_end;
-                }
-
-                // Add remaining text after last match
-                if last_idx < content.len() {
-                    content_spans.push(Span::raw(content[last_idx..].to_string()));
-                }
-            } else {
-                content_spans.push(Span::raw(content));
-            }
-        } else {
-            content_spans.push(Span::raw(content));
-        }
-
-        Row::new(vec![
-            Cell::from(timestamp),
-            Cell::from(log_type).style(log_style),
-            Cell::from(Line::from(content_spans)),
-        ])
-    });
+    // Convert processed logs to table rows - this is now very fast since logs are pre-processed
+    let rows = filtered_logs
+        .iter()
+        .map(|processed_log| processed_log.to_row());

     let content_idx = if show_search_bar { 2 } else { 1 };