tor_interface/
arti_tor_client.rs1use std::collections::BTreeMap;
3use std::ops::DerefMut;
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7
8use arti_rpc_client_core::{RpcConn, RpcConnBuilder};
10
11use crate::tor_crypto::*;
13use crate::tor_provider;
14use crate::tor_provider::*;
15use crate::arti_process::*;
16
17#[derive(thiserror::Error, Debug)]
19pub enum Error {
20 #[error("failed to create ArtiProcess object: {0}")]
21 ArtiProcessCreationFailed(#[source] crate::arti_process::Error),
22
23 #[error("failed to connect to ArtiProcess after {0:?}")]
24 ArtiRpcConnectFailed(std::time::Duration),
25
26 #[error("arti not bootstrapped")]
27 ArtiNotBootstrapped(),
28
29 #[error("failed to connect: {0}")]
30 ArtiOpenStreamFailed(#[source] arti_rpc_client_core::StreamError),
31
32 #[error("invalid circuit token: {0}")]
33 CircuitTokenInvalid(CircuitToken),
34
35 #[error("not implemented")]
36 NotImplemented(),
37}
38
39impl From<Error> for crate::tor_provider::Error {
40 fn from(error: Error) -> Self {
41 crate::tor_provider::Error::Generic(error.to_string())
42 }
43}
44
/// Selects which arti daemon an [`ArtiTorClient`] should use.
#[derive(Clone, Debug)]
pub enum ArtiTorClientConfig {
    /// Use an arti binary supplied by the caller.
    BundledArti {
        /// Path to the arti executable.
        arti_bin_path: PathBuf,
        /// Directory handed to the arti process as its data directory.
        data_directory: PathBuf,
    },
    /// Use an arti instance that is not launched by this client; this variant
    /// currently carries no settings.
    SystemArti {},
}
55
56pub struct ArtiTorClient {
57 _daemon: Option<ArtiProcess>,
58 rpc_conn: RpcConn,
59 pending_log_lines: Arc<Mutex<Vec<String>>>,
60 pending_events: Arc<Mutex<Vec<TorEvent>>>,
61 bootstrapped: bool,
62 circuit_token_counter: usize,
64 circuit_tokens: BTreeMap<CircuitToken, String>,
65}
66
67impl ArtiTorClient {
68 pub fn new(config: ArtiTorClientConfig) -> Result<Self, tor_provider::Error> {
69 let pending_log_lines: Arc<Mutex<Vec<String>>> = Default::default();
70
71 let (daemon, rpc_conn) = match &config {
72 ArtiTorClientConfig::BundledArti {
73 arti_bin_path,
74 data_directory,
75 } => {
76
77 let daemon =
79 ArtiProcess::new(arti_bin_path.as_path(), data_directory.as_path(), Arc::downgrade(&pending_log_lines))
80 .map_err(Error::ArtiProcessCreationFailed)?;
81
82 let rpc_conn = {
83 let timeout = Duration::from_secs(5);
85 let mut rpc_conn: Option<RpcConn> = None;
86
87 let start = Instant::now();
88 while rpc_conn.is_none() && start.elapsed() < timeout {
89
90 let mut builder = RpcConnBuilder::new();
91 builder.prepend_literal_path(daemon.connect_string().into());
92
93 rpc_conn = builder.connect().map_or(None, |rpc_conn| Some(rpc_conn));
94 }
95
96 if let Some(rpc_conn) = rpc_conn {
97 rpc_conn
98 } else {
99 return Err(Error::ArtiRpcConnectFailed(timeout))?
100 }
101 };
102
103 (daemon, rpc_conn)
104 },
105 _ => {
106 return Err(Error::NotImplemented().into())
107 }
108 };
109
110 let pending_events = std::vec![TorEvent::LogReceived {
111 line: "Starting arti TorProvider".to_string()
112 }];
113 let pending_events = Arc::new(Mutex::new(pending_events));
114
115 Ok(Self {
116 _daemon: Some(daemon),
117 rpc_conn,
118 pending_log_lines,
119 pending_events,
120 bootstrapped: false,
121 circuit_token_counter: 0,
122 circuit_tokens: Default::default(),
123 })
124 }
125}
126
127impl TorProvider for ArtiTorClient {
128 fn update(&mut self) -> Result<Vec<TorEvent>, tor_provider::Error> {
129 std::thread::sleep(std::time::Duration::from_millis(16));
130 let mut tor_events = match self.pending_events.lock() {
131 Ok(mut pending_events) => std::mem::take(pending_events.deref_mut()),
132 Err(_) => {
133 unreachable!("another thread panicked while holding this pending_events mutex")
134 }
135 };
136 let mut log_lines = match self.pending_log_lines.lock() {
138 Ok(mut pending_log_lines) => std::mem::take(pending_log_lines.deref_mut()),
139 Err(_) => {
140 unreachable!("another thread panicked while holding this pending_log_lines mutex")
141 }
142 };
143
144 for log_line in log_lines.iter_mut() {
146 tor_events.push(TorEvent::LogReceived {
147 line: std::mem::take(log_line),
148 });
149 }
150
151 Ok(tor_events)
152 }
153
154 fn bootstrap(&mut self) -> Result<(), tor_provider::Error> {
155 if !self.bootstrapped {
157 match self.pending_events.lock() {
158 Ok(mut pending_events) => {
159 pending_events.push(TorEvent::BootstrapStatus {
160 progress: 0,
161 tag: "no-tag".to_string(),
162 summary: "no summary".to_string(),
163 });
164 pending_events.push(TorEvent::BootstrapStatus {
165 progress: 100,
166 tag: "no-tag".to_string(),
167 summary: "no summary".to_string(),
168 });
169 pending_events.push(TorEvent::BootstrapComplete);
170 }
171 Err(_) => unreachable!(
172 "another thread panicked while holding this pending_events mutex"
173 ),
174 }
175 self.bootstrapped = true;
176 }
177 Ok(())
178 }
179
180 fn add_client_auth(
181 &mut self,
182 _service_id: &V3OnionServiceId,
183 _client_auth: &X25519PrivateKey,
184 ) -> Result<(), tor_provider::Error> {
185 Err(Error::NotImplemented().into())
186 }
187
188 fn remove_client_auth(
189 &mut self,
190 _service_id: &V3OnionServiceId,
191 ) -> Result<(), tor_provider::Error> {
192 Err(Error::NotImplemented().into())
193 }
194
195 fn connect(
196 &mut self,
197 target: TargetAddr,
198 circuit_token: Option<CircuitToken>,
199 ) -> Result<OnionStream, tor_provider::Error> {
200 if !self.bootstrapped {
201 return Err(Error::ArtiNotBootstrapped().into());
202 }
203
204 let (host, port) = match &target {
206 TargetAddr::Socket(socket_addr) => (format!("{:?}", socket_addr.ip()), socket_addr.port()),
207 TargetAddr::OnionService(OnionAddr::V3(onion_addr)) => (format!("{}.onion", onion_addr.service_id()), onion_addr.virt_port()),
208 TargetAddr::Domain(domain_addr) => (domain_addr.domain().to_string(), domain_addr.port()),
209 };
210
211 let isolation = if let Some(circuit_token) = circuit_token {
213 if let Some(isolation) = self.circuit_tokens.get(&circuit_token) {
214 isolation.as_str()
215 } else {
216 return Err(Error::CircuitTokenInvalid(circuit_token))?;
217 }
218 } else {
219 ""
220 };
221
222 let stream = self.rpc_conn.open_stream(None, (host.as_str(), port), isolation)
224 .map_err(Error::ArtiOpenStreamFailed)?;
225
226 Ok(OnionStream {
227 stream,
228 local_addr: None,
229 peer_addr: Some(target),
230 })
231 }
232
233 fn listener(
234 &mut self,
235 _private_key: &Ed25519PrivateKey,
236 _virt_port: u16,
237 _authorized_clients: Option<&[X25519PublicKey]>,
238 ) -> Result<OnionListener, tor_provider::Error> {
239 Err(Error::NotImplemented().into())
240 }
241
242 fn generate_token(&mut self) -> CircuitToken {
243 const ISOLATION_TOKEN_LEN: usize = 32;
244 let new_token = self.circuit_token_counter;
245 self.circuit_token_counter += 1;
246 self.circuit_tokens.insert(
247 new_token,
248 generate_password(ISOLATION_TOKEN_LEN));
249
250 new_token
251 }
252
253 fn release_token(&mut self, token: CircuitToken) {
254 self.circuit_tokens.remove(&token);
255 }
256}