Skip to content

Commit 1ba0b06

Browse files
authored
internal: testserver: reorg internals, fix error code (#3591)
## Changes - Move jobs-related code to jobs.go. - In jobs.go, update parsing error to return 400 instead of 500. - Move secrets ACLs related code to secret_acls.go. ## Why Match structure of other resources in testserver.
1 parent 8abb179 commit 1ba0b06

File tree

4 files changed

+256
-268
lines changed

4 files changed

+256
-268
lines changed

libs/testserver/fake_workspace.go

Lines changed: 0 additions & 181 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ import (
66
"fmt"
77
"path"
88
"path/filepath"
9-
"sort"
10-
"strconv"
119
"strings"
1210
"sync"
1311

@@ -297,185 +295,6 @@ func (s *FakeWorkspace) WorkspaceFilesExportFile(path string) []byte {
297295
return s.files[path].Data
298296
}
299297

300-
func (s *FakeWorkspace) JobsCreate(request jobs.CreateJob) Response {
301-
defer s.LockUnlock()()
302-
303-
jobId := s.nextJobId
304-
s.nextJobId++
305-
306-
jobSettings := jobs.JobSettings{}
307-
err := jsonConvert(request, &jobSettings)
308-
if err != nil {
309-
return Response{
310-
StatusCode: 400,
311-
Body: fmt.Sprintf("Cannot convert request to jobSettings: %s", err),
312-
}
313-
}
314-
315-
s.Jobs[jobId] = jobs.Job{
316-
JobId: jobId,
317-
Settings: &jobSettings,
318-
}
319-
320-
return Response{
321-
Body: jobs.CreateResponse{JobId: jobId},
322-
}
323-
}
324-
325-
func (s *FakeWorkspace) JobsReset(request jobs.ResetJob) Response {
326-
defer s.LockUnlock()()
327-
328-
jobId := request.JobId
329-
330-
_, ok := s.Jobs[request.JobId]
331-
if !ok {
332-
return Response{
333-
StatusCode: 403,
334-
Body: "{}",
335-
}
336-
}
337-
338-
s.Jobs[jobId] = jobs.Job{
339-
JobId: jobId,
340-
Settings: &request.NewSettings,
341-
}
342-
343-
return Response{
344-
Body: "",
345-
}
346-
}
347-
348-
func (s *FakeWorkspace) JobsGet(jobId string) Response {
349-
id := jobId
350-
351-
jobIdInt, err := strconv.ParseInt(id, 10, 64)
352-
if err != nil {
353-
return Response{
354-
StatusCode: 400,
355-
Body: fmt.Sprintf("Failed to parse job id: %s: %v", err, id),
356-
}
357-
}
358-
359-
defer s.LockUnlock()()
360-
361-
job, ok := s.Jobs[jobIdInt]
362-
if !ok {
363-
return Response{
364-
StatusCode: 404,
365-
}
366-
}
367-
368-
job = setSourceIfNotSet(job)
369-
return Response{
370-
Body: job,
371-
}
372-
}
373-
374-
func (s *FakeWorkspace) JobsRunNow(jobId int64) Response {
375-
defer s.LockUnlock()()
376-
377-
_, ok := s.Jobs[jobId]
378-
if !ok {
379-
return Response{
380-
StatusCode: 404,
381-
}
382-
}
383-
384-
runId := s.nextJobRunId
385-
s.nextJobRunId++
386-
s.JobRuns[runId] = jobs.Run{
387-
RunId: runId,
388-
State: &jobs.RunState{
389-
LifeCycleState: jobs.RunLifeCycleStateRunning,
390-
},
391-
RunPageUrl: fmt.Sprintf("%s/job/run/%d", s.url, runId),
392-
RunType: jobs.RunTypeJobRun,
393-
RunName: "run-name",
394-
}
395-
396-
return Response{
397-
Body: jobs.RunNowResponse{
398-
RunId: runId,
399-
},
400-
}
401-
}
402-
403-
func (s *FakeWorkspace) JobsGetRun(runId int64) Response {
404-
defer s.LockUnlock()()
405-
406-
run, ok := s.JobRuns[runId]
407-
if !ok {
408-
return Response{
409-
StatusCode: 404,
410-
}
411-
}
412-
413-
// Mark the run as terminated.
414-
run.State.LifeCycleState = jobs.RunLifeCycleStateTerminated
415-
return Response{
416-
Body: run,
417-
}
418-
}
419-
420-
func (s *FakeWorkspace) JobsList() Response {
421-
defer s.LockUnlock()()
422-
423-
list := make([]jobs.BaseJob, 0, len(s.Jobs))
424-
for _, job := range s.Jobs {
425-
job = setSourceIfNotSet(job)
426-
baseJob := jobs.BaseJob{}
427-
err := jsonConvert(job, &baseJob)
428-
if err != nil {
429-
return Response{
430-
StatusCode: 400,
431-
Body: fmt.Sprintf("failed to convert job to base job: %s", err),
432-
}
433-
}
434-
435-
list = append(list, baseJob)
436-
}
437-
438-
// sort to have less non-determinism in tests
439-
sort.Slice(list, func(i, j int) bool {
440-
return list[i].JobId < list[j].JobId
441-
})
442-
443-
return Response{
444-
Body: jobs.ListJobsResponse{
445-
Jobs: list,
446-
},
447-
}
448-
}
449-
450-
func setSourceIfNotSet(job jobs.Job) jobs.Job {
451-
// Setting the source field in the output of the Jobs List API same way as backend does
452-
if job.Settings != nil {
453-
source := "WORKSPACE"
454-
if job.Settings.GitSource != nil {
455-
source = "GIT"
456-
}
457-
for _, task := range job.Settings.Tasks {
458-
if task.NotebookTask != nil {
459-
if task.NotebookTask.Source == "" {
460-
task.NotebookTask.Source = jobs.Source(source)
461-
}
462-
if task.DbtTask != nil {
463-
if task.DbtTask.Source == "" {
464-
task.DbtTask.Source = jobs.Source(source)
465-
}
466-
}
467-
if task.SparkPythonTask != nil {
468-
if task.SparkPythonTask.Source == "" {
469-
task.SparkPythonTask.Source = jobs.Source(source)
470-
}
471-
}
472-
}
473-
}
474-
}
475-
476-
return job
477-
}
478-
479298
// jsonConvert saves input to a value pointed by output
480299
func jsonConvert(input, output any) error {
481300
writer := new(bytes.Buffer)

libs/testserver/handlers.go

Lines changed: 22 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"encoding/json"
66
"fmt"
77
"net/http"
8-
"strconv"
98

109
"github.com/databricks/databricks-sdk-go/service/catalog"
1110
"github.com/databricks/databricks-sdk-go/service/compute"
@@ -180,77 +179,42 @@ func AddDefaultHandlers(server *Server) {
180179
})
181180

182181
server.Handle("POST", "/api/2.2/jobs/create", func(req Request) any {
183-
var request jobs.CreateJob
184-
if err := json.Unmarshal(req.Body, &request); err != nil {
185-
return Response{
186-
Body: fmt.Sprintf("internal error: %s", err),
187-
StatusCode: 500,
188-
}
189-
}
190-
191-
return req.Workspace.JobsCreate(request)
182+
return req.Workspace.JobsCreate(req)
192183
})
193184

194185
server.Handle("POST", "/api/2.2/jobs/delete", func(req Request) any {
195186
var request jobs.DeleteJob
196187
if err := json.Unmarshal(req.Body, &request); err != nil {
197188
return Response{
198-
Body: fmt.Sprintf("internal error: %s", err),
199-
StatusCode: 500,
189+
StatusCode: 400,
190+
Body: fmt.Sprintf("request parsing error: %s", err),
200191
}
201192
}
202193
return MapDelete(req.Workspace, req.Workspace.Jobs, request.JobId)
203194
})
204195

205196
server.Handle("POST", "/api/2.2/jobs/reset", func(req Request) any {
206-
var request jobs.ResetJob
207-
if err := json.Unmarshal(req.Body, &request); err != nil {
208-
return Response{
209-
Body: fmt.Sprintf("internal error: %s", err),
210-
StatusCode: 500,
211-
}
212-
}
213-
214-
return req.Workspace.JobsReset(request)
197+
return req.Workspace.JobsReset(req)
215198
})
216199

217200
server.Handle("GET", "/api/2.0/jobs/get", func(req Request) any {
218-
jobId := req.URL.Query().Get("job_id")
219-
return req.Workspace.JobsGet(jobId)
201+
return req.Workspace.JobsGet(req)
220202
})
221203

222204
server.Handle("GET", "/api/2.2/jobs/get", func(req Request) any {
223-
jobId := req.URL.Query().Get("job_id")
224-
return req.Workspace.JobsGet(jobId)
205+
return req.Workspace.JobsGet(req)
225206
})
226207

227208
server.Handle("GET", "/api/2.2/jobs/list", func(req Request) any {
228209
return req.Workspace.JobsList()
229210
})
230211

231212
server.Handle("POST", "/api/2.2/jobs/run-now", func(req Request) any {
232-
var request jobs.RunNow
233-
if err := json.Unmarshal(req.Body, &request); err != nil {
234-
return Response{
235-
Body: fmt.Sprintf("internal error: %s", err),
236-
StatusCode: 500,
237-
}
238-
}
239-
240-
return req.Workspace.JobsRunNow(request.JobId)
213+
return req.Workspace.JobsRunNow(req)
241214
})
242215

243216
server.Handle("GET", "/api/2.2/jobs/runs/get", func(req Request) any {
244-
runId := req.URL.Query().Get("run_id")
245-
runIdInt, err := strconv.ParseInt(runId, 10, 64)
246-
if err != nil {
247-
return Response{
248-
Body: fmt.Sprintf("internal error: %s", err),
249-
StatusCode: 500,
250-
}
251-
}
252-
253-
return req.Workspace.JobsGetRun(runIdInt)
217+
return req.Workspace.JobsGetRun(req)
254218
})
255219

256220
server.Handle("GET", "/api/2.2/jobs/runs/list", func(req Request) any {
@@ -395,6 +359,8 @@ func AddDefaultHandlers(server *Server) {
395359
return req.Workspace.VolumesCreate(req)
396360
})
397361

362+
// Repos:
363+
398364
server.Handle("POST", "/api/2.0/repos", func(req Request) any {
399365
return req.Workspace.ReposCreate(req)
400366
})
@@ -420,6 +386,7 @@ func AddDefaultHandlers(server *Server) {
420386
})
421387

422388
// SQL Warehouses:
389+
423390
server.Handle("GET", "/api/2.0/sql/warehouses/{warehouse_id}", func(req Request) any {
424391
return MapGet(req.Workspace, req.Workspace.SqlWarehouses, req.Vars["warehouse_id"])
425392
})
@@ -444,60 +411,24 @@ func AddDefaultHandlers(server *Server) {
444411
return req.Workspace.SqlDataSourcesList(req)
445412
})
446413

414+
// Secrets ACLs:
415+
447416
server.Handle("GET", "/api/2.0/secrets/acls/get", func(req Request) any {
448-
defer req.Workspace.LockUnlock()()
449-
450-
scope := req.URL.Query().Get("scope")
451-
principal := req.URL.Query().Get("principal")
452-
scopeAcls := req.Workspace.Acls[scope]
453-
for _, acl := range scopeAcls {
454-
if acl.Principal == principal {
455-
return acl
456-
}
457-
}
458-
return Response{StatusCode: 404}
417+
return req.Workspace.SecretsAclsGet(req)
459418
})
460419

461420
server.Handle("GET", "/api/2.0/secrets/acls/list", func(req Request) any {
462-
return MapGet(req.Workspace, req.Workspace.Acls, req.Vars["scope"])
421+
return MapGet(req.Workspace, req.Workspace.Acls, req.URL.Query().Get("scope"))
463422
})
464423

465424
server.Handle("POST", "/api/2.0/secrets/acls/put", func(req Request) any {
466-
defer req.Workspace.LockUnlock()()
467-
468-
var request workspace.PutAcl
469-
if err := json.Unmarshal(req.Body, &request); err != nil {
470-
return Response{
471-
Body: fmt.Sprintf("internal error: %s", err),
472-
StatusCode: 500,
473-
}
474-
}
475-
req.Workspace.Acls[request.Scope] = append(req.Workspace.Acls[request.Scope], workspace.AclItem{
476-
Principal: request.Principal,
477-
Permission: request.Permission,
478-
})
479-
return ""
425+
return req.Workspace.SecretsAclsPut(req)
480426
})
481427

482428
server.Handle("POST", "/api/2.0/secrets/acls/delete", func(req Request) any {
483-
defer req.Workspace.LockUnlock()()
484-
485-
var request workspace.DeleteAcl
486-
if err := json.Unmarshal(req.Body, &request); err != nil {
487-
return Response{
488-
Body: fmt.Sprintf("internal error: %s", err),
489-
StatusCode: 500,
490-
}
491-
}
492-
scopeAcls := req.Workspace.Acls[request.Scope]
493-
for i, acl := range scopeAcls {
494-
if acl.Principal == request.Principal {
495-
req.Workspace.Acls[request.Scope] = append(scopeAcls[:i], scopeAcls[i+1:]...)
496-
return ""
497-
}
498-
}
499-
return Response{StatusCode: 404}
429+
return req.Workspace.SecretsAclsDelete(req)
500430
})
431+
// Database Instances:
501432

502433
server.Handle("POST", "/api/2.0/database/instances", func(req Request) any {
503434
return req.Workspace.DatabaseInstanceCreate(req)
@@ -519,6 +450,8 @@ func AddDefaultHandlers(server *Server) {
519450
return DatabaseInstanceMapDelete(req)
520451
})
521452

453+
// Database Catalogs:
454+
522455
server.Handle("POST", "/api/2.0/database/catalogs", func(req Request) any {
523456
return req.Workspace.DatabaseCatalogCreate(req)
524457
})
@@ -535,6 +468,8 @@ func AddDefaultHandlers(server *Server) {
535468
return MapDelete(req.Workspace, req.Workspace.DatabaseCatalogs, req.Vars["name"])
536469
})
537470

471+
// Synced Database Tables:
472+
538473
server.Handle("POST", "/api/2.0/database/synced_tables", func(req Request) any {
539474
return req.Workspace.SyncedDatabaseTableCreate(req)
540475
})

0 commit comments

Comments
 (0)