tor_interface/
arti_tor_client.rs

1// std
2use std::collections::BTreeMap;
3use std::ops::DerefMut;
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7
8// extern
9use arti_rpc_client_core::{RpcConn, RpcConnBuilder};
10
11// internal crates
12use crate::tor_crypto::*;
13use crate::tor_provider;
14use crate::tor_provider::*;
15use crate::arti_process::*;
16
17/// [`ArtiTorClient`]-specific error type
18#[derive(thiserror::Error, Debug)]
19pub enum Error {
20    #[error("failed to create ArtiProcess object: {0}")]
21    ArtiProcessCreationFailed(#[source] crate::arti_process::Error),
22
23    #[error("failed to connect to ArtiProcess after {0:?}")]
24    ArtiRpcConnectFailed(std::time::Duration),
25
26    #[error("arti not bootstrapped")]
27    ArtiNotBootstrapped(),
28
29    #[error("failed to connect: {0}")]
30    ArtiOpenStreamFailed(#[source] arti_rpc_client_core::StreamError),
31
32    #[error("invalid circuit token: {0}")]
33    CircuitTokenInvalid(CircuitToken),
34
35    #[error("failed to spawn connect_async thread")]
36    ConnectAsyncThreadSpawnFailed(#[source] std::io::Error),
37
38    #[error("not implemented")]
39    NotImplemented(),
40}
41
42impl From<Error> for crate::tor_provider::Error {
43    fn from(error: Error) -> Self {
44        crate::tor_provider::Error::Generic(error.to_string())
45    }
46}
47
/// Selects which arti daemon an [`ArtiTorClient`] talks to
#[derive(Debug, Clone)]
pub enum ArtiTorClientConfig {
    /// launch an arti binary as a child process owned by the client
    BundledArti {
        /// path to the arti executable to launch
        arti_bin_path: PathBuf,
        /// data directory handed to the launched arti process
        data_directory: PathBuf,
    },
    /// NOTE(review): presumably an arti daemon that is already running on the
    /// system; `ArtiTorClient::new()` currently rejects this variant as not
    /// implemented
    SystemArti {},
}
58
59pub struct ArtiTorClient {
60    _daemon: Option<ArtiProcess>,
61    rpc_conn: Arc<RpcConn>,
62    pending_log_lines: Arc<Mutex<Vec<String>>>,
63    pending_events: Arc<Mutex<Vec<TorEvent>>>,
64    bootstrapped: bool,
65    next_connect_handle: ConnectHandle,
66    // our list of circuit tokens for the arti daemon
67    circuit_token_counter: usize,
68    circuit_tokens: BTreeMap<CircuitToken, String>,
69}
70
71impl ArtiTorClient {
72    pub fn new(config: ArtiTorClientConfig) -> Result<Self, tor_provider::Error> {
73        let pending_log_lines: Arc<Mutex<Vec<String>>> = Default::default();
74
75        let (daemon, rpc_conn) = match &config {
76            ArtiTorClientConfig::BundledArti {
77                arti_bin_path,
78                data_directory,
79            } => {
80
81                // launch arti
82                let daemon =
83                    ArtiProcess::new(arti_bin_path.as_path(), data_directory.as_path(), Arc::downgrade(&pending_log_lines))
84                        .map_err(Error::ArtiProcessCreationFailed)?;
85
86                let rpc_conn = {
87                    // try to open an rpc connnection for 5 seconds beore giving up
88                    let timeout = Duration::from_secs(5);
89                    let mut rpc_conn: Option<RpcConn> = None;
90
91                    let start = Instant::now();
92                    while rpc_conn.is_none() && start.elapsed() < timeout {
93
94                        let mut builder = RpcConnBuilder::new();
95                        builder.prepend_literal_path(daemon.connect_string().into());
96
97                        rpc_conn = builder.connect().map_or(None, |rpc_conn| Some(rpc_conn));
98                    }
99
100                    if let Some(rpc_conn) = rpc_conn {
101                        rpc_conn
102                    } else {
103                        return Err(Error::ArtiRpcConnectFailed(timeout))?
104                    }
105                };
106
107                (daemon, rpc_conn)
108            },
109            _ => {
110                return Err(Error::NotImplemented().into())
111            }
112        };
113
114        let pending_events = std::vec![TorEvent::LogReceived {
115            line: "Starting arti TorProvider".to_string()
116        }];
117        let pending_events = Arc::new(Mutex::new(pending_events));
118
119        Ok(Self {
120            _daemon: Some(daemon),
121            rpc_conn: Arc::new(rpc_conn),
122            pending_log_lines,
123            pending_events,
124            bootstrapped: false,
125            next_connect_handle: Default::default(),
126            circuit_token_counter: 0,
127            circuit_tokens: Default::default(),
128        })
129    }
130
131    fn connect_impl(
132        target: TargetAddr,
133        rpc_conn: &RpcConn,
134        circuit_isolation: &str,
135    ) -> Result<std::net::TcpStream, tor_provider::Error> {
136
137        // convert TargetAddr to (String, u16) tuple
138        let (host, port) = match &target {
139            TargetAddr::Socket(socket_addr) => (format!("{:?}", socket_addr.ip()), socket_addr.port()),
140            TargetAddr::OnionService(OnionAddr::V3(onion_addr)) => (format!("{}.onion", onion_addr.service_id()), onion_addr.virt_port()),
141            TargetAddr::Domain(domain_addr) => (domain_addr.domain().to_string(), domain_addr.port()),
142        };
143
144        // connect to target
145        let stream = rpc_conn.open_stream(None, (host.as_str(), port), circuit_isolation)
146            .map_err(Error::ArtiOpenStreamFailed)?;
147        Ok(stream)
148    }
149}
150
151impl TorProvider for ArtiTorClient {
152    fn update(&mut self) -> Result<Vec<TorEvent>, tor_provider::Error> {
153        std::thread::sleep(std::time::Duration::from_millis(16));
154        let mut tor_events = match self.pending_events.lock() {
155            Ok(mut pending_events) => std::mem::take(pending_events.deref_mut()),
156            Err(_) => {
157                unreachable!("another thread panicked while holding this pending_events mutex")
158            }
159        };
160        // take our log lines
161        let mut log_lines = match self.pending_log_lines.lock() {
162            Ok(mut pending_log_lines) => std::mem::take(pending_log_lines.deref_mut()),
163            Err(_) => {
164                unreachable!("another thread panicked while holding this pending_log_lines mutex")
165            }
166        };
167
168        // append raw lines as TorEvent
169        for log_line in log_lines.iter_mut() {
170            tor_events.push(TorEvent::LogReceived {
171                line: std::mem::take(log_line),
172            });
173        }
174
175        Ok(tor_events)
176    }
177
178    fn bootstrap(&mut self) -> Result<(), tor_provider::Error> {
179        // TODO: seems no way to start arti without automatically bootstrapping
180        if !self.bootstrapped {
181            match self.pending_events.lock() {
182                Ok(mut pending_events) => {
183                    pending_events.push(TorEvent::BootstrapStatus {
184                        progress: 0,
185                        tag: "no-tag".to_string(),
186                        summary: "no summary".to_string(),
187                    });
188                    pending_events.push(TorEvent::BootstrapStatus {
189                        progress: 100,
190                        tag: "no-tag".to_string(),
191                        summary: "no summary".to_string(),
192                    });
193                    pending_events.push(TorEvent::BootstrapComplete);
194                }
195                Err(_) => unreachable!(
196                    "another thread panicked while holding this pending_events mutex"
197                ),
198            }
199            self.bootstrapped = true;
200        }
201        Ok(())
202    }
203
204    fn add_client_auth(
205        &mut self,
206        _service_id: &V3OnionServiceId,
207        _client_auth: &X25519PrivateKey,
208    ) -> Result<(), tor_provider::Error> {
209        Err(Error::NotImplemented().into())
210    }
211
212    fn remove_client_auth(
213        &mut self,
214        _service_id: &V3OnionServiceId,
215    ) -> Result<(), tor_provider::Error> {
216        Err(Error::NotImplemented().into())
217    }
218
219    fn connect(
220        &mut self,
221        target: TargetAddr,
222        circuit_token: Option<CircuitToken>,
223    ) -> Result<OnionStream, tor_provider::Error> {
224        if !self.bootstrapped {
225            return Err(Error::ArtiNotBootstrapped().into());
226        }
227
228        // map circuit_token to isolation string for arti
229        let isolation = if let Some(circuit_token) = circuit_token {
230            if let Some(isolation) = self.circuit_tokens.get(&circuit_token) {
231                isolation.as_str()
232            } else {
233                return Err(Error::CircuitTokenInvalid(circuit_token))?;
234            }
235        } else {
236            ""
237        };
238
239        let stream = Self::connect_impl(target.clone(), self.rpc_conn.as_ref(), isolation)?;
240
241        Ok(OnionStream {
242            stream,
243            local_addr: None,
244            peer_addr: Some(target),
245        })
246    }
247
248    fn connect_async(
249        &mut self,
250        target: TargetAddr,
251        circuit_token: Option<CircuitToken>,
252    ) -> Result<ConnectHandle, tor_provider::Error> {
253
254        // map circuit_token to isolation string for arti
255        let isolation = if let Some(circuit_token) = circuit_token {
256            if let Some(isolation) = self.circuit_tokens.get(&circuit_token) {
257                isolation.as_str()
258            } else {
259                return Err(Error::CircuitTokenInvalid(circuit_token))?;
260            }
261        } else {
262            ""
263        }.to_string();
264
265        let handle = self.next_connect_handle;
266        self.next_connect_handle += 1usize;
267
268        let rpc_conn = Arc::downgrade(&self.rpc_conn);
269        let pending_events = Arc::downgrade(&self.pending_events);
270
271        // open connection on background thread
272        std::thread::Builder::new()
273            .spawn(move || {
274                if let Some(rpc_conn) = rpc_conn.upgrade() {
275                    let stream = Self::connect_impl(target.clone(), &rpc_conn, isolation.as_str());
276                    if let Some(pending_events) = pending_events.upgrade() {
277                        let event = match stream {
278                            Ok(stream) => {
279                                let stream = OnionStream {
280                                    stream,
281                                    local_addr: None,
282                                    peer_addr: Some(target),
283                                };
284                                TorEvent::ConnectComplete{
285                                    handle,
286                                    stream,
287                                }
288                            },
289                            Err(error) => TorEvent::ConnectFailed{
290                                handle,
291                                error,
292                            },
293                        };
294                        let mut pending_events = pending_events.lock().expect("async_events mutex poisoned");
295                        pending_events.push(event);
296                    }
297                }
298            }).map_err(Error::ConnectAsyncThreadSpawnFailed)?;
299        Ok(handle)
300    }
301
302    fn listener(
303        &mut self,
304        _private_key: &Ed25519PrivateKey,
305        _virt_port: u16,
306        _authorized_clients: Option<&[X25519PublicKey]>,
307    ) -> Result<OnionListener, tor_provider::Error> {
308        Err(Error::NotImplemented().into())
309    }
310
311    fn generate_token(&mut self) -> CircuitToken {
312        const ISOLATION_TOKEN_LEN: usize = 32;
313        let new_token = self.circuit_token_counter;
314        self.circuit_token_counter += 1;
315        self.circuit_tokens.insert(
316            new_token,
317            generate_password(ISOLATION_TOKEN_LEN));
318
319        new_token
320    }
321
322    fn release_token(&mut self, token: CircuitToken) {
323        self.circuit_tokens.remove(&token);
324    }
325}