1use std::net::SocketAddr;
3use std::ops::DerefMut;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
8
9use arti_client::config::{CfgPath, TorClientConfigBuilder};
11use arti_client::{BootstrapBehavior, DangerouslyIntoTorAddr, IntoTorAddr, TorClient};
12use tokio::io::{AsyncReadExt, AsyncWriteExt};
13use tokio::net::{TcpListener, TcpStream};
14use tokio::runtime;
15use tokio_stream::StreamExt;
16use tor_cell::relaycell::msg::Connected;
17use tor_config::ExplicitOrAuto;
18use tor_llcrypto::pk::ed25519::ExpandedKeypair;
19use tor_hsservice::config::OnionServiceConfigBuilder;
20use tor_hsservice::config::restricted_discovery::HsClientNickname;
21use tor_hsservice::{HsNickname, RunningOnionService};
22use tor_keymgr::{config::ArtiKeystoreKind, KeystoreSelector};
23use tor_proto::stream::IncomingStreamRequest;
24use tor_rtcompat::PreferredRuntime;
25
26use crate::tor_crypto::*;
28use crate::tor_provider;
29use crate::tor_provider::*;
30
31#[derive(thiserror::Error, Debug)]
33pub enum Error {
34 #[error("not implemented")]
35 NotImplemented(),
36
37 #[error("unable to bind TCP listener")]
38 TcpListenerBindFailed(#[source] std::io::Error),
39
40 #[error("unable to get TCP listener's local address")]
41 TcpListenerLocalAddrFailed(#[source] std::io::Error),
42
43 #[error("unable to accept connection on TCP Listener")]
44 TcpListenerAcceptFailed(#[source] std::io::Error),
45
46 #[error("unable to connect to TCP listener")]
47 TcpStreamConnectFailed(#[source] std::io::Error),
48
49 #[error("unable to convert tokio::TcpStream to std::net::TcpStream")]
50 TcpStreamIntoFailed(#[source] std::io::Error),
51
52 #[error("arti-client config-builder error: {0}")]
53 ArtiClientConfigBuilderError(#[source] arti_client::config::ConfigBuildError),
54
55 #[error("arti-client error: {0}")]
56 ArtiClientError(#[source] arti_client::Error),
57
58 #[error("arti-client tor-addr error: {0}")]
59 ArtiClientTorAddrError(#[source] arti_client::TorAddrError),
60
61 #[error("arti-client onion-service startup error: {0}")]
62 ArtiClientOnionServiceLaunchError(#[source] arti_client::Error),
63
64 #[error("tor-keymgr error: {0}")]
65 TorKeyMgrError(#[source] tor_keymgr::Error),
66
67 #[error("onion-service config-builder error: {0}")]
68 OnionServiceConfigBuilderError(#[source] tor_config::ConfigBuildError),
69}
70
71impl From<Error> for crate::tor_provider::Error {
72 fn from(error: Error) -> Self {
73 crate::tor_provider::Error::Generic(error.to_string())
74 }
75}
76
77pub struct ArtiClientTorClient {
81 tokio_runtime: Arc<runtime::Runtime>,
82 arti_client: TorClient<PreferredRuntime>,
83 pending_events: Arc<Mutex<Vec<TorEvent>>>,
84 bootstrapped: Arc<AtomicBool>,
85 next_connect_handle: ConnectHandle,
86}
87
88async fn forward_stream<R, W>(alive: Arc<AtomicBool>, mut reader: R, mut writer: W) -> ()
90where
91 R: AsyncReadExt + Unpin,
92 W: AsyncWriteExt + Unpin,
93{
94 let read_timeout = std::time::Duration::from_millis(100);
96 let mut remaining_retries = 3;
99 let mut buf = [0u8; 1024];
100
101 loop {
102 if !alive.load(Ordering::Relaxed) && remaining_retries == 0 {
103 break;
104 }
105
106 tokio::select! {
107 count = reader.read(&mut buf) => match count {
108 Ok(0) => break,
110 Ok(count) => {
112 match writer.write_all(&buf[0..count]).await {
114 Ok(()) => (),
115 Err(_err) => break,
116 }
117 match writer.flush().await {
118 Ok(()) => (),
119 Err(_err) => break,
120 }
121 },
122 Err(_err) => break,
124 },
125 _ = tokio::time::sleep(read_timeout.clone()) => match writer.flush().await {
126 Ok(()) => {
127 if !alive.load(Ordering::Relaxed) {
130 remaining_retries -= 1;
131 }
132 },
133 Err(_err) => break,
134 }
135 }
136 }
137 alive.store(false, Ordering::Relaxed);
139}
140
141impl ArtiClientTorClient {
142 pub fn new(
144 tokio_runtime: Arc<runtime::Runtime>,
145 root_data_directory: &Path,
146 ) -> Result<Self, Error> {
147 let mut config_builder: TorClientConfigBuilder = Default::default();
149
150 let mut cache_dir = PathBuf::from(root_data_directory);
153 cache_dir.push("cache");
154 config_builder
155 .storage()
156 .cache_dir(CfgPath::new_literal(cache_dir))
157 .keystore()
158 .primary().kind(ExplicitOrAuto::Explicit(ArtiKeystoreKind::Ephemeral));
159
160 let mut state_dir = PathBuf::from(root_data_directory);
161 state_dir.push("state");
162 config_builder
163 .storage()
164 .state_dir(CfgPath::new_literal(state_dir));
165
166 config_builder
168 .address_filter()
169 .allow_local_addrs(false)
170 .allow_onion_addrs(true);
171
172 let config = match config_builder.build() {
173 Ok(config) => config,
174 Err(err) => return Err(err).map_err(Error::ArtiClientConfigBuilderError),
175 };
176
177 let arti_client = tokio_runtime.block_on(async {
178 TorClient::builder()
179 .config(config)
180 .bootstrap_behavior(BootstrapBehavior::Manual)
181 .create_unbootstrapped()
182 .map_err(Error::ArtiClientError)
183
184 })?;
187
188 let pending_events = std::vec![TorEvent::LogReceived {
189 line: "Starting arti-client TorProvider".to_string()
190 }];
191 let pending_events = Arc::new(Mutex::new(pending_events));
192
193 Ok(Self {
194 tokio_runtime,
195 arti_client,
196 pending_events,
197 bootstrapped: Arc::new(AtomicBool::new(false)),
198 next_connect_handle: Default::default(),
199 })
200 }
201
202 async fn connect_impl(
203 target_addr: TargetAddr,
204 arti_client: TorClient<PreferredRuntime>,
205 ) -> Result<std::net::TcpStream, tor_provider::Error> {
206 let arti_target = match target_addr.clone() {
208 TargetAddr::Socket(socket_addr) => socket_addr.into_tor_addr_dangerously(),
209 TargetAddr::Domain(domain_addr) => {
210 (domain_addr.domain(), domain_addr.port()).into_tor_addr()
211 }
212 TargetAddr::OnionService(OnionAddr::V3(OnionAddrV3 {
213 service_id,
214 virt_port,
215 })) => (format!("{}.onion", service_id), virt_port).into_tor_addr(),
216 }
217 .map_err(Error::ArtiClientTorAddrError)?;
218
219 let data_stream = arti_client.connect(arti_target)
221 .await
222 .map_err(Error::ArtiClientError)?;
223
224 let (data_reader, data_writer) = data_stream.split();
227
228 let socket_addr = SocketAddr::from(([127, 0, 0, 1], 0u16));
230 let server_listener = TcpListener::bind(socket_addr)
231 .await
232 .map_err(Error::TcpListenerBindFailed)?;
233 let server_accept_future = server_listener.accept();
235 let socket_addr = server_listener
236 .local_addr()
237 .map_err(Error::TcpListenerLocalAddrFailed)?;
238
239 let client_stream = TcpStream::connect(socket_addr)
241 .await
242 .map_err(Error::TcpStreamConnectFailed)?;
243 let (server_stream, _socket_addr) = server_accept_future
245 .await
246 .map_err(Error::TcpListenerAcceptFailed)?;
247 let (tcp_reader, tcp_writer) = server_stream.into_split();
248
249 let pump_alive = Arc::new(AtomicBool::new(true));
251 tokio::task::spawn({
252 let pump_alive = pump_alive.clone();
253 async move {
254 forward_stream(pump_alive, tcp_reader, data_writer).await;
255 }
256 });
257 tokio::task::spawn(async move {
258 forward_stream(pump_alive, data_reader, tcp_writer).await;
259 });
260 let client_stream = client_stream
261 .into_std()
262 .map_err(Error::TcpStreamIntoFailed)?;
263 Ok::<std::net::TcpStream, tor_provider::Error>(client_stream)
264 }
265}
266
267impl TorProvider for ArtiClientTorClient {
268 fn update(&mut self) -> Result<Vec<TorEvent>, tor_provider::Error> {
269 std::thread::sleep(std::time::Duration::from_millis(16));
270 match self.pending_events.lock() {
271 Ok(mut pending_events) => Ok(std::mem::take(pending_events.deref_mut())),
272 Err(_) => {
273 unreachable!("another thread panicked while holding this pending_events mutex")
274 }
275 }
276 }
277
278 fn bootstrap(&mut self) -> Result<(), tor_provider::Error> {
279 let mut bootstrap_events = self.arti_client.bootstrap_events();
281 let pending_events = self.pending_events.clone();
282 let bootstrapped = self.bootstrapped.clone();
283 self.tokio_runtime.spawn(async move {
284 while let Some(evt) = bootstrap_events.next().await {
285 if bootstrapped.load(Ordering::Relaxed) {
286 break;
287 }
288 match pending_events.lock() {
289 Ok(mut pending_events) => {
290 pending_events.push(TorEvent::BootstrapStatus {
291 progress: (evt.as_frac().clamp(0.0f32, 1.0f32) * 100f32) as u32,
292 tag: "no-tag".to_string(),
293 summary: "no summary".to_string(),
294 });
295 }
297 Err(_) => unreachable!(
298 "another thread panicked while holding this pending_events mutex"
299 ),
300 }
301 }
302 });
303
304 let arti_client = self.arti_client.clone();
306 let pending_events = self.pending_events.clone();
307 let bootstrapped = self.bootstrapped.clone();
308 self.tokio_runtime.spawn(async move {
309 match arti_client.bootstrap().await {
310 Ok(()) => match pending_events.lock() {
311 Ok(mut pending_events) => {
312 pending_events.push(TorEvent::BootstrapStatus {
313 progress: 100,
314 tag: "no-tag".to_string(),
315 summary: "no summary".to_string(),
316 });
317 pending_events.push(TorEvent::BootstrapComplete);
318 bootstrapped.store(true, Ordering::Relaxed);
319 return;
320 }
321 Err(_) => unreachable!(
322 "another thread panicked while holding this pending_events mutex"
323 ),
324 },
325 Err(_err) => {
326 }
328 }
329 });
330
331 Ok(())
332 }
333
334 fn add_client_auth(
335 &mut self,
336 service_id: &V3OnionServiceId,
337 client_auth: &X25519PrivateKey,
338 ) -> Result<(), tor_provider::Error> {
339 let ed25519_public = Ed25519PublicKey::from_service_id(service_id).unwrap();
340 let hs_id = ed25519_public.as_bytes().clone();
341
342 self.arti_client.insert_service_discovery_key(KeystoreSelector::Primary, hs_id.into(), client_auth.inner().clone().into()).map_err(Error::ArtiClientError)?;
343
344 Ok(())
345 }
346
347 fn remove_client_auth(
348 &mut self,
349 service_id: &V3OnionServiceId,
350 ) -> Result<(), tor_provider::Error> {
351 let ed25519_public = Ed25519PublicKey::from_service_id(service_id).unwrap();
352 let hs_id = ed25519_public.as_bytes().clone();
353
354 self.arti_client.remove_service_discovery_key(KeystoreSelector::Primary, hs_id.into()).map_err(Error::ArtiClientError)?;
355
356 Ok(())
357 }
358
359 fn connect(
360 &mut self,
361 target: TargetAddr,
362 circuit: Option<CircuitToken>,
363 ) -> Result<OnionStream, tor_provider::Error> {
364 if circuit.is_some() {
366 return Err(Error::NotImplemented().into());
367 }
368
369 let arti_client = self.arti_client.clone();
370 let stream = self.tokio_runtime.block_on({
371 let target = target.clone();
372 async move {
373 Self::connect_impl(target, arti_client).await
374 }
375 })?;
376 Ok(OnionStream {
377 stream,
378 local_addr: None,
379 peer_addr: Some(target),
380 })
381 }
382
383 fn connect_async(
384 &mut self,
385 target: TargetAddr,
386 circuit: Option<CircuitToken>,
387 ) -> Result<ConnectHandle, tor_provider::Error> {
388
389 if circuit.is_some() {
391 return Err(Error::NotImplemented().into());
392 }
393
394 let handle = self.next_connect_handle;
395 self.next_connect_handle += 1usize;
396
397 let arti_client = self.arti_client.clone();
398 let pending_events = Arc::downgrade(&self.pending_events);
399
400 self.tokio_runtime.spawn(async move {
401 let stream = Self::connect_impl(target.clone(), arti_client).await;
402 if let Some(pending_events) = pending_events.upgrade() {
403 let event = match stream {
404 Ok(stream) => {
405 let stream = OnionStream {
406 stream,
407 local_addr: None,
408 peer_addr: Some(target),
409 };
410 TorEvent::ConnectComplete{
411 handle,
412 stream,
413 }
414 },
415 Err(error) => TorEvent::ConnectFailed{
416 handle,
417 error,
418 },
419 };
420 let mut pending_events = pending_events.lock().expect("pending_events mutex poisoned");
421 pending_events.push(event);
422 }
423 });
424
425 Ok(handle)
426 }
427
428 fn listener(
429 &mut self,
430 private_key: &Ed25519PrivateKey,
431 virt_port: u16,
432 authorized_clients: Option<&[X25519PublicKey]>,
433 ) -> Result<OnionListener, tor_provider::Error> {
434
435 let socket_addr = SocketAddr::from(([127, 0, 0, 1], 0u16));
437 let listener =
439 std::net::TcpListener::bind(socket_addr).map_err(Error::TcpListenerBindFailed)?;
440 let socket_addr = listener
441 .local_addr()
442 .map_err(Error::TcpListenerLocalAddrFailed)?;
443
444 let service_id = V3OnionServiceId::from_private_key(private_key);
446 let hs_nickname = match HsNickname::new(service_id.to_string()) {
447 Ok(nickname) => nickname,
448 Err(_) => {
449 panic!("v3 onion service id string representation should be a valid HsNickname")
450 }
451 };
452 let secret_key_bytes = private_key.inner().to_secret_key_bytes();
455 let hs_id_keypair = ExpandedKeypair::from_secret_key_bytes(secret_key_bytes)
456 .unwrap();
457
458 let mut onion_service_config_builder = OnionServiceConfigBuilder::default();
460 onion_service_config_builder
461 .nickname(hs_nickname);
462
463 if let Some(authorized_clients) = authorized_clients {
465 if !authorized_clients.is_empty() {
466 let restricted_discovery_config = onion_service_config_builder
467 .restricted_discovery();
468 restricted_discovery_config.enabled(true);
469
470 for (i, key) in authorized_clients.iter().enumerate() {
471 let nickname = format!("client_{i}");
472 restricted_discovery_config
473 .static_keys()
474 .access()
475 .push((
476 HsClientNickname::from_str(nickname.as_str()).unwrap(),
477 key.inner().clone().into(),
478 ));
479 }
480 }
481 }
482
483 let onion_service_config = match onion_service_config_builder.build()
484 {
485 Ok(onion_service_config) => onion_service_config,
486 Err(err) => Err(err).map_err(Error::OnionServiceConfigBuilderError)?,
487 };
488
489 let (onion_service, mut rend_requests) = self.arti_client
490 .launch_onion_service_with_hsid(onion_service_config, hs_id_keypair.into())
491 .map_err(Error::ArtiClientOnionServiceLaunchError)?;
492
493 let pending_events = self.pending_events.clone();
495 let mut status_events = onion_service.status_events();
496 let service_id_clone = service_id.clone();
497
498 self.tokio_runtime.spawn(async move {
499 while let Some(evt) = status_events.next().await {
500 match evt.state() {
501 tor_hsservice::status::State::Running => match pending_events.lock() {
502 Ok(mut pending_events) => {
503 pending_events.push(TorEvent::OnionServicePublished { service_id: service_id_clone });
504 return;
505 }
506 Err(_) => unreachable!(
507 "another thread panicked while holding this pending_events mutex"
508 ),
509 },
510 _ => (),
511 }
512 }
513 });
514
515 self.tokio_runtime.spawn(async move {
517 while let Some(request) = rend_requests.next().await {
518 let mut stream_requests = match request.accept().await {
519 Ok(stream_requests) => stream_requests,
520 _ => return,
522 };
523 tokio::task::spawn(async move {
525 while let Some(stream_request) = stream_requests.next().await {
526 let should_accept =
527 if let IncomingStreamRequest::Begin(begin) = stream_request.request() {
528 begin.port() == virt_port
530 } else {
531 false
532 };
533
534 if should_accept {
535 let data_stream =
536 match stream_request.accept(Connected::new_empty()).await {
537 Ok(data_stream) => data_stream,
538 _ => continue,
540 };
541 let (data_reader, data_writer) = data_stream.split();
542
543 let (tcp_reader, tcp_writer) =
544 match TcpStream::connect(socket_addr).await {
545 Ok(tcp_stream) => tcp_stream.into_split(),
546 _ => continue,
548 };
549 let pump_alive = Arc::new(AtomicBool::new(true));
552 tokio::task::spawn({
554 let pump_alive = pump_alive.clone();
555 async move {
556 forward_stream(pump_alive, data_reader, tcp_writer).await;
557 }
558 });
559 tokio::task::spawn(async move {
561 forward_stream(pump_alive, tcp_reader, data_writer).await;
562 });
563 } else {
564 let _ = stream_request.shutdown_circuit();
566 }
567 }
568 });
569 }
570 });
571
572 let onion_addr = OnionAddr::V3(OnionAddrV3::new(service_id, virt_port));
573 Ok(OnionListener::new::<Arc<RunningOnionService>>(listener, onion_addr, onion_service, |_|{}))
575 }
576
577 fn generate_token(&mut self) -> CircuitToken {
578 0usize
579 }
580
581 fn release_token(&mut self, _token: CircuitToken) {}
582}