//! \param[in] len bytes will be copied from the output side of the buffer string ByteStream::peek_output(constsize_t len)const{ // DUMMY_CODE(len); string output = ""; size_t output_size = min(buffer_size(), len); for(size_t i = 0; i < output_size; ++i){ output += _deque[i]; } // _bytes_read += output_size; // READ-ONLY! return output; }
//! \param[in] len bytes will be removed from the output side of the buffer voidByteStream::pop_output(constsize_t len){ // DUMMY_CODE(len); size_t pop_size = min(buffer_size(), len); for(size_t i = 0; i < pop_size; ++i){ _deque.pop_front(); } _bytes_read += pop_size; }
//! Read (i.e., copy and then pop) the next "len" bytes of the stream //! \param[in] len bytes will be popped and returned //! \returns a string std::string ByteStream::read(constsize_t len){ // DUMMY_CODE(len); string ret = peek_output(len); pop_output(len); return ret; }
Lab
Checkpoint 1: stitching substrings into a byte stream
In Lab 1, you’ll implement a stream
reassembler—a module that stitches small pieces of the
byte stream (known as substrings, or segments) back
into a contiguous stream of bytes in the correct sequence.
//! \details This function accepts a substring (aka a segment) of bytes, //! possibly out-of-order, from the logical stream, and assembles any newly //! contiguous substrings and writes them into the output stream in order. voidStreamReassembler::push_substring(const string &data, constsize_t index, constbool eof){ // DUMMY_CODE(data, index, eof);
// The number of overlap chars. size_t overlap_size = 0;
// if out of bounds, discard it. if(index + data.size() < _first_unread || index > _first_unacceptable){ return; }
// if in unaassembled area, store it. if(index > _first_unread){ string substring = data.substr(0, min(_first_unacceptable - index, data.size())); // if never stored before if(_auxiliary_storage.find(index) == _auxiliary_storage.end()){ _auxiliary_storage[index] = substring; overlap_size = overlap_length(); if(_auxiliary_storage.find(index) == _auxiliary_storage.end()){ // if deleted after merge _unassembled_bytes += _auxiliary_storage[index].size() - _outside_size; }else{ _unassembled_bytes += _auxiliary_storage[index].size() - overlap_size; // c bcd ->error } }elseif(substring.size() > _auxiliary_storage[index].size()){ // update better(longer) substring size_t before_len = _auxiliary_storage[index].size(); _auxiliary_storage[index] = substring; overlap_size = overlap_length(); _unassembled_bytes += before_len + overlap_size; } }
In Lab 2, you will implement the
TCPReceiver, the part of a TCP implementation
that handles the incoming byte stream. The TCPReceiver
translates between incoming TCP segments (the payloads
of datagrams carried over the Internet) and the incoming byte
stream.
// seg with FIN had come before and all data has been wrtten if(_fin_flag && _reassembler.stream_out().input_ended()){ _ack_no = WrappingInt32(_ack_no.value() + 1); } }
Total Test time (real) = 0.51 sec [100%] Built target check_lab2
更新:将第 27 行代码更新为第 28 行代码 修复了下述情况遇到的错误
1 2
with_syn().with_seqno(5).with_fin() SYN received (ackno exists), and input to stream hasn't ended
seqno
2303251161
2303251162
2303251163
2303251164
2303251165
2303251166
2303251167
2303251168
2303251169
e
g
c
a
b
f
添加_first_unassembled的接口
修复了上述 bug
Lab Checkpoint 3: the
TCP sender
Now, in Lab 3, you’ll implement the other side of the connection. The
TCPSender is a tool that translates from an
outgoing byte stream to segments that will become the payloads
of unreliable datagrams.
modified files: tcp_sender.hh ,
tcp_sender.cc
sender part of TCP responsible for:
reading from a ByteStream (created and written to by
sender-side application)
turing the stream into a sequence of outgoing TCP segments.
TCP sender writes the sequence
number, the SYN flag, the
payload, and the FIN flag.
TCP sender only reads the segment that written by
the receiver: ackno and window_size.
TCPSender’s responsibility to:
Keep track of the receiver’s window (processing incoming
ackno and window_size)
Fill the window when possible
Reading from the ByteStream, creating new TCP segments (including
SYN and FIN flags if needed)
Sending them (until a. the window is full; b. the
ByteStream is empty)
Keep track of "outstanding" segments
have been sent but not yet acknowledged
Re-send "outstanding" segments if enough time passes since they
were sent (time out) and haven't been
asked yet
The basic principle is to send whatever the receiver will allow us to
send (filling the window), and keep retransmitting until the receiver
acknowledges each segment. This is called “automatic repeat request”
(ARQ).
tcp_sender.hh
TCPTimer
Handle outstanding segments via TCP Timer. The timer is
started when a segment is sent, and is reset when receiving an
acknowledgment. If the timer expires (indicating a potential loss of a
segment), the oldest unacknowledged segment is retransmitted, and the
timer's interval is adjusted according to the exponential backoff
algorithm (6(b)ii from lab3.pdf).
//! \brief TCP Timer classTCPTimer { private: //! true: the timer is started //! false: the timer is not started bool _start; //! initial time unsignedint _initial_transmission_time; //! transmission time unsignedint _transmission_time; //! retransmission timeout unsignedint _RTO;
public: //! number of consecutive retransmissions unsignedint num_of_retransmissions;
//! \brief The "sender" part of a TCP implementation. classTCPSender { private: ... /* ! added by myself */
// the latest ack no has been received uint64_t _ackno; // receiver’s window size size_t _window_size;
// sequence numbers are occupied by segments sent but not yet acked size_t _bytes_in_flight; // == _next_seqno - _ackno // timer used for controlling retransmissions TCPTimer timer; // segments in flight queue std::queue<TCPSegment> _segments_in_flight{};
// send segment that is not empty voidsend_segment(TCPSegment &seg);
public: ...
tcp_sender.cc
TCPSender::ack_received processes the data from the
received segment sent by the remote receiver.
graph LR
A["ack_received()"] -- update data --> C["full_window()"]
C --> D[timer]
D -- init --> F["start()"]
D -- end --> G["close()"]
C --> H[process states]
H --> C
H --> I["send_segment()"]
I --> H
// has been acked if (ackno_received <= _ackno) { return; }
// received new valid ackno _ackno = ackno_received;
// pop out all the segments that have been acked while (!_segments_in_flight.empty()) { TCPSegment seg = _segments_in_flight.front(); cerr << "seg.header().seqno.raw_value(): " << seg.header().seqno.raw_value() << ", length: " << static_cast<uint32_t>(seg.length_in_sequence_space()) << endl; cerr << "ackno.raw_value(): " << ackno.raw_value() << endl; if (seg.header().seqno.raw_value() + static_cast<uint32_t>(seg.length_in_sequence_space()) > ackno.raw_value()) { break; } _bytes_in_flight -= seg.length_in_sequence_space(); _segments_in_flight.pop(); logPrint(seg.payload().str(), "has been poped out of `_segments_in_flight`"); }
//! \brief create and send segments to fill as much of the window as possible voidTCPSender::fill_window(){ /* STATUS: CLOSED */ // waiting for stream to begin (no SYN sent) // definition: // next_seqno_absolute() == 0 if (next_seqno_absolute() == 0) { TCPSegment init_seg; init_seg.header().syn = true; init_seg.header().seqno = next_seqno(); send_segment(init_seg); logPrint(init_seg.payload().str(), "init_seg sent (SYN)"); }
/* STATUS: SYN_SENT */ // stream started but nothing acknowledged // definition: // next_seqno_absolute() > 0 // and next_seqno_absolute() == bytes_in_flight() if (next_seqno_absolute() > 0 && next_seqno_absolute() == bytes_in_flight()) { return; }
/* Conection has been built */
// If the receiver has announced a window size of zero, the fill window method should act like the window size is // *one*. size_t window_size = _window_size == 0 ? 1 : _window_size;
/* STATUS: SYN_ACKED */ // “stream ongoing” // if output haven't reach the end if (next_seqno_absolute() > bytes_in_flight() && !stream_in().eof()) { seg.payload() = Buffer(_stream.read(len)); // if all data has been read, set FIN value // second condition: "Don't add FIN if this would make the segment exceed the receiver's window" if (_stream.eof() && remain_size - seg.length_in_sequence_space() > 0) { seg.header().fin = true; } if (seg.length_in_sequence_space() == 0) return; // _stream has nothing to send send_segment(seg); }
/* STATUS: SYN_ACKED */ // “stream ongoing” (stream has reached EOF, but FIN flag hasn't been sent yet) elseif (stream_in().eof() && next_seqno_absolute() < stream_in().bytes_written() + 2) { seg.header().fin = true; send_segment(seg); }
/* STATUS: FIN_SENT & FIN_ACKED */ // have no business with `fill_window()` elseif (stream_in().eof() && next_seqno_absolute() == stream_in().bytes_written() + 2) { return; } // return; } }
When TCPSender::tick(const size_t ms_since_last_tick) is
called, it handles the milliseconds and maintains the timer.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method voidTCPSender::tick(constsize_t ms_since_last_tick){ // if timer is closed or not timeout, do nothing if (!timer.timeout(ms_since_last_tick)) { return; }
// if nothing can be retransmitted, close the timer if (_segments_in_flight.empty()) { timer.close(); return; }
// occurs timeout, retransmit the first segment in the `segments_in_filght` timer.restart(_window_size); segments_out().push(_segments_in_flight.front()); }
(segments_out) + [ack=0, seqno=0] (segments_out) + [ack=0, seqno=1] (segments_out) + [ack=0, seqno=1] (segments_out) + [ack=0, seqno=2] (segments_out) + [ack=0, seqno=2] (segments_out) + [ack=0, seqno=2] Test Failure on expectation: Expectation: exactly one segment sent with (A=1,ackno=2,)
Failure message: The TCP produced a segment with `ackno = 1`, but ackno was expected to be `2`
List of steps that executed successfully: Action: connect Expectation: exactly one segment sent with (A=0,R=0,S=1,F=0,seqno=0,payload_size=0,) Action: packet arrives: Header(flags=SA,seqno=0,ack=1,win=137) with no payload Expectation: exactly one segment sent with (A=1,R=0,S=0,F=0,ackno=1,payload_size=0,) Action: close Expectation: exactly one segment sent with (A=1,R=0,S=0,F=1,ackno=1,seqno=1,) Action: packet arrives: Header(flags=A,seqno=1,ack=2,win=137) with no payload Action: 4000ms pass Expectation: TCP in state sender=`stream finished and fully acknowledged`, receiver=`SYN received (ackno exists), and input to stream hasn't ended`, active=1, linger_after_streams_finish=1 Action: packet arrives: Header(flags=AF,seqno=1,ack=2,win=137) with no payload Action: 1ms pass
Note: test 3 failed: wrong ACK for FIN
Warning: Unclean shutdown of TCPConnection (segments_out) + [ack=0, seqno=2] The TCP produced a segment with `ackno = 1`, but ackno was expected to be `2`