Skip to content
This repository was archived by the owner on Sep 27, 2019. It is now read-only.

Commit a8246b9

Browse files
2 parents a11e123 + f372234 commit a8246b9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+2805
-2592
lines changed

src/brain/query_logger.cpp

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,40 @@
1111
//===----------------------------------------------------------------------===//
1212

1313
#include "brain/query_logger.h"
14+
1415
#include "catalog/query_history_catalog.h"
1516
#include "concurrency/transaction_context.h"
1617
#include "concurrency/transaction_manager_factory.h"
17-
#include "parser/pg_query.h"
1818

1919
namespace peloton {
2020
namespace brain {
2121

22+
QueryLogger::Fingerprint::Fingerprint(const std::string &query)
23+
: query_(query),
24+
fingerprint_(""),
25+
fingerprint_result_(pg_query_fingerprint(query.c_str())) {
26+
if (fingerprint_result_.hexdigest != nullptr) {
27+
fingerprint_ = fingerprint_result_.hexdigest;
28+
}
29+
}
30+
31+
QueryLogger::Fingerprint::~Fingerprint() {
32+
pg_query_free_fingerprint_result(fingerprint_result_);
33+
}
34+
2235
void QueryLogger::LogQuery(std::string query_string, uint64_t timestamp) {
2336
auto &txn_manager = concurrency::TransactionManagerFactory::GetInstance();
24-
auto txn = txn_manager.BeginTransaction();
25-
std::string fingerprint = pg_query_fingerprint(query_string.c_str()).hexdigest;
37+
auto *txn = txn_manager.BeginTransaction();
38+
39+
// Perform fingerprint
40+
Fingerprint fingerprint{query_string};
2641

27-
catalog::QueryHistoryCatalog::GetInstance()->InsertQueryHistory(
28-
query_string, fingerprint, timestamp, nullptr, txn);
42+
// Log query + fingerprint
43+
auto &query_history_catalog = catalog::QueryHistoryCatalog::GetInstance();
44+
query_history_catalog.InsertQueryHistory(
45+
query_string, fingerprint.GetFingerprint(), timestamp, nullptr, txn);
2946

47+
// We're done
3048
txn_manager.CommitTransaction(txn);
3149
}
3250

src/catalog/query_history_catalog.cpp

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,23 @@
66
//
77
// Identification: src/catalog/query_history_catalog.cpp
88
//
9-
// Copyright (c) 2015-18, Carnegie Mellon University Database Group
9+
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group
1010
//
1111
//===----------------------------------------------------------------------===//
1212

1313
#include "catalog/query_history_catalog.h"
1414

1515
#include "catalog/catalog.h"
16-
#include "executor/logical_tile.h"
17-
#include "parser/pg_query.h"
1816
#include "storage/data_table.h"
1917
#include "type/value_factory.h"
2018

2119
namespace peloton {
2220
namespace catalog {
2321

24-
QueryHistoryCatalog *QueryHistoryCatalog::GetInstance(
22+
QueryHistoryCatalog &QueryHistoryCatalog::GetInstance(
2523
concurrency::TransactionContext *txn) {
2624
static QueryHistoryCatalog query_history_catalog{txn};
27-
return &query_history_catalog;
25+
return query_history_catalog;
2826
}
2927

3028
QueryHistoryCatalog::QueryHistoryCatalog(concurrency::TransactionContext *txn)
@@ -36,22 +34,23 @@ QueryHistoryCatalog::QueryHistoryCatalog(concurrency::TransactionContext *txn)
3634
"timestamp TIMESTAMP NOT NULL);",
3735
txn) {}
3836

39-
QueryHistoryCatalog::~QueryHistoryCatalog() {}
37+
QueryHistoryCatalog::~QueryHistoryCatalog() = default;
4038

41-
bool QueryHistoryCatalog::InsertQueryHistory(const std::string &query_string,
42-
std::string &fingerprint, uint64_t timestamp,
43-
type::AbstractPool *pool,
44-
concurrency::TransactionContext *txn) {
39+
bool QueryHistoryCatalog::InsertQueryHistory(
40+
const std::string &query_string, const std::string &fingerprint,
41+
uint64_t timestamp, type::AbstractPool *pool,
42+
concurrency::TransactionContext *txn) {
4543
std::unique_ptr<storage::Tuple> tuple(
4644
new storage::Tuple(catalog_table_->GetSchema(), true));
4745

48-
auto val0 = type::ValueFactory::GetVarcharValue(query_string, pool);
49-
auto val1 = type::ValueFactory::GetVarcharValue(fingerprint, pool);
46+
auto val0 = type::ValueFactory::GetVarcharValue(query_string);
47+
auto val1 = type::ValueFactory::GetVarcharValue(fingerprint);
5048
auto val2 = type::ValueFactory::GetTimestampValue(timestamp);
5149

52-
tuple->SetValue(ColumnId::QUERY_STRING, val0, pool);
53-
tuple->SetValue(ColumnId::FINGERPRINT, val1, pool);
54-
tuple->SetValue(ColumnId::TIMESTAMP, val2, pool);
50+
tuple->SetValue(ColumnId::QUERY_STRING, val0,
51+
pool != nullptr ? pool : &pool_);
52+
tuple->SetValue(ColumnId::FINGERPRINT, val1, pool != nullptr ? pool : &pool_);
53+
tuple->SetValue(ColumnId::TIMESTAMP, val2, pool != nullptr ? pool : &pool_);
5554

5655
// Insert the tuple
5756
return InsertTuple(std::move(tuple), txn);

src/include/brain/query_logger.h

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@
1212

1313
#pragma once
1414

15+
#include <cstdint>
1516
#include <string>
1617

18+
#include "parser/pg_query.h"
19+
1720
namespace peloton {
1821
namespace brain {
1922

@@ -25,8 +28,28 @@ class QueryLogger {
2528
public:
2629
QueryLogger() = default;
2730

28-
/*
31+
class Fingerprint {
32+
public:
33+
/// Constructor
34+
explicit Fingerprint(const std::string &query);
35+
36+
/// Destructor
37+
~Fingerprint();
38+
39+
/// Get original string
40+
const std::string &GetQueryString() { return query_; }
41+
const std::string &GetFingerprint() { return fingerprint_; }
42+
43+
private:
44+
// Accessors
45+
std::string query_;
46+
std::string fingerprint_;
47+
PgQueryFingerprintResult fingerprint_result_;
48+
};
49+
50+
/**
2951
* @brief This function logs the query into query_history_catalog
52+
*
3053
* @param the sql string corresponding to the query
3154
* @param timestamp of the transaction that executed the query
3255
*/

src/include/catalog/query_history_catalog.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
//
77
// Identification: src/include/catalog/query_history_catalog.h
88
//
9-
// Copyright (c) 2015-18, Carnegie Mellon University Database Group
9+
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group
1010
//
1111
//===----------------------------------------------------------------------===//
1212

@@ -23,6 +23,7 @@
2323
#pragma once
2424

2525
#include "catalog/abstract_catalog.h"
26+
#include "type/ephemeral_pool.h"
2627

2728
#define QUERY_HISTORY_CATALOG_NAME "pg_query_history"
2829

@@ -34,14 +35,14 @@ class QueryHistoryCatalog : public AbstractCatalog {
3435
~QueryHistoryCatalog();
3536

3637
// Global Singleton
37-
static QueryHistoryCatalog *GetInstance(
38+
static QueryHistoryCatalog &GetInstance(
3839
concurrency::TransactionContext *txn = nullptr);
3940

4041
//===--------------------------------------------------------------------===//
4142
// write Related API
4243
//===--------------------------------------------------------------------===//
4344
bool InsertQueryHistory(const std::string &query_string,
44-
std::string &fingerprint, uint64_t timestamp,
45+
const std::string &fingerprint, uint64_t timestamp,
4546
type::AbstractPool *pool,
4647
concurrency::TransactionContext *txn);
4748

@@ -54,6 +55,8 @@ class QueryHistoryCatalog : public AbstractCatalog {
5455
private:
5556
QueryHistoryCatalog(concurrency::TransactionContext *txn);
5657

58+
// Pool to use for variable length strings
59+
type::EphemeralPool pool_;
5760
};
5861

5962
} // namespace catalog

src/include/common/exception.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ enum class ExceptionType {
5656
CONNECTION = 21, // connection related
5757
SYNTAX = 22, // syntax related
5858
SETTINGS = 23, // settings related
59-
BINDER = 24 // settings related
59+
BINDER = 24, // settings related
60+
NETWORK = 25
6061
};
6162

6263
class Exception : public std::runtime_error {
@@ -427,6 +428,13 @@ class ConnectionException : public Exception {
427428
: Exception(ExceptionType::CONNECTION, msg) {}
428429
};
429430

431+
class NetworkProcessException : public Exception {
432+
NetworkProcessException() = delete;
433+
434+
public:
435+
NetworkProcessException(std::string msg) : Exception(ExceptionType::NETWORK, msg) {}
436+
};
437+
430438
class SettingsException : public Exception {
431439
SettingsException() = delete;
432440

src/include/common/thread_pool.h

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,30 +10,29 @@
1010
//
1111
//===----------------------------------------------------------------------===//
1212

13-
1413
#pragma once
1514

16-
#include <vector>
1715
#include <thread>
16+
#include <vector>
1817

1918
#include <boost/asio/io_service.hpp>
20-
#include <boost/thread/thread.hpp>
2119
#include <boost/bind.hpp>
2220
#include <boost/function.hpp>
21+
#include <boost/thread/thread.hpp>
2322

2423
#include "common/macros.h"
2524

2625
namespace peloton {
2726
// a wrapper for boost worker thread pool.
2827
class ThreadPool {
2928
public:
30-
ThreadPool() : pool_size_(0),
31-
dedicated_thread_count_(0),
32-
work_(io_service_) { }
29+
ThreadPool()
30+
: pool_size_(0), dedicated_thread_count_(0), work_(io_service_) {}
3331

34-
~ThreadPool() { }
32+
~ThreadPool() {}
3533

36-
void Initialize(const size_t &pool_size, const size_t &dedicated_thread_count) {
34+
void Initialize(const size_t &pool_size,
35+
const size_t &dedicated_thread_count) {
3736
current_thread_count_ = ATOMIC_VAR_INIT(0);
3837
pool_size_ = pool_size;
3938
// PL_ASSERT(pool_size_ != 0);
@@ -73,7 +72,8 @@ class ThreadPool {
7372
size_t thread_id =
7473
current_thread_count_.fetch_add(1, std::memory_order_relaxed);
7574
// assign task to dedicated thread.
76-
dedicated_threads_[thread_id].reset(new std::thread(std::thread(func, params...)));
75+
dedicated_threads_[thread_id].reset(
76+
new std::thread(std::thread(func, params...)));
7777
}
7878

7979
private:
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// Peloton
4+
//
5+
// connection_dispatcher_task.h
6+
//
7+
// Identification: src/include/network/connection_dispatcher_task.h
8+
//
9+
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group
10+
//
11+
//===----------------------------------------------------------------------===//
12+
13+
#pragma once
14+
15+
#include "notifiable_task.h"
16+
#include "network_state.h"
17+
#include "concurrency/epoch_manager_factory.h"
18+
#include "connection_handler_task.h"
19+
20+
namespace peloton {
21+
namespace network {
22+
23+
/**
24+
* @brief A ConnectionDispatcherTask on the main server thread and dispatches
25+
* incoming connections to handler threads.
26+
*
27+
* On construction, the dispatcher also spawns a number of handlers running on
28+
* their own threads. The dispatcher is
29+
* then responsible for maintain, and when shutting down, shutting down the
30+
* spawned handlers also.
31+
*/
32+
class ConnectionDispatcherTask : public NotifiableTask {
33+
public:
34+
/**
35+
* Creates a new ConnectionDispatcherTask, spawning the specified number of
36+
* handlers, each running on their own threads.
37+
*
38+
* @param num_handlers The number of handler tasks to spawn.
39+
* @param listen_fd The server socket fd to listen on.
40+
*/
41+
ConnectionDispatcherTask(int num_handlers, int listen_fd);
42+
43+
/**
44+
* @brief Dispatches the client connection at fd to a handler.
45+
* Currently, the dispatch uses round-robin, and thread communication is
46+
* achieved
47+
* through channels. The dispatch writes a symbol to the fd that the handler
48+
* is configured
49+
* to receive updates on.
50+
*
51+
* @param fd the socket fd of the client connection being dispatched
52+
* @param flags Unused. This is here to conform to libevent callback function
53+
* signature.
54+
*/
55+
void DispatchConnection(int fd, short flags);
56+
57+
/**
58+
* Breaks the dispatcher and managed handlers from their event loops.
59+
*/
60+
void ExitLoop() override;
61+
62+
private:
63+
std::vector<std::shared_ptr<ConnectionHandlerTask>> handlers_;
64+
// TODO: have a smarter dispatch scheduler, we currently use round-robin
65+
std::atomic<int> next_handler_;
66+
};
67+
68+
} // namespace network
69+
} // namespace peloton

0 commit comments

Comments
 (0)