@@ -21,6 +21,7 @@ import (
21
21
"errors"
22
22
"fmt"
23
23
"io"
24
+ "regexp"
24
25
"sync"
25
26
"time"
26
27
@@ -107,6 +108,7 @@ type LogsOptions struct {
107
108
ContainerNameSpecified bool
108
109
Selector string
109
110
MaxFollowConcurrency int
111
+ Prefix bool
110
112
111
113
Object runtime.Object
112
114
GetPodTimeout time.Duration
@@ -116,6 +118,8 @@ type LogsOptions struct {
116
118
genericclioptions.IOStreams
117
119
118
120
TailSpecified bool
121
+
122
+ containerNameFromRefSpecRegexp * regexp.Regexp
119
123
}
120
124
121
125
func NewLogsOptions (streams genericclioptions.IOStreams , allContainers bool ) * LogsOptions {
@@ -124,6 +128,8 @@ func NewLogsOptions(streams genericclioptions.IOStreams, allContainers bool) *Lo
124
128
AllContainers : allContainers ,
125
129
Tail : - 1 ,
126
130
MaxFollowConcurrency : 5 ,
131
+
132
+ containerNameFromRefSpecRegexp : regexp .MustCompile (`spec\.(?:initContainers|containers|ephemeralContainers){(.+)}` ),
127
133
}
128
134
}
129
135
@@ -156,6 +162,7 @@ func NewCmdLogs(f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra.C
156
162
cmdutil .AddPodRunningTimeoutFlag (cmd , defaultPodLogsTimeout )
157
163
cmd .Flags ().StringVarP (& o .Selector , "selector" , "l" , o .Selector , "Selector (label query) to filter on." )
158
164
cmd .Flags ().IntVar (& o .MaxFollowConcurrency , "max-log-requests" , o .MaxFollowConcurrency , "Specify maximum number of concurrent logs to follow when using by a selector. Defaults to 5." )
165
+ cmd .Flags ().BoolVar (& o .Prefix , "prefix" , o .Prefix , "Prefix each log line with the log source (pod name and container name)" )
159
166
return cmd
160
167
}
161
168
@@ -314,14 +321,15 @@ func (o LogsOptions) RunLogs() error {
314
321
return o .sequentialConsumeRequest (requests )
315
322
}
316
323
317
- func (o LogsOptions ) parallelConsumeRequest (requests [ ]rest.ResponseWrapper ) error {
324
+ func (o LogsOptions ) parallelConsumeRequest (requests map [corev1. ObjectReference ]rest.ResponseWrapper ) error {
318
325
reader , writer := io .Pipe ()
319
326
wg := & sync.WaitGroup {}
320
327
wg .Add (len (requests ))
321
- for _ , request := range requests {
322
- go func (request rest.ResponseWrapper ) {
328
+ for objRef , request := range requests {
329
+ go func (objRef corev1. ObjectReference , request rest.ResponseWrapper ) {
323
330
defer wg .Done ()
324
- if err := o .ConsumeRequestFn (request , writer ); err != nil {
331
+ out := o .addPrefixIfNeeded (objRef , writer )
332
+ if err := o .ConsumeRequestFn (request , out ); err != nil {
325
333
if ! o .IgnoreLogErrors {
326
334
writer .CloseWithError (err )
327
335
@@ -332,7 +340,7 @@ func (o LogsOptions) parallelConsumeRequest(requests []rest.ResponseWrapper) err
332
340
fmt .Fprintf (writer , "error: %v\n " , err )
333
341
}
334
342
335
- }(request )
343
+ }(objRef , request )
336
344
}
337
345
338
346
go func () {
@@ -344,16 +352,38 @@ func (o LogsOptions) parallelConsumeRequest(requests []rest.ResponseWrapper) err
344
352
return err
345
353
}
346
354
347
- func (o LogsOptions ) sequentialConsumeRequest (requests []rest.ResponseWrapper ) error {
348
- for _ , request := range requests {
349
- if err := o .ConsumeRequestFn (request , o .Out ); err != nil {
355
+ func (o LogsOptions ) sequentialConsumeRequest (requests map [corev1.ObjectReference ]rest.ResponseWrapper ) error {
356
+ for objRef , request := range requests {
357
+ out := o .addPrefixIfNeeded (objRef , o .Out )
358
+ if err := o .ConsumeRequestFn (request , out ); err != nil {
350
359
return err
351
360
}
352
361
}
353
362
354
363
return nil
355
364
}
356
365
366
+ func (o LogsOptions ) addPrefixIfNeeded (ref corev1.ObjectReference , writer io.Writer ) io.Writer {
367
+ if ! o .Prefix || ref .FieldPath == "" || ref .Name == "" {
368
+ return writer
369
+ }
370
+
371
+ // We rely on ref.FieldPath to contain a reference to a container
372
+ // including a container name (not an index) so we can get a container name
373
+ // without making an extra API request.
374
+ var containerName string
375
+ containerNameMatches := o .containerNameFromRefSpecRegexp .FindStringSubmatch (ref .FieldPath )
376
+ if len (containerNameMatches ) == 2 {
377
+ containerName = containerNameMatches [1 ]
378
+ }
379
+
380
+ prefix := fmt .Sprintf ("[pod/%s/%s] " , ref .Name , containerName )
381
+ return & prefixingWriter {
382
+ prefix : []byte (prefix ),
383
+ writer : writer ,
384
+ }
385
+ }
386
+
357
387
// DefaultConsumeRequest reads the data from request and writes into
358
388
// the out writer. It buffers data from requests until the newline or io.EOF
359
389
// occurs in the data, so it doesn't interleave logs sub-line
@@ -384,3 +414,25 @@ func DefaultConsumeRequest(request rest.ResponseWrapper, out io.Writer) error {
384
414
}
385
415
}
386
416
}
417
+
418
+ type prefixingWriter struct {
419
+ prefix []byte
420
+ writer io.Writer
421
+ }
422
+
423
+ func (pw * prefixingWriter ) Write (p []byte ) (int , error ) {
424
+ if len (p ) == 0 {
425
+ return 0 , nil
426
+ }
427
+
428
+ // Perform an "atomic" write of a prefix and p to make sure that it doesn't interleave
429
+ // sub-line when used concurrently with io.PipeWrite.
430
+ n , err := pw .writer .Write (append (pw .prefix , p ... ))
431
+ if n > len (p ) {
432
+ // To comply with the io.Writer interface requirements we must
433
+ // return a number of bytes written from p (0 <= n <= len(p)),
434
+ // so we are ignoring the length of the prefix here.
435
+ return len (p ), err
436
+ }
437
+ return n , err
438
+ }
0 commit comments