Skip to content
This repository was archived by the owner on Sep 27, 2019. It is now read-only.

Commit c21f80a

Browse files
authored
Merge pull request #1199 from tli2/tianyuli-brain
scaffolding for brain
2 parents 0db1a2a + a8d697a commit c21f80a

File tree

10 files changed

+207
-91
lines changed

10 files changed

+207
-91
lines changed

src/common/notifiable_task.cpp

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// Peloton
4+
//
5+
// notifiable_task.cpp
6+
//
7+
// Identification: src/common/notifiable_task.cpp
8+
//
9+
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group
10+
//
11+
//===----------------------------------------------------------------------===//
12+
13+
#include "common/notifiable_task.h"
14+
#include "common/logger.h"
15+
#include "common/event_util.h"
16+
#include <cstring>
17+
18+
namespace peloton {
19+
20+
NotifiableTask::NotifiableTask(int task_id) : task_id_(task_id) {
21+
base_ = EventUtil::EventBaseNew();
22+
// For exiting a loop
23+
terminate_ = RegisterManualEvent([](int, short, void *arg) {
24+
EventUtil::EventBaseLoopExit((struct event_base *) arg, nullptr);
25+
}, base_);
26+
};
27+
28+
NotifiableTask::~NotifiableTask() {
29+
for (struct event *event : events_) {
30+
EventUtil::EventDel(event);
31+
event_free(event);
32+
}
33+
event_base_free(base_);
34+
}
35+
36+
struct event *NotifiableTask::RegisterEvent(int fd,
37+
short flags,
38+
event_callback_fn callback,
39+
void *arg,
40+
const struct timeval *timeout) {
41+
struct event *event = event_new(base_, fd, flags, callback, arg);
42+
events_.insert(event);
43+
EventUtil::EventAdd(event, timeout);
44+
return event;
45+
}
46+
47+
void NotifiableTask::UnregisterEvent(struct event *event) {
48+
auto it = events_.find(event);
49+
if (it == events_.end()) return;
50+
if (event_del(event) == -1) {
51+
LOG_ERROR("Failed to delete event");
52+
return;
53+
}
54+
event_free(event);
55+
events_.erase(event);
56+
}
57+
58+
} // namespace peloton
59+

src/include/brain/brain.h

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// Peloton
4+
//
5+
// brain.h
6+
//
7+
// Identification: src/include/brain/brain.h
8+
//
9+
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group
10+
//
11+
//===----------------------------------------------------------------------===//
12+
13+
#pragma once
14+
#include <unordered_map>
15+
#include <string>
16+
#include <cstdio>
17+
#include <utility>
18+
#include "capnp/ez-rpc.h"
19+
#include "peloton/capnp/peloton_service.capnp.h"
20+
#include "common/notifiable_task.h"
21+
22+
namespace peloton {
23+
namespace brain {
24+
25+
class BrainEnvironment {
26+
// TODO(tianyu): provide interface for accessing various resources of the brain,
27+
// such as network connection to a peloton engine.
28+
};
29+
30+
class BrainJob {
31+
public:
32+
// TODO(tianyu): Extend this interface for richer interaction
33+
virtual void RunJob(BrainEnvironment *) = 0;
34+
};
35+
36+
class ExampleBrainJob: public BrainJob {
37+
public:
38+
void RunJob(BrainEnvironment *) override {
39+
// TODO(tianyu): Replace with real address
40+
capnp::EzRpcClient client("localhost:15445");
41+
PelotonService::Client peloton_service = client.getMain<PelotonService>();
42+
auto request = peloton_service.createIndexRequest();
43+
request.getRequest().setIndexKeys(42);
44+
auto response = request.send().wait(client.getWaitScope());
45+
}
46+
};
47+
48+
class Brain {
49+
public:
50+
// TODO(tianyu): Add necessary parameters to initialize the brain's resources
51+
Brain() : scheduler_(0) {}
52+
53+
inline void RegisterJob(const struct timeval *period,
54+
std::string name,
55+
BrainJob &job) {
56+
auto callback = [](int, short, void *pair) {
57+
auto *job_env_pair = reinterpret_cast<std::pair<BrainJob *, BrainEnvironment *> *>(pair);
58+
job_env_pair->first->RunJob(job_env_pair->second);
59+
};
60+
// TODO(tianyu) Deal with this memory
61+
auto *pair = new std::pair<BrainJob *, BrainEnvironment *>(&job, &env_);
62+
job_handles_[name] =
63+
scheduler_.RegisterPeriodicEvent(period, callback, pair);
64+
}
65+
66+
inline void Run() {
67+
scheduler_.EventLoop();
68+
}
69+
70+
inline void Terminate() {
71+
scheduler_.ExitLoop();
72+
}
73+
74+
private:
75+
NotifiableTask scheduler_;
76+
std::unordered_map<std::string, struct event *> job_handles_;
77+
BrainEnvironment env_;
78+
};
79+
}
80+
}

src/include/capnp/peloton_service.capnp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ struct CreateIndexRequest {
1212
}
1313

1414
struct CreateIndexResponse {
15-
15+
message @0 :Text;
1616
}
1717

1818
interface PelotonService {

src/include/network/error_util.h renamed to src/include/common/event_util.h

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,16 @@
22
//
33
// Peloton
44
//
5-
// error_util.h
5+
// event_util.h
66
//
7-
// Identification: src/include/network/error_util.h
7+
// Identification: src/include/common/event_util.h
88
//
99
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group
1010
//
1111
//===----------------------------------------------------------------------===//
1212

1313
#pragma once
1414

15-
#include <string>
16-
#include <unordered_map>
17-
#include <utility>
18-
1915
#include <event2/buffer.h>
2016
#include <event2/bufferevent.h>
2117
#include <event2/event.h>
@@ -25,7 +21,6 @@
2521
#include "common/logger.h"
2622

2723
namespace peloton {
28-
namespace network {
2924
/**
3025
* Static utility class with wrappers for libevent functions.
3126
*
@@ -86,5 +81,4 @@ class EventUtil {
8681
"Error in event base dispatch");
8782
}
8883
};
89-
} // namespace network
9084
} // namespace peloton

src/include/network/notifiable_task.h renamed to src/include/common/notifiable_task.h

Lines changed: 19 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -4,31 +4,19 @@
44
//
55
// notifiable_task.h
66
//
7-
// Identification: src/include/network/notifiable_task.h
7+
// Identification: src/include/common/notifiable_task.h
88
//
99
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group
1010
//
1111
//===----------------------------------------------------------------------===//
1212

1313
#pragma once
1414

15-
#include <csignal>
16-
#include <cstdio>
17-
#include <cstdlib>
18-
#include <cstring>
19-
#include <vector>
2015
#include <unordered_set>
21-
22-
#include <sys/file.h>
2316
#include <event2/thread.h>
24-
25-
#include "common/exception.h"
26-
#include "common/logger.h"
27-
#include "network_state.h"
28-
#include "error_util.h"
17+
#include "common/event_util.h"
2918

3019
namespace peloton {
31-
namespace network {
3220

3321
/**
3422
* Convenient MACRO to use a method as a libevent callback function. Example
@@ -61,29 +49,13 @@ class NotifiableTask {
6149
* Constructs a new NotifiableTask instance.
6250
* @param task_id a unique id assigned to this task
6351
*/
64-
explicit NotifiableTask(int task_id) : task_id_(task_id) {
65-
base_ = EventUtil::EventBaseNew();
66-
// TODO(tianyu) Determine whether we actually need this line. Tianyi says we
67-
// need it, libevent documentation says no
68-
// evthread_make_base_notifiable(base_);
69-
70-
// For exiting a loop
71-
terminate_ = RegisterManualEvent([](int, short, void *arg) {
72-
EventUtil::EventBaseLoopExit((struct event_base *)arg, nullptr);
73-
}, base_);
74-
};
52+
explicit NotifiableTask(int task_id);
7553

7654
/**
77-
* Destructs this NotifiableTask. All events currently registered to its base
78-
* are also deleted and freed.
79-
*/
80-
virtual ~NotifiableTask() {
81-
for (struct event *event : events_) {
82-
EventUtil::EventDel(event);
83-
event_free(event);
84-
}
85-
event_base_free(base_);
86-
}
55+
* Destructs this NotifiableTask. All events currently registered to its base
56+
* are also deleted and freed.
57+
*/
58+
virtual ~NotifiableTask();
8759

8860
/**
8961
* @return unique id assigned to this task
@@ -114,13 +86,7 @@ class NotifiableTask {
11486
*/
11587
struct event *RegisterEvent(int fd, short flags, event_callback_fn callback,
11688
void *arg,
117-
const struct timeval *timeout = nullptr) {
118-
struct event *event = event_new(base_, fd, flags, callback, arg);
119-
events_.insert(event);
120-
EventUtil::EventAdd(event, timeout);
121-
return event;
122-
}
123-
89+
const struct timeval *timeout = nullptr);
12490
/**
12591
* @brief Register a signal event. This is a wrapper around RegisterEvent()
12692
*
@@ -133,8 +99,9 @@ class NotifiableTask {
13399
* null which will wait forever
134100
* @return pointer to the allocated event.
135101
*/
136-
struct event *RegisterSignalEvent(int signal, event_callback_fn callback,
137-
void *arg) {
102+
inline struct event *RegisterSignalEvent(int signal,
103+
event_callback_fn callback,
104+
void *arg) {
138105
return RegisterEvent(signal, EV_SIGNAL | EV_PERSIST, callback, arg);
139106
}
140107

@@ -150,8 +117,9 @@ class NotifiableTask {
150117
* @param arg an argument to be passed to the callback function
151118
* @return pointer to the allocated event.
152119
*/
153-
struct event *RegisterPeriodicEvent(const struct timeval *timeout,
154-
event_callback_fn callback, void *arg) {
120+
inline struct event *RegisterPeriodicEvent(const struct timeval *timeout,
121+
event_callback_fn callback,
122+
void *arg) {
155123
return RegisterEvent(-1, EV_TIMEOUT | EV_PERSIST, callback, arg, timeout);
156124
}
157125

@@ -166,14 +134,14 @@ class NotifiableTask {
166134
* @param arg an argument to be passed to the callback function
167135
* @return pointer to the allocated event.
168136
*/
169-
struct event *RegisterManualEvent(event_callback_fn callback, void *arg) {
137+
inline struct event *RegisterManualEvent(event_callback_fn callback,
138+
void *arg) {
170139
return RegisterEvent(-1, EV_PERSIST, callback, arg);
171140
}
172141

173142
// TODO(tianyu): The original network code seems to do this as an
174143
// optimization. Specifically it avoids new memory allocation by reusing
175144
// an existing event. I am leaving this out until we get numbers.
176-
177145
// void UpdateEvent(struct event *event, int fd, short flags,
178146
// event_callback_fn callback, void *arg,
179147
// const struct timeval *timeout = nullptr) {
@@ -198,25 +166,15 @@ class NotifiableTask {
198166
*
199167
* @param event the event to be freed
200168
*/
201-
void UnregisterEvent(struct event *event) {
202-
auto it = events_.find(event);
203-
if (it == events_.end()) return;
204-
if (event_del(event) == -1) {
205-
LOG_ERROR("Failed to delete event");
206-
return;
207-
}
208-
event_free(event);
209-
events_.erase(event);
210-
}
169+
void UnregisterEvent(struct event *event);
211170

212171
/**
213172
* In a loop, make this notifiable task wait and respond to incoming events
214173
*/
215-
void EventLoop() {
174+
inline void EventLoop() {
216175
EventUtil::EventBaseDispatch(base_);
217176
LOG_TRACE("stop");
218177
}
219-
220178
/**
221179
* Exits the event loop
222180
*/
@@ -225,7 +183,7 @@ class NotifiableTask {
225183
/**
226184
* Wrapper around ExitLoop() to conform to libevent callback signature
227185
*/
228-
void ExitLoop(int, short) { ExitLoop(); }
186+
inline void ExitLoop(int, short) { ExitLoop(); }
229187

230188
private:
231189
const int task_id_;
@@ -236,5 +194,4 @@ class NotifiableTask {
236194
std::unordered_set<struct event *> events_;
237195
};
238196

239-
} // namespace network
240197
} // namespace peloton

src/include/network/connection_dispatcher_task.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
#pragma once
1414

15-
#include "notifiable_task.h"
15+
#include "common/notifiable_task.h"
1616
#include "network_state.h"
1717
#include "concurrency/epoch_manager_factory.h"
1818
#include "connection_handler_task.h"

src/include/network/connection_handler_task.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
#include "common/exception.h"
2323
#include "common/logger.h"
2424
#include "common/container/lock_free_queue.h"
25-
#include "network/notifiable_task.h"
25+
#include "common/notifiable_task.h"
2626

2727
namespace peloton {
2828
namespace network {

src/include/network/peloton_server.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
#include "common/dedicated_thread_owner.h"
3535
#include "connection_dispatcher_task.h"
3636
#include "network_state.h"
37-
#include "notifiable_task.h"
37+
#include "common/notifiable_task.h"
3838
#include "protocol_handler.h"
3939

4040
#include <openssl/crypto.h>

src/include/settings/settings.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,11 @@ SETTING_bool(brain,
156156
false,
157157
true, true)
158158

159+
SETTING_string(peloton_address,
160+
"ip and port of the peloton rpc service, address:port",
161+
"127.0.0.1:15445",
162+
false, false)
163+
159164
// Size of the brain task queue
160165
SETTING_int(brain_task_queue_size,
161166
"Brain Task Queue Size (default: 32)",

0 commit comments

Comments
 (0)