1use std::collections::BTreeMap;
3use std::ops::DerefMut;
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7
8use arti_rpc_client_core::{RpcConn, RpcConnBuilder};
10
11use crate::arti_process::*;
13use crate::tor_crypto::*;
14use crate::tor_provider;
15use crate::tor_provider::*;
16
17#[derive(thiserror::Error, Debug)]
19pub enum Error {
20 #[error("failed to create ArtiProcess object: {0}")]
21 ArtiProcessCreationFailed(#[source] crate::arti_process::Error),
22
23 #[error("failed to connect to ArtiProcess after {0:?}")]
24 ArtiRpcConnectFailed(std::time::Duration),
25
26 #[error("arti not bootstrapped")]
27 ArtiNotBootstrapped(),
28
29 #[error("failed to connect: {0}")]
30 ArtiOpenStreamFailed(#[source] arti_rpc_client_core::StreamError),
31
32 #[error("invalid circuit token: {0}")]
33 CircuitTokenInvalid(CircuitToken),
34
35 #[error("failed to spawn connect_async thread")]
36 ConnectAsyncThreadSpawnFailed(#[source] std::io::Error),
37
38 #[error("not implemented")]
39 NotImplemented(),
40}
41
42impl From<Error> for crate::tor_provider::Error {
43 fn from(error: Error) -> Self {
44 crate::tor_provider::Error::Generic(error.to_string())
45 }
46}
47
48#[derive(Clone, Debug)]
49pub enum ArtiTorClientConfig {
50 BundledArti {
51 arti_bin_path: PathBuf,
52 data_directory: PathBuf,
53 },
54 SystemArti {},
55}
56
57pub struct ArtiTorClient {
58 _daemon: Option<ArtiProcess>,
59 rpc_conn: Arc<RpcConn>,
60 pending_log_lines: Arc<Mutex<Vec<String>>>,
61 pending_events: Arc<Mutex<Vec<TorEvent>>>,
62 bootstrapped: bool,
63 next_connect_handle: ConnectHandle,
64 circuit_token_counter: usize,
66 circuit_tokens: BTreeMap<CircuitToken, String>,
67}
68
69impl ArtiTorClient {
70 pub fn new(config: ArtiTorClientConfig) -> Result<Self, tor_provider::Error> {
71 let pending_log_lines: Arc<Mutex<Vec<String>>> = Default::default();
72
73 let (daemon, rpc_conn) = match &config {
74 ArtiTorClientConfig::BundledArti {
75 arti_bin_path,
76 data_directory,
77 } => {
78 let daemon = ArtiProcess::new(
80 arti_bin_path.as_path(),
81 data_directory.as_path(),
82 Arc::downgrade(&pending_log_lines),
83 )
84 .map_err(Error::ArtiProcessCreationFailed)?;
85
86 let rpc_conn = {
87 let timeout = Duration::from_secs(5);
89 let mut rpc_conn: Option<RpcConn> = None;
90
91 let start = Instant::now();
92 while rpc_conn.is_none() && start.elapsed() < timeout {
93 let mut builder = RpcConnBuilder::new();
94 builder.prepend_literal_path(daemon.connect_string().into());
95
96 rpc_conn = builder.connect().map_or(None, |rpc_conn| Some(rpc_conn));
97 }
98
99 if let Some(rpc_conn) = rpc_conn {
100 rpc_conn
101 } else {
102 return Err(Error::ArtiRpcConnectFailed(timeout))?;
103 }
104 };
105
106 (daemon, rpc_conn)
107 }
108 _ => return Err(Error::NotImplemented().into()),
109 };
110
111 let pending_events = std::vec![TorEvent::LogReceived {
112 line: "Starting arti TorProvider".to_string()
113 }];
114 let pending_events = Arc::new(Mutex::new(pending_events));
115
116 Ok(Self {
117 _daemon: Some(daemon),
118 rpc_conn: Arc::new(rpc_conn),
119 pending_log_lines,
120 pending_events,
121 bootstrapped: false,
122 next_connect_handle: Default::default(),
123 circuit_token_counter: 0,
124 circuit_tokens: Default::default(),
125 })
126 }
127
128 fn connect_impl(
129 target: TargetAddr,
130 rpc_conn: &RpcConn,
131 circuit_isolation: &str,
132 ) -> Result<std::net::TcpStream, tor_provider::Error> {
133 let (host, port) = match &target {
135 TargetAddr::Socket(socket_addr) => {
136 (format!("{:?}", socket_addr.ip()), socket_addr.port())
137 }
138 TargetAddr::OnionService(OnionAddr::V3(onion_addr)) => (
139 format!("{}.onion", onion_addr.service_id()),
140 onion_addr.virt_port(),
141 ),
142 TargetAddr::Domain(domain_addr) => {
143 (domain_addr.domain().to_string(), domain_addr.port())
144 }
145 };
146
147 let stream = rpc_conn
149 .open_stream(None, (host.as_str(), port), circuit_isolation)
150 .map_err(Error::ArtiOpenStreamFailed)?;
151 Ok(stream)
152 }
153}
154
155impl TorProvider for ArtiTorClient {
156 fn update(&mut self) -> Result<Vec<TorEvent>, tor_provider::Error> {
157 std::thread::sleep(std::time::Duration::from_millis(16));
158 let mut tor_events = match self.pending_events.lock() {
159 Ok(mut pending_events) => std::mem::take(pending_events.deref_mut()),
160 Err(_) => {
161 unreachable!("another thread panicked while holding this pending_events mutex")
162 }
163 };
164 let mut log_lines = match self.pending_log_lines.lock() {
166 Ok(mut pending_log_lines) => std::mem::take(pending_log_lines.deref_mut()),
167 Err(_) => {
168 unreachable!("another thread panicked while holding this pending_log_lines mutex")
169 }
170 };
171
172 for log_line in log_lines.iter_mut() {
174 tor_events.push(TorEvent::LogReceived {
175 line: std::mem::take(log_line),
176 });
177 }
178
179 Ok(tor_events)
180 }
181
182 fn bootstrap(&mut self) -> Result<(), tor_provider::Error> {
183 if !self.bootstrapped {
185 match self.pending_events.lock() {
186 Ok(mut pending_events) => {
187 pending_events.push(TorEvent::BootstrapStatus {
188 progress: 0,
189 tag: "no-tag".to_string(),
190 summary: "no summary".to_string(),
191 });
192 pending_events.push(TorEvent::BootstrapStatus {
193 progress: 100,
194 tag: "no-tag".to_string(),
195 summary: "no summary".to_string(),
196 });
197 pending_events.push(TorEvent::BootstrapComplete);
198 }
199 Err(_) => {
200 unreachable!("another thread panicked while holding this pending_events mutex")
201 }
202 }
203 self.bootstrapped = true;
204 }
205 Ok(())
206 }
207
208 fn add_client_auth(
209 &mut self,
210 _service_id: &V3OnionServiceId,
211 _client_auth: &X25519PrivateKey,
212 ) -> Result<(), tor_provider::Error> {
213 Err(Error::NotImplemented().into())
214 }
215
216 fn remove_client_auth(
217 &mut self,
218 _service_id: &V3OnionServiceId,
219 ) -> Result<(), tor_provider::Error> {
220 Err(Error::NotImplemented().into())
221 }
222
223 fn connect(
224 &mut self,
225 target: TargetAddr,
226 circuit_token: Option<CircuitToken>,
227 ) -> Result<OnionStream, tor_provider::Error> {
228 if !self.bootstrapped {
229 return Err(Error::ArtiNotBootstrapped().into());
230 }
231
232 let isolation = if let Some(circuit_token) = circuit_token {
234 if let Some(isolation) = self.circuit_tokens.get(&circuit_token) {
235 isolation.as_str()
236 } else {
237 return Err(Error::CircuitTokenInvalid(circuit_token))?;
238 }
239 } else {
240 ""
241 };
242
243 let stream = Self::connect_impl(target.clone(), self.rpc_conn.as_ref(), isolation)?;
244
245 Ok(OnionStream {
246 stream,
247 local_addr: None,
248 peer_addr: Some(target),
249 })
250 }
251
252 fn connect_async(
253 &mut self,
254 target: TargetAddr,
255 circuit_token: Option<CircuitToken>,
256 ) -> Result<ConnectHandle, tor_provider::Error> {
257 let isolation = if let Some(circuit_token) = circuit_token {
259 if let Some(isolation) = self.circuit_tokens.get(&circuit_token) {
260 isolation.as_str()
261 } else {
262 return Err(Error::CircuitTokenInvalid(circuit_token))?;
263 }
264 } else {
265 ""
266 }
267 .to_string();
268
269 let handle = self.next_connect_handle;
270 self.next_connect_handle += 1usize;
271
272 let rpc_conn = Arc::downgrade(&self.rpc_conn);
273 let pending_events = Arc::downgrade(&self.pending_events);
274
275 std::thread::Builder::new()
277 .spawn(move || {
278 if let Some(rpc_conn) = rpc_conn.upgrade() {
279 let stream = Self::connect_impl(target.clone(), &rpc_conn, isolation.as_str());
280 if let Some(pending_events) = pending_events.upgrade() {
281 let event = match stream {
282 Ok(stream) => {
283 let stream = OnionStream {
284 stream,
285 local_addr: None,
286 peer_addr: Some(target),
287 };
288 TorEvent::ConnectComplete { handle, stream }
289 }
290 Err(error) => TorEvent::ConnectFailed { handle, error },
291 };
292 let mut pending_events =
293 pending_events.lock().expect("async_events mutex poisoned");
294 pending_events.push(event);
295 }
296 }
297 })
298 .map_err(Error::ConnectAsyncThreadSpawnFailed)?;
299 Ok(handle)
300 }
301
302 fn listener(
303 &mut self,
304 _private_key: &Ed25519PrivateKey,
305 _virt_port: u16,
306 _authorized_clients: Option<&[X25519PublicKey]>,
307 ) -> Result<OnionListener, tor_provider::Error> {
308 Err(Error::NotImplemented().into())
309 }
310
311 fn generate_token(&mut self) -> CircuitToken {
312 const ISOLATION_TOKEN_LEN: usize = 32;
313 let new_token = self.circuit_token_counter;
314 self.circuit_token_counter += 1;
315 self.circuit_tokens
316 .insert(new_token, generate_password(ISOLATION_TOKEN_LEN));
317
318 new_token
319 }
320
321 fn release_token(&mut self, token: CircuitToken) {
322 self.circuit_tokens.remove(&token);
323 }
324}