Skip to content
This repository was archived by the owner on Sep 27, 2019. It is now read-only.

Commit dfb5675

Browse files
author
Tianyi Chen
committed
implementing protocol handlers' logic
1 parent 0591408 commit dfb5675

File tree

6 files changed

+59
-27
lines changed

6 files changed

+59
-27
lines changed

src/include/network/connection_handle.h

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,10 @@ class ConnectionHandle {
8787
Transition ProcessWrite();
8888
Transition GetResult();
8989
Transition CloseSocket();
90-
90+
/**
91+
* Flush out all the responses and do real SSL handshake
92+
*/
93+
Transition ProcessWrite_SSLHandshake();
9194
private:
9295
/**
9396
* A state machine is defined to be a set of states, a set of symbols it
@@ -171,6 +174,12 @@ class ConnectionHandle {
171174
*/
172175
WriteState FlushWriteBuffer();
173176

177+
/**
178+
* Process SSL handshake to generate valid SSL connection context
179+
* for further communications
180+
*/
181+
Transition ConnectionHandle::SSLHandshake();
182+
174183
/**
175184
* Set the socket to non-blocking mode
176185
*/
@@ -190,6 +199,13 @@ class ConnectionHandle {
190199
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof one);
191200
}
192201

202+
/**
203+
* Determine if there is still responses in the buffer
204+
*/
205+
inline bool HasResponse() {
206+
return (protocol_handler_->responses.size() != 0) || (wbuf_->buf_size != 0);
207+
}
208+
193209
int sock_fd_; // socket file descriptor
194210
struct event *network_event = nullptr; // something to read from network
195211
struct event *workpool_event = nullptr; // worker thread done the job

src/include/network/network_state.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ enum class ConnState {
2424
PROCESS, // State that runs the network protocol on received data
2525
CLOSING, // State for closing the client connection
2626
GET_RESULT, // State when triggered by worker thread that completes the task.
27-
SSL_HANDSHAKE// State when doing (Real) SSL handshake
27+
PROCESS_WRITE_SSL_HANDSHAKE // State to flush out responses and doing (Real)
28+
// SSL handshake
2829
};
2930

3031
// TODO(tianyu): Convert use cases of this to just return Transition
@@ -49,5 +50,5 @@ enum class Transition {
4950
RETRY,
5051
NEED_SSL_HANDSHAKE
5152
};
52-
}
53-
}
53+
} // namespace network
54+
} // namespace peloton

src/include/network/postgres_protocol_handler.h

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,26 @@ typedef std::vector<std::unique_ptr<OutputPacket>> ResponseBuffer;
3737

3838
class PostgresProtocolHandler : public ProtocolHandler {
3939
public:
40-
// TODO we need to somehow make this virtual?
4140
PostgresProtocolHandler(tcop::TrafficCop *traffic_cop);
4241

4342
~PostgresProtocolHandler();
43+
/**
44+
* Parse the content in the buffer and process to generate results.
45+
* thread_id is the thread of current running thread. This is used
46+
* to generate txn
47+
*/
48+
ProcessResult Process(Buffer &rbuf, const bool ssl_able, const size_t thread_id);
4449

45-
ProcessResult Process(Buffer &rbuf, const size_t thread_id);
46-
47-
/* Main switch case wrapper to process every packet apart from the startup
48-
* packet. Avoid flushing the response for extended protocols. */
49-
ProcessResult ProcessNormalPacket(InputPacket *pkt, const size_t thread_id);
50+
/**
51+
*
52+
*/
53+
ProcessResult ProcessInitialPackets(
5054

55+
/**
56+
* Main switch case wrapper to process every general packet apart from the
57+
* initial packets. Avoid flushing the response for extended protocols.
58+
*/
59+
ProcessResult ProcessNormalPackets(InputPacket *pkt, const size_t thread_id);
5160
/* Manage the startup packet */
5261
// bool ManageStartupPacket();
5362
void SendInitialResponse();
@@ -96,6 +105,9 @@ class PostgresProtocolHandler : public ProtocolHandler {
96105
//===--------------------------------------------------------------------===//
97106
// PROTOCOL HANDLING FUNCTIONS
98107
//===--------------------------------------------------------------------===//
108+
/* Manage the startup packet */
109+
virtual void SendInitialResponse();
110+
99111
// Generic error protocol packet
100112
void SendErrorResponse(
101113
std::vector<std::pair<NetworkMessageType, std::string>> error_status);

src/include/network/protocol_handler.h

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,7 @@ class ProtocolHandler {
3636
// bool ManageStartupPacket();
3737
virtual void SendInitialResponse();
3838

39-
virtual bool ProcessInitialPackets(InputPacket *pkt, Client client,
40-
bool ssl_able, bool &ssl_sent,
41-
bool &finish_startup_packet) = 0;
42-
43-
virtual ProcessResult Process(Buffer &rbuf, const size_t thread_id);
39+
virtual ProcessResult Process(Buffer &rbuf, const bool ssl_able, const size_t thread_id);
4440

4541
virtual void Reset();
4642

src/network/connection_handle.cpp

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,10 @@ DEF_TRANSITION_GRAPH
9292
ON(FINISH) SET_STATE_TO(CLOSING) AND_INVOKE(CloseSocket)
9393
END_DEF
9494

95-
DEFINE_STATE(SSL_HANDSHAKE)
96-
ON(WAKEUP) SET_STATE_TO(SSL_HANDSHAKE) AND_INVOKE(SSL_handshake)
97-
ON(NEED_DATA) SET_STATE_TO(SSL_HANDSHAKE) AND_WAIT
95+
DEFINE_STATE(PROCESS_WRITE_SSL_HANDSHAKE)
96+
ON(WAKEUP) SET_STATE_TO(PROCESS_WRITE_SSL_HANDSHAKE)
97+
AND_INVOKE(ProcessWrite_SSLHandshake)
98+
ON(NEED_DATA) SET_STATE_TO(PROCESS_WRITE_SSL_HANDSHAKE) AND_WAIT
9899
ON(FINISH) SET_STATE_TO(CLOSING) AND_INVOKE(CloseSocket)
99100
ON(PROCEED) SET_STATE_TO(PROCESS) AND_INVOKE(Process)
100101
END_DEF
@@ -104,7 +105,8 @@ DEF_TRANSITION_GRAPH
104105
ON(NEED_DATA) SET_STATE_TO(READ) AND_INVOKE(FillReadBuffer)
105106
ON(GET_RESULT) SET_STATE_TO(GET_RESULT) AND_WAIT
106107
ON(FINISH) SET_STATE_TO(CLOSING) AND_INVOKE(CloseSocket)
107-
ON(SSL_HANDSHAKE) SET_STATE_TO(SSL_HANDSHKE) AND_INVOKE(SSL_handshake)
108+
ON(NEED_SSL_HANDSHAKE) SET_STATE_TO(PROCESS_WRITE_SSL_HANDSHKE)
109+
AND_INVOKE(ProcessWrite_SSLHandshake)
108110
END_DEF
109111

110112
DEFINE_STATE(WRITE)
@@ -576,13 +578,19 @@ Transition ConnectionHandle::CloseSocket() {
576578
}
577579
}
578580

579-
Transition ConnectionHandle::SSL_handshake() {
580-
if (HaveResponse()) {
581-
switch(ProcessWrite()) {
582-
583-
}
581+
Transition ConnectionHandle::ProcessWrite_SSLHandshake() {
582+
// Flush out all the response first
583+
if (HasResponse()) {
584+
auto write_ret = ProcessWrite();
585+
if (write_ret != Transition::PROCEED) {
586+
return write_ret;
587+
}
584588
}
585589

590+
return SSLHandshake();
591+
}
592+
593+
Transition ConnectionHandle::SSLHandshake() {
586594
if (conn_SSL_context == nullptr) {
587595
conn_SSL_context = SSL_new(PelotonServer::ssl_context);
588596
if (conn_SSL_context == nullptr) {

src/network/postgres_protocol_handler.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,12 +1035,11 @@ void PostgresProtocolHandler::ProcessStartupPacket(InputPacket *pkt,
10351035
ProcessResult PostgresProtocolHandler::Process(Buffer &rbuf,
10361036
const bool ssl_able,
10371037
const size_t thread_id) {
1038-
// if the packet is of startup packet format
1039-
if (!ParseInputPacket(rbuf, rpkt_, startup_stage_))
1038+
if (!ParseInputPacket(rbuf, rpkt_, init_stage_))
10401039
return ProcessResult::MORE_DATA_REQUIRED;
10411040

10421041
ProcessResult process_status;
1043-
if (startup_stage) {
1042+
if (init_stage_) {
10441043
process_status = ProcessInitialPacket();
10451044
} else {
10461045
process_status = ProcessNormalPacket();

0 commit comments

Comments
 (0)