refactor: simplify the remote progressive file copier (#3852)

This commit is contained in:
三咲雅 misaki masa 2026-04-04 21:28:57 +08:00 committed by GitHub
parent 0cedbd9c7b
commit 4bb4f37555
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
47 changed files with 486 additions and 353 deletions

1
Cargo.lock generated
View file

@ -5767,6 +5767,7 @@ version = "26.2.2"
dependencies = [
"anyhow",
"crossterm 0.29.0",
"dyn-clone",
"hashbrown 0.16.1",
"indexmap 2.13.1",
"mlua",

View file

@ -1,8 +1,9 @@
use anyhow::Result;
use yazi_core::app::PluginMode;
use yazi_macro::{act, succ};
use yazi_parser::app::PluginForm;
use yazi_proxy::AppProxy;
use yazi_runner::{loader::LOADER, plugin::PluginMode};
use yazi_runner::loader::LOADER;
use yazi_scheduler::NotifyProxy;
use yazi_shared::data::Data;
@ -23,7 +24,7 @@ impl Actor for Plugin {
}
if opt.mode == PluginMode::Async {
succ!(cx.core.tasks.scheduler.plugin_entry(opt));
succ!(cx.core.tasks.scheduler.plugin_entry(opt.id, opt.args));
} else if opt.mode == PluginMode::Sync && hits {
return act!(app:plugin_do, cx, opt);
}

View file

@ -3,11 +3,11 @@ use mlua::ObjectLike;
use scopeguard::defer;
use tracing::{error, warn};
use yazi_binding::runtime_mut;
use yazi_dds::Sendable;
use yazi_core::app::PluginMode;
use yazi_macro::succ;
use yazi_parser::app::PluginForm;
use yazi_plugin::LUA;
use yazi_runner::{loader::{LOADER, Loader}, plugin::PluginMode};
use yazi_runner::{entry::EntryJob, loader::{LOADER, Loader}};
use yazi_scheduler::NotifyProxy;
use yazi_shared::data::Data;
@ -31,7 +31,7 @@ impl Actor for PluginDo {
}
if opt.mode.auto_then(chunk.sync_entry) != PluginMode::Sync {
succ!(cx.core.tasks.scheduler.plugin_entry(opt));
succ!(cx.core.tasks.scheduler.plugin_entry(opt.id, opt.args));
}
let blocking = runtime_mut!(LUA)?.critical_push(&opt.id, true);
@ -47,8 +47,7 @@ impl Actor for PluginDo {
if let Some(cb) = opt.callback {
cb(&LUA, plugin)
} else {
let job = LUA.create_table_from([("args", Sendable::args_to_table(&LUA, opt.args)?)])?;
plugin.call_method("entry", job)
plugin.call_method("entry", EntryJob { args: opt.args, ..Default::default() })
}
});
if let Err(ref e) = result {

View file

@ -1,9 +1,10 @@
use anyhow::Result;
use mlua::ObjectLike;
use yazi_config::YAZI;
use yazi_core::app::PluginOpt;
use yazi_macro::{act, succ};
use yazi_parser::mgr::SeekForm;
use yazi_runner::{plugin::PluginOpt, previewer::SeekJob};
use yazi_runner::previewer::SeekJob;
use yazi_shared::data::Data;
use crate::{Actor, Ctx};

View file

@ -31,6 +31,7 @@ yazi-widgets = { path = "../yazi-widgets", version = "26.2.2" }
# External dependencies
anyhow = { workspace = true }
crossterm = { workspace = true }
dyn-clone = { workspace = true }
hashbrown = { workspace = true }
indexmap = { workspace = true }
mlua = { workspace = true }

View file

@ -1 +1 @@
yazi_macro::mod_flat!(quit);
yazi_macro::mod_flat!(plugin quit);

View file

@ -2,11 +2,11 @@ use mlua::ObjectLike;
use yazi_binding::{Error, elements::Renderable};
use yazi_config::LAYOUT;
use yazi_macro::{emit, relay};
use yazi_runner::{plugin::PluginOpt, previewer::PeekJob};
use yazi_runner::previewer::PeekJob;
use yazi_scheduler::TaskSummary;
use yazi_shared::url::AsUrl;
use crate::tab::PreviewLock;
use crate::{app::PluginOpt, tab::PreviewLock};
pub struct AppProxy;

View file

@ -53,7 +53,7 @@ pub(super) fn copy_with_progress_impl(
to: PathBuf,
attrs: Attrs,
) -> mpsc::Receiver<Result<u64, io::Error>> {
let (prog_tx, prog_rx) = mpsc::channel(10);
let (prog_tx, prog_rx) = mpsc::channel(20);
let (done_tx, mut done_rx) = oneshot::channel();
tokio::spawn({
@ -63,38 +63,35 @@ pub(super) fn copy_with_progress_impl(
}
});
tokio::spawn({
let prog_tx = prog_tx.clone();
async move {
let mut last = 0;
let mut done = None;
loop {
select! {
res = &mut done_rx => done = Some(res.unwrap()),
_ = prog_tx.closed() => break,
_ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {},
}
tokio::spawn(async move {
let mut last = 0;
let mut done = None;
loop {
select! {
res = &mut done_rx => done = Some(res.unwrap()),
_ = prog_tx.closed() => break,
_ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {},
}
match done {
Some(Ok(len)) => {
if len > last {
prog_tx.send(Ok(len - last)).await.ok();
}
prog_tx.send(Ok(0)).await.ok();
break;
match done {
Some(Ok(len)) => {
if len > last {
prog_tx.send(Ok(len - last)).await.ok();
}
Some(Err(e)) => {
prog_tx.send(Err(e)).await.ok();
break;
}
None => {}
prog_tx.send(Ok(0)).await.ok();
break;
}
Some(Err(e)) => {
prog_tx.send(Err(e)).await.ok();
break;
}
None => {}
}
let len = tokio::fs::symlink_metadata(&to).await.map(|m| m.len()).unwrap_or(0);
if len > last {
prog_tx.send(Ok(len - last)).await.ok();
last = len;
}
let len = tokio::fs::symlink_metadata(&to).await.map(|m| m.len()).unwrap_or(0);
if len > last {
prog_tx.send(Ok(len - last)).await.ok();
last = len;
}
}
});

View file

@ -1,7 +1,7 @@
use std::fmt::Debug;
use mlua::{ExternalError, FromLua, IntoLua, Lua, Value};
use yazi_runner::plugin::PluginOpt;
use yazi_core::app::PluginOpt;
use yazi_shared::event::ActionCow;
#[derive(Clone, Debug, Default)]

View file

@ -1,24 +1,20 @@
use anyhow::bail;
use mlua::{ExternalError, FromLua, IntoLua, Lua, Value};
use serde::Deserialize;
use yazi_shared::{Id, event::ActionCow, url::UrlBuf};
#[derive(Debug)]
#[derive(Debug, Deserialize)]
pub struct UpdateSucceedForm {
#[serde(alias = "0")]
pub id: Id,
pub urls: Vec<UrlBuf>,
#[serde(default)]
pub track: bool,
}
impl TryFrom<ActionCow> for UpdateSucceedForm {
type Error = anyhow::Error;
fn try_from(mut a: ActionCow) -> Result<Self, Self::Error> {
let Some(urls) = a.take_any("urls") else {
bail!("Invalid 'urls' in UpdateSucceedForm");
};
Ok(Self { id: a.first()?, urls, track: a.get("track").unwrap_or_default() })
}
fn try_from(a: ActionCow) -> Result<Self, Self::Error> { Ok(a.deserialize()?) }
}
impl FromLua for UpdateSucceedForm {

View file

@ -3,9 +3,9 @@ use futures::future::join_all;
use mlua::{ExternalError, ExternalResult, Function, IntoLuaMulti, Lua, MultiValue, Table, Value, Variadic};
use tokio::sync::mpsc;
use yazi_binding::{Handle, MpscRx, MpscTx, MpscUnboundedRx, MpscUnboundedTx, OneshotRx, OneshotTx, runtime, runtime_mut};
use yazi_core::AppProxy;
use yazi_core::{AppProxy, app::PluginOpt};
use yazi_dds::Sendable;
use yazi_runner::{loader::LOADER, plugin::PluginOpt};
use yazi_runner::loader::LOADER;
use yazi_shared::{LOCAL_SET, data::Data};
use super::Utils;

View file

@ -1,6 +1,5 @@
use yazi_core::app::QuitOpt;
use yazi_core::app::{PluginOpt, QuitOpt};
use yazi_macro::{emit, relay};
use yazi_runner::plugin::PluginOpt;
pub struct AppProxy;

View file

@ -1,21 +0,0 @@
use mlua::{ExternalResult, ObjectLike};
use tokio::runtime::Handle;
use yazi_dds::Sendable;
use crate::{Runner, loader::LOADER, plugin::PluginOpt};
impl Runner {
pub async fn entry(&'static self, opt: PluginOpt) -> mlua::Result<()> {
LOADER.ensure(&opt.id, |_| ()).await.into_lua_err()?;
tokio::task::spawn_blocking(move || {
let lua = self.spawn(&opt.id)?;
let job = lua.create_table_from([("args", Sendable::args_to_table(&lua, opt.args)?)])?;
Handle::current()
.block_on(async { LOADER.load(&lua, &opt.id).await?.call_async_method("entry", job).await })
})
.await
.into_lua_err()?
}
}

View file

@ -0,0 +1,19 @@
use mlua::{ExternalResult, ObjectLike};
use tokio::runtime::Handle;
use crate::{Runner, entry::EntryJob, loader::LOADER};
impl Runner {
pub async fn entry(&'static self, job: EntryJob) -> mlua::Result<()> {
LOADER.ensure(&job.plugin, |_| ()).await.into_lua_err()?;
tokio::task::spawn_blocking(move || {
let lua = self.spawn(&job.plugin)?;
Handle::current().block_on(async {
LOADER.load(&lua, &job.plugin).await?.call_async_method("entry", job).await
})
})
.await
.into_lua_err()?
}
}

View file

@ -0,0 +1,22 @@
use hashbrown::HashMap;
use mlua::{IntoLua, Lua, Value};
use yazi_dds::Sendable;
use yazi_shared::{Id, SStr, data::{Data, DataKey}};
#[derive(Clone, Debug, Default)]
pub struct EntryJob {
pub id: Id,
pub args: HashMap<DataKey, Data>,
pub plugin: SStr,
}
impl IntoLua for EntryJob {
fn into_lua(self, lua: &Lua) -> mlua::Result<Value> {
lua
.create_table_from([
("id", self.id.get().into_lua(lua)?),
("args", Sendable::args_to_table(lua, self.args)?.into_lua(lua)?),
])?
.into_lua(lua)
}
}

View file

@ -0,0 +1 @@
yazi_macro::mod_flat!(entry job);

View file

@ -1,6 +1,6 @@
yazi_macro::mod_pub!(fetcher loader plugin previewer);
yazi_macro::mod_pub!(entry fetcher loader preloader previewer);
yazi_macro::mod_flat!(entry preload runner spot);
yazi_macro::mod_flat!(runner spot);
pub static RUNNER: yazi_shared::RoCell<Runner> = yazi_shared::RoCell::new();

View file

@ -1 +0,0 @@
yazi_macro::mod_flat!(option);

View file

@ -1,63 +0,0 @@
use mlua::{ExternalError, ExternalResult, HookTriggers, IntoLua, ObjectLike, VmState};
use tokio::{runtime::Handle, select};
use tokio_util::sync::CancellationToken;
use yazi_binding::{Error, File, elements::Rect};
use yazi_config::LAYOUT;
use yazi_dds::Sendable;
use yazi_shared::event::Action;
use crate::{Runner, loader::LOADER};
impl Runner {
pub async fn preload(
&'static self,
action: &'static Action,
file: yazi_fs::File,
ct: CancellationToken,
) -> mlua::Result<(bool, Option<Error>)> {
let ct_ = ct.clone();
tokio::task::spawn_blocking(move || {
let future = async {
LOADER.ensure(&action.name, |_| ()).await.into_lua_err()?;
let lua = self.spawn(&action.name)?;
lua.set_hook(
HookTriggers::new().on_calls().on_returns().every_nth_instruction(2000),
move |_, dbg| {
if ct.is_cancelled() && dbg.source().what != "C" {
Err("Preload task cancelled".into_lua_err())
} else {
Ok(VmState::Continue)
}
},
)?;
let plugin = LOADER.load(&lua, &action.name).await?;
let job = lua.create_table_from([
("area", Rect::from(LAYOUT.get().preview).into_lua(&lua)?),
("args", Sendable::args_to_table_ref(&lua, &action.args)?.into_lua(&lua)?),
("file", File::new(file).into_lua(&lua)?),
("skip", 0.into_lua(&lua)?),
])?;
if ct_.is_cancelled() {
Ok((false, None))
} else {
plugin.call_async_method("preload", job).await
}
};
Handle::current().block_on(async {
select! {
_ = ct_.cancelled() => Ok((false, None)),
r = future => match r {
Err(e) if e.to_string().contains("Preload task cancelled") => Ok((false, None)),
Ok(_) | Err(_) => r,
},
}
})
})
.await
.into_lua_err()?
}
}

View file

@ -0,0 +1,17 @@
use std::sync::Arc;
use thiserror::Error;
#[derive(Clone, Debug, Error)]
pub enum PreloadError {
#[error("Preload task cancelled")]
Cancelled,
#[error("Lua error during preload: {0}")]
Lua(#[from] mlua::Error),
#[error("Unexpected error during preload: {0}")]
Unexpected(Arc<anyhow::Error>),
}
impl From<anyhow::Error> for PreloadError {
fn from(e: anyhow::Error) -> Self { Self::Unexpected(e.into()) }
}

View file

@ -0,0 +1,23 @@
use mlua::{IntoLua, Lua, Value};
use yazi_binding::{File, elements::Rect};
use yazi_config::LAYOUT;
use yazi_dds::Sendable;
use yazi_shared::event::Action;
pub struct PreloadJob {
pub action: &'static Action,
pub file: yazi_fs::File,
}
impl IntoLua for PreloadJob {
fn into_lua(self, lua: &Lua) -> mlua::Result<Value> {
lua
.create_table_from([
("area", Rect::from(LAYOUT.get().preview).into_lua(lua)?),
("args", Sendable::args_to_table_ref(lua, &self.action.args)?.into_lua(lua)?),
("file", File::new(self.file).into_lua(lua)?),
("skip", 0.into_lua(lua)?),
])?
.into_lua(lua)
}
}

View file

@ -0,0 +1 @@
yazi_macro::mod_flat!(error job preloader state);

View file

@ -0,0 +1,64 @@
use mlua::{ExternalError, HookTriggers, ObjectLike, VmState};
use tokio::{runtime::Handle, select, sync::mpsc};
use crate::{Runner, loader::LOADER, preloader::{PreloadError, PreloadJob, PreloadState}};
impl Runner {
pub async fn preload(
&'static self,
job: PreloadJob,
) -> mpsc::Receiver<Result<PreloadState, PreloadError>> {
let (tx, rx) = mpsc::channel(1);
match LOADER.ensure(&job.action.name, |_| ()).await {
Ok(()) => self.preload_do(job, tx),
Err(e) => _ = tx.try_send(Err(e.into())),
};
rx
}
fn preload_do(
&'static self,
job: PreloadJob,
tx: mpsc::Sender<Result<PreloadState, PreloadError>>,
) {
let tx_ = tx.clone();
tokio::task::spawn_blocking(move || {
let future = async {
let lua = self.spawn(&job.action.name)?;
lua.set_hook(
HookTriggers::new().on_calls().on_returns().every_nth_instruction(2000),
move |_, dbg| {
if tx.is_closed() && dbg.source().what != "C" {
Err(PreloadError::Cancelled.into_lua_err())
} else {
Ok(VmState::Continue)
}
},
)?;
let plugin = LOADER.load(&lua, &job.action.name).await?;
if tx_.is_closed() {
Err(PreloadError::Cancelled.into_lua_err())
} else {
plugin.call_async_method("preload", job).await
}
};
Handle::current().block_on(async {
select! {
_ = tx_.closed() => {},
r = future => match r {
Ok(state) => _ = tx_.send(Ok(state)).await,
Err(err) => {
if let Some(e) = err.downcast_ref::<PreloadError>() {
tx_.send(Err(e.clone())).await.ok();
} else {
tx_.send(Err(err.into())).await.ok();
}
},
},
}
})
});
}
}

View file

@ -0,0 +1,14 @@
use mlua::{FromLuaMulti, Lua, MultiValue};
#[derive(Default)]
pub struct PreloadState {
pub complete: bool,
pub error: Option<yazi_binding::Error>,
}
impl FromLuaMulti for PreloadState {
fn from_lua_multi(values: MultiValue, lua: &Lua) -> mlua::Result<Self> {
let (complete, error) = FromLuaMulti::from_lua_multi(values, lua)?;
Ok(Self { complete, error })
}
}

View file

@ -11,8 +11,8 @@ impl From<FetchProg> for TaskSummary {
fn from(value: FetchProg) -> Self {
Self {
total: 1,
success: (value.state == Some(true)) as u32,
failed: (value.state == Some(false)) as u32,
success: value.success() as u32,
failed: value.failed() as u32,
percent: value.percent().map(Into::into),
}
}

View file

@ -9,7 +9,7 @@ use yazi_shared::{path::PathCow, url::{AsUrl, UrlCow, UrlLike}};
use yazi_vfs::{VfsCha, maybe_exists, provider::{self, DirEntry}, unique_file};
use super::{FileInCopy, FileInDelete, FileInHardlink, FileInLink, FileInTrash};
use crate::{LOW, NORMAL, TaskOp, TaskOps, TasksProxy, ctx, file::{FileIn, FileInCut, FileInDownload, FileInUpload, FileOutCopy, FileOutCopyDo, FileOutCut, FileOutCutDo, FileOutDelete, FileOutDeleteDo, FileOutDownload, FileOutDownloadDo, FileOutHardlink, FileOutHardlinkDo, FileOutLink, FileOutTrash, FileOutUpload, FileOutUploadDo, Transaction, Traverse}, hook::{HookInOutCopy, HookInOutCut, HookInOutHardlink, HookInOutLink}, ok_or_not_found, progress_or_break};
use crate::{LOW, NORMAL, TaskOp, TaskOps, TasksProxy, ctx, file::{FileIn, FileInCut, FileInDownload, FileInUpload, FileOutCopy, FileOutCopyDo, FileOutCut, FileOutCutDo, FileOutDelete, FileOutDeleteDo, FileOutDownload, FileOutDownloadDo, FileOutHardlink, FileOutHardlinkDo, FileOutLink, FileOutTrash, FileOutUpload, FileOutUploadDo, Transaction, Traverse}, hook::{HookInOutCopy, HookInOutCut, HookInOutHardlink, HookInOutLink}, ok_or_not_found};
pub(crate) struct File {
ops: TaskOps,
@ -62,11 +62,11 @@ impl File {
pub(crate) async fn copy_do(&self, mut task: FileInCopy) -> Result<(), FileOutCopyDo> {
ok_or_not_found!(task, Transaction::unlink(&task.to).await);
let mut it =
let mut rx =
ctx!(task, provider::copy_with_progress(&task.from, &task.to, task.cha.unwrap()).await)?;
loop {
match progress_or_break!(it, task.done) {
match rx.recv().await.unwrap_or(Ok(0)) {
Ok(0) => break,
Ok(n) => self.ops.out(task.id, FileOutCopyDo::Adv(n)),
Err(e) if e.kind() == NotFound => {
@ -154,11 +154,11 @@ impl File {
pub(crate) async fn cut_do(&self, mut task: FileInCut) -> Result<(), FileOutCutDo> {
ok_or_not_found!(task, Transaction::unlink(&task.to).await);
let mut it =
let mut rx =
ctx!(task, provider::copy_with_progress(&task.from, &task.to, task.cha.unwrap()).await)?;
loop {
match progress_or_break!(it, task.done) {
match rx.recv().await.unwrap_or(Ok(0)) {
Ok(0) => {
provider::remove_file(&task.from).await.ok();
break;
@ -345,9 +345,9 @@ impl File {
let cache = ctx!(task, task.target.cache(), "Cannot determine cache path")?;
let cache_tmp = ctx!(task, Transaction::tmp(&cache).await, "Cannot determine download cache")?;
let mut it = ctx!(task, provider::copy_with_progress(&task.target, &cache_tmp, cha).await)?;
let mut rx = ctx!(task, provider::copy_with_progress(&task.target, &cache_tmp, cha).await)?;
loop {
match progress_or_break!(it, task.done) {
match rx.recv().await.unwrap_or(Ok(0)) {
Ok(0) => {
Local::regular(&cache).remove_dir_all().await.ok();
ctx!(task, provider::rename(cache_tmp, cache).await, "Cannot persist downloaded file")?;
@ -420,7 +420,7 @@ impl File {
let tmp =
ctx!(task, Transaction::tmp(&task.target).await, "Cannot determine temporary upload path")?;
let mut it = ctx!(
let mut rx = ctx!(
task,
provider::copy_with_progress(cache, &tmp, Attrs {
mode: Some(cha.mode),
@ -432,7 +432,7 @@ impl File {
)?;
loop {
match progress_or_break!(it, task.done) {
match rx.recv().await.unwrap_or(Ok(0)) {
Ok(0) => {
let cha =
ctx!(task, Self::cha(&task.target, true, None).await, "Cannot stat original file")?;

View file

@ -2,7 +2,7 @@ use std::{mem, path::PathBuf};
use tokio::sync::mpsc;
use yazi_fs::cha::Cha;
use yazi_shared::{CompletionToken, Id, url::UrlBuf};
use yazi_shared::{Id, url::UrlBuf};
#[derive(Debug)]
pub(crate) enum FileIn {
@ -89,7 +89,6 @@ pub(crate) struct FileInCopy {
pub(crate) cha: Option<Cha>,
pub(crate) follow: bool,
pub(crate) retry: u8,
pub(crate) done: CompletionToken,
}
impl FileInCopy {
@ -117,7 +116,6 @@ pub(crate) struct FileInCut {
pub(crate) cha: Option<Cha>,
pub(crate) follow: bool,
pub(crate) retry: u8,
pub(crate) done: CompletionToken,
pub(crate) drop: Option<mpsc::Sender<()>>,
}
@ -191,7 +189,6 @@ pub(crate) struct FileInDownload {
pub(crate) target: UrlBuf,
pub(crate) cha: Option<Cha>,
pub(crate) retry: u8,
pub(crate) done: CompletionToken,
}
// --- Upload
@ -201,5 +198,4 @@ pub(crate) struct FileInUpload {
pub(crate) target: UrlBuf,
pub(crate) cha: Option<Cha>,
pub(crate) cache: Option<PathBuf>,
pub(crate) done: CompletionToken,
}

View file

@ -119,8 +119,8 @@ impl From<FileProgLink> for TaskSummary {
fn from(value: FileProgLink) -> Self {
Self {
total: 1,
success: (value.state == Some(true)) as u32,
failed: (value.state == Some(false)) as u32,
success: value.success() as u32,
failed: value.failed() as u32,
percent: value.percent().map(Into::into),
}
}
@ -243,8 +243,8 @@ impl From<FileProgTrash> for TaskSummary {
fn from(value: FileProgTrash) -> Self {
Self {
total: 1,
success: (value.state == Some(true)) as u32,
failed: (value.state == Some(false)) as u32,
success: value.success() as u32,
failed: value.failed() as u32,
percent: value.percent().map(Into::into),
}
}

View file

@ -41,7 +41,6 @@ impl Traverse for FileInCopy {
cha: Some(cha),
follow: self.follow,
retry: self.retry,
done: self.done.clone(),
}
}
@ -64,7 +63,6 @@ impl Traverse for FileInCut {
cha: Some(cha),
follow: self.follow,
retry: self.retry,
done: self.done.clone(),
drop: self.drop.clone(),
}
}
@ -115,13 +113,7 @@ impl Traverse for FileInDownload {
fn from(&self) -> Url<'_> { self.target.as_url() }
fn spawn(&self, from: UrlBuf, _to: Option<UrlBuf>, cha: Cha) -> Self {
Self {
id: self.id,
target: from,
cha: Some(cha),
retry: self.retry,
done: self.done.clone(),
}
Self { id: self.id, target: from, cha: Some(cha), retry: self.retry }
}
fn to(&self) -> Option<Url<'_>> { None }
@ -145,13 +137,7 @@ impl Traverse for FileInUpload {
}
fn spawn(&self, from: UrlBuf, _to: Option<UrlBuf>, cha: Cha) -> Self {
Self {
id: self.id,
cha: Some(cha),
cache: from.cache(),
target: from,
done: self.done.clone(),
}
Self { id: self.id, cha: Some(cha), cache: from.cache(), target: from }
}
fn to(&self) -> Option<Url<'_>> { None }

View file

@ -6,11 +6,12 @@ use yazi_dds::Pump;
use yazi_fs::ok_or_not_found;
use yazi_vfs::provider;
use crate::{Ongoing, TaskOp, TaskOps, TasksProxy, file::{FileOutCopy, FileOutCut, FileOutDelete, FileOutDownload, FileOutHardlink, FileOutLink, FileOutTrash, FileOutUpload}, hook::{HookIn, HookInDelete, HookInDownload, HookInOutCopy, HookInOutCut, HookInOutHardlink, HookInOutLink, HookInTrash, HookInUpload}};
use crate::{Ongoing, TaskOp, TaskOps, TasksProxy, file::{FileOutCopy, FileOutCut, FileOutDelete, FileOutDownload, FileOutHardlink, FileOutLink, FileOutTrash, FileOutUpload}, hook::{HookIn, HookInDelete, HookInDownload, HookInOutCopy, HookInOutCut, HookInOutHardlink, HookInOutLink, HookInPreload, HookInTrash, HookInUpload}, preload::{Preload, PreloadOut}};
pub(crate) struct Hook {
ops: TaskOps,
ongoing: Arc<Mutex<Ongoing>>,
preload: Arc<Preload>,
tx: async_priority_channel::Sender<HookIn, u8>,
}
@ -18,9 +19,10 @@ impl Hook {
pub(crate) fn new(
ops: &mpsc::UnboundedSender<TaskOp>,
ongoing: &Arc<Mutex<Ongoing>>,
preload: &Arc<Preload>,
tx: async_priority_channel::Sender<HookIn, u8>,
) -> Self {
Self { ops: ops.into(), ongoing: ongoing.clone(), tx }
Self { ops: ops.into(), ongoing: ongoing.clone(), preload: preload.clone(), tx }
}
// --- File
@ -98,6 +100,15 @@ impl Hook {
}
self.ops.out(task.id, FileOutUpload::Clean);
}
// --- Preload
pub(crate) async fn preload(&self, task: HookInPreload) {
if !self.ongoing.lock().intact(task.id) {
self.preload.loaded.lock().get_mut(&task.hash).map(|x| *x &= !(1 << task.idx));
}
self.ops.out(task.id, PreloadOut::Clean);
}
}
impl Hook {

View file

@ -12,6 +12,7 @@ pub(crate) enum HookIn {
Hardlink(HookInOutHardlink),
Download(HookInDownload),
Upload(HookInUpload),
Preload(HookInPreload),
}
impl_from_in!(
@ -23,6 +24,7 @@ impl_from_in!(
Hardlink(HookInOutHardlink),
Download(HookInDownload),
Upload(HookInUpload),
Preload(HookInPreload),
);
impl HookIn {
@ -36,6 +38,7 @@ impl HookIn {
Self::Hardlink(r#in) => r#in.id,
Self::Download(r#in) => r#in.id,
Self::Upload(r#in) => r#in.id,
Self::Preload(r#in) => r#in.id,
}
}
@ -49,6 +52,7 @@ impl HookIn {
Self::Hardlink(r#in) => Self::Hardlink(HookInOutHardlink { id, ..r#in }),
Self::Download(r#in) => Self::Download(HookInDownload { id, ..r#in }),
Self::Upload(r#in) => Self::Upload(HookInUpload { id, ..r#in }),
Self::Preload(r#in) => Self::Preload(HookInPreload { id, ..r#in }),
}
}
}
@ -210,3 +214,15 @@ impl HookInUpload {
Self { id: Id::ZERO, target: target.into() }
}
}
// --- Preload
#[derive(Debug)]
pub(crate) struct HookInPreload {
pub(crate) id: Id,
pub(crate) idx: u8,
pub(crate) hash: u64,
}
impl HookInPreload {
pub(crate) fn new(idx: u8, hash: u64) -> Self { Self { id: Id::ZERO, idx, hash } }
}

View file

@ -24,21 +24,6 @@ macro_rules! ok_or_not_found {
};
}
#[macro_export]
macro_rules! progress_or_break {
($rx:ident, $done:expr) => {
tokio::select! {
r = $rx.recv() => {
match r {
Some(prog) => prog,
None => break,
}
},
false = $done.future() => break,
}
};
}
#[macro_export]
macro_rules! impl_from_out {
($($variant:ident($type:ty)),* $(,)?) => {

View file

@ -1,4 +1,6 @@
use yazi_runner::plugin::PluginOpt;
use std::ops::{Deref, DerefMut};
use yazi_runner::entry::EntryJob;
use yazi_shared::Id;
#[derive(Debug)]
@ -18,7 +20,14 @@ impl PluginIn {
// --- Entry
#[derive(Debug)]
pub(crate) struct PluginInEntry {
pub(crate) id: Id,
pub(crate) opt: PluginOpt,
pub(crate) struct PluginInEntry(pub(crate) EntryJob);
impl Deref for PluginInEntry {
type Target = EntryJob;
fn deref(&self) -> &Self::Target { &self.0 }
}
impl DerefMut for PluginInEntry {
fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 }
}

View file

@ -18,8 +18,10 @@ impl Plugin {
}
pub(crate) async fn entry(&self, task: PluginInEntry) -> Result<(), PluginOutEntry> {
RUNNER.entry(task.opt).await?;
Ok(self.ops.out(task.id, PluginOutEntry::Succ))
let id = task.id;
RUNNER.entry(task.0).await?;
Ok(self.ops.out(id, PluginOutEntry::Succ))
}
}

View file

@ -12,8 +12,8 @@ impl From<PluginProgEntry> for TaskSummary {
fn from(value: PluginProgEntry) -> Self {
Self {
total: 1,
success: (value.state == Some(true)) as u32,
failed: (value.state == Some(false)) as u32,
success: value.success() as u32,
failed: value.failed() as u32,
percent: value.percent().map(Into::into),
}
}

View file

@ -7,7 +7,3 @@ pub(crate) struct PreloadIn {
pub(crate) plugin: &'static Preloader,
pub(crate) target: yazi_fs::File,
}
impl PreloadIn {
pub(crate) fn id(&self) -> Id { self.id }
}

View file

@ -1,13 +1,16 @@
use yazi_runner::preloader::PreloadError;
use crate::{Task, TaskProg};
#[derive(Debug)]
pub(crate) enum PreloadOut {
Succ,
Fail(String),
Clean,
}
impl From<mlua::Error> for PreloadOut {
fn from(value: mlua::Error) -> Self { Self::Fail(value.to_string()) }
impl From<PreloadError> for PreloadOut {
fn from(value: PreloadError) -> Self { Self::Fail(value.to_string()) }
}
impl PreloadOut {
@ -21,6 +24,9 @@ impl PreloadOut {
prog.state = Some(false);
task.log(reason);
}
Self::Clean => {
prog.cleaned = Some(true);
}
}
}
}

View file

@ -4,11 +4,10 @@ use anyhow::Result;
use lru::LruCache;
use parking_lot::Mutex;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::error;
use yazi_config::Priority;
use yazi_fs::FsHash64;
use yazi_runner::RUNNER;
use yazi_runner::{RUNNER, preloader::{PreloadError, PreloadJob}};
use crate::{HIGH, LOW, NORMAL, TaskOp, TaskOps, preload::{PreloadIn, PreloadOut}};
@ -17,7 +16,7 @@ pub struct Preload {
tx: async_priority_channel::Sender<PreloadIn, u8>,
pub loaded: Mutex<LruCache<u64, u16>>,
pub loading: Mutex<LruCache<u64, CancellationToken>>,
pub loading: Mutex<LruCache<u64, yazi_shared::Id>>,
}
impl Preload {
@ -35,18 +34,19 @@ impl Preload {
}
pub(crate) async fn preload(&self, task: PreloadIn) -> Result<(), PreloadOut> {
let ct = CancellationToken::new();
if let Some(ct) = self.loading.lock().put(task.target.url.hash_u64(), ct.clone()) {
ct.cancel();
}
let hash = task.target.hash_u64();
let (ok, err) = RUNNER.preload(&task.plugin.run, task.target, ct).await?;
if !ok {
let mut rx = RUNNER.preload(PreloadJob { action: &task.plugin.run, file: task.target }).await;
let state = match rx.recv().await.unwrap_or(Err(PreloadError::Cancelled)) {
Ok(state) => state,
Err(PreloadError::Cancelled) => Default::default(),
e @ Err(_) => e?,
};
if !state.complete {
self.loaded.lock().get_mut(&hash).map(|x| *x &= !(1 << task.plugin.idx));
}
if let Some(e) = err {
if let Some(e) = state.error {
error!("Error when running preloader `{}`:\n{e}", task.plugin.run.name);
}

View file

@ -4,15 +4,16 @@ use crate::TaskSummary;
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize)]
pub struct PreloadProg {
pub state: Option<bool>,
pub state: Option<bool>,
pub cleaned: Option<bool>,
}
impl From<PreloadProg> for TaskSummary {
fn from(value: PreloadProg) -> Self {
Self {
total: 1,
success: (value.state == Some(true)) as u32,
failed: (value.state == Some(false)) as u32,
success: value.success() as u32,
failed: value.failed() as u32,
percent: value.percent().map(Into::into),
}
}
@ -21,13 +22,13 @@ impl From<PreloadProg> for TaskSummary {
impl PreloadProg {
pub fn cooked(self) -> bool { self.state == Some(true) }
pub fn running(self) -> bool { self.state.is_none() }
pub fn running(self) -> bool { self.state.is_none() || (self.cleaned.is_none() && self.cooked()) }
pub fn success(self) -> bool { self.cooked() }
pub fn success(self) -> bool { self.cleaned == Some(true) && self.cooked() }
pub fn failed(self) -> bool { self.state == Some(false) }
pub fn failed(self) -> bool { self.cleaned == Some(false) || self.state == Some(false) }
pub fn cleaned(self) -> Option<bool> { None }
pub fn cleaned(self) -> Option<bool> { self.cleaned }
pub fn percent(self) -> Option<f32> { None }
}

View file

@ -11,9 +11,9 @@ pub struct ProcessProgBlock {
impl From<ProcessProgBlock> for TaskSummary {
fn from(value: ProcessProgBlock) -> Self {
Self {
total: (value.state == Some(false)) as u32,
total: value.failed() as u32,
success: 0,
failed: (value.state == Some(false)) as u32,
failed: value.failed() as u32,
percent: value.percent().map(Into::into),
}
}
@ -42,9 +42,9 @@ pub struct ProcessProgOrphan {
impl From<ProcessProgOrphan> for TaskSummary {
fn from(value: ProcessProgOrphan) -> Self {
Self {
total: (value.state == Some(false)) as u32,
total: value.failed() as u32,
success: 0,
failed: (value.state == Some(false)) as u32,
failed: value.failed() as u32,
percent: value.percent().map(Into::into),
}
}
@ -74,8 +74,8 @@ impl From<ProcessProgBg> for TaskSummary {
fn from(value: ProcessProgBg) -> Self {
Self {
total: 1,
success: (value.state == Some(true)) as u32,
failed: (value.state == Some(false)) as u32,
success: value.success() as u32,
failed: value.failed() as u32,
percent: value.percent().map(Into::into),
}
}

View file

@ -27,8 +27,11 @@ impl TasksProxy {
I: IntoIterator,
I::Item: Into<UrlBuf>,
{
let urls: Vec<_> = urls.into_iter().map(Into::into).collect();
emit!(Call(relay!(tasks:update_succeed, [id]).with_any("urls", urls).with("track", track)));
emit!(Call(
relay!(tasks:update_succeed, [id])
.with_list("urls", urls.into_iter().map(Into::into))
.with("track", track)
));
}
}

View file

@ -1,11 +1,13 @@
use std::{ops::Deref, sync::Arc, time::Duration};
use hashbrown::HashMap;
use tokio::task::JoinHandle;
use yazi_config::{YAZI, plugin::{Fetcher, Preloader}};
use yazi_runner::plugin::PluginOpt;
use yazi_shared::{CompletionToken, Id, Throttle, url::{UrlBuf, UrlLike}};
use yazi_fs::FsHash64;
use yazi_runner::entry::EntryJob;
use yazi_shared::{CompletionToken, Id, SStr, Throttle, data::{Data, DataKey}, url::{UrlBuf, UrlLike}};
use crate::{Behavior, HIGH, LOW, NORMAL, Task, TaskProg, Worker, fetch::{FetchIn, FetchProg}, file::{FileInCopy, FileInCut, FileInDelete, FileInDownload, FileInHardlink, FileInLink, FileInTrash, FileInUpload, FileOutCopy, FileOutCut, FileOutDownload, FileOutHardlink, FileOutUpload, FileProgCopy, FileProgCut, FileProgDelete, FileProgDownload, FileProgHardlink, FileProgLink, FileProgTrash, FileProgUpload}, hook::{HookIn, HookInDelete, HookInDownload, HookInTrash, HookInUpload}, plugin::{PluginInEntry, PluginProgEntry}, preload::{PreloadIn, PreloadProg}, process::{ProcessInBg, ProcessInBlock, ProcessInOrphan, ProcessOpt, ProcessProgBg, ProcessProgBlock, ProcessProgOrphan}, size::{SizeIn, SizeProg}};
use crate::{Behavior, HIGH, LOW, NORMAL, Task, TaskProg, Worker, fetch::{FetchIn, FetchProg}, file::{FileInCopy, FileInCut, FileInDelete, FileInDownload, FileInHardlink, FileInLink, FileInTrash, FileInUpload, FileOutCopy, FileOutCut, FileOutDownload, FileOutHardlink, FileOutUpload, FileProgCopy, FileProgCut, FileProgDelete, FileProgDownload, FileProgHardlink, FileProgLink, FileProgTrash, FileProgUpload}, hook::{HookIn, HookInDelete, HookInDownload, HookInPreload, HookInTrash, HookInUpload}, plugin::{PluginInEntry, PluginProgEntry}, preload::{PreloadIn, PreloadProg}, process::{ProcessInBg, ProcessInBlock, ProcessInOrphan, ProcessOpt, ProcessProgBg, ProcessProgBlock, ProcessProgOrphan}, size::{SizeIn, SizeProg}};
pub struct Scheduler {
pub worker: Worker,
@ -67,29 +69,28 @@ impl Scheduler {
pub fn file_cut(&self, from: UrlBuf, to: UrlBuf, force: bool) {
let name = format!("Cut {} to {}", from.display(), to.display());
let (id, done) = self.add::<FileProgCut, _>(name, |t| (t.id, t.done.clone()));
let id = self.add::<FileProgCut, _>(name, |t| t.id);
if to.try_starts_with(&from).unwrap_or(false) && !to.covariant(&from) {
return self.ops.out(id, FileOutCut::Fail("Cannot cut directory into itself".to_owned()));
}
let follow = !from.scheme().covariant(to.scheme());
self.file.submit(
FileInCut { id, from, to, force, cha: None, follow, retry: 0, drop: None, done },
LOW,
);
self
.file
.submit(FileInCut { id, from, to, force, cha: None, follow, retry: 0, drop: None }, LOW);
}
pub fn file_copy(&self, from: UrlBuf, to: UrlBuf, force: bool, follow: bool) {
let name = format!("Copy {} to {}", from.display(), to.display());
let (id, done) = self.add::<FileProgCopy, _>(name, |t| (t.id, t.done.clone()));
let id = self.add::<FileProgCopy, _>(name, |t| t.id);
if to.try_starts_with(&from).unwrap_or(false) && !to.covariant(&from) {
return self.ops.out(id, FileOutCopy::Fail("Cannot copy directory into itself".to_owned()));
}
let follow = follow || !from.scheme().covariant(to.scheme());
self.file.submit(FileInCopy { id, from, to, force, cha: None, follow, retry: 0, done }, LOW);
self.file.submit(FileInCopy { id, from, to, force, cha: None, follow, retry: 0 }, LOW);
}
pub fn file_link(&self, from: UrlBuf, to: UrlBuf, relative: bool, force: bool) {
@ -150,7 +151,7 @@ impl Scheduler {
return done;
}
self.file.submit(FileInDownload { id, target, cha: None, retry: 0, done: done.clone() }, LOW);
self.file.submit(FileInDownload { id, target, cha: None, retry: 0 }, LOW);
done
}
@ -158,20 +159,20 @@ impl Scheduler {
let name = format!("Upload {}", target.display());
let hook = HookInUpload::new(&target);
let (id, done) = self.add_hooked::<FileProgUpload, _>(name, hook, |t| (t.id, t.done.clone()));
let id = self.add_hooked::<FileProgUpload, _>(name, hook, |t| t.id);
if !target.kind().is_remote() {
return self.ops.out(id, FileOutUpload::Fail("Cannot upload non-remote file".to_owned()));
}
self.file.submit(FileInUpload { id, target, cha: None, cache: None, done }, LOW);
self.file.submit(FileInUpload { id, target, cha: None, cache: None }, LOW);
}
pub fn plugin_entry(&self, opt: PluginOpt) {
let name = format!("Run micro plugin `{}`", opt.id);
pub fn plugin_entry(&self, plugin: SStr, args: HashMap<DataKey, Data>) {
let name = format!("Run micro plugin `{plugin}`");
let id = self.add::<PluginProgEntry, _>(name, |t| t.id);
self.plugin.submit(PluginInEntry { id, opt }, NORMAL);
self.plugin.submit(PluginInEntry(EntryJob { id, args, plugin }), NORMAL);
}
pub fn fetch_paged(
@ -202,7 +203,12 @@ impl Scheduler {
pub fn preload_paged(&self, preloader: &'static Preloader, target: &yazi_fs::File) {
let name = format!("Run preloader `{}`", preloader.run.name);
let id = self.add::<PreloadProg, _>(name, |t| t.id);
let hook = HookInPreload::new(preloader.idx, target.hash_u64());
let id = self.add_hooked::<PreloadProg, _>(name, hook, |t| t.id);
if let Some(prev) = self.preload.loading.lock().put(target.url.hash_u64(), id) {
self.cancel(prev);
}
self.preload.submit(PreloadIn { id, plugin: preloader, target: target.clone() });
}

View file

@ -30,20 +30,18 @@ impl Worker {
let (process_tx, process_rx) = async_priority_channel::unbounded();
let (hook_tx, hook_rx) = async_priority_channel::unbounded();
let (op_tx, op_rx) = mpsc::unbounded_channel();
let ongoing = Arc::new(Mutex::new(Ongoing::default()));
let file = Arc::new(File::new(&op_tx, file_tx));
let plugin = Arc::new(Plugin::new(&op_tx, plugin_tx));
let fetch = Arc::new(Fetch::new(&op_tx, fetch_tx));
let preload = Arc::new(Preload::new(&op_tx, preload_tx));
let size = Arc::new(Size::new(&op_tx, size_tx));
let process = Arc::new(Process::new(&op_tx, process_tx));
let hook = Arc::new(Hook::new(&op_tx, &ongoing, &preload, hook_tx));
let me = Self {
file: Arc::new(File::new(&op_tx, file_tx)),
plugin: Arc::new(Plugin::new(&op_tx, plugin_tx)),
fetch: Arc::new(Fetch::new(&op_tx, fetch_tx)),
preload: Arc::new(Preload::new(&op_tx, preload_tx)),
size: Arc::new(Size::new(&op_tx, size_tx)),
process: Arc::new(Process::new(&op_tx, process_tx)),
hook: Arc::new(Hook::new(&op_tx, &ongoing, hook_tx)),
ops: TaskOps(op_tx),
ongoing,
};
let me =
Self { file, plugin, fetch, preload, size, process, hook, ops: TaskOps(op_tx), ongoing };
let handles = []
.into_iter()
@ -165,7 +163,7 @@ impl Worker {
tokio::spawn(async move {
loop {
if let Ok((r#in, _)) = rx.recv().await {
let id = r#in.id();
let id = r#in.id;
let Some(token) = me.ongoing.lock().get_token(id) else {
continue;
};
@ -270,6 +268,7 @@ impl Worker {
HookIn::Hardlink(r#in) => self.hook.hardlink(r#in).await,
HookIn::Download(r#in) => self.hook.download(r#in).await,
HookIn::Upload(r#in) => self.hook.upload(r#in).await,
HookIn::Preload(r#in) => self.hook.preload(r#in).await,
}
}

View file

@ -81,6 +81,15 @@ impl From<&str> for Data {
fn from(value: &str) -> Self { Self::String(Cow::Owned(value.to_owned())) }
}
impl<T> FromIterator<T> for Data
where
T: Into<Data>,
{
fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
Self::List(iter.into_iter().map(Into::into).collect())
}
}
impl TryFrom<&Data> for bool {
type Error = anyhow::Error;

View file

@ -65,6 +65,20 @@ impl Action {
self
}
pub fn with_list<I>(mut self, name: impl Into<DataKey>, values: I) -> Self
where
I: IntoIterator,
I::Item: Into<Data>,
{
self.args.insert(name.into(), values.into_iter().map(Into::into).collect());
self
}
pub fn with_any(mut self, name: impl Into<DataKey>, data: impl DataAny) -> Self {
self.args.insert(name.into(), Data::Any(Box::new(data)));
self
}
pub fn with_opt(mut self, name: impl Into<DataKey>, value: Option<impl Into<Data>>) -> Self {
if let Some(value) = value {
self.args.insert(name.into(), value.into());
@ -83,11 +97,6 @@ impl Action {
self
}
pub fn with_any(mut self, name: impl Into<DataKey>, data: impl DataAny) -> Self {
self.args.insert(name.into(), Data::Any(Box::new(data)));
self
}
pub fn with_replier(mut self, tx: Replier) -> Self {
self.args.insert("replier".into(), Data::Any(Box::new(tx)));
self

View file

@ -2,10 +2,10 @@ use std::{io::{self, SeekFrom}, sync::{Arc, atomic::{AtomicU64, Ordering}}};
use futures::{StreamExt, TryStreamExt};
use tokio::{io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter}, select, sync::{mpsc, oneshot}};
use yazi_fs::provider::{Attrs, FileBuilder};
use yazi_fs::{cha::Cha, provider::{Attrs, FileBuilder}};
use yazi_shared::url::{Url, UrlBuf};
use crate::provider::{self, Gate};
use crate::provider::{self, Gate, RwFile};
const BUF_SIZE: usize = 512 * 1024;
const PER_CHUNK: u64 = 8 * 1024 * 1024;
@ -29,120 +29,148 @@ pub(super) fn copy_with_progress_impl(
to: UrlBuf,
attrs: Attrs,
) -> mpsc::Receiver<io::Result<u64>> {
let acc = Arc::new(AtomicU64::new(0));
let (from, to) = (Arc::new(from), Arc::new(to));
let (prog_tx, prog_rx) = mpsc::channel(10);
let (done_tx, mut done_rx) = oneshot::channel();
let (copier, rx) = ProgressiveCopier::new(from, to, attrs);
copier.spawn();
rx
}
let (acc_, prog_tx_) = (acc.clone(), prog_tx.clone());
tokio::spawn(async move {
let init = async {
let src = provider::open(&*from).await?;
let cha = src.metadata().await?;
// --- ProgressiveCopier
struct ProgressiveCopier {
from: UrlBuf,
to: UrlBuf,
attrs: Attrs,
let dist = provider::create(&*to).await?;
dist.set_len(cha.len).await?;
Ok((cha, Some(src), Some(dist)))
};
acc: AtomicU64,
prog_tx: mpsc::Sender<io::Result<u64>>,
}
let (cha, mut src, mut dist) = match init.await {
Ok(r) => r,
Err(e) => {
prog_tx_.send(Err(e)).await.ok();
done_tx.send(()).ok();
return;
impl ProgressiveCopier {
fn new(from: UrlBuf, to: UrlBuf, attrs: Attrs) -> (Arc<Self>, mpsc::Receiver<io::Result<u64>>) {
let acc = AtomicU64::new(0);
let (prog_tx, prog_rx) = mpsc::channel(20);
(Arc::new(Self { from, to, attrs, acc, prog_tx }), prog_rx)
}
fn spawn(self: Arc<Self>) {
let (done_tx, done_rx) = oneshot::channel();
tokio::spawn(self.clone().watch(done_rx));
tokio::spawn(async move {
if let Err(e) = self.work().await {
self.prog_tx.send(Err(e)).await.ok();
}
};
done_tx.send(()).ok();
});
}
async fn init(&self) -> io::Result<(Cha, RwFile, RwFile)> {
let src = provider::open(&self.from).await?;
let cha = src.metadata().await?;
let dist = provider::create(&self.to).await?;
dist.set_len(cha.len).await?;
Ok((cha, src, dist))
}
async fn work(&self) -> io::Result<()> {
let (cha, src, dist) = self.init().await?;
let (mut src, mut dist) = (Some(src), Some(dist));
let chunks = cha.len.div_ceil(PER_CHUNK);
let it = futures::stream::iter(0..chunks)
.map(|i| {
let acc_ = acc_.clone();
let (from, to) = (from.clone(), to.clone());
let (src, dist) = (src.take(), dist.take());
async move {
let offset = i * PER_CHUNK;
let take = cha.len.saturating_sub(offset).min(PER_CHUNK);
let mut src = BufReader::with_capacity(BUF_SIZE, match src {
Some(f) => f,
None => provider::open(&*from).await?,
});
let mut dist = BufWriter::with_capacity(BUF_SIZE, match dist {
Some(f) => f,
None => Gate::default().write(true).open(&*to).await?,
});
src.seek(SeekFrom::Start(offset)).await?;
dist.seek(SeekFrom::Start(offset)).await?;
let mut src = src.take(take);
let mut buf = vec![0u8; 65536];
let mut copied = 0u64;
loop {
let n = src.read(&mut buf).await?;
if n == 0 {
break;
}
dist.write_all(&buf[..n]).await?;
copied += n as u64;
acc_.fetch_add(n as u64, Ordering::SeqCst);
}
dist.flush().await?;
if copied != take {
Err(io::Error::other(format!(
"short copy for chunk {i}: copied {copied} bytes, expected {take}"
)))
} else if i == chunks - 1 {
Ok(Some(dist.into_inner()))
} else {
dist.shutdown().await.ok();
Ok(None)
}
}
})
.map(|i| self.map(i, cha, chunks, src.take(), dist.take()))
.buffer_unordered(4)
.try_fold(None, |first, file| async { Ok(first.or(file)) });
let mut result = select! {
r = it => r,
_ = prog_tx_.closed() => return,
_ = self.prog_tx.closed() => return Ok(()),
};
done_tx.send(()).ok();
let n = acc_.swap(0, Ordering::SeqCst);
let n = self.acc.swap(0, Ordering::SeqCst);
if n > 0 {
prog_tx_.send(Ok(n)).await.ok();
self.prog_tx.send(Ok(n)).await.ok();
}
if let Ok(None) = &mut result {
result = Ok(dist.take());
}
if let Ok(Some(file)) = &mut result {
file.set_attrs(attrs).await.ok();
file.set_attrs(self.attrs).await.ok();
file.shutdown().await.ok();
}
if let Err(e) = result {
prog_tx_.send(Err(e)).await.ok();
self.prog_tx.send(Err(e)).await.ok();
} else {
prog_tx_.send(Ok(0)).await.ok();
self.prog_tx.send(Ok(0)).await.ok();
}
});
Ok(())
}
tokio::spawn(async move {
async fn map(
&self,
i: u64,
cha: Cha,
chunks: u64,
src: Option<RwFile>,
dist: Option<RwFile>,
) -> io::Result<Option<RwFile>> {
let offset = i * PER_CHUNK;
let take = cha.len.saturating_sub(offset).min(PER_CHUNK);
let mut src = BufReader::with_capacity(BUF_SIZE, match src {
Some(f) => f,
None => provider::open(&self.from).await?,
});
let mut dist = BufWriter::with_capacity(BUF_SIZE, match dist {
Some(f) => f,
None => Gate::default().write(true).open(&self.to).await?,
});
src.seek(SeekFrom::Start(offset)).await?;
dist.seek(SeekFrom::Start(offset)).await?;
let mut src = src.take(take);
let mut buf = vec![0u8; 65536];
let mut copied = 0u64;
loop {
let n = src.read(&mut buf).await?;
if n == 0 {
break;
}
dist.write_all(&buf[..n]).await?;
copied += n as u64;
self.acc.fetch_add(n as u64, Ordering::SeqCst);
}
dist.flush().await?;
if copied != take {
Err(io::Error::other(format!(
"short copy for chunk {i}: copied {copied} bytes, expected {take}"
)))
} else if i == chunks - 1 {
Ok(Some(dist.into_inner()))
} else {
dist.shutdown().await.ok();
Ok(None)
}
}
async fn watch(self: Arc<Self>, mut done_rx: oneshot::Receiver<()>) {
loop {
select! {
_ = &mut done_rx => break,
_ = prog_tx.closed() => break,
_ = self.prog_tx.closed() => break,
_ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {},
}
let n = acc.swap(0, Ordering::SeqCst);
let n = self.acc.swap(0, Ordering::SeqCst);
if n > 0 {
prog_tx.send(Ok(n)).await.ok();
self.prog_tx.send(Ok(n)).await.ok();
}
}
});
prog_rx
}
}