tor_interface/
arti_client_tor_client.rs

1// standard
2use std::net::SocketAddr;
3use std::ops::DerefMut;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
8
9//extern
10use arti_client::config::{CfgPath, TorClientConfigBuilder};
11use arti_client::{BootstrapBehavior, DangerouslyIntoTorAddr, IntoTorAddr, TorClient};
12use tokio::io::{AsyncReadExt, AsyncWriteExt};
13use tokio::net::{TcpListener, TcpStream};
14use tokio::runtime;
15use tokio_stream::StreamExt;
16use tor_cell::relaycell::msg::Connected;
17use tor_config::ExplicitOrAuto;
18use tor_llcrypto::pk::ed25519::ExpandedKeypair;
19use tor_hsservice::config::OnionServiceConfigBuilder;
20use tor_hsservice::config::restricted_discovery::HsClientNickname;
21use tor_hsservice::{HsNickname, RunningOnionService};
22use tor_keymgr::{config::ArtiKeystoreKind, KeystoreSelector};
23use tor_proto::stream::IncomingStreamRequest;
24use tor_rtcompat::PreferredRuntime;
25
26// internal crates
27use crate::tor_crypto::*;
28use crate::tor_provider;
29use crate::tor_provider::*;
30
31/// [`ArtiClientTorClient`]-specific error type
32#[derive(thiserror::Error, Debug)]
33pub enum Error {
34    #[error("not implemented")]
35    NotImplemented(),
36
37    #[error("unable to bind TCP listener")]
38    TcpListenerBindFailed(#[source] std::io::Error),
39
40    #[error("unable to get TCP listener's local address")]
41    TcpListenerLocalAddrFailed(#[source] std::io::Error),
42
43    #[error("unable to accept connection on TCP Listener")]
44    TcpListenerAcceptFailed(#[source] std::io::Error),
45
46    #[error("unable to connect to TCP listener")]
47    TcpStreamConnectFailed(#[source] std::io::Error),
48
49    #[error("unable to convert tokio::TcpStream to std::net::TcpStream")]
50    TcpStreamIntoFailed(#[source] std::io::Error),
51
52    #[error("arti-client config-builder error: {0}")]
53    ArtiClientConfigBuilderError(#[source] arti_client::config::ConfigBuildError),
54
55    #[error("arti-client error: {0}")]
56    ArtiClientError(#[source] arti_client::Error),
57
58    #[error("arti-client tor-addr error: {0}")]
59    ArtiClientTorAddrError(#[source] arti_client::TorAddrError),
60
61    #[error("arti-client onion-service startup error: {0}")]
62    ArtiClientOnionServiceLaunchError(#[source] arti_client::Error),
63
64    #[error("tor-keymgr error: {0}")]
65    TorKeyMgrError(#[source] tor_keymgr::Error),
66
67    #[error("onion-service config-builder error: {0}")]
68    OnionServiceConfigBuilderError(#[source] tor_config::ConfigBuildError),
69}
70
71impl From<Error> for crate::tor_provider::Error {
72    fn from(error: Error) -> Self {
73        crate::tor_provider::Error::Generic(error.to_string())
74    }
75}
76
77/// The `ArtiClientTorClient` is an in-process [`arti-client`](https://crates.io/crates/arti-client)-based [`TorProvider`].
78///
79///
80pub struct ArtiClientTorClient {
81    tokio_runtime: Arc<runtime::Runtime>,
82    arti_client: TorClient<PreferredRuntime>,
83    pending_events: Arc<Mutex<Vec<TorEvent>>>,
84    bootstrapped: Arc<AtomicBool>,
85    next_connect_handle: ConnectHandle,
86}
87
88// used to forward traffic to/from arti to local tcp streams
89async fn forward_stream<R, W>(alive: Arc<AtomicBool>, mut reader: R, mut writer: W) -> ()
90where
91    R: AsyncReadExt + Unpin,
92    W: AsyncWriteExt + Unpin,
93{
94    // allow 100ms timeout on reads to verify writer is still good
95    let read_timeout = std::time::Duration::from_millis(100);
96    // allow additional retries in the event the other half of the pump
97    // dies; keep pumping data until our read times out 3 times
98    let mut remaining_retries = 3;
99    let mut buf = [0u8; 1024];
100
101    loop {
102        if !alive.load(Ordering::Relaxed) && remaining_retries == 0 {
103            break;
104        }
105
106        tokio::select! {
107            count = reader.read(&mut buf) => match count {
108                // end of stream
109                Ok(0) => break,
110                // read N bytes
111                Ok(count) => {
112                    // forward traffic
113                    match writer.write_all(&buf[0..count]).await {
114                        Ok(()) => (),
115                        Err(_err) => break,
116                    }
117                    match writer.flush().await {
118                        Ok(()) => (),
119                        Err(_err) => break,
120                    }
121                },
122                // read failed
123                Err(_err) => break,
124            },
125            _ = tokio::time::sleep(read_timeout.clone()) => match writer.flush().await {
126                Ok(()) => {
127                    // so long as our writer and reader are good, we should
128                    // allow a few additional data pump attempts
129                    if !alive.load(Ordering::Relaxed) {
130                        remaining_retries -= 1;
131                    }
132                },
133                Err(_err) => break,
134            }
135        }
136    }
137    // signal pump death
138    alive.store(false, Ordering::Relaxed);
139}
140
141impl ArtiClientTorClient {
142    /// Construct a new `ArtiClientTorClient` which uses a [Tokio](https://crates.io/crates/tokio) runtime internally for all async operations.
143    pub fn new(
144        tokio_runtime: Arc<runtime::Runtime>,
145        root_data_directory: &Path,
146    ) -> Result<Self, Error> {
147        // set custom config options
148        let mut config_builder: TorClientConfigBuilder = Default::default();
149
150        // manually set arti cache and data directories so we can have
151        // multiple concurrent instances and control where it writes
152        let mut cache_dir = PathBuf::from(root_data_directory);
153        cache_dir.push("cache");
154        config_builder
155            .storage()
156            .cache_dir(CfgPath::new_literal(cache_dir))
157            .keystore()
158            .primary().kind(ExplicitOrAuto::Explicit(ArtiKeystoreKind::Ephemeral));
159
160        let mut state_dir = PathBuf::from(root_data_directory);
161        state_dir.push("state");
162        config_builder
163            .storage()
164            .state_dir(CfgPath::new_literal(state_dir));
165
166        // disable access to clearnet addresses and enable access to onion services
167        config_builder
168            .address_filter()
169            .allow_local_addrs(false)
170            .allow_onion_addrs(true);
171
172        let config = match config_builder.build() {
173            Ok(config) => config,
174            Err(err) => return Err(err).map_err(Error::ArtiClientConfigBuilderError),
175        };
176
177        let arti_client = tokio_runtime.block_on(async {
178            TorClient::builder()
179                .config(config)
180                .bootstrap_behavior(BootstrapBehavior::Manual)
181                .create_unbootstrapped()
182                .map_err(Error::ArtiClientError)
183
184            // TODO: implement TorEvent::LogReceived events once upstream issue is resolved:
185            // https://gitlab.torproject.org/tpo/core/arti/-/issues/1356
186        })?;
187
188        let pending_events = std::vec![TorEvent::LogReceived {
189            line: "Starting arti-client TorProvider".to_string()
190        }];
191        let pending_events = Arc::new(Mutex::new(pending_events));
192
193        Ok(Self {
194            tokio_runtime,
195            arti_client,
196            pending_events,
197            bootstrapped: Arc::new(AtomicBool::new(false)),
198            next_connect_handle: Default::default(),
199        })
200    }
201
202    async fn connect_impl(
203        target_addr: TargetAddr,
204        arti_client: TorClient<PreferredRuntime>,
205    ) -> Result<std::net::TcpStream, tor_provider::Error> {
206        // convert TargetAddr to TorAddr
207        let arti_target = match target_addr.clone() {
208            TargetAddr::Socket(socket_addr) => socket_addr.into_tor_addr_dangerously(),
209            TargetAddr::Domain(domain_addr) => {
210                (domain_addr.domain(), domain_addr.port()).into_tor_addr()
211            }
212            TargetAddr::OnionService(OnionAddr::V3(OnionAddrV3 {
213                service_id,
214                virt_port,
215            })) => (format!("{}.onion", service_id), virt_port).into_tor_addr(),
216        }
217        .map_err(Error::ArtiClientTorAddrError)?;
218
219        // connect to target
220        let data_stream = arti_client.connect(arti_target)
221            .await
222            .map_err(Error::ArtiClientError)?;
223
224        // start a task to forward traffic from returned data stream
225        // and tcp socket
226        let (data_reader, data_writer) = data_stream.split();
227
228        // try to bind to a local address, let OS pick our port
229        let socket_addr = SocketAddr::from(([127, 0, 0, 1], 0u16));
230        let server_listener = TcpListener::bind(socket_addr)
231            .await
232            .map_err(Error::TcpListenerBindFailed)?;
233        // await future after a client connects
234        let server_accept_future = server_listener.accept();
235        let socket_addr = server_listener
236            .local_addr()
237            .map_err(Error::TcpListenerLocalAddrFailed)?;
238
239        // client stream will ultimatley be returned from connect_impl()
240        let client_stream = TcpStream::connect(socket_addr)
241            .await
242            .map_err(Error::TcpStreamConnectFailed)?;
243        // client has connected so now get the server's tcp stream
244        let (server_stream, _socket_addr) = server_accept_future
245            .await
246            .map_err(Error::TcpListenerAcceptFailed)?;
247        let (tcp_reader, tcp_writer) = server_stream.into_split();
248
249        // now spawn new tasks to forward traffic to/from local listener
250        let pump_alive = Arc::new(AtomicBool::new(true));
251        tokio::task::spawn({
252            let pump_alive = pump_alive.clone();
253            async move {
254                forward_stream(pump_alive, tcp_reader, data_writer).await;
255            }
256        });
257        tokio::task::spawn(async move {
258            forward_stream(pump_alive, data_reader, tcp_writer).await;
259        });
260        let client_stream = client_stream
261            .into_std()
262            .map_err(Error::TcpStreamIntoFailed)?;
263        Ok::<std::net::TcpStream, tor_provider::Error>(client_stream)
264    }
265}
266
267impl TorProvider for ArtiClientTorClient {
268    fn update(&mut self) -> Result<Vec<TorEvent>, tor_provider::Error> {
269        std::thread::sleep(std::time::Duration::from_millis(16));
270        match self.pending_events.lock() {
271            Ok(mut pending_events) => Ok(std::mem::take(pending_events.deref_mut())),
272            Err(_) => {
273                unreachable!("another thread panicked while holding this pending_events mutex")
274            }
275        }
276    }
277
278    fn bootstrap(&mut self) -> Result<(), tor_provider::Error> {
279        // save progress events
280        let mut bootstrap_events = self.arti_client.bootstrap_events();
281        let pending_events = self.pending_events.clone();
282        let bootstrapped = self.bootstrapped.clone();
283        self.tokio_runtime.spawn(async move {
284            while let Some(evt) = bootstrap_events.next().await {
285                if bootstrapped.load(Ordering::Relaxed) {
286                    break;
287                }
288                match pending_events.lock() {
289                    Ok(mut pending_events) => {
290                        pending_events.push(TorEvent::BootstrapStatus {
291                            progress: (evt.as_frac().clamp(0.0f32, 1.0f32) * 100f32) as u32,
292                            tag: "no-tag".to_string(),
293                            summary: "no summary".to_string(),
294                        });
295                        // TODO: properly handle evt.blocked() with a new TorEvent::Error or something
296                    }
297                    Err(_) => unreachable!(
298                        "another thread panicked while holding this pending_events mutex"
299                    ),
300                }
301            }
302        });
303
304        // initiate bootstrap
305        let arti_client = self.arti_client.clone();
306        let pending_events = self.pending_events.clone();
307        let bootstrapped = self.bootstrapped.clone();
308        self.tokio_runtime.spawn(async move {
309            match arti_client.bootstrap().await {
310                Ok(()) => match pending_events.lock() {
311                    Ok(mut pending_events) => {
312                        pending_events.push(TorEvent::BootstrapStatus {
313                            progress: 100,
314                            tag: "no-tag".to_string(),
315                            summary: "no summary".to_string(),
316                        });
317                        pending_events.push(TorEvent::BootstrapComplete);
318                        bootstrapped.store(true, Ordering::Relaxed);
319                        return;
320                    }
321                    Err(_) => unreachable!(
322                        "another thread panicked while holding this pending_events mutex"
323                    ),
324                },
325                Err(_err) => {
326                    // TODO: add an error event to TorEvent
327                }
328            }
329        });
330
331        Ok(())
332    }
333
334    fn add_client_auth(
335        &mut self,
336        service_id: &V3OnionServiceId,
337        client_auth: &X25519PrivateKey,
338    ) -> Result<(), tor_provider::Error> {
339        let ed25519_public = Ed25519PublicKey::from_service_id(service_id).unwrap();
340        let hs_id = ed25519_public.as_bytes().clone();
341
342        self.arti_client.insert_service_discovery_key(KeystoreSelector::Primary, hs_id.into(), client_auth.inner().clone().into()).map_err(Error::ArtiClientError)?;
343
344        Ok(())
345    }
346
347    fn remove_client_auth(
348        &mut self,
349        service_id: &V3OnionServiceId,
350    ) -> Result<(), tor_provider::Error> {
351        let ed25519_public = Ed25519PublicKey::from_service_id(service_id).unwrap();
352        let hs_id = ed25519_public.as_bytes().clone();
353
354        self.arti_client.remove_service_discovery_key(KeystoreSelector::Primary, hs_id.into()).map_err(Error::ArtiClientError)?;
355
356        Ok(())
357    }
358
359    fn connect(
360        &mut self,
361        target: TargetAddr,
362        circuit: Option<CircuitToken>,
363    ) -> Result<OnionStream, tor_provider::Error> {
364        // stream isolation not implemented yet
365        if circuit.is_some() {
366            return Err(Error::NotImplemented().into());
367        }
368
369        let arti_client = self.arti_client.clone();
370        let stream = self.tokio_runtime.block_on({
371            let target = target.clone();
372            async move {
373                Self::connect_impl(target, arti_client).await
374            }
375        })?;
376        Ok(OnionStream {
377            stream,
378            local_addr: None,
379            peer_addr: Some(target),
380        })
381    }
382
383    fn connect_async(
384        &mut self,
385        target: TargetAddr,
386        circuit: Option<CircuitToken>,
387    ) -> Result<ConnectHandle, tor_provider::Error> {
388
389        // stream isolation not implemented yet
390        if circuit.is_some() {
391            return Err(Error::NotImplemented().into());
392        }
393
394        let handle = self.next_connect_handle;
395        self.next_connect_handle += 1usize;
396
397        let arti_client = self.arti_client.clone();
398        let pending_events = Arc::downgrade(&self.pending_events);
399
400        self.tokio_runtime.spawn(async move {
401            let stream = Self::connect_impl(target.clone(), arti_client).await;
402            if let Some(pending_events) = pending_events.upgrade() {
403                let event = match stream {
404                    Ok(stream) => {
405                        let stream = OnionStream {
406                            stream,
407                            local_addr: None,
408                            peer_addr: Some(target),
409                        };
410                        TorEvent::ConnectComplete{
411                            handle,
412                            stream,
413                        }
414                    },
415                    Err(error) => TorEvent::ConnectFailed{
416                        handle,
417                        error,
418                    },
419                };
420                let mut pending_events = pending_events.lock().expect("pending_events mutex poisoned");
421                pending_events.push(event);
422            }
423        });
424
425        Ok(handle)
426    }
427
428    fn listener(
429        &mut self,
430        private_key: &Ed25519PrivateKey,
431        virt_port: u16,
432        authorized_clients: Option<&[X25519PublicKey]>,
433    ) -> Result<OnionListener, tor_provider::Error> {
434
435        // try to bind to a local address, let OS pick our port
436        let socket_addr = SocketAddr::from(([127, 0, 0, 1], 0u16));
437        // TODO: make this one async too
438        let listener =
439            std::net::TcpListener::bind(socket_addr).map_err(Error::TcpListenerBindFailed)?;
440        let socket_addr = listener
441            .local_addr()
442            .map_err(Error::TcpListenerLocalAddrFailed)?;
443
444        // generate a nickname to identify this onion service
445        let service_id = V3OnionServiceId::from_private_key(private_key);
446        let hs_nickname = match HsNickname::new(service_id.to_string()) {
447            Ok(nickname) => nickname,
448            Err(_) => {
449                panic!("v3 onion service id string representation should be a valid HsNickname")
450            }
451        };
452        // generate a new HsIdKeypair (from an Ed25519PrivateKey)
453        // clone() isn't implemented for ExpandedKeypair >:[
454        let secret_key_bytes = private_key.inner().to_secret_key_bytes();
455        let hs_id_keypair = ExpandedKeypair::from_secret_key_bytes(secret_key_bytes)
456            .unwrap();
457
458        // create an OnionServiceConfig with the ephemeral nickname
459        let mut onion_service_config_builder = OnionServiceConfigBuilder::default();
460        onion_service_config_builder
461            .nickname(hs_nickname);
462
463        // add authorised client keys if they exist
464        if let Some(authorized_clients) = authorized_clients {
465            if !authorized_clients.is_empty() {
466                let restricted_discovery_config = onion_service_config_builder
467                    .restricted_discovery();
468                restricted_discovery_config.enabled(true);
469
470                for (i, key) in authorized_clients.iter().enumerate() {
471                    let nickname = format!("client_{i}");
472                    restricted_discovery_config
473                        .static_keys()
474                        .access()
475                        .push((
476                            HsClientNickname::from_str(nickname.as_str()).unwrap(),
477                            key.inner().clone().into(),
478                        ));
479                }
480            }
481        }
482
483        let onion_service_config = match onion_service_config_builder.build()
484        {
485            Ok(onion_service_config) => onion_service_config,
486            Err(err) => Err(err).map_err(Error::OnionServiceConfigBuilderError)?,
487        };
488
489        let (onion_service, mut rend_requests) = self.arti_client
490            .launch_onion_service_with_hsid(onion_service_config, hs_id_keypair.into())
491            .map_err(Error::ArtiClientOnionServiceLaunchError)?;
492
493        // start a task to signal onion service published
494        let pending_events = self.pending_events.clone();
495        let mut status_events = onion_service.status_events();
496        let service_id_clone = service_id.clone();
497
498        self.tokio_runtime.spawn(async move {
499            while let Some(evt) = status_events.next().await {
500                match evt.state() {
501                    tor_hsservice::status::State::Running => match pending_events.lock() {
502                        Ok(mut pending_events) => {
503                            pending_events.push(TorEvent::OnionServicePublished { service_id: service_id_clone });
504                            return;
505                        }
506                        Err(_) => unreachable!(
507                            "another thread panicked while holding this pending_events mutex"
508                        ),
509                    },
510                    _ => (),
511                }
512            }
513        });
514
515        // start a task which accepts every RendRequest to get a StreamRequest
516        self.tokio_runtime.spawn(async move {
517            while let Some(request) = rend_requests.next().await {
518                let mut stream_requests = match request.accept().await {
519                    Ok(stream_requests) => stream_requests,
520                    // TODO: probably not our problem?
521                    _ => return,
522                };
523                // spawn a new task to consume the stream requsts
524                tokio::task::spawn(async move {
525                    while let Some(stream_request) = stream_requests.next().await {
526                        let should_accept =
527                            if let IncomingStreamRequest::Begin(begin) = stream_request.request() {
528                                // we only accept connections on the virt port
529                                begin.port() == virt_port
530                            } else {
531                                false
532                            };
533
534                        if should_accept {
535                            let data_stream =
536                                match stream_request.accept(Connected::new_empty()).await {
537                                    Ok(data_stream) => data_stream,
538                                    // TODO: probably not our problem
539                                    _ => continue,
540                                };
541                            let (data_reader, data_writer) = data_stream.split();
542
543                            let (tcp_reader, tcp_writer) =
544                                match TcpStream::connect(socket_addr).await {
545                                    Ok(tcp_stream) => tcp_stream.into_split(),
546                                    // TODO: possibly our problem?
547                                    _ => continue,
548                                };
549                            // now spawn new tasks to forward traffic to/from the onion listener
550
551                            let pump_alive = Arc::new(AtomicBool::new(true));
552                            // read from connected client and write to local socket
553                            tokio::task::spawn({
554                                let pump_alive = pump_alive.clone();
555                                async move {
556                                    forward_stream(pump_alive, data_reader, tcp_writer).await;
557                                }
558                            });
559                            // read from local socket and write to connected client
560                            tokio::task::spawn(async move {
561                                forward_stream(pump_alive, tcp_reader, data_writer).await;
562                            });
563                        } else {
564                            // either requesting the wrong port or the wrong type of stream request
565                            let _ = stream_request.shutdown_circuit();
566                        }
567                    }
568                });
569            }
570        });
571
572        let onion_addr = OnionAddr::V3(OnionAddrV3::new(service_id, virt_port));
573        // onion-service is torn down when `onion_service` is dropped
574        Ok(OnionListener::new::<Arc<RunningOnionService>>(listener, onion_addr, onion_service, |_|{}))
575    }
576
577    fn generate_token(&mut self) -> CircuitToken {
578        0usize
579    }
580
581    fn release_token(&mut self, _token: CircuitToken) {}
582}