Skip to content
This repository was archived by the owner on Sep 27, 2019. It is now read-only.

Commit dd2cdfd

Browse files
authored
Merge branch 'master' into master
2 parents 0528cd3 + 7cbedfb commit dd2cdfd

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

63 files changed

+2647
-2544
lines changed

src/include/common/exception.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ enum class ExceptionType {
5656
CONNECTION = 21, // connection related
5757
SYNTAX = 22, // syntax related
5858
SETTINGS = 23, // settings related
59-
BINDER = 24 // settings related
59+
BINDER = 24, // binder related
60+
NETWORK = 25
6061
};
6162

6263
class Exception : public std::runtime_error {
@@ -427,6 +428,13 @@ class ConnectionException : public Exception {
427428
: Exception(ExceptionType::CONNECTION, msg) {}
428429
};
429430

431+
class NetworkProcessException : public Exception {
432+
NetworkProcessException() = delete;
433+
434+
public:
435+
NetworkProcessException(std::string msg) : Exception(ExceptionType::NETWORK, msg) {}
436+
};
437+
430438
class SettingsException : public Exception {
431439
SettingsException() = delete;
432440

src/include/common/thread_pool.h

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,30 +10,29 @@
1010
//
1111
//===----------------------------------------------------------------------===//
1212

13-
1413
#pragma once
1514

16-
#include <vector>
1715
#include <thread>
16+
#include <vector>
1817

1918
#include <boost/asio/io_service.hpp>
20-
#include <boost/thread/thread.hpp>
2119
#include <boost/bind.hpp>
2220
#include <boost/function.hpp>
21+
#include <boost/thread/thread.hpp>
2322

2423
#include "common/macros.h"
2524

2625
namespace peloton {
2726
// a wrapper for boost worker thread pool.
2827
class ThreadPool {
2928
public:
30-
ThreadPool() : pool_size_(0),
31-
dedicated_thread_count_(0),
32-
work_(io_service_) { }
29+
ThreadPool()
30+
: pool_size_(0), dedicated_thread_count_(0), work_(io_service_) {}
3331

34-
~ThreadPool() { }
32+
~ThreadPool() {}
3533

36-
void Initialize(const size_t &pool_size, const size_t &dedicated_thread_count) {
34+
void Initialize(const size_t &pool_size,
35+
const size_t &dedicated_thread_count) {
3736
current_thread_count_ = ATOMIC_VAR_INIT(0);
3837
pool_size_ = pool_size;
3938
// PL_ASSERT(pool_size_ != 0);
@@ -73,7 +72,8 @@ class ThreadPool {
7372
size_t thread_id =
7473
current_thread_count_.fetch_add(1, std::memory_order_relaxed);
7574
// assign task to dedicated thread.
76-
dedicated_threads_[thread_id].reset(new std::thread(std::thread(func, params...)));
75+
dedicated_threads_[thread_id].reset(
76+
new std::thread(std::thread(func, params...)));
7777
}
7878

7979
private:
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// Peloton
4+
//
5+
// connection_dispatcher_task.h
6+
//
7+
// Identification: src/include/network/connection_dispatcher_task.h
8+
//
9+
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group
10+
//
11+
//===----------------------------------------------------------------------===//
12+
13+
#pragma once
14+
15+
#include "notifiable_task.h"
16+
#include "network_state.h"
17+
#include "concurrency/epoch_manager_factory.h"
18+
#include "connection_handler_task.h"
19+
20+
namespace peloton {
21+
namespace network {
22+
23+
/**
24+
* @brief A ConnectionDispatcherTask on the main server thread and dispatches
25+
* incoming connections to handler threads.
26+
*
27+
* On construction, the dispatcher also spawns a number of handlers running on
28+
* their own threads. The dispatcher is
29+
* then responsible for maintain, and when shutting down, shutting down the
30+
* spawned handlers also.
31+
*/
32+
class ConnectionDispatcherTask : public NotifiableTask {
33+
public:
34+
/**
35+
* Creates a new ConnectionDispatcherTask, spawning the specified number of
36+
* handlers, each running on their own threads.
37+
*
38+
* @param num_handlers The number of handler tasks to spawn.
39+
* @param listen_fd The server socket fd to listen on.
40+
*/
41+
ConnectionDispatcherTask(int num_handlers, int listen_fd);
42+
43+
/**
44+
* @brief Dispatches the client connection at fd to a handler.
45+
* Currently, the dispatch uses round-robin, and thread communication is
46+
* achieved
47+
* through channels. The dispatch writes a symbol to the fd that the handler
48+
* is configured
49+
* to receive updates on.
50+
*
51+
* @param fd the socket fd of the client connection being dispatched
52+
* @param flags Unused. This is here to conform to libevent callback function
53+
* signature.
54+
*/
55+
void DispatchConnection(int fd, short flags);
56+
57+
/**
58+
* Breaks the dispatcher and managed handlers from their event loops.
59+
*/
60+
void ExitLoop() override;
61+
62+
private:
63+
std::vector<std::shared_ptr<ConnectionHandlerTask>> handlers_;
64+
// TODO: have a smarter dispatch scheduler, we currently use round-robin
65+
std::atomic<int> next_handler_;
66+
};
67+
68+
} // namespace network
69+
} // namespace peloton
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// Peloton
4+
//
5+
// connection_handle.h
6+
//
7+
// Identification: src/include/network/connection_handle.h
8+
//
9+
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group
10+
//
11+
//===----------------------------------------------------------------------===//
12+
13+
#pragma once
14+
#include <event2/buffer.h>
15+
#include <event2/bufferevent.h>
16+
#include <event2/event.h>
17+
#include <event2/listener.h>
18+
#include <unordered_map>
19+
20+
#include <csignal>
21+
#include <cstdio>
22+
#include <cstdlib>
23+
#include <cstring>
24+
#include <vector>
25+
26+
#include <arpa/inet.h>
27+
#include <netinet/tcp.h>
28+
#include <sys/file.h>
29+
30+
#include "common/exception.h"
31+
#include "common/logger.h"
32+
33+
#include "marshal.h"
34+
#include "network/connection_handler_task.h"
35+
#include "network_state.h"
36+
#include "protocol_handler.h"
37+
38+
#include <openssl/err.h>
39+
#include <openssl/ssl.h>
40+
41+
namespace peloton {
42+
namespace network {
43+
44+
// TODO(tianyu) This class is not refactored in full as rewriting the logic is
45+
// not cost-effective. However, readability
46+
// improvement and other changes may become desirable in the future. Other than
47+
// code clutter, responsibility assignment
48+
// is not well thought-out in this class. Abstracting out some type of socket
49+
// wrapper would be nice.
50+
/**
51+
* @brief A ConnectionHandle encapsulates all information about a client
52+
* connection for its entire duration.
53+
* One should not use the constructor to construct a new ConnectionHandle
54+
* instance every time as it is expensive
55+
* to allocate buffers. Instead, use the ConnectionHandleFactory.
56+
*
57+
* @see ConnectionHandleFactory
58+
*/
59+
class ConnectionHandle {
60+
public:
61+
/**
62+
* Update the existing event to listen to the passed flags
63+
*/
64+
void UpdateEventFlags(short flags);
65+
66+
WriteState WritePackets();
67+
68+
std::string WriteBufferToString();
69+
70+
inline void HandleEvent(int, short) {
71+
state_machine_.Accept(Transition::WAKEUP, *this);
72+
}
73+
74+
// Exposed for testing
75+
const std::unique_ptr<ProtocolHandler> &GetProtocolHandler() const {
76+
return protocol_handler_;
77+
}
78+
79+
// State Machine actions
80+
/**
81+
* refill_read_buffer - Used to repopulate read buffer with a fresh
82+
* batch of data from the socket
83+
*/
84+
Transition FillReadBuffer();
85+
Transition Wait();
86+
Transition Process();
87+
Transition ProcessWrite();
88+
Transition GetResult();
89+
Transition CloseSocket();
90+
91+
private:
92+
/**
93+
* A state machine is defined to be a set of states, a set of symbols it
94+
* supports, and a function mapping each
95+
* state and symbol pair to the state it should transition to. i.e.
96+
* transition_graph = state * symbol -> state
97+
*
98+
* In addition to the transition system, our network state machine also needs
99+
* to perform actions. Actions are
100+
* defined as functions (lambdas, or closures, in various other languages) and
101+
* is promised to be invoked by the
102+
* state machine after each transition if registered in the transition graph.
103+
*
104+
* So the transition graph overall has type transition_graph = state * symbol
105+
* -> state * action
106+
*/
107+
class StateMachine {
108+
public:
109+
using action = Transition (*)(ConnectionHandle &);
110+
using transition_result = std::pair<ConnState, action>;
111+
/**
112+
* Runs the internal state machine, starting from the symbol given, until no
113+
* more
114+
* symbols are available.
115+
*
116+
* Each state of the state machine defines a map from a transition symbol to
117+
* an action
118+
* and the next state it should go to. The actions can either generate the
119+
* next symbol,
120+
* which means the state machine will continue to run on the generated
121+
* symbol, or signal
122+
* that there is no more symbols that can be generated, at which point the
123+
* state machine
124+
* will stop running and return, waiting for an external event (user
125+
* interaction, or system event)
126+
* to generate the next symbol.
127+
*
128+
* @param action starting symbol
129+
* @param connection the network connection object to apply actions to
130+
*/
131+
void Accept(Transition action, ConnectionHandle &connection);
132+
133+
private:
134+
/**
135+
* delta is the transition function that defines, for each state, its
136+
* behavior and the
137+
* next state it should go to.
138+
*/
139+
static transition_result Delta_(ConnState state, Transition transition);
140+
ConnState current_state_ = ConnState::READ;
141+
};
142+
143+
friend class StateMachine;
144+
friend class ConnectionHandleFactory;
145+
146+
ConnectionHandle(int sock_fd, ConnectionHandlerTask *handler,
147+
std::shared_ptr<Buffer> rbuf, std::shared_ptr<Buffer> wbuf,
148+
bool ssl_able);
149+
150+
ProcessResult ProcessInitial();
151+
152+
/**
153+
* Extracts the header of a Postgres start up packet from the read socket
154+
* buffer
155+
*/
156+
static bool ReadStartupPacketHeader(Buffer &rbuf, InputPacket &rpkt);
157+
158+
/**
159+
* Writes a packet's header (type, size) into the write buffer
160+
*/
161+
WriteState BufferWriteBytesHeader(OutputPacket *pkt);
162+
163+
/**
164+
* Writes a packet's content into the write buffer
165+
*/
166+
WriteState BufferWriteBytesContent(OutputPacket *pkt);
167+
168+
/**
169+
* Used to invoke a write into the Socket, returns false if the socket is not
170+
* ready for write
171+
*/
172+
WriteState FlushWriteBuffer();
173+
174+
/**
175+
* Set the socket to non-blocking mode
176+
*/
177+
inline void SetNonBlocking(evutil_socket_t fd) {
178+
auto flags = fcntl(fd, F_GETFL);
179+
flags |= O_NONBLOCK;
180+
if (fcntl(fd, F_SETFL, flags) < 0) {
181+
LOG_ERROR("Failed to set non-blocking socket");
182+
}
183+
}
184+
185+
/**
186+
* Set TCP No Delay for lower latency
187+
*/
188+
inline void SetTCPNoDelay(evutil_socket_t fd) {
189+
int one = 1;
190+
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof one);
191+
}
192+
193+
int sock_fd_; // socket file descriptor
194+
struct event *network_event = nullptr; // something to read from network
195+
struct event *workpool_event = nullptr; // worker thread done the job
196+
197+
SSL *conn_SSL_context = nullptr; // SSL context for the connection
198+
199+
ConnectionHandlerTask *handler_; // reference to the network thread
200+
std::unique_ptr<ProtocolHandler>
201+
protocol_handler_; // Stores state for this socket
202+
tcop::TrafficCop traffic_cop_;
203+
204+
std::shared_ptr<Buffer> rbuf_; // Socket's read buffer
205+
std::shared_ptr<Buffer> wbuf_; // Socket's write buffer
206+
unsigned int next_response_ = 0; // The next response in the response buffer
207+
Client client_;
208+
StateMachine state_machine_;
209+
210+
// TODO(Tianyi) Can we encapsulate these flags?
211+
bool ssl_handshake_ = false;
212+
bool finish_startup_packet_ = false;
213+
bool ssl_able_;
214+
215+
// TODO(Tianyi) hide this in protocol handler
216+
InputPacket initial_packet_;
217+
218+
short curr_event_flag_; // current libevent event flag
219+
};
220+
} // namespace network
221+
} // namespace peloton

0 commit comments

Comments
 (0)