1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
// Rust Bitcoin Library
// Written in 2014 by
//     Andrew Poelstra <apoelstra@wpsoftware.net>
//
// To the extent possible under law, the author(s) have dedicated all
// copyright and related and neighboring rights to this software to
// the public domain worldwide. This software is distributed without
// any warranty.
//
// You should have received a copy of the CC0 Public Domain Dedication
// along with this software.
// If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
//

//! Stream reader
//!
//! This module defines `StreamReader` struct and its implementation which is used
//! for parsing incoming stream into separate `RawNetworkMessage`s, handling assembling
//! messages from multiple packets or dealing with partial or multiple messages in the stream
//! (like can happen with reading from TCP socket)
//!

use std::fmt;
use std::io::{self, Read};

use consensus::{encode, Decodable};

/// Struct used to configure stream reader function
pub struct StreamReader<R: Read> {
    /// Stream to read from
    pub stream: R,
    /// I/O buffer
    data: Vec<u8>,
    /// Buffer containing unparsed message part
    unparsed: Vec<u8>
}

impl<R: Read> fmt::Debug for StreamReader<R> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "StreamReader with buffer_size={} and unparsed content {:?}",
               self.data.capacity(), self.unparsed)
    }
}

impl<R: Read> StreamReader<R> {
    /// Constructs new stream reader for a given input stream `stream` with
    /// optional parameter `buffer_size` determining reading buffer size
    pub fn new(stream: R, buffer_size: Option<usize>) -> StreamReader<R> {
        StreamReader {
            stream,
            data: vec![0u8; buffer_size.unwrap_or(64 * 1024)],
            unparsed: vec![]
        }
    }

    /// Reads stream and parses next message from its current input,
    /// also taking into account previously unparsed partial message (if there was such).
    pub fn read_next<D: Decodable>(&mut self) -> Result<D, encode::Error> {
        loop {
            match encode::deserialize_partial::<D>(&self.unparsed) {
                // In this case we just have an incomplete data, so we need to read more
                Err(encode::Error::Io(ref err)) if err.kind () == io::ErrorKind::UnexpectedEof => {
                    let count = self.stream.read(&mut self.data)?;
                    if count > 0 {
                        self.unparsed.extend(self.data[0..count].iter());
                    }
                    else {
                        return Err(encode::Error::Io(io::Error::from(io::ErrorKind::UnexpectedEof)));
                    }
                },
                Err(err) => return Err(err),
                // We have successfully read from the buffer
                Ok((message, index)) => {
                    self.unparsed.drain(..index);
                    return Ok(message)
                },
            }
        }
    }
}

#[cfg(test)]
mod test {
    use std::thread;
    use std::time::Duration;
    use std::io::{self, BufReader, Write};
    use std::net::{TcpListener, TcpStream, Shutdown};
    use std::thread::JoinHandle;
    use network::constants::ServiceFlags;

    use super::StreamReader;
    use network::message::{NetworkMessage, RawNetworkMessage};

    // First, let's define some byte arrays for sample messages - dumps are taken from live
    // Bitcoin Core node v0.17.1 with Wireshark
    const MSG_VERSION: [u8; 126] = [
        0xf9, 0xbe, 0xb4, 0xd9, 0x76, 0x65, 0x72, 0x73,
        0x69, 0x6f, 0x6e, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x66, 0x00, 0x00, 0x00, 0xbe, 0x61, 0xb8, 0x27,
        0x7f, 0x11, 0x01, 0x00, 0x0d, 0x04, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0xf0, 0x0f, 0x4d, 0x5c,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff,
        0x5b, 0xf0, 0x8c, 0x80, 0xb4, 0xbd, 0x0d, 0x04,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0xfa, 0xa9, 0x95, 0x59, 0xcc, 0x68, 0xa1, 0xc1,
        0x10, 0x2f, 0x53, 0x61, 0x74, 0x6f, 0x73, 0x68,
        0x69, 0x3a, 0x30, 0x2e, 0x31, 0x37, 0x2e, 0x31,
        0x2f, 0x93, 0x8c, 0x08, 0x00, 0x01
    ];

    const MSG_VERACK: [u8; 24] = [
        0xf9, 0xbe, 0xb4, 0xd9, 0x76, 0x65, 0x72, 0x61,
        0x63, 0x6b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x5d, 0xf6, 0xe0, 0xe2
    ];

    const MSG_PING: [u8; 32] = [
        0xf9, 0xbe, 0xb4, 0xd9, 0x70, 0x69, 0x6e, 0x67,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x08, 0x00, 0x00, 0x00, 0x24, 0x67, 0xf1, 0x1d,
        0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    ];

    const MSG_ALERT: [u8; 192] = [
        0xf9, 0xbe, 0xb4, 0xd9, 0x61, 0x6c, 0x65, 0x72,
        0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0xa8, 0x00, 0x00, 0x00, 0x1b, 0xf9, 0xaa, 0xea,
        0x60, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
        0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
        0x7f, 0xfe, 0xff, 0xff, 0x7f, 0x01, 0xff, 0xff,
        0xff, 0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff,
        0xff, 0x7f, 0x00, 0xff, 0xff, 0xff, 0x7f, 0x00,
        0x2f, 0x55, 0x52, 0x47, 0x45, 0x4e, 0x54, 0x3a,
        0x20, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x20, 0x6b,
        0x65, 0x79, 0x20, 0x63, 0x6f, 0x6d, 0x70, 0x72,
        0x6f, 0x6d, 0x69, 0x73, 0x65, 0x64, 0x2c, 0x20,
        0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x20,
        0x72, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64,
        0x00, 0x46, 0x30, 0x44, 0x02, 0x20, 0x65, 0x3f,
        0xeb, 0xd6, 0x41, 0x0f, 0x47, 0x0f, 0x6b, 0xae,
        0x11, 0xca, 0xd1, 0x9c, 0x48, 0x41, 0x3b, 0xec,
        0xb1, 0xac, 0x2c, 0x17, 0xf9, 0x08, 0xfd, 0x0f,
        0xd5, 0x3b, 0xdc, 0x3a, 0xbd, 0x52, 0x02, 0x20,
        0x6d, 0x0e, 0x9c, 0x96, 0xfe, 0x88, 0xd4, 0xa0,
        0xf0, 0x1e, 0xd9, 0xde, 0xda, 0xe2, 0xb6, 0xf9,
        0xe0, 0x0d, 0xa9, 0x4c, 0xad, 0x0f, 0xec, 0xaa,
        0xe6, 0x6e, 0xcf, 0x68, 0x9b, 0xf7, 0x1b, 0x50
    ];

    // Helper functions that checks parsed versions of the messages from the byte arrays above
    fn check_version_msg(msg: &RawNetworkMessage) {
        assert_eq!(msg.magic, 0xd9b4bef9);
        if let NetworkMessage::Version(ref version_msg) = msg.payload {
            assert_eq!(version_msg.version, 70015);
            assert_eq!(version_msg.services, ServiceFlags::NETWORK | ServiceFlags::BLOOM | ServiceFlags::WITNESS | ServiceFlags::NETWORK_LIMITED);
            assert_eq!(version_msg.timestamp, 1548554224);
            assert_eq!(version_msg.nonce, 13952548347456104954);
            assert_eq!(version_msg.user_agent, "/Satoshi:0.17.1/");
            assert_eq!(version_msg.start_height, 560275);
            assert_eq!(version_msg.relay, true);
        } else {
            panic!("Wrong message type: expected VersionMessage");
        }
    }

    fn check_alert_msg(msg: &RawNetworkMessage) {
        assert_eq!(msg.magic, 0xd9b4bef9);
        if let NetworkMessage::Alert(ref alert) = msg.payload {
            assert_eq!(alert.clone(), [
                0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
                0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
                0x7f, 0xfe, 0xff, 0xff, 0x7f, 0x01, 0xff, 0xff,
                0xff, 0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff,
                0xff, 0x7f, 0x00, 0xff, 0xff, 0xff, 0x7f, 0x00,
                0x2f, 0x55, 0x52, 0x47, 0x45, 0x4e, 0x54, 0x3a,
                0x20, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x20, 0x6b,
                0x65, 0x79, 0x20, 0x63, 0x6f, 0x6d, 0x70, 0x72,
                0x6f, 0x6d, 0x69, 0x73, 0x65, 0x64, 0x2c, 0x20,
                0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x20,
                0x72, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64,
                0x00,
            ].to_vec());
        } else {
            panic!("Wrong message type: expected AlertMessage");
        }
    }

    #[test]
    fn parse_multipartmsg_test() {
        let stream = io::empty();
        let mut reader = StreamReader::new(stream, None);
        reader.unparsed = MSG_ALERT[..24].to_vec();
        let message: Result<RawNetworkMessage, _> = reader.read_next();
        assert!(message.is_err());
        assert_eq!(reader.unparsed.len(), 24);

        reader.unparsed = MSG_ALERT.to_vec();
        let message = reader.read_next().unwrap();
        assert_eq!(reader.unparsed.len(), 0);

        check_alert_msg(&message);
    }

    #[test]
    fn read_singlemsg_test() {
        let stream = MSG_VERSION[..].to_vec();
        let stream = stream.as_slice();
        let message = StreamReader::new(stream, None).read_next().unwrap();
        check_version_msg(&message);
    }

    #[test]
    fn read_doublemsgs_test() {
        let mut stream = MSG_VERSION.to_vec();
        stream.extend(&MSG_PING);
        let stream = stream.as_slice();
        let mut reader = StreamReader::new(stream, None);
        let message = reader.read_next().unwrap();
        check_version_msg(&message);

        let msg: RawNetworkMessage = reader.read_next().unwrap();
        assert_eq!(msg.magic, 0xd9b4bef9);
        if let NetworkMessage::Ping(nonce) = msg.payload {
            assert_eq!(nonce, 100);
        } else {
            panic!("Wrong message type, expected PingMessage");
        }
    }

    // Helper function that set ups emulation of client-server TCP connection for
    // testing message transfer via TCP packets
    fn serve_tcp(pieces: Vec<Vec<u8>>) -> (JoinHandle<()>, BufReader<TcpStream>) {
        // 1. Creating server part (emulating Bitcoin Core node)
        let listener = TcpListener::bind(format!("127.0.0.1:{}", 0)).unwrap();
        let port = listener.local_addr().unwrap().port();
        // 2. Spawning thread that will be writing our messages to the TCP Stream at the server side
        // in async mode
        let handle = thread::spawn(move || {
            for ostream in listener.incoming() {
                let mut ostream = ostream.unwrap();

                for piece in pieces {
                    ostream.write(&piece[..]).unwrap();
                    ostream.flush().unwrap();
                    thread::sleep(Duration::from_secs(1));
                }

                ostream.shutdown(Shutdown::Both).unwrap();
                break;
            }
        });

        // 3. Creating client side of the TCP socket connection
        thread::sleep(Duration::from_secs(1));
        let istream = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
        let reader = BufReader::new(istream);

        return (handle, reader)
    }

    #[test]
    fn read_multipartmsg_test() {
        // Setting up TCP connection emulation
        let (handle, istream) = serve_tcp(vec![
            // single message split in two parts to emulate real network conditions
            MSG_VERSION[..24].to_vec(), MSG_VERSION[24..].to_vec()
        ]);
        let stream = istream;
        let mut reader = StreamReader::new(stream, None);

        // Reading and checking the whole message back
        let message = reader.read_next().unwrap();
        check_version_msg(&message);

        // Waiting TCP server thread to terminate
        handle.join().unwrap();
    }

    #[test]
    fn read_sequencemsg_test() {
        // Setting up TCP connection emulation
        let (handle, istream) = serve_tcp(vec![
            // Real-world Bitcoin core communication case for /Satoshi:0.17.1/
            MSG_VERSION[..23].to_vec(), MSG_VERSION[23..].to_vec(),
            MSG_VERACK.to_vec(),
            MSG_ALERT[..24].to_vec(), MSG_ALERT[24..].to_vec()
        ]);
        let stream = istream;
        let mut reader = StreamReader::new(stream, None);

        // Reading and checking the first message (Version)
        let message = reader.read_next().unwrap();
        check_version_msg(&message);

        // Reading and checking the second message (Verack)
        let msg: RawNetworkMessage = reader.read_next().unwrap();
        assert_eq!(msg.magic, 0xd9b4bef9);
        assert_eq!(msg.payload, NetworkMessage::Verack, "Wrong message type, expected VerackMessage");

        // Reading and checking the third message (Alert)
        let msg = reader.read_next().unwrap();
        check_alert_msg(&msg);

        // Waiting TCP server thread to terminate
        handle.join().unwrap();
    }

    #[test]
    fn read_block_from_file_test() {
        use std::io;
        use consensus::serialize;
        use hashes::hex::FromHex;
        use Block;

        let normal_data = Vec::from_hex("010000004ddccd549d28f385ab457e98d1b11ce80bfea2c5ab93015ade4973e400000000bf4473e53794beae34e64fccc471dace6ae544180816f89591894e0f417a914cd74d6e49ffff001d323b3a7b0201000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0804ffff001d026e04ffffffff0100f2052a0100000043410446ef0102d1ec5240f0d061a4246c1bdef63fc3dbab7733052fbbf0ecd8f41fc26bf049ebb4f9527f374280259e7cfa99c48b0e3f39c51347a19a5819651503a5ac00000000010000000321f75f3139a013f50f315b23b0c9a2b6eac31e2bec98e5891c924664889942260000000049483045022100cb2c6b346a978ab8c61b18b5e9397755cbd17d6eb2fe0083ef32e067fa6c785a02206ce44e613f31d9a6b0517e46f3db1576e9812cc98d159bfdaf759a5014081b5c01ffffffff79cda0945903627c3da1f85fc95d0b8ee3e76ae0cfdc9a65d09744b1f8fc85430000000049483045022047957cdd957cfd0becd642f6b84d82f49b6cb4c51a91f49246908af7c3cfdf4a022100e96b46621f1bffcf5ea5982f88cef651e9354f5791602369bf5a82a6cd61a62501fffffffffe09f5fe3ffbf5ee97a54eb5e5069e9da6b4856ee86fc52938c2f979b0f38e82000000004847304402204165be9a4cbab8049e1af9723b96199bfd3e85f44c6b4c0177e3962686b26073022028f638da23fc003760861ad481ead4099312c60030d4cb57820ce4d33812a5ce01ffffffff01009d966b01000000434104ea1feff861b51fe3f5f8a3b12d0f4712db80e919548a80839fc47c6a21e66d957e9c5d8cd108c7a2d2324bad71f9904ac0ae7336507d785b17a2c115e427a32fac00000000").unwrap();
        let cutoff_data = Vec::from_hex("010000004ddccd549d28f385ab457e98d1b11ce80bfea2c5ab93015ade4973e400000000bf4473e53794beae34e64fccc471dace6ae544180816f89591894e0f417a914cd74d6e49ffff001d323b3a7b0201000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0804ffff001d026e04ffffffff0100f2052a0100000043410446ef0102d1ec5240f0d061a4246c1bdef63fc3dbab7733052fbbf0ecd8f41fc26bf049ebb4f9527f374280259e7cfa99c48b0e3f39c51347a19a5819651503a5ac00000000010000000321f75f3139a013f50f315b23b0c9a2b6eac31e2bec98e5891c924664889942260000000049483045022100cb2c6b346a978ab8c61b18b5e9397755cbd17d6eb2fe0083ef32e067fa6c785a02206ce44e613f31d9a6b0517e46f3db1576e9812cc98d159bfdaf759a5014081b5c01ffffffff79cda0945903627c3da1f85fc95d0b8ee3e76ae0cfdc9a65d09744b1f8fc85430000000049483045022047957cdd957cfd0becd642f6b84d82f49b6cb4c51a91f49246908af7c3cfdf4a022100e96b46621f1bffcf5ea5982f88cef651e9354f5791602369bf5a82a6cd61a62501fffffffffe09f5fe3ffbf5ee97a54eb5e5069e9da6b4856ee86fc52938c2f979b0f38e82000000004847304402204165be9a4cbab8049e1af9723b96199bfd3e85f44c6b4c0177e3962686b26073022028f638da23fc003760861ad481ead4099312c60030d4cb57820ce4d33812a5ce01ffffffff01009d966b01000000434104ea1feff861b51fe3f5f8a3b12d0f4712db80e919548a80839fc47c6a21e66d957e9c5d8cd108c7a2d2324bad71f9904ac0ae7336507d785b17a2c115e427a32fac").unwrap();
        let prevhash = Vec::from_hex("4ddccd549d28f385ab457e98d1b11ce80bfea2c5ab93015ade4973e400000000").unwrap();
        let merkle = Vec::from_hex("bf4473e53794beae34e64fccc471dace6ae544180816f89591894e0f417a914c").unwrap();

        let stream = io::BufReader::new(&normal_data[..]);
        let mut reader = StreamReader::new(stream, None);
        let normal_block = reader.read_next::<Block>();

        let stream = io::BufReader::new(&cutoff_data[..]);
        let mut reader = StreamReader::new(stream, None);
        let cutoff_block = reader.read_next::<Block>();

        assert!(normal_block.is_ok());
        assert!(cutoff_block.is_err());
        let block = normal_block.unwrap();
        assert_eq!(block.header.version, 1);
        assert_eq!(serialize(&block.header.prev_blockhash), prevhash);
        assert_eq!(serialize(&block.header.merkle_root), merkle);
        assert_eq!(block.header.time, 1231965655);
        assert_eq!(block.header.bits, 486604799);
        assert_eq!(block.header.nonce, 2067413810);

        // should be also ok for a non-witness block as commitment is optional in that case
        assert!(block.check_witness_commitment());
    }
}