tor_interface/
arti_tor_client.rs

1// std
2use std::collections::BTreeMap;
3use std::ops::DerefMut;
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7
8// extern
9use arti_rpc_client_core::{RpcConn, RpcConnBuilder};
10
11// internal crates
12use crate::arti_process::*;
13use crate::tor_crypto::*;
14use crate::tor_provider;
15use crate::tor_provider::*;
16
17/// [`ArtiTorClient`]-specific error type
18#[derive(thiserror::Error, Debug)]
19pub enum Error {
20    #[error("failed to create ArtiProcess object: {0}")]
21    ArtiProcessCreationFailed(#[source] crate::arti_process::Error),
22
23    #[error("failed to connect to ArtiProcess after {0:?}")]
24    ArtiRpcConnectFailed(std::time::Duration),
25
26    #[error("arti not bootstrapped")]
27    ArtiNotBootstrapped(),
28
29    #[error("failed to connect: {0}")]
30    ArtiOpenStreamFailed(#[source] arti_rpc_client_core::StreamError),
31
32    #[error("invalid circuit token: {0}")]
33    CircuitTokenInvalid(CircuitToken),
34
35    #[error("failed to spawn connect_async thread")]
36    ConnectAsyncThreadSpawnFailed(#[source] std::io::Error),
37
38    #[error("not implemented")]
39    NotImplemented(),
40}
41
42impl From<Error> for crate::tor_provider::Error {
43    fn from(error: Error) -> Self {
44        crate::tor_provider::Error::Generic(error.to_string())
45    }
46}
47
/// Configuration selecting how an [`ArtiTorClient`] obtains its arti daemon.
#[derive(Debug, Clone)]
pub enum ArtiTorClientConfig {
    /// Launch an arti binary as a child process owned by the client.
    BundledArti {
        /// Path handed to `ArtiProcess::new()` as the arti binary to run.
        arti_bin_path: PathBuf,
        /// Path handed to `ArtiProcess::new()` as the daemon's data directory.
        data_directory: PathBuf,
    },
    /// Use an arti instance that is not launched by the client.
    ///
    /// `ArtiTorClient::new()` currently rejects this variant with a
    /// "not implemented" error.
    SystemArti {},
}
56
57pub struct ArtiTorClient {
58    _daemon: Option<ArtiProcess>,
59    rpc_conn: Arc<RpcConn>,
60    pending_log_lines: Arc<Mutex<Vec<String>>>,
61    pending_events: Arc<Mutex<Vec<TorEvent>>>,
62    bootstrapped: bool,
63    next_connect_handle: ConnectHandle,
64    // our list of circuit tokens for the arti daemon
65    circuit_token_counter: usize,
66    circuit_tokens: BTreeMap<CircuitToken, String>,
67}
68
69impl ArtiTorClient {
70    pub fn new(config: ArtiTorClientConfig) -> Result<Self, tor_provider::Error> {
71        let pending_log_lines: Arc<Mutex<Vec<String>>> = Default::default();
72
73        let (daemon, rpc_conn) = match &config {
74            ArtiTorClientConfig::BundledArti {
75                arti_bin_path,
76                data_directory,
77            } => {
78                // launch arti
79                let daemon = ArtiProcess::new(
80                    arti_bin_path.as_path(),
81                    data_directory.as_path(),
82                    Arc::downgrade(&pending_log_lines),
83                )
84                .map_err(Error::ArtiProcessCreationFailed)?;
85
86                let rpc_conn = {
87                    // try to open an rpc connnection for 5 seconds beore giving up
88                    let timeout = Duration::from_secs(5);
89                    let mut rpc_conn: Option<RpcConn> = None;
90
91                    let start = Instant::now();
92                    while rpc_conn.is_none() && start.elapsed() < timeout {
93                        let mut builder = RpcConnBuilder::new();
94                        builder.prepend_literal_path(daemon.connect_string().into());
95
96                        rpc_conn = builder.connect().map_or(None, |rpc_conn| Some(rpc_conn));
97                    }
98
99                    if let Some(rpc_conn) = rpc_conn {
100                        rpc_conn
101                    } else {
102                        return Err(Error::ArtiRpcConnectFailed(timeout))?;
103                    }
104                };
105
106                (daemon, rpc_conn)
107            }
108            _ => return Err(Error::NotImplemented().into()),
109        };
110
111        let pending_events = std::vec![TorEvent::LogReceived {
112            line: "Starting arti TorProvider".to_string()
113        }];
114        let pending_events = Arc::new(Mutex::new(pending_events));
115
116        Ok(Self {
117            _daemon: Some(daemon),
118            rpc_conn: Arc::new(rpc_conn),
119            pending_log_lines,
120            pending_events,
121            bootstrapped: false,
122            next_connect_handle: Default::default(),
123            circuit_token_counter: 0,
124            circuit_tokens: Default::default(),
125        })
126    }
127
128    fn connect_impl(
129        target: TargetAddr,
130        rpc_conn: &RpcConn,
131        circuit_isolation: &str,
132    ) -> Result<std::net::TcpStream, tor_provider::Error> {
133        // convert TargetAddr to (String, u16) tuple
134        let (host, port) = match &target {
135            TargetAddr::Socket(socket_addr) => {
136                (format!("{:?}", socket_addr.ip()), socket_addr.port())
137            }
138            TargetAddr::OnionService(OnionAddr::V3(onion_addr)) => (
139                format!("{}.onion", onion_addr.service_id()),
140                onion_addr.virt_port(),
141            ),
142            TargetAddr::Domain(domain_addr) => {
143                (domain_addr.domain().to_string(), domain_addr.port())
144            }
145        };
146
147        // connect to target
148        let stream = rpc_conn
149            .open_stream(None, (host.as_str(), port), circuit_isolation)
150            .map_err(Error::ArtiOpenStreamFailed)?;
151        Ok(stream)
152    }
153}
154
155impl TorProvider for ArtiTorClient {
156    fn update(&mut self) -> Result<Vec<TorEvent>, tor_provider::Error> {
157        std::thread::sleep(std::time::Duration::from_millis(16));
158        let mut tor_events = match self.pending_events.lock() {
159            Ok(mut pending_events) => std::mem::take(pending_events.deref_mut()),
160            Err(_) => {
161                unreachable!("another thread panicked while holding this pending_events mutex")
162            }
163        };
164        // take our log lines
165        let mut log_lines = match self.pending_log_lines.lock() {
166            Ok(mut pending_log_lines) => std::mem::take(pending_log_lines.deref_mut()),
167            Err(_) => {
168                unreachable!("another thread panicked while holding this pending_log_lines mutex")
169            }
170        };
171
172        // append raw lines as TorEvent
173        for log_line in log_lines.iter_mut() {
174            tor_events.push(TorEvent::LogReceived {
175                line: std::mem::take(log_line),
176            });
177        }
178
179        Ok(tor_events)
180    }
181
182    fn bootstrap(&mut self) -> Result<(), tor_provider::Error> {
183        // TODO: seems no way to start arti without automatically bootstrapping
184        if !self.bootstrapped {
185            match self.pending_events.lock() {
186                Ok(mut pending_events) => {
187                    pending_events.push(TorEvent::BootstrapStatus {
188                        progress: 0,
189                        tag: "no-tag".to_string(),
190                        summary: "no summary".to_string(),
191                    });
192                    pending_events.push(TorEvent::BootstrapStatus {
193                        progress: 100,
194                        tag: "no-tag".to_string(),
195                        summary: "no summary".to_string(),
196                    });
197                    pending_events.push(TorEvent::BootstrapComplete);
198                }
199                Err(_) => {
200                    unreachable!("another thread panicked while holding this pending_events mutex")
201                }
202            }
203            self.bootstrapped = true;
204        }
205        Ok(())
206    }
207
208    fn add_client_auth(
209        &mut self,
210        _service_id: &V3OnionServiceId,
211        _client_auth: &X25519PrivateKey,
212    ) -> Result<(), tor_provider::Error> {
213        Err(Error::NotImplemented().into())
214    }
215
216    fn remove_client_auth(
217        &mut self,
218        _service_id: &V3OnionServiceId,
219    ) -> Result<(), tor_provider::Error> {
220        Err(Error::NotImplemented().into())
221    }
222
223    fn connect(
224        &mut self,
225        target: TargetAddr,
226        circuit_token: Option<CircuitToken>,
227    ) -> Result<OnionStream, tor_provider::Error> {
228        if !self.bootstrapped {
229            return Err(Error::ArtiNotBootstrapped().into());
230        }
231
232        // map circuit_token to isolation string for arti
233        let isolation = if let Some(circuit_token) = circuit_token {
234            if let Some(isolation) = self.circuit_tokens.get(&circuit_token) {
235                isolation.as_str()
236            } else {
237                return Err(Error::CircuitTokenInvalid(circuit_token))?;
238            }
239        } else {
240            ""
241        };
242
243        let stream = Self::connect_impl(target.clone(), self.rpc_conn.as_ref(), isolation)?;
244
245        Ok(OnionStream {
246            stream,
247            local_addr: None,
248            peer_addr: Some(target),
249        })
250    }
251
252    fn connect_async(
253        &mut self,
254        target: TargetAddr,
255        circuit_token: Option<CircuitToken>,
256    ) -> Result<ConnectHandle, tor_provider::Error> {
257        // map circuit_token to isolation string for arti
258        let isolation = if let Some(circuit_token) = circuit_token {
259            if let Some(isolation) = self.circuit_tokens.get(&circuit_token) {
260                isolation.as_str()
261            } else {
262                return Err(Error::CircuitTokenInvalid(circuit_token))?;
263            }
264        } else {
265            ""
266        }
267        .to_string();
268
269        let handle = self.next_connect_handle;
270        self.next_connect_handle += 1usize;
271
272        let rpc_conn = Arc::downgrade(&self.rpc_conn);
273        let pending_events = Arc::downgrade(&self.pending_events);
274
275        // open connection on background thread
276        std::thread::Builder::new()
277            .spawn(move || {
278                if let Some(rpc_conn) = rpc_conn.upgrade() {
279                    let stream = Self::connect_impl(target.clone(), &rpc_conn, isolation.as_str());
280                    if let Some(pending_events) = pending_events.upgrade() {
281                        let event = match stream {
282                            Ok(stream) => {
283                                let stream = OnionStream {
284                                    stream,
285                                    local_addr: None,
286                                    peer_addr: Some(target),
287                                };
288                                TorEvent::ConnectComplete { handle, stream }
289                            }
290                            Err(error) => TorEvent::ConnectFailed { handle, error },
291                        };
292                        let mut pending_events =
293                            pending_events.lock().expect("async_events mutex poisoned");
294                        pending_events.push(event);
295                    }
296                }
297            })
298            .map_err(Error::ConnectAsyncThreadSpawnFailed)?;
299        Ok(handle)
300    }
301
302    fn listener(
303        &mut self,
304        _private_key: &Ed25519PrivateKey,
305        _virt_port: u16,
306        _authorized_clients: Option<&[X25519PublicKey]>,
307    ) -> Result<OnionListener, tor_provider::Error> {
308        Err(Error::NotImplemented().into())
309    }
310
311    fn generate_token(&mut self) -> CircuitToken {
312        const ISOLATION_TOKEN_LEN: usize = 32;
313        let new_token = self.circuit_token_counter;
314        self.circuit_token_counter += 1;
315        self.circuit_tokens
316            .insert(new_token, generate_password(ISOLATION_TOKEN_LEN));
317
318        new_token
319    }
320
321    fn release_token(&mut self, token: CircuitToken) {
322        self.circuit_tokens.remove(&token);
323    }
324}