1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
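//! Higher-level interface for serving fully buffered requests
//!
//! A `BufferedDispatcher` turns each request head into a `BufferedCodec`,
//! which hands the buffered `Request` to a `Service`.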
use std::net::SocketAddr;
use std::sync::Arc;
use std::marker::PhantomData;

use futures::{Async, Future, IntoFuture};
use futures::future::FutureResult;
use tokio_core::reactor::Handle;
use tk_bufstream::{ReadBuf, WriteBuf, ReadFramed, WriteFramed};

use websocket::{ServerCodec as WebsocketCodec};
use super::{Error, Encoder, EncoderDone, Dispatcher, Codec, Head, RecvMode};
use super::{WebsocketHandshake};
use {Version};

/// A buffered HTTP request, the value passed to `Service::call`
///
/// Some well-known headers may be lifted into dedicated fields of this
/// structure (e.g. `Host`, which is exposed through `host()`)
// TODO(tailhook) hide internal structure?
#[derive(Debug)]
pub struct Request {
    /// Address of the peer, returned by `peer_addr()`
    peer_addr: SocketAddr,
    /// Request method, stored as an owned string
    method: String,
    /// Request path, stored as an owned string
    path: String,
    /// Value of the host of the request, if there is one
    host: Option<String>,
    /// HTTP version of the request
    version: Version,
    /// Header list as `(name, raw value bytes)` pairs
    headers: Vec<(String, Vec<u8>)>,
    /// Request body; starts empty and is replaced in `data_received`
    body: Vec<u8>,
    /// Websocket handshake data, present when the request head yielded one
    websocket_handshake: Option<WebsocketHandshake>,
}

/// A dispatcher that lets you process a request and return a response using
/// a single function
///
/// For each request head it builds a `BufferedCodec` that owns a fresh
/// service instance obtained from the `NewService` factory.
pub struct BufferedDispatcher<S, N: NewService<S>> {
    /// Peer address, copied into every `Request` as its `peer_addr`
    addr: SocketAddr,
    /// Limit handed to each codec (presumably in bytes -- confirm against
    /// `RecvMode::buffered_upfront`); constructors set it to 10_485_760
    max_request_length: usize,
    /// Factory that produces one service instance per request
    service: N,
    /// Reactor handle, cloned into every codec
    handle: Handle,
    /// Marker for the type parameter `S`, which no field stores directly
    phantom: PhantomData<S>,
}

/// A codec counterpart of the BufferedDispatcher, might be used with your
/// own dispatcher too
///
/// It keeps the request until the response is started, at which point the
/// request is moved into the service.
pub struct BufferedCodec<R> {
    /// Limit passed to `RecvMode::buffered_upfront` for non-websocket requests
    max_request_length: usize,
    /// Service instance that serves this particular request
    service: R,
    /// The pending request; `start_response` takes it out, leaving `None`
    request: Option<Request>,
    /// Reactor handle used to spawn the websocket future in `hijack`
    handle: Handle,
}

/// A helper to create a simple websocket (and HTTP) service
///
/// It's internally created by `BufferedDispatcher::new_with_websockets()`
///
/// Both handlers are kept behind `Arc` so that each created
/// `WebsocketService` can share them by cloning the pointers.
pub struct WebsocketFactory<H, I> {
    /// Handler of HTTP requests
    service: Arc<H>,
    /// Handler that is given the framed websocket connection
    websockets: Arc<I>,
}

/// An instance of websocket factory, created by WebsocketFactory itself
///
/// `T` and `U` are the future types returned by the HTTP handler and the
/// websocket handler respectively.
pub struct WebsocketService<H, I, T, U> {
    /// Shared handler of HTTP requests
    service: Arc<H>,
    /// Shared handler of websocket connections
    websockets: Arc<I>,
    /// Marker for the future types `T` and `U`, which no field stores
    phantom: PhantomData<(T, U)>,
}

/// A factory of services: implement it to reply to requests
///
/// Usually this is just a function: any `Fn() -> R` where `R: Service<S>`
/// implements this trait through a blanket implementation in this module.
pub trait NewService<S> {
    /// Future returned by the service (an actual function serving request)
    ///
    /// It must be the same type as `Instance::Future`.
    type Future: Future<Item=EncoderDone<S>, Error=Error>;
    /// Instance of the service, created for each request
    type Instance: Service<S, Future=Self::Future>;
    /// Constructor of the instance of the service, created for each request
    fn new(&self) -> Self::Instance;
}

/// An instance of a NewService for a single request, usually just a function
///
/// Any `Fn(Request, Encoder<S>) -> F` implements this trait through a
/// blanket implementation in this module (without websocket support).
pub trait Service<S> {
    /// A future returned by `call()`
    ///
    /// It resolves to `EncoderDone<S>` or fails with `Error`.
    type Future: Future<Item=EncoderDone<S>, Error=Error>;

    /// A future returned by `start_websocket`, it's spawned on the main loop
    /// hence needed to be static.
    type WebsocketFuture: Future<Item=(), Error=()> + 'static;

    /// A method which is called when request arrives, including the websocket
    /// negotiation request.
    ///
    /// The `request` is passed by value together with the `encoder` used to
    /// produce the response.
    ///
    /// See examples for a way to negotiate both websockets and services
    fn call(&mut self, request: Request, encoder: Encoder<S>) -> Self::Future;

    /// A method which is called when websocket connection established
    ///
    /// `output` and `input` are the write and read halves of the connection,
    /// both framed with the websocket codec.
    fn start_websocket(&mut self, output: WriteFramed<S, WebsocketCodec>,
                                  input: ReadFramed<S, WebsocketCodec>)
        -> Self::WebsocketFuture;
}

impl<H, I, T, U, S> NewService<S> for WebsocketFactory<H, I>
    where H: Fn(Request, Encoder<S>) -> T,
          I: Fn(WriteFramed<S, WebsocketCodec>,
                ReadFramed<S, WebsocketCodec>) -> U,
          T: Future<Item=EncoderDone<S>, Error=Error>,
          U: Future<Item=(), Error=()> + 'static,
{
    type Future = T;
    type Instance = WebsocketService<H, I, T, U>;
    fn new(&self) -> Self::Instance {
        WebsocketService {
            service: self.service.clone(),
            websockets: self.websockets.clone(),
            phantom: PhantomData,
        }
    }
}

impl<S, H, I, T, U> Service<S> for WebsocketService<H, I, T, U>
    where H: Fn(Request, Encoder<S>) -> T,
          I: Fn(WriteFramed<S, WebsocketCodec>,
                ReadFramed<S, WebsocketCodec>) -> U,
          T: Future<Item=EncoderDone<S>, Error=Error>,
          U: Future<Item=(), Error=()> + 'static,
{
    type Future = T;
    type WebsocketFuture = U;

    /// Passes the request and the encoder to the shared HTTP handler
    fn call(&mut self, request: Request, encoder: Encoder<S>)
        -> Self::Future
    {
        let http_handler: &H = &*self.service;
        http_handler(request, encoder)
    }

    /// Passes both framed halves of the connection to the shared websocket
    /// handler
    fn start_websocket(&mut self, output: WriteFramed<S, WebsocketCodec>,
                                  input: ReadFramed<S, WebsocketCodec>)
        -> Self::WebsocketFuture
    {
        let websocket_handler: &I = &*self.websockets;
        websocket_handler(output, input)
    }
}

impl Request {
    /// Returns peer address that initiated HTTP connection
    pub fn peer_addr(&self) -> SocketAddr {
        self.peer_addr
    }
    /// Returns method of a request
    pub fn method(&self) -> &str {
        &self.method
    }
    /// Returns path of a request
    pub fn path(&self) -> &str {
        &self.path
    }
    /// Returns the host header of a request
    pub fn host(&self) -> Option<&str> {
        self.host.as_ref().map(|s| s.as_ref())
    }
    /// Returns HTTP version used in request
    pub fn version(&self) -> Version {
        self.version
    }
    /// Returns request headers
    pub fn headers(&self) -> &[(String, Vec<u8>)] {
        &self.headers
    }
    /// Returns request body
    pub fn body(&self) -> &[u8] {
        &self.body
    }
    /// Returns websocket handshake if exists
    pub fn websocket_handshake(&self) -> Option<&WebsocketHandshake> {
        self.websocket_handshake.as_ref()
    }
}

/// Any `Fn() -> R` that returns a service is itself a service factory
impl<S, T, R> NewService<S> for T
    where T: Fn() -> R,
          R: Service<S>,
{
    type Future = R::Future;
    type Instance = R;

    /// Invokes the wrapped function to obtain a new service instance
    fn new(&self) -> Self::Instance {
        let instance: R = self();
        instance
    }
}

impl<S, T, F> Service<S> for T
    where T: Fn(Request, Encoder<S>) -> F,
        F: Future<Item=EncoderDone<S>, Error=Error>,
{
    type Future = F;
    type WebsocketFuture = FutureResult<(), ()>;
    fn call(&mut self, request: Request, encoder: Encoder<S>) -> F
    {
        (self)(request, encoder)
    }
    fn start_websocket(&mut self, _output: WriteFramed<S, WebsocketCodec>,
                                  _input: ReadFramed<S, WebsocketCodec>)
        -> Self::WebsocketFuture
    {
        /// Basically no websockets
        Ok(()).into_future()
    }
}


impl<S, N: NewService<S>> BufferedDispatcher<S, N> {
    /// Creates an instance of the buffered dispatcher
    ///
    /// The handle is cloned, and the maximum request length starts at
    /// 10_485_760 (use `max_request_length()` to change it).
    pub fn new(addr: SocketAddr, handle: &Handle, service: N)
        -> BufferedDispatcher<S, N>
    {
        let own_handle = handle.clone();
        BufferedDispatcher {
            service: service,
            addr: addr,
            handle: own_handle,
            max_request_length: 10_485_760,
            phantom: PhantomData,
        }
    }

    /// Replaces the maximum request length with `value`
    pub fn max_request_length(&mut self, value: usize) {
        let limit = &mut self.max_request_length;
        *limit = value;
    }
}

impl<S, H, I, T, U> BufferedDispatcher<S, WebsocketFactory<H, I>>
    where H: Fn(Request, Encoder<S>) -> T,
          I: Fn(WriteFramed<S, WebsocketCodec>,
                ReadFramed<S, WebsocketCodec>) -> U,
          T: Future<Item=EncoderDone<S>, Error=Error>,
          U: Future<Item=(), Error=()> + 'static,
{
    /// Creates a dispatcher from two functions: `http` serves HTTP requests
    /// and `websockets` serves websocket connections.
    ///
    /// Both functions are wrapped into a `WebsocketFactory`; the maximum
    /// request length starts at 10_485_760.
    pub fn new_with_websockets(addr: SocketAddr, handle: &Handle,
        http: H, websockets: I)
        -> BufferedDispatcher<S, WebsocketFactory<H, I>>
    {
        let factory = WebsocketFactory {
            service: Arc::new(http),
            websockets: Arc::new(websockets),
        };
        let own_handle = handle.clone();
        BufferedDispatcher {
            addr: addr,
            max_request_length: 10_485_760,
            service: factory,
            handle: own_handle,
            phantom: PhantomData,
        }
    }
}

impl<S, N: NewService<S>> Dispatcher<S> for BufferedDispatcher<S, N> {
    type Codec = BufferedCodec<N::Instance>;

    /// Builds a codec for the request described by `headers`
    ///
    /// A new service instance is created, and the method, path, host,
    /// version and header list are copied into an owned `Request` with an
    /// empty body.
    ///
    /// # Panics
    ///
    /// Panics when `headers.path()` yields no path (see the `unwrap()`
    /// below).
    fn headers_received(&mut self, headers: &Head)
        -> Result<Self::Codec, Error>
    {
        // TODO(tailhook) strip hop-by-hop headers
        let upgrade = headers.get_websocket_upgrade();
        let instance = self.service.new();

        let method = headers.method().to_string();
        // TODO(tailhook) process other forms of path
        // NOTE(review): a request without a path panics here -- consider
        // returning an `Error` instead
        let path = headers.path().unwrap().to_string();
        let host = headers.host().map(|x| x.to_string());
        let version = headers.version();
        let header_list = headers.headers()
            .map(|(name, value)| (name.to_string(), value.to_vec()))
            .collect();

        let request = Request {
            peer_addr: self.addr,
            method: method,
            path: path,
            host: host,
            version: version,
            headers: header_list,
            body: Vec::new(),
            // A failed upgrade lookup is treated the same as no handshake
            websocket_handshake: upgrade.unwrap_or(None),
        };

        Ok(BufferedCodec {
            max_request_length: self.max_request_length,
            service: instance,
            request: Some(request),
            handle: self.handle.clone(),
        })
    }
}

impl<S, R: Service<S>> Codec<S> for BufferedCodec<R> {
    type ResponseFuture = R::Future;

    /// Selects hijack mode when the request has a websocket handshake,
    /// otherwise upfront buffering limited by `max_request_length`
    ///
    /// Panics if the request has already been taken by `start_response`.
    fn recv_mode(&mut self) -> RecvMode {
        let request = self.request.as_ref().unwrap();
        match request.websocket_handshake {
            Some(_) => RecvMode::hijack(),
            None => RecvMode::buffered_upfront(self.max_request_length),
        }
    }

    /// Stores a copy of `data` as the request body and reports all of it as
    /// consumed
    ///
    /// Panics if `end` is false, or if the request has already been taken.
    fn data_received(&mut self, data: &[u8], end: bool)
        -> Result<Async<usize>, Error>
    {
        assert!(end);
        let body = data.to_vec();
        let consumed = data.len();
        self.request.as_mut().unwrap().body = body;
        Ok(Async::Ready(consumed))
    }

    /// Moves the request out of the codec and passes it to the service
    /// together with the encoder
    ///
    /// Panics if the request has already been taken.
    fn start_response(&mut self, e: Encoder<S>) -> R::Future {
        let request = self.request.take().unwrap();
        self.service.call(request, e)
    }

    /// Frames both halves of the connection with the websocket codec and
    /// spawns the future returned by the service on the stored handle
    fn hijack(&mut self, write_buf: WriteBuf<S>, read_buf: ReadBuf<S>){
        let input = read_buf.framed(WebsocketCodec);
        let output = write_buf.framed(WebsocketCodec);
        let websocket = self.service.start_websocket(output, input);
        self.handle.spawn(websocket);
    }
}