55 "encoding/json"
66 "fmt"
77 "io"
8+ "sync"
89 "time"
910
1011 imageHelper "github.com/dyrector-io/dyrectorio/golang/internal/helper/image"
@@ -40,6 +41,7 @@ type DeploymentLogger struct {
4041 deploymentID string
4142 requestID string
4243 logs []string
44+ streamMu sync.Mutex
4345}
4446
4547func NewDeploymentLogger (ctx context.Context , deploymentID * string ,
@@ -65,23 +67,34 @@ func (dog *DeploymentLogger) SetRequestID(requestID string) {
6567 dog .requestID = requestID
6668}
6769
70+ // send is the single point for all stream.Send calls.
71+ // It is thread-safe and self-disabling: after the first Send error the stream
72+ // is nilled so subsequent calls are silent no-ops.
73+ func (dog * DeploymentLogger ) send (msg * common.DeploymentStatusMessage ) {
74+ dog .streamMu .Lock ()
75+ defer dog .streamMu .Unlock ()
76+ if dog .stream == nil {
77+ log .Trace ().Msgf ("dog stream was already nil: %s" , msg )
78+ return
79+ }
80+ if err := dog .stream .Send (msg ); err != nil {
81+ log .Error ().Err (err ).Str ("deployment" , dog .deploymentID ).Msg ("Deployment stream send error" )
82+ dog .stream = nil
83+ }
84+ }
85+
6886// Writes to all available streams: std.out and gRPC streams
6987func (dog * DeploymentLogger ) Write (level Level , messages ... string ) {
7088 for i := range messages {
7189 log .Info ().Str ("deployment" , dog .deploymentID ).Msg (messages [i ])
7290 dog .logs = append (dog .logs , messages [i ])
7391 }
7492
75- if dog .stream != nil {
76- logLevel := common .DeploymentMessageLevel (level )
77- err := dog .stream .Send (& common.DeploymentStatusMessage {
78- Log : messages ,
79- LogLevel : & logLevel ,
80- })
81- if err != nil {
82- log .Error ().Err (err ).Stack ().Str ("deployment" , dog .deploymentID ).Msg ("Write error" )
83- }
84- }
93+ logLevel := common .DeploymentMessageLevel (level )
94+ dog .send (& common.DeploymentStatusMessage {
95+ Log : messages ,
96+ LogLevel : & logLevel ,
97+ })
8598}
8699
87100func (dog * DeploymentLogger ) WriteDeploymentStatus (status common.DeploymentStatus , messages ... string ) {
@@ -90,22 +103,17 @@ func (dog *DeploymentLogger) WriteDeploymentStatus(status common.DeploymentStatu
90103 dog .logs = append (dog .logs , messages [i ])
91104 }
92105
93- if dog .stream != nil {
94- logLevel := common .DeploymentMessageLevel_INFO
95- if status == common .DeploymentStatus_FAILED {
96- logLevel = common .DeploymentMessageLevel_ERROR
97- }
98- err := dog .stream .Send (& common.DeploymentStatusMessage {
99- Log : messages ,
100- LogLevel : & logLevel ,
101- Data : & common.DeploymentStatusMessage_DeploymentStatus {
102- DeploymentStatus : status ,
103- },
104- })
105- if err != nil {
106- log .Error ().Err (err ).Stack ().Str ("deployment" , dog .deploymentID ).Msg ("Write deployment status error" )
107- }
106+ logLevel := common .DeploymentMessageLevel_INFO
107+ if status == common .DeploymentStatus_FAILED {
108+ logLevel = common .DeploymentMessageLevel_ERROR
108109 }
110+ dog .send (& common.DeploymentStatusMessage {
111+ Log : messages ,
112+ LogLevel : & logLevel ,
113+ Data : & common.DeploymentStatusMessage_DeploymentStatus {
114+ DeploymentStatus : status ,
115+ },
116+ })
109117}
110118
111119func (dog * DeploymentLogger ) WriteContainerState (
@@ -121,55 +129,30 @@ func (dog *DeploymentLogger) WriteContainerState(
121129 dog .logs = append (dog .logs , messages [i ])
122130 }
123131
124- if dog .stream != nil {
125- instance := & common.DeploymentStatusMessage_Instance {
132+ logLevel := common .DeploymentMessageLevel (level )
133+ dog .send (& common.DeploymentStatusMessage {
134+ Log : messages ,
135+ LogLevel : & logLevel ,
136+ Data : & common.DeploymentStatusMessage_Instance {
126137 Instance : & common.InstanceDeploymentItem {
127138 InstanceId : dog .requestID ,
128139 State : containerState ,
129140 Reason : reason ,
130141 },
131- }
132-
133- logLevel := common .DeploymentMessageLevel (level )
134-
135- err := dog .stream .Send (& common.DeploymentStatusMessage {
136- Log : messages ,
137- LogLevel : & logLevel ,
138- Data : instance ,
139- })
140- if err != nil {
141- log .Error ().
142- Err (err ).
143- Stack ().
144- Str ("deployment" , dog .deploymentID ).
145- Str ("prefix" , prefix ).
146- Msg ("Write container state error" )
147- }
148- }
142+ },
143+ })
149144}
150145
151146func (dog * DeploymentLogger ) WriteContainerProgress (status string , progress float32 ) {
152- if dog .stream != nil {
153- progress := & common.DeploymentStatusMessage_ContainerProgress {
147+ dog .send ( & common. DeploymentStatusMessage {
148+ Data : & common.DeploymentStatusMessage_ContainerProgress {
154149 ContainerProgress : & common.DeployContainerProgress {
155150 InstanceId : dog .requestID ,
156151 Status : status ,
157152 Progress : progress ,
158153 },
159- }
160-
161- err := dog .stream .Send (& common.DeploymentStatusMessage {
162- Data : progress ,
163- })
164- if err != nil {
165- log .Error ().
166- Err (err ).
167- Stack ().
168- Str ("deployment" , dog .deploymentID ).
169- Str ("prefix" , dog .requestID ).
170- Msg ("Write container progress error" )
171- }
172- }
154+ },
155+ })
173156}
174157
175158func (dog * DeploymentLogger ) GetLogs () []string {
0 commit comments