diff --git a/Cargo.toml b/Cargo.toml index fa22dcd7b..e0f78c29a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,7 @@ screencapturekit = ["cpal/screencapturekit"] [dependencies] async-trait = "0.1" scrap = { path = "libs/scrap", features = ["wayland"] } -hbb_common = { path = "libs/hbb_common" } +hbb_common = { path = "libs/hbb_common", features = ["webrtc"] } serde_derive = "1.0" serde = "1.0" serde_json = "1.0" diff --git a/libs/hbb_common b/libs/hbb_common index c8cbb6be2..5a78ec423 160000 --- a/libs/hbb_common +++ b/libs/hbb_common @@ -1 +1 @@ -Subproject commit c8cbb6be283e9215da87625016fe8838dda76c02 +Subproject commit 5a78ec42303ca046d808742e911a156636a2432b diff --git a/src/cli.rs b/src/cli.rs index 2f3b3550f..351a580f1 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use hbb_common::{ config::PeerConfig, config::READ_TIMEOUT, - futures::{SinkExt, StreamExt}, + futures::StreamExt, log, message_proto::*, protobuf::Message as _, @@ -46,6 +46,7 @@ impl Session { false, None, None, + None, ); session } @@ -53,7 +54,7 @@ impl Session { #[async_trait] impl Interface for Session { - fn get_login_config_handler(&self) -> Arc> { + fn get_lch(&self) -> Arc> { return self.lc.clone(); } @@ -61,14 +62,20 @@ impl Interface for Session { match msgtype { "input-password" => { self.sender - .send(Data::Login((self.password.clone(), true))) + .send(Data::Login(( + String::new(), + String::new(), + self.password.clone(), + true, + ))) .ok(); } "re-input-password" => { log::error!("{}: {}", title, text); match rpassword::prompt_password("Enter password: ") { Ok(password) => { - let login_data = Data::Login((password, true)); + let login_data = + Data::Login((String::new(), String::new(), password, true)); self.sender.send(login_data).ok(); } Err(e) => { @@ -93,6 +100,8 @@ impl Interface for Session { self.lc.write().unwrap().handle_peer_info(&pi); } + fn set_multiple_windows_session(&self, _sessions: Vec) {} + async fn handle_hash(&self, pass: &str, hash: Hash, peer: &mut Stream) { log::info!( "password={}", @@ -137,8 +146,8 @@ pub async fn connect_test(id: &str, key: String, token: String) { Err(err) => { log::error!("Failed to connect {}: {}", &id, err); } - Ok((mut stream, direct)) => { - log::info!("direct: {}", direct); + Ok(((mut stream, _direct, _secure, _kcp, _typ), direct)) => { + log::info!("direct: {:?}", direct); // rpassword::prompt_password("Input anything to exit").ok(); loop { tokio::select! { diff --git a/src/client.rs b/src/client.rs index 321a49ee6..fd4a5e881 100644 --- a/src/client.rs +++ b/src/client.rs @@ -65,11 +65,12 @@ use hbb_common::{ self, net::UdpSocket, sync::{ - mpsc::{unbounded_channel, UnboundedReceiver}, + mpsc::{error::TryRecvError, unbounded_channel, UnboundedReceiver}, oneshot, }, time::{interval, Duration, Instant}, }, + webrtc::WebRTCStream, AddrMangle, ResultType, Stream, }; pub use helper::*; @@ -330,6 +331,19 @@ impl Client { } else { (None, None) }; + let ipv6 = if crate::get_ipv6_punch_enabled() { + crate::get_ipv6_socket().await + } else { + None + }; + let webrtc_offerer = + match WebRTCStream::new("", interface.is_force_relay(), CONNECT_TIMEOUT).await { + Ok(stream) => Some(stream), + Err(err) => { + log::warn!("webrtc offerer setup failed: {}", err); + None + } + }; let fut = Self::_start_inner( peer.to_owned(), key.to_owned(), @@ -338,6 +352,8 @@ impl Client { interface.clone(), udp.clone(), Some(stop_udp_tx), + ipv6, + webrtc_offerer, rendezvous_server.clone(), servers.clone(), contained, @@ -355,6 +371,8 @@ impl Client { interface, (None, None), None, + None, + None, rendezvous_server, servers, contained, @@ -366,6 +384,67 @@ impl Client { } } + fn is_expected_webrtc_ice_candidate(ice: &IceCandidate, session_key: &str) -> bool { + !session_key.is_empty() && ice.session_key == session_key && !ice.candidate.is_empty() + } + + fn spawn_webrtc_ice_bridge( + mut socket: Stream, + mut local_ice_rx: Option>, + webrtc: WebRTCStream, + peer: String, + session_key: String, + ) -> oneshot::Sender<()> { + let (stop_tx, mut stop_rx) = oneshot::channel::<()>(); + tokio::spawn(async move { + loop { + match stop_rx.try_recv() { + Ok(_) | Err(tokio::sync::oneshot::error::TryRecvError::Closed) => break, + Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {} + } + + if let Some(rx) = local_ice_rx.as_mut() { + loop { + match rx.try_recv() { + Ok(candidate) => { + let mut msg = RendezvousMessage::new(); + msg.set_ice_candidate(IceCandidate { + id: peer.clone(), + session_key: session_key.clone(), + candidate, + ..Default::default() + }); + if let Err(err) = socket.send(&msg).await { + log::warn!("failed to send WebRTC ICE candidate: {}", err); + return; + } + } + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + local_ice_rx = None; + break; + } + } + } + } + + if let Some(msg_in) = + crate::get_next_nonkeyexchange_msg(&mut socket, Some(100)).await + { + if let Some(rendezvous_message::Union::IceCandidate(ice)) = msg_in.union { + if Self::is_expected_webrtc_ice_candidate(&ice, &session_key) { + if let Err(err) = webrtc.add_remote_ice_candidate(&ice.candidate).await + { + log::warn!("failed to add WebRTC ICE candidate: {}", err); + } + } + } + } + } + }); + stop_tx + } + async fn _start_inner( peer: String, key: String, @@ -374,6 +453,8 @@ impl Client { interface: impl Interface, mut udp: (Option>, Option>>), stop_udp_tx: Option>, + mut ipv6: Option<(Arc, bytes::Bytes)>, + mut webrtc_offerer: Option, mut rendezvous_server: String, servers: Vec, contained: bool, @@ -446,14 +527,20 @@ impl Client { // Stop UDP NAT test task if still running stop_udp_tx.map(|tx| tx.send(())); let mut msg_out = RendezvousMessage::new(); - let mut ipv6 = if crate::get_ipv6_punch_enabled() { - if let Some((socket, addr)) = crate::get_ipv6_socket().await { - (Some(socket), Some(addr)) - } else { - (None, None) + let mut ipv6 = ipv6 + .take() + .map(|(socket, addr)| (Some(socket), Some(addr))) + .unwrap_or((None, None)); + let webrtc_sdp_offer = if let Some(webrtc) = webrtc_offerer.as_ref() { + match webrtc.get_local_endpoint().await { + Ok(endpoint) => endpoint, + Err(err) => { + log::warn!("failed to read local WebRTC offer: {}", err); + String::new() + } } } else { - (None, None) + String::new() }; let udp_nat_port = udp.1.map(|x| *x.lock().unwrap()).unwrap_or(0); let punch_type = if udp_nat_port > 0 { "UDP" } else { "TCP" }; @@ -467,9 +554,16 @@ impl Client { udp_port: udp_nat_port as _, force_relay: interface.is_force_relay(), socket_addr_v6: ipv6.1.unwrap_or_default(), + webrtc_sdp_offer: webrtc_sdp_offer.clone(), ..Default::default() }); - for i in 1..=3 { + let webrtc_session_key = webrtc_offerer + .as_ref() + .map(|webrtc| webrtc.session_key().to_owned()) + .unwrap_or_default(); + let mut webrtc_sdp_answer = String::new(); + let mut pending_webrtc_ice = Vec::::new(); + 'punch_attempts: for i in 1..=3 { log::info!( "#{} {} punch attempt with {}, id: {}", i, @@ -479,9 +573,20 @@ impl Client { ); socket.send(&msg_out).await?; // below timeout should not bigger than hbbs's connection timeout. - if let Some(msg_in) = - crate::get_next_nonkeyexchange_msg(&mut socket, Some(i * 3000)).await - { + let attempt_deadline = Instant::now() + Duration::from_millis((i * 3000) as u64); + loop { + let remaining = attempt_deadline.saturating_duration_since(Instant::now()); + if remaining.is_zero() { + break; + } + let timeout_ms = remaining + .as_millis() + .clamp(1, u64::MAX as u128) as u64; + let Some(msg_in) = + crate::get_next_nonkeyexchange_msg(&mut socket, Some(timeout_ms)).await + else { + break; + }; match msg_in.union { Some(rendezvous_message::Union::PunchHoleResponse(ph)) => { if ph.socket_addr.is_empty() { @@ -510,6 +615,7 @@ impl Client { relay_server = ph.relay_server; peer_addr = AddrMangle::decode(&ph.socket_addr); feedback = ph.feedback; + webrtc_sdp_answer = ph.webrtc_sdp_answer; let s = udp.0.take(); if ph.is_udp && s.is_some() { if let Some(s) = s { @@ -528,7 +634,7 @@ impl Client { } } log::info!("{} Hole Punched {} = {}", punch_type, peer, peer_addr); - break; + break 'punch_attempts; } } Some(rendezvous_message::Union::RelayResponse(rr)) => { @@ -549,6 +655,38 @@ impl Client { } } signed_id_pk = rr.pk().into(); + let mut webrtc_bridge_stop = None; + let mut webrtc_for_connect = None; + if !rr.webrtc_sdp_answer.is_empty() { + if let Some(webrtc) = webrtc_offerer.take() { + if let Err(err) = + webrtc.set_remote_endpoint(&rr.webrtc_sdp_answer).await + { + log::warn!("failed to set WebRTC relay answer: {}", err); + } else { + for candidate in pending_webrtc_ice.drain(..) { + if let Err(err) = + webrtc.add_remote_ice_candidate(&candidate).await + { + log::warn!( + "failed to add buffered WebRTC ICE candidate: {}", + err + ); + } + } + let session_key = webrtc.session_key().to_owned(); + let local_ice_rx = webrtc.take_local_ice_rx(); + webrtc_bridge_stop = Some(Self::spawn_webrtc_ice_bridge( + socket, + local_ice_rx, + webrtc.clone(), + peer.clone(), + session_key, + )); + webrtc_for_connect = Some(webrtc); + } + } + } let fut = Self::create_relay( &peer, rr.uuid, @@ -564,30 +702,81 @@ impl Client { } .boxed(), ); + if let Some(mut webrtc) = webrtc_for_connect { + connect_futures.push( + async move { + webrtc.wait_connected(CONNECT_TIMEOUT).await?; + Ok((Stream::WebRTC(webrtc), None, "WebRTC")) + } + .boxed(), + ); + } // Run all connection attempts concurrently, return the first successful one let (conn, kcp, typ) = match select_ok(connect_futures).await { Ok(conn) => (Ok(conn.0 .0), conn.0 .1, conn.0 .2), Err(e) => (Err(e), None, ""), }; + if let Some(stop) = webrtc_bridge_stop { + let _ = stop.send(()); + } let mut conn = conn?; feedback = rr.feedback; log::info!("{:?} used to establish {typ} connection", start.elapsed()); let pk = Self::secure_connection(&peer, signed_id_pk, &key, &mut conn).await?; return Ok(( - (conn, typ == "IPv6", pk, kcp, typ), + (conn, typ == "IPv6" || typ == "WebRTC", pk, kcp, typ), (feedback, rendezvous_server), false, )); } + Some(rendezvous_message::Union::IceCandidate(ice)) => { + if Self::is_expected_webrtc_ice_candidate(&ice, &webrtc_session_key) { + pending_webrtc_ice.push(ice.candidate); + } else { + log::debug!( + "dropping ICE candidate for unexpected WebRTC session key {}", + ice.session_key, + ); + } + } _ => { log::error!("Unexpected protobuf msg received: {:?}", msg_in); } } } } - drop(socket); + let mut webrtc_bridge_stop = None; + let mut webrtc_for_connect = None; + if !webrtc_sdp_answer.is_empty() { + if let Some(webrtc) = webrtc_offerer.take() { + if let Err(err) = webrtc.set_remote_endpoint(&webrtc_sdp_answer).await { + log::warn!("failed to set WebRTC answer: {}", err); + drop(socket); + } else { + for candidate in pending_webrtc_ice.drain(..) { + if let Err(err) = webrtc.add_remote_ice_candidate(&candidate).await { + log::warn!("failed to add buffered WebRTC ICE candidate: {}", err); + } + } + let session_key = webrtc.session_key().to_owned(); + let local_ice_rx = webrtc.take_local_ice_rx(); + webrtc_bridge_stop = Some(Self::spawn_webrtc_ice_bridge( + socket, + local_ice_rx, + webrtc.clone(), + peer.clone(), + session_key, + )); + webrtc_for_connect = Some(webrtc); + } + } else { + drop(socket); + } + } else { + drop(socket); + } if peer_addr.port() == 0 { bail!("Failed to connect via rendezvous server"); } @@ -621,6 +810,8 @@ impl Client { interface, udp.0, ipv6.0, + webrtc_for_connect, + webrtc_bridge_stop, punch_type, ) .await?, @@ -647,6 +838,8 @@ impl Client { interface: impl Interface, udp_socket_nat: Option>, udp_socket_v6: Option>, + webrtc_offerer: Option, + webrtc_bridge_stop: Option>, punch_type: &str, ) -> ResultType<( Stream, @@ -705,11 +898,23 @@ impl Client { if let Some(udp_socket_v6) = udp_socket_v6 { connect_futures.push(udp_nat_connect(udp_socket_v6, "IPv6", connect_timeout).boxed()); } + if let Some(mut webrtc) = webrtc_offerer { + connect_futures.push( + async move { + webrtc.wait_connected(connect_timeout).await?; + Ok((Stream::WebRTC(webrtc), None, "WebRTC")) + } + .boxed(), + ); + } // Run all connection attempts concurrently, return the first successful one let (mut conn, kcp, mut typ) = match select_ok(connect_futures).await { Ok(conn) => (Ok(conn.0 .0), conn.0 .1, conn.0 .2), Err(e) => (Err(e), None, ""), }; + if let Some(stop) = webrtc_bridge_stop { + let _ = stop.send(()); + } let mut direct = !conn.is_err(); if interface.is_force_relay() || conn.is_err() { @@ -4120,8 +4325,22 @@ pub mod peer_online { #[cfg(test)] mod tests { + use crate::client::Client; + use hbb_common::rendezvous_proto::IceCandidate; use hbb_common::tokio; + #[test] + fn accepts_webrtc_ice_by_session_key_only() { + let ice = IceCandidate { + session_key: "session-a".to_owned(), + candidate: "candidate-json".to_owned(), + ..Default::default() + }; + + assert!(Client::is_expected_webrtc_ice_candidate(&ice, "session-a")); + assert!(!Client::is_expected_webrtc_ice_candidate(&ice, "session-b")); + } + #[tokio::test] async fn test_query_onlines() { super::query_online_states( diff --git a/src/main.rs b/src/main.rs index 9bc90a8fa..3d5237ab5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -38,49 +38,68 @@ fn main() { if !common::global_init() { return; } - use clap::App; + use clap::{Arg, ArgAction, Command}; use hbb_common::log; - let args = format!( - "-p, --port-forward=[PORT-FORWARD-OPTIONS] 'Format: remote-id:local-port:remote-port[:remote-host]' - -c, --connect=[REMOTE_ID] 'test only' - -k, --key=[KEY] '' - -s, --server=[] 'Start server'", - ); - let matches = App::new("rustdesk") + let matches = Command::new("rustdesk") .version(crate::VERSION) .author("Purslane Ltd") .about("RustDesk command line tool") - .args_from_usage(&args) + .arg( + Arg::new("port-forward") + .short('p') + .long("port-forward") + .value_name("PORT-FORWARD-OPTIONS") + .help("Format: remote-id:local-port:remote-port[:remote-host]"), + ) + .arg( + Arg::new("connect") + .short('c') + .long("connect") + .value_name("REMOTE_ID") + .help("test only"), + ) + .arg(Arg::new("key").short('k').long("key").value_name("KEY")) + .arg( + Arg::new("server") + .short('s') + .long("server") + .action(ArgAction::SetTrue) + .help("Start server"), + ) .get_matches(); use hbb_common::{config::LocalConfig, env_logger::*}; init_from_env(Env::default().filter_or(DEFAULT_FILTER_ENV, "info")); - if let Some(p) = matches.value_of("port-forward") { - let options: Vec = p.split(":").map(|x| x.to_owned()).collect(); + if let Some(p) = matches.get_one::("port-forward") { + let options: Vec = p.split(':').map(|x| x.to_owned()).collect(); if options.len() < 3 { log::error!("Wrong port-forward options"); return; } - let mut port = 0; - if let Ok(v) = options[1].parse::() { - port = v; - } else { - log::error!("Wrong local-port"); - return; - } - let mut remote_port = 0; - if let Ok(v) = options[2].parse::() { - remote_port = v; - } else { - log::error!("Wrong remote-port"); - return; - } + let port = match options[1].parse::() { + Ok(v) => v, + Err(_) => { + log::error!("Wrong local-port"); + return; + } + }; + let remote_port = match options[2].parse::() { + Ok(v) => v, + Err(_) => { + log::error!("Wrong remote-port"); + return; + } + }; let mut remote_host = "localhost".to_owned(); if options.len() > 3 { remote_host = options[3].clone(); } common::test_rendezvous_server(); common::test_nat_type(); - let key = matches.value_of("key").unwrap_or("").to_owned(); + let key = matches + .get_one::("key") + .map(String::as_str) + .unwrap_or("") + .to_owned(); let token = LocalConfig::get_option("access_token"); cli::start_one_port_forward( options[0].clone(), @@ -90,13 +109,17 @@ fn main() { key, token, ); - } else if let Some(p) = matches.value_of("connect") { + } else if let Some(p) = matches.get_one::("connect") { common::test_rendezvous_server(); common::test_nat_type(); - let key = matches.value_of("key").unwrap_or("").to_owned(); + let key = matches + .get_one::("key") + .map(String::as_str) + .unwrap_or("") + .to_owned(); let token = LocalConfig::get_option("access_token"); cli::connect_test(p, key, token); - } else if let Some(p) = matches.value_of("server") { + } else if matches.get_flag("server") { log::info!("id={}", hbb_common::config::Config::get_id()); crate::start_server(true, false); } diff --git a/src/rendezvous_mediator.rs b/src/rendezvous_mediator.rs index 89d7fa01e..41fc75b18 100644 --- a/src/rendezvous_mediator.rs +++ b/src/rendezvous_mediator.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, net::SocketAddr, sync::{ atomic::{AtomicBool, Ordering}, @@ -21,8 +22,13 @@ use hbb_common::{ rendezvous_proto::*, sleep, socket_client::{self, connect_tcp, is_ipv4, new_direct_udp_for, new_udp_for}, - tokio::{self, select, sync::Mutex, time::interval}, + tokio::{ + self, select, + sync::{mpsc, Mutex}, + time::interval, + }, udp::FramedSocket, + webrtc::WebRTCStream, AddrMangle, IntoTargetAddr, ResultType, Stream, TargetAddr, }; @@ -32,11 +38,13 @@ use crate::{ }; type Message = RendezvousMessage; +type RendezvousSender = mpsc::UnboundedSender; lazy_static::lazy_static! { static ref SOLVING_PK_MISMATCH: Mutex = Default::default(); static ref LAST_MSG: Mutex<(SocketAddr, Instant)> = Mutex::new((SocketAddr::new([0; 4].into(), 0), Instant::now())); static ref LAST_RELAY_MSG: Mutex<(SocketAddr, Instant)> = Mutex::new((SocketAddr::new([0; 4].into(), 0), Instant::now())); + static ref WEBRTC_ICE_TXS: Mutex>> = Default::default(); } static SHOULD_EXIT: AtomicBool = AtomicBool::new(false); static MANUAL_RESTARTED: AtomicBool = AtomicBool::new(false); @@ -72,6 +80,7 @@ pub struct RendezvousMediator { host: String, host_prefix: String, keep_alive: i32, + rz_sender: RendezvousSender, } impl RendezvousMediator { @@ -182,11 +191,13 @@ impl RendezvousMediator { let host = check_port(&host, RENDEZVOUS_PORT); log::info!("start udp: {host}"); let (mut socket, mut addr) = new_udp_for(&host, CONNECT_TIMEOUT).await?; + let (rz_sender, mut rz_out_rx) = mpsc::unbounded_channel::(); let mut rz = Self { addr: addr.clone(), host: host.clone(), host_prefix: Self::get_host_prefix(&host), keep_alive: crate::DEFAULT_KEEP_ALIVE, + rz_sender, }; let mut timer = crate::rustdesk_interval(interval(crate::TIMER_OUT)); @@ -246,6 +257,9 @@ impl RendezvousMediator { }, } }, + Some(msg_out) = rz_out_rx.recv() => { + Sink::Framed(&mut socket, &addr).send(&msg_out).await?; + }, _ = timer.tick() => { if SHOULD_EXIT.load(Ordering::SeqCst) { break; @@ -367,6 +381,17 @@ impl RendezvousMediator { allow_err!(rz.handle_intranet(fla, server).await); }); } + Some(rendezvous_message::Union::IceCandidate(ice)) => { + let tx = WEBRTC_ICE_TXS.lock().await.get(&ice.session_key).cloned(); + if let Some(tx) = tx { + let _ = tx.send(ice.candidate); + } else { + log::debug!( + "dropping ICE candidate for unknown WebRTC session key {}", + ice.session_key + ); + } + } Some(rendezvous_message::Union::ConfigureUpdate(cu)) => { let v0 = Config::get_rendezvous_servers(); Config::set_option( @@ -389,11 +414,13 @@ impl RendezvousMediator { let mut conn = connect_tcp(host.clone(), CONNECT_TIMEOUT).await?; let key = crate::get_key(true).await; crate::secure_tcp(&mut conn, &key).await?; + let (rz_sender, mut rz_out_rx) = mpsc::unbounded_channel::(); let mut rz = Self { addr: conn.local_addr().into_target_addr()?, host: host.clone(), host_prefix: Self::get_host_prefix(&host), keep_alive: crate::DEFAULT_KEEP_ALIVE, + rz_sender, }; let mut timer = crate::rustdesk_interval(interval(crate::TIMER_OUT)); let mut last_register_sent: Option = None; @@ -421,6 +448,9 @@ impl RendezvousMediator { let msg = Message::parse_from_bytes(&bytes)?; rz.handle_resp(msg.union, Sink::Stream(&mut conn), &server, &mut update_latency).await? } + Some(msg_out) = rz_out_rx.recv() => { + Sink::Stream(&mut conn).send(&msg_out).await?; + } _ = timer.tick() => { if SHOULD_EXIT.load(Ordering::SeqCst) { break; @@ -472,6 +502,7 @@ impl RendezvousMediator { rr.secure, false, Default::default(), + String::new(), rr.control_permissions.clone().into_option(), ) .await @@ -486,6 +517,7 @@ impl RendezvousMediator { secure: bool, initiate: bool, socket_addr_v6: bytes::Bytes, + webrtc_sdp_answer: String, control_permissions: Option, ) -> ResultType<()> { let peer_addr = AddrMangle::decode(&socket_addr); @@ -504,6 +536,7 @@ impl RendezvousMediator { socket_addr: socket_addr.into(), version: crate::VERSION.to_owned(), socket_addr_v6, + webrtc_sdp_answer, ..Default::default() }; if initiate { @@ -571,6 +604,7 @@ impl RendezvousMediator { true, true, socket_addr_v6, + String::new(), fla.control_permissions.into_option(), ) .await @@ -613,6 +647,81 @@ impl RendezvousMediator { Ok(()) } + async fn spawn_webrtc_answerer( + &self, + ph: &PunchHole, + force_relay: bool, + server: ServerPtr, + peer_addr: SocketAddr, + control_permissions: Option, + ) -> ResultType { + let mut stream = + WebRTCStream::new(&ph.webrtc_sdp_offer, force_relay, CONNECT_TIMEOUT).await?; + let answer = stream.get_local_endpoint().await?; + let session_key = stream.session_key().to_owned(); + let return_route = ph.socket_addr.clone(); + + let (remote_ice_tx, mut remote_ice_rx) = mpsc::unbounded_channel::(); + WEBRTC_ICE_TXS + .lock() + .await + .insert(session_key.clone(), remote_ice_tx); + + let stream_for_remote_ice = stream.clone(); + tokio::spawn(async move { + while let Some(candidate) = remote_ice_rx.recv().await { + if let Err(err) = stream_for_remote_ice.add_remote_ice_candidate(&candidate).await + { + log::warn!("failed to add remote WebRTC ICE candidate: {}", err); + } + } + }); + + if let Some(mut local_ice_rx) = stream.take_local_ice_rx() { + let sender = self.rz_sender.clone(); + let socket_addr = return_route.clone(); + let session_key_for_ice = session_key.clone(); + tokio::spawn(async move { + while let Some(candidate) = local_ice_rx.recv().await { + let mut msg = Message::new(); + msg.set_ice_candidate(IceCandidate { + socket_addr: socket_addr.clone(), + session_key: session_key_for_ice.clone(), + candidate, + ..Default::default() + }); + let _ = sender.send(msg); + } + }); + } + + let session_key_for_cleanup = session_key.clone(); + tokio::spawn(async move { + let result = stream.wait_connected(CONNECT_TIMEOUT).await; + WEBRTC_ICE_TXS + .lock() + .await + .remove(&session_key_for_cleanup); + if let Err(err) = result { + log::warn!("webrtc wait_connected failed: {}", err); + return; + } + if let Err(err) = crate::server::create_tcp_connection( + server, + Stream::WebRTC(stream), + peer_addr, + true, + control_permissions, + ) + .await + { + log::warn!("failed to create WebRTC server connection: {}", err); + } + }); + + Ok(answer) + } + async fn handle_punch_hole(&self, ph: PunchHole, server: ServerPtr) -> ResultType<()> { let mut peer_addr = AddrMangle::decode(&ph.socket_addr); let last = *LAST_MSG.lock().await; @@ -624,7 +733,23 @@ impl RendezvousMediator { let peer_addr_v6 = hbb_common::AddrMangle::decode(&ph.socket_addr_v6); let relay = use_ws() || Config::is_proxy() || ph.force_relay; let mut socket_addr_v6 = Default::default(); - let control_permissions = ph.control_permissions.into_option(); + let control_permissions = ph.control_permissions.clone().into_option(); + let webrtc_sdp_answer = if !ph.webrtc_sdp_offer.is_empty() { + self.spawn_webrtc_answerer( + &ph, + relay, + server.clone(), + peer_addr, + control_permissions.clone(), + ) + .await + .unwrap_or_else(|err| { + log::warn!("failed to create WebRTC answer: {}", err); + String::new() + }) + } else { + String::new() + }; if peer_addr_v6.port() > 0 && !relay { socket_addr_v6 = start_ipv6( peer_addr_v6, @@ -651,6 +776,7 @@ impl RendezvousMediator { true, true, socket_addr_v6.clone(), + webrtc_sdp_answer.clone(), control_permissions, ) .await; @@ -664,6 +790,7 @@ impl RendezvousMediator { nat_type: nat_type.into(), version: crate::VERSION.to_owned(), socket_addr_v6, + webrtc_sdp_answer, ..Default::default() }; if ph.udp_port > 0 {