Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/filecoin-project/curio/deps/config"
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/harmony/taskhelp/usertaskmgt"
"github.com/filecoin-project/curio/lib/chainsched"
"github.com/filecoin-project/curio/lib/curiochain"
"github.com/filecoin-project/curio/lib/fastparamfetch"
Expand Down Expand Up @@ -196,6 +197,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
// (we could have just appended to this list in the reverse order, but defining
// tasks in pipeline order is more intuitive)

usertaskmgt.WrapTasks(activeTasks, dependencies.Cfg.Subsystems.UserScheduleURL, dependencies.DB, dependencies.ListenAddr)
ht, err := harmonytask.New(db, activeTasks, dependencies.ListenAddr)
if err != nil {
return nil, err
Expand Down
17 changes: 17 additions & 0 deletions deps/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,23 @@ type CurioSubsystemsConfig struct {

// The address that should listen for Web GUI requests.
GuiAddress string

// UserScheduleURL are the URLs for the user schedule optimization service. Please precede each with
// the name of the task that it should be used for, followed by a comma and the URL. For example:
// "SealSDR,http://localhost:8080/schedule"
// This http endpoint gets a POST request with the following JSON body:
// {
// "task_id": "task_id",
// "task_type": "task_type",
// "workers": ["worker1", "worker2"]
// }
// And looks for a 200 response with the following JSON body:
// {
// "worker": "worker1",
// "timeout": 60
// }
// Timeout in seconds until it will be rescheduled.
UserScheduleURL []string
}
type CurioFees struct {
DefaultMaxFee types.FIL
Expand Down
5 changes: 5 additions & 0 deletions harmony/harmonydb/sql/20240724-user_sched.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- harmony_task_user records which worker a user-supplied scheduling service
-- assigned a task to, and until when that assignment is valid.
CREATE TABLE harmony_task_user (
    -- ID of the task the assignment applies to; at most one assignment per task.
    task_id INTEGER PRIMARY KEY,
    -- host:port of the worker that was told to run the task.
    owner TEXT NOT NULL,
    -- Unix timestamp (seconds) after which the assignment may be discarded.
    -- BIGINT rather than INTEGER so 64-bit epoch seconds do not overflow in 2038.
    expiration BIGINT NOT NULL
);
27 changes: 16 additions & 11 deletions harmony/harmonytask/task_type_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,27 +194,23 @@ canAcceptAgain:
}
return owner == h.TaskEngine.ownerID
})
if doErr != nil {
if doErr != nil && doErr != ErrReturnToPoolPlease {
log.Errorw("Do() returned error", "type", h.Name, "id", strconv.Itoa(int(*tID)), "error", doErr)
}
}()
return true
}

// ErrReturnToPoolPlease is a sentinel error value. In the handler code visible
// around this declaration it is compared by identity (==) against the error
// returned from Do(): such a result is not logged as a Do() error, is excluded
// from the MaxFailures check, and ends the completion transaction without
// writing a harmony_task_history row.
// NOTE(review): presumably this lets a task hand itself back to the scheduler
// without being counted as a failure — confirm against the full handler.
var ErrReturnToPoolPlease = errors.New("return to pool")

func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done bool, doErr error) {
workEnd := time.Now()
retryWait := time.Millisecond * 100
retryRecordCompletion:
cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
var postedTime time.Time
err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime)

if err != nil {
return false, fmt.Errorf("could not log completion: %w ", err)
}
result := "unspecified error"
if done {
_, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID)
_, err := tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID)
if err != nil {

return false, fmt.Errorf("could not log completion: %w", err)
Expand All @@ -228,9 +224,9 @@ retryRecordCompletion:
result = "error: " + doErr.Error()
}
var deleteTask bool
if h.MaxFailures > 0 {
if doErr != ErrReturnToPoolPlease && h.MaxFailures > 0 {
ct := uint(0)
err = tx.QueryRow(`SELECT count(*) FROM harmony_task_history
err := tx.QueryRow(`SELECT count(*) FROM harmony_task_history
WHERE task_id=$1 AND result=FALSE`, tID).Scan(&ct)
if err != nil {
return false, fmt.Errorf("could not read task history: %w", err)
Expand All @@ -240,7 +236,7 @@ retryRecordCompletion:
}
}
if deleteTask {
_, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID)
_, err := tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID)
if err != nil {
return false, fmt.Errorf("could not delete failed job: %w", err)
}
Expand All @@ -252,6 +248,15 @@ retryRecordCompletion:
}
}
}
if doErr == ErrReturnToPoolPlease {
return true, nil
}
var postedTime time.Time
err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime)

if err != nil {
return false, fmt.Errorf("could not log completion: %w ", err)
}
_, err = tx.Exec(`INSERT INTO harmony_task_history
(task_id, name, posted, work_start, work_end, result, completed_by_host_and_port, err)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, tID, h.Name, postedTime.UTC(), workStart.UTC(), workEnd.UTC(), done, h.TaskEngine.hostAndPort, result)
Expand Down
152 changes: 152 additions & 0 deletions harmony/taskhelp/usertaskmgt/usertaskmgt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
Package usertaskmgt provides a way to wrap tasks with a URL that can be called to assign the task to a worker.
Timeline
- UrlTask accepts everything
- once accepted, UrlTask.Do() finds who should own the task and updates the DB:
- harmony_task_user.owner & expiration
- harmony_task releases the task (without err)
- The poller will see the task & call CanAccept()
- CanAccept() will see the owner and call the deeper CanAccept() if it's us.
- If it's not us, check the expiration time and release the task by deleting the row.
- The task will be done by the worker who was told to do it, or eventually reassigned.
Pitfalls:
- If the user's URL is down, the task will be stuck in the DB.
- Turnaround time is slowed by the additional trip through the poller.
- Full task resources are claimed by the URL runner, so the task needs a full capacity.
*/
package usertaskmgt

import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/harmonytask"
logging "github.com/ipfs/go-log/v2"
"github.com/samber/lo"
"golang.org/x/xerrors"
)

var log = logging.Logger("userTaskMgt")

// WrapTasks replaces, in place, every element of tasks whose type name has a
// user scheduling endpoint configured with a *UrlTask that wraps the original
// task.
//
// Each UserScheduleUrl entry must have the form "taskName,url". Surrounding
// whitespace around either part is ignored. Entries that are malformed (no
// comma, empty name, empty URL, or a URL that fails to parse) are logged and
// skipped, so the corresponding task keeps its normal scheduling.
//
// db and hostAndPort are stored on every wrapper; hostAndPort is the value the
// wrapper later compares against the owner column of harmony_task_user.
func WrapTasks(tasks []harmonytask.TaskInterface, UserScheduleUrl []string, db *harmonydb.DB, hostAndPort string) {
	urlMap := lo.SliceToMap(UserScheduleUrl, func(s string) (string, *url.URL) {
		name, rawURL, found := strings.Cut(s, ",")
		name, rawURL = strings.TrimSpace(name), strings.TrimSpace(rawURL)
		if !found || name == "" || rawURL == "" {
			log.Errorf("Invalid UserScheduleUrl: %s. Expected: taskName,url", s)
			return "", nil
		}
		u, err := url.Parse(rawURL)
		if err != nil {
			log.Errorf("Invalid UserScheduleUrl: %s. Expected: taskName,url (%s)", s, err)
			return "", nil
		}
		return name, u
	})
	// Invalid entries are all mapped to the "" key above; drop it so they can
	// never be matched against a task.
	delete(urlMap, "")

	for i, task := range tasks {
		name := task.TypeDetails().Name
		u, ok := urlMap[name]
		if !ok {
			continue
		}
		tasks[i] = &UrlTask{
			TaskInterface:   task,
			UserScheduleUrl: u,
			name:            name,
			db:              db,
			hostAndPort:     hostAndPort,
		}
	}
}

// UrlTask wraps another task and consults a user-supplied HTTP endpoint to
// decide which worker should run it. Methods not overridden on UrlTask are
// served by the embedded TaskInterface.
type UrlTask struct {
// The wrapped task; CanAccept and Do delegate to it when this machine owns the task.
harmonytask.TaskInterface
// Database handle used to read and write harmony_task_user.
db *harmonydb.DB
// Endpoint that receives the scheduling POST request.
UserScheduleUrl *url.URL
// Task type name, taken from the wrapped task's TypeDetails().Name by WrapTasks.
name string
// Identity of this machine; compared with the owner column of harmony_task_user.
hostAndPort string
}

// CanAccept decides whether this machine should pick up the first task in tids.
//
//   - No harmony_task_user row for the task: accept, so that Do can ask the
//     user's scheduling URL who should own it.
//   - Row names this machine as owner: defer to the wrapped task's CanAccept.
//   - Row names another machine and has not expired: decline.
//   - Row names another machine and has expired: delete the row and accept, so
//     the task gets scheduled again.
//
// Only tids[0] is considered. Declining is expressed as (nil, nil).
// NOTE(review): (nil, nil) is presumed to be harmonytask's "not accepted"
// signal — confirm against the TaskInterface contract.
func (t *UrlTask) CanAccept(tids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
	if len(tids) == 0 {
		return nil, nil
	}
	id := tids[0]

	var owner string
	var expiration int64
	// An aggregate without GROUP BY always yields exactly one row, so Scan
	// succeeds even when no assignment exists yet; task_id is the primary key,
	// so MAX() is simply the single stored value (or NULL -> COALESCE default).
	err := t.db.QueryRow(context.Background(), `SELECT COALESCE(MAX(owner),''), COALESCE(MAX(expiration), 0) FROM harmony_task_user WHERE task_id=$1`, id).Scan(&owner, &expiration)
	if err != nil {
		return nil, xerrors.Errorf("could not get owner: %w", err)
	}

	if owner == "" {
		// Nobody has been assigned yet.
		return &id, nil
	}
	if owner == t.hostAndPort {
		return t.TaskInterface.CanAccept(tids, te)
	}
	if expiration >= time.Now().Unix() {
		// Another worker holds a still-valid assignment; leave the task to it.
		return nil, nil
	}

	// The other worker's assignment has lapsed: release it and take the task
	// so it can be re-assigned.
	_, err = t.db.Exec(context.Background(), `DELETE FROM harmony_task_user WHERE task_id=$1`, id)
	if err != nil {
		return nil, xerrors.Errorf("could not delete from harmony_task_user: %w", err)
	}
	return &id, nil
}

// client is the shared HTTP client used for calls to the user scheduling URL.
// Requests are abandoned after ten seconds.
var client = &http.Client{
	Timeout: 10 * time.Second,
}

// Do either runs the wrapped task or assigns the task to a worker.
//
// If harmony_task_user already names this machine as the owner, the wrapped
// task's Do is called and its result returned. Otherwise Do:
//
//  1. collects the host_and_port of machines whose task list contains this
//     task type,
//  2. POSTs {"task_type", "task_id", "workers"} as JSON to the user's
//     scheduling URL,
//  3. expects a 200 response of the form {"worker": ..., "timeout": seconds},
//  4. stores that owner and the expiration (Unix seconds) in harmony_task_user,
//  5. returns harmonytask.ErrReturnToPoolPlease so the task is handed back and
//     can be picked up by the chosen worker.
//
// Any database, HTTP, or decoding problem is returned as a wrapped error with
// done=false.
func (t *UrlTask) Do(id harmonytask.TaskID, stillMe func() bool) (bool, error) {
	ctx := context.Background()

	var owner string
	// An aggregate without GROUP BY always yields one row, so a task with no
	// assignment scans as owner "" instead of failing with a no-rows error.
	err := t.db.QueryRow(ctx, `SELECT COALESCE(MAX(owner),'') FROM harmony_task_user WHERE task_id=$1`, id).Scan(&owner)
	if err != nil {
		return false, xerrors.Errorf("could not get owner: %w", err)
	}
	if owner == t.hostAndPort {
		return t.TaskInterface.Do(id, stillMe)
	}

	var workerList []string
	err = t.db.Select(ctx, &workerList, `SELECT host_and_port
		FROM harmony_machines m JOIN harmony_machine_details d ON d.machine_id=m.id
		WHERE tasks LIKE $1`, "%,"+t.name+",%")
	if err != nil {
		return false, xerrors.Errorf("could not get worker list: %w", err)
	}
	if workerList == nil {
		// Encode "no workers" as [] rather than null.
		workerList = []string{}
	}

	// Marshal instead of concatenating strings so names are quoted/escaped and
	// the body is always valid JSON. task_id is emitted as a bare JSON number.
	payload, err := json.Marshal(struct {
		TaskType string      `json:"task_type"`
		TaskID   json.Number `json:"task_id"`
		Workers  []string    `json:"workers"`
	}{
		TaskType: t.name,
		TaskID:   json.Number(strconv.Itoa(int(id))),
		Workers:  workerList,
	})
	if err != nil {
		return false, xerrors.Errorf("could not encode user defined URL request: %w", err)
	}

	resp, err := client.Post(t.UserScheduleUrl.String(), "application/json", bytes.NewReader(payload))
	if err != nil {
		return false, xerrors.Errorf("could not call user defined URL: %w", err)
	}
	// Close on every path, including the non-200 early return below.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return false, xerrors.Errorf("User defined URL returned non-200 status code: %d", resp.StatusCode)
	}

	var respData struct {
		Worker  string `json:"worker"`
		Timeout int    `json:"timeout"`
	}
	err = json.NewDecoder(resp.Body).Decode(&respData)
	if err != nil {
		return false, xerrors.Errorf("could not decode user defined URL response: %w", err)
	}
	if respData.Worker == "" {
		// An empty owner is indistinguishable from "no assignment" when read back.
		return false, xerrors.Errorf("user defined URL response did not name a worker")
	}

	// If it's us, we cannot shortcut because we don't have CanAccept's 2nd arg.

	// Stored as Unix seconds: the column is an integer and CanAccept compares
	// it with time.Now().Unix().
	expires := time.Now().Add(time.Second * time.Duration(respData.Timeout)).Unix()
	_, err = t.db.Exec(ctx, `INSERT INTO harmony_task_user (task_id, owner, expiration) VALUES ($1,$2,$3)
		ON CONFLICT (task_id) DO UPDATE SET owner=EXCLUDED.owner, expiration=EXCLUDED.expiration`, id, respData.Worker, expires)
	if err != nil {
		return false, xerrors.Errorf("could not insert into harmony_task_user: %w", err)
	}

	return false, harmonytask.ErrReturnToPoolPlease
}