15
15
#include < string>
16
16
#include < cstdio>
17
17
#include < utility>
18
+ #include < functional>
18
19
#include " capnp/ez-rpc.h"
19
20
#include " peloton/capnp/peloton_service.capnp.h"
20
21
#include " common/notifiable_task.h"
21
22
22
23
namespace peloton {
23
24
namespace brain {
24
25
26
+ /* *
27
+ * Provides an access point to the various resources available to the jobs in
28
+ * the brain, such as RPC and Catalog.
29
+ */
25
30
class BrainEnvironment {
26
- // TODO(tianyu): provide interface for accessing various resources of the brain,
27
- // such as network connection to a peloton engine.
31
+ // TODO(tianyu): fill in as needed
28
32
};
29
33
34
+ /* *
35
+ * Interface that represents a piece of task to be run on Brain. To use this
36
+ * abstract class, extend it with a concrete class and fill in the method
37
+ * OnJobInvocation.
38
+ */
30
39
class BrainJob {
31
40
public:
32
- // TODO(tianyu): Extend this interface for richer interaction
33
- virtual void RunJob (BrainEnvironment *) = 0;
41
+ explicit BrainJob (BrainEnvironment *env) : env_(env) {}
42
+
43
+ virtual ~BrainJob () = default ;
44
+
45
+ // This is separate from the user-defined OnJobInvocation to allow for better
46
+ // interfacing with the libevent API.
47
+ /* *
48
+ * Invokes this job to be run. The brain framework will call this method.
49
+ *
50
+ */
51
+ inline void Invoke () { OnJobInvocation (env_); }
52
+ // TODO(tianyu): Extend this interface for richer behavior
53
+ /* *
54
+ * Executed as the main body of the job, filled in by the user. Use the
55
+ * provided BrainEnvironment for interaction with Brain's resources.
56
+ */
57
+ virtual void OnJobInvocation (BrainEnvironment *) = 0;
58
+ private:
59
+ BrainEnvironment *env_;
34
60
};
35
61
36
- class ExampleBrainJob : public BrainJob {
62
+ /* *
63
+ * Simple implementation of a BrainJob.
64
+ */
65
+ class SimpleBrainJob : public BrainJob {
37
66
public:
38
- void RunJob (BrainEnvironment *) override {
39
- // TODO(tianyu): Replace with real address
40
- capnp::EzRpcClient client (" localhost:15445" );
41
- PelotonService::Client peloton_service = client.getMain <PelotonService>();
42
- auto request = peloton_service.createIndexRequest ();
43
- request.getRequest ().setIndexKeys (42 );
44
- auto response = request.send ().wait (client.getWaitScope ());
45
- }
67
+ explicit SimpleBrainJob (BrainEnvironment *env,
68
+ std::function<void (BrainEnvironment *)> task)
69
+ : BrainJob(env), task_(std::move(task)) {}
70
+ inline void OnJobInvocation (BrainEnvironment *env) override { task_ (env); }
71
+ private:
72
+ std::function<void (BrainEnvironment *)> task_;
46
73
};
47
74
75
+ /* *
76
+ * Main running component of the brain. Events can be registered on this event
77
+ * loop and once Run is called, it will invoke handlers every specified time
78
+ * interval
79
+ */
48
80
class Brain {
49
81
public:
50
82
// TODO(tianyu): Add necessary parameters to initialize the brain's resources
51
83
Brain () : scheduler_(0 ) {}
52
84
85
+ ~Brain () {
86
+ for (auto entry : jobs_)
87
+ delete entry.second ;
88
+ }
89
+
90
+ template <typename BrainJob, typename ... Args>
53
91
inline void RegisterJob (const struct timeval *period,
54
- std::string name,
55
- BrainJob &job) {
56
- auto callback = []( int , short , void *pair) {
57
- auto *job_env_pair = reinterpret_cast <std::pair<BrainJob *, BrainEnvironment *> *>(pair);
58
- job_env_pair-> first -> RunJob (job_env_pair-> second );
92
+ std::string name, Args... args) {
93
+ auto *job = new BrainJob (&env_, args...);
94
+ jobs_[name] = job;
95
+ auto callback = []( int , short , void *arg) {
96
+ reinterpret_cast <BrainJob *>(arg)-> Invoke ( );
59
97
};
60
- // TODO(tianyu) Deal with this memory
61
- auto *pair = new std::pair<BrainJob *, BrainEnvironment *>(&job, &env_);
62
98
job_handles_[name] =
63
- scheduler_.RegisterPeriodicEvent (period, callback, pair );
99
+ scheduler_.RegisterPeriodicEvent (period, callback, job );
64
100
}
65
101
66
102
inline void Run () {
@@ -73,8 +109,9 @@ class Brain {
73
109
74
110
private:
75
111
NotifiableTask scheduler_;
112
+ std::unordered_map<std::string, BrainJob *> jobs_;
76
113
std::unordered_map<std::string, struct event *> job_handles_;
77
114
BrainEnvironment env_;
78
115
};
79
- }
80
- }
116
+ } // namespace brain
117
+ } // namespace peloton
0 commit comments