Skip to content

Commit 9b54bf3

Browse files
work in progress, fetching executions via html now
1 parent 79d12a9 commit 9b54bf3

File tree

6 files changed

+163
-19
lines changed

6 files changed

+163
-19
lines changed

azkaban/client.go

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,17 @@ package azkaban
22

33
import (
44
"encoding/json"
5+
"errors"
56
"fmt"
7+
htmlx "golang.org/x/net/html"
68
"html"
79
"io"
810
"io/ioutil"
911
"log"
1012
"net/http"
1113
"net/http/httputil"
14+
"strconv"
15+
"time"
1216
)
1317

1418
type Client struct {
@@ -129,7 +133,7 @@ func (c *Client) RestartFlowNow(project, flow string) error {
129133
return nil
130134
}
131135

132-
func (c *Client) FlowEcecutionStatus(executionID int64) (FlowExecutionStatus, error) {
136+
func (c *Client) FlowExecutionStatus(executionID int64) (FlowExecutionStatus, error) {
133137
status := FlowExecutionStatus{}
134138

135139
params := make(map[string]string)
@@ -182,6 +186,120 @@ func (c *Client) FlowSchedule(projectID int64, flowID string) (FlowSchedule, err
182186
}
183187
}
184188

189+
func (c *Client) Running() ([]FlowExecution, error) {
190+
resp, err := c.request("GET", "executor", nil)
191+
if err != nil {
192+
return nil, err
193+
}
194+
if resp.StatusCode != 200 {
195+
return nil, errors.New(fmt.Sprintf("got %d %s when retrieving running executions", resp.StatusCode, resp.Status))
196+
}
197+
198+
defer resp.Body.Close()
199+
doc, err := htmlx.Parse(resp.Body)
200+
if err != nil {
201+
return nil, err
202+
}
203+
204+
// Azkaban serves the login page simply with a HTTP 200 so the only way to check if we're looking at the login page
205+
// is by looking for the login element.
206+
if findElementWithID(doc,"username") != nil && findElementWithID(doc, "password") != nil {
207+
return nil, errors.New("credentials expired, reauthenticate")
208+
}
209+
210+
table := findElementWithID(doc, "executingJobs")
211+
212+
return findExecutions(table)
213+
}
214+
215+
type FlowExecution struct {
216+
FlowID string
217+
Project string
218+
Execution
219+
}
220+
221+
func findExecutions(n *htmlx.Node) ([]FlowExecution, error) {
222+
tbody := findElementsOfType(n, "tbody")
223+
rows := findElementsOfType(tbody[0], "tr")
224+
225+
var executions []FlowExecution
226+
for _, row := range rows {
227+
cells := findElementsOfType(row, "td")
228+
229+
execID, err := strconv.ParseInt(findElementsOfType(cells[1], "a")[0].FirstChild.Data, 10, 64)
230+
if err != nil {
231+
return nil, err
232+
}
233+
flowID := findElementsOfType(cells[3], "a")[0].FirstChild.Data
234+
project := findElementsOfType(cells[4], "a")[0].FirstChild.Data
235+
// TODO project ID is not part of the actual link so we can't parse it. We'll have to fetch projects before we do this
236+
// parsing time "2019-08-21 01:53:42" as "Jan 2, 2006 at 3:04pm": cannot parse "2019-08-21 01:53:42" as "Jan"
237+
startTime, err := time.Parse("2006-01-02 15:04:05", cells[7].FirstChild.Data)
238+
if err != nil {
239+
return nil, err
240+
}
241+
242+
execution := FlowExecution{
243+
FlowID: flowID,
244+
Project: project,
245+
Execution: Execution{
246+
SubmitTime: AzkabanTimestamp{},
247+
StartTime: AzkabanTimestamp(startTime),
248+
Status: "RUNNING",
249+
ID: execID,
250+
EndTime: AzkabanTimestamp(time.Now()),
251+
},
252+
}
253+
254+
executions = append(executions, execution)
255+
}
256+
257+
return executions, nil
258+
}
259+
260+
func getAttribute(n *htmlx.Node, name string) string {
261+
for _, a := range n.Attr {
262+
if a.Key == name {
263+
return a.Val
264+
}
265+
}
266+
return ""
267+
}
268+
269+
func findElementsOfType(n *htmlx.Node, t string) []*htmlx.Node {
270+
var result []*htmlx.Node
271+
if n.Type == htmlx.ElementNode && n.Data == t {
272+
result = append(result, n)
273+
}
274+
275+
for c := n.FirstChild; c != nil; c = c.NextSibling {
276+
result = append(result, findElementsOfType(c, t)...)
277+
}
278+
279+
return result
280+
}
281+
282+
func findElementWithID(n *htmlx.Node, id string) *htmlx.Node {
283+
if hasAttribute(n, "id", id) {
284+
return n;
285+
}
286+
for c := n.FirstChild; c != nil; c = c.NextSibling {
287+
if result := findElementWithID(c, id); result != nil {
288+
return result
289+
}
290+
}
291+
return nil
292+
}
293+
294+
func hasAttribute(n *htmlx.Node, name string, value string) bool {
295+
for _, a := range n.Attr {
296+
if a.Key == name && a.Val == value {
297+
return true
298+
}
299+
}
300+
return false
301+
}
302+
185303
func (c *Client) requestAndDecode(method string, path string, params map[string]string, dst interface{}) error {
186304
resp, err := c.request(method, path, params)
187305
if err != nil {

azkaban/json.go

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -258,22 +258,6 @@ func (t AzkabanStringTime) Time() time.Time {
258258
return time.Time(t)
259259
}
260260

261-
func formatDuration(d time.Duration) string {
262-
hours := 0
263-
if d.Hours() >= 1.0 {
264-
hours = int(d.Hours())
265-
}
266-
minutes := 0
267-
if d.Minutes() > 0 {
268-
minutes = int(d.Minutes()) % 60
269-
}
270-
seconds := 0
271-
if d.Seconds() > 0 {
272-
seconds = int(d.Seconds()) % 60
273-
}
274-
return fmt.Sprintf("%d:%02d:%02d", hours, minutes, seconds)
275-
}
276-
277261
type FlowExecutionStatus struct {
278262
Attempt int `json:"attempt"`
279263
Status Status `json:"status"`

check_flow_action.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ func (h FlowStatusChecker) printFlowStatus() (status FlowStatus, err error) {
207207
status.LastExecution = executions.MostRecentExecution()
208208
executionID := executions.MostRecentExecution().ID
209209

210-
flowExecStatus, err := client.FlowEcecutionStatus(executionID)
210+
flowExecStatus, err := client.FlowExecutionStatus(executionID)
211211
if err != nil {
212212
return status, err
213213
}

get_action.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ func SetupGetActions() cli.Command {
4040
Action: handlers.GetExecutionsAction,
4141
ArgsUsage: "flow",
4242
},
43+
{
44+
Name: "running",
45+
Aliases: []string{"r"},
46+
Usage: "lists running executions",
47+
Action: handlers.GetRunningAction,
48+
},
4349
},
4450
}
4551
}
@@ -48,6 +54,37 @@ type GetActionsHandler struct {
4854
ActionWithContext
4955
}
5056

57+
func (a *GetActionsHandler) GetRunningAction(c *cli.Context) error {
58+
executions, err := a.Client().Running()
59+
60+
w := new(tabwriter.Writer)
61+
w.Init(os.Stdout, 4, 4, 2, ' ', 0)
62+
fmt.Fprintf(
63+
w,
64+
"%s\t%s\t%s\t%s\t%s\n",
65+
"Project",
66+
"FlowID",
67+
color.WhiteString("Status"),
68+
"Start time",
69+
"Runtime",
70+
)
71+
72+
for _, execution := range executions {
73+
fmt.Fprintf(
74+
w,
75+
"%s\t%s\t%s\t%s\t%s\n",
76+
execution.Project,
77+
execution.FlowID,
78+
execution.Status.Colored(),
79+
humanize.Time(execution.StartTime.Time()),
80+
format.DurationHumanReadable(time.Since(execution.StartTime.Time())),
81+
)
82+
}
83+
84+
w.Flush()
85+
return err
86+
}
87+
5188
func (a *GetActionsHandler) GetExecutionsAction(c *cli.Context) error {
5289
if !c.Args().Present() {
5390
return errors.New("no flowID given")

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ require (
88
github.com/mattn/go-colorable v0.0.9 // indirect
99
github.com/mattn/go-isatty v0.0.3 // indirect
1010
github.com/urfave/cli v1.21.0
11-
golang.org/x/sys v0.0.0-20171110072704-1e2299c37cc9 // indirect
11+
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7
1212
)

go.sum

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,13 @@ github.com/mattn/go-isatty v0.0.3 h1:ns/ykhmWi7G9O+8a448SecJU3nSMBXJfqQkl0upE1jI
99
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
1010
github.com/urfave/cli v1.21.0 h1:wYSSj06510qPIzGSua9ZqsncMmWE3Zr55KBERygyrxE=
1111
github.com/urfave/cli v1.21.0/go.mod h1:lxDj6qX9Q6lWQxIrbrT0nwecwUtRnhVZAJjJZrVUZZQ=
12+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
13+
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7 h1:fHDIZ2oxGnUZRN6WgWFCbYBjH9uqVPRCUVUDhs0wnbA=
14+
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
1215
golang.org/x/sys v0.0.0-20171110072704-1e2299c37cc9 h1:LRRcaqNAXmlmNmT+Q2u9z+iEf5+Y+S1Ea9MNwGxOc98=
1316
golang.org/x/sys v0.0.0-20171110072704-1e2299c37cc9/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
17+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
18+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
1419
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
1520
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
1621
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=

0 commit comments

Comments
 (0)