tor_interface/
legacy_tor_control_stream.rs

1// standard
2use std::collections::VecDeque;
3use std::default::Default;
4use std::io::{ErrorKind, Read, Write};
5use std::net::{SocketAddr, TcpStream};
6use std::option::Option;
7use std::string::ToString;
8use std::time::Duration;
9
10// extern crates
11use regex::Regex;
12
13#[derive(thiserror::Error, Debug)]
14pub enum Error {
15    #[error("control stream read timeout must not be zero")]
16    ReadTimeoutZero(),
17
18    #[error("could not connect to control port")]
19    CreationFailed(#[source] std::io::Error),
20
21    #[error("configure control port socket failed")]
22    ConfigurationFailed(#[source] std::io::Error),
23
24    #[error("control port parsing regex creation failed")]
25    ParsingRegexCreationFailed(#[source] regex::Error),
26
27    #[error("control port stream read failure")]
28    ReadFailed(#[source] std::io::Error),
29
30    #[error("control port stream closed by remote")]
31    ClosedByRemote(),
32
33    #[error("received control port response invalid utf8")]
34    InvalidResponse(#[source] std::str::Utf8Error),
35
36    #[error("failed to parse control port reply: {0}")]
37    ReplyParseFailed(String),
38
39    #[error("control port stream write failure")]
40    WriteFailed(#[source] std::io::Error),
41}
42
43pub(crate) struct LegacyControlStream {
44    stream: TcpStream,
45    closed_by_remote: bool,
46    pending_data: Vec<u8>,
47    pending_lines: VecDeque<String>,
48    pending_reply: Vec<String>,
49    reading_multiline_value: bool,
50    // regexes used to parse control port responses
51    single_line_data: Regex,
52    multi_line_data: Regex,
53    end_reply_line: Regex,
54}
55
56type StatusCode = u32;
57pub(crate) struct Reply {
58    pub status_code: StatusCode,
59    pub reply_lines: Vec<String>,
60}
61
62impl LegacyControlStream {
63    pub fn new(addr: &SocketAddr, read_timeout: Duration) -> Result<LegacyControlStream, Error> {
64        if read_timeout.is_zero() {
65            return Err(Error::ReadTimeoutZero());
66        }
67
68        let stream = TcpStream::connect(addr).map_err(Error::CreationFailed)?;
69        stream
70            .set_read_timeout(Some(read_timeout))
71            .map_err(Error::ConfigurationFailed)?;
72
73        // pre-allocate a kilobyte for the read buffer
74        const READ_BUFFER_SIZE: usize = 1024;
75        let pending_data = Vec::with_capacity(READ_BUFFER_SIZE);
76
77        let single_line_data =
78            Regex::new(r"^\d\d\d-.*").map_err(Error::ParsingRegexCreationFailed)?;
79        let multi_line_data =
80            Regex::new(r"^\d\d\d+.*").map_err(Error::ParsingRegexCreationFailed)?;
81        let end_reply_line =
82            Regex::new(r"^\d\d\d .*").map_err(Error::ParsingRegexCreationFailed)?;
83
84        Ok(LegacyControlStream {
85            stream,
86            closed_by_remote: false,
87            pending_data,
88            pending_lines: Default::default(),
89            pending_reply: Default::default(),
90            reading_multiline_value: false,
91            // regex
92            single_line_data,
93            multi_line_data,
94            end_reply_line,
95        })
96    }
97
98    #[cfg(test)]
99    pub(crate) fn closed_by_remote(&mut self) -> bool {
100        self.closed_by_remote
101    }
102
103    fn read_line(&mut self) -> Result<Option<String>, Error> {
104        // read pending bytes from stream until we have a line to return
105        while self.pending_lines.is_empty() {
106            let byte_count = self.pending_data.len();
107            match self.stream.read_to_end(&mut self.pending_data) {
108                Err(err) => {
109                    if err.kind() == ErrorKind::WouldBlock || err.kind() == ErrorKind::TimedOut {
110                        if byte_count == self.pending_data.len() {
111                            return Ok(None);
112                        }
113                    } else {
114                        return Err(Error::ReadFailed(err));
115                    }
116                }
117                Ok(0usize) => {
118                    self.closed_by_remote = true;
119                    return Err(Error::ClosedByRemote());
120                }
121                Ok(_count) => (),
122            }
123
124            // split our read buffer into individual lines
125            let mut begin = 0;
126            for index in 1..self.pending_data.len() {
127                if self.pending_data[index - 1] == b'\r' && self.pending_data[index] == b'\n' {
128                    let end = index - 1;
129                    // view into byte vec of just the found line
130                    let line_view: &[u8] = &self.pending_data[begin..end];
131                    // convert to string
132                    let line_string =
133                        std::str::from_utf8(line_view).map_err(Error::InvalidResponse)?;
134
135                    // save in pending list
136                    self.pending_lines.push_back(line_string.to_string());
137                    // update begin (and skip over \r\n)
138                    begin = end + 2;
139                }
140            }
141            // leave any leftover bytes in the buffer for the next call
142            self.pending_data.drain(0..begin);
143        }
144
145        Ok(self.pending_lines.pop_front())
146    }
147
148    pub fn read_reply(&mut self) -> Result<Option<Reply>, Error> {
149        loop {
150            let current_line = match self.read_line()? {
151                Some(line) => line,
152                None => return Ok(None),
153            };
154
155            // make sure the status code matches (if we are not in the
156            // middle of a multi-line read
157            if let Some(first_line) = self.pending_reply.first() {
158                if !self.reading_multiline_value {
159                    let first_status_code = &first_line[0..3];
160                    let current_status_code = &current_line[0..3];
161                    if first_status_code != current_status_code {
162                        return Err(Error::ReplyParseFailed(format!(
163                            "mismatched status codes, {} != {}",
164                            first_status_code, current_status_code
165                        )));
166                    }
167                }
168            }
169
170            // end of a response
171            if self.end_reply_line.is_match(&current_line) {
172                if self.reading_multiline_value {
173                    return Err(Error::ReplyParseFailed(
174                        "found multi-line end reply but not reading a multi-line reply".to_string(),
175                    ));
176                }
177                self.pending_reply.push(current_line);
178                break;
179            // single line data from getinfo and friends
180            } else if self.single_line_data.is_match(&current_line) {
181                if self.reading_multiline_value {
182                    return Err(Error::ReplyParseFailed(
183                        "found single-line reply but still reading a multi-line reply".to_string(),
184                    ));
185                }
186                self.pending_reply.push(current_line);
187            // begin of multiline data from getinfo and friends
188            } else if self.multi_line_data.is_match(&current_line) {
189                if self.reading_multiline_value {
190                    return Err(Error::ReplyParseFailed(
191                        "found multi-line start reply but still reading a multi-line reply"
192                            .to_string(),
193                    ));
194                }
195                self.pending_reply.push(current_line);
196                self.reading_multiline_value = true;
197            // multiline data to be squashed to a single entry
198            } else {
199                if !self.reading_multiline_value {
200                    return Err(Error::ReplyParseFailed(
201                        "found a multi-line intermediate reply but not reading a multi-line reply"
202                            .to_string(),
203                    ));
204                }
205                // don't bother writing the end of multiline token
206                if current_line == "." {
207                    self.reading_multiline_value = false;
208                } else {
209                    let multiline = match self.pending_reply.last_mut() {
210                        Some(multiline) => multiline,
211                        // if our logic here is right, then
212                        // self.reading_multiline_value == !self.pending_reply.is_empty()
213                        // should always be true regardless of the data received
214                        // from the control port
215                        None => unreachable!(),
216                    };
217                    multiline.push('\n');
218                    multiline.push_str(&current_line);
219                }
220            }
221        }
222
223        // take ownership of the reply lines
224        let mut reply_lines: Vec<String> = Default::default();
225        std::mem::swap(&mut self.pending_reply, &mut reply_lines);
226
227        // parse out the response code for easier matching
228        let status_code_string = match reply_lines.first() {
229            Some(line) => line[0..3].to_string(),
230            // the lines have already been parsed+validated in the above loop
231            None => unreachable!(),
232        };
233        let status_code: u32 = match status_code_string.parse() {
234            Ok(status_code) => status_code,
235            Err(_) => {
236                return Err(Error::ReplyParseFailed(format!(
237                    "unable to parse '{}' as status code",
238                    status_code_string
239                )))
240            }
241        };
242
243        // strip the redundant status code from start of lines
244        for line in reply_lines.iter_mut() {
245            if line.starts_with(&status_code_string) {
246                *line = line[4..].to_string();
247            }
248        }
249
250        Ok(Some(Reply {
251            status_code,
252            reply_lines,
253        }))
254    }
255
256    pub fn write(&mut self, cmd: &str) -> Result<(), Error> {
257        if let Err(err) = write!(self.stream, "{}\r\n", cmd) {
258            self.closed_by_remote = true;
259            return Err(Error::WriteFailed(err));
260        }
261        Ok(())
262    }
263}