Skip to content
This repository was archived by the owner on Sep 27, 2019. It is now read-only.

Commit 8d237c3

Browse files
author
Tianyi Chen
committed
compiled now, bug in inital packet
1 parent fa50c16 commit 8d237c3

File tree

7 files changed

+79
-78
lines changed

7 files changed

+79
-78
lines changed

src/include/network/connection_handle.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ class ConnectionHandle {
179179
* Process SSL handshake to generate valid SSL connection context
180180
* for further communications
181181
*/
182-
Transition ConnectionHandle::SSLHandshake();
182+
Transition SSLHandshake();
183183

184184
/**
185185
* Set the socket to non-blocking mode
@@ -204,7 +204,7 @@ class ConnectionHandle {
204204
* Determine if there is still responses in the buffer
205205
*/
206206
inline bool HasResponse() {
207-
return (protocol_handler_->responses.size() != 0) || (wbuf_->buf_size != 0);
207+
return (protocol_handler_->responses_.size() != 0) || (wbuf_->buf_size != 0);
208208
}
209209

210210
int sock_fd_; // socket file descriptor

src/include/network/network_state.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ enum class ConnState {
2424
PROCESS, // State that runs the network protocol on received data
2525
CLOSING, // State for closing the client connection
2626
GET_RESULT, // State when triggered by worker thread that completes the task.
27-
PROCESS_WRITE_SSL_HANDSHAKE // State to flush out responses and doing (Real)
27+
PROCESS_WRITE_SSL_HANDSHAKE, // State to flush out responses and doing (Real)
2828
// SSL handshake
2929
};
3030

@@ -48,7 +48,7 @@ enum class Transition {
4848
GET_RESULT,
4949
FINISH,
5050
RETRY,
51-
NEED_SSL_HANDSHAKE
51+
NEED_SSL_HANDSHAKE,
5252
};
5353
} // namespace network
5454
} // namespace peloton

src/include/network/postgres_protocol_handler.h

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,7 @@ class PostgresProtocolHandler : public ProtocolHandler {
4747
*/
4848
ProcessResult Process(Buffer &rbuf, size_t thread_id);
4949

50-
void Reset();
51-
52-
void GetResult();
53-
54-
private:
55-
//===--------------------------------------------------------------------===//
56-
// STATIC HELPERS
57-
//===--------------------------------------------------------------------===//
58-
59-
// Deserialize the parameter types from packet
50+
// Deserialize the parame types from packet
6051
static size_t ReadParamType(InputPacket *pkt, int num_params,
6152
std::vector<int32_t> &param_types);
6253

@@ -70,6 +61,15 @@ class PostgresProtocolHandler : public ProtocolHandler {
7061
std::vector<std::pair<type::TypeId, std::string>> &bind_parameters,
7162
std::vector<type::Value> &param_values, std::vector<int16_t> &formats);
7263

64+
void Reset();
65+
66+
void GetResult();
67+
68+
private:
69+
//===--------------------------------------------------------------------===//
70+
// STATIC HELPERS
71+
//===--------------------------------------------------------------------===//
72+
7373
// Parse the input packet based on if it is the startup packet
7474
static bool ParseInputPacket(Buffer &rbuf, InputPacket &rpkt,
7575
bool startup_format);
@@ -90,12 +90,12 @@ class PostgresProtocolHandler : public ProtocolHandler {
9090
/**
9191
* @brief Routine to deal with the first packet from the client
9292
*/
93-
ProcessResult ProcessInitialPackets(InputPacket *pkt);
93+
ProcessResult ProcessInitialPacket(InputPacket *pkt);
9494

9595
/**
9696
* @brief Main Switch function to process general packets
9797
*/
98-
ProcessResult ProcessNormalPackets(InputPacket *pkt, const size_t thread_id);
98+
ProcessResult ProcessNormalPacket(InputPacket *pkt, const size_t thread_id);
9999

100100
/**
101101
* @brief Helper function to process startup packet

src/include/network/protocol_handler.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,9 @@ class ProtocolHandler {
2929

3030
virtual ~ProtocolHandler();
3131

32-
/* Main switch case wrapper to process every packet apart from the startup
33-
* packet. Avoid flushing the response for extended protocols. */
34-
3532
/**
36-
*
33+
* Main switch case wrapper to process every packet apart from the startup
34+
* packet. Avoid flushing the response for extended protocols.
3735
*/
3836
virtual ProcessResult Process(Buffer &rbuf, const size_t thread_id);
3937

src/network/connection_handle.cpp

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ DEF_TRANSITION_GRAPH
105105
ON(NEED_DATA) SET_STATE_TO(READ) AND_INVOKE(FillReadBuffer)
106106
ON(GET_RESULT) SET_STATE_TO(GET_RESULT) AND_WAIT
107107
ON(FINISH) SET_STATE_TO(CLOSING) AND_INVOKE(CloseSocket)
108-
ON(NEED_SSL_HANDSHAKE) SET_STATE_TO(PROCESS_WRITE_SSL_HANDSHKE)
108+
ON(NEED_SSL_HANDSHAKE) SET_STATE_TO(PROCESS_WRITE_SSL_HANDSHAKE)
109109
AND_INVOKE(ProcessWrite_SSLHandshake)
110110
END_DEF
111111

@@ -120,11 +120,6 @@ DEF_TRANSITION_GRAPH
120120
ON(PROCEED) SET_STATE_TO(WRITE) AND_INVOKE(ProcessWrite)
121121
END_DEF
122122

123-
DEFINE_STATE(CLOSING)
124-
ON(PROCEED) SET_STATE_TO(CLOSED) AND_WAIT
125-
ON(NEED_DATA) SET_STATE_TO(CLOSING) AND_WAIT
126-
END_DEF
127-
128123
END_DEF
129124
// clang-format on
130125

@@ -188,9 +183,9 @@ void ConnectionHandle::UpdateEventFlags(short flags) {
188183

189184
WriteState ConnectionHandle::WritePackets() {
190185
// iterate through all the packets
191-
for (; next_response_ < protocol_handler_->responses.size();
186+
for (; next_response_ < protocol_handler_->responses_.size();
192187
next_response_++) {
193-
auto pkt = protocol_handler_->responses[next_response_].get();
188+
auto pkt = protocol_handler_->responses_[next_response_].get();
194189
LOG_TRACE("To send packet with type: %c, len %lu",
195190
static_cast<char>(pkt->msg_type), pkt->len);
196191
// write is not ready during write. transit to WRITE
@@ -201,7 +196,7 @@ WriteState ConnectionHandle::WritePackets() {
201196
}
202197

203198
// Done writing all packets. clear packets
204-
protocol_handler_->responses.clear();
199+
protocol_handler_->responses_.clear();
205200
next_response_ = 0;
206201

207202
if (protocol_handler_->GetFlushFlag()) {
@@ -442,6 +437,7 @@ std::string ConnectionHandle::WriteBufferToString() {
442437
return std::string(wbuf_->buf.begin(), wbuf_->buf.end());
443438
}
444439

440+
// TODO (Tianyi) Make this to be protocol specific
445441
// Writes a packet's header (type, size) into the write buffer.
446442
// Return false when the socket is not ready for write
447443
WriteState ConnectionHandle::BufferWriteBytesHeader(OutputPacket *pkt) {
@@ -473,14 +469,13 @@ WriteState ConnectionHandle::BufferWriteBytesHeader(OutputPacket *pkt) {
473469
// make len include its field size as well
474470
len_nb = htonl(len + sizeof(int32_t));
475471

476-
if (finish_startup_packet_) {
477-
// append the bytes of this integer in network-byte order
478-
std::copy(reinterpret_cast<uchar *>(&len_nb),
479-
reinterpret_cast<uchar *>(&len_nb) + 4,
480-
std::begin(wbuf_->buf) + wbuf_->buf_ptr);
481-
// move the write buffer pointer and update size of the socket buffer
482-
wbuf_->buf_ptr += sizeof(int32_t);
483-
}
472+
// TODO (Tianyi) Check why we need finish startup packet before
473+
// append the bytes of this integer in network-byte order
474+
std::copy(reinterpret_cast<uchar *>(&len_nb),
475+
reinterpret_cast<uchar *>(&len_nb) + 4,
476+
std::begin(wbuf_->buf) + wbuf_->buf_ptr);
477+
// move the write buffer pointer and update size of the socket buffer
478+
wbuf_->buf_ptr += sizeof(int32_t);
484479

485480
wbuf_->buf_size = wbuf_->buf_ptr;
486481

@@ -680,8 +675,11 @@ Transition ConnectionHandle::Process() {
680675
return Transition::GET_RESULT;
681676
case ProcessResult::TERMINATE:
682677
return Transition::FINISH;
683-
case ProcessResult::NEED_SSL_HANDSHAKE
678+
case ProcessResult::NEED_SSL_HANDSHAKE:
684679
return Transition::NEED_SSL_HANDSHAKE;
680+
default:
681+
LOG_ERROR("Unknown process result");
682+
throw NetworkProcessException("Unknown process result");
685683
}
686684
}
687685

@@ -690,7 +688,6 @@ Transition ConnectionHandle::ProcessWrite() {
690688
switch (WritePackets()) {
691689
case WriteState::COMPLETE:
692690
UpdateEventFlags(EV_READ | EV_PERSIST);
693-
if (!finish_startup_packet_) return Transition::NEED_DATA;
694691
return Transition::PROCEED;
695692
case WriteState::NOT_READY:
696693
return Transition::NONE;

src/network/postgres_protocol_handler.cpp

Lines changed: 43 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "expression/expression_util.h"
2222
#include "network/marshal.h"
2323
#include "network/postgres_protocol_handler.h"
24+
#include "network/peloton_server.h"
2425
#include "parser/postgresparser.h"
2526
#include "planner/abstract_plan.h"
2627
#include "planner/delete_plan.h"
@@ -57,7 +58,6 @@ const std::unordered_map<std::string, std::string>
5758

5859
PostgresProtocolHandler::PostgresProtocolHandler(tcop::TrafficCop *traffic_cop)
5960
: ProtocolHandler(traffic_cop),
60-
stage_(CommStage::SSL_SETUP),
6161
txn_state_(NetworkTransactionStateType::IDLE) {}
6262

6363
PostgresProtocolHandler::~PostgresProtocolHandler() {}
@@ -429,7 +429,7 @@ void PostgresProtocolHandler::ExecBindMessage(InputPacket *pkt) {
429429
std::unique_ptr<OutputPacket> response(new OutputPacket());
430430
// Send Bind complete response
431431
response->msg_type = NetworkMessageType::BIND_COMPLETE;
432-
responses.push_back(std::move(response));
432+
responses_.push_back(std::move(response));
433433
// TODO(Tianyi) This is a hack to respond correct describe message
434434
// as well as execute message
435435
skipped_stmt_ = true;
@@ -456,7 +456,7 @@ void PostgresProtocolHandler::ExecBindMessage(InputPacket *pkt) {
456456
std::unique_ptr<OutputPacket> response(new OutputPacket());
457457
// Send Bind complete response
458458
response->msg_type = NetworkMessageType::BIND_COMPLETE;
459-
responses.push_back(std::move(response));
459+
responses_.push_back(std::move(response));
460460
return;
461461
}
462462

@@ -536,7 +536,7 @@ void PostgresProtocolHandler::ExecBindMessage(InputPacket *pkt) {
536536
// send bind complete
537537
std::unique_ptr<OutputPacket> response(new OutputPacket());
538538
response->msg_type = NetworkMessageType::BIND_COMPLETE;
539-
responses.push_back(std::move(response));
539+
responses_.push_back(std::move(response));
540540
}
541541

542542
size_t PostgresProtocolHandler::ReadParamType(
@@ -694,7 +694,7 @@ ProcessResult PostgresProtocolHandler::ExecDescribeMessage(InputPacket *pkt) {
694694
// send 'no-data' message
695695
std::unique_ptr<OutputPacket> response(new OutputPacket());
696696
response->msg_type = NetworkMessageType::NO_DATA_RESPONSE;
697-
responses.push_back(std::move(response));
697+
responses_.push_back(std::move(response));
698698
return ProcessResult::COMPLETE;
699699
}
700700

@@ -868,28 +868,29 @@ void PostgresProtocolHandler::ExecCloseMessage(InputPacket *pkt) {
868868
// Send close complete response
869869
std::unique_ptr<OutputPacket> response(new OutputPacket());
870870
response->msg_type = NetworkMessageType::CLOSE_COMPLETE;
871-
responses.push_back(std::move(response));
871+
responses_.push_back(std::move(response));
872872
}
873873

874-
bool PostgresProtocolHandler::ParseInputPacket(Buffer &rbuf, InputPacket,
875-
bool starup_format) {
876-
if (request.header_parsed == false) {
874+
bool PostgresProtocolHandler::ParseInputPacket(Buffer &rbuf, InputPacket &rpkt,
875+
bool startup_format) {
876+
if (rpkt.header_parsed == false) {
877877
// parse out the header first
878-
if (ReadPacketHeader(rbuf, request, startup_format) == false) {
878+
if (ReadPacketHeader(rbuf, rpkt, startup_format) == false) {
879879
// need more data
880880
return false;
881881
}
882882
}
883883

884-
PL_ASSERT(request.header_parsed == true);
884+
PL_ASSERT(rpkt.header_parsed == true);
885885

886-
if (request.is_initialized == false) {
886+
if (rpkt.is_initialized == false) {
887887
// packet needs to be initialized with rest of the contents
888-
if (PostgresProtocolHandler::ReadPacket(rbuf, request) == false) {
888+
if (PostgresProtocolHandler::ReadPacket(rbuf, rpkt) == false) {
889889
// need more data
890890
return false;
891891
}
892892
}
893+
return true;
893894
}
894895

895896
// The function tries to do a preliminary read to fetch the size value and
@@ -967,27 +968,14 @@ bool PostgresProtocolHandler::ReadPacket(Buffer &rbuf, InputPacket &rpkt) {
967968
return true;
968969
}
969970

970-
ProcessResult PostgresProtocolHandler::Process(Buffer &rbuf,
971-
const size_t thread_id) {
972-
if (!ParseInputPacket(rbuf, request_, init_stage_))
973-
return ProcessResult::MORE_DATA_REQUIRED;
974-
975-
ProcessResult process_status;
976-
if (init_stage_) {
977-
process_status = ProcessInitialPacket(request_);
978-
} else {
979-
process_status = ProcessNormalPacket(request_);
980-
}
981-
request_.Reset();
982-
983-
return process_status;
984-
}
985-
971+
/*
972+
* process_startup_packet - Processes the startup packet
973+
* (after the size field of the header).
974+
*/
986975
ProcessResult PostgresProtocolHandler::ProcessInitialPacket(InputPacket *pkt) {
987976
int32_t proto_version = PacketGetInt(pkt, sizeof(int32_t));
988977
LOG_INFO("protocol version: %d", proto_version);
989978

990-
force_flush_ = true;
991979
// TODO(Yuchen): consider more about return value
992980
if (proto_version == SSL_MESSAGE_VERNO) {
993981
LOG_TRACE("process SSL MESSAGE");
@@ -996,6 +984,7 @@ ProcessResult PostgresProtocolHandler::ProcessInitialPacket(InputPacket *pkt) {
996984
response->msg_type =
997985
ssl_able ? NetworkMessageType::SSL_YES : NetworkMessageType::SSL_NO;
998986
responses_.push_back(std::move(response));
987+
force_flush_ = true;
999988
return ssl_able ? ProcessResult::NEED_SSL_HANDSHAKE
1000989
: ProcessResult::COMPLETE;
1001990
} else {
@@ -1023,7 +1012,7 @@ ProcessResult PostgresProtocolHandler::ProcessStartupPacket(
10231012
if (pkt->ptr >= pkt->len) break;
10241013
GetStringToken(pkt, value);
10251014
LOG_TRACE("Option value is %s", token.c_str());
1026-
client.cmdline_options[token] = value;
1015+
cmdline_options_[token] = value;
10271016
if (token.compare("database") == 0) {
10281017
traffic_cop_->SetDefaultDatabaseName(value);
10291018
}
@@ -1035,10 +1024,32 @@ ProcessResult PostgresProtocolHandler::ProcessStartupPacket(
10351024
SendStartupResponse();
10361025

10371026
init_stage_ = false;
1027+
force_flush_ = true;
10381028
return ProcessResult::COMPLETE;
10391029
}
10401030

1041-
ProcessResult PostgresProtocolHandler::ProcessNormalPackets(
1031+
ProcessResult PostgresProtocolHandler::Process(Buffer &rbuf,
1032+
const size_t thread_id) {
1033+
InputPacket rpkt;
1034+
rpkt.Reset();
1035+
if (!ParseInputPacket(rbuf, rpkt, init_stage_))
1036+
return ProcessResult::MORE_DATA_REQUIRED;
1037+
1038+
ProcessResult process_status;
1039+
if (init_stage_) {
1040+
process_status = ProcessInitialPacket(&rpkt);
1041+
} else {
1042+
process_status = ProcessNormalPacket(&rpkt, thread_id);
1043+
}
1044+
1045+
return process_status;
1046+
}
1047+
1048+
/*
1049+
* process_packet - Main switch block; process incoming packets,
1050+
* Returns false if the session needs to be closed.
1051+
*/
1052+
ProcessResult PostgresProtocolHandler::ProcessNormalPacket(
10421053
InputPacket *pkt, const size_t thread_id) {
10431054
LOG_TRACE("Message type: %c", static_cast<unsigned char>(pkt->msg_type));
10441055
// We don't set force_flush to true for `PBDE` messages because they're
@@ -1093,7 +1104,6 @@ ProcessResult PostgresProtocolHandler::ProcessNormalPackets(
10931104
}
10941105
return ProcessResult::COMPLETE;
10951106
}
1096-
10971107
void PostgresProtocolHandler::MakeHardcodedParameterStatus(
10981108
const std::pair<std::string, std::string> &kv) {
10991109
std::unique_ptr<OutputPacket> response(new OutputPacket());

src/network/protocol_handler.cpp

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,15 @@ ProtocolHandler::ProtocolHandler(tcop::TrafficCop *traffic_cop) {
2323

2424
ProtocolHandler::~ProtocolHandler() {}
2525

26-
/* Manage the startup packet */
27-
// bool ManageStartupPacket();
28-
void ProtocolHandler::SendInitialResponse() {}
29-
3026
ProcessResult ProtocolHandler::Process(
3127
UNUSED_ATTRIBUTE Buffer &rbuf, UNUSED_ATTRIBUTE const size_t thread_id) {
3228
return ProcessResult::TERMINATE;
3329
}
3430

3531
void ProtocolHandler::Reset() {
3632
SetFlushFlag(false);
37-
responses.clear();
38-
request.Reset();
33+
responses_.clear();
34+
request_.Reset();
3935
}
4036

4137
void ProtocolHandler::GetResult() {}

0 commit comments

Comments
 (0)