@@ -17,38 +17,32 @@ limitations under the License.
17
17
package problemclient
18
18
19
19
import (
20
+ "encoding/json"
20
21
"fmt"
21
22
"os"
22
- "time"
23
23
24
24
"k8s.io/kubernetes/pkg/api"
25
- "k8s.io/kubernetes/pkg/api/errors"
26
25
"k8s.io/kubernetes/pkg/api/unversioned"
27
- clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
28
- unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
29
26
"k8s.io/kubernetes/pkg/client/record"
30
27
"k8s.io/kubernetes/pkg/client/restclient"
28
+ client "k8s.io/kubernetes/pkg/client/unversioned"
31
29
"k8s.io/kubernetes/pkg/types"
32
30
"k8s.io/kubernetes/pkg/util"
33
-
34
- "github.com/golang/glog"
35
31
)
36
32
37
33
// Client is the interface of problem client
38
34
type Client interface {
39
35
// GetConditions get all specifiec conditions of current node.
40
36
GetConditions (conditionTypes []api.NodeConditionType ) ([]* api.NodeCondition , error )
41
37
// SetConditions set or update conditions of current node.
42
- // Notice that conditions with status api.ConditionFalse will be removed from the condition list, so that
43
- // we'll only have useful conditions in the condition list.
44
- SetConditions (conditions []api.NodeCondition , timeout time.Duration ) error
38
+ SetConditions (conditions []api.NodeCondition ) error
45
39
// Eventf reports the event.
46
40
Eventf (eventType string , source , reason , messageFmt string , args ... interface {})
47
41
}
48
42
49
43
type nodeProblemClient struct {
50
44
nodeName string
51
- client clientset. Interface
45
+ client * client. Client
52
46
clock util.Clock
53
47
recorders map [string ]record.EventRecorder
54
48
nodeRef * api.ObjectReference
@@ -62,10 +56,7 @@ func NewClientOrDie() Client {
62
56
panic (err )
63
57
}
64
58
// TODO(random-liu): Set QPS Limit
65
- c .client , err = clientset .NewForConfig (cfg )
66
- if err != nil {
67
- panic (err )
68
- }
59
+ c .client = client .NewOrDie (cfg )
69
60
// TODO(random-liu): Get node name from cloud provider
70
61
c .nodeName , err = os .Hostname ()
71
62
if err != nil {
@@ -77,7 +68,7 @@ func NewClientOrDie() Client {
77
68
}
78
69
79
70
func (c * nodeProblemClient ) GetConditions (conditionTypes []api.NodeConditionType ) ([]* api.NodeCondition , error ) {
80
- node , err := c .client .Core (). Nodes ().Get (c .nodeName )
71
+ node , err := c .client .Nodes ().Get (c .nodeName )
81
72
if err != nil {
82
73
return nil , err
83
74
}
@@ -92,21 +83,16 @@ func (c *nodeProblemClient) GetConditions(conditionTypes []api.NodeConditionType
92
83
return conditions , nil
93
84
}
94
85
95
- func (c * nodeProblemClient ) SetConditions (newConditions []api.NodeCondition , timeout time. Duration ) error {
86
+ func (c * nodeProblemClient ) SetConditions (newConditions []api.NodeCondition ) error {
96
87
for i := range newConditions {
97
88
// Each time we update the conditions, we update the heart beat time
98
89
newConditions [i ].LastHeartbeatTime = unversioned .NewTime (c .clock .Now ())
99
90
}
100
- return c .updateNodeCondition (func (conditions []api.NodeCondition ) []api.NodeCondition {
101
- for _ , condition := range newConditions {
102
- if condition .Status == api .ConditionFalse {
103
- conditions = unsetCondition (condition .Type , conditions )
104
- } else {
105
- conditions = setCondition (condition , conditions )
106
- }
107
- }
108
- return conditions
109
- }, timeout )
91
+ patch , err := generatePatch (newConditions )
92
+ if err != nil {
93
+ return nil
94
+ }
95
+ return c .client .Patch (api .StrategicMergePatchType ).Resource ("nodes" ).Name (c .nodeName ).SubResource ("status" ).Body (patch ).Do ().Error ()
110
96
}
111
97
112
98
func (c * nodeProblemClient ) Eventf (eventType , source , reason , messageFmt string , args ... interface {}) {
@@ -119,60 +105,20 @@ func (c *nodeProblemClient) Eventf(eventType, source, reason, messageFmt string,
119
105
recorder .Eventf (c .nodeRef , eventType , reason , messageFmt , args ... )
120
106
}
121
107
122
- func unsetCondition (conditionType api.NodeConditionType , conditions []api.NodeCondition ) []api.NodeCondition {
123
- result := []api.NodeCondition {}
124
- for _ , condition := range conditions {
125
- if condition .Type != conditionType {
126
- result = append (result , condition )
127
- }
128
- }
129
- return result
130
- }
131
-
132
- func setCondition (condition api.NodeCondition , conditions []api.NodeCondition ) []api.NodeCondition {
133
- found := false
134
- for i := range conditions {
135
- if conditions [i ].Type == condition .Type {
136
- target := & conditions [i ]
137
- * target = condition
138
- found = true
139
- break
140
- }
141
- }
142
- if ! found {
143
- conditions = append (conditions , condition )
144
- }
145
- return conditions
146
- }
147
-
148
- func (c * nodeProblemClient ) updateNodeCondition (updateFunc func ([]api.NodeCondition ) []api.NodeCondition , timeout time.Duration ) error {
149
- updateTime := c .clock .Now ()
150
- for {
151
- node , err := c .client .Core ().Nodes ().Get (c .nodeName )
152
- if err != nil {
153
- return err
154
- }
155
- node .Status .Conditions = updateFunc (node .Status .Conditions )
156
- _ , err = c .client .Core ().Nodes ().UpdateStatus (node )
157
- if err != nil {
158
- if errors .IsConflict (err ) {
159
- glog .Warningf ("Conflicting update node status for node %q, will retry soon: %v" , c .nodeName , err )
160
- if c .clock .Now ().Sub (updateTime ) >= timeout {
161
- return timeoutError {node : c .nodeName , timeout : timeout }
162
- }
163
- continue
164
- }
165
- return err
166
- }
167
- return nil
108
+ // generatePatch generates condition patch
109
+ func generatePatch (conditions []api.NodeCondition ) ([]byte , error ) {
110
+ raw , err := json .Marshal (& conditions )
111
+ if err != nil {
112
+ return nil , err
168
113
}
114
+ return []byte (fmt .Sprintf (`{"status":{"conditions":%s}}` , raw )), nil
169
115
}
170
116
171
117
// getEventRecorder generates a recorder for specific node name and source.
172
- func getEventRecorder (c clientset. Interface , nodeName , source string ) record.EventRecorder {
118
+ func getEventRecorder (c * client. Client , nodeName , source string ) record.EventRecorder {
173
119
eventBroadcaster := record .NewBroadcaster ()
174
120
recorder := eventBroadcaster .NewRecorder (api.EventSource {Component : source , Host : nodeName })
175
- eventBroadcaster .StartRecordingToSink (& unversionedcore. EventSinkImpl { Interface : c . Core (). Events ("" )} )
121
+ eventBroadcaster .StartRecordingToSink (c . Events ("" ))
176
122
return recorder
177
123
}
178
124
@@ -184,19 +130,3 @@ func getNodeRef(nodeName string) *api.ObjectReference {
184
130
Namespace : "" ,
185
131
}
186
132
}
187
-
188
- // timeoutError is the error returned by problem client when condition update timeout.
189
- type timeoutError struct {
190
- node string
191
- timeout time.Duration
192
- }
193
-
194
- func (e timeoutError ) Error () string {
195
- return fmt .Sprintf ("update condition for node %q timeout %s" , e .node , e .timeout )
196
- }
197
-
198
- // IsErrTimeout checks whether a given error is timeout error.
199
- func IsErrTimeout (err error ) bool {
200
- _ , ok := err .(timeoutError )
201
- return ok
202
- }
0 commit comments