Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions query/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ type ExecutionOptions struct {

// AbortCh is a channel that signals when results are no longer desired by the caller.
AbortCh <-chan struct{}

// UserID is the ID of the user executing the query.
UserID string
}

type (
Expand Down Expand Up @@ -470,6 +473,7 @@ func (e *Executor) recover(query *influxql.Query, results chan *Result) {
type Task struct {
query string
database string
userID string
status TaskStatus
startTime time.Time
closing chan struct{}
Expand Down
8 changes: 7 additions & 1 deletion query/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ func TestQueryExecutor_Abort(t *testing.T) {
}

func TestQueryExecutor_ShowQueries(t *testing.T) {
const testUser = "Fred"
const userColumn = 5
e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
Expand All @@ -336,12 +338,16 @@ func TestQueryExecutor_ShowQueries(t *testing.T) {
t.Fatal(err)
}

results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil)
results := e.ExecuteQuery(q, query.ExecutionOptions{UserID: testUser}, nil)
result := <-results
if len(result.Series) != 1 {
t.Errorf("expected %d series, got %d", 1, len(result.Series))
} else if len(result.Series[0].Values) != 1 {
t.Errorf("expected %d row, got %d", 1, len(result.Series[0].Values))
} else if result.Series[0].Values[0][userColumn] != testUser {
t.Errorf("unexpected user: %s", result.Series[0].Values[0][0])
} else if result.Series[0].Columns[userColumn] != "user" {
t.Errorf("unexpected column: %s", result.Series[0].Columns[5])
}
if result.Err != nil {
t.Errorf("unexpected error: %s", result.Err)
Expand Down
15 changes: 10 additions & 5 deletions query/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
)

var (
queryFieldNames []string = []string{"qid", "query", "database", "duration", "status"}
queryFieldNames []string = []string{"qid", "query", "database", "duration", "status", "user"}
)

func (t TaskStatus) String() string {
Expand Down Expand Up @@ -145,7 +145,7 @@ func (t *TaskManager) executeShowQueriesStatement(q *influxql.ShowQueriesStateme

d = prettyTime(d)

values = append(values, []interface{}{id, qi.query, qi.database, d.String(), qi.status.String()})
values = append(values, []interface{}{id, qi.query, qi.database, d.String(), qi.status.String(), qi.userID})
}

return []*models.Row{{
Expand All @@ -172,7 +172,8 @@ func (t *TaskManager) LogCurrentQueries(logFunc func(string, ...zap.Field)) {
zap.String(queryFieldNames[1], queryInfo.Query),
zap.String(queryFieldNames[2], queryInfo.Database),
zap.String(queryFieldNames[3], prettyTime(queryInfo.Duration).String()),
zap.String(queryFieldNames[4], queryInfo.Status.String()))
zap.String(queryFieldNames[4], queryInfo.Status.String()),
zap.String(queryFieldNames[5], queryInfo.User))
}
}

Expand Down Expand Up @@ -208,6 +209,7 @@ func (t *TaskManager) AttachQuery(q *influxql.Query, opt ExecutionOptions, inter
query := &Task{
query: q.String(),
database: opt.Database,
userID: opt.UserID,
status: RunningTask,
startTime: time.Now(),
closing: make(chan struct{}),
Expand All @@ -223,8 +225,8 @@ func (t *TaskManager) AttachQuery(q *influxql.Query, opt ExecutionOptions, inter

select {
case <-timer.C:
t.Logger.Warn(fmt.Sprintf("Detected slow query: %s (qid: %d, database: %s, threshold: %s)",
query.query, qid, query.database, t.LogQueriesAfter))
t.Logger.Warn(fmt.Sprintf("Detected slow query: %s (qid: %d, database: %s, user: %s, threshold: %s)",
query.query, qid, query.database, query.userID, t.LogQueriesAfter))
case <-closing:
}
return nil
Expand Down Expand Up @@ -279,6 +281,7 @@ type QueryInfo struct {
Database string `json:"database"`
Duration time.Duration `json:"duration"`
Status TaskStatus `json:"status"`
User string `json:"user"`
}

// Queries returns a list of all running queries with information about them.
Expand All @@ -295,6 +298,7 @@ func (t *TaskManager) Queries() []QueryInfo {
Database: qi.database,
Duration: now.Sub(qi.startTime),
Status: qi.status,
User: qi.userID,
})
}
return queries
Expand Down Expand Up @@ -323,6 +327,7 @@ func (t *TaskManager) waitForQuery(qid uint64, interrupt <-chan struct{}, closin
"query killed for exceeding timeout limit",
zap.String("query", t.queries[qid].query),
zap.String("database", t.queries[qid].database),
zap.String("user", t.queries[qid].userID),
zap.String("timeout", prettyTime(t.QueryTimeout).String()),
)
}
Expand Down
6 changes: 6 additions & 0 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,13 +685,19 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U
// Parse whether this is an async command.
async := r.FormValue("async") == "true"

var userName string

if user != nil {
userName = user.ID()
}
opts := query.ExecutionOptions{
Database: db,
RetentionPolicy: r.FormValue("rp"),
ChunkSize: chunkSize,
ReadOnly: r.Method == "GET",
NodeID: nodeID,
Authorizer: fineAuthorizer,
UserID: userName,
}

if h.Config.AuthEnabled {
Expand Down