tor_interface/
arti_tor_client.rs

1// std
2use std::collections::BTreeMap;
3use std::ops::DerefMut;
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7
8// extern
9use arti_rpc_client_core::{RpcConn, RpcConnBuilder};
10
11// internal crates
12use crate::tor_crypto::*;
13use crate::tor_provider;
14use crate::tor_provider::*;
15use crate::arti_process::*;
16
17/// [`ArtiTorClient`]-specific error type
18#[derive(thiserror::Error, Debug)]
19pub enum Error {
20    #[error("failed to create ArtiProcess object: {0}")]
21    ArtiProcessCreationFailed(#[source] crate::arti_process::Error),
22
23    #[error("failed to connect to ArtiProcess after {0:?}")]
24    ArtiRpcConnectFailed(std::time::Duration),
25
26    #[error("arti not bootstrapped")]
27    ArtiNotBootstrapped(),
28
29    #[error("failed to connect: {0}")]
30    ArtiOpenStreamFailed(#[source] arti_rpc_client_core::StreamError),
31
32    #[error("invalid circuit token: {0}")]
33    CircuitTokenInvalid(CircuitToken),
34
35    #[error("not implemented")]
36    NotImplemented(),
37}
38
39impl From<Error> for crate::tor_provider::Error {
40    fn from(error: Error) -> Self {
41        crate::tor_provider::Error::Generic(error.to_string())
42    }
43}
44
45#[derive(Clone, Debug)]
46pub enum ArtiTorClientConfig {
47    BundledArti {
48        arti_bin_path: PathBuf,
49        data_directory: PathBuf,
50    },
51    SystemArti {
52
53    },
54}
55
56pub struct ArtiTorClient {
57    _daemon: Option<ArtiProcess>,
58    rpc_conn: RpcConn,
59    pending_log_lines: Arc<Mutex<Vec<String>>>,
60    pending_events: Arc<Mutex<Vec<TorEvent>>>,
61    bootstrapped: bool,
62    // our list of circuit tokens for the arti daemon
63    circuit_token_counter: usize,
64    circuit_tokens: BTreeMap<CircuitToken, String>,
65}
66
67impl ArtiTorClient {
68    pub fn new(config: ArtiTorClientConfig) -> Result<Self, tor_provider::Error> {
69        let pending_log_lines: Arc<Mutex<Vec<String>>> = Default::default();
70
71        let (daemon, rpc_conn) = match &config {
72            ArtiTorClientConfig::BundledArti {
73                arti_bin_path,
74                data_directory,
75            } => {
76
77                // launch arti
78                let daemon =
79                    ArtiProcess::new(arti_bin_path.as_path(), data_directory.as_path(), Arc::downgrade(&pending_log_lines))
80                        .map_err(Error::ArtiProcessCreationFailed)?;
81
82                let rpc_conn = {
83                    // try to open an rpc connnection for 5 seconds beore giving up
84                    let timeout = Duration::from_secs(5);
85                    let mut rpc_conn: Option<RpcConn> = None;
86
87                    let start = Instant::now();
88                    while rpc_conn.is_none() && start.elapsed() < timeout {
89
90                        let mut builder = RpcConnBuilder::new();
91                        builder.prepend_literal_path(daemon.connect_string().into());
92
93                        rpc_conn = builder.connect().map_or(None, |rpc_conn| Some(rpc_conn));
94                    }
95
96                    if let Some(rpc_conn) = rpc_conn {
97                        rpc_conn
98                    } else {
99                        return Err(Error::ArtiRpcConnectFailed(timeout))?
100                    }
101                };
102
103                (daemon, rpc_conn)
104            },
105            _ => {
106                return Err(Error::NotImplemented().into())
107            }
108        };
109
110        let pending_events = std::vec![TorEvent::LogReceived {
111            line: "Starting arti TorProvider".to_string()
112        }];
113        let pending_events = Arc::new(Mutex::new(pending_events));
114
115        Ok(Self {
116            _daemon: Some(daemon),
117            rpc_conn,
118            pending_log_lines,
119            pending_events,
120            bootstrapped: false,
121            circuit_token_counter: 0,
122            circuit_tokens: Default::default(),
123        })
124    }
125}
126
127impl TorProvider for ArtiTorClient {
128    fn update(&mut self) -> Result<Vec<TorEvent>, tor_provider::Error> {
129        std::thread::sleep(std::time::Duration::from_millis(16));
130        let mut tor_events = match self.pending_events.lock() {
131            Ok(mut pending_events) => std::mem::take(pending_events.deref_mut()),
132            Err(_) => {
133                unreachable!("another thread panicked while holding this pending_events mutex")
134            }
135        };
136        // take our log lines
137        let mut log_lines = match self.pending_log_lines.lock() {
138            Ok(mut pending_log_lines) => std::mem::take(pending_log_lines.deref_mut()),
139            Err(_) => {
140                unreachable!("another thread panicked while holding this pending_log_lines mutex")
141            }
142        };
143
144        // append raw lines as TorEvent
145        for log_line in log_lines.iter_mut() {
146            tor_events.push(TorEvent::LogReceived {
147                line: std::mem::take(log_line),
148            });
149        }
150
151        Ok(tor_events)
152    }
153
154    fn bootstrap(&mut self) -> Result<(), tor_provider::Error> {
155        // TODO: seems no way to start arti without automatically bootstrapping
156        if !self.bootstrapped {
157            match self.pending_events.lock() {
158                Ok(mut pending_events) => {
159                    pending_events.push(TorEvent::BootstrapStatus {
160                        progress: 0,
161                        tag: "no-tag".to_string(),
162                        summary: "no summary".to_string(),
163                    });
164                    pending_events.push(TorEvent::BootstrapStatus {
165                        progress: 100,
166                        tag: "no-tag".to_string(),
167                        summary: "no summary".to_string(),
168                    });
169                    pending_events.push(TorEvent::BootstrapComplete);
170                }
171                Err(_) => unreachable!(
172                    "another thread panicked while holding this pending_events mutex"
173                ),
174            }
175            self.bootstrapped = true;
176        }
177        Ok(())
178    }
179
180    fn add_client_auth(
181        &mut self,
182        _service_id: &V3OnionServiceId,
183        _client_auth: &X25519PrivateKey,
184    ) -> Result<(), tor_provider::Error> {
185        Err(Error::NotImplemented().into())
186    }
187
188    fn remove_client_auth(
189        &mut self,
190        _service_id: &V3OnionServiceId,
191    ) -> Result<(), tor_provider::Error> {
192        Err(Error::NotImplemented().into())
193    }
194
195    fn connect(
196        &mut self,
197        target: TargetAddr,
198        circuit_token: Option<CircuitToken>,
199    ) -> Result<OnionStream, tor_provider::Error> {
200        if !self.bootstrapped {
201            return Err(Error::ArtiNotBootstrapped().into());
202        }
203
204        // convert TargetAddr to (String, u16) tuple
205        let (host, port) = match &target {
206            TargetAddr::Socket(socket_addr) => (format!("{:?}", socket_addr.ip()), socket_addr.port()),
207            TargetAddr::OnionService(OnionAddr::V3(onion_addr)) => (format!("{}.onion", onion_addr.service_id()), onion_addr.virt_port()),
208            TargetAddr::Domain(domain_addr) => (domain_addr.domain().to_string(), domain_addr.port()),
209        };
210
211        // map circuit_token to isolation string for arti
212        let isolation = if let Some(circuit_token) = circuit_token {
213            if let Some(isolation) = self.circuit_tokens.get(&circuit_token) {
214                isolation.as_str()
215            } else {
216                return Err(Error::CircuitTokenInvalid(circuit_token))?;
217            }
218        } else {
219            ""
220        };
221
222        // connect to target
223        let stream = self.rpc_conn.open_stream(None, (host.as_str(), port), isolation)
224            .map_err(Error::ArtiOpenStreamFailed)?;
225
226        Ok(OnionStream {
227            stream,
228            local_addr: None,
229            peer_addr: Some(target),
230        })
231    }
232
233    fn listener(
234        &mut self,
235        _private_key: &Ed25519PrivateKey,
236        _virt_port: u16,
237        _authorized_clients: Option<&[X25519PublicKey]>,
238    ) -> Result<OnionListener, tor_provider::Error> {
239        Err(Error::NotImplemented().into())
240    }
241
242    fn generate_token(&mut self) -> CircuitToken {
243        const ISOLATION_TOKEN_LEN: usize = 32;
244        let new_token = self.circuit_token_counter;
245        self.circuit_token_counter += 1;
246        self.circuit_tokens.insert(
247            new_token,
248            generate_password(ISOLATION_TOKEN_LEN));
249
250        new_token
251    }
252
253    fn release_token(&mut self, token: CircuitToken) {
254        self.circuit_tokens.remove(&token);
255    }
256}