1use std::net::SocketAddr;
3use std::ops::DerefMut;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
8
9use arti_client::config::{CfgPath, TorClientConfigBuilder};
11use arti_client::{BootstrapBehavior, DangerouslyIntoTorAddr, IntoTorAddr, TorClient};
12use tokio::io::{AsyncReadExt, AsyncWriteExt};
13use tokio::net::{TcpListener, TcpStream};
14use tokio::runtime;
15use tokio_stream::StreamExt;
16use tor_cell::relaycell::msg::Connected;
17use tor_config::ExplicitOrAuto;
18use tor_llcrypto::pk::ed25519::ExpandedKeypair;
19use tor_hsservice::config::OnionServiceConfigBuilder;
20use tor_hsservice::config::restricted_discovery::HsClientNickname;
21use tor_hsservice::{HsNickname, RunningOnionService};
22use tor_keymgr::{config::ArtiKeystoreKind, KeystoreSelector};
23use tor_proto::stream::IncomingStreamRequest;
24use tor_rtcompat::PreferredRuntime;
25
26use crate::tor_crypto::*;
28use crate::tor_provider;
29use crate::tor_provider::*;
30
31#[derive(thiserror::Error, Debug)]
33pub enum Error {
34 #[error("not implemented")]
35 NotImplemented(),
36
37 #[error("unable to bind TCP listener")]
38 TcpListenerBindFailed(#[source] std::io::Error),
39
40 #[error("unable to get TCP listener's local address")]
41 TcpListenerLocalAddrFailed(#[source] std::io::Error),
42
43 #[error("unable to accept connection on TCP Listener")]
44 TcpListenerAcceptFailed(#[source] std::io::Error),
45
46 #[error("unable to connect to TCP listener")]
47 TcpStreamConnectFailed(#[source] std::io::Error),
48
49 #[error("unable to convert tokio::TcpStream to std::net::TcpStream")]
50 TcpStreamIntoFailed(#[source] std::io::Error),
51
52 #[error("arti-client config-builder error: {0}")]
53 ArtiClientConfigBuilderError(#[source] arti_client::config::ConfigBuildError),
54
55 #[error("arti-client error: {0}")]
56 ArtiClientError(#[source] arti_client::Error),
57
58 #[error("arti-client tor-addr error: {0}")]
59 ArtiClientTorAddrError(#[source] arti_client::TorAddrError),
60
61 #[error("arti-client onion-service startup error: {0}")]
62 ArtiClientOnionServiceLaunchError(#[source] arti_client::Error),
63
64 #[error("tor-keymgr error: {0}")]
65 TorKeyMgrError(#[source] tor_keymgr::Error),
66
67 #[error("onion-service config-builder error: {0}")]
68 OnionServiceConfigBuilderError(#[source] tor_config::ConfigBuildError),
69}
70
71impl From<Error> for crate::tor_provider::Error {
72 fn from(error: Error) -> Self {
73 crate::tor_provider::Error::Generic(error.to_string())
74 }
75}
76
77pub struct ArtiClientTorClient {
81 tokio_runtime: Arc<runtime::Runtime>,
82 arti_client: TorClient<PreferredRuntime>,
83 pending_events: Arc<Mutex<Vec<TorEvent>>>,
84 bootstrapped: Arc<AtomicBool>,
85}
86
87async fn forward_stream<R, W>(alive: Arc<AtomicBool>, mut reader: R, mut writer: W) -> ()
89where
90 R: AsyncReadExt + Unpin,
91 W: AsyncWriteExt + Unpin,
92{
93 let read_timeout = std::time::Duration::from_millis(100);
95 let mut remaining_retries = 3;
98 let mut buf = [0u8; 1024];
99
100 loop {
101 if !alive.load(Ordering::Relaxed) && remaining_retries == 0 {
102 break;
103 }
104
105 tokio::select! {
106 count = reader.read(&mut buf) => match count {
107 Ok(0) => break,
109 Ok(count) => {
111 match writer.write_all(&buf[0..count]).await {
113 Ok(()) => (),
114 Err(_err) => break,
115 }
116 match writer.flush().await {
117 Ok(()) => (),
118 Err(_err) => break,
119 }
120 },
121 Err(_err) => break,
123 },
124 _ = tokio::time::sleep(read_timeout.clone()) => match writer.flush().await {
125 Ok(()) => {
126 if !alive.load(Ordering::Relaxed) {
129 remaining_retries -= 1;
130 }
131 },
132 Err(_err) => break,
133 }
134 }
135 }
136 alive.store(false, Ordering::Relaxed);
138}
139
140impl ArtiClientTorClient {
141 pub fn new(
143 tokio_runtime: Arc<runtime::Runtime>,
144 root_data_directory: &Path,
145 ) -> Result<Self, Error> {
146 let mut config_builder: TorClientConfigBuilder = Default::default();
148
149 let mut cache_dir = PathBuf::from(root_data_directory);
152 cache_dir.push("cache");
153 config_builder
154 .storage()
155 .cache_dir(CfgPath::new_literal(cache_dir))
156 .keystore()
157 .primary().kind(ExplicitOrAuto::Explicit(ArtiKeystoreKind::Ephemeral));
158
159 let mut state_dir = PathBuf::from(root_data_directory);
160 state_dir.push("state");
161 config_builder
162 .storage()
163 .state_dir(CfgPath::new_literal(state_dir));
164
165 config_builder
167 .address_filter()
168 .allow_local_addrs(false)
169 .allow_onion_addrs(true);
170
171 let config = match config_builder.build() {
172 Ok(config) => config,
173 Err(err) => return Err(err).map_err(Error::ArtiClientConfigBuilderError),
174 };
175
176 let arti_client = tokio_runtime.block_on(async {
177 TorClient::builder()
178 .config(config)
179 .bootstrap_behavior(BootstrapBehavior::Manual)
180 .create_unbootstrapped()
181 .map_err(Error::ArtiClientError)
182
183 })?;
186
187 let pending_events = std::vec![TorEvent::LogReceived {
188 line: "Starting arti-client TorProvider".to_string()
189 }];
190 let pending_events = Arc::new(Mutex::new(pending_events));
191
192 Ok(Self {
193 tokio_runtime,
194 arti_client,
195 pending_events,
196 bootstrapped: Arc::new(AtomicBool::new(false)),
197 })
198 }
199}
200
201impl TorProvider for ArtiClientTorClient {
202 fn update(&mut self) -> Result<Vec<TorEvent>, tor_provider::Error> {
203 std::thread::sleep(std::time::Duration::from_millis(16));
204 match self.pending_events.lock() {
205 Ok(mut pending_events) => Ok(std::mem::take(pending_events.deref_mut())),
206 Err(_) => {
207 unreachable!("another thread panicked while holding this pending_events mutex")
208 }
209 }
210 }
211
212 fn bootstrap(&mut self) -> Result<(), tor_provider::Error> {
213 let mut bootstrap_events = self.arti_client.bootstrap_events();
215 let pending_events = self.pending_events.clone();
216 let bootstrapped = self.bootstrapped.clone();
217 self.tokio_runtime.spawn(async move {
218 while let Some(evt) = bootstrap_events.next().await {
219 if bootstrapped.load(Ordering::Relaxed) {
220 break;
221 }
222 match pending_events.lock() {
223 Ok(mut pending_events) => {
224 pending_events.push(TorEvent::BootstrapStatus {
225 progress: (evt.as_frac().clamp(0.0f32, 1.0f32) * 100f32) as u32,
226 tag: "no-tag".to_string(),
227 summary: "no summary".to_string(),
228 });
229 }
231 Err(_) => unreachable!(
232 "another thread panicked while holding this pending_events mutex"
233 ),
234 }
235 }
236 });
237
238 let arti_client = self.arti_client.clone();
240 let pending_events = self.pending_events.clone();
241 let bootstrapped = self.bootstrapped.clone();
242 self.tokio_runtime.spawn(async move {
243 match arti_client.bootstrap().await {
244 Ok(()) => match pending_events.lock() {
245 Ok(mut pending_events) => {
246 pending_events.push(TorEvent::BootstrapStatus {
247 progress: 100,
248 tag: "no-tag".to_string(),
249 summary: "no summary".to_string(),
250 });
251 pending_events.push(TorEvent::BootstrapComplete);
252 bootstrapped.store(true, Ordering::Relaxed);
253 return;
254 }
255 Err(_) => unreachable!(
256 "another thread panicked while holding this pending_events mutex"
257 ),
258 },
259 Err(_err) => {
260 }
262 }
263 });
264
265 Ok(())
266 }
267
268 fn add_client_auth(
269 &mut self,
270 service_id: &V3OnionServiceId,
271 client_auth: &X25519PrivateKey,
272 ) -> Result<(), tor_provider::Error> {
273 let ed25519_public = Ed25519PublicKey::from_service_id(service_id).unwrap();
274 let hs_id = ed25519_public.as_bytes().clone();
275
276 self.arti_client.insert_service_discovery_key(KeystoreSelector::Primary, hs_id.into(), client_auth.inner().clone().into()).map_err(Error::ArtiClientError)?;
277
278 Ok(())
279 }
280
281 fn remove_client_auth(
282 &mut self,
283 service_id: &V3OnionServiceId,
284 ) -> Result<(), tor_provider::Error> {
285 let ed25519_public = Ed25519PublicKey::from_service_id(service_id).unwrap();
286 let hs_id = ed25519_public.as_bytes().clone();
287
288 self.arti_client.remove_service_discovery_key(KeystoreSelector::Primary, hs_id.into()).map_err(Error::ArtiClientError)?;
289
290 Ok(())
291 }
292
293 fn connect(
294 &mut self,
295 target: TargetAddr,
296 circuit: Option<CircuitToken>,
297 ) -> Result<OnionStream, tor_provider::Error> {
298 if circuit.is_some() {
300 return Err(Error::NotImplemented().into());
301 }
302
303 let arti_target = match target.clone() {
305 TargetAddr::Socket(socket_addr) => socket_addr.into_tor_addr_dangerously(),
306 TargetAddr::Domain(domain_addr) => {
307 (domain_addr.domain(), domain_addr.port()).into_tor_addr()
308 }
309 TargetAddr::OnionService(OnionAddr::V3(OnionAddrV3 {
310 service_id,
311 virt_port,
312 })) => (format!("{}.onion", service_id), virt_port).into_tor_addr(),
313 }
314 .map_err(Error::ArtiClientTorAddrError)?;
315
316 let arti_client = self.arti_client.clone();
317 let data_stream = self
318 .tokio_runtime
319 .block_on(async move { arti_client.connect(arti_target).await })
320 .map_err(Error::ArtiClientError)?;
321
322 let client_stream = self.tokio_runtime.block_on(async move {
325 let (data_reader, data_writer) = data_stream.split();
326
327 let socket_addr = SocketAddr::from(([127, 0, 0, 1], 0u16));
329 let server_listener = TcpListener::bind(socket_addr)
330 .await
331 .map_err(Error::TcpListenerBindFailed)?;
332 let server_accept_future = server_listener.accept();
334 let socket_addr = server_listener
335 .local_addr()
336 .map_err(Error::TcpListenerLocalAddrFailed)?;
337
338 let client_stream = TcpStream::connect(socket_addr)
340 .await
341 .map_err(Error::TcpStreamConnectFailed)?;
342 let (server_stream, _socket_addr) = server_accept_future
344 .await
345 .map_err(Error::TcpListenerAcceptFailed)?;
346 let (tcp_reader, tcp_writer) = server_stream.into_split();
347
348 let pump_alive = Arc::new(AtomicBool::new(true));
350 tokio::task::spawn({
351 let pump_alive = pump_alive.clone();
352 async move {
353 forward_stream(pump_alive, tcp_reader, data_writer).await;
354 }
355 });
356 tokio::task::spawn(async move {
357 forward_stream(pump_alive, data_reader, tcp_writer).await;
358 });
359 Ok::<TcpStream, tor_provider::Error>(client_stream)
360 })?;
361
362 let stream = client_stream
363 .into_std()
364 .map_err(Error::TcpStreamIntoFailed)?;
365 Ok(OnionStream {
366 stream,
367 local_addr: None,
368 peer_addr: Some(target),
369 })
370 }
371
372 fn listener(
373 &mut self,
374 private_key: &Ed25519PrivateKey,
375 virt_port: u16,
376 authorized_clients: Option<&[X25519PublicKey]>,
377 ) -> Result<OnionListener, tor_provider::Error> {
378
379 let socket_addr = SocketAddr::from(([127, 0, 0, 1], 0u16));
381 let listener =
383 std::net::TcpListener::bind(socket_addr).map_err(Error::TcpListenerBindFailed)?;
384 let socket_addr = listener
385 .local_addr()
386 .map_err(Error::TcpListenerLocalAddrFailed)?;
387
388 let service_id = V3OnionServiceId::from_private_key(private_key);
390 let hs_nickname = match HsNickname::new(service_id.to_string()) {
391 Ok(nickname) => nickname,
392 Err(_) => {
393 panic!("v3 onion service id string representation should be a valid HsNickname")
394 }
395 };
396 let secret_key_bytes = private_key.inner().to_secret_key_bytes();
399 let hs_id_keypair = ExpandedKeypair::from_secret_key_bytes(secret_key_bytes)
400 .unwrap();
401
402 let mut onion_service_config_builder = OnionServiceConfigBuilder::default();
404 onion_service_config_builder
405 .nickname(hs_nickname);
406
407 if let Some(authorized_clients) = authorized_clients {
409 if !authorized_clients.is_empty() {
410 let restricted_discovery_config = onion_service_config_builder
411 .restricted_discovery();
412 restricted_discovery_config.enabled(true);
413
414 for (i, key) in authorized_clients.iter().enumerate() {
415 let nickname = format!("client_{i}");
416 restricted_discovery_config
417 .static_keys()
418 .access()
419 .push((
420 HsClientNickname::from_str(nickname.as_str()).unwrap(),
421 key.inner().clone().into(),
422 ));
423 }
424 }
425 }
426
427 let onion_service_config = match onion_service_config_builder.build()
428 {
429 Ok(onion_service_config) => onion_service_config,
430 Err(err) => Err(err).map_err(Error::OnionServiceConfigBuilderError)?,
431 };
432
433 let (onion_service, mut rend_requests) = self.arti_client
434 .launch_onion_service_with_hsid(onion_service_config, hs_id_keypair.into())
435 .map_err(Error::ArtiClientOnionServiceLaunchError)?;
436
437 let pending_events = self.pending_events.clone();
439 let mut status_events = onion_service.status_events();
440 let service_id_clone = service_id.clone();
441
442 self.tokio_runtime.spawn(async move {
443 while let Some(evt) = status_events.next().await {
444 match evt.state() {
445 tor_hsservice::status::State::Running => match pending_events.lock() {
446 Ok(mut pending_events) => {
447 pending_events.push(TorEvent::OnionServicePublished { service_id: service_id_clone });
448 return;
449 }
450 Err(_) => unreachable!(
451 "another thread panicked while holding this pending_events mutex"
452 ),
453 },
454 _ => (),
455 }
456 }
457 });
458
459 self.tokio_runtime.spawn(async move {
461 while let Some(request) = rend_requests.next().await {
462 let mut stream_requests = match request.accept().await {
463 Ok(stream_requests) => stream_requests,
464 _ => return,
466 };
467 tokio::task::spawn(async move {
469 while let Some(stream_request) = stream_requests.next().await {
470 let should_accept =
471 if let IncomingStreamRequest::Begin(begin) = stream_request.request() {
472 begin.port() == virt_port
474 } else {
475 false
476 };
477
478 if should_accept {
479 let data_stream =
480 match stream_request.accept(Connected::new_empty()).await {
481 Ok(data_stream) => data_stream,
482 _ => continue,
484 };
485 let (data_reader, data_writer) = data_stream.split();
486
487 let (tcp_reader, tcp_writer) =
488 match TcpStream::connect(socket_addr).await {
489 Ok(tcp_stream) => tcp_stream.into_split(),
490 _ => continue,
492 };
493 let pump_alive = Arc::new(AtomicBool::new(true));
496 tokio::task::spawn({
498 let pump_alive = pump_alive.clone();
499 async move {
500 forward_stream(pump_alive, data_reader, tcp_writer).await;
501 }
502 });
503 tokio::task::spawn(async move {
505 forward_stream(pump_alive, tcp_reader, data_writer).await;
506 });
507 } else {
508 let _ = stream_request.shutdown_circuit();
510 }
511 }
512 });
513 }
514 });
515
516 let onion_addr = OnionAddr::V3(OnionAddrV3::new(service_id, virt_port));
517 Ok(OnionListener::new::<Arc<RunningOnionService>>(listener, onion_addr, onion_service, |_|{}))
519 }
520
521 fn generate_token(&mut self) -> CircuitToken {
522 0usize
523 }
524
525 fn release_token(&mut self, _token: CircuitToken) {}
526}