tor_interface/
arti_client_tor_client.rs

1// standard
2use std::net::SocketAddr;
3use std::ops::DerefMut;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
8
9//extern
10use arti_client::config::{CfgPath, TorClientConfigBuilder};
11use arti_client::{BootstrapBehavior, DangerouslyIntoTorAddr, IntoTorAddr, TorClient};
12use tokio::io::{AsyncReadExt, AsyncWriteExt};
13use tokio::net::{TcpListener, TcpStream};
14use tokio::runtime;
15use tokio_stream::StreamExt;
16use tor_cell::relaycell::msg::Connected;
17use tor_config::ExplicitOrAuto;
18use tor_llcrypto::pk::ed25519::ExpandedKeypair;
19use tor_hsservice::config::OnionServiceConfigBuilder;
20use tor_hsservice::config::restricted_discovery::HsClientNickname;
21use tor_hsservice::{HsNickname, RunningOnionService};
22use tor_keymgr::{config::ArtiKeystoreKind, KeystoreSelector};
23use tor_proto::stream::IncomingStreamRequest;
24use tor_rtcompat::PreferredRuntime;
25
26// internal crates
27use crate::tor_crypto::*;
28use crate::tor_provider;
29use crate::tor_provider::*;
30
31/// [`ArtiClientTorClient`]-specific error type
32#[derive(thiserror::Error, Debug)]
33pub enum Error {
34    #[error("not implemented")]
35    NotImplemented(),
36
37    #[error("unable to bind TCP listener")]
38    TcpListenerBindFailed(#[source] std::io::Error),
39
40    #[error("unable to get TCP listener's local address")]
41    TcpListenerLocalAddrFailed(#[source] std::io::Error),
42
43    #[error("unable to accept connection on TCP Listener")]
44    TcpListenerAcceptFailed(#[source] std::io::Error),
45
46    #[error("unable to connect to TCP listener")]
47    TcpStreamConnectFailed(#[source] std::io::Error),
48
49    #[error("unable to convert tokio::TcpStream to std::net::TcpStream")]
50    TcpStreamIntoFailed(#[source] std::io::Error),
51
52    #[error("arti-client config-builder error: {0}")]
53    ArtiClientConfigBuilderError(#[source] arti_client::config::ConfigBuildError),
54
55    #[error("arti-client error: {0}")]
56    ArtiClientError(#[source] arti_client::Error),
57
58    #[error("arti-client tor-addr error: {0}")]
59    ArtiClientTorAddrError(#[source] arti_client::TorAddrError),
60
61    #[error("arti-client onion-service startup error: {0}")]
62    ArtiClientOnionServiceLaunchError(#[source] arti_client::Error),
63
64    #[error("tor-keymgr error: {0}")]
65    TorKeyMgrError(#[source] tor_keymgr::Error),
66
67    #[error("onion-service config-builder error: {0}")]
68    OnionServiceConfigBuilderError(#[source] tor_config::ConfigBuildError),
69}
70
71impl From<Error> for crate::tor_provider::Error {
72    fn from(error: Error) -> Self {
73        crate::tor_provider::Error::Generic(error.to_string())
74    }
75}
76
77/// The `ArtiClientTorClient` is an in-process [`arti-client`](https://crates.io/crates/arti-client)-based [`TorProvider`].
78///
79///
80pub struct ArtiClientTorClient {
81    tokio_runtime: Arc<runtime::Runtime>,
82    arti_client: TorClient<PreferredRuntime>,
83    pending_events: Arc<Mutex<Vec<TorEvent>>>,
84    bootstrapped: Arc<AtomicBool>,
85}
86
87// used to forward traffic to/from arti to local tcp streams
88async fn forward_stream<R, W>(alive: Arc<AtomicBool>, mut reader: R, mut writer: W) -> ()
89where
90    R: AsyncReadExt + Unpin,
91    W: AsyncWriteExt + Unpin,
92{
93    // allow 100ms timeout on reads to verify writer is still good
94    let read_timeout = std::time::Duration::from_millis(100);
95    // allow additional retries in the event the other half of the pump
96    // dies; keep pumping data until our read times out 3 times
97    let mut remaining_retries = 3;
98    let mut buf = [0u8; 1024];
99
100    loop {
101        if !alive.load(Ordering::Relaxed) && remaining_retries == 0 {
102            break;
103        }
104
105        tokio::select! {
106            count = reader.read(&mut buf) => match count {
107                // end of stream
108                Ok(0) => break,
109                // read N bytes
110                Ok(count) => {
111                    // forward traffic
112                    match writer.write_all(&buf[0..count]).await {
113                        Ok(()) => (),
114                        Err(_err) => break,
115                    }
116                    match writer.flush().await {
117                        Ok(()) => (),
118                        Err(_err) => break,
119                    }
120                },
121                // read failed
122                Err(_err) => break,
123            },
124            _ = tokio::time::sleep(read_timeout.clone()) => match writer.flush().await {
125                Ok(()) => {
126                    // so long as our writer and reader are good, we should
127                    // allow a few additional data pump attempts
128                    if !alive.load(Ordering::Relaxed) {
129                        remaining_retries -= 1;
130                    }
131                },
132                Err(_err) => break,
133            }
134        }
135    }
136    // signal pump death
137    alive.store(false, Ordering::Relaxed);
138}
139
140impl ArtiClientTorClient {
141    /// Construct a new `ArtiClientTorClient` which uses a [Tokio](https://crates.io/crates/tokio) runtime internally for all async operations.
142    pub fn new(
143        tokio_runtime: Arc<runtime::Runtime>,
144        root_data_directory: &Path,
145    ) -> Result<Self, Error> {
146        // set custom config options
147        let mut config_builder: TorClientConfigBuilder = Default::default();
148
149        // manually set arti cache and data directories so we can have
150        // multiple concurrent instances and control where it writes
151        let mut cache_dir = PathBuf::from(root_data_directory);
152        cache_dir.push("cache");
153        config_builder
154            .storage()
155            .cache_dir(CfgPath::new_literal(cache_dir))
156            .keystore()
157            .primary().kind(ExplicitOrAuto::Explicit(ArtiKeystoreKind::Ephemeral));
158
159        let mut state_dir = PathBuf::from(root_data_directory);
160        state_dir.push("state");
161        config_builder
162            .storage()
163            .state_dir(CfgPath::new_literal(state_dir));
164
165        // disable access to clearnet addresses and enable access to onion services
166        config_builder
167            .address_filter()
168            .allow_local_addrs(false)
169            .allow_onion_addrs(true);
170
171        let config = match config_builder.build() {
172            Ok(config) => config,
173            Err(err) => return Err(err).map_err(Error::ArtiClientConfigBuilderError),
174        };
175
176        let arti_client = tokio_runtime.block_on(async {
177            TorClient::builder()
178                .config(config)
179                .bootstrap_behavior(BootstrapBehavior::Manual)
180                .create_unbootstrapped()
181                .map_err(Error::ArtiClientError)
182
183            // TODO: implement TorEvent::LogReceived events once upstream issue is resolved:
184            // https://gitlab.torproject.org/tpo/core/arti/-/issues/1356
185        })?;
186
187        let pending_events = std::vec![TorEvent::LogReceived {
188            line: "Starting arti-client TorProvider".to_string()
189        }];
190        let pending_events = Arc::new(Mutex::new(pending_events));
191
192        Ok(Self {
193            tokio_runtime,
194            arti_client,
195            pending_events,
196            bootstrapped: Arc::new(AtomicBool::new(false)),
197        })
198    }
199}
200
201impl TorProvider for ArtiClientTorClient {
202    fn update(&mut self) -> Result<Vec<TorEvent>, tor_provider::Error> {
203        std::thread::sleep(std::time::Duration::from_millis(16));
204        match self.pending_events.lock() {
205            Ok(mut pending_events) => Ok(std::mem::take(pending_events.deref_mut())),
206            Err(_) => {
207                unreachable!("another thread panicked while holding this pending_events mutex")
208            }
209        }
210    }
211
212    fn bootstrap(&mut self) -> Result<(), tor_provider::Error> {
213        // save progress events
214        let mut bootstrap_events = self.arti_client.bootstrap_events();
215        let pending_events = self.pending_events.clone();
216        let bootstrapped = self.bootstrapped.clone();
217        self.tokio_runtime.spawn(async move {
218            while let Some(evt) = bootstrap_events.next().await {
219                if bootstrapped.load(Ordering::Relaxed) {
220                    break;
221                }
222                match pending_events.lock() {
223                    Ok(mut pending_events) => {
224                        pending_events.push(TorEvent::BootstrapStatus {
225                            progress: (evt.as_frac().clamp(0.0f32, 1.0f32) * 100f32) as u32,
226                            tag: "no-tag".to_string(),
227                            summary: "no summary".to_string(),
228                        });
229                        // TODO: properly handle evt.blocked() with a new TorEvent::Error or something
230                    }
231                    Err(_) => unreachable!(
232                        "another thread panicked while holding this pending_events mutex"
233                    ),
234                }
235            }
236        });
237
238        // initiate bootstrap
239        let arti_client = self.arti_client.clone();
240        let pending_events = self.pending_events.clone();
241        let bootstrapped = self.bootstrapped.clone();
242        self.tokio_runtime.spawn(async move {
243            match arti_client.bootstrap().await {
244                Ok(()) => match pending_events.lock() {
245                    Ok(mut pending_events) => {
246                        pending_events.push(TorEvent::BootstrapStatus {
247                            progress: 100,
248                            tag: "no-tag".to_string(),
249                            summary: "no summary".to_string(),
250                        });
251                        pending_events.push(TorEvent::BootstrapComplete);
252                        bootstrapped.store(true, Ordering::Relaxed);
253                        return;
254                    }
255                    Err(_) => unreachable!(
256                        "another thread panicked while holding this pending_events mutex"
257                    ),
258                },
259                Err(_err) => {
260                    // TODO: add an error event to TorEvent
261                }
262            }
263        });
264
265        Ok(())
266    }
267
268    fn add_client_auth(
269        &mut self,
270        service_id: &V3OnionServiceId,
271        client_auth: &X25519PrivateKey,
272    ) -> Result<(), tor_provider::Error> {
273        let ed25519_public = Ed25519PublicKey::from_service_id(service_id).unwrap();
274        let hs_id = ed25519_public.as_bytes().clone();
275
276        self.arti_client.insert_service_discovery_key(KeystoreSelector::Primary, hs_id.into(), client_auth.inner().clone().into()).map_err(Error::ArtiClientError)?;
277
278        Ok(())
279    }
280
281    fn remove_client_auth(
282        &mut self,
283        service_id: &V3OnionServiceId,
284    ) -> Result<(), tor_provider::Error> {
285        let ed25519_public = Ed25519PublicKey::from_service_id(service_id).unwrap();
286        let hs_id = ed25519_public.as_bytes().clone();
287
288        self.arti_client.remove_service_discovery_key(KeystoreSelector::Primary, hs_id.into()).map_err(Error::ArtiClientError)?;
289
290        Ok(())
291    }
292
293    fn connect(
294        &mut self,
295        target: TargetAddr,
296        circuit: Option<CircuitToken>,
297    ) -> Result<OnionStream, tor_provider::Error> {
298        // stream isolation not implemented yet
299        if circuit.is_some() {
300            return Err(Error::NotImplemented().into());
301        }
302
303        // connect to onion service
304        let arti_target = match target.clone() {
305            TargetAddr::Socket(socket_addr) => socket_addr.into_tor_addr_dangerously(),
306            TargetAddr::Domain(domain_addr) => {
307                (domain_addr.domain(), domain_addr.port()).into_tor_addr()
308            }
309            TargetAddr::OnionService(OnionAddr::V3(OnionAddrV3 {
310                service_id,
311                virt_port,
312            })) => (format!("{}.onion", service_id), virt_port).into_tor_addr(),
313        }
314        .map_err(Error::ArtiClientTorAddrError)?;
315
316        let arti_client = self.arti_client.clone();
317        let data_stream = self
318            .tokio_runtime
319            .block_on(async move { arti_client.connect(arti_target).await })
320            .map_err(Error::ArtiClientError)?;
321
322        // start a task to forward traffic from returned data stream
323        // and tcp socket
324        let client_stream = self.tokio_runtime.block_on(async move {
325            let (data_reader, data_writer) = data_stream.split();
326
327            // try to bind to a local address, let OS pick our port
328            let socket_addr = SocketAddr::from(([127, 0, 0, 1], 0u16));
329            let server_listener = TcpListener::bind(socket_addr)
330                .await
331                .map_err(Error::TcpListenerBindFailed)?;
332            // await future after a client connects
333            let server_accept_future = server_listener.accept();
334            let socket_addr = server_listener
335                .local_addr()
336                .map_err(Error::TcpListenerLocalAddrFailed)?;
337
338            // client stream will ultimatley be returned from connect()
339            let client_stream = TcpStream::connect(socket_addr)
340                .await
341                .map_err(Error::TcpStreamConnectFailed)?;
342            // client has connected so now get the server's tcp stream
343            let (server_stream, _socket_addr) = server_accept_future
344                .await
345                .map_err(Error::TcpListenerAcceptFailed)?;
346            let (tcp_reader, tcp_writer) = server_stream.into_split();
347
348            // now spawn new tasks to forward traffic to/from local listener
349            let pump_alive = Arc::new(AtomicBool::new(true));
350            tokio::task::spawn({
351                let pump_alive = pump_alive.clone();
352                async move {
353                    forward_stream(pump_alive, tcp_reader, data_writer).await;
354                }
355            });
356            tokio::task::spawn(async move {
357                forward_stream(pump_alive, data_reader, tcp_writer).await;
358            });
359            Ok::<TcpStream, tor_provider::Error>(client_stream)
360        })?;
361
362        let stream = client_stream
363            .into_std()
364            .map_err(Error::TcpStreamIntoFailed)?;
365        Ok(OnionStream {
366            stream,
367            local_addr: None,
368            peer_addr: Some(target),
369        })
370    }
371
372    fn listener(
373        &mut self,
374        private_key: &Ed25519PrivateKey,
375        virt_port: u16,
376        authorized_clients: Option<&[X25519PublicKey]>,
377    ) -> Result<OnionListener, tor_provider::Error> {
378
379        // try to bind to a local address, let OS pick our port
380        let socket_addr = SocketAddr::from(([127, 0, 0, 1], 0u16));
381        // TODO: make this one async too
382        let listener =
383            std::net::TcpListener::bind(socket_addr).map_err(Error::TcpListenerBindFailed)?;
384        let socket_addr = listener
385            .local_addr()
386            .map_err(Error::TcpListenerLocalAddrFailed)?;
387
388        // generate a nickname to identify this onion service
389        let service_id = V3OnionServiceId::from_private_key(private_key);
390        let hs_nickname = match HsNickname::new(service_id.to_string()) {
391            Ok(nickname) => nickname,
392            Err(_) => {
393                panic!("v3 onion service id string representation should be a valid HsNickname")
394            }
395        };
396        // generate a new HsIdKeypair (from an Ed25519PrivateKey)
397        // clone() isn't implemented for ExpandedKeypair >:[
398        let secret_key_bytes = private_key.inner().to_secret_key_bytes();
399        let hs_id_keypair = ExpandedKeypair::from_secret_key_bytes(secret_key_bytes)
400            .unwrap();
401
402        // create an OnionServiceConfig with the ephemeral nickname
403        let mut onion_service_config_builder = OnionServiceConfigBuilder::default();
404        onion_service_config_builder
405            .nickname(hs_nickname);
406
407        // add authorised client keys if they exist
408        if let Some(authorized_clients) = authorized_clients {
409            if !authorized_clients.is_empty() {
410                let restricted_discovery_config = onion_service_config_builder
411                    .restricted_discovery();
412                restricted_discovery_config.enabled(true);
413
414                for (i, key) in authorized_clients.iter().enumerate() {
415                    let nickname = format!("client_{i}");
416                    restricted_discovery_config
417                        .static_keys()
418                        .access()
419                        .push((
420                            HsClientNickname::from_str(nickname.as_str()).unwrap(),
421                            key.inner().clone().into(),
422                        ));
423                }
424            }
425        }
426
427        let onion_service_config = match onion_service_config_builder.build()
428        {
429            Ok(onion_service_config) => onion_service_config,
430            Err(err) => Err(err).map_err(Error::OnionServiceConfigBuilderError)?,
431        };
432
433        let (onion_service, mut rend_requests) = self.arti_client
434            .launch_onion_service_with_hsid(onion_service_config, hs_id_keypair.into())
435            .map_err(Error::ArtiClientOnionServiceLaunchError)?;
436
437        // start a task to signal onion service published
438        let pending_events = self.pending_events.clone();
439        let mut status_events = onion_service.status_events();
440        let service_id_clone = service_id.clone();
441
442        self.tokio_runtime.spawn(async move {
443            while let Some(evt) = status_events.next().await {
444                match evt.state() {
445                    tor_hsservice::status::State::Running => match pending_events.lock() {
446                        Ok(mut pending_events) => {
447                            pending_events.push(TorEvent::OnionServicePublished { service_id: service_id_clone });
448                            return;
449                        }
450                        Err(_) => unreachable!(
451                            "another thread panicked while holding this pending_events mutex"
452                        ),
453                    },
454                    _ => (),
455                }
456            }
457        });
458
459        // start a task which accepts every RendRequest to get a StreamRequest
460        self.tokio_runtime.spawn(async move {
461            while let Some(request) = rend_requests.next().await {
462                let mut stream_requests = match request.accept().await {
463                    Ok(stream_requests) => stream_requests,
464                    // TODO: probably not our problem?
465                    _ => return,
466                };
467                // spawn a new task to consume the stream requsts
468                tokio::task::spawn(async move {
469                    while let Some(stream_request) = stream_requests.next().await {
470                        let should_accept =
471                            if let IncomingStreamRequest::Begin(begin) = stream_request.request() {
472                                // we only accept connections on the virt port
473                                begin.port() == virt_port
474                            } else {
475                                false
476                            };
477
478                        if should_accept {
479                            let data_stream =
480                                match stream_request.accept(Connected::new_empty()).await {
481                                    Ok(data_stream) => data_stream,
482                                    // TODO: probably not our problem
483                                    _ => continue,
484                                };
485                            let (data_reader, data_writer) = data_stream.split();
486
487                            let (tcp_reader, tcp_writer) =
488                                match TcpStream::connect(socket_addr).await {
489                                    Ok(tcp_stream) => tcp_stream.into_split(),
490                                    // TODO: possibly our problem?
491                                    _ => continue,
492                                };
493                            // now spawn new tasks to forward traffic to/from the onion listener
494
495                            let pump_alive = Arc::new(AtomicBool::new(true));
496                            // read from connected client and write to local socket
497                            tokio::task::spawn({
498                                let pump_alive = pump_alive.clone();
499                                async move {
500                                    forward_stream(pump_alive, data_reader, tcp_writer).await;
501                                }
502                            });
503                            // read from local socket and write to connected client
504                            tokio::task::spawn(async move {
505                                forward_stream(pump_alive, tcp_reader, data_writer).await;
506                            });
507                        } else {
508                            // either requesting the wrong port or the wrong type of stream request
509                            let _ = stream_request.shutdown_circuit();
510                        }
511                    }
512                });
513            }
514        });
515
516        let onion_addr = OnionAddr::V3(OnionAddrV3::new(service_id, virt_port));
517        // onion-service is torn down when `onion_service` is dropped
518        Ok(OnionListener::new::<Arc<RunningOnionService>>(listener, onion_addr, onion_service, |_|{}))
519    }
520
521    fn generate_token(&mut self) -> CircuitToken {
522        0usize
523    }
524
525    fn release_token(&mut self, _token: CircuitToken) {}
526}