From 624379680a1d70cded63c51153f1b5ce74c3f9a3 Mon Sep 17 00:00:00 2001 From: Marcin Kulik Date: Sun, 1 Jun 2025 22:47:36 +0200 Subject: [PATCH] Add jitter to exponential backoff in forwarder reconnection --- Cargo.lock | 1 + Cargo.toml | 1 + src/forwarder.rs | 9 +++++++-- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a89bca2..6786b33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -95,6 +95,7 @@ dependencies = [ "config", "futures-util", "nix 0.30.1", + "rand", "reqwest", "rgb", "rust-embed", diff --git a/Cargo.toml b/Cargo.toml index 4b9ffbc..8d9617f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ url = "2.5.0" tokio-tungstenite = { version = "0.26.2", default-features = false, features = ["connect", "rustls-tls-native-roots"] } rustls = { version = "0.23.26", default-features = false, features = ["aws_lc_rs"] } tokio-util = "0.7.10" +rand = "0.9.1" [build-dependencies] clap = { version = "4.5.37", features = ["derive"] } diff --git a/src/forwarder.rs b/src/forwarder.rs index ef0beba..6f81b46 100644 --- a/src/forwarder.rs +++ b/src/forwarder.rs @@ -5,6 +5,7 @@ use std::time::Duration; use anyhow::{anyhow, bail}; use axum::http::Uri; use futures_util::{SinkExt, Stream, StreamExt}; +use rand::Rng; use tokio::net::TcpStream; use tokio::time::{interval, sleep, timeout}; use tokio_stream::wrappers::errors::BroadcastStreamRecvError; @@ -23,7 +24,8 @@ use crate::stream::Subscriber; const PING_INTERVAL: u64 = 15; const PING_TIMEOUT: u64 = 10; const SEND_TIMEOUT: u64 = 10; -const MAX_RECONNECT_DELAY: u64 = 5000; +const RECONNECT_DELAY_BASE: u64 = 500; +const RECONNECT_DELAY_CAP: u64 = 10_000; pub async fn forward( url: url::Url, @@ -224,7 +226,10 @@ fn handle_close_frame(frame: Option) -> anyhow::Result<()> { } fn exponential_delay(attempt: usize) -> u64 { - (2_u64.pow(attempt as u32) * 500).min(MAX_RECONNECT_DELAY) + let mut rng = rand::rng(); + let base = (RECONNECT_DELAY_BASE * 2_u64.pow(attempt as u32)).min(RECONNECT_DELAY_CAP); + + rng.random_range(..base) } fn ws_result(m: Result, BroadcastStreamRecvError>) -> anyhow::Result {