Skip to content

Commit 27ba2ab

Browse files
ericfeunekesEric Feunekesclaudepkosiec
authored
Introduce app logs command (#3908)
## Changes - Add the `databricks apps logs NAME` command, including tail/follow/search/source/output-file flags wired via `cmdgroup`, file mirroring with 0600 perms, and validation against apps that lack a public URL. (`cmd/workspace/apps`) - Introduce the reusable `libs/logstream` helper with token refresh hooks, buffering, search/source filtering, structured error handling, context-driven deadlines, and a comprehensive unit suite so other commands can stream logs without bespoke WebSocket loops. ## Why This is a quality of life addition that allows the CLI to tail app logs without a need to navigate to the Apps UI. ## Tests - go test ./libs/logstream - go test ./cmd/workspace/apps - go test ./cmd/auth <!-- If your PR needs to be included in the release notes for next release, add a separate entry in NEXT_CHANGELOG.md as part of your PR. --> --------- Co-authored-by: Eric Feunekes <[email protected]> Co-authored-by: Claude <[email protected]> Co-authored-by: Pawel Kosiec <[email protected]>
1 parent 6ab1806 commit 27ba2ab

File tree

5 files changed

+1661
-0
lines changed

5 files changed

+1661
-0
lines changed

cmd/workspace/apps/logs.go

Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
package apps
2+
3+
import (
4+
"context"
5+
"crypto/tls"
6+
"errors"
7+
"fmt"
8+
"io"
9+
"net/http"
10+
"net/url"
11+
"os"
12+
"path"
13+
"slices"
14+
"strings"
15+
"time"
16+
17+
"github.com/databricks/cli/cmd/root"
18+
"github.com/databricks/cli/libs/apps/logstream"
19+
"github.com/databricks/cli/libs/cmdctx"
20+
"github.com/databricks/cli/libs/cmdgroup"
21+
"github.com/databricks/cli/libs/cmdio"
22+
"github.com/databricks/cli/libs/log"
23+
"github.com/databricks/databricks-sdk-go/config"
24+
"github.com/databricks/databricks-sdk-go/service/apps"
25+
"github.com/gorilla/websocket"
26+
"github.com/spf13/cobra"
27+
)
28+
29+
const (
30+
defaultTailLines = 200
31+
defaultPrefetchWindow = 2 * time.Second
32+
defaultHandshakeTimeout = 30 * time.Second
33+
)
34+
35+
var allowedSources = []string{"APP", "SYSTEM"}
36+
37+
func newLogsCommand() *cobra.Command {
38+
var (
39+
tailLines int
40+
follow bool
41+
outputPath string
42+
streamTimeout time.Duration
43+
searchTerm string
44+
sourceFilters []string
45+
)
46+
47+
cmd := &cobra.Command{
48+
Use: "logs NAME",
49+
Short: "Show Databricks app logs",
50+
Long: `Stream stdout/stderr logs for a Databricks app via its log stream.
51+
52+
By default the command fetches the most recent logs (up to --tail-lines, default 200) and exits.
53+
Use --follow to continue streaming logs until cancelled, optionally bounding the duration with --timeout.
54+
Server-side filtering is available through --search (same semantics as the Databricks UI) and client-side filtering
55+
via --source APP|SYSTEM. Use --output-file to mirror the stream to a local file (created with 0600 permissions).`,
56+
Example: ` # Fetch the last 50 log lines
57+
databricks apps logs my-app --tail-lines 50
58+
59+
# Follow logs until interrupted, searching for "ERROR" messages from app sources only
60+
databricks apps logs my-app --follow --search ERROR --source APP
61+
62+
# Mirror streamed logs to a local file while following for up to 5 minutes
63+
databricks apps logs my-app --follow --timeout 5m --output-file /tmp/my-app.log`,
64+
Args: root.ExactArgs(1),
65+
PreRunE: root.MustWorkspaceClient,
66+
RunE: func(cmd *cobra.Command, args []string) error {
67+
ctx := cmd.Context()
68+
69+
if tailLines < 0 {
70+
return errors.New("--tail-lines cannot be negative")
71+
}
72+
73+
if follow && streamTimeout > 0 {
74+
var cancel context.CancelFunc
75+
ctx, cancel = context.WithTimeout(ctx, streamTimeout)
76+
defer cancel()
77+
}
78+
79+
name := args[0]
80+
w := cmdctx.WorkspaceClient(ctx)
81+
app, err := w.Apps.Get(ctx, apps.GetAppRequest{Name: name})
82+
if err != nil {
83+
return err
84+
}
85+
if app.Url == "" {
86+
return fmt.Errorf("app %s does not have a public URL; deploy and start it before streaming logs", name)
87+
}
88+
89+
wsURL, err := buildLogsURL(app.Url)
90+
if err != nil {
91+
return err
92+
}
93+
94+
cfg := cmdctx.ConfigUsed(ctx)
95+
if cfg == nil {
96+
return errors.New("missing workspace configuration")
97+
}
98+
99+
tokenSource := cfg.GetTokenSource()
100+
if tokenSource == nil {
101+
return errors.New("configuration does not support OAuth tokens")
102+
}
103+
104+
initialToken, err := tokenSource.Token(ctx)
105+
if err != nil {
106+
return err
107+
}
108+
109+
tokenProvider := func(ctx context.Context) (string, error) {
110+
tok, err := tokenSource.Token(ctx)
111+
if err != nil {
112+
return "", err
113+
}
114+
return tok.AccessToken, nil
115+
}
116+
117+
appStatusChecker := func(ctx context.Context) error {
118+
app, err := w.Apps.Get(ctx, apps.GetAppRequest{Name: name})
119+
if err != nil {
120+
return err
121+
}
122+
if app.ComputeStatus == nil {
123+
return errors.New("app status unavailable")
124+
}
125+
// Check if app is in a terminal/stopped state
126+
switch app.ComputeStatus.State {
127+
case apps.ComputeStateStopped, apps.ComputeStateDeleting, apps.ComputeStateError:
128+
return fmt.Errorf("app is %s", app.ComputeStatus.State)
129+
default:
130+
// App is running or transitioning - continue streaming
131+
return nil
132+
}
133+
}
134+
135+
writer := cmd.OutOrStdout()
136+
var file *os.File
137+
if outputPath != "" {
138+
file, err = os.OpenFile(outputPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600)
139+
if err != nil {
140+
return err
141+
}
142+
defer file.Close()
143+
writer = io.MultiWriter(writer, file)
144+
}
145+
colorizeLogs := outputPath == "" && cmdio.IsTTY(cmd.OutOrStdout())
146+
147+
sourceMap, err := buildSourceFilter(sourceFilters)
148+
if err != nil {
149+
return err
150+
}
151+
152+
log.Infof(ctx, "Streaming logs for %s (%s)", name, wsURL)
153+
return logstream.Run(ctx, logstream.Config{
154+
Dialer: newLogStreamDialer(cfg),
155+
URL: wsURL,
156+
Origin: normalizeOrigin(app.Url),
157+
Token: initialToken.AccessToken,
158+
TokenProvider: tokenProvider,
159+
AppStatusChecker: appStatusChecker,
160+
Search: searchTerm,
161+
Sources: sourceMap,
162+
Tail: tailLines,
163+
Follow: follow,
164+
Prefetch: defaultPrefetchWindow,
165+
Writer: writer,
166+
UserAgent: "databricks-cli apps logs",
167+
Colorize: colorizeLogs,
168+
})
169+
},
170+
}
171+
172+
streamGroup := cmdgroup.NewFlagGroup("Streaming")
173+
streamGroup.FlagSet().IntVar(&tailLines, "tail-lines", defaultTailLines, "Number of recent log lines to show before streaming. Set to 0 to show everything.")
174+
streamGroup.FlagSet().BoolVarP(&follow, "follow", "f", false, "Continue streaming logs until interrupted.")
175+
streamGroup.FlagSet().DurationVar(&streamTimeout, "timeout", 0, "Maximum time to stream when --follow is set. 0 disables the timeout.")
176+
177+
filterGroup := cmdgroup.NewFlagGroup("Filtering")
178+
filterGroup.FlagSet().StringVar(&searchTerm, "search", "", "Send a search term to the log service before streaming.")
179+
filterGroup.FlagSet().StringSliceVar(&sourceFilters, "source", nil, "Restrict logs to APP and/or SYSTEM sources.")
180+
181+
wrappedCmd := cmdgroup.NewCommandWithGroupFlag(cmd)
182+
wrappedCmd.AddFlagGroup(streamGroup)
183+
wrappedCmd.AddFlagGroup(filterGroup)
184+
185+
cmd.Flags().StringVar(&outputPath, "output-file", "", "Optional file path to write logs in addition to stdout.")
186+
187+
return cmd
188+
}
189+
190+
func buildLogsURL(appURL string) (string, error) {
191+
parsed, err := url.Parse(appURL)
192+
if err != nil {
193+
return "", err
194+
}
195+
196+
switch strings.ToLower(parsed.Scheme) {
197+
case "https":
198+
parsed.Scheme = "wss"
199+
case "http":
200+
parsed.Scheme = "ws"
201+
case "wss", "ws":
202+
default:
203+
return "", fmt.Errorf("unsupported app URL scheme: %s", parsed.Scheme)
204+
}
205+
206+
parsed.Path = path.Join(parsed.Path, "logz/stream")
207+
if !strings.HasPrefix(parsed.Path, "/") {
208+
parsed.Path = "/" + parsed.Path
209+
}
210+
211+
return parsed.String(), nil
212+
}
213+
214+
func normalizeOrigin(appURL string) string {
215+
parsed, err := url.Parse(appURL)
216+
if err != nil {
217+
return ""
218+
}
219+
switch strings.ToLower(parsed.Scheme) {
220+
case "http", "https":
221+
return parsed.Scheme + "://" + parsed.Host
222+
case "ws":
223+
parsed.Scheme = "http"
224+
case "wss":
225+
parsed.Scheme = "https"
226+
default:
227+
return ""
228+
}
229+
parsed.Path = ""
230+
parsed.RawQuery = ""
231+
parsed.Fragment = ""
232+
return parsed.String()
233+
}
234+
235+
func buildSourceFilter(values []string) (map[string]struct{}, error) {
236+
if len(values) == 0 {
237+
return nil, nil
238+
}
239+
filter := make(map[string]struct{})
240+
for _, v := range values {
241+
trimmed := strings.ToUpper(strings.TrimSpace(v))
242+
if trimmed == "" {
243+
continue
244+
}
245+
if !slices.Contains(allowedSources, trimmed) {
246+
return nil, fmt.Errorf("invalid --source value %q (valid: %s)", v, strings.Join(allowedSources, ", "))
247+
}
248+
filter[trimmed] = struct{}{}
249+
}
250+
if len(filter) == 0 {
251+
return nil, nil
252+
}
253+
return filter, nil
254+
}
255+
256+
func newLogStreamDialer(cfg *config.Config) *websocket.Dialer {
257+
dialer := &websocket.Dialer{
258+
Proxy: http.ProxyFromEnvironment,
259+
HandshakeTimeout: defaultHandshakeTimeout,
260+
}
261+
262+
if cfg == nil {
263+
return dialer
264+
}
265+
266+
if transport, ok := cfg.HTTPTransport.(*http.Transport); ok && transport != nil {
267+
clone := transport.Clone()
268+
dialer.Proxy = clone.Proxy
269+
dialer.NetDialContext = clone.DialContext
270+
if clone.TLSClientConfig != nil {
271+
dialer.TLSClientConfig = clone.TLSClientConfig.Clone()
272+
}
273+
return dialer
274+
}
275+
276+
if cfg.InsecureSkipVerify {
277+
dialer.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
278+
}
279+
280+
return dialer
281+
}

cmd/workspace/apps/logs_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package apps
2+
3+
import (
4+
"crypto/tls"
5+
"net/http"
6+
"net/url"
7+
"testing"
8+
9+
"github.com/databricks/databricks-sdk-go/config"
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
func TestNewLogStreamDialerConfiguresProxyAndTLS(t *testing.T) {
15+
t.Run("clones HTTP transport when provided", func(t *testing.T) {
16+
proxyURL, err := url.Parse("http://localhost:8080")
17+
require.NoError(t, err)
18+
19+
transport := &http.Transport{
20+
Proxy: http.ProxyURL(proxyURL),
21+
TLSClientConfig: &tls.Config{MinVersion: tls.VersionTLS12},
22+
}
23+
24+
cfg := &config.Config{
25+
HTTPTransport: transport,
26+
}
27+
28+
dialer := newLogStreamDialer(cfg)
29+
require.NotNil(t, dialer)
30+
31+
req := &http.Request{URL: &url.URL{Scheme: "https", Host: "example.com"}}
32+
actualProxy, err := dialer.Proxy(req)
33+
require.NoError(t, err)
34+
assert.Equal(t, proxyURL.String(), actualProxy.String())
35+
36+
require.NotNil(t, dialer.TLSClientConfig)
37+
assert.NotSame(t, transport.TLSClientConfig, dialer.TLSClientConfig, "TLS config should be cloned")
38+
assert.Equal(t, transport.TLSClientConfig.MinVersion, dialer.TLSClientConfig.MinVersion)
39+
})
40+
41+
t.Run("honors insecure skip verify when no transport is supplied", func(t *testing.T) {
42+
cfg := &config.Config{
43+
InsecureSkipVerify: true,
44+
}
45+
dialer := newLogStreamDialer(cfg)
46+
require.NotNil(t, dialer)
47+
require.NotNil(t, dialer.TLSClientConfig, "expected TLS config when insecure skip verify is set")
48+
assert.True(t, dialer.TLSClientConfig.InsecureSkipVerify)
49+
})
50+
}
51+
52+
func TestBuildLogsURLConvertsSchemes(t *testing.T) {
53+
url, err := buildLogsURL("https://example.com/foo")
54+
require.NoError(t, err)
55+
assert.Equal(t, "wss://example.com/foo/logz/stream", url)
56+
57+
url, err = buildLogsURL("http://example.com/foo")
58+
require.NoError(t, err)
59+
assert.Equal(t, "ws://example.com/foo/logz/stream", url)
60+
}
61+
62+
func TestBuildLogsURLRejectsUnknownScheme(t *testing.T) {
63+
_, err := buildLogsURL("ftp://example.com/foo")
64+
require.Error(t, err)
65+
}
66+
67+
func TestNormalizeOrigin(t *testing.T) {
68+
assert.Equal(t, "https://example.com", normalizeOrigin("https://example.com/foo"))
69+
assert.Equal(t, "http://example.com", normalizeOrigin("ws://example.com/foo"))
70+
assert.Equal(t, "https://example.com", normalizeOrigin("wss://example.com/foo"))
71+
assert.Equal(t, "", normalizeOrigin("://invalid"))
72+
}
73+
74+
func TestBuildSourceFilter(t *testing.T) {
75+
filters, err := buildSourceFilter([]string{"app", "system", ""})
76+
require.NoError(t, err)
77+
assert.Equal(t, map[string]struct{}{"APP": {}, "SYSTEM": {}}, filters)
78+
79+
filters, err = buildSourceFilter(nil)
80+
require.NoError(t, err)
81+
assert.Nil(t, filters)
82+
83+
_, err = buildSourceFilter([]string{"foo"})
84+
require.Error(t, err)
85+
}

cmd/workspace/apps/overrides.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ func listDeploymentsOverride(listDeploymentsCmd *cobra.Command, listDeploymentsR
2323
}
2424

2525
func init() {
26+
cmdOverrides = append(cmdOverrides, func(cmd *cobra.Command) {
27+
cmd.AddCommand(newLogsCommand())
28+
})
2629
listOverrides = append(listOverrides, listOverride)
2730
listDeploymentsOverrides = append(listDeploymentsOverrides, listDeploymentsOverride)
2831
}

0 commit comments

Comments
 (0)