Skip to content
141 changes: 96 additions & 45 deletions yoda/executor/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,72 +3,123 @@ package executor
import (
"bytes"
"context"
"io"
"io/ioutil"
"os"
"encoding/base64"
"fmt"
"net/url"
"os/exec"
"path/filepath"
"strconv"
"time"

"github.com/google/shlex"

"github.com/bandprotocol/chain/v2/x/oracle/types"
"github.com/levigross/grequests"
)

// Only use in testnet. No intensive testing, use at your own risk
type DockerExec struct {
image string
timeout time.Duration
image string
name string
timeout time.Duration
portLists chan string
maxTry int
}

func NewDockerExec(image string, timeout time.Duration) *DockerExec {
return &DockerExec{image: image, timeout: timeout}
func NewDockerExec(image string, timeout time.Duration, maxTry int, startPort int, endPort int) *DockerExec {
ctx := context.Background()
portLists := make(chan string, endPort-startPort+1)
name := "docker-runtime-executor-"
for i := startPort; i <= endPort; i++ {
port := strconv.Itoa(i)
StartContainer(name, ctx, port, image)
portLists <- port
}

return &DockerExec{image: image, name: name, timeout: timeout, portLists: portLists, maxTry: maxTry}
}

func (e *DockerExec) Exec(code []byte, arg string, env interface{}) (ExecResult, error) {
// TODO: Handle env if we are to revive Docker
dir, err := ioutil.TempDir("/tmp", "executor")
if err != nil {
return ExecResult{}, err
}
defer os.RemoveAll(dir)
err = ioutil.WriteFile(filepath.Join(dir, "exec"), code, 0777)
if err != nil {
return ExecResult{}, err
}
name := filepath.Base(dir)
args, err := shlex.Split(arg)
if err != nil {
return ExecResult{}, err
}
func StartContainer(name string, ctx context.Context, port string, image string) error {
exec.Command("docker", "kill", name+port).Run()
dockerArgs := append([]string{
"run", "--rm",
"-v", dir + ":/scratch:ro",
"--name", name,
e.image,
"/scratch/exec",
}, args...)
ctx, cancel := context.WithTimeout(context.Background(), e.timeout)
defer cancel()
"--name", name + port,
"-p", port + ":5000",
"--memory=512m",
image,
})

cmd := exec.CommandContext(ctx, "docker", dockerArgs...)
var buf bytes.Buffer
cmd.Stdout = &buf
cmd.Stderr = &buf
err = cmd.Run()
if ctx.Err() == context.DeadlineExceeded {
exec.Command("docker", "kill", name).Start()
return ExecResult{}, ErrExecutionimeout
}
exitCode := uint32(0)
err := cmd.Start()
return err
}

func (e *DockerExec) PostRequest(
code []byte,
arg string,
env interface{},
name string,
ctx context.Context,
port string,
) (ExecResult, error) {
executable := base64.StdEncoding.EncodeToString(code)
resp, err := grequests.Post(
"http://localhost:"+port,
&grequests.RequestOptions{
Headers: map[string]string{
"Content-Type": "application/json",
},
JSON: map[string]interface{}{
"executable": executable,
"calldata": arg,
"timeout": e.timeout.Milliseconds(),
"env": env,
},
RequestTimeout: e.timeout,
},
)

if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
exitCode = uint32(exitError.ExitCode())
} else {
urlErr, ok := err.(*url.Error)
if !ok || !urlErr.Timeout() {
return ExecResult{}, err
}
// Return timeout code
return ExecResult{Output: []byte{}, Code: 111}, nil
}
output, err := ioutil.ReadAll(io.LimitReader(&buf, int64(types.DefaultMaxReportDataSize)))

if !resp.Ok {
return ExecResult{}, ErrRestNotOk
}

r := externalExecutionResponse{}
err = resp.JSON(&r)

if err != nil {
return ExecResult{}, err
}
return ExecResult{Output: output, Code: exitCode}, nil

go func() {
StartContainer(name, ctx, port, e.image)
e.portLists <- port
}()
if r.Returncode == 0 {
return ExecResult{Output: []byte(r.Stdout), Code: 0, Version: r.Version}, nil
} else {
return ExecResult{Output: []byte(r.Stderr), Code: r.Returncode, Version: r.Version}, nil
}
}

func (e *DockerExec) Exec(code []byte, arg string, env interface{}) (ExecResult, error) {
ctx := context.Background()
port := <-e.portLists
errs := []error{}
for i := 0; i < e.maxTry; i++ {
execResult, err := e.PostRequest(code, arg, env, e.name, ctx, port)
if err == nil {
return execResult, err
}
errs = append(errs, err)
time.Sleep(500 * time.Millisecond)
}
return ExecResult{}, fmt.Errorf(ErrReachMaxTry.Error()+", tried errors: %#q", errs)
}
61 changes: 22 additions & 39 deletions yoda/executor/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,31 @@ package executor

import (
"testing"
)

func TestDockerSuccess(t *testing.T) {
// TODO: Enable test when CI has docker installed.
// e := NewDockerExec("bandprotocol/runtime:1.0.2", 10*time.Second)
// res, err := e.Exec([]byte(`#!/usr/bin/env python3
// import json
// import urllib.request
// import sys
"time"

// BINANCE_URL = "https://api.binance.com/api/v1/depth?symbol={}USDT&limit=5"

// def make_json_request(url):
// req = urllib.request.Request(url)
// req.add_header(
// "User-Agent",
// "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36",
// )
// return json.loads(urllib.request.urlopen(req).read())

// def main(symbol):
// res = make_json_request(BINANCE_URL.format(symbol))
// bid = float(res["bids"][0][0])
// ask = float(res["asks"][0][0])
// return (bid + ask) / 2
"github.com/stretchr/testify/require"
)

// if __name__ == "__main__":
// try:
// print(main(*sys.argv[1:]))
// except Exception as e:
// print(str(e), file=sys.stderr)
// sys.exit(1)
// `), "BTC")
// fmt.Println(string(res.Output), res.Code, err)
// require.True(t, false)
func SetupDockerTest(t *testing.T) {
}

func TestDockerLongStdout(t *testing.T) {
func TestDockerSuccess(t *testing.T) {
// TODO: Enable test when CI has docker installed.
// e := NewDockerExec("bandprotocol/runtime:1.0.2", 10*time.Second)
// res, err := e.Exec([]byte(`#!/usr/bin/env python3
// print("A"*1000)`), "BTC")
// fmt.Println(string(res.Output), res.Code, err)
// require.True(t, false)
// Prerequisite: please build docker image before running test
e := NewDockerExec("ongartbandprotocol/band-testing:python-runtime", 120*time.Second, 100, 5000, 5009)
for i := 0; i < 20; i++ {
res, err := e.Exec([]byte(
"#!/usr/bin/env python3\nimport os\nimport sys\nprint(sys.argv[1], os.getenv('BAND_CHAIN_ID'))",
), "TEST_ARG", map[string]interface{}{
"BAND_CHAIN_ID": "test-chain-id",
"BAND_VALIDATOR": "test-validator",
"BAND_REQUEST_ID": "test-request-id",
"BAND_EXTERNAL_ID": "test-external-id",
"BAND_REPORTER": "test-reporter",
"BAND_SIGNATURE": "test-signature",
})
require.Equal(t, []byte("TEST_ARG test-chain-id\n"), res.Output)
require.Equal(t, uint32(0), res.Code)
require.NoError(t, err)
}
}
71 changes: 57 additions & 14 deletions yoda/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@ import (
"errors"
"fmt"
"net/url"
"strconv"
"strings"
"time"
)

const (
flagQueryTimeout = "timeout"
flagQueryTimeout = "timeout"
flagQueryMaxTry = "maxTry"
flagQueryPortRange = "portRange"
)

var (
ErrExecutionimeout = errors.New("execution timeout")
ErrRestNotOk = errors.New("rest return non 2XX response")
ErrReachMaxTry = errors.New("execution reach max try")
)

type ExecResult struct {
Expand All @@ -33,15 +37,22 @@ var testProgram []byte = []byte(

// NewExecutor returns executor by name and executor URL
func NewExecutor(executor string) (exec Executor, err error) {
name, base, timeout, err := parseExecutor(executor)
name, base, timeout, maxTry, startPort, endPort, err := parseExecutor(executor)
if err != nil {
return nil, err
}
switch name {
case "rest":
exec = NewRestExec(base, timeout)
case "docker":
return nil, fmt.Errorf("Docker executor is currently not supported")
// Only use in testnet. No intensive testing, use at your own risk
if endPort <= startPort {
return nil, fmt.Errorf("portRange invalid: startPort: %d, endPort: %d", startPort, endPort)
}
if maxTry < 1 {
return nil, fmt.Errorf("maxTry invalid: %d", maxTry)
}
exec = NewDockerExec(base, timeout, maxTry, startPort, endPort)
default:
return nil, fmt.Errorf("Invalid executor name: %s, base: %s", name, base)
}
Expand All @@ -68,29 +79,61 @@ func NewExecutor(executor string) (exec Executor, err error) {
return exec, nil
}

// parseExecutor splits the executor string in the form of "name:base?timeout=" into parts.
func parseExecutor(executorStr string) (name string, base string, timeout time.Duration, err error) {
// parseExecutor splits the executor string in the form of "name:base?timeout=&maxTry=&portRange=" into parts.
func parseExecutor(
executorStr string,
) (name string, base string, timeout time.Duration, maxTry int, startPort int, endPort int, err error) {
executor := strings.SplitN(executorStr, ":", 2)
if len(executor) != 2 {
return "", "", 0, fmt.Errorf("Invalid executor, cannot parse executor: %s", executorStr)
return "", "", 0, 0, 0, 0, fmt.Errorf("Invalid executor, cannot parse executor: %s", executorStr)
}
u, err := url.Parse(executor[1])
if err != nil {
return "", "", 0, fmt.Errorf("Invalid url, cannot parse %s to url with error: %s", executor[1], err.Error())
return "", "", 0, 0, 0, 0, fmt.Errorf(
"Invalid url, cannot parse %s to url with error: %s",
executor[1],
err.Error(),
)
}

query := u.Query()
timeoutStr := query.Get(flagQueryTimeout)
if timeoutStr == "" {
return "", "", 0, fmt.Errorf("Invalid timeout, executor requires query timeout")
return "", "", 0, 0, 0, 0, fmt.Errorf("Invalid timeout, executor requires query timeout")
}
// Remove timeout from query because we need to return `base`
query.Del(flagQueryTimeout)
u.RawQuery = query.Encode()

timeout, err = time.ParseDuration(timeoutStr)
if err != nil {
return "", "", 0, fmt.Errorf("Invalid timeout, cannot parse duration with error: %s", err.Error())
return "", "", 0, 0, 0, 0, fmt.Errorf("Invalid timeout, cannot parse duration with error: %s", err.Error())
}

maxTryStr := query.Get(flagQueryMaxTry)
if maxTryStr == "" {
maxTryStr = "1"
}
maxTry, err = strconv.Atoi(maxTryStr)
if err != nil {
return "", "", 0, 0, 0, 0, fmt.Errorf("Invalid maxTry, cannot parse integer with error: %s", err.Error())
}

portRangeStr := query.Get(flagQueryPortRange)
ports := strings.SplitN(portRangeStr, "-", 2)
if len(ports) != 2 {
ports = []string{"0", "0"}
}
return executor[0], u.String(), timeout, nil
startPort, err = strconv.Atoi(ports[0])
if err != nil {
return "", "", 0, 0, 0, 0, fmt.Errorf("Invalid portRange, cannot parse integer with error: %s", err.Error())
}
endPort, err = strconv.Atoi(ports[1])
if err != nil {
return "", "", 0, 0, 0, 0, fmt.Errorf("Invalid portRange, cannot parse integer with error: %s", err.Error())
}

// Remove timeout from query because we need to return `base`
query.Del(flagQueryTimeout)
query.Del(flagQueryMaxTry)
query.Del(flagQueryPortRange)

u.RawQuery = query.Encode()
return executor[0], u.String(), timeout, maxTry, startPort, endPort, nil
}
Loading