Skip to content
This repository was archived by the owner on Sep 27, 2019. It is now read-only.

Commit 0591408

Browse files
author
Tianyi Chen
committed
design decision made. Skeloton code based on return value is built
1 parent e42b1a0 commit 0591408

File tree

7 files changed

+210
-281
lines changed

7 files changed

+210
-281
lines changed

src/include/common/internal_types.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1397,7 +1397,8 @@ enum class ProcessResult {
13971397
COMPLETE,
13981398
TERMINATE,
13991399
PROCESSING,
1400-
MORE_DATA_REQUIRED
1400+
MORE_DATA_REQUIRED,
1401+
NEED_SSL_HANDSHAKE,
14011402
};
14021403

14031404
enum class NetworkProtocolType {

src/include/network/connection_handle.h

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -204,17 +204,12 @@ class ConnectionHandle {
204204
std::shared_ptr<Buffer> rbuf_; // Socket's read buffer
205205
std::shared_ptr<Buffer> wbuf_; // Socket's write buffer
206206
unsigned int next_response_ = 0; // The next response in the response buffer
207-
Client client_;
207+
208208
StateMachine state_machine_;
209209

210210
// TODO(Tianyi) Can we encapsulate these flags?
211-
bool ssl_handshake_ = false;
212-
bool finish_startup_packet_ = false;
213211
bool ssl_able_;
214212

215-
// TODO(Tianyi) hide this in protocol handler
216-
InputPacket initial_packet_;
217-
218213
short curr_event_flag_; // current libevent event flag
219214
};
220215
} // namespace network

src/include/network/marshal.h

Lines changed: 17 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
#include <string>
1616
#include <vector>
1717

18+
#include "common/internal_types.h"
1819
#include "common/logger.h"
1920
#include "common/macros.h"
20-
#include "common/internal_types.h"
2121

2222
#define BUFFER_INIT_SIZE 100
2323

@@ -152,25 +152,13 @@ struct OutputPacket {
152152
}
153153
};
154154

155-
struct Client {
156-
// Authentication details
157-
std::string dbname;
158-
std::string user;
159-
std::unordered_map<std::string, std::string> cmdline_options;
160-
161-
inline void Reset() {
162-
dbname.clear();
163-
user.clear();
164-
cmdline_options.clear();
165-
}
166-
};
167-
168155
/*
169-
* Marshallers
170-
*/
156+
* Marshallers
157+
*/
171158

172159
/* packet_put_byte - used to write a single byte into a packet */
173-
extern void PacketPutByte(OutputPacket *pkt, const uchar c);
160+
extern void
161+
PacketPutByte(OutputPacket *pkt, const uchar c);
174162

175163
/* packet_put_string - used to write a string into a packet */
176164
extern void PacketPutStringWithTerminator(OutputPacket *pkt,
@@ -186,22 +174,22 @@ extern void PacketPutCbytes(OutputPacket *pkt, const uchar *b, int len);
186174
extern void PacketPutString(OutputPacket *pkt, const std::string &data);
187175

188176
/*
189-
* Unmarshallers
190-
*/
177+
* Unmarshallers
178+
*/
191179

192180
/* Copy len bytes from the position indicated by begin to an array */
193181
extern uchar *PacketCopyBytes(ByteBuf::const_iterator begin, int len);
194182
/*
195-
* packet_get_int - Parse an int out of the head of the
196-
* packet. "base" bytes determine the number of bytes of integer
197-
* we are parsing out.
198-
*/
183+
* packet_get_int - Parse an int out of the head of the
184+
* packet. "base" bytes determine the number of bytes of integer
185+
* we are parsing out.
186+
*/
199187
extern int PacketGetInt(InputPacket *pkt, uchar base);
200188

201189
/*
202-
* packet_get_string - parse out a string of size len.
203-
* if len=0? parse till the end of the string
204-
*/
190+
* packet_get_string - parse out a string of size len.
191+
* if len=0? parse till the end of the string
192+
*/
205193
extern void PacketGetString(InputPacket *pkt, size_t len, std::string &result);
206194

207195
/* packet_get_bytes - Parse out "len" bytes of pkt as raw bytes */
@@ -211,9 +199,9 @@ extern void PacketGetBytes(InputPacket *pkt, size_t len, ByteBuf &result);
211199
extern void PacketGetByte(InputPacket *rpkt, uchar &result);
212200

213201
/*
214-
* get_string_token - used to extract a string token
215-
* from an unsigned char vector
216-
*/
202+
* get_string_token - used to extract a string token
203+
* from an unsigned char vector
204+
*/
217205
extern void GetStringToken(InputPacket *pkt, std::string &result);
218206

219207
} // namespace network

src/include/network/postgres_protocol_handler.h

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class PostgresProtocolHandler : public ProtocolHandler {
4646

4747
/* Main switch case wrapper to process every packet apart from the startup
4848
* packet. Avoid flushing the response for extended protocols. */
49-
ProcessResult ProcessPacket(InputPacket *pkt, const size_t thread_id);
49+
ProcessResult ProcessNormalPacket(InputPacket *pkt, const size_t thread_id);
5050

5151
/* Manage the startup packet */
5252
// bool ManageStartupPacket();
@@ -55,6 +55,7 @@ class PostgresProtocolHandler : public ProtocolHandler {
5555

5656
void GetResult();
5757

58+
private:
5859
//===--------------------------------------------------------------------===//
5960
// STATIC HELPERS
6061
//===--------------------------------------------------------------------===//
@@ -73,28 +74,25 @@ class PostgresProtocolHandler : public ProtocolHandler {
7374
std::vector<std::pair<type::TypeId, std::string>> &bind_parameters,
7475
std::vector<type::Value> &param_values, std::vector<int16_t> &formats);
7576

77+
// Parse the input packet based on if it is the startup packet
78+
static bool ParseInputPacket(Buffer &rbuf, InputPacket &rpkt,
79+
bool startup_format);
80+
7681
// Packet Reading Function
7782
// Extracts the contents of Postgres packet from the read socket buffer
7883
static bool ReadPacket(Buffer &rbuf, InputPacket &rpkt);
7984

85+
// Packet Reading Function
86+
// Extracts the header of a Postgres packet from the read socket buffer
87+
static bool ReadPacketHeader(Buffer &rbuf, InputPacket &rpkt,
88+
bool startup_format);
89+
8090
/* Routine to deal with the first packet from the client */
81-
bool ProcessInitialPackets(InputPacket *pkt, Client client, bool ssl_able,
82-
bool &ssl_sent, bool &finish_startup_packet);
91+
bool ProcessInitialPackets(InputPacket *pkt, bool ssl_able);
8392

8493
/* Routine to deal with SSL request message */
8594
void ProcessSSLRequestPacket(bool ssl_able, bool &ssl_handshake);
8695

87-
/* Routine to deal with general Startup message */
88-
bool ProcessStartupPacket(InputPacket *pkt, int32_t proto_version,
89-
Client client, bool &finish_startup_packet);
90-
91-
bool GetFinishedStartupPacket();
92-
93-
private:
94-
// Packet Reading Function
95-
// Extracts the header of a Postgres packet from the read socket buffer
96-
static bool ReadPacketHeader(Buffer &rbuf, InputPacket &rpkt);
97-
9896
//===--------------------------------------------------------------------===//
9997
// PROTOCOL HANDLING FUNCTIONS
10098
//===--------------------------------------------------------------------===//
@@ -155,6 +153,8 @@ class PostgresProtocolHandler : public ProtocolHandler {
155153
//===--------------------------------------------------------------------===//
156154
// MEMBERS
157155
//===--------------------------------------------------------------------===//
156+
// True if this protocol is handling startup/SSL packets
157+
bool init_stage_;
158158

159159
NetworkProtocolType protocol_type_;
160160

@@ -166,7 +166,7 @@ class PostgresProtocolHandler : public ProtocolHandler {
166166
// global txn state
167167
NetworkTransactionStateType txn_state_;
168168

169-
// state to mang skipped queries
169+
// state to manage skipped queries
170170
bool skipped_stmt_ = false;
171171
std::string skipped_query_string_;
172172
QueryType skipped_query_type_;

src/include/network/protocol_handler.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ class ProtocolHandler {
6060

6161
// The traffic cop used for this connection
6262
tcop::TrafficCop *traffic_cop_;
63+
64+
std::unordered_map<std::string, std::string> cmdline_options;
6365
};
6466

6567
} // namespace network

src/network/connection_handle.cpp

Lines changed: 11 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,7 @@ namespace {
8383
;
8484
}
8585

86-
// For readability sake, do not run clang-format on this macro block and follow
87-
// the style presented when editing.
86+
// clang-format off
8887
DEF_TRANSITION_GRAPH
8988
DEFINE_STATE(READ)
9089
ON(WAKEUP) SET_STATE_TO(READ) AND_INVOKE(FillReadBuffer)
@@ -94,6 +93,7 @@ DEF_TRANSITION_GRAPH
9493
END_DEF
9594

9695
DEFINE_STATE(SSL_HANDSHAKE)
96+
ON(WAKEUP) SET_STATE_TO(SSL_HANDSHAKE) AND_INVOKE(SSL_handshake)
9797
ON(NEED_DATA) SET_STATE_TO(SSL_HANDSHAKE) AND_WAIT
9898
ON(FINISH) SET_STATE_TO(CLOSING) AND_INVOKE(CloseSocket)
9999
ON(PROCEED) SET_STATE_TO(PROCESS) AND_INVOKE(Process)
@@ -124,6 +124,7 @@ DEF_TRANSITION_GRAPH
124124
END_DEF
125125

126126
END_DEF
127+
// clang-format on
127128

128129
void ConnectionHandle::StateMachine::Accept(Transition action,
129130
ConnectionHandle &connection) {
@@ -440,75 +441,6 @@ std::string ConnectionHandle::WriteBufferToString() {
440441
return std::string(wbuf_->buf.begin(), wbuf_->buf.end());
441442
}
442443

443-
ProcessResult ConnectionHandle::ProcessInitial() {
444-
// TODO(Tianyi): this is a direct copy from protocol handler
445-
// and we could get rid of it later when we have the second
446-
// protocol handler;
447-
448-
if (initial_packet_.header_parsed == false) {
449-
// parse out the header first
450-
if (ReadStartupPacketHeader(*rbuf_, initial_packet_) == false) {
451-
// need more data
452-
return ProcessResult::MORE_DATA_REQUIRED;
453-
}
454-
}
455-
PL_ASSERT(initial_packet_.header_parsed == true);
456-
457-
if (initial_packet_.is_initialized == false) {
458-
// packet needs to be initialized with rest of the contents
459-
// TODO(Tianyi): If other protocols are added, this need to be changed
460-
if (PostgresProtocolHandler::ReadPacket(*rbuf_, initial_packet_) == false) {
461-
// need more data
462-
return ProcessResult::MORE_DATA_REQUIRED;
463-
}
464-
}
465-
466-
if (protocol_handler_ == nullptr) {
467-
protocol_handler_ = ProtocolHandlerFactory::CreateProtocolHandler(
468-
ProtocolHandlerType::Postgres, &traffic_cop_);
469-
}
470-
// We need to handle startup packet first
471-
// TODO(Tianyi): If other protocols are added, this need to be changed
472-
bool result = protocol_handler_->ProcessInitialPackets(
473-
&initial_packet_, client_, ssl_able_, ssl_handshake_,
474-
finish_startup_packet_);
475-
// Clean up the initial_packet after finishing processing.
476-
initial_packet_.Reset();
477-
if (result) {
478-
return ProcessResult::COMPLETE;
479-
} else {
480-
return ProcessResult::TERMINATE;
481-
}
482-
}
483-
484-
// TODO(Tianyi): This function is now dedicated for postgres packet
485-
bool ConnectionHandle::ReadStartupPacketHeader(Buffer &rbuf,
486-
InputPacket &rpkt) {
487-
size_t initial_read_size = sizeof(int32_t);
488-
489-
if (!rbuf.IsReadDataAvailable(initial_read_size)) {
490-
return false;
491-
}
492-
493-
// extract packet contents size
494-
// content lengths should exclude the length
495-
rpkt.len = rbuf.GetUInt32BigEndian() - sizeof(uint32_t);
496-
497-
// do we need to use the extended buffer for this packet?
498-
rpkt.is_extended = (rpkt.len > rbuf.GetMaxSize());
499-
500-
if (rpkt.is_extended) {
501-
LOG_DEBUG("Using extended buffer for pkt size:%ld", rpkt.len);
502-
// reserve space for the extended buffer
503-
rpkt.ReserveExtendedBuffer();
504-
}
505-
506-
// we have processed the data, move buffer pointer
507-
rbuf.buf_ptr += initial_read_size;
508-
rpkt.header_parsed = true;
509-
return true;
510-
}
511-
512444
// Writes a packet's header (type, size) into the write buffer.
513445
// Return false when the socket is not ready for write
514446
WriteState ConnectionHandle::BufferWriteBytesHeader(OutputPacket *pkt) {
@@ -645,6 +577,12 @@ Transition ConnectionHandle::CloseSocket() {
645577
}
646578

647579
Transition ConnectionHandle::SSL_handshake() {
580+
if (HaveResponse()) {
581+
switch(ProcessWrite()) {
582+
583+
}
584+
}
585+
648586
if (conn_SSL_context == nullptr) {
649587
conn_SSL_context = SSL_new(PelotonServer::ssl_context);
650588
if (conn_SSL_context == nullptr) {
@@ -735,6 +673,8 @@ Transition ConnectionHandle::Process() {
735673
return Transition::GET_RESULT;
736674
case ProcessResult::TERMINATE:
737675
return Transition::FINISH;
676+
case ProcessResult::NEED_SSL_HANDSHAKE:
677+
return Transition::NEED_SSL_HANDSHAKE;
738678
}
739679
}
740680

0 commit comments

Comments
 (0)