From 80b3b8465dd91ee2a6565b3e8a9565831949fa74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=92=B2=E9=9B=85=20misaki=20masa?= Date: Sun, 14 Dec 2025 21:02:22 +0800 Subject: [PATCH] perf: immediate task cancellation (#3429) --- CHANGELOG.md | 2 + Cargo.toml | 6 + yazi-actor/src/mgr/download.rs | 6 +- yazi-core/src/tasks/prework.rs | 2 +- yazi-parser/src/tasks/process_open.rs | 5 +- yazi-proxy/src/tasks.rs | 9 +- yazi-scheduler/src/file/file.rs | 18 +-- yazi-scheduler/src/file/in.rs | 6 +- yazi-scheduler/src/file/traverse.rs | 18 ++- yazi-scheduler/src/hook/hook.rs | 38 +------ yazi-scheduler/src/hook/in.rs | 65 +---------- yazi-scheduler/src/in.rs | 41 +++++-- yazi-scheduler/src/macros.rs | 15 +++ yazi-scheduler/src/ongoing.rs | 40 ++++++- yazi-scheduler/src/out.rs | 12 +- yazi-scheduler/src/prework/out.rs | 4 - yazi-scheduler/src/prework/progress.rs | 5 +- yazi-scheduler/src/process/in.rs | 13 +-- yazi-scheduler/src/process/out.rs | 12 -- yazi-scheduler/src/process/process.rs | 5 +- yazi-scheduler/src/process/progress.rs | 15 +-- yazi-scheduler/src/runner.rs | 8 -- yazi-scheduler/src/scheduler.rs | 149 +++++++++++++++---------- yazi-scheduler/src/task.rs | 6 +- yazi-shared/src/completion_token.rs | 38 +++++++ yazi-shared/src/lib.rs | 2 +- yazi-vfs/src/provider/copier.rs | 13 ++- 27 files changed, 287 insertions(+), 266 deletions(-) create mode 100644 yazi-shared/src/completion_token.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c57833b..29c98e1b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,6 +84,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/): ### Improved +- Make copy, cut, delete, link, hardlink, download, and upload tasks immediately cancellable ([#3429]) - Make preload tasks discardable ([#2875]) - Reduce file change event frequency ([#2820]) - Upload and download of a single file over SFTP in chunks concurrently ([#3393]) @@ -1556,3 +1557,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/): [#3396]: https://github.com/sxyazi/yazi/pull/3396 [#3419]: https://github.com/sxyazi/yazi/pull/3419 [#3422]: https://github.com/sxyazi/yazi/pull/3422 +[#3429]: https://github.com/sxyazi/yazi/pull/3429 diff --git a/Cargo.toml b/Cargo.toml index 1988d1c9..02b57583 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,9 @@ resolver = "2" members = [ "yazi-*" ] default-members = [ "yazi-fm", "yazi-cli" ] +[profile.dev] +debug = "line-tables-only" + [profile.release] codegen-units = 1 lto = true @@ -19,6 +22,9 @@ codegen-units = 256 incremental = true lto = false +[profile.dev.package."*"] +debug = false + [workspace.dependencies] ansi-to-tui = "7.0.0" anyhow = "1.0.100" diff --git a/yazi-actor/src/mgr/download.rs b/yazi-actor/src/mgr/download.rs index 1263f7d8..8fcb2640 100644 --- a/yazi-actor/src/mgr/download.rs +++ b/yazi-actor/src/mgr/download.rs @@ -3,7 +3,6 @@ use std::{mem, time::{Duration, Instant}}; use anyhow::Result; use futures::{StreamExt, stream::FuturesUnordered}; use hashbrown::HashSet; -use tokio::sync::oneshot; use yazi_fs::{File, FsScheme, provider::{Provider, local::Local}}; use yazi_macro::succ; use yazi_parser::mgr::{DownloadOpt, OpenOpt}; @@ -29,9 +28,8 @@ impl Actor for Download { let mut wg1 = FuturesUnordered::new(); for url in opt.urls { - let (tx, rx) = oneshot::channel(); - scheduler.file_download(url.to_owned(), Some(tx)); - wg1.push(async move { (rx.await == Ok(true), url) }); + let done = scheduler.file_download(url.to_owned()); + wg1.push(async move { (done.future().await, url) }); } let mut wg2 = vec![]; diff --git a/yazi-core/src/tasks/prework.rs b/yazi-core/src/tasks/prework.rs index 2fb6fc88..c9f6c15a 100644 --- a/yazi-core/src/tasks/prework.rs +++ b/yazi-core/src/tasks/prework.rs @@ -23,7 +23,7 @@ impl Tasks { drop(loaded); for (i, tasks) in tasks.into_iter().enumerate() { if !tasks.is_empty() { - self.scheduler.fetch_paged(&YAZI.plugin.fetchers[i], tasks, None); + self.scheduler.fetch_paged(&YAZI.plugin.fetchers[i], tasks); } } } diff --git a/yazi-parser/src/tasks/process_open.rs b/yazi-parser/src/tasks/process_open.rs index 99b1b6bf..554a6a50 100644 --- a/yazi-parser/src/tasks/process_open.rs +++ b/yazi-parser/src/tasks/process_open.rs @@ -2,8 +2,7 @@ use std::ffi::OsString; use anyhow::anyhow; use mlua::{ExternalError, FromLua, IntoLua, Lua, Value}; -use tokio::sync::oneshot; -use yazi_shared::{event::CmdCow, url::UrlCow}; +use yazi_shared::{CompletionToken, event::CmdCow, url::UrlCow}; // --- Exec #[derive(Debug)] @@ -13,7 +12,7 @@ pub struct ProcessOpenOpt { pub args: Vec>, pub block: bool, pub orphan: bool, - pub done: Option>, + pub done: Option, pub spread: bool, // TODO: remove } diff --git a/yazi-proxy/src/tasks.rs b/yazi-proxy/src/tasks.rs index 425330ad..096139d3 100644 --- a/yazi-proxy/src/tasks.rs +++ b/yazi-proxy/src/tasks.rs @@ -1,9 +1,8 @@ use std::ffi::OsString; -use tokio::sync::oneshot; use yazi_macro::{emit, relay}; use yazi_parser::tasks::ProcessOpenOpt; -use yazi_shared::url::{UrlBuf, UrlCow}; +use yazi_shared::{CompletionToken, url::{UrlBuf, UrlCow}}; pub struct TasksProxy; @@ -20,17 +19,17 @@ impl TasksProxy { block: bool, orphan: bool, ) { - let (tx, rx) = oneshot::channel(); + let done = CompletionToken::new(); emit!(Call(relay!(tasks:process_open).with_any("opt", ProcessOpenOpt { cwd, cmd, args, block, orphan, - done: Some(tx), + done: Some(done.clone()), spread: false }))); - rx.await.ok(); + done.future().await; } pub fn update_succeed(url: impl Into) { diff --git a/yazi-scheduler/src/file/file.rs b/yazi-scheduler/src/file/file.rs index f48d12af..aacca663 100644 --- a/yazi-scheduler/src/file/file.rs +++ b/yazi-scheduler/src/file/file.rs @@ -9,7 +9,7 @@ use yazi_shared::{path::PathCow, timestamp_us, url::{AsUrl, UrlBuf, UrlCow, UrlL use yazi_vfs::{VfsCha, maybe_exists, must_be_dir, provider::{self, DirEntry}, unique_name}; use super::{FileInCopy, FileInDelete, FileInHardlink, FileInLink, FileInTrash}; -use crate::{LOW, NORMAL, TaskIn, TaskOp, TaskOps, ctx, file::{FileInCut, FileInDownload, FileInUpload, FileOutCopy, FileOutCopyDo, FileOutCut, FileOutCutDo, FileOutDelete, FileOutDeleteDo, FileOutDownload, FileOutDownloadDo, FileOutHardlink, FileOutHardlinkDo, FileOutLink, FileOutTrash, FileOutUpload, FileOutUploadDo}, hook::HookInOutCut, ok_or_not_found}; +use crate::{LOW, NORMAL, TaskIn, TaskOp, TaskOps, ctx, file::{FileInCut, FileInDownload, FileInUpload, FileOutCopy, FileOutCopyDo, FileOutCut, FileOutCutDo, FileOutDelete, FileOutDeleteDo, FileOutDownload, FileOutDownloadDo, FileOutHardlink, FileOutHardlinkDo, FileOutLink, FileOutTrash, FileOutUpload, FileOutUploadDo}, hook::HookInOutCut, ok_or_not_found, progress_or_break}; pub(crate) struct File { ops: TaskOps, @@ -62,8 +62,8 @@ impl File { let mut it = ctx!(task, provider::copy_with_progress(&task.from, &task.to, task.cha.unwrap()).await)?; - while let Some(res) = it.recv().await { - match res { + loop { + match progress_or_break!(it, task.done) { Ok(0) => break, Ok(n) => self.ops.out(task.id, FileOutCopyDo::Adv(n)), Err(e) if e.kind() == NotFound => { @@ -152,8 +152,8 @@ impl File { let mut it = ctx!(task, provider::copy_with_progress(&task.from, &task.to, task.cha.unwrap()).await)?; - while let Some(res) = it.recv().await { - match res { + loop { + match progress_or_break!(it, task.done) { Ok(0) => { provider::remove_file(&task.from).await.ok(); break; @@ -340,8 +340,8 @@ impl File { let cache_tmp = ctx!(task, Self::tmp(&cache).await, "Cannot determine download cache")?; let mut it = ctx!(task, provider::copy_with_progress(&task.url, &cache_tmp, cha).await)?; - while let Some(res) = it.recv().await { - match res { + loop { + match progress_or_break!(it, task.done) { Ok(0) => { Local::regular(&cache).remove_dir_all().await.ok(); ctx!(task, provider::rename(cache_tmp, cache).await, "Cannot persist downloaded file")?; @@ -424,8 +424,8 @@ impl File { .await )?; - while let Some(res) = it.recv().await { - match res { + loop { + match progress_or_break!(it, task.done) { Ok(0) => { let cha = ctx!(task, Self::cha(&task.url, true, None).await, "Cannot stat original file")?; diff --git a/yazi-scheduler/src/file/in.rs b/yazi-scheduler/src/file/in.rs index 77ccce5a..614ebec6 100644 --- a/yazi-scheduler/src/file/in.rs +++ b/yazi-scheduler/src/file/in.rs @@ -2,7 +2,7 @@ use std::{mem, path::PathBuf}; use tokio::sync::mpsc; use yazi_fs::cha::Cha; -use yazi_shared::{Id, url::UrlBuf}; +use yazi_shared::{CompletionToken, Id, url::UrlBuf}; // --- Copy #[derive(Clone, Debug)] @@ -14,6 +14,7 @@ pub(crate) struct FileInCopy { pub(crate) cha: Option, pub(crate) follow: bool, pub(crate) retry: u8, + pub(crate) done: CompletionToken, } impl FileInCopy { @@ -41,6 +42,7 @@ pub(crate) struct FileInCut { pub(crate) cha: Option, pub(crate) follow: bool, pub(crate) retry: u8, + pub(crate) done: CompletionToken, pub(crate) drop: Option>, } @@ -114,6 +116,7 @@ pub(crate) struct FileInDownload { pub(crate) url: UrlBuf, pub(crate) cha: Option, pub(crate) retry: u8, + pub(crate) done: CompletionToken, } // --- Upload @@ -123,4 +126,5 @@ pub(crate) struct FileInUpload { pub(crate) url: UrlBuf, pub(crate) cha: Option, pub(crate) cache: Option, + pub(crate) done: CompletionToken, } diff --git a/yazi-scheduler/src/file/traverse.rs b/yazi-scheduler/src/file/traverse.rs index 019f58da..6e367a67 100644 --- a/yazi-scheduler/src/file/traverse.rs +++ b/yazi-scheduler/src/file/traverse.rs @@ -41,6 +41,7 @@ impl Traverse for FileInCopy { cha: Some(cha), follow: self.follow, retry: self.retry, + done: self.done.clone(), } } @@ -63,6 +64,7 @@ impl Traverse for FileInCut { cha: Some(cha), follow: self.follow, retry: self.retry, + done: self.done.clone(), drop: self.drop.clone(), } } @@ -113,7 +115,13 @@ impl Traverse for FileInDownload { fn from(&self) -> Url<'_> { self.url.as_url() } fn spawn(&self, from: UrlBuf, _to: Option, cha: Cha) -> Self { - Self { id: self.id, url: from, cha: Some(cha), retry: self.retry } + Self { + id: self.id, + url: from, + cha: Some(cha), + retry: self.retry, + done: self.done.clone(), + } } fn to(&self) -> Option> { None } @@ -137,7 +145,13 @@ impl Traverse for FileInUpload { } fn spawn(&self, from: UrlBuf, _to: Option, cha: Cha) -> Self { - Self { id: self.id, cha: Some(cha), cache: from.cache(), url: from } + Self { + id: self.id, + cha: Some(cha), + cache: from.cache(), + url: from, + done: self.done.clone(), + } } fn to(&self) -> Option> { None } diff --git a/yazi-scheduler/src/hook/hook.rs b/yazi-scheduler/src/hook/hook.rs index 91f4bc8f..f67e941e 100644 --- a/yazi-scheduler/src/hook/hook.rs +++ b/yazi-scheduler/src/hook/hook.rs @@ -6,7 +6,7 @@ use yazi_dds::Pump; use yazi_proxy::TasksProxy; use yazi_vfs::provider; -use crate::{Ongoing, TaskOp, TaskOps, file::{FileOutCut, FileOutDelete, FileOutDownload, FileOutTrash}, hook::{HookInOutBg, HookInOutBlock, HookInOutCut, HookInOutDelete, HookInOutDownload, HookInOutFetch, HookInOutOrphan, HookInOutTrash}, prework::PreworkOutFetch, process::{ProcessOutBg, ProcessOutBlock, ProcessOutOrphan}}; +use crate::{Ongoing, TaskOp, TaskOps, file::{FileOutCut, FileOutDelete, FileOutDownload, FileOutTrash}, hook::{HookInOutCut, HookInOutDelete, HookInOutDownload, HookInOutTrash}}; pub(crate) struct Hook { ops: TaskOps, @@ -48,42 +48,6 @@ impl Hook { } pub(crate) async fn download(&self, task: HookInOutDownload) { - let intact = self.ongoing.lock().intact(task.id); - task.done.send(intact).ok(); self.ops.out(task.id, FileOutDownload::Clean); } - - // --- Process - pub(crate) async fn block(&self, task: HookInOutBlock) { - if let Some(tx) = task.done { - tx.send(()).ok(); - } - self.ops.out(task.id, ProcessOutBlock::Clean); - } - - pub(crate) async fn orphan(&self, task: HookInOutOrphan) { - if let Some(tx) = task.done { - tx.send(()).ok(); - } - self.ops.out(task.id, ProcessOutOrphan::Clean); - } - - pub(crate) async fn bg(&self, task: HookInOutBg) { - let intact = self.ongoing.lock().intact(task.id); - if !intact { - task.cancel.send(()).await.ok(); - task.cancel.closed().await; - } - if let Some(tx) = task.done { - tx.send(()).ok(); - } - self.ops.out(task.id, ProcessOutBg::Clean); - } - - // --- Prework - pub(crate) async fn fetch(&self, task: HookInOutFetch) { - let intact = self.ongoing.lock().intact(task.id); - task.done.send(intact).ok(); - self.ops.out(task.id, PreworkOutFetch::Clean); - } } diff --git a/yazi-scheduler/src/hook/in.rs b/yazi-scheduler/src/hook/in.rs index e5549d85..4d5f468a 100644 --- a/yazi-scheduler/src/hook/in.rs +++ b/yazi-scheduler/src/hook/in.rs @@ -1,4 +1,3 @@ -use tokio::sync::{mpsc, oneshot}; use yazi_shared::{Id, url::UrlBuf}; use crate::{Task, TaskProg, file::FileInCut}; @@ -58,8 +57,7 @@ impl HookInOutTrash { // --- Download #[derive(Debug)] pub(crate) struct HookInOutDownload { - pub(crate) id: Id, - pub(crate) done: oneshot::Sender, + pub(crate) id: Id, } impl HookInOutDownload { @@ -69,64 +67,3 @@ impl HookInOutDownload { } } } - -// --- Fetch -#[derive(Debug)] -pub(crate) struct HookInOutFetch { - pub(crate) id: Id, - pub(crate) done: oneshot::Sender, -} - -impl HookInOutFetch { - pub(crate) fn reduce(self, task: &mut Task) { - if let TaskProg::PreworkFetch(_) = &task.prog { - task.hook = Some(self.into()); - } - } -} - -// --- Block -#[derive(Debug)] -pub(crate) struct HookInOutBlock { - pub(crate) id: Id, - pub(crate) done: Option>, -} - -impl HookInOutBlock { - pub(crate) fn reduce(self, task: &mut Task) { - if let TaskProg::ProcessBlock(_) = &task.prog { - task.hook = Some(self.into()); - } - } -} - -// --- Orphan -#[derive(Debug)] -pub(crate) struct HookInOutOrphan { - pub(crate) id: Id, - pub(crate) done: Option>, -} - -impl HookInOutOrphan { - pub(crate) fn reduce(self, task: &mut Task) { - if let TaskProg::ProcessOrphan(_) = &task.prog { - task.hook = Some(self.into()); - } - } -} - -// --- Bg -#[derive(Debug)] -pub(crate) struct HookInOutBg { - pub(crate) id: Id, - pub(crate) cancel: mpsc::Sender<()>, - pub(crate) done: Option>, -} - -impl HookInOutBg { - pub(crate) fn reduce(self, task: &mut Task) { - if let TaskProg::ProcessBg(_) = &task.prog { - task.hook = Some(self.into()); - } - } -} diff --git a/yazi-scheduler/src/in.rs b/yazi-scheduler/src/in.rs index 09bb7b8c..5b2edb7b 100644 --- a/yazi-scheduler/src/in.rs +++ b/yazi-scheduler/src/in.rs @@ -1,6 +1,6 @@ use yazi_shared::Id; -use crate::{file::{FileInCopy, FileInCut, FileInDelete, FileInDownload, FileInHardlink, FileInLink, FileInTrash, FileInUpload}, hook::{HookInOutBg, HookInOutBlock, HookInOutCut, HookInOutDelete, HookInOutDownload, HookInOutFetch, HookInOutOrphan, HookInOutTrash}, impl_from_in, plugin::PluginInEntry, prework::{PreworkInFetch, PreworkInLoad, PreworkInSize}, process::{ProcessInBg, ProcessInBlock, ProcessInOrphan}}; +use crate::{file::{FileInCopy, FileInCut, FileInDelete, FileInDownload, FileInHardlink, FileInLink, FileInTrash, FileInUpload}, hook::{HookInOutCut, HookInOutDelete, HookInOutDownload, HookInOutTrash}, impl_from_in, plugin::PluginInEntry, prework::{PreworkInFetch, PreworkInLoad, PreworkInSize}, process::{ProcessInBg, ProcessInBlock, ProcessInOrphan}}; #[derive(Debug)] pub(crate) enum TaskIn { @@ -28,10 +28,6 @@ pub(crate) enum TaskIn { HookDelete(HookInOutDelete), HookTrash(HookInOutTrash), HookDownload(HookInOutDownload), - HookBlock(HookInOutBlock), - HookOrphan(HookInOutOrphan), - HookBg(HookInOutBg), - HookFetch(HookInOutFetch), } impl_from_in! { @@ -44,7 +40,7 @@ impl_from_in! { // Process ProcessBlock(ProcessInBlock), ProcessOrphan(ProcessInOrphan), ProcessBg(ProcessInBg), // Hook - HookCut(HookInOutCut), HookDelete(HookInOutDelete), HookTrash(HookInOutTrash), HookDownload(HookInOutDownload), HookBlock(HookInOutBlock), HookOrphan(HookInOutOrphan), HookBg(HookInOutBg), HookFetch(HookInOutFetch), + HookCut(HookInOutCut), HookDelete(HookInOutDelete), HookTrash(HookInOutTrash), HookDownload(HookInOutDownload), } impl TaskIn { @@ -74,10 +70,35 @@ impl TaskIn { Self::HookDelete(r#in) => r#in.id, Self::HookTrash(r#in) => r#in.id, Self::HookDownload(r#in) => r#in.id, - Self::HookBlock(r#in) => r#in.id, - Self::HookOrphan(r#in) => r#in.id, - Self::HookBg(r#in) => r#in.id, - Self::HookFetch(r#in) => r#in.id, + } + } + + pub fn is_hook(&self) -> bool { + match self { + // File + Self::FileCopy(_) => false, + Self::FileCut(_) => false, + Self::FileLink(_) => false, + Self::FileHardlink(_) => false, + Self::FileDelete(_) => false, + Self::FileTrash(_) => false, + Self::FileDownload(_) => false, + Self::FileUpload(_) => false, + // Plugin + Self::PluginEntry(_) => false, + // Prework + Self::PreworkFetch(_) => false, + Self::PreworkLoad(_) => false, + Self::PreworkSize(_) => false, + // Process + Self::ProcessBlock(_) => false, + Self::ProcessOrphan(_) => false, + Self::ProcessBg(_) => false, + // Hook + Self::HookCut(_) => true, + Self::HookDelete(_) => true, + Self::HookTrash(_) => true, + Self::HookDownload(_) => true, } } } diff --git a/yazi-scheduler/src/macros.rs b/yazi-scheduler/src/macros.rs index 61ee88f3..ae8fb014 100644 --- a/yazi-scheduler/src/macros.rs +++ b/yazi-scheduler/src/macros.rs @@ -24,6 +24,21 @@ macro_rules! ok_or_not_found { }; } +#[macro_export] +macro_rules! progress_or_break { + ($rx:ident, $done:expr) => { + tokio::select! { + r = $rx.recv() => { + match r { + Some(prog) => prog, + None => break, + } + }, + false = $done.future() => break, + } + }; +} + #[macro_export] macro_rules! impl_from_in { ($($variant:ident($type:ty)),* $(,)?) => { diff --git a/yazi-scheduler/src/ongoing.rs b/yazi-scheduler/src/ongoing.rs index 9fec36b0..e01caec9 100644 --- a/yazi-scheduler/src/ongoing.rs +++ b/yazi-scheduler/src/ongoing.rs @@ -1,15 +1,15 @@ -use hashbrown::HashMap; +use hashbrown::{HashMap, hash_map::RawEntryMut}; use ordered_float::OrderedFloat; use yazi_config::YAZI; use yazi_parser::app::TaskSummary; -use yazi_shared::{Id, Ids}; +use yazi_shared::{CompletionToken, Id, Ids}; use super::Task; -use crate::TaskProg; +use crate::{TaskIn, TaskProg}; #[derive(Default)] pub struct Ongoing { - pub(super) inner: HashMap, + inner: HashMap, } impl Ongoing { @@ -23,11 +23,39 @@ impl Ongoing { self.inner.entry(id).insert(Task::new::(id, name)).into_mut() } + pub(super) fn cancel(&mut self, id: Id) -> Option { + match self.inner.raw_entry_mut().from_key(&id) { + RawEntryMut::Occupied(mut oe) => { + let task = oe.get_mut(); + task.done.complete(false); + + if let Some(hook) = task.hook.take() { + return Some(hook); + } + + oe.remove(); + } + RawEntryMut::Vacant(_) => {} + } + None + } + + pub(super) fn fulfill(&mut self, id: Id) -> Option { + let task = self.inner.remove(&id)?; + task.done.complete(true); + Some(task) + } + #[inline] pub fn get_mut(&mut self, id: Id) -> Option<&mut Task> { self.inner.get_mut(&id) } pub fn get_id(&self, idx: usize) -> Option { self.values().nth(idx).map(|t| t.id) } + #[inline] + pub fn get_token(&self, id: Id) -> Option { + self.inner.get(&id).map(|t| t.done.clone()) + } + pub fn len(&self) -> usize { if YAZI.tasks.suppress_preload { self.inner.values().filter(|&t| t.prog.is_user()).count() @@ -40,7 +68,9 @@ impl Ongoing { pub fn exists(&self, id: Id) -> bool { self.inner.contains_key(&id) } #[inline] - pub fn intact(&self, id: Id) -> bool { self.inner.get(&id).is_some_and(|t| !t.canceled) } + pub fn intact(&self, id: Id) -> bool { + self.inner.get(&id).is_some_and(|t| t.done.completed() != Some(false)) + } pub fn values(&self) -> Box + '_> { if YAZI.tasks.suppress_preload { diff --git a/yazi-scheduler/src/out.rs b/yazi-scheduler/src/out.rs index de4cf878..6fc81c5e 100644 --- a/yazi-scheduler/src/out.rs +++ b/yazi-scheduler/src/out.rs @@ -1,4 +1,4 @@ -use crate::{Task, file::{FileOutCopy, FileOutCopyDo, FileOutCut, FileOutCutDo, FileOutDelete, FileOutDeleteDo, FileOutDownload, FileOutDownloadDo, FileOutHardlink, FileOutHardlinkDo, FileOutLink, FileOutTrash, FileOutUpload, FileOutUploadDo}, hook::{HookInOutBg, HookInOutBlock, HookInOutCut, HookInOutDelete, HookInOutDownload, HookInOutFetch, HookInOutOrphan, HookInOutTrash}, impl_from_out, plugin::PluginOutEntry, prework::{PreworkOutFetch, PreworkOutLoad, PreworkOutSize}, process::{ProcessOutBg, ProcessOutBlock, ProcessOutOrphan}}; +use crate::{Task, file::{FileOutCopy, FileOutCopyDo, FileOutCut, FileOutCutDo, FileOutDelete, FileOutDeleteDo, FileOutDownload, FileOutDownloadDo, FileOutHardlink, FileOutHardlinkDo, FileOutLink, FileOutTrash, FileOutUpload, FileOutUploadDo}, hook::{HookInOutCut, HookInOutDelete, HookInOutDownload, HookInOutTrash}, impl_from_out, plugin::PluginOutEntry, prework::{PreworkOutFetch, PreworkOutLoad, PreworkOutSize}, process::{ProcessOutBg, ProcessOutBlock, ProcessOutOrphan}}; #[derive(Debug)] pub(super) enum TaskOut { @@ -32,10 +32,6 @@ pub(super) enum TaskOut { HookDelete(HookInOutDelete), HookTrash(HookInOutTrash), HookDownload(HookInOutDownload), - HookBlock(HookInOutBlock), - HookOrphan(HookInOutOrphan), - HookBg(HookInOutBg), - HookFetch(HookInOutFetch), } impl_from_out! { @@ -48,7 +44,7 @@ impl_from_out! { // Process ProcessBlock(ProcessOutBlock), ProcessOrphan(ProcessOutOrphan), ProcessBg(ProcessOutBg), // Hook - HookCut(HookInOutCut), HookDelete(HookInOutDelete), HookTrash(HookInOutTrash), HookDownload(HookInOutDownload), HookBlock(HookInOutBlock), HookOrphan(HookInOutOrphan), HookBg(HookInOutBg), HookFetch(HookInOutFetch), + HookCut(HookInOutCut), HookDelete(HookInOutDelete), HookTrash(HookInOutTrash), HookDownload(HookInOutDownload), } impl TaskOut { @@ -84,10 +80,6 @@ impl TaskOut { Self::HookDelete(out) => out.reduce(task), Self::HookTrash(out) => out.reduce(task), Self::HookDownload(out) => out.reduce(task), - Self::HookBlock(out) => out.reduce(task), - Self::HookOrphan(out) => out.reduce(task), - Self::HookBg(out) => out.reduce(task), - Self::HookFetch(out) => out.reduce(task), } } } diff --git a/yazi-scheduler/src/prework/out.rs b/yazi-scheduler/src/prework/out.rs index 13724bc6..762ae06d 100644 --- a/yazi-scheduler/src/prework/out.rs +++ b/yazi-scheduler/src/prework/out.rs @@ -5,7 +5,6 @@ use crate::{Task, TaskProg}; pub(crate) enum PreworkOutFetch { Succ, Fail(String), - Clean, } impl From for PreworkOutFetch { @@ -23,9 +22,6 @@ impl PreworkOutFetch { prog.state = Some(false); task.log(reason); } - Self::Clean => { - prog.cleaned = true; - } } } } diff --git a/yazi-scheduler/src/prework/progress.rs b/yazi-scheduler/src/prework/progress.rs index 694abbee..9174ef82 100644 --- a/yazi-scheduler/src/prework/progress.rs +++ b/yazi-scheduler/src/prework/progress.rs @@ -4,8 +4,7 @@ use yazi_parser::app::TaskSummary; // --- Fetch #[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize)] pub struct PreworkProgFetch { - pub state: Option, - pub cleaned: bool, + pub state: Option, } impl From for TaskSummary { @@ -26,7 +25,7 @@ impl PreworkProgFetch { pub fn failed(self) -> bool { self.state == Some(false) } - pub fn cleaned(self) -> bool { self.cleaned } + pub fn cleaned(self) -> bool { false } pub fn percent(self) -> Option { None } } diff --git a/yazi-scheduler/src/process/in.rs b/yazi-scheduler/src/process/in.rs index ac88d157..f0c20ee5 100644 --- a/yazi-scheduler/src/process/in.rs +++ b/yazi-scheduler/src/process/in.rs @@ -1,7 +1,6 @@ use std::ffi::OsString; -use tokio::sync::mpsc; -use yazi_shared::{Id, url::UrlCow}; +use yazi_shared::{CompletionToken, Id, url::UrlCow}; use super::ShellOpt; @@ -38,11 +37,11 @@ impl From for ShellOpt { // --- Bg #[derive(Debug)] pub(crate) struct ProcessInBg { - pub(crate) id: Id, - pub(crate) cwd: UrlCow<'static>, - pub(crate) cmd: OsString, - pub(crate) args: Vec>, - pub(crate) cancel: mpsc::Receiver<()>, + pub(crate) id: Id, + pub(crate) cwd: UrlCow<'static>, + pub(crate) cmd: OsString, + pub(crate) args: Vec>, + pub(crate) done: CompletionToken, } impl From for ShellOpt { diff --git a/yazi-scheduler/src/process/out.rs b/yazi-scheduler/src/process/out.rs index 8d96a5d1..32b458ff 100644 --- a/yazi-scheduler/src/process/out.rs +++ b/yazi-scheduler/src/process/out.rs @@ -4,7 +4,6 @@ use crate::{Task, TaskProg}; pub(crate) enum ProcessOutBlock { Succ, Fail(String), - Clean, } impl From for ProcessOutBlock { @@ -22,9 +21,6 @@ impl ProcessOutBlock { prog.state = Some(false); task.log(reason); } - Self::Clean => { - prog.cleaned = true; - } } } } @@ -34,7 +30,6 @@ impl ProcessOutBlock { pub(crate) enum ProcessOutOrphan { Succ, Fail(String), - Clean, } impl From for ProcessOutOrphan { @@ -52,9 +47,6 @@ impl ProcessOutOrphan { prog.state = Some(false); task.log(reason); } - Self::Clean => { - prog.cleaned = true; - } } } } @@ -65,7 +57,6 @@ pub(crate) enum ProcessOutBg { Log(String), Succ, Fail(String), - Clean, } impl From for ProcessOutBg { @@ -86,9 +77,6 @@ impl ProcessOutBg { prog.state = Some(false); task.log(reason); } - Self::Clean => { - prog.cleaned = true; - } } } } diff --git a/yazi-scheduler/src/process/process.rs b/yazi-scheduler/src/process/process.rs index 387472cf..3495c137 100644 --- a/yazi-scheduler/src/process/process.rs +++ b/yazi-scheduler/src/process/process.rs @@ -55,14 +55,13 @@ impl Process { }) .await?; + let done = task.done; let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines(); let mut stderr = BufReader::new(child.stderr.take().unwrap()).lines(); - let mut cancel = task.cancel; loop { select! { - _ = cancel.recv() => { + false = done.future() => { child.start_kill().ok(); - cancel.close(); break; } Ok(Some(line)) = stdout.next_line() => { diff --git a/yazi-scheduler/src/process/progress.rs b/yazi-scheduler/src/process/progress.rs index 81980e1d..9cc212bb 100644 --- a/yazi-scheduler/src/process/progress.rs +++ b/yazi-scheduler/src/process/progress.rs @@ -4,8 +4,7 @@ use yazi_parser::app::TaskSummary; // --- Block #[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize)] pub struct ProcessProgBlock { - pub state: Option, - pub cleaned: bool, + pub state: Option, } impl From for TaskSummary { @@ -26,7 +25,7 @@ impl ProcessProgBlock { pub fn failed(self) -> bool { self.state == Some(false) } - pub fn cleaned(self) -> bool { self.cleaned } + pub fn cleaned(self) -> bool { false } pub fn percent(self) -> Option { None } } @@ -34,8 +33,7 @@ impl ProcessProgBlock { // --- Orphan #[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize)] pub struct ProcessProgOrphan { - pub state: Option, - pub cleaned: bool, + pub state: Option, } impl From for TaskSummary { @@ -56,7 +54,7 @@ impl ProcessProgOrphan { pub fn failed(self) -> bool { self.state == Some(false) } - pub fn cleaned(self) -> bool { self.cleaned } + pub fn cleaned(self) -> bool { false } pub fn percent(self) -> Option { None } } @@ -64,8 +62,7 @@ impl ProcessProgOrphan { // --- Bg #[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize)] pub struct ProcessProgBg { - pub state: Option, - pub cleaned: bool, + pub state: Option, } impl From for TaskSummary { @@ -86,7 +83,7 @@ impl ProcessProgBg { pub fn failed(self) -> bool { self.state == Some(false) } - pub fn cleaned(self) -> bool { self.cleaned } + pub fn cleaned(self) -> bool { false } pub fn percent(self) -> Option { None } } diff --git a/yazi-scheduler/src/runner.rs b/yazi-scheduler/src/runner.rs index 23e9c613..e84dcc0b 100644 --- a/yazi-scheduler/src/runner.rs +++ b/yazi-scheduler/src/runner.rs @@ -38,10 +38,6 @@ impl Runner { TaskIn::HookDelete(r#in) => Ok(self.hook.delete(r#in).await), TaskIn::HookTrash(r#in) => Ok(self.hook.trash(r#in).await), TaskIn::HookDownload(r#in) => Ok(self.hook.download(r#in).await), - TaskIn::HookBlock(r#in) => Ok(self.hook.block(r#in).await), - TaskIn::HookOrphan(r#in) => Ok(self.hook.orphan(r#in).await), - TaskIn::HookBg(r#in) => Ok(self.hook.bg(r#in).await), - TaskIn::HookFetch(r#in) => Ok(self.hook.fetch(r#in).await), } } @@ -71,10 +67,6 @@ impl Runner { TaskIn::HookDelete(_in) => unreachable!(), TaskIn::HookTrash(_in) => unreachable!(), TaskIn::HookDownload(_in) => unreachable!(), - TaskIn::HookBlock(_in) => unreachable!(), - TaskIn::HookOrphan(_in) => unreachable!(), - TaskIn::HookBg(_in) => unreachable!(), - TaskIn::HookFetch(_in) => unreachable!(), } } } diff --git a/yazi-scheduler/src/scheduler.rs b/yazi-scheduler/src/scheduler.rs index 57af5561..da245321 100644 --- a/yazi-scheduler/src/scheduler.rs +++ b/yazi-scheduler/src/scheduler.rs @@ -1,14 +1,13 @@ use std::{sync::Arc, time::Duration}; -use hashbrown::hash_map::RawEntryMut; use parking_lot::Mutex; -use tokio::{select, sync::{mpsc::{self, UnboundedReceiver}, oneshot}, task::JoinHandle}; +use tokio::{select, sync::mpsc::{self, UnboundedReceiver}, task::JoinHandle}; use yazi_config::{YAZI, plugin::{Fetcher, Preloader}}; use yazi_parser::{app::PluginOpt, tasks::ProcessOpenOpt}; -use yazi_shared::{Id, Throttle, url::{UrlBuf, UrlLike}}; +use yazi_shared::{CompletionToken, Id, Throttle, url::{UrlBuf, UrlLike}}; use super::{Ongoing, TaskOp}; -use crate::{HIGH, LOW, NORMAL, Runner, TaskIn, TaskOps, file::{File, FileInCopy, FileInCut, FileInDelete, FileInDownload, FileInHardlink, FileInLink, FileInTrash, FileInUpload, FileOutCopy, FileOutCut, FileOutDownload, FileOutHardlink, FileOutUpload, FileProgCopy, FileProgCut, FileProgDelete, FileProgDownload, FileProgHardlink, FileProgLink, FileProgTrash, FileProgUpload}, hook::{Hook, HookInOutBg, HookInOutBlock, HookInOutDelete, HookInOutDownload, HookInOutFetch, HookInOutOrphan, HookInOutTrash}, plugin::{Plugin, PluginInEntry, PluginProgEntry}, prework::{Prework, PreworkInFetch, PreworkInLoad, PreworkInSize, PreworkProgFetch, PreworkProgLoad, PreworkProgSize}, process::{Process, ProcessInBg, ProcessInBlock, ProcessInOrphan, ProcessProgBg, ProcessProgBlock, ProcessProgOrphan}}; +use crate::{HIGH, LOW, NORMAL, Runner, TaskIn, TaskOps, file::{File, FileInCopy, FileInCut, FileInDelete, FileInDownload, FileInHardlink, FileInLink, FileInTrash, FileInUpload, FileOutCopy, FileOutCut, FileOutDownload, FileOutHardlink, FileOutUpload, FileProgCopy, FileProgCut, FileProgDelete, FileProgDownload, FileProgHardlink, FileProgLink, FileProgTrash, FileProgUpload}, hook::{Hook, HookInOutDelete, HookInOutDownload, HookInOutTrash}, plugin::{Plugin, PluginInEntry, PluginProgEntry}, prework::{Prework, PreworkInFetch, PreworkInLoad, PreworkInSize, PreworkProgFetch, PreworkProgLoad, PreworkProgSize}, process::{Process, ProcessInBg, ProcessInBlock, ProcessInOrphan, ProcessProgBg, ProcessProgBlock, ProcessProgOrphan}}; pub struct Scheduler { ops: TaskOps, @@ -54,22 +53,12 @@ impl Scheduler { } pub fn cancel(&self, id: Id) -> bool { - let mut ongoing = self.ongoing.lock(); - - match ongoing.inner.raw_entry_mut().from_key(&id) { - RawEntryMut::Occupied(mut oe) => { - let task = oe.get_mut(); - if let Some(hook) = task.hook.take() { - task.canceled = true; - self.micro.try_send(hook, HIGH).ok(); - false - } else { - oe.remove(); - true - } - } - RawEntryMut::Vacant(_) => false, + if let Some(hook) = self.ongoing.lock().cancel(id) { + self.micro.try_send(hook, HIGH).ok(); + return false; } + + true } pub fn shutdown(&self) { @@ -90,7 +79,17 @@ impl Scheduler { let follow = !from.scheme().covariant(to.scheme()); self.queue( - FileInCut { id: task.id, from, to, force, cha: None, follow, retry: 0, drop: None }, + FileInCut { + id: task.id, + from, + to, + force, + cha: None, + follow, + retry: 0, + drop: None, + done: task.done.clone(), + }, LOW, ); } @@ -106,7 +105,19 @@ impl Scheduler { } let follow = follow || !from.scheme().covariant(to.scheme()); - self.queue(FileInCopy { id: task.id, from, to, force, cha: None, follow, retry: 0 }, LOW); + self.queue( + FileInCopy { + id: task.id, + from, + to, + force, + cha: None, + follow, + retry: 0, + done: task.done.clone(), + }, + LOW, + ); } pub fn file_link(&self, from: UrlBuf, to: UrlBuf, relative: bool, force: bool) { @@ -164,21 +175,21 @@ impl Scheduler { self.queue(FileInTrash { id: task.id, target }, LOW); } - pub fn file_download(&self, url: UrlBuf, done: Option>) { + pub fn file_download(&self, url: UrlBuf) -> CompletionToken { let mut ongoing = self.ongoing.lock(); let task = ongoing.add::(format!("Download {}", url.display())); - if !url.kind().is_remote() { - return self - .ops - .out(task.id, FileOutDownload::Fail("Cannot download non-remote file".to_owned())); - }; - - if let Some(done) = done { - task.set_hook(HookInOutDownload { id: task.id, done }); + task.set_hook(HookInOutDownload { id: task.id }); + if url.kind().is_remote() { + self.queue( + FileInDownload { id: task.id, url, cha: None, retry: 0, done: task.done.clone() }, + LOW, + ); + } else { + self.ops.out(task.id, FileOutDownload::Fail("Cannot download non-remote file".to_owned())); } - self.queue(FileInDownload { id: task.id, url, cha: None, retry: 0 }, LOW); + task.done.clone() } pub fn file_upload(&self, url: UrlBuf) { @@ -191,7 +202,10 @@ impl Scheduler { .out(task.id, FileOutUpload::Fail("Cannot upload non-remote file".to_owned())); }; - self.queue(FileInUpload { id: task.id, url, cha: None, cache: None }, LOW); + self.queue( + FileInUpload { id: task.id, url, cha: None, cache: None, done: task.done.clone() }, + LOW, + ); } pub fn plugin_entry(&self, opt: PluginOpt) { @@ -205,8 +219,7 @@ impl Scheduler { &self, fetcher: &'static Fetcher, targets: Vec, - done: Option>, - ) { + ) -> CompletionToken { let mut ongoing = self.ongoing.lock(); let task = ongoing.add::(format!( "Run fetcher `{}` with {} target(s)", @@ -214,24 +227,19 @@ impl Scheduler { targets.len() )); - if let Some(done) = done { - task.set_hook(HookInOutFetch { id: task.id, done }); - } - self.queue(PreworkInFetch { id: task.id, plugin: fetcher, targets }, NORMAL); + task.done.clone() } pub async fn fetch_mimetype(&self, targets: Vec) -> bool { let mut wg = vec![]; for (fetcher, targets) in YAZI.plugin.mime_fetchers(targets) { - let (tx, rx) = oneshot::channel(); - self.fetch_paged(fetcher, targets, Some(tx)); - wg.push(rx); + wg.push(self.fetch_paged(fetcher, targets)); } - for rx in wg { - if rx.await != Ok(true) { - return false; // Canceled or error + for done in wg { + if !done.future().await { + return false; // Canceled } } true @@ -278,24 +286,24 @@ impl Scheduler { ongoing.add::(name) }; + if let Some(done) = opt.done { + task.done = done; + } + if opt.block { - task.set_hook(HookInOutBlock { id: task.id, done: opt.done }); self .queue(ProcessInBlock { id: task.id, cwd: opt.cwd, cmd: opt.cmd, args: opt.args }, NORMAL); } else if opt.orphan { - task.set_hook(HookInOutOrphan { id: task.id, done: opt.done }); self .queue(ProcessInOrphan { id: task.id, cwd: opt.cwd, cmd: opt.cmd, args: opt.args }, NORMAL); } else { - let (cancel_tx, cancel_rx) = mpsc::channel(1); - task.set_hook(HookInOutBg { id: task.id, cancel: cancel_tx, done: opt.done }); self.queue( ProcessInBg { - id: task.id, - cwd: opt.cwd, - cmd: opt.cmd, - args: opt.args, - cancel: cancel_rx, + id: task.id, + cwd: opt.cwd, + cmd: opt.cmd, + args: opt.args, + done: task.done.clone(), }, NORMAL, ); @@ -311,11 +319,19 @@ impl Scheduler { loop { if let Ok((r#in, _)) = rx.recv().await { let id = r#in.id(); - if !ongoing.lock().exists(id) { + let Some(token) = ongoing.lock().get_token(id) else { continue; - } + }; + + let result = if r#in.is_hook() { + runner.micro(r#in).await + } else { + select! { + r = runner.micro(r#in) => r, + false = token.future() => Ok(()) + } + }; - let result = runner.micro(r#in).await; if let Err(out) = result { ops.out(id, out); } @@ -341,11 +357,24 @@ impl Scheduler { }; let id = r#in.id(); - if !ongoing.lock().exists(id) { + let Some(token) = ongoing.lock().get_token(id) else { continue; - } + }; + + let result = if r#in.is_hook() { + if micro { runner.micro(r#in).await } else { runner.r#macro(r#in).await } + } else if micro { + select! { + r = runner.micro(r#in) => r, + false = token.future() => Ok(()), + } + } else { + select! { + r = runner.r#macro(r#in) => r, + false = token.future() => Ok(()), + } + }; - let result = if micro { runner.micro(r#in).await } else { runner.r#macro(r#in).await }; if let Err(out) = result { ops.out(id, out); } @@ -368,7 +397,7 @@ impl Scheduler { } else if let Some(hook) = task.hook.take() { micro.try_send(hook, LOW).ok(); } else { - ongoing.inner.remove(&op.id); + ongoing.fulfill(op.id); } } }) diff --git a/yazi-scheduler/src/task.rs b/yazi-scheduler/src/task.rs index cd94c54e..0a7e44b4 100644 --- a/yazi-scheduler/src/task.rs +++ b/yazi-scheduler/src/task.rs @@ -1,5 +1,5 @@ use tokio::sync::mpsc; -use yazi_shared::Id; +use yazi_shared::{CompletionToken, Id}; use crate::{TaskIn, TaskProg}; @@ -9,7 +9,7 @@ pub struct Task { pub name: String, pub(crate) prog: TaskProg, pub(crate) hook: Option, - pub canceled: bool, + pub done: CompletionToken, pub logs: String, pub logger: Option>, @@ -25,7 +25,7 @@ impl Task { name, prog: T::default().into(), hook: None, - canceled: false, + done: CompletionToken::new(), logs: Default::default(), logger: Default::default(), diff --git a/yazi-shared/src/completion_token.rs b/yazi-shared/src/completion_token.rs new file mode 100644 index 00000000..1904ac0d --- /dev/null +++ b/yazi-shared/src/completion_token.rs @@ -0,0 +1,38 @@ +use std::sync::{Arc, atomic::{AtomicU8, Ordering}}; + +use tokio::sync::Notify; + +#[derive(Clone, Debug)] +pub struct CompletionToken { + inner: Arc<(AtomicU8, Notify)>, +} + +impl CompletionToken { + pub fn new() -> Self { Self { inner: Arc::new((AtomicU8::new(0), Notify::new())) } } + + pub fn complete(&self, success: bool) { + let new = if success { 1 } else { 2 }; + self.inner.0.compare_exchange(0, new, Ordering::Relaxed, Ordering::Relaxed).ok(); + self.inner.1.notify_waiters(); + } + + pub fn completed(&self) -> Option { + let state = self.inner.0.load(Ordering::Relaxed); + if state == 0 { None } else { Some(state == 1) } + } + + pub async fn future(&self) -> bool { + loop { + if let Some(state) = self.completed() { + return state; + } + + let notified = self.inner.1.notified(); + if let Some(state) = self.completed() { + return state; + } + + notified.await; + } + } +} diff --git a/yazi-shared/src/lib.rs b/yazi-shared/src/lib.rs index f0d71cb6..c51318fb 100644 --- a/yazi-shared/src/lib.rs +++ b/yazi-shared/src/lib.rs @@ -1,6 +1,6 @@ yazi_macro::mod_pub!(data errors event loc path pool scheme shell strand translit url wtf8); -yazi_macro::mod_flat!(alias bytes chars condition debounce either env id layer localset natsort os predictor ro_cell source sync_cell terminal tests throttle time utf8); +yazi_macro::mod_flat!(alias bytes chars completion_token condition debounce either env id layer localset natsort os predictor ro_cell source sync_cell terminal tests throttle time utf8); pub fn init() { LOCAL_SET.with(tokio::task::LocalSet::new); diff --git a/yazi-vfs/src/provider/copier.rs b/yazi-vfs/src/provider/copier.rs index 211a1bf1..977f607a 100644 --- a/yazi-vfs/src/provider/copier.rs +++ b/yazi-vfs/src/provider/copier.rs @@ -55,7 +55,7 @@ pub(super) fn copy_with_progress_impl( }; let chunks = (cha.len + PER_CHUNK - 1) / PER_CHUNK; - let mut result = futures::stream::iter(0..chunks) + let it = futures::stream::iter(0..chunks) .map(|i| { let acc_ = acc_.clone(); let (from, to) = (from.clone(), to.clone()); @@ -104,8 +104,13 @@ pub(super) fn copy_with_progress_impl( } }) .buffer_unordered(4) - .try_fold(None, |first, file| async { Ok(first.or(file)) }) - .await; + .try_fold(None, |first, file| async { Ok(first.or(file)) }); + + let mut result = select! { + r = it => r, + _ = prog_tx_.closed() => return, + }; + done_tx.send(()).ok(); let n = acc_.swap(0, Ordering::SeqCst); if n > 0 { @@ -122,8 +127,6 @@ pub(super) fn copy_with_progress_impl( } else { prog_tx_.send(Ok(0)).await.ok(); } - - done_tx.send(()).ok(); }); tokio::spawn(async move {