diff --git a/Gopkg.lock b/Gopkg.lock index bad91f1..02efeff 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -25,6 +25,12 @@ revision = "925541529c1fa6821df4e44ce2723319eb2be768" version = "v1.0.0" +[[projects]] + name = "github.com/google/uuid" + packages = ["."] + revision = "064e2069ce9c359c118179501254f67d7d37ba24" + version = "0.2" + [[projects]] branch = "master" name = "github.com/hashicorp/hcl" @@ -209,6 +215,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "d1f93a29f71bb017367bb5730c97f2088f7831ed222ca4c9710c3d22151acf21" + inputs-digest = "92f018c6cf286c8b3fc8ec756599027584e1336f47df1b9ed378ff709f338a22" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Makefile b/Makefile index 3eca0f5..6fa1537 100644 --- a/Makefile +++ b/Makefile @@ -51,5 +51,6 @@ go-proto: cd proto && make go-build: go-proto + go build -o ./target/debug/manager-fallback manager-fallback/manager.go go build -o ./target/debug/worker worker/worker.go - go build -o ./target/debug/hrctl hrctl/hrctl.go \ No newline at end of file + go build -o ./target/debug/hrctl hrctl/hrctl.go diff --git a/hrctl/app/parser.go b/hrctl/app/parser.go index 5736975..5f3b31f 100644 --- a/hrctl/app/parser.go +++ b/hrctl/app/parser.go @@ -101,7 +101,7 @@ func schedule(c *cli.Context) error { return errors.Wrap(err, "unable to get job from file") } - conn, err := connect(c.String("manager")) + conn, err := connect(c.GlobalString("manager")) if err != nil { return errors.Wrap(err, "unable to connect to manager") } @@ -110,7 +110,7 @@ func schedule(c *cli.Context) error { } func cancel(c *cli.Context) error { - conn, err := connect(c.String("manager")) + conn, err := connect(c.GlobalString("manager")) if err != nil { return errors.Wrap(err, "unable to connect to manager") } @@ -142,7 +142,7 @@ func describeQueue(c *cli.Context) error { } func describeJobs(c *cli.Context) error { - conn, err := connect(c.String("manager")) + conn, err := connect(c.GlobalString("manager")) if err != nil { return errors.Wrap(err, "unable to connect to manager") } @@ -151,7 +151,7 @@ func describeJobs(c *cli.Context) error { } func describeTasks(c *cli.Context) error { - conn, err := connect(c.String("manager")) + conn, err := connect(c.GlobalString("manager")) if err != nil { return errors.Wrap(err, "unable to connect to manager") } diff --git a/manager-fallback/broker/amqp.go b/manager-fallback/broker/amqp.go new file mode 100644 index 0000000..8245dfe --- /dev/null +++ b/manager-fallback/broker/amqp.go @@ -0,0 +1,88 @@ +package broker + +import ( + "time" + + log "github.com/golang/glog" + "github.com/golang/protobuf/proto" + "github.com/pkg/errors" + "github.com/streadway/amqp" + + "github.com/cpssd/heracles/manager-fallback/settings" + "github.com/cpssd/heracles/proto/datatypes" +) + +// AMQPConnection broker +type AMQPConnection struct { + conn *amqp.Connection + ch *amqp.Channel + queueName string +} + +type channel interface { + Publish() +} + +// NewAMQP returns a new connection to AMQP +func NewAMQP(addr string) (*AMQPConnection, error) { + log.Infof("connecting to broker at %s", addr) + conn, err := amqp.Dial(addr) + if err != nil { + return nil, errors.Wrap(err, "unable to open connection to AMQP") + } + // TODO: Handle closing of connection + + log.Info("connected to broker") + + ch, err := conn.Channel() + if err != nil { + return nil, errors.Wrap(err, "unable to create a channel") + } + + queueName := settings.String("broker.queue_name") + + if _, err = ch.QueueDeclare( + queueName, + true, + false, + false, + false, + nil, + ); err != nil { + return nil, errors.Wrap(err, "unable to declare a queue") + } + + return &AMQPConnection{ + conn: conn, + ch: ch, + queueName: queueName, + }, nil +} + +// Send implementation. Call this in go routine. This function will only return +// When positively acknowledged. It can also return an error if something happens +// beforehand +func (c *AMQPConnection) Send(task *datatypes.Task) error { + serialized, err := proto.Marshal(task) + if err != nil { + return errors.Wrap(err, "can't serialize") + } + + if err := c.ch.Publish( + "", + settings.String("broker.queue_name"), + false, + false, + amqp.Publishing{ + DeliveryMode: amqp.Persistent, + Timestamp: time.Now(), + Body: serialized, + }, + ); err != nil { + return err + } + + log.V(1).Infof("Sent task %s", task.GetId()) + + return nil +} diff --git a/manager-fallback/broker/broker.go b/manager-fallback/broker/broker.go new file mode 100644 index 0000000..f2cefcf --- /dev/null +++ b/manager-fallback/broker/broker.go @@ -0,0 +1,17 @@ +package broker + +import ( + "github.com/cpssd/heracles/manager-fallback/settings" + "github.com/cpssd/heracles/proto/datatypes" +) + +// Broker interface +type Broker interface { + Send(*datatypes.Task) error +} + +// New broker connection +func New() (Broker, error) { + addr := settings.GetString("broker.address") + return NewAMQP(addr) +} diff --git a/manager-fallback/manager.go b/manager-fallback/manager.go new file mode 100644 index 0000000..166bcea --- /dev/null +++ b/manager-fallback/manager.go @@ -0,0 +1,53 @@ +package main + +import ( + "fmt" + "os" + + log "github.com/golang/glog" + "github.com/pkg/errors" + + "github.com/cpssd/heracles/manager-fallback/broker" + "github.com/cpssd/heracles/manager-fallback/scheduler" + "github.com/cpssd/heracles/manager-fallback/server" + "github.com/cpssd/heracles/manager-fallback/settings" + "github.com/cpssd/heracles/manager-fallback/state" +) + +func main() { + if err := run(); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + } +} + +func run() error { + if err := settings.Init(); err != nil { + return errors.Wrap(err, "unable to initialize settings") + } + + log.Info("starting heracles manager-fallback") + + st, err := state.New() + if err != nil { + return errors.Wrap(err, "unable to connect to state") + } + br, err := broker.New() + if err != nil { + return errors.Wrap(err, "unable to create a broker connection") + } + + sch := scheduler.New(br, st) + + waitUntilProcessingReady := make(chan struct{}) + go func() { + close(waitUntilProcessingReady) + sch.ProcessJobs() + }() + <-waitUntilProcessingReady + + if err := server.New(sch).Run(); err != nil { + return errors.Wrap(err, "error running server") + } + + return nil +} diff --git a/manager-fallback/scheduler/scheduler.go b/manager-fallback/scheduler/scheduler.go new file mode 100644 index 0000000..a5ec2cc --- /dev/null +++ b/manager-fallback/scheduler/scheduler.go @@ -0,0 +1,131 @@ +package scheduler + +import ( + "sync" + "time" + + log "github.com/golang/glog" + "github.com/google/uuid" + "github.com/pkg/errors" + + "github.com/cpssd/heracles/manager-fallback/broker" + "github.com/cpssd/heracles/manager-fallback/splitting" + "github.com/cpssd/heracles/manager-fallback/state" + "github.com/cpssd/heracles/proto/datatypes" +) + +// Scheduler type +type Scheduler struct { + st state.State + br broker.Broker + jobs chan *datatypes.Job +} + +// New Scheduler +func New(br broker.Broker, st state.State) *Scheduler { + return &Scheduler{ + br: br, + st: st, + jobs: make(chan *datatypes.Job), + } +} + +// Schedule a job +func (s Scheduler) Schedule(job *datatypes.Job) (string, error) { + id := uuid.New().String() + job.Id = id + if err := s.st.SaveJob(job); err != nil { + return "", errors.Wrap(err, "can't schedule") + } + + s.jobs <- job + + return id, nil +} + +// Cancel a job +func (s Scheduler) Cancel(jobID string) error { + return nil +} + +// ProcessJobs listens +func (s Scheduler) ProcessJobs() { + log.Info("begining to listen for any jobs") + var wg sync.WaitGroup + for job := range s.jobs { + wg.Add(1) + go func(job *datatypes.Job) { + defer wg.Done() + if err := s.processJob(job); err != nil { + log.Errorf("error processing job: %v", err) + } + }(job) + } + wg.Wait() + +} + +func (s *Scheduler) processJob(job *datatypes.Job) error { + var wg sync.WaitGroup + + intermediateFiles := splitting.IntermediateFiles(job) + + // create all tasks + mapTasks, err := splitting.Map(job, intermediateFiles) + if err != nil { + job.FailureDetails = err.Error() + return err + } + reduceTasks := splitting.Reduce(job, intermediateFiles) + + if err := s.st.CreateTasks(append(mapTasks, reduceTasks...)); err != nil { + job.FailureDetails = err.Error() + return err + } + + // Start tunning the reduce tasks + wg.Add(len(mapTasks)) + log.V(1).Info("starting map tasks") + for _, task := range mapTasks { + go func(task *datatypes.Task) { + defer wg.Done() + if err := s.processTask(task); err != nil { + job.FailureDetails = err.Error() + job.Status = datatypes.JobStatus_JOB_FAILED + log.Errorf("%v", err) + return + } + }(task) + } + wg.Wait() + s.st.WaitUntilTasksComplete(job.GetId(), datatypes.TaskKind_MAP) + + wg.Add(len(reduceTasks)) + log.V(1).Info("starting reduce tasks") + for _, task := range reduceTasks { + go func(task *datatypes.Task) { + defer wg.Done() + if err := s.processTask(task); err != nil { + job.FailureDetails = err.Error() + job.Status = datatypes.JobStatus_JOB_FAILED + log.Errorf("%v", err) + return + } + }(task) + } + wg.Wait() + s.st.WaitUntilTasksComplete(job.GetId(), datatypes.TaskKind_REDUCE) + + job.Status = datatypes.JobStatus_JOB_DONE + job.TimeDone = uint64(time.Now().Unix()) + return s.st.SaveJob(job) +} + +func (s *Scheduler) processTask(task *datatypes.Task) error { + log.Infof("processing task %s: Sending to broker", task.GetId()) + if err := s.br.Send(task); err != nil { + return errors.Wrap(err, "can't send task to broker") + } + + return nil +} diff --git a/manager-fallback/server/jobschedule.go b/manager-fallback/server/jobschedule.go new file mode 100644 index 0000000..7642d34 --- /dev/null +++ b/manager-fallback/server/jobschedule.go @@ -0,0 +1,35 @@ +package server + +import ( + "context" + + pb "github.com/cpssd/heracles/proto/mapreduce" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" +) + +// Schedule implementation +func (s *Server) Schedule(ctx context.Context, req *pb.ScheduleRequest) (*pb.ScheduleResponse, error) { + res := &pb.ScheduleResponse{} + + jobID, err := s.sch.Schedule(req.GetJob()) + if err != nil { + return res, grpc.Errorf(codes.Internal, err.Error()) + } + + res.JobId = jobID + return res, nil +} + +// Cancel implementation +func (s *Server) Cancel(ctx context.Context, req *pb.CancelRequest) (*pb.EmptyMessage, error) { + if err := s.sch.Cancel(req.GetJobId()); err != nil { + return &pb.EmptyMessage{}, grpc.Errorf(codes.Internal, err.Error()) + } + return &pb.EmptyMessage{}, nil +} + +// Describe implementation +func (s *Server) Describe(ctx context.Context, req *pb.DescribeRequest) (*pb.Description, error) { + return nil, nil +} diff --git a/manager-fallback/server/server.go b/manager-fallback/server/server.go new file mode 100644 index 0000000..c75e5a9 --- /dev/null +++ b/manager-fallback/server/server.go @@ -0,0 +1,42 @@ +package server + +import ( + "fmt" + "net" + + "github.com/pkg/errors" + "google.golang.org/grpc" + + "github.com/cpssd/heracles/manager-fallback/scheduler" + "github.com/cpssd/heracles/manager-fallback/settings" + + pb "github.com/cpssd/heracles/proto/mapreduce" +) + +// Server type +type Server struct { + *grpc.Server + sch *scheduler.Scheduler +} + +// New Server +func New(sch *scheduler.Scheduler) *Server { + s := &Server{ + Server: grpc.NewServer(), + sch: sch, + } + + pb.RegisterJobScheduleServiceServer(s.Server, s) + + return s +} + +// Run the server +func (s *Server) Run() error { + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", settings.Int("server.port"))) + if err != nil { + return errors.Wrap(err, "can't start server") + } + + return s.Serve(lis) +} diff --git a/manager-fallback/settings/settings.go b/manager-fallback/settings/settings.go new file mode 100644 index 0000000..5ba5471 --- /dev/null +++ b/manager-fallback/settings/settings.go @@ -0,0 +1,83 @@ +package settings + +import ( + "flag" + + log "github.com/golang/glog" + "github.com/spf13/pflag" + "github.com/spf13/viper" +) + +var settings *viper.Viper + +// Init initializes the settings +func Init() error { + settings = viper.New() + setDefaults() + settings.SetConfigName("worker") + settings.AddConfigPath("/etc/heracles/") + setOptions() + + if err := settings.ReadInConfig(); err != nil { + log.Errorf("%v", err) + } + + log.V(1).Info("Running with the following settings") + for k, v := range settings.AllSettings() { + log.V(1).Infof("\t%s: %+v\n", k, v) + } + + return nil +} + +func setDefaults() { + settings.SetDefault("broker.queue_name", "heracles_tasks") + settings.SetDefault("broker.address", "") + settings.SetDefault("state.backend", "file") + settings.SetDefault("state.location", "") + settings.SetDefault("server.port", 8081) + settings.SetDefault("scheduler.input_chunk_size", 64*1024*1024*1024) + settings.SetDefault("scheduler.intermediate_data_location", "/tmp/heracles_intermediate") +} + +func setOptions() { + flag.String("broker.queue_name", "", "queue name") + flag.String("broker.address", "", "address of the broker") + flag.String("state.backend", "", "backend of the state store") + flag.String("state.location", "", "path to the file store") + flag.Int("scheduler.input_chunk_size", 64*1024*1024*1024, "chunk size") + flag.String("scheduler.intermediate_data_location", "", "location of intermediate files") + flag.Int("server.port", 8081, "server port") + + flag.Parse() + + pflag.CommandLine.AddGoFlagSet(flag.CommandLine) + pflag.Parse() + settings.BindPFlags(pflag.CommandLine) +} + +// Get returns the value of a key. +func Get(key string) interface{} { + return settings.Get(key) +} + +// GetString setting +// DEPRECATED: Use `String` +func GetString(key string) string { + return String(key) +} + +// String settings +func String(key string) string { + return settings.GetString(key) +} + +// Int setting +func Int(key string) int { + return settings.GetInt(key) +} + +// Set a value in the settings +func Set(key string, value interface{}) { + settings.Set(key, value) +} diff --git a/manager-fallback/splitting/map.go b/manager-fallback/splitting/map.go new file mode 100644 index 0000000..6009de2 --- /dev/null +++ b/manager-fallback/splitting/map.go @@ -0,0 +1,103 @@ +package splitting + +import ( + "io/ioutil" + "os" + "path" + "time" + + log "github.com/golang/glog" + "github.com/google/uuid" + "github.com/pkg/errors" + + "github.com/cpssd/heracles/manager-fallback/settings" + "github.com/cpssd/heracles/proto/datatypes" +) + +// Map Splitting +func Map(job *datatypes.Job, interm map[int][]string) ([]*datatypes.Task, error) { + switch job.GetInputKind() { + case datatypes.InputDataKind_UNDEFINED: + return nil, errors.New("type cannot be undefined") + case datatypes.InputDataKind_DATA_TEXT_NEWLINES: + return textMap(job, interm) + } + return nil, nil +} + +func textMap(job *datatypes.Job, interm map[int][]string) ([]*datatypes.Task, error) { + entries, err := ioutil.ReadDir(job.GetInputDirectory()) + if err != nil { + return nil, errors.Wrap(err, "can't read dir") + } + + var chunks []*datatypes.InputChunk + for _, entry := range entries { + chunks = append(chunks, + splitTextFile(path.Join(job.GetInputDirectory(), entry.Name()))..., + ) + } + + tasks := []*datatypes.Task{} + for i, chunk := range chunks { + id := uuid.New().String() + + outputFiles := []string{} + for p := range interm { + outputFiles = append(outputFiles, interm[p][i]) + } + + task := &datatypes.Task{ + Id: id, + JobId: job.GetId(), + Status: datatypes.TaskStatus_TASK_PENDING, + Kind: datatypes.TaskKind_MAP, + TimeCreated: uint64(time.Now().Unix()), + InputChunk: chunk, + PayloadPath: job.GetPayloadPath(), + PartitionCount: uint64(len(job.GetOutputFiles())), + OutputFiles: outputFiles, + } + + tasks = append(tasks, task) + } + + return tasks, nil +} + +// The functions returns a bunch (or one) chunk from input. It does so by doing +// the following: +// if the file is below the threshold, it returns the whole file +// if the file is above the threshold, it does the following: +// goes to threshold +// traces back until a new line character is found, creates chunk from +// that +// goes from that point where the new line char has been found and +// tries from the beginning +func splitTextFile(f string) []*datatypes.InputChunk { + fi, err := os.Stat(f) + if err != nil { + // Nothing to do about it . just log + log.Error(err) + } + + log.V(2).Infof("Splitting file %s", fi.Name()) + + max := int64(settings.Int("scheduler.input_chunk_size")) + + if fi.Size() < max { + return []*datatypes.InputChunk{ + { + Path: f, + StartByte: 0, + EndByte: uint64(fi.Size()), + }, + } + } + + chunks := []*datatypes.InputChunk{} + + // TODO: Chunks bigger than 64MB + + return chunks +} diff --git a/manager-fallback/splitting/reduce.go b/manager-fallback/splitting/reduce.go new file mode 100644 index 0000000..234ecb1 --- /dev/null +++ b/manager-fallback/splitting/reduce.go @@ -0,0 +1,33 @@ +package splitting + +import ( + "strings" + "time" + + "github.com/google/uuid" + + "github.com/cpssd/heracles/proto/datatypes" +) + +// Reduce splitting +func Reduce(job *datatypes.Job, interm map[int][]string) []*datatypes.Task { + tasks := []*datatypes.Task{} + for i, output := range job.GetOutputFiles() { + id := uuid.New().String() + + task := &datatypes.Task{ + Id: id, + JobId: job.GetId(), + Status: datatypes.TaskStatus_TASK_PENDING, + Kind: datatypes.TaskKind_REDUCE, + TimeCreated: uint64(time.Now().Unix()), + OutputFiles: []string{output}, + PayloadPath: job.GetPayloadPath(), + InputChunk: &datatypes.InputChunk{ + Path: strings.Join(interm[i], ","), + }, + } + tasks = append(tasks, task) + } + return tasks +} diff --git a/manager-fallback/splitting/splitting.go b/manager-fallback/splitting/splitting.go new file mode 100644 index 0000000..d668273 --- /dev/null +++ b/manager-fallback/splitting/splitting.go @@ -0,0 +1,60 @@ +package splitting + +import ( + "io/ioutil" + "os" + "path" + "strconv" + + log "github.com/golang/glog" + "github.com/google/uuid" + + "github.com/cpssd/heracles/manager-fallback/settings" + "github.com/cpssd/heracles/proto/datatypes" +) + +func fileRange(id string, num int) []string { + dir := settings.String("scheduler.intermediate_data_location") + + // create the dir + os.MkdirAll(path.Join(dir, id), os.ModePerm) + + ret := []string{} + for i := 0; i < num; i++ { + ret = append(ret, path.Join(dir, id, strconv.Itoa(i))) + } + return ret +} + +// IntermediateFiles returns the names of intermediate files. +func IntermediateFiles(job *datatypes.Job) map[int][]string { + out := len(job.GetOutputFiles()) + + // create the intermediate directory + jobDir := path.Join(settings.String("scheduler.intermediate_data_location"), job.GetId()) + if err := os.MkdirAll(jobDir, os.ModePerm); err != nil { + log.Errorf("can't create intermediata job directory: %v", err) + return nil + } + + inFiles, err := ioutil.ReadDir(job.GetInputDirectory()) + if err != nil { + log.Error(err) + return nil + } + + ret := make(map[int][]string) + + for i := 0; i < out; i++ { + if err := os.MkdirAll(path.Join(jobDir, strconv.Itoa(i)), os.ModePerm); err != nil { + log.Errorf("can't create intermediate data dir partition %d", i) + return nil + } + + for range inFiles { + ret[i] = append(ret[i], path.Join(jobDir, strconv.Itoa(i), uuid.New().String())) + } + } + + return ret +} diff --git a/manager-fallback/state/file.go b/manager-fallback/state/file.go new file mode 100644 index 0000000..5ee8b17 --- /dev/null +++ b/manager-fallback/state/file.go @@ -0,0 +1,237 @@ +package state + +import ( + "io/ioutil" + "os" + "path" + "time" + + "github.com/fsnotify/fsnotify" + + "github.com/cpssd/heracles/manager-fallback/settings" + "github.com/cpssd/heracles/proto/datatypes" + log "github.com/golang/glog" + "github.com/golang/protobuf/proto" + "github.com/pkg/errors" +) + +const ( + jobsDir = "jobs" + jobSaveFile = "request" + tasksDir = "tasks" + pendingMapDir = "pending_map_tasks" + pendingReduceDir = "pending_reduce_tasks" +) + +// FileStore implementation +type FileStore struct { + location string +} + +// NewFileStore returns a new file backed state store +func NewFileStore(location string) (*FileStore, error) { + // TODO: remove this from here. It is not really state. + if err := os.MkdirAll(settings.String("scheduler.intermediate_data_location"), os.ModePerm); err != nil { + return nil, err + } + + // TODO: Should not be 0777 + if err := os.MkdirAll(path.Join(location, jobsDir), os.ModePerm); err != nil { + return nil, err + } + + log.Info("Using file backed storage") + return &FileStore{location}, nil +} + +// SaveJob implementation +func (f FileStore) SaveJob(job *datatypes.Job) error { + log.V(2).Infof("Saving job %s", job.GetId()) + id := job.GetId() + + jobDirPath := path.Join(f.location, jobsDir, id) + if err := f.prepareJobDirectory(jobDirPath); err != nil { + log.Error(err) + f.removeJob(id) + return err + } + + serialized, err := proto.Marshal(job) + if err != nil { + f.removeJob(id) + return err + } + + if err := ioutil.WriteFile(path.Join(jobDirPath, jobSaveFile), serialized, 0600); err != nil { + f.removeJob(id) + return err + } + + log.V(1).Infof("Successfully saved job %s", job.GetId()) + + return nil +} + +// SaveTask implementation +func (f FileStore) SaveTask(task *datatypes.Task) error { + jobDirPath := path.Join(f.location, jobsDir, task.GetJobId()) + id := task.GetId() + + taskFilePath := path.Join(jobDirPath, tasksDir, id) + if _, err := os.Stat(taskFilePath); os.IsNotExist(err) { + return err + } + + var pendingFilePath string + + switch task.GetKind() { + case datatypes.TaskKind_MAP: + pendingFilePath = path.Join(jobDirPath, pendingMapDir, id) + case datatypes.TaskKind_REDUCE: + pendingFilePath = path.Join(jobDirPath, pendingReduceDir, id) + } + + if _, err := os.Stat(pendingFilePath); os.IsNotExist(err) { + return err + } + + serializedTask, err := proto.Marshal(task) + if err != nil { + return errors.Wrap(err, "unable to serialize task") + } + + if err := ioutil.WriteFile(taskFilePath, serializedTask, 0644); err != nil { + return errors.Wrapf(err, "unable to write task %s", task.GetId()) + } + + if task.GetStatus() == datatypes.TaskStatus_TASK_DONE { + log.V(1).Infof("removing task %s because its done", task.GetId()) + if err := os.Remove(pendingFilePath); err != nil { + return errors.Wrapf(err, "unable to remove pending task %s", task.GetId()) + } + } + + log.V(1).Infof("successfully saved task %s", task.GetId()) + + return nil +} + +// CreateTasks implementation +func (f FileStore) CreateTasks(tasks []*datatypes.Task) error { + for _, task := range tasks { + if err := f.createTask(task); err != nil { + return err + } + } + return nil +} + +// WaitUntilTasksComplete implementation +func (f FileStore) WaitUntilTasksComplete(id string, kind datatypes.TaskKind) error { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return err + } + defer watcher.Close() + + pendingDirPath := path.Join(f.location, jobsDir, id) + switch kind { + case datatypes.TaskKind_MAP: + pendingDirPath = path.Join(pendingDirPath, pendingMapDir) + case datatypes.TaskKind_REDUCE: + pendingDirPath = path.Join(pendingDirPath, pendingReduceDir) + } + + log.V(1).Infof("watching %s for changes", pendingDirPath) + + done := make(chan struct{}) + go func() { + // TODO: make this configurarable through settings + timer := time.NewTimer(2 * time.Second) + + for { + select { + case <-watcher.Events: + files, err := ioutil.ReadDir(pendingDirPath) + if err != nil { + log.Error(err) + } + if len(files) == 0 { + close(done) + return + } + case <-timer.C: + files, err := ioutil.ReadDir(pendingDirPath) + if err != nil { + log.Error(err) + } + if len(files) == 0 { + close(done) + return + } + case err := <-watcher.Errors: + log.Error(err) + } + } + }() + + if err := watcher.Add(pendingDirPath); err != nil { + log.Error("unable to add watcher") + return err + } + + <-done + return nil +} + +func (f FileStore) createTask(task *datatypes.Task) error { + jobDirPath := path.Join(f.location, jobsDir, task.GetJobId()) + id := task.GetId() + + taskFilePath := path.Join(jobDirPath, tasksDir, id) + if _, err := os.Create(taskFilePath); err != nil { + return err + } + + var pendingFilePath string + switch task.GetKind() { + case datatypes.TaskKind_MAP: + pendingFilePath = path.Join(jobDirPath, pendingMapDir, id) + case datatypes.TaskKind_REDUCE: + pendingFilePath = path.Join(jobDirPath, pendingReduceDir, id) + } + if _, err := os.Create(pendingFilePath); err != nil { + return err + } + return nil +} + +// PendingMapTasks implementation +func (f FileStore) PendingMapTasks(*datatypes.Job) ([]*datatypes.Task, error) { + return nil, nil +} + +// PendingReduceTasks implementation +func (f FileStore) PendingReduceTasks(*datatypes.Job) ([]*datatypes.Task, error) { + return nil, nil +} + +func (f FileStore) removeJob(id string) { + os.RemoveAll(path.Join(f.location, jobsDir, id)) +} + +func (f FileStore) prepareJobDirectory(loc string) error { + if err := os.MkdirAll(path.Join(loc, tasksDir), os.ModePerm); err != nil { + return err + } + log.V(2).Info("created tasks dir") + if err := os.MkdirAll(path.Join(loc, pendingMapDir), os.ModePerm); err != nil { + return err + } + log.V(2).Info("created pending map dir") + if err := os.MkdirAll(path.Join(loc, pendingReduceDir), os.ModePerm); err != nil { + return err + } + log.V(2).Info("created pending reduce dir") + return nil +} diff --git a/manager-fallback/state/state.go b/manager-fallback/state/state.go new file mode 100644 index 0000000..3c12168 --- /dev/null +++ b/manager-fallback/state/state.go @@ -0,0 +1,28 @@ +package state + +import ( + "github.com/pkg/errors" + + "github.com/cpssd/heracles/manager-fallback/settings" + "github.com/cpssd/heracles/proto/datatypes" +) + +// State defintion +type State interface { + SaveJob(*datatypes.Job) error + SaveTask(*datatypes.Task) error + CreateTasks([]*datatypes.Task) error + WaitUntilTasksComplete(string, datatypes.TaskKind) error + // PendingMapTasks(*datatypes.Job) ([]*datatypes.Task, error) + // PendingReduceTasks(*datatypes.Job) ([]*datatypes.Task, error) +} + +// New returns state +func New() (State, error) { + switch settings.String("state.backend") { + case "file": + return NewFileStore(settings.String("state.location")) + } + + return nil, errors.New("unknown state kind") +} diff --git a/tests/badtest.sh b/tests/badtest.sh index c3726d0..54feb87 100755 --- a/tests/badtest.sh +++ b/tests/badtest.sh @@ -2,7 +2,7 @@ docker kill rabbit docker rm rabbit -docker run -d --hostname=rabbit --name=rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management +docker run --rm -d --hostname=rabbit --name=rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management sleep 10 go run ../tools/tester/tester.go mkdir -p /tmp/heracles_test_jobs/jobs/test_job/tasks diff --git a/tools/demo.sh b/tools/demo.sh new file mode 100755 index 0000000..949046e --- /dev/null +++ b/tools/demo.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env sh + +LOCATION=/tmp/heracles/demo + +docker run --rm -d --hostname=rabbit --name=rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management +rm -rf $LOCATION/ +mkdir -p $LOCATION/word-counter/input +mkdir -p $LOCATION/word-counter/out +mkdir -p $LOCATION/intermediate +mkdir -p $LOCATION/state +echo "sample1 sample2" > $LOCATION/word-counter/input/file1 +echo "sample2 sample3" > $LOCATION/word-counter/input/file2 +cp ./target/debug/examples/word-counter $LOCATION/word-counter/ + +tee $LOCATION/count.pb <