Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ go-proto:
cd proto && make

go-build: go-proto
go build -o ./target/debug/manager-fallback manager-fallback/manager.go
go build -o ./target/debug/worker worker/worker.go
go build -o ./target/debug/hrctl hrctl/hrctl.go
go build -o ./target/debug/hrctl hrctl/hrctl.go
8 changes: 4 additions & 4 deletions hrctl/app/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func schedule(c *cli.Context) error {
return errors.Wrap(err, "unable to get job from file")
}

conn, err := connect(c.String("manager"))
conn, err := connect(c.GlobalString("manager"))
if err != nil {
return errors.Wrap(err, "unable to connect to manager")
}
Expand All @@ -110,7 +110,7 @@ func schedule(c *cli.Context) error {
}

func cancel(c *cli.Context) error {
conn, err := connect(c.String("manager"))
conn, err := connect(c.GlobalString("manager"))
if err != nil {
return errors.Wrap(err, "unable to connect to manager")
}
Expand Down Expand Up @@ -142,7 +142,7 @@ func describeQueue(c *cli.Context) error {
}

func describeJobs(c *cli.Context) error {
conn, err := connect(c.String("manager"))
conn, err := connect(c.GlobalString("manager"))
if err != nil {
return errors.Wrap(err, "unable to connect to manager")
}
Expand All @@ -151,7 +151,7 @@ func describeJobs(c *cli.Context) error {
}

func describeTasks(c *cli.Context) error {
conn, err := connect(c.String("manager"))
conn, err := connect(c.GlobalString("manager"))
if err != nil {
return errors.Wrap(err, "unable to connect to manager")
}
Expand Down
88 changes: 88 additions & 0 deletions manager-fallback/broker/amqp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package broker

import (
"time"

log "github.com/golang/glog"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
"github.com/streadway/amqp"

"github.com/cpssd/heracles/manager-fallback/settings"
"github.com/cpssd/heracles/proto/datatypes"
)

// AMQPConnection broker
type AMQPConnection struct {
conn *amqp.Connection
ch *amqp.Channel
queueName string
}

type channel interface {
Publish()
}

// NewAMQP returns a new connection to AMQP
func NewAMQP(addr string) (*AMQPConnection, error) {
log.Infof("connecting to broker at %s", addr)
conn, err := amqp.Dial(addr)
if err != nil {
return nil, errors.Wrap(err, "unable to open connection to AMQP")
}
// TODO: Handle closing of connection

log.Info("connected to broker")

ch, err := conn.Channel()
if err != nil {
return nil, errors.Wrap(err, "unable to create a channel")
}

queueName := settings.String("broker.queue_name")

if _, err = ch.QueueDeclare(
queueName,
true,
false,
false,
false,
nil,
); err != nil {
return nil, errors.Wrap(err, "unable to declare a queue")
}

return &AMQPConnection{
conn: conn,
ch: ch,
queueName: queueName,
}, nil
}

// Send implementation. Call this in go routine. This function will only return
// When positively acknowledged. It can also return an error if something happens
// beforehand
func (c *AMQPConnection) Send(task *datatypes.Task) error {
serialized, err := proto.Marshal(task)
if err != nil {
return errors.Wrap(err, "can't serialize")
}

if err := c.ch.Publish(
"",
settings.String("broker.queue_name"),
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
Body: serialized,
},
); err != nil {
return err
}

log.V(1).Infof("Sent task %s", task.GetId())

return nil
}
17 changes: 17 additions & 0 deletions manager-fallback/broker/broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package broker

import (
"github.com/cpssd/heracles/manager-fallback/settings"
"github.com/cpssd/heracles/proto/datatypes"
)

// Broker interface
type Broker interface {
Send(*datatypes.Task) error
}

// New broker connection
func New() (Broker, error) {
addr := settings.GetString("broker.address")
return NewAMQP(addr)
}
53 changes: 53 additions & 0 deletions manager-fallback/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
"fmt"
"os"

log "github.com/golang/glog"
"github.com/pkg/errors"

"github.com/cpssd/heracles/manager-fallback/broker"
"github.com/cpssd/heracles/manager-fallback/scheduler"
"github.com/cpssd/heracles/manager-fallback/server"
"github.com/cpssd/heracles/manager-fallback/settings"
"github.com/cpssd/heracles/manager-fallback/state"
)

func main() {
if err := run(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
}
}

func run() error {
if err := settings.Init(); err != nil {
return errors.Wrap(err, "unable to initialize settings")
}

log.Info("starting heracles manager-fallback")

st, err := state.New()
if err != nil {
return errors.Wrap(err, "unable to connect to state")
}
br, err := broker.New()
if err != nil {
return errors.Wrap(err, "unable to create a broker connection")
}

sch := scheduler.New(br, st)

waitUntilProcessingReady := make(chan struct{})
go func() {
close(waitUntilProcessingReady)
sch.ProcessJobs()
}()
<-waitUntilProcessingReady

if err := server.New(sch).Run(); err != nil {
return errors.Wrap(err, "error running server")
}

return nil
}
131 changes: 131 additions & 0 deletions manager-fallback/scheduler/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package scheduler

import (
"sync"
"time"

log "github.com/golang/glog"
"github.com/google/uuid"
"github.com/pkg/errors"

"github.com/cpssd/heracles/manager-fallback/broker"
"github.com/cpssd/heracles/manager-fallback/splitting"
"github.com/cpssd/heracles/manager-fallback/state"
"github.com/cpssd/heracles/proto/datatypes"
)

// Scheduler type
type Scheduler struct {
st state.State
br broker.Broker
jobs chan *datatypes.Job
}

// New Scheduler
func New(br broker.Broker, st state.State) *Scheduler {
return &Scheduler{
br: br,
st: st,
jobs: make(chan *datatypes.Job),
}
}

// Schedule a job
func (s Scheduler) Schedule(job *datatypes.Job) (string, error) {
id := uuid.New().String()
job.Id = id
if err := s.st.SaveJob(job); err != nil {
return "", errors.Wrap(err, "can't schedule")
}

s.jobs <- job

return id, nil
}

// Cancel a job
func (s Scheduler) Cancel(jobID string) error {
return nil
}

// ProcessJobs listens
func (s Scheduler) ProcessJobs() {
log.Info("begining to listen for any jobs")
var wg sync.WaitGroup
for job := range s.jobs {
wg.Add(1)
go func(job *datatypes.Job) {
defer wg.Done()
if err := s.processJob(job); err != nil {
log.Errorf("error processing job: %v", err)
}
}(job)
}
wg.Wait()

}

func (s *Scheduler) processJob(job *datatypes.Job) error {
var wg sync.WaitGroup

intermediateFiles := splitting.IntermediateFiles(job)

// create all tasks
mapTasks, err := splitting.Map(job, intermediateFiles)
if err != nil {
job.FailureDetails = err.Error()
return err
}
reduceTasks := splitting.Reduce(job, intermediateFiles)

if err := s.st.CreateTasks(append(mapTasks, reduceTasks...)); err != nil {
job.FailureDetails = err.Error()
return err
}

// Start tunning the reduce tasks
wg.Add(len(mapTasks))
log.V(1).Info("starting map tasks")
for _, task := range mapTasks {
go func(task *datatypes.Task) {
defer wg.Done()
if err := s.processTask(task); err != nil {
job.FailureDetails = err.Error()
job.Status = datatypes.JobStatus_JOB_FAILED
log.Errorf("%v", err)
return
}
}(task)
}
wg.Wait()
s.st.WaitUntilTasksComplete(job.GetId(), datatypes.TaskKind_MAP)

wg.Add(len(reduceTasks))
log.V(1).Info("starting reduce tasks")
for _, task := range reduceTasks {
go func(task *datatypes.Task) {
defer wg.Done()
if err := s.processTask(task); err != nil {
job.FailureDetails = err.Error()
job.Status = datatypes.JobStatus_JOB_FAILED
log.Errorf("%v", err)
return
}
}(task)
}
wg.Wait()
s.st.WaitUntilTasksComplete(job.GetId(), datatypes.TaskKind_REDUCE)

job.Status = datatypes.JobStatus_JOB_DONE
job.TimeDone = uint64(time.Now().Unix())
return s.st.SaveJob(job)
}

func (s *Scheduler) processTask(task *datatypes.Task) error {
log.Infof("processing task %s: Sending to broker", task.GetId())
if err := s.br.Send(task); err != nil {
return errors.Wrap(err, "can't send task to broker")
}

return nil
}
35 changes: 35 additions & 0 deletions manager-fallback/server/jobschedule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package server

import (
"context"

pb "github.com/cpssd/heracles/proto/mapreduce"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)

// Schedule implementation
func (s *Server) Schedule(ctx context.Context, req *pb.ScheduleRequest) (*pb.ScheduleResponse, error) {
res := &pb.ScheduleResponse{}

jobID, err := s.sch.Schedule(req.GetJob())
if err != nil {
return res, grpc.Errorf(codes.Internal, err.Error())
}

res.JobId = jobID
return res, nil
}

// Cancel implementation
func (s *Server) Cancel(ctx context.Context, req *pb.CancelRequest) (*pb.EmptyMessage, error) {
if err := s.sch.Cancel(req.GetJobId()); err != nil {
return &pb.EmptyMessage{}, grpc.Errorf(codes.Internal, err.Error())
}
return &pb.EmptyMessage{}, nil
}

// Describe implementation
func (s *Server) Describe(ctx context.Context, req *pb.DescribeRequest) (*pb.Description, error) {
return nil, nil
}
Loading