@@ -14,6 +14,7 @@ import (
1414 "testing"
1515 "time"
1616
17+ "github.com/databricks/cli/libs/flags"
1718 "github.com/fatih/color"
1819 "github.com/gorilla/websocket"
1920 "github.com/stretchr/testify/assert"
@@ -44,7 +45,7 @@ func TestLogStreamerTailBufferFlushes(t *testing.T) {
4445 follow : false ,
4546 prefetch : 25 * time .Millisecond ,
4647 writer : buf ,
47- formatter : newLogFormatter (false ),
48+ formatter : newLogFormatter (false , flags . OutputText ),
4849 }
4950
5051 require .NoError (t , streamer .Run (context .Background ()))
@@ -77,7 +78,7 @@ func TestLogStreamerTailFlushErrorPropagates(t *testing.T) {
7778 follow : false ,
7879 prefetch : 0 ,
7980 writer : & failWriter {err : writerErr },
80- formatter : newLogFormatter (false ),
81+ formatter : newLogFormatter (false , flags . OutputText ),
8182 }
8283
8384 err := streamer .Run (context .Background ())
@@ -102,7 +103,7 @@ func TestLogStreamerTrimsCRLFInStructuredEntries(t *testing.T) {
102103 url : toWebSocketURL (server .URL ),
103104 token : "token" ,
104105 writer : buf ,
105- formatter : newLogFormatter (false ),
106+ formatter : newLogFormatter (false , flags . OutputText ),
106107 }
107108
108109 require .NoError (t , streamer .Run (context .Background ()))
@@ -125,7 +126,7 @@ func TestLogStreamerDialErrorIncludesResponseBody(t *testing.T) {
125126 url : toWebSocketURL (server .URL ),
126127 token : "test" ,
127128 writer : & bytes.Buffer {},
128- formatter : newLogFormatter (false ),
129+ formatter : newLogFormatter (false , flags . OutputText ),
129130 }
130131
131132 err := streamer .Run (context .Background ())
@@ -152,7 +153,7 @@ func TestLogStreamerRetriesOnDialFailure(t *testing.T) {
152153 follow : true ,
153154 prefetch : 0 ,
154155 writer : buf ,
155- formatter : newLogFormatter (false ),
156+ formatter : newLogFormatter (false , flags . OutputText ),
156157 }
157158
158159 ctx , cancel := context .WithTimeout (context .Background (), 300 * time .Millisecond )
@@ -182,7 +183,7 @@ func TestLogStreamerSendsSearchTerm(t *testing.T) {
182183 token : "test" ,
183184 search : "ERROR" ,
184185 writer : buf ,
185- formatter : newLogFormatter (false ),
186+ formatter : newLogFormatter (false , flags . OutputText ),
186187 }
187188
188189 require .NoError (t , streamer .Run (context .Background ()))
@@ -196,7 +197,7 @@ func TestLogStreamerFiltersSources(t *testing.T) {
196197 defer conn .Close ()
197198 _ , _ , _ = conn .ReadMessage ()
198199 require .NoError (t , sendEntry (conn , 1 , "app" ))
199- require .NoError (t , conn .WriteMessage (websocket .TextMessage , mustJSON (wsEntry {Source : "SYSTEM" , Timestamp : 2 , Message : "sys" })))
200+ require .NoError (t , conn .WriteMessage (websocket .TextMessage , mustJSON (t , wsEntry {Source : "SYSTEM" , Timestamp : 2 , Message : "sys" })))
200201 _ = conn .WriteControl (websocket .CloseMessage , websocket .FormatCloseMessage (websocket .CloseNormalClosure , "" ), time .Now ().Add (time .Second ))
201202 })
202203 defer server .Close ()
@@ -210,7 +211,7 @@ func TestLogStreamerFiltersSources(t *testing.T) {
210211 token : "test" ,
211212 sources : sources ,
212213 writer : buf ,
213- formatter : newLogFormatter (false ),
214+ formatter : newLogFormatter (false , flags . OutputText ),
214215 }
215216
216217 require .NoError (t , streamer .Run (context .Background ()))
@@ -226,22 +227,58 @@ func TestFormatLogEntryColorizesWhenEnabled(t *testing.T) {
226227
227228 entry := & wsEntry {Source : "app" , Timestamp : 1 , Message : "hello\n " }
228229
229- colorFormatter := newLogFormatter (true )
230+ colorFormatter := newLogFormatter (true , flags . OutputText )
230231 colored := colorFormatter .FormatEntry (entry )
231232 assert .Contains (t , colored , "\x1b [" )
232233 assert .Contains (t , colored , fmt .Sprintf ("[%s]" , color .HiBlueString ("APP" )))
233234
234- plainFormatter := newLogFormatter (false )
235+ plainFormatter := newLogFormatter (false , flags . OutputText )
235236 plain := plainFormatter .FormatEntry (entry )
236237 assert .NotContains (t , plain , "\x1b [" )
237238 assert .Contains (t , plain , "[APP]" )
238239}
239240
240- func mustJSON (entry wsEntry ) []byte {
241- raw , err := json .Marshal (entry )
242- if err != nil {
243- panic (err )
241+ func TestLogStreamerOutputsNDJSON (t * testing.T ) {
242+ t .Parallel ()
243+
244+ server := newTestLogServer (t , func (id int , conn * websocket.Conn ) {
245+ defer conn .Close ()
246+ _ , _ , _ = conn .ReadMessage ()
247+ require .NoError (t , sendEntry (conn , 1.0 , "first message" ))
248+ require .NoError (t , sendEntry (conn , 2.0 , "second message" ))
249+ _ = conn .WriteControl (websocket .CloseMessage , websocket .FormatCloseMessage (websocket .CloseNormalClosure , "" ), time .Now ().Add (time .Second ))
250+ })
251+ defer server .Close ()
252+
253+ buf := & bytes.Buffer {}
254+ streamer := & logStreamer {
255+ dialer : & websocket.Dialer {},
256+ url : toWebSocketURL (server .URL ),
257+ token : "token" ,
258+ writer : buf ,
259+ formatter : newLogFormatter (false , flags .OutputJSON ),
244260 }
261+
262+ require .NoError (t , streamer .Run (context .Background ()))
263+
264+ lines := strings .Split (strings .TrimSpace (buf .String ()), "\n " )
265+ require .Len (t , lines , 2 , "expected two NDJSON lines" )
266+
267+ var entry1 wsEntry
268+ require .NoError (t , json .Unmarshal ([]byte (lines [0 ]), & entry1 ))
269+ assert .Equal (t , "APP" , entry1 .Source )
270+ assert .Equal (t , "first message" , entry1 .Message )
271+
272+ var entry2 wsEntry
273+ require .NoError (t , json .Unmarshal ([]byte (lines [1 ]), & entry2 ))
274+ assert .Equal (t , "APP" , entry2 .Source )
275+ assert .Equal (t , "second message" , entry2 .Message )
276+ }
277+
278+ func mustJSON (t * testing.T , entry wsEntry ) []byte {
279+ raw , err := json .Marshal (entry )
280+ require .NoError (t , err )
281+
245282 return raw
246283}
247284
@@ -267,7 +304,7 @@ func TestTailWithoutPrefetchRespectsTailSize(t *testing.T) {
267304 tail : 2 ,
268305 prefetch : 0 ,
269306 writer : buf ,
270- formatter : newLogFormatter (false ),
307+ formatter : newLogFormatter (false , flags . OutputText ),
271308 }
272309
273310 require .NoError (t , streamer .Run (context .Background ()))
@@ -291,7 +328,7 @@ func TestCloseErrorPropagatesWhenAbnormal(t *testing.T) {
291328 dialer : & websocket.Dialer {},
292329 url : toWebSocketURL (server .URL ),
293330 token : "token" ,
294- formatter : newLogFormatter (false ),
331+ formatter : newLogFormatter (false , flags . OutputText ),
295332 }
296333
297334 err := streamer .Run (context .Background ())
@@ -381,7 +418,7 @@ func TestLogStreamerTailFlushesWithoutFollow(t *testing.T) {
381418 follow : false ,
382419 prefetch : 50 * time .Millisecond ,
383420 writer : writer ,
384- formatter : newLogFormatter (false ),
421+ formatter : newLogFormatter (false , flags . OutputText ),
385422 }
386423
387424 ctx , cancel := context .WithTimeout (context .Background (), time .Second )
@@ -422,7 +459,7 @@ func TestLogStreamerFollowTailWithoutPrefetchEmitsRequestedLines(t *testing.T) {
422459 follow : true ,
423460 prefetch : 0 ,
424461 writer : writer ,
425- formatter : newLogFormatter (false ),
462+ formatter : newLogFormatter (false , flags . OutputText ),
426463 }
427464
428465 ctx , cancel := context .WithCancel (context .Background ())
@@ -480,7 +517,7 @@ func TestLogStreamerFollowTailDoesNotReplayAfterReconnect(t *testing.T) {
480517 follow : true ,
481518 prefetch : 0 ,
482519 writer : writer ,
483- formatter : newLogFormatter (false ),
520+ formatter : newLogFormatter (false , flags . OutputText ),
484521 }
485522
486523 ctx , cancel := context .WithCancel (context .Background ())
@@ -561,7 +598,7 @@ func TestLogStreamerRefreshesTokenAfterAuthClose(t *testing.T) {
561598 tokenProvider : tokenProvider ,
562599 follow : true ,
563600 writer : buf ,
564- formatter : newLogFormatter (false ),
601+ formatter : newLogFormatter (false , flags . OutputText ),
565602 }
566603
567604 ctx , cancel := context .WithCancel (context .Background ())
@@ -600,7 +637,7 @@ func TestLogStreamerEmitsPlainTextFrames(t *testing.T) {
600637 url : toWebSocketURL (server .URL ),
601638 token : "token" ,
602639 writer : buf ,
603- formatter : newLogFormatter (false ),
640+ formatter : newLogFormatter (false , flags . OutputText ),
604641 }
605642
606643 require .NoError (t , streamer .Run (context .Background ()))
@@ -625,7 +662,7 @@ func TestLogStreamerTimeoutStopsQuietFollowStream(t *testing.T) {
625662 url : toWebSocketURL (server .URL ),
626663 token : "token" ,
627664 follow : true ,
628- formatter : newLogFormatter (false ),
665+ formatter : newLogFormatter (false , flags . OutputText ),
629666 }
630667
631668 ctx , cancel := context .WithTimeout (context .Background (), 100 * time .Millisecond )
@@ -718,7 +755,7 @@ func TestAppStatusCheckerStopsFollowing(t *testing.T) {
718755 follow : true ,
719756 writer : buf ,
720757 appStatusChecker : appStatusChecker ,
721- formatter : newLogFormatter (false ),
758+ formatter : newLogFormatter (false , flags . OutputText ),
722759 }
723760
724761 err := streamer .Run (context .Background ())
0 commit comments