1use std::collections::BTreeMap;
3use std::ops::DerefMut;
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7
8use arti_rpc_client_core::{RpcConn, RpcConnBuilder};
10
11use crate::tor_crypto::*;
13use crate::tor_provider;
14use crate::tor_provider::*;
15use crate::arti_process::*;
16
17#[derive(thiserror::Error, Debug)]
19pub enum Error {
20 #[error("failed to create ArtiProcess object: {0}")]
21 ArtiProcessCreationFailed(#[source] crate::arti_process::Error),
22
23 #[error("failed to connect to ArtiProcess after {0:?}")]
24 ArtiRpcConnectFailed(std::time::Duration),
25
26 #[error("arti not bootstrapped")]
27 ArtiNotBootstrapped(),
28
29 #[error("failed to connect: {0}")]
30 ArtiOpenStreamFailed(#[source] arti_rpc_client_core::StreamError),
31
32 #[error("invalid circuit token: {0}")]
33 CircuitTokenInvalid(CircuitToken),
34
35 #[error("failed to spawn connect_async thread")]
36 ConnectAsyncThreadSpawnFailed(#[source] std::io::Error),
37
38 #[error("not implemented")]
39 NotImplemented(),
40}
41
42impl From<Error> for crate::tor_provider::Error {
43 fn from(error: Error) -> Self {
44 crate::tor_provider::Error::Generic(error.to_string())
45 }
46}
47
/// Describes which arti daemon an [`ArtiTorClient`] should use.
#[derive(Debug, Clone)]
pub enum ArtiTorClientConfig {
    /// Start an arti process owned by the client.
    BundledArti {
        /// Path of the arti binary, passed to `ArtiProcess::new`.
        arti_bin_path: PathBuf,
        /// Data directory, passed to `ArtiProcess::new`.
        data_directory: PathBuf,
    },
    /// Presumably meant for an arti instance managed outside this client
    /// (TODO confirm); `ArtiTorClient::new` currently rejects it with
    /// `Error::NotImplemented`.
    SystemArti {},
}
58
59pub struct ArtiTorClient {
60 _daemon: Option<ArtiProcess>,
61 rpc_conn: Arc<RpcConn>,
62 pending_log_lines: Arc<Mutex<Vec<String>>>,
63 pending_events: Arc<Mutex<Vec<TorEvent>>>,
64 bootstrapped: bool,
65 next_connect_handle: ConnectHandle,
66 circuit_token_counter: usize,
68 circuit_tokens: BTreeMap<CircuitToken, String>,
69}
70
71impl ArtiTorClient {
72 pub fn new(config: ArtiTorClientConfig) -> Result<Self, tor_provider::Error> {
73 let pending_log_lines: Arc<Mutex<Vec<String>>> = Default::default();
74
75 let (daemon, rpc_conn) = match &config {
76 ArtiTorClientConfig::BundledArti {
77 arti_bin_path,
78 data_directory,
79 } => {
80
81 let daemon =
83 ArtiProcess::new(arti_bin_path.as_path(), data_directory.as_path(), Arc::downgrade(&pending_log_lines))
84 .map_err(Error::ArtiProcessCreationFailed)?;
85
86 let rpc_conn = {
87 let timeout = Duration::from_secs(5);
89 let mut rpc_conn: Option<RpcConn> = None;
90
91 let start = Instant::now();
92 while rpc_conn.is_none() && start.elapsed() < timeout {
93
94 let mut builder = RpcConnBuilder::new();
95 builder.prepend_literal_path(daemon.connect_string().into());
96
97 rpc_conn = builder.connect().map_or(None, |rpc_conn| Some(rpc_conn));
98 }
99
100 if let Some(rpc_conn) = rpc_conn {
101 rpc_conn
102 } else {
103 return Err(Error::ArtiRpcConnectFailed(timeout))?
104 }
105 };
106
107 (daemon, rpc_conn)
108 },
109 _ => {
110 return Err(Error::NotImplemented().into())
111 }
112 };
113
114 let pending_events = std::vec![TorEvent::LogReceived {
115 line: "Starting arti TorProvider".to_string()
116 }];
117 let pending_events = Arc::new(Mutex::new(pending_events));
118
119 Ok(Self {
120 _daemon: Some(daemon),
121 rpc_conn: Arc::new(rpc_conn),
122 pending_log_lines,
123 pending_events,
124 bootstrapped: false,
125 next_connect_handle: Default::default(),
126 circuit_token_counter: 0,
127 circuit_tokens: Default::default(),
128 })
129 }
130
131 fn connect_impl(
132 target: TargetAddr,
133 rpc_conn: &RpcConn,
134 circuit_isolation: &str,
135 ) -> Result<std::net::TcpStream, tor_provider::Error> {
136
137 let (host, port) = match &target {
139 TargetAddr::Socket(socket_addr) => (format!("{:?}", socket_addr.ip()), socket_addr.port()),
140 TargetAddr::OnionService(OnionAddr::V3(onion_addr)) => (format!("{}.onion", onion_addr.service_id()), onion_addr.virt_port()),
141 TargetAddr::Domain(domain_addr) => (domain_addr.domain().to_string(), domain_addr.port()),
142 };
143
144 let stream = rpc_conn.open_stream(None, (host.as_str(), port), circuit_isolation)
146 .map_err(Error::ArtiOpenStreamFailed)?;
147 Ok(stream)
148 }
149}
150
151impl TorProvider for ArtiTorClient {
152 fn update(&mut self) -> Result<Vec<TorEvent>, tor_provider::Error> {
153 std::thread::sleep(std::time::Duration::from_millis(16));
154 let mut tor_events = match self.pending_events.lock() {
155 Ok(mut pending_events) => std::mem::take(pending_events.deref_mut()),
156 Err(_) => {
157 unreachable!("another thread panicked while holding this pending_events mutex")
158 }
159 };
160 let mut log_lines = match self.pending_log_lines.lock() {
162 Ok(mut pending_log_lines) => std::mem::take(pending_log_lines.deref_mut()),
163 Err(_) => {
164 unreachable!("another thread panicked while holding this pending_log_lines mutex")
165 }
166 };
167
168 for log_line in log_lines.iter_mut() {
170 tor_events.push(TorEvent::LogReceived {
171 line: std::mem::take(log_line),
172 });
173 }
174
175 Ok(tor_events)
176 }
177
178 fn bootstrap(&mut self) -> Result<(), tor_provider::Error> {
179 if !self.bootstrapped {
181 match self.pending_events.lock() {
182 Ok(mut pending_events) => {
183 pending_events.push(TorEvent::BootstrapStatus {
184 progress: 0,
185 tag: "no-tag".to_string(),
186 summary: "no summary".to_string(),
187 });
188 pending_events.push(TorEvent::BootstrapStatus {
189 progress: 100,
190 tag: "no-tag".to_string(),
191 summary: "no summary".to_string(),
192 });
193 pending_events.push(TorEvent::BootstrapComplete);
194 }
195 Err(_) => unreachable!(
196 "another thread panicked while holding this pending_events mutex"
197 ),
198 }
199 self.bootstrapped = true;
200 }
201 Ok(())
202 }
203
204 fn add_client_auth(
205 &mut self,
206 _service_id: &V3OnionServiceId,
207 _client_auth: &X25519PrivateKey,
208 ) -> Result<(), tor_provider::Error> {
209 Err(Error::NotImplemented().into())
210 }
211
212 fn remove_client_auth(
213 &mut self,
214 _service_id: &V3OnionServiceId,
215 ) -> Result<(), tor_provider::Error> {
216 Err(Error::NotImplemented().into())
217 }
218
219 fn connect(
220 &mut self,
221 target: TargetAddr,
222 circuit_token: Option<CircuitToken>,
223 ) -> Result<OnionStream, tor_provider::Error> {
224 if !self.bootstrapped {
225 return Err(Error::ArtiNotBootstrapped().into());
226 }
227
228 let isolation = if let Some(circuit_token) = circuit_token {
230 if let Some(isolation) = self.circuit_tokens.get(&circuit_token) {
231 isolation.as_str()
232 } else {
233 return Err(Error::CircuitTokenInvalid(circuit_token))?;
234 }
235 } else {
236 ""
237 };
238
239 let stream = Self::connect_impl(target.clone(), self.rpc_conn.as_ref(), isolation)?;
240
241 Ok(OnionStream {
242 stream,
243 local_addr: None,
244 peer_addr: Some(target),
245 })
246 }
247
248 fn connect_async(
249 &mut self,
250 target: TargetAddr,
251 circuit_token: Option<CircuitToken>,
252 ) -> Result<ConnectHandle, tor_provider::Error> {
253
254 let isolation = if let Some(circuit_token) = circuit_token {
256 if let Some(isolation) = self.circuit_tokens.get(&circuit_token) {
257 isolation.as_str()
258 } else {
259 return Err(Error::CircuitTokenInvalid(circuit_token))?;
260 }
261 } else {
262 ""
263 }.to_string();
264
265 let handle = self.next_connect_handle;
266 self.next_connect_handle += 1usize;
267
268 let rpc_conn = Arc::downgrade(&self.rpc_conn);
269 let pending_events = Arc::downgrade(&self.pending_events);
270
271 std::thread::Builder::new()
273 .spawn(move || {
274 if let Some(rpc_conn) = rpc_conn.upgrade() {
275 let stream = Self::connect_impl(target.clone(), &rpc_conn, isolation.as_str());
276 if let Some(pending_events) = pending_events.upgrade() {
277 let event = match stream {
278 Ok(stream) => {
279 let stream = OnionStream {
280 stream,
281 local_addr: None,
282 peer_addr: Some(target),
283 };
284 TorEvent::ConnectComplete{
285 handle,
286 stream,
287 }
288 },
289 Err(error) => TorEvent::ConnectFailed{
290 handle,
291 error,
292 },
293 };
294 let mut pending_events = pending_events.lock().expect("async_events mutex poisoned");
295 pending_events.push(event);
296 }
297 }
298 }).map_err(Error::ConnectAsyncThreadSpawnFailed)?;
299 Ok(handle)
300 }
301
302 fn listener(
303 &mut self,
304 _private_key: &Ed25519PrivateKey,
305 _virt_port: u16,
306 _authorized_clients: Option<&[X25519PublicKey]>,
307 ) -> Result<OnionListener, tor_provider::Error> {
308 Err(Error::NotImplemented().into())
309 }
310
311 fn generate_token(&mut self) -> CircuitToken {
312 const ISOLATION_TOKEN_LEN: usize = 32;
313 let new_token = self.circuit_token_counter;
314 self.circuit_token_counter += 1;
315 self.circuit_tokens.insert(
316 new_token,
317 generate_password(ISOLATION_TOKEN_LEN));
318
319 new_token
320 }
321
322 fn release_token(&mut self, token: CircuitToken) {
323 self.circuit_tokens.remove(&token);
324 }
325}