//! Response encoder: writes the status line, headers and body of an HTTP response into the connection's output buffer.
use std::io; use std::fmt::Display; use futures::{Future, Poll}; use tk_bufstream::{WriteBuf, WriteRaw, FutureWriteRaw}; use tokio_io::AsyncWrite; use base_serializer::{MessageState, HeaderError}; use enums::{Version, Status}; use super::headers::Head; /// This a response writer that you receive in `Codec` /// /// Methods of this structure ensure that everything you write into a buffer /// is consistent and valid protocol pub struct Encoder<S> { state: MessageState, io: WriteBuf<S>, } /// This structure returned from `Encoder::done` and works as a continuation /// that should be returned from the future that writes request. pub struct EncoderDone<S> { buf: WriteBuf<S>, } /// This structure contains all needed info to start response of the request /// in a correct manner /// /// This is ought to be used in serializer only #[derive(Debug, Clone, Copy)] pub struct ResponseConfig { /// Whether request is a HEAD request pub is_head: bool, /// Is `Connection: close` in request or HTTP version == 1.0 pub do_close: bool, /// Version of HTTP request pub version: Version, } /// A future that yields `RawBody` after buffer is empty /// /// This future is created by `Encoder::raw_body()`` pub struct FutureRawBody<S>(FutureWriteRaw<S>); /// The actual raw body /// /// The object is used to write some data directly to the socket without any /// buffering/copying. Note that chunked encoding must be handled manually /// in this case. /// /// This is a tiny wrapper around `WriteRaw` which is basically tiny wrapper /// around TcpStream or whatever `S` represents. Wrappers are used to /// reconstruct original object, `EncoderDone` in this case. pub struct RawBody<S> { io: WriteRaw<S>, } // TODO: Support responses to CONNECT and `Upgrade: websocket` requests. impl<S> Encoder<S> { /// Write a 100 (Continue) response. /// /// A server should respond with the 100 status code if it receives a /// 100-continue expectation. /// /// # Panics /// /// When the response is already started. 
It's expected that your response /// handler state machine will never call the method twice. pub fn response_continue(&mut self) { self.state.response_continue(&mut self.io.out_buf) } /// Write status line using `Status` enum /// /// This puts status line into a buffer immediately. If you don't /// continue with request it will be sent to the network shortly. /// /// # Panics /// /// When status line is already written. It's expected that your request /// handler state machine will never call the method twice. /// /// When the status code is 100 (Continue). 100 is not allowed /// as a final status code. pub fn status(&mut self, status: Status) { self.state.response_status(&mut self.io.out_buf, status.code(), status.reason()) } /// Write custom status line /// /// # Panics /// /// When status line is already written. It's expected that your request /// handler state machine will never call the method twice. /// /// When the status code is 100 (Continue). 100 is not allowed /// as a final status code. pub fn custom_status(&mut self, code: u16, reason: &str) { self.state.response_status(&mut self.io.out_buf, code, reason) } /// Add a header to the message. /// /// Header is written into the output buffer immediately. And is sent /// as soon as the next loop iteration /// /// `Content-Length` header must be send using the `add_length` method /// and `Transfer-Encoding: chunked` must be set with the `add_chunked` /// method. These two headers are important for the security of HTTP. /// /// Note that there is currently no way to use a transfer encoding other /// than chunked. /// /// We return Result here to make implementing proxies easier. In the /// application handler it's okay to unwrap the result and to get /// a meaningful panic (that is basically an assertion). /// /// # Panics /// /// Panics when `add_header` is called in the wrong state. 
pub fn add_header<V: AsRef<[u8]>>(&mut self, name: &str, value: V) -> Result<(), HeaderError> { self.state.add_header(&mut self.io.out_buf, name, value.as_ref()) } /// Same as `add_header` but allows value to be formatted directly into /// the buffer /// /// Useful for dates and numeric headers, as well as some strongly typed /// wrappers pub fn format_header<D: Display>(&mut self, name: &str, value: D) -> Result<(), HeaderError> { self.state.format_header(&mut self.io.out_buf, name, value) } /// Add a content length to the message. /// /// The `Content-Length` header is written to the output buffer immediately. /// It is checked that there are no other body length headers present in the /// message. When the body is send the length is validated. /// /// # Panics /// /// Panics when `add_length` is called in the wrong state. pub fn add_length(&mut self, n: u64) -> Result<(), HeaderError> { self.state.add_length(&mut self.io.out_buf, n) } /// Sets the transfer encoding to chunked. /// /// Writes `Transfer-Encoding: chunked` to the output buffer immediately. /// It is assured that there is only one body length header is present /// and the body is written in chunked encoding. /// /// # Panics /// /// Panics when `add_chunked` is called in the wrong state. pub fn add_chunked(&mut self) -> Result<(), HeaderError> { self.state.add_chunked(&mut self.io.out_buf) } /// Returns true if at least `status()` method has been called /// /// This is mostly useful to find out whether we can build an error page /// or it's already too late. pub fn is_started(&self) -> bool { self.state.is_started() } /// Closes the HTTP header and returns `true` if entity body is expected. /// /// Specifically `false` is returned when status is 1xx, 204, 304 or in /// the response to a `HEAD` request but not if the body has zero-length. /// /// Similarly to `add_header()` it's fine to `unwrap()` here, unless you're /// doing some proxying. 
/// /// # Panics /// /// Panics when the response is in a wrong state. pub fn done_headers(&mut self) -> Result<bool, HeaderError> { self.state.done_headers(&mut self.io.out_buf) } /// Write a chunk of the message body. /// /// Works both for fixed-size body and chunked body. /// /// For the chunked body each chunk is put into the buffer immediately /// prefixed by chunk size. Empty chunks are ignored. /// /// For both modes chunk is put into the buffer, but is only sent when /// rotor-stream state machine is reached. So you may put multiple chunks /// into the buffer quite efficiently. /// /// You may write a body in responses to HEAD requests just like in real /// requests but the data is not sent to the network. Of course it is /// more efficient to not construct the message body at all. /// /// # Panics /// /// When response is in wrong state. Or there is no headers which /// determine response body length (either Content-Length or /// Transfer-Encoding). pub fn write_body(&mut self, data: &[u8]) { self.state.write_body(&mut self.io.out_buf, data) } /// Returns true if `done()` method is already called and everything /// was okay. pub fn is_complete(&self) -> bool { self.state.is_complete() } /// Writes needed finalization data into the buffer and asserts /// that response is in the appropriate state for that. /// /// The method may be called multiple times. /// /// # Panics /// /// When the response is in the wrong state. pub fn done(mut self) -> EncoderDone<S> { self.state.done(&mut self.io.out_buf); EncoderDone { buf: self.io } } /// Returns a raw body for zero-copy writing techniques /// /// Note: we don't assert on the format of the body if you're using this /// interface. /// /// Note 2: RawBody (returned by this future) locks the underlying BiLock, /// which basically means reading from this socket is not possible while /// you're writing to the raw body. /// /// Good idea is to use interface like this: /// /// 1. Set appropriate content-length /// 2. 
Write exactly this number of bytes or exit with error /// /// This is specifically designed for using with `sendfile` /// /// # Panics /// /// This method panics if it's called when headers are not written yet. pub fn raw_body(self) -> FutureRawBody<S> { assert!(self.state.is_after_headers()); FutureRawBody(self.io.borrow_raw()) } /// Flush the data to underlying socket /// /// If the whole buffer could not be flushed it schedules a wakeup of /// the current task when the the socket is writable. /// /// You can find out how many bytes are left using `bytes_buffered()` /// method pub fn flush(&mut self) -> Result<(), io::Error> where S: AsyncWrite { self.io.flush() } /// Returns bytes currently lying in the buffer /// /// It's possible that these bytes are left from the previous request if /// pipelining is enabled. pub fn bytes_buffered(&mut self) -> usize { self.io.out_buf.len() } } impl<S> RawBody<S> { /// Returns `EncoderDone` object that might be passed back to the HTTP /// protocol pub fn done(self) -> EncoderDone<S> { EncoderDone { buf: self.io.into_buf() } } } impl<S> io::Write for Encoder<S> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { // TODO(tailhook) we might want to propatage error correctly // rather than panic self.write_body(buf); Ok((buf.len())) } fn flush(&mut self) -> io::Result<()> { Ok(()) } } impl<S: AsyncWrite> AsyncWrite for Encoder<S> { fn shutdown(&mut self) -> Poll<(), io::Error> { panic!("Can't shutdown request encoder"); } } impl<S: AsyncWrite> io::Write for RawBody<S> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.io.get_mut().write(buf) } fn flush(&mut self) -> io::Result<()> { self.io.get_mut().flush() } } impl<S: AsyncWrite> AsyncWrite for RawBody<S> { fn shutdown(&mut self) -> Poll<(), io::Error> { panic!("Can't shutdown request body"); } } pub fn get_inner<S>(e: EncoderDone<S>) -> WriteBuf<S> { e.buf } pub fn new<S>(io: WriteBuf<S>, cfg: ResponseConfig) -> Encoder<S> { use base_serializer::Body::*; // 
TODO(tailhook) implement Connection: Close, // (including explicit one in HTTP/1.0) and maybe others Encoder { state: MessageState::ResponseStart { body: if cfg.is_head { Head } else { Normal }, version: cfg.version, close: cfg.do_close || cfg.version == Version::Http10, }, io: io, } } impl ResponseConfig { pub fn from(req: &Head) -> ResponseConfig { ResponseConfig { version: req.version(), is_head: req.method() == "HEAD", do_close: req.connection_close(), } } } impl<S: AsyncWrite> Future for FutureRawBody<S> { type Item = RawBody<S>; type Error = io::Error; fn poll(&mut self) -> Poll<RawBody<S>, io::Error> { self.0.poll().map(|x| x.map(|y| RawBody { io: y })) } } #[cfg(feature="sendfile")] mod sendfile { extern crate tk_sendfile; use std::io; use futures::{Async}; use self::tk_sendfile::{Destination, FileOpener, Sendfile}; use super::RawBody; impl<T: Destination> Destination for RawBody<T> { fn write_file<O: FileOpener>(&mut self, file: &mut Sendfile<O>) -> Result<usize, io::Error> { // TODO(tailhook) check the data written self.io.get_mut().write_file(file) } fn poll_write(&self) -> Async<()> { self.io.get_ref().poll_write() } } }