1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
use futures::sink::Sink;
use futures::future::FutureResult;
use futures::{Async, AsyncSink, Future, IntoFuture};

use client::{Error, Encoder, EncoderDone, Head, RecvMode};
use client::errors::ErrorEnum;
use client::buffered;


#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum BodyKind {
    Fixed(u64),
    Chunked,
    Eof,
}


/// This is a low-level interface to the http client
///
/// Your requests starts by sending a codec into a connection Sink or a
/// connection pool. And then it's driven by a callbacks here.
///
/// If you don't have any special needs you might want to use
/// `client::buffered::Buffered` codec implementation instead of implemeting
/// this trait manually.
pub trait Codec<S> {
    /// Future that `start_write()` returns
    type Future: Future<Item=EncoderDone<S>, Error=Error>;

    /// Start writing a request
    ///
    /// This method is called when there is an open connection and there
    /// is some space in the output buffer.
    ///
    /// Everything you write into a buffer might be flushed to the network
    /// immediately (or as fast as you yield to main loop). On the other
    /// hand we might buffer/pipeline multiple requests at once.
    fn start_write(&mut self, e: Encoder<S>)
        -> Self::Future;

    /// Received headers of a response
    ///
    /// At this point we already extracted all the headers and other data
    /// that we need to ensure correctness of the protocol. If you need
    /// to handle some data from the headers you need to store them somewhere
    /// (for example on `self`) for further processing.
    ///
    /// Note: headers might be received after `request_line` is written, but
    /// we don't ensure that request is fully written. You should write the
    /// state machine as if request and response might be streamed a the
    /// same time (including request headers (!) if your `start_write` future
    /// writes them incrementally)
    fn headers_received(&mut self, headers: &Head) -> Result<RecvMode, Error>;

    /// Chunk of the response body received
    ///
    /// `end` equals to `true` for the last chunk of the data.
    ///
    /// Method returns `Async::Ready(x)` to denote that it has consumed `x`
    /// bytes. If there are some bytes left in the buffer they will be passed
    /// again on the call.
    ///
    /// If the response is empty, or last chunk arrives later and it's empty
    /// we call `c.data_received(b"", true)` on every wakeup,
    /// until `Async::Ready(0)` is returned (this helps to drive future that
    /// might complete on request completion without spawning another ones,
    /// but note that next request can't start reading in the meantime).
    ///
    /// Protocol panics if returned number of bytes larger than `data.len()`.
    ///
    fn data_received(&mut self, data: &[u8], end: bool)
        -> Result<Async<usize>, Error>;
}

impl<S, F> Codec<S> for Box<Codec<S, Future=F>>
    where F: Future<Item=EncoderDone<S>, Error=Error>
{
    type Future = F;
    fn start_write(&mut self, e: Encoder<S>) -> F {
        (**self).start_write(e)
    }
    fn headers_received(&mut self, headers: &Head) -> Result<RecvMode, Error> {
        (**self).headers_received(headers)
    }
    fn data_received(&mut self, data: &[u8], end: bool)
        -> Result<Async<usize>, Error>
    {
        (**self).data_received(data, end)
    }
}

impl<S, F> Codec<S> for Box<Codec<S, Future=F>+Send>
    where F: Future<Item=EncoderDone<S>, Error=Error>
{
    type Future = F;
    fn start_write(&mut self, e: Encoder<S>) -> F {
        (**self).start_write(e)
    }
    fn headers_received(&mut self, headers: &Head) -> Result<RecvMode, Error> {
        (**self).headers_received(headers)
    }
    fn data_received(&mut self, data: &[u8], end: bool)
        -> Result<Async<usize>, Error>
    {
        (**self).data_received(data, end)
    }
}

/// A marker trait that applies to a Sink that is essentially a HTTP client
///
/// It may apply to a single connection or a connection pool. For a single
/// connection the `client::Proto` implements this interface.
///
/// We expect a boxed codec here because we assume that different kinds of
/// requests may be executed though same connection pool. If you want to avoid
/// boxing or have fine grained control, use `Proto` (which is a `Sink`)
/// directly.
///
pub trait Client<S, F>: Sink<SinkItem=Box<Codec<S, Future=F>>>
    where F: Future<Item=EncoderDone<S>, Error=Error>,
{
    /// Simple fetch helper
    fn fetch_url(&mut self, url: &str)
        -> Box<Future<Item=buffered::Response, Error=Error>>
        where <Self as Sink>::SinkError: Into<Error>;
}

impl<T, S> Client<S, FutureResult<EncoderDone<S>, Error>> for T
    where T: Sink<SinkItem=Box<
            Codec<S, Future=FutureResult<EncoderDone<S>, Error>>
        >>,
{
    fn fetch_url(&mut self, url: &str)
        -> Box<Future<Item=buffered::Response, Error=Error>>
        where <Self as Sink>::SinkError: Into<Error>
    {
        let url = match url.parse() {
            Ok(u) => u,
            Err(_) => {
                return Box::new(Err(ErrorEnum::InvalidUrl.into())
                    .into_future());
            }
        };
        let (codec, receiver) = buffered::Buffered::get(url);
        match self.start_send(Box::new(codec)) {
            Ok(AsyncSink::NotReady(_)) => {
                Box::new(Err(ErrorEnum::Busy.into()).into_future())
            }
            Ok(AsyncSink::Ready) => {
                Box::new(receiver
                    .map_err(|_| ErrorEnum::Canceled.into())
                    .and_then(|res| res))
            }
            Err(e) => {
                Box::new(Err(e.into()).into_future())
            }
        }
    }
}