Skip to content
This repository was archived by the owner on Sep 27, 2019. It is now read-only.

Commit 5522737

Browse files
authored
Merge branch 'master' into unique
2 parents 99afde2 + 7cbedfb commit 5522737

File tree

79 files changed

+3127
-2660
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

79 files changed

+3127
-2660
lines changed

script/installation/packages.sh

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -48,22 +48,13 @@ if [ "$DISTRO" = "UBUNTU" ]; then
4848
fi
4949
sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 15CF4D18AF4F7421
5050
sudo apt-get update -qq
51+
CMAKE_NAME="cmake3"
52+
FORCE_Y="--force-yes"
53+
else
54+
CMAKE_NAME="cmake"
55+
FORCE_Y=""
5156
fi
5257

53-
# Fix for cmake3 name change in Ubuntu 15.x and 16.x plus --force-yes deprecation
54-
CMAKE_NAME="cmake3"
55-
FORCE_Y="--force-yes"
56-
MAJOR_VER=$(echo "$DISTRO_VER" | cut -d '.' -f 1)
57-
for version in "15" "16"
58-
do
59-
if [ "$MAJOR_VER" = "$version" ]
60-
then
61-
FORCE_Y=""
62-
CMAKE_NAME="cmake"
63-
break
64-
fi
65-
done
66-
6758
sudo apt-get -qq $FORCE_Y --ignore-missing -y install \
6859
git \
6960
g++ \

src/executor/create_executor.cpp

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -99,25 +99,26 @@ bool CreateExecutor::CreateDatabase(const planner::CreatePlan &node) {
9999
}
100100

101101
bool CreateExecutor::CreateTable(const planner::CreatePlan &node) {
102-
auto txn = context_->GetTransaction();
102+
auto current_txn = context_->GetTransaction();
103103
std::string table_name = node.GetTableName();
104104
auto database_name = node.GetDatabaseName();
105105
std::unique_ptr<catalog::Schema> schema(node.GetSchema());
106106

107107
ResultType result = catalog::Catalog::GetInstance()->CreateTable(
108-
database_name, table_name, std::move(schema), txn);
109-
txn->SetResult(result);
108+
database_name, table_name, std::move(schema), current_txn);
109+
current_txn->SetResult(result);
110110

111-
if (txn->GetResult() == ResultType::SUCCESS) {
111+
if (current_txn->GetResult() == ResultType::SUCCESS) {
112112
LOG_TRACE("Creating table succeeded!");
113113

114114
// Add the foreign key constraint (or other multi-column constraints)
115115
if (node.GetForeignKeys().empty() == false) {
116116
auto catalog = catalog::Catalog::GetInstance();
117-
auto db = catalog->GetDatabaseWithName(database_name, txn);
117+
auto db = catalog->GetDatabaseWithName(database_name, current_txn);
118118

119119
auto source_table = db->GetTableWithName(table_name);
120120
int count = 1;
121+
121122
for (auto fk : node.GetForeignKeys()) {
122123
auto sink_table = db->GetTableWithName(fk.sink_table_name);
123124

@@ -139,7 +140,7 @@ bool CreateExecutor::CreateTable(const planner::CreatePlan &node) {
139140
// Sink Column Offsets
140141
std::vector<oid_t> sink_col_ids;
141142
for (auto col_name : fk.foreign_key_sinks) {
142-
oid_t col_id = source_table->GetSchema()->GetColumnID(col_name);
143+
oid_t col_id = sink_table->GetSchema()->GetColumnID(col_name);
143144
if (col_id == INVALID_OID) {
144145
std::string error = StringUtil::Format(
145146
"Invalid sink column name '%s.%s' for foreign key '%s'",
@@ -152,43 +153,51 @@ bool CreateExecutor::CreateTable(const planner::CreatePlan &node) {
152153
PL_ASSERT(sink_col_ids.size() == fk.foreign_key_sinks.size());
153154

154155
// Create the catalog object and shove it into the table
155-
auto catalog_fk = new catalog::ForeignKey(
156+
auto catalog_fk = new catalog::ForeignKey(INVALID_OID,
156157
sink_table->GetOid(), sink_col_ids, source_col_ids,
157158
fk.upd_action, fk.del_action, fk.constraint_name);
158159
source_table->AddForeignKey(catalog_fk);
159160

160161
// Register FK with the sink table for delete/update actions
161-
sink_table->RegisterForeignKeySource(table_name);
162+
catalog_fk = new catalog::ForeignKey(source_table->GetOid(),
163+
INVALID_OID,
164+
sink_col_ids,
165+
source_col_ids,
166+
fk.upd_action,
167+
fk.del_action,
168+
fk.constraint_name);
169+
sink_table->RegisterForeignKeySource(catalog_fk);
162170

163171
// Add a non-unique index on the source table if needed
164-
if (catalog_fk->GetUpdateAction() != FKConstrActionType::NOACTION ||
165-
catalog_fk->GetDeleteAction() != FKConstrActionType::NOACTION) {
166-
std::vector<std::string> source_col_names =
167-
fk.foreign_key_sources;
168-
std::string index_name =
169-
source_table->GetName() + "_FK_" + std::to_string(count);
170-
catalog->CreateIndex(database_name, source_table->GetName(),
171-
source_col_ids, index_name, false,
172-
IndexType::BWTREE, txn);
173-
count++;
172+
std::vector<std::string> source_col_names =
173+
fk.foreign_key_sources;
174+
std::string index_name =
175+
source_table->GetName() + "_FK_" + sink_table->GetName() + "_"
176+
+ std::to_string(count);
177+
catalog->CreateIndex(database_name, source_table->GetName(),
178+
source_col_ids, index_name, false,
179+
IndexType::BWTREE, current_txn);
180+
count++;
174181

175182
#ifdef LOG_DEBUG_ENABLED
176-
LOG_DEBUG("Added a FOREIGN index on in %s.", table_name.c_str());
177-
LOG_DEBUG("Foreign key column names: ");
178-
for (auto c : source_col_names) {
179-
LOG_DEBUG("FK col name: %s", c.c_str());
180-
}
181-
#endif
183+
LOG_DEBUG("Added a FOREIGN index on in %s.\n", table_name.c_str());
184+
LOG_DEBUG("Foreign key column names: \n");
185+
for (auto c : source_col_names) {
186+
LOG_DEBUG("FK col name: %s\n", c.c_str());
187+
}
188+
for (auto c : fk.foreign_key_sinks) {
189+
LOG_DEBUG("FK sink col name: %s\n", c.c_str());
182190
}
191+
#endif
183192
}
184193
}
185-
} else if (txn->GetResult() == ResultType::FAILURE) {
194+
} else if (current_txn->GetResult() == ResultType::FAILURE) {
186195
LOG_TRACE("Creating table failed!");
187-
return (false);
188196
} else {
189197
LOG_TRACE("Result is: %s",
190-
ResultTypeToString(txn->GetResult()).c_str());
198+
ResultTypeToString(current_txn->GetResult()).c_str());
191199
}
200+
192201
return (true);
193202
}
194203

src/executor/delete_executor.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,27 @@ bool DeleteExecutor::DExecute() {
133133
physical_tuple_id = old_location.offset;
134134
}
135135

136+
ContainerTuple<storage::TileGroup> old_tuple(tile_group, physical_tuple_id);
137+
storage::Tuple prev_tuple(target_table_->GetSchema(), true);
138+
139+
// Get a copy of the old tuple
140+
for (oid_t column_itr = 0; column_itr < target_table_schema->GetColumnCount(); column_itr++) {
141+
type::Value val = (old_tuple.GetValue(column_itr));
142+
prev_tuple.SetValue(column_itr, val, executor_context_->GetPool());
143+
}
144+
145+
// Check the foreign key source table
146+
if (target_table_->CheckForeignKeySrcAndCascade(&prev_tuple,
147+
nullptr,
148+
current_txn,
149+
executor_context_,
150+
false) == false)
151+
{
152+
transaction_manager.SetTransactionResult(current_txn,
153+
peloton::ResultType::FAILURE);
154+
return false;
155+
}
156+
136157
bool is_owner = transaction_manager.IsOwner(current_txn, tile_group_header,
137158
physical_tuple_id);
138159

src/executor/update_executor.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
#include "storage/data_table.h"
2323
#include "storage/tile_group_header.h"
2424
#include "storage/tile.h"
25+
#include "storage/storage_manager.h"
26+
#include "catalog/foreign_key.h"
2527

2628
namespace peloton {
2729
namespace executor {
@@ -112,6 +114,27 @@ bool UpdateExecutor::PerformUpdatePrimaryKey(
112114
return false;
113115
}
114116

117+
// Check the source table of any foreign key constraint
118+
if (target_table_->GetForeignKeySrcCount() > 0) {
119+
storage::Tuple prev_tuple(target_table_schema, true);
120+
// Get a copy of the old tuple
121+
for (oid_t column_itr = 0; column_itr < target_table_schema->GetColumnCount(); column_itr++) {
122+
type::Value val = (old_tuple.GetValue(column_itr));
123+
prev_tuple.SetValue(column_itr, val, executor_context_->GetPool());
124+
}
125+
126+
if (target_table_->CheckForeignKeySrcAndCascade(&prev_tuple,
127+
&new_tuple,
128+
current_txn,
129+
executor_context_,
130+
true) == false)
131+
{
132+
transaction_manager.SetTransactionResult(current_txn,
133+
peloton::ResultType::FAILURE);
134+
return false;
135+
}
136+
}
137+
115138
transaction_manager.PerformInsert(current_txn, location, index_entry_ptr);
116139

117140
return true;

src/include/catalog/foreign_key.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,23 @@ namespace catalog {
2828
// Stores info about foreign key constraints, like the sink table id etc.
2929
class ForeignKey {
3030
public:
31-
ForeignKey(oid_t sink_table_id,
31+
ForeignKey(oid_t source_table_id,
32+
oid_t sink_table_id,
3233
std::vector<oid_t> sink_col_ids,
3334
std::vector<oid_t> source_col_ids,
3435
FKConstrActionType update_action,
3536
FKConstrActionType delete_action,
3637
std::string constraint_name)
3738

38-
: sink_table_id(sink_table_id),
39+
: source_table_id(source_table_id),
40+
sink_table_id(sink_table_id),
3941
sink_col_ids(sink_col_ids),
4042
source_col_ids(source_col_ids),
4143
update_action(update_action),
4244
delete_action(delete_action),
4345
fk_name(constraint_name) {}
4446

47+
oid_t GetSourceTableOid() const { return source_table_id; }
4548
oid_t GetSinkTableOid() const { return sink_table_id; }
4649

4750
std::vector<oid_t> GetSinkColumnIds() const { return sink_col_ids; }
@@ -52,6 +55,7 @@ class ForeignKey {
5255
std::string &GetConstraintName() { return fk_name; }
5356

5457
private:
58+
oid_t source_table_id = INVALID_OID;
5559
oid_t sink_table_id = INVALID_OID;
5660

5761
// Columns in the reference table (sink)

src/include/common/exception.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ enum class ExceptionType {
5656
CONNECTION = 21, // connection related
5757
SYNTAX = 22, // syntax related
5858
SETTINGS = 23, // settings related
59-
BINDER = 24 // settings related
59+
BINDER = 24, // settings related
60+
NETWORK = 25
6061
};
6162

6263
class Exception : public std::runtime_error {
@@ -427,6 +428,13 @@ class ConnectionException : public Exception {
427428
: Exception(ExceptionType::CONNECTION, msg) {}
428429
};
429430

431+
class NetworkProcessException : public Exception {
432+
NetworkProcessException() = delete;
433+
434+
public:
435+
NetworkProcessException(std::string msg) : Exception(ExceptionType::NETWORK, msg) {}
436+
};
437+
430438
class SettingsException : public Exception {
431439
SettingsException() = delete;
432440

src/include/common/thread_pool.h

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,30 +10,29 @@
1010
//
1111
//===----------------------------------------------------------------------===//
1212

13-
1413
#pragma once
1514

16-
#include <vector>
1715
#include <thread>
16+
#include <vector>
1817

1918
#include <boost/asio/io_service.hpp>
20-
#include <boost/thread/thread.hpp>
2119
#include <boost/bind.hpp>
2220
#include <boost/function.hpp>
21+
#include <boost/thread/thread.hpp>
2322

2423
#include "common/macros.h"
2524

2625
namespace peloton {
2726
// a wrapper for boost worker thread pool.
2827
class ThreadPool {
2928
public:
30-
ThreadPool() : pool_size_(0),
31-
dedicated_thread_count_(0),
32-
work_(io_service_) { }
29+
ThreadPool()
30+
: pool_size_(0), dedicated_thread_count_(0), work_(io_service_) {}
3331

34-
~ThreadPool() { }
32+
~ThreadPool() {}
3533

36-
void Initialize(const size_t &pool_size, const size_t &dedicated_thread_count) {
34+
void Initialize(const size_t &pool_size,
35+
const size_t &dedicated_thread_count) {
3736
current_thread_count_ = ATOMIC_VAR_INIT(0);
3837
pool_size_ = pool_size;
3938
// PL_ASSERT(pool_size_ != 0);
@@ -73,7 +72,8 @@ class ThreadPool {
7372
size_t thread_id =
7473
current_thread_count_.fetch_add(1, std::memory_order_relaxed);
7574
// assign task to dedicated thread.
76-
dedicated_threads_[thread_id].reset(new std::thread(std::thread(func, params...)));
75+
dedicated_threads_[thread_id].reset(
76+
new std::thread(std::thread(func, params...)));
7777
}
7878

7979
private:
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// Peloton
4+
//
5+
// connection_dispatcher_task.h
6+
//
7+
// Identification: src/include/network/connection_dispatcher_task.h
8+
//
9+
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group
10+
//
11+
//===----------------------------------------------------------------------===//
12+
13+
#pragma once
14+
15+
#include "notifiable_task.h"
16+
#include "network_state.h"
17+
#include "concurrency/epoch_manager_factory.h"
18+
#include "connection_handler_task.h"
19+
20+
namespace peloton {
21+
namespace network {
22+
23+
/**
24+
* @brief A ConnectionDispatcherTask on the main server thread and dispatches
25+
* incoming connections to handler threads.
26+
*
27+
* On construction, the dispatcher also spawns a number of handlers running on
28+
* their own threads. The dispatcher is
29+
* then responsible for maintain, and when shutting down, shutting down the
30+
* spawned handlers also.
31+
*/
32+
class ConnectionDispatcherTask : public NotifiableTask {
33+
public:
34+
/**
35+
* Creates a new ConnectionDispatcherTask, spawning the specified number of
36+
* handlers, each running on their own threads.
37+
*
38+
* @param num_handlers The number of handler tasks to spawn.
39+
* @param listen_fd The server socket fd to listen on.
40+
*/
41+
ConnectionDispatcherTask(int num_handlers, int listen_fd);
42+
43+
/**
44+
* @brief Dispatches the client connection at fd to a handler.
45+
* Currently, the dispatch uses round-robin, and thread communication is
46+
* achieved
47+
* through channels. The dispatch writes a symbol to the fd that the handler
48+
* is configured
49+
* to receive updates on.
50+
*
51+
* @param fd the socket fd of the client connection being dispatched
52+
* @param flags Unused. This is here to conform to libevent callback function
53+
* signature.
54+
*/
55+
void DispatchConnection(int fd, short flags);
56+
57+
/**
58+
* Breaks the dispatcher and managed handlers from their event loops.
59+
*/
60+
void ExitLoop() override;
61+
62+
private:
63+
std::vector<std::shared_ptr<ConnectionHandlerTask>> handlers_;
64+
// TODO: have a smarter dispatch scheduler, we currently use round-robin
65+
std::atomic<int> next_handler_;
66+
};
67+
68+
} // namespace network
69+
} // namespace peloton

0 commit comments

Comments
 (0)