Skip to content
This repository was archived by the owner on Sep 27, 2019. It is now read-only.

Commit 4416b8e

Browse files
authored
Merge pull request #1221 from tli2/master
Clean up Brain framework code
2 parents 6fcbe83 + 644df60 commit 4416b8e

File tree

5 files changed

+78
-29
lines changed

5 files changed

+78
-29
lines changed

src/include/brain/brain.h

Lines changed: 60 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,52 +15,88 @@
1515
#include <string>
1616
#include <cstdio>
1717
#include <utility>
18+
#include <functional>
1819
#include "capnp/ez-rpc.h"
1920
#include "peloton/capnp/peloton_service.capnp.h"
2021
#include "common/notifiable_task.h"
2122

2223
namespace peloton {
2324
namespace brain {
2425

26+
/**
27+
* Provides an access point to the various resources available to the jobs in
28+
* the brain, such as RPC and Catalog.
29+
*/
2530
class BrainEnvironment {
26-
// TODO(tianyu): provide interface for accessing various resources of the brain,
27-
// such as network connection to a peloton engine.
31+
// TODO(tianyu): fill in as needed
2832
};
2933

34+
/**
35+
* Interface that represents a piece of task to be run on Brain. To use this
36+
* abstract class, extend it with a concrete class and fill in the method
37+
* OnJobInvocation.
38+
*/
3039
class BrainJob {
3140
public:
32-
// TODO(tianyu): Extend this interface for richer interaction
33-
virtual void RunJob(BrainEnvironment *) = 0;
41+
explicit BrainJob(BrainEnvironment *env) : env_(env) {}
42+
43+
virtual ~BrainJob() = default;
44+
45+
// This is separate from the user-defined OnJobInvocation to allow for better
46+
// interfacing with the libevent API.
47+
/**
48+
* Invokes this job to be run. The brain framework will call this method.
49+
*
50+
*/
51+
inline void Invoke() { OnJobInvocation(env_); }
52+
// TODO(tianyu): Extend this interface for richer behavior
53+
/**
54+
* Executed as the main body of the job, filled in by the user. Use the
55+
* provided BrainEnvironment for interaction with Brain's resources.
56+
*/
57+
virtual void OnJobInvocation(BrainEnvironment *) = 0;
58+
private:
59+
BrainEnvironment *env_;
3460
};
3561

36-
class ExampleBrainJob: public BrainJob {
62+
/**
63+
* Simple implementation of a BrainJob.
64+
*/
65+
class SimpleBrainJob : public BrainJob {
3766
public:
38-
void RunJob(BrainEnvironment *) override {
39-
// TODO(tianyu): Replace with real address
40-
capnp::EzRpcClient client("localhost:15445");
41-
PelotonService::Client peloton_service = client.getMain<PelotonService>();
42-
auto request = peloton_service.createIndexRequest();
43-
request.getRequest().setIndexKeys(42);
44-
auto response = request.send().wait(client.getWaitScope());
45-
}
67+
explicit SimpleBrainJob(BrainEnvironment *env,
68+
std::function<void(BrainEnvironment *)> task)
69+
: BrainJob(env), task_(std::move(task)) {}
70+
inline void OnJobInvocation(BrainEnvironment *env) override { task_(env); }
71+
private:
72+
std::function<void(BrainEnvironment *)> task_;
4673
};
4774

75+
/**
76+
* Main running component of the brain. Events can be registered on this event
77+
* loop and once Run is called, it will invoke handlers every specified time
78+
* interval
79+
*/
4880
class Brain {
4981
public:
5082
// TODO(tianyu): Add necessary parameters to initialize the brain's resources
5183
Brain() : scheduler_(0) {}
5284

85+
~Brain() {
86+
for (auto entry : jobs_)
87+
delete entry.second;
88+
}
89+
90+
template <typename BrainJob, typename... Args>
5391
inline void RegisterJob(const struct timeval *period,
54-
std::string name,
55-
BrainJob &job) {
56-
auto callback = [](int, short, void *pair) {
57-
auto *job_env_pair = reinterpret_cast<std::pair<BrainJob *, BrainEnvironment *> *>(pair);
58-
job_env_pair->first->RunJob(job_env_pair->second);
92+
std::string name, Args... args) {
93+
auto *job = new BrainJob(&env_, args...);
94+
jobs_[name] = job;
95+
auto callback = [](int, short, void *arg) {
96+
reinterpret_cast<BrainJob *>(arg)->Invoke();
5997
};
60-
// TODO(tianyu) Deal with this memory
61-
auto *pair = new std::pair<BrainJob *, BrainEnvironment *>(&job, &env_);
6298
job_handles_[name] =
63-
scheduler_.RegisterPeriodicEvent(period, callback, pair);
99+
scheduler_.RegisterPeriodicEvent(period, callback, job);
64100
}
65101

66102
inline void Run() {
@@ -73,8 +109,9 @@ class Brain {
73109

74110
private:
75111
NotifiableTask scheduler_;
112+
std::unordered_map<std::string, BrainJob *> jobs_;
76113
std::unordered_map<std::string, struct event *> job_handles_;
77114
BrainEnvironment env_;
78115
};
79-
}
80-
}
116+
} // namespace brain
117+
} // namespace peloton

src/include/common/event_util.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "common/logger.h"
2222

2323
namespace peloton {
24+
2425
/**
2526
* Static utility class with wrappers for libevent functions.
2627
*

src/include/common/notifiable_task.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class NotifiableTask {
6262
*/
6363
inline int Id() const { return task_id_; }
6464

65+
6566
/**
6667
* @brief Register an event with the event base associated with this
6768
* notifiable task.

src/main/peloton/peloton.cpp

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
// For GFlag's built-in help message flag
2323
DECLARE_bool(help);
2424

25-
void RunPelotonEngine() {
25+
void RunPelotonServer() {
2626
try {
2727
// Setup
2828
peloton::PelotonInit::Initialize();
@@ -49,8 +49,18 @@ void RunPelotonBrain() {
4949
// TODO(tianyu): register jobs here
5050
struct timeval one_second;
5151
one_second.tv_sec = 1;
52-
peloton::brain::ExampleBrainJob job;
53-
brain.RegisterJob(&one_second, "test", job);
52+
one_second.tv_usec = 0;
53+
54+
auto example_task = [](peloton::brain::BrainEnvironment *) {
55+
// TODO(tianyu): Replace with real address
56+
capnp::EzRpcClient client("localhost:15445");
57+
PelotonService::Client peloton_service = client.getMain<PelotonService>();
58+
auto request = peloton_service.createIndexRequest();
59+
request.getRequest().setIndexKeys(42);
60+
auto response = request.send().wait(client.getWaitScope());
61+
};
62+
63+
brain.RegisterJob<peloton::brain::SimpleBrainJob>(&one_second, "test", example_task);
5464
brain.Run();
5565
}
5666

@@ -76,6 +86,6 @@ int main(int argc, char *argv[]) {
7686
peloton::settings::SettingId::brain))
7787
RunPelotonBrain();
7888
else
79-
RunPelotonEngine();
89+
RunPelotonServer();
8090
return 0;
8191
}

src/network/peloton_server.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,8 +270,8 @@ void PelotonServer::ServerLoop() {
270270
if (settings::SettingsManager::GetBool(settings::SettingId::rpc_enabled)) {
271271
int rpc_port =
272272
settings::SettingsManager::GetInt(settings::SettingId::rpc_port);
273-
auto rpc_task = std::make_shared<PelotonRpcHandlerTask>(("127.0.0.1:"
274-
+ std::to_string(rpc_port)).c_str());
273+
std::string address = "127.0.0.1:" + std::to_string(rpc_port);
274+
auto rpc_task = std::make_shared<PelotonRpcHandlerTask>(address.c_str());
275275
DedicatedThreadRegistry::GetInstance()
276276
.RegisterDedicatedThread<PelotonRpcHandlerTask>(this, rpc_task);
277277
}

0 commit comments

Comments
 (0)