Skip to content

Commit 189c9fe

Browse files
committed
Added manager-fallback
This is a functional version of the new heracles architecture.
1 parent fa8ca8b commit 189c9fe

File tree

23 files changed

+1000
-30
lines changed

23 files changed

+1000
-30
lines changed

Gopkg.lock

Lines changed: 7 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,5 +51,6 @@ go-proto:
5151
cd proto && make
5252

5353
go-build: go-proto
54+
go build -o ./target/debug/manager-fallback manager-fallback/manager.go
5455
go build -o ./target/debug/worker worker/worker.go
55-
go build -o ./target/debug/hrctl hrctl/hrctl.go
56+
go build -o ./target/debug/hrctl hrctl/hrctl.go

hrctl/app/parser.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func schedule(c *cli.Context) error {
101101
return errors.Wrap(err, "unable to get job from file")
102102
}
103103

104-
conn, err := connect(c.String("manager"))
104+
conn, err := connect(c.GlobalString("manager"))
105105
if err != nil {
106106
return errors.Wrap(err, "unable to connect to manager")
107107
}
@@ -110,7 +110,7 @@ func schedule(c *cli.Context) error {
110110
}
111111

112112
func cancel(c *cli.Context) error {
113-
conn, err := connect(c.String("manager"))
113+
conn, err := connect(c.GlobalString("manager"))
114114
if err != nil {
115115
return errors.Wrap(err, "unable to connect to manager")
116116
}
@@ -142,7 +142,7 @@ func describeQueue(c *cli.Context) error {
142142
}
143143

144144
func describeJobs(c *cli.Context) error {
145-
conn, err := connect(c.String("manager"))
145+
conn, err := connect(c.GlobalString("manager"))
146146
if err != nil {
147147
return errors.Wrap(err, "unable to connect to manager")
148148
}
@@ -151,7 +151,7 @@ func describeJobs(c *cli.Context) error {
151151
}
152152

153153
func describeTasks(c *cli.Context) error {
154-
conn, err := connect(c.String("manager"))
154+
conn, err := connect(c.GlobalString("manager"))
155155
if err != nil {
156156
return errors.Wrap(err, "unable to connect to manager")
157157
}

manager-fallback/broker/amqp.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package broker
2+
3+
import (
4+
"time"
5+
6+
log "github.com/golang/glog"
7+
"github.com/golang/protobuf/proto"
8+
"github.com/pkg/errors"
9+
"github.com/streadway/amqp"
10+
11+
"github.com/cpssd/heracles/manager-fallback/settings"
12+
"github.com/cpssd/heracles/proto/datatypes"
13+
)
14+
15+
// AMQPConnection broker
16+
type AMQPConnection struct {
17+
conn *amqp.Connection
18+
ch *amqp.Channel
19+
queueName string
20+
}
21+
22+
type channel interface {
23+
Publish()
24+
}
25+
26+
// NewAMQP returns a new connection to AMQP
27+
func NewAMQP(addr string) (*AMQPConnection, error) {
28+
log.Infof("connecting to broker at %s", addr)
29+
conn, err := amqp.Dial(addr)
30+
if err != nil {
31+
return nil, errors.Wrap(err, "unable to open connection to AMQP")
32+
}
33+
// TODO: Handle closing of connection
34+
35+
log.Info("connected to broker")
36+
37+
ch, err := conn.Channel()
38+
if err != nil {
39+
return nil, errors.Wrap(err, "unable to create a channel")
40+
}
41+
42+
queueName := settings.String("broker.queue_name")
43+
44+
if _, err = ch.QueueDeclare(
45+
queueName,
46+
true,
47+
false,
48+
false,
49+
false,
50+
nil,
51+
); err != nil {
52+
return nil, errors.Wrap(err, "unable to declare a queue")
53+
}
54+
55+
return &AMQPConnection{
56+
conn: conn,
57+
ch: ch,
58+
queueName: queueName,
59+
}, nil
60+
}
61+
62+
// Send implementation. Call this in go routine. This function will only return
63+
// When positively acknowledged. It can also return an error if something happens
64+
// beforehand
65+
func (c *AMQPConnection) Send(task *datatypes.Task) error {
66+
serialized, err := proto.Marshal(task)
67+
if err != nil {
68+
return errors.Wrap(err, "can't serialize")
69+
}
70+
71+
if err := c.ch.Publish(
72+
"",
73+
settings.String("broker.queue_name"),
74+
false,
75+
false,
76+
amqp.Publishing{
77+
DeliveryMode: amqp.Persistent,
78+
Timestamp: time.Now(),
79+
Body: serialized,
80+
},
81+
); err != nil {
82+
return err
83+
}
84+
85+
log.V(1).Infof("Sent task %s", task.GetId())
86+
87+
return nil
88+
}

manager-fallback/broker/broker.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package broker
2+
3+
import (
4+
"github.com/cpssd/heracles/manager-fallback/settings"
5+
"github.com/cpssd/heracles/proto/datatypes"
6+
)
7+
8+
// Broker interface
9+
type Broker interface {
10+
Send(*datatypes.Task) error
11+
}
12+
13+
// New broker connection
14+
func New() (Broker, error) {
15+
addr := settings.GetString("broker.address")
16+
return NewAMQP(addr)
17+
}

manager-fallback/manager.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"os"
6+
7+
log "github.com/golang/glog"
8+
"github.com/pkg/errors"
9+
10+
"github.com/cpssd/heracles/manager-fallback/broker"
11+
"github.com/cpssd/heracles/manager-fallback/scheduler"
12+
"github.com/cpssd/heracles/manager-fallback/server"
13+
"github.com/cpssd/heracles/manager-fallback/settings"
14+
"github.com/cpssd/heracles/manager-fallback/state"
15+
)
16+
17+
func main() {
18+
if err := run(); err != nil {
19+
fmt.Fprintf(os.Stderr, "%v\n", err)
20+
}
21+
}
22+
23+
func run() error {
24+
if err := settings.Init(); err != nil {
25+
return errors.Wrap(err, "unable to initialize settings")
26+
}
27+
28+
log.Info("starting heracles manager-fallback")
29+
30+
st, err := state.New()
31+
if err != nil {
32+
return errors.Wrap(err, "unable to connect to state")
33+
}
34+
br, err := broker.New()
35+
if err != nil {
36+
return errors.Wrap(err, "unable to create a broker connection")
37+
}
38+
39+
sch := scheduler.New(br, st)
40+
41+
waitUntilProcessingReady := make(chan struct{})
42+
go func() {
43+
close(waitUntilProcessingReady)
44+
sch.ProcessJobs()
45+
}()
46+
<-waitUntilProcessingReady
47+
48+
if err := server.New(sch).Run(); err != nil {
49+
return errors.Wrap(err, "error running server")
50+
}
51+
52+
return nil
53+
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package scheduler
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
log "github.com/golang/glog"
8+
"github.com/google/uuid"
9+
"github.com/pkg/errors"
10+
11+
"github.com/cpssd/heracles/manager-fallback/broker"
12+
"github.com/cpssd/heracles/manager-fallback/splitting"
13+
"github.com/cpssd/heracles/manager-fallback/state"
14+
"github.com/cpssd/heracles/proto/datatypes"
15+
)
16+
17+
// Scheduler type
18+
type Scheduler struct {
19+
st state.State
20+
br broker.Broker
21+
jobs chan *datatypes.Job
22+
}
23+
24+
// New Scheduler
25+
func New(br broker.Broker, st state.State) *Scheduler {
26+
return &Scheduler{
27+
br: br,
28+
st: st,
29+
jobs: make(chan *datatypes.Job),
30+
}
31+
}
32+
33+
// Schedule a job
34+
func (s Scheduler) Schedule(job *datatypes.Job) (string, error) {
35+
id := uuid.New().String()
36+
job.Id = id
37+
if err := s.st.SaveJob(job); err != nil {
38+
return "", errors.Wrap(err, "can't schedule")
39+
}
40+
41+
s.jobs <- job
42+
43+
return id, nil
44+
}
45+
46+
// Cancel a job
47+
func (s Scheduler) Cancel(jobID string) error {
48+
return nil
49+
}
50+
51+
// ProcessJobs listens
52+
func (s Scheduler) ProcessJobs() {
53+
log.Info("begining to listen for any jobs")
54+
var wg sync.WaitGroup
55+
for job := range s.jobs {
56+
wg.Add(1)
57+
go func(job *datatypes.Job) {
58+
defer wg.Done()
59+
if err := s.processJob(job); err != nil {
60+
log.Errorf("error processing job: %v", err)
61+
}
62+
}(job)
63+
}
64+
wg.Wait()
65+
66+
}
67+
68+
func (s *Scheduler) processJob(job *datatypes.Job) error {
69+
var wg sync.WaitGroup
70+
71+
intermediateFiles := splitting.IntermediateFiles(job)
72+
73+
// create all tasks
74+
mapTasks, err := splitting.Map(job, intermediateFiles)
75+
if err != nil {
76+
job.FailureDetails = err.Error()
77+
return err
78+
}
79+
reduceTasks := splitting.Reduce(job, intermediateFiles)
80+
81+
if err := s.st.CreateTasks(append(mapTasks, reduceTasks...)); err != nil {
82+
job.FailureDetails = err.Error()
83+
return err
84+
}
85+
86+
// Start tunning the reduce tasks
87+
wg.Add(len(mapTasks))
88+
log.V(1).Info("starting map tasks")
89+
for _, task := range mapTasks {
90+
go func(task *datatypes.Task) {
91+
defer wg.Done()
92+
if err := s.processTask(task); err != nil {
93+
job.FailureDetails = err.Error()
94+
job.Status = datatypes.JobStatus_JOB_FAILED
95+
log.Errorf("%v", err)
96+
return
97+
}
98+
}(task)
99+
}
100+
wg.Wait()
101+
s.st.WaitUntilTasksComplete(job.GetId(), datatypes.TaskKind_MAP)
102+
103+
wg.Add(len(reduceTasks))
104+
log.V(1).Info("starting reduce tasks")
105+
for _, task := range reduceTasks {
106+
go func(task *datatypes.Task) {
107+
defer wg.Done()
108+
if err := s.processTask(task); err != nil {
109+
job.FailureDetails = err.Error()
110+
job.Status = datatypes.JobStatus_JOB_FAILED
111+
log.Errorf("%v", err)
112+
return
113+
}
114+
}(task)
115+
}
116+
wg.Wait()
117+
s.st.WaitUntilTasksComplete(job.GetId(), datatypes.TaskKind_REDUCE)
118+
119+
job.Status = datatypes.JobStatus_JOB_DONE
120+
job.TimeDone = uint64(time.Now().Unix())
121+
return s.st.SaveJob(job)
122+
}
123+
124+
func (s *Scheduler) processTask(task *datatypes.Task) error {
125+
log.Infof("processing task %s: Sending to broker", task.GetId())
126+
if err := s.br.Send(task); err != nil {
127+
return errors.Wrap(err, "can't send task to broker")
128+
}
129+
130+
return nil
131+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package server
2+
3+
import (
4+
"context"
5+
6+
pb "github.com/cpssd/heracles/proto/mapreduce"
7+
"google.golang.org/grpc"
8+
"google.golang.org/grpc/codes"
9+
)
10+
11+
// Schedule implementation
12+
func (s *Server) Schedule(ctx context.Context, req *pb.ScheduleRequest) (*pb.ScheduleResponse, error) {
13+
res := &pb.ScheduleResponse{}
14+
15+
jobID, err := s.sch.Schedule(req.GetJob())
16+
if err != nil {
17+
return res, grpc.Errorf(codes.Internal, err.Error())
18+
}
19+
20+
res.JobId = jobID
21+
return res, nil
22+
}
23+
24+
// Cancel implementation
25+
func (s *Server) Cancel(ctx context.Context, req *pb.CancelRequest) (*pb.EmptyMessage, error) {
26+
if err := s.sch.Cancel(req.GetJobId()); err != nil {
27+
return &pb.EmptyMessage{}, grpc.Errorf(codes.Internal, err.Error())
28+
}
29+
return &pb.EmptyMessage{}, nil
30+
}
31+
32+
// Describe implementation
33+
func (s *Server) Describe(ctx context.Context, req *pb.DescribeRequest) (*pb.Description, error) {
34+
return nil, nil
35+
}

0 commit comments

Comments
 (0)