tor_interface/
legacy_tor_control_stream.rs1use std::collections::VecDeque;
3use std::default::Default;
4use std::io::{ErrorKind, Read, Write};
5use std::net::{SocketAddr, TcpStream};
6use std::option::Option;
7use std::string::ToString;
8use std::time::Duration;
9
10use regex::Regex;
12
13#[derive(thiserror::Error, Debug)]
14pub enum Error {
15 #[error("control stream read timeout must not be zero")]
16 ReadTimeoutZero(),
17
18 #[error("could not connect to control port")]
19 CreationFailed(#[source] std::io::Error),
20
21 #[error("configure control port socket failed")]
22 ConfigurationFailed(#[source] std::io::Error),
23
24 #[error("control port parsing regex creation failed")]
25 ParsingRegexCreationFailed(#[source] regex::Error),
26
27 #[error("control port stream read failure")]
28 ReadFailed(#[source] std::io::Error),
29
30 #[error("control port stream closed by remote")]
31 ClosedByRemote(),
32
33 #[error("received control port response invalid utf8")]
34 InvalidResponse(#[source] std::str::Utf8Error),
35
36 #[error("failed to parse control port reply: {0}")]
37 ReplyParseFailed(String),
38
39 #[error("control port stream write failure")]
40 WriteFailed(#[source] std::io::Error),
41}
42
43pub(crate) struct LegacyControlStream {
44 stream: TcpStream,
45 closed_by_remote: bool,
46 pending_data: Vec<u8>,
47 pending_lines: VecDeque<String>,
48 pending_reply: Vec<String>,
49 reading_multiline_value: bool,
50 single_line_data: Regex,
52 multi_line_data: Regex,
53 end_reply_line: Regex,
54}
55
/// Numeric status code carried at the start of every reply line.
type StatusCode = u32;

/// One complete reply read from the control port.
pub(crate) struct Reply {
    /// Status code shared by all lines of the reply.
    pub status_code: StatusCode,
    /// The reply's lines with the status code and separator character
    /// removed; a multi-line value is stored as a single entry whose data
    /// lines are joined with `\n`.
    pub reply_lines: Vec<String>,
}
61
62impl LegacyControlStream {
63 pub fn new(addr: &SocketAddr, read_timeout: Duration) -> Result<LegacyControlStream, Error> {
64 if read_timeout.is_zero() {
65 return Err(Error::ReadTimeoutZero());
66 }
67
68 let stream = TcpStream::connect(addr).map_err(Error::CreationFailed)?;
69 stream
70 .set_read_timeout(Some(read_timeout))
71 .map_err(Error::ConfigurationFailed)?;
72
73 const READ_BUFFER_SIZE: usize = 1024;
75 let pending_data = Vec::with_capacity(READ_BUFFER_SIZE);
76
77 let single_line_data =
78 Regex::new(r"^\d\d\d-.*").map_err(Error::ParsingRegexCreationFailed)?;
79 let multi_line_data =
80 Regex::new(r"^\d\d\d+.*").map_err(Error::ParsingRegexCreationFailed)?;
81 let end_reply_line =
82 Regex::new(r"^\d\d\d .*").map_err(Error::ParsingRegexCreationFailed)?;
83
84 Ok(LegacyControlStream {
85 stream,
86 closed_by_remote: false,
87 pending_data,
88 pending_lines: Default::default(),
89 pending_reply: Default::default(),
90 reading_multiline_value: false,
91 single_line_data,
93 multi_line_data,
94 end_reply_line,
95 })
96 }
97
98 #[cfg(test)]
99 pub(crate) fn closed_by_remote(&mut self) -> bool {
100 self.closed_by_remote
101 }
102
103 fn read_line(&mut self) -> Result<Option<String>, Error> {
104 while self.pending_lines.is_empty() {
106 let byte_count = self.pending_data.len();
107 match self.stream.read_to_end(&mut self.pending_data) {
108 Err(err) => {
109 if err.kind() == ErrorKind::WouldBlock || err.kind() == ErrorKind::TimedOut {
110 if byte_count == self.pending_data.len() {
111 return Ok(None);
112 }
113 } else {
114 return Err(Error::ReadFailed(err));
115 }
116 }
117 Ok(0usize) => {
118 self.closed_by_remote = true;
119 return Err(Error::ClosedByRemote());
120 }
121 Ok(_count) => (),
122 }
123
124 let mut begin = 0;
126 for index in 1..self.pending_data.len() {
127 if self.pending_data[index - 1] == b'\r' && self.pending_data[index] == b'\n' {
128 let end = index - 1;
129 let line_view: &[u8] = &self.pending_data[begin..end];
131 let line_string =
133 std::str::from_utf8(line_view).map_err(Error::InvalidResponse)?;
134
135 self.pending_lines.push_back(line_string.to_string());
137 begin = end + 2;
139 }
140 }
141 self.pending_data.drain(0..begin);
143 }
144
145 Ok(self.pending_lines.pop_front())
146 }
147
148 pub fn read_reply(&mut self) -> Result<Option<Reply>, Error> {
149 loop {
150 let current_line = match self.read_line()? {
151 Some(line) => line,
152 None => return Ok(None),
153 };
154
155 if let Some(first_line) = self.pending_reply.first() {
158 if !self.reading_multiline_value {
159 let first_status_code = &first_line[0..3];
160 let current_status_code = ¤t_line[0..3];
161 if first_status_code != current_status_code {
162 return Err(Error::ReplyParseFailed(format!(
163 "mismatched status codes, {} != {}",
164 first_status_code, current_status_code
165 )));
166 }
167 }
168 }
169
170 if self.end_reply_line.is_match(¤t_line) {
172 if self.reading_multiline_value {
173 return Err(Error::ReplyParseFailed(
174 "found multi-line end reply but not reading a multi-line reply".to_string(),
175 ));
176 }
177 self.pending_reply.push(current_line);
178 break;
179 } else if self.single_line_data.is_match(¤t_line) {
181 if self.reading_multiline_value {
182 return Err(Error::ReplyParseFailed(
183 "found single-line reply but still reading a multi-line reply".to_string(),
184 ));
185 }
186 self.pending_reply.push(current_line);
187 } else if self.multi_line_data.is_match(¤t_line) {
189 if self.reading_multiline_value {
190 return Err(Error::ReplyParseFailed(
191 "found multi-line start reply but still reading a multi-line reply"
192 .to_string(),
193 ));
194 }
195 self.pending_reply.push(current_line);
196 self.reading_multiline_value = true;
197 } else {
199 if !self.reading_multiline_value {
200 return Err(Error::ReplyParseFailed(
201 "found a multi-line intermediate reply but not reading a multi-line reply"
202 .to_string(),
203 ));
204 }
205 if current_line == "." {
207 self.reading_multiline_value = false;
208 } else {
209 let multiline = match self.pending_reply.last_mut() {
210 Some(multiline) => multiline,
211 None => unreachable!(),
216 };
217 multiline.push('\n');
218 multiline.push_str(¤t_line);
219 }
220 }
221 }
222
223 let mut reply_lines: Vec<String> = Default::default();
225 std::mem::swap(&mut self.pending_reply, &mut reply_lines);
226
227 let status_code_string = match reply_lines.first() {
229 Some(line) => line[0..3].to_string(),
230 None => unreachable!(),
232 };
233 let status_code: u32 = match status_code_string.parse() {
234 Ok(status_code) => status_code,
235 Err(_) => {
236 return Err(Error::ReplyParseFailed(format!(
237 "unable to parse '{}' as status code",
238 status_code_string
239 )))
240 }
241 };
242
243 for line in reply_lines.iter_mut() {
245 if line.starts_with(&status_code_string) {
246 *line = line[4..].to_string();
247 }
248 }
249
250 Ok(Some(Reply {
251 status_code,
252 reply_lines,
253 }))
254 }
255
256 pub fn write(&mut self, cmd: &str) -> Result<(), Error> {
257 if let Err(err) = write!(self.stream, "{}\r\n", cmd) {
258 self.closed_by_remote = true;
259 return Err(Error::WriteFailed(err));
260 }
261 Ok(())
262 }
263}