55 "encoding/json"
66 "fmt"
77 "io"
8+ "time"
89
910 imageHelper "github.com/dyrector-io/dyrectorio/golang/internal/helper/image"
1011
@@ -16,6 +17,16 @@ import (
1617 "github.com/dyrector-io/dyrectorio/protobuf/go/common"
1718)
1819
20+ type Level int32
21+
22+ const (
23+ Info Level = 1
24+ Warning Level = 2
25+ Error Level = 3
26+ )
27+
28+ const ProgressReportThrottleMillis = 500
29+
1930type status struct {
2031 Current int64
2132 Total int64
@@ -29,7 +40,7 @@ type DeploymentLogger struct {
2940 ctx context.Context
3041 appConfig * config.CommonConfiguration
3142
32- io. StringWriter
43+ LogWriter
3344}
3445
3546func NewDeploymentLogger (ctx context.Context , deploymentID * string ,
@@ -56,15 +67,17 @@ func (dog *DeploymentLogger) SetRequestID(requestID string) {
5667}
5768
5869// Writes to all available streams: std.out and gRPC streams
59- func (dog * DeploymentLogger ) Write (messages ... string ) {
70+ func (dog * DeploymentLogger ) Write (level Level , messages ... string ) {
6071 for i := range messages {
6172 log .Info ().Str ("deployment" , dog .deploymentID ).Msg (messages [i ])
6273 dog .logs = append (dog .logs , messages ... )
6374 }
6475
6576 if dog .stream != nil {
77+ logLevel := common .DeploymentMessageLevel (level )
6678 err := dog .stream .Send (& common.DeploymentStatusMessage {
67- Log : messages ,
79+ Log : messages ,
80+ LogLevel : & logLevel ,
6881 })
6982 if err != nil {
7083 log .Error ().Err (err ).Stack ().Str ("deployment" , dog .deploymentID ).Msg ("Write error" )
@@ -79,8 +92,13 @@ func (dog *DeploymentLogger) WriteDeploymentStatus(status common.DeploymentStatu
7992 }
8093
8194 if dog .stream != nil {
95+ logLevel := common .DeploymentMessageLevel_INFO
96+ if status == common .DeploymentStatus_FAILED {
97+ logLevel = common .DeploymentMessageLevel_ERROR
98+ }
8299 err := dog .stream .Send (& common.DeploymentStatusMessage {
83- Log : messages ,
100+ Log : messages ,
101+ LogLevel : & logLevel ,
84102 Data : & common.DeploymentStatusMessage_DeploymentStatus {
85103 DeploymentStatus : status ,
86104 },
@@ -91,7 +109,12 @@ func (dog *DeploymentLogger) WriteDeploymentStatus(status common.DeploymentStatu
91109 }
92110}
93111
94- func (dog * DeploymentLogger ) WriteContainerState (containerState common.ContainerState , reason string , messages ... string ) {
112+ func (dog * DeploymentLogger ) WriteContainerState (
113+ containerState common.ContainerState ,
114+ reason string ,
115+ level Level ,
116+ messages ... string ,
117+ ) {
95118 prefix := fmt .Sprintf ("%s - %s" , dog .requestID , containerState )
96119
97120 for i := range messages {
@@ -108,9 +131,12 @@ func (dog *DeploymentLogger) WriteContainerState(containerState common.Container
108131 },
109132 }
110133
134+ logLevel := common .DeploymentMessageLevel (level )
135+
111136 err := dog .stream .Send (& common.DeploymentStatusMessage {
112- Log : messages ,
113- Data : instance ,
137+ Log : messages ,
138+ LogLevel : & logLevel ,
139+ Data : instance ,
114140 })
115141 if err != nil {
116142 log .Error ().
@@ -123,31 +149,74 @@ func (dog *DeploymentLogger) WriteContainerState(containerState common.Container
123149 }
124150}
125151
152+ func (dog * DeploymentLogger ) WriteContainerProgress (status string , progress float32 ) {
153+ if dog .stream != nil {
154+ progress := & common.DeploymentStatusMessage_ContainerProgress {
155+ ContainerProgress : & common.DeployContainerProgress {
156+ InstanceId : dog .requestID ,
157+ Status : status ,
158+ Progress : progress ,
159+ },
160+ }
161+
162+ err := dog .stream .Send (& common.DeploymentStatusMessage {
163+ Data : progress ,
164+ })
165+ if err != nil {
166+ log .Error ().
167+ Err (err ).
168+ Stack ().
169+ Str ("deployment" , dog .deploymentID ).
170+ Str ("prefix" , dog .requestID ).
171+ Msg ("Write container progress error" )
172+ }
173+ }
174+ }
175+
126176func (dog * DeploymentLogger ) GetLogs () []string {
127177 return dog .logs
128178}
129179
130- func (dog * DeploymentLogger ) WriteString (s string ) (int , error ) {
131- dog .Write (s )
180+ func (dog * DeploymentLogger ) WriteInfo (messages ... string ) {
181+ dog .Write (Info , messages ... )
182+ }
132183
133- return len (s ), nil
184+ func (dog * DeploymentLogger ) WriteError (messages ... string ) {
185+ dog .Write (Error , messages ... )
186+ }
187+
188+ func reportDockerPullProgress (dog * DeploymentLogger , stat map [string ]* status ) {
189+ var sum float32
190+ for _ , status := range stat {
191+ if status .Total == 0 {
192+ continue
193+ }
194+ sum += (float32 (status .Current ) / float32 (status .Total ))
195+ }
196+
197+ dog .WriteContainerProgress ("Pulling" , sum / float32 (len (stat )))
134198}
135199
136200func (dog * DeploymentLogger ) WriteDockerPull (header string , respIn io.ReadCloser ) error {
137201 if respIn == nil {
138- dog .Write (fmt .Sprintf ("%s ✓ up-to-date" , header ))
202+ dog .WriteInfo (fmt .Sprintf ("%s ✓ up-to-date" , header ))
139203 return nil
140204 }
141205
142206 dec := json .NewDecoder (respIn )
143207 stat := map [string ]* status {}
144208
209+ dog .WriteContainerProgress ("Pulling" , 0 )
210+
211+ lastReportTime := time .Now ()
212+
145213 var pulled , pulling , waiting int
146214 for i := 0 ; ; i ++ {
147215 var jm jsonmessage.JSONMessage
148216 if err := dec .Decode (& jm ); err != nil {
149217 if err == io .EOF {
150- dog .Write (fmt .Sprintf ("%s ✓ pull complete " , header ))
218+ dog .WriteInfo (fmt .Sprintf ("%s ✓ pull complete " , header ))
219+ dog .WriteContainerProgress ("Pull complete" , 1 )
151220 return nil
152221 }
153222 }
@@ -158,19 +227,28 @@ func (dog *DeploymentLogger) WriteDockerPull(header string, respIn io.ReadCloser
158227 }
159228 switch {
160229 case phase == imageHelper .LayerProgressStatusMatching :
161- dog .Write (fmt .Sprintf ("%s ✓ up-to-date" , header ))
230+ dog .WriteInfo (fmt .Sprintf ("%s ✓ up-to-date" , header ))
162231 return nil
163232 case phase == imageHelper .LayerProgressStatusStarting ||
164233 phase == imageHelper .LayerProgressStatusWaiting :
165234 stat [jm .ID ].Total = jm .Progress .Total
166235 waiting ++
167- dog .Write (fmt .Sprintf ("%v layers: %d/%d, %s %s" , header , pulled , len (stat ), jm .ID , jm .Status ))
236+ dog .WriteInfo (fmt .Sprintf ("%v layers: %d/%d, %s %s" , header , pulled , len (stat ), jm .ID , jm .Status ))
168237 case phase == imageHelper .LayerProgressStatusDownloading :
169238 stat [jm .ID ].Current = jm .Progress .Current
239+ if stat [jm .ID ].Total == 0 {
240+ stat [jm .ID ].Total = jm .Progress .Total
241+ }
170242 pulling ++
171243 case phase == imageHelper .LayerProgressStatusComplete || phase == imageHelper .LayerProgressStatusExists :
172244 pulled ++
173- dog .Write (fmt .Sprintf ("%v layers: %d/%d, %s %s" , header , pulled , len (stat ), jm .ID , jm .Status ))
245+ dog .WriteInfo (fmt .Sprintf ("%v layers: %d/%d, %s %s" , header , pulled , len (stat ), jm .ID , jm .Status ))
246+ }
247+
248+ if time .Since (lastReportTime ).Milliseconds () >= ProgressReportThrottleMillis {
249+ lastReportTime = time .Now ()
250+
251+ reportDockerPullProgress (dog , stat )
174252 }
175253 }
176254}
0 commit comments