From 59bbe153262a62951e689c59905965b8ff3684be Mon Sep 17 00:00:00 2001 From: Godnight1006 Date: Thu, 1 Jan 2026 19:45:45 +0800 Subject: [PATCH 1/5] Restart video stream when first frame stalls --- src/server/video_service.rs | 92 +++++++++++++++++++++++++++++++++---- 1 file changed, 82 insertions(+), 10 deletions(-) diff --git a/src/server/video_service.rs b/src/server/video_service.rs index 13a781c28..9156ec887 100644 --- a/src/server/video_service.rs +++ b/src/server/video_service.rs @@ -192,6 +192,29 @@ impl VideoFrameController { } } +struct FirstFrameWatchdog { + waited: Duration, + timeout: Duration, +} + +impl FirstFrameWatchdog { + fn new(timeout: Duration) -> Self { + Self { + waited: Duration::ZERO, + timeout, + } + } + + fn on_no_frame(&mut self, tick: Duration) -> bool { + self.waited = self.waited.saturating_add(tick); + self.waited >= self.timeout + } + + fn reset(&mut self) { + self.waited = Duration::ZERO; + } +} + #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum VideoSource { Monitor, @@ -648,11 +671,15 @@ fn run(vs: VideoService) -> ResultType<()> { let repeat_encode_max = 10; let mut encode_fail_counter = 0; let mut first_frame = true; + let first_frame_timeout = Duration::from_secs(3); + let mut first_frame_watchdog = FirstFrameWatchdog::new(first_frame_timeout); + let mut sent_first_frame = false; let capture_width = c.width; let capture_height = c.height; let (mut second_instant, mut send_counter) = (Instant::now(), 0); while sp.ok() { + let mut produced_frame = false; #[cfg(windows)] check_uac_switch(c.privacy_mode_id, c._capturer_privacy_mode_id)?; check_qos( @@ -719,8 +746,11 @@ fn run(vs: VideoService) -> ResultType<()> { let ms = (time.as_secs() * 1000 + time.subsec_millis() as u64) as i64; let res = match c.frame(spf) { Ok(frame) => { - repeat_encode_counter = 0; - if frame.valid() { + if !frame.valid() { + Err(std::io::Error::new(WouldBlock, "empty frame")) + } else { + repeat_encode_counter = 0; + let screenshot = SCREENSHOTS.lock().unwrap().remove(&display_idx); if let Some(mut screenshot) = screenshot { let restore_vram = screenshot.restore_vram; @@ -777,18 +807,23 @@ fn run(vs: VideoService) -> ResultType<()> { capture_width, capture_height, )?; + if !send_conn_ids.is_empty() { + produced_frame = true; + } frame_controller.set_send(now, send_conn_ids); send_counter += 1; - } - #[cfg(windows)] - { - #[cfg(feature = "vram")] - if try_gdi == 1 && !c.is_gdi() { - VRamEncoder::set_fallback_gdi(sp.name(), false); + + #[cfg(windows)] + { + #[cfg(feature = "vram")] + if try_gdi == 1 && !c.is_gdi() { + VRamEncoder::set_fallback_gdi(sp.name(), false); + } + try_gdi = 0; } - try_gdi = 0; + + Ok(()) } - Ok(()) } Err(err) => Err(err), }; @@ -836,6 +871,9 @@ fn run(vs: VideoService) -> ResultType<()> { capture_width, capture_height, )?; + if !send_conn_ids.is_empty() { + produced_frame = true; + } frame_controller.set_send(now, send_conn_ids); send_counter += 1; } @@ -864,6 +902,18 @@ fn run(vs: VideoService) -> ResultType<()> { } } + if !sent_first_frame { + if produced_frame { + sent_first_frame = true; + first_frame_watchdog.reset(); + } else if first_frame_watchdog.on_no_frame(spf) { + log::warn!( + "No first video frame for {first_frame_timeout:?}, restarting video service" + ); + bail!("SWITCH"); + } + } + let mut fetched_conn_ids = HashSet::new(); let timeout_millis = 3_000u64; let wait_begin = Instant::now(); @@ -1417,3 +1467,25 @@ fn handle_screenshot(screenshot: Screenshot, msg: String, w: usize, h: usize, da log::error!("Failed to send screenshot, {}", e); } } + +#[cfg(test)] +mod first_frame_watchdog_tests { + use super::FirstFrameWatchdog; + use std::time::Duration; + + #[test] + fn triggers_after_timeout() { + let mut w = FirstFrameWatchdog::new(Duration::from_secs(3)); + assert!(!w.on_no_frame(Duration::from_secs(1))); + assert!(!w.on_no_frame(Duration::from_secs(1))); + assert!(w.on_no_frame(Duration::from_secs(1))); + } + + #[test] + fn resets_after_frame() { + let mut w = FirstFrameWatchdog::new(Duration::from_secs(3)); + assert!(!w.on_no_frame(Duration::from_secs(2))); + w.reset(); + assert!(!w.on_no_frame(Duration::from_secs(2))); + } +} From e196ede42039df28c2199a1ca21c7ecc2196104a Mon Sep 17 00:00:00 2001 From: Godnight1006 Date: Fri, 2 Jan 2026 13:38:14 +0800 Subject: [PATCH 2/5] video_service: use Instant for first-frame watchdog timeout --- src/server/video_service.rs | 40 ++++++++++++++++--------------------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/src/server/video_service.rs b/src/server/video_service.rs index 9156ec887..45f2bedbc 100644 --- a/src/server/video_service.rs +++ b/src/server/video_service.rs @@ -193,25 +193,24 @@ impl VideoFrameController { } struct FirstFrameWatchdog { - waited: Duration, + start: Instant, timeout: Duration, } impl FirstFrameWatchdog { fn new(timeout: Duration) -> Self { Self { - waited: Duration::ZERO, + start: Instant::now(), timeout, } } - fn on_no_frame(&mut self, tick: Duration) -> bool { - self.waited = self.waited.saturating_add(tick); - self.waited >= self.timeout + fn has_timed_out(&self) -> bool { + self.has_timed_out_at(Instant::now()) } - fn reset(&mut self) { - self.waited = Duration::ZERO; + fn has_timed_out_at(&self, now: Instant) -> bool { + now.saturating_duration_since(self.start) >= self.timeout } } @@ -672,7 +671,7 @@ fn run(vs: VideoService) -> ResultType<()> { let mut encode_fail_counter = 0; let mut first_frame = true; let first_frame_timeout = Duration::from_secs(3); - let mut first_frame_watchdog = FirstFrameWatchdog::new(first_frame_timeout); + let first_frame_watchdog = FirstFrameWatchdog::new(first_frame_timeout); let mut sent_first_frame = false; let capture_width = c.width; let capture_height = c.height; @@ -905,8 +904,7 @@ fn run(vs: VideoService) -> ResultType<()> { if !sent_first_frame { if produced_frame { sent_first_frame = true; - first_frame_watchdog.reset(); - } else if first_frame_watchdog.on_no_frame(spf) { + } else if first_frame_watchdog.has_timed_out() { log::warn!( "No first video frame for {first_frame_timeout:?}, restarting video service" ); @@ -1471,21 +1469,17 @@ fn handle_screenshot(screenshot: Screenshot, msg: String, w: usize, h: usize, da #[cfg(test)] mod first_frame_watchdog_tests { use super::FirstFrameWatchdog; - use std::time::Duration; + use std::time::{Duration, Instant}; #[test] fn triggers_after_timeout() { - let mut w = FirstFrameWatchdog::new(Duration::from_secs(3)); - assert!(!w.on_no_frame(Duration::from_secs(1))); - assert!(!w.on_no_frame(Duration::from_secs(1))); - assert!(w.on_no_frame(Duration::from_secs(1))); - } - - #[test] - fn resets_after_frame() { - let mut w = FirstFrameWatchdog::new(Duration::from_secs(3)); - assert!(!w.on_no_frame(Duration::from_secs(2))); - w.reset(); - assert!(!w.on_no_frame(Duration::from_secs(2))); + let t0 = Instant::now(); + let w = FirstFrameWatchdog { + start: t0, + timeout: Duration::from_secs(3), + }; + assert!(!w.has_timed_out_at(t0 + Duration::from_secs(1))); + assert!(!w.has_timed_out_at(t0 + Duration::from_secs(2))); + assert!(w.has_timed_out_at(t0 + Duration::from_secs(3))); } } From caa74a6712072b15abcfdf530cab30247cdcdd9c Mon Sep 17 00:00:00 2001 From: Godnight1006 Date: Fri, 2 Jan 2026 14:50:20 +0800 Subject: [PATCH 3/5] Replace server-side first-frame watchdog with client-side implementation This change addresses robustness issues by moving the responsibility of detecting missing first video frames to the controlling side (client). A new ClientFirstFrameWatchdog triggers a refresh if the first frame is not received within 3 seconds of connection, ensuring recovery even if the server thinks it sent a frame but it was lost or not rendered. --- src/client/io_loop.rs | 80 +++++++++++++++++++++++++++++++++++++ src/server/video_service.rs | 50 ----------------------- 2 files changed, 80 insertions(+), 50 deletions(-) diff --git a/src/client/io_loop.rs b/src/client/io_loop.rs index e0b3fcd6d..02ef740df 100644 --- a/src/client/io_loop.rs +++ b/src/client/io_loop.rs @@ -68,6 +68,7 @@ pub struct Remote { last_update_jobs_status: (Instant, HashMap), is_connected: bool, first_frame: bool, + watchdog: ClientFirstFrameWatchdog, #[cfg(any(target_os = "windows", feature = "unix-file-copy-paste"))] client_conn_id: i32, // used for file clipboard data_count: Arc, @@ -115,6 +116,7 @@ impl Remote { last_update_jobs_status: (Instant::now(), Default::default()), is_connected: false, first_frame: false, + watchdog: ClientFirstFrameWatchdog::new(Duration::from_secs(3)), #[cfg(any(target_os = "windows", feature = "unix-file-copy-paste"))] client_conn_id: 0, data_count: Arc::new(AtomicUsize::new(0)), @@ -279,6 +281,15 @@ impl Remote { } } _ = status_timer.tick() => { + if self.watchdog.check(self.is_connected, self.first_frame, self.handler.is_default(), Instant::now()) { + log::warn!("No first video frame received, sending refresh"); + let mut misc = Misc::new(); + misc.set_refresh_video(true); + let mut msg = Message::new(); + msg.set_misc(misc); + allow_err!(peer.send(&msg).await); + } + let elapsed = fps_instant.elapsed().as_millis(); if elapsed < 1000 { continue; @@ -2439,3 +2450,72 @@ impl Drop for VideoThread { *self.discard_queue.write().unwrap() = true; } } + +struct ClientFirstFrameWatchdog { + deadline: Option, + timeout: Duration, +} + +impl ClientFirstFrameWatchdog { + fn new(timeout: Duration) -> Self { + Self { deadline: None, timeout } + } + + // Returns true if refresh should be triggered + fn check(&mut self, is_connected: bool, first_frame_received: bool, is_default: bool, now: Instant) -> bool { + if is_connected && !first_frame_received && is_default { + if let Some(d) = self.deadline { + if now > d { + self.deadline = Some(now + self.timeout); + return true; + } + } else { + self.deadline = Some(now + self.timeout); + } + } else { + self.deadline = None; + } + false + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[test] + fn test_client_first_frame_watchdog() { + let timeout = Duration::from_secs(3); + let mut watchdog = ClientFirstFrameWatchdog::new(timeout); + let start = Instant::now(); + + // Not connected: no trigger, no deadline set + assert!(!watchdog.check(false, false, true, start)); + assert!(watchdog.deadline.is_none()); + + // Connected, default, no frame: deadline set to start + 3s + assert!(!watchdog.check(true, false, true, start)); + assert!(watchdog.deadline.is_some()); + assert_eq!(watchdog.deadline.unwrap(), start + timeout); + + // Advance 2s: no trigger + assert!(!watchdog.check(true, false, true, start + Duration::from_secs(2))); + + // Advance 4s: trigger! + assert!(watchdog.check(true, false, true, start + Duration::from_secs(4))); + // Deadline reset to +3s from now (start+4s) -> start+7s + assert_eq!(watchdog.deadline.unwrap(), start + Duration::from_secs(4) + timeout); + + // Frame received: reset + assert!(!watchdog.check(true, true, true, start + Duration::from_secs(5))); + assert!(watchdog.deadline.is_none()); + + // Not default: reset + watchdog.check(true, false, true, start); // set deadline + assert!(watchdog.deadline.is_some()); + watchdog.check(true, false, false, start); // reset + assert!(watchdog.deadline.is_none()); + } +} + diff --git a/src/server/video_service.rs b/src/server/video_service.rs index 45f2bedbc..99d53833b 100644 --- a/src/server/video_service.rs +++ b/src/server/video_service.rs @@ -192,27 +192,7 @@ impl VideoFrameController { } } -struct FirstFrameWatchdog { - start: Instant, - timeout: Duration, -} -impl FirstFrameWatchdog { - fn new(timeout: Duration) -> Self { - Self { - start: Instant::now(), - timeout, - } - } - - fn has_timed_out(&self) -> bool { - self.has_timed_out_at(Instant::now()) - } - - fn has_timed_out_at(&self, now: Instant) -> bool { - now.saturating_duration_since(self.start) >= self.timeout - } -} #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum VideoSource { @@ -670,9 +650,6 @@ fn run(vs: VideoService) -> ResultType<()> { let repeat_encode_max = 10; let mut encode_fail_counter = 0; let mut first_frame = true; - let first_frame_timeout = Duration::from_secs(3); - let first_frame_watchdog = FirstFrameWatchdog::new(first_frame_timeout); - let mut sent_first_frame = false; let capture_width = c.width; let capture_height = c.height; let (mut second_instant, mut send_counter) = (Instant::now(), 0); @@ -901,17 +878,6 @@ fn run(vs: VideoService) -> ResultType<()> { } } - if !sent_first_frame { - if produced_frame { - sent_first_frame = true; - } else if first_frame_watchdog.has_timed_out() { - log::warn!( - "No first video frame for {first_frame_timeout:?}, restarting video service" - ); - bail!("SWITCH"); - } - } - let mut fetched_conn_ids = HashSet::new(); let timeout_millis = 3_000u64; let wait_begin = Instant::now(); @@ -1466,20 +1432,4 @@ fn handle_screenshot(screenshot: Screenshot, msg: String, w: usize, h: usize, da } } -#[cfg(test)] -mod first_frame_watchdog_tests { - use super::FirstFrameWatchdog; - use std::time::{Duration, Instant}; - #[test] - fn triggers_after_timeout() { - let t0 = Instant::now(); - let w = FirstFrameWatchdog { - start: t0, - timeout: Duration::from_secs(3), - }; - assert!(!w.has_timed_out_at(t0 + Duration::from_secs(1))); - assert!(!w.has_timed_out_at(t0 + Duration::from_secs(2))); - assert!(w.has_timed_out_at(t0 + Duration::from_secs(3))); - } -} From d41e384f695f98b650686a21d9a6974175ac77c8 Mon Sep 17 00:00:00 2001 From: Godnight1006 Date: Fri, 2 Jan 2026 15:18:49 +0800 Subject: [PATCH 4/5] client: refresh video via handler in first-frame watchdog --- src/client/io_loop.rs | 7 ++----- src/server/video_service.rs | 7 ------- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/src/client/io_loop.rs b/src/client/io_loop.rs index 02ef740df..4e86a5a22 100644 --- a/src/client/io_loop.rs +++ b/src/client/io_loop.rs @@ -283,11 +283,8 @@ impl Remote { _ = status_timer.tick() => { if self.watchdog.check(self.is_connected, self.first_frame, self.handler.is_default(), Instant::now()) { log::warn!("No first video frame received, sending refresh"); - let mut misc = Misc::new(); - misc.set_refresh_video(true); - let mut msg = Message::new(); - msg.set_misc(misc); - allow_err!(peer.send(&msg).await); + let display = self.handler.lc.read().unwrap().peer_info.as_ref().map(|p| p.current_display).unwrap_or(0); + self.handler.refresh_video(display as _); } let elapsed = fps_instant.elapsed().as_millis(); diff --git a/src/server/video_service.rs b/src/server/video_service.rs index 99d53833b..77bfa9ed4 100644 --- a/src/server/video_service.rs +++ b/src/server/video_service.rs @@ -655,7 +655,6 @@ fn run(vs: VideoService) -> ResultType<()> { let (mut second_instant, mut send_counter) = (Instant::now(), 0); while sp.ok() { - let mut produced_frame = false; #[cfg(windows)] check_uac_switch(c.privacy_mode_id, c._capturer_privacy_mode_id)?; check_qos( @@ -783,9 +782,6 @@ fn run(vs: VideoService) -> ResultType<()> { capture_width, capture_height, )?; - if !send_conn_ids.is_empty() { - produced_frame = true; - } frame_controller.set_send(now, send_conn_ids); send_counter += 1; @@ -847,9 +843,6 @@ fn run(vs: VideoService) -> ResultType<()> { capture_width, capture_height, )?; - if !send_conn_ids.is_empty() { - produced_frame = true; - } frame_controller.set_send(now, send_conn_ids); send_counter += 1; } From 5e6ef28f330412e383b7e265af06e75711330fba Mon Sep 17 00:00:00 2001 From: Godnight1006 Date: Fri, 2 Jan 2026 23:10:04 +0800 Subject: [PATCH 5/5] log: add first-frame watchdog breadcrumbs --- src/client/io_loop.rs | 21 ++++++++++++++++++--- src/server/connection.rs | 6 ++++++ src/server/video_service.rs | 19 +++++++++++++++++++ 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/src/client/io_loop.rs b/src/client/io_loop.rs index 4e86a5a22..d82a6da6c 100644 --- a/src/client/io_loop.rs +++ b/src/client/io_loop.rs @@ -282,8 +282,18 @@ impl Remote { } _ = status_timer.tick() => { if self.watchdog.check(self.is_connected, self.first_frame, self.handler.is_default(), Instant::now()) { - log::warn!("No first video frame received, sending refresh"); - let display = self.handler.lc.read().unwrap().peer_info.as_ref().map(|p| p.current_display).unwrap_or(0); + let (id, display) = { + let lch = self.handler.lc.read().unwrap(); + let display = lch + .peer_info + .as_ref() + .map(|p| p.current_display) + .unwrap_or(0); + (lch.id.clone(), display) + }; + log::warn!( + "No first video frame received, id={id}, display={display}, sending refresh" + ); self.handler.refresh_video(display as _); } @@ -1292,8 +1302,14 @@ impl Remote { if let Ok(msg_in) = Message::parse_from_bytes(&data) { match msg_in.union { Some(message::Union::VideoFrame(vf)) => { + let display = vf.display as usize; if !self.first_frame { self.first_frame = true; + log::info!( + "First video frame received, id={}, display={}", + self.handler.get_id(), + display + ); self.handler.close_success(); self.handler.adapt_size(); self.send_toggle_virtual_display_msg(peer).await; @@ -1301,7 +1317,6 @@ impl Remote { } self.video_format = CodecFormat::from(&vf); - let display = vf.display as usize; if !self.video_threads.contains_key(&display) { self.new_video_thread(display); } diff --git a/src/server/connection.rs b/src/server/connection.rs index 3670fb7cf..98397762c 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -3406,6 +3406,12 @@ impl Connection { } fn refresh_video_display(&self, display: Option) { + log::debug!( + "refresh_video_display: conn_id={}, display={:?}, source={:?}", + self.inner.id, + display, + self.video_source() + ); video_service::refresh(); self.server.upgrade().map(|s| { s.read().unwrap().set_video_service_opt( diff --git a/src/server/video_service.rs b/src/server/video_service.rs index 77bfa9ed4..d276a0ef3 100644 --- a/src/server/video_service.rs +++ b/src/server/video_service.rs @@ -650,6 +650,7 @@ fn run(vs: VideoService) -> ResultType<()> { let repeat_encode_max = 10; let mut encode_fail_counter = 0; let mut first_frame = true; + let mut logged_invalid_frame = false; let capture_width = c.width; let capture_height = c.height; let (mut second_instant, mut send_counter) = (Instant::now(), 0); @@ -722,6 +723,24 @@ fn run(vs: VideoService) -> ResultType<()> { let res = match c.frame(spf) { Ok(frame) => { if !frame.valid() { + if !logged_invalid_frame { + logged_invalid_frame = true; + match &frame { + scrap::Frame::PixelBuffer(f) => { + log::debug!( + "capturer returned invalid frame (pixelbuffer), len={}, w={}, h={}, treating as WouldBlock", + f.data().len(), + f.width(), + f.height() + ); + } + scrap::Frame::Texture((texture, _)) => { + log::debug!( + "capturer returned invalid frame (texture={texture:?}), treating as WouldBlock" + ); + } + } + } Err(std::io::Error::new(WouldBlock, "empty frame")) } else { repeat_encode_counter = 0;