|
1 | 1 | package collect |
2 | 2 |
|
3 | 3 | import ( |
| 4 | + "bytes" |
| 5 | + "context" |
| 6 | + "encoding/json" |
| 7 | + "fmt" |
| 8 | + "os" |
| 9 | + "path/filepath" |
| 10 | + "time" |
| 11 | + |
| 12 | + "github.com/pkg/errors" |
4 | 13 | troubleshootv1beta2 "github.com/replicatedhq/troubleshoot/pkg/apis/troubleshoot/v1beta2" |
| 14 | + "github.com/replicatedhq/troubleshoot/pkg/constants" |
| 15 | + "golang.org/x/sync/errgroup" |
| 16 | + corev1 "k8s.io/api/core/v1" |
| 17 | + "k8s.io/apimachinery/pkg/runtime" |
| 18 | + "k8s.io/apiserver/pkg/storage/names" |
| 19 | + "k8s.io/client-go/kubernetes" |
| 20 | + "k8s.io/client-go/rest" |
5 | 21 | ) |
6 | 22 |
|
7 | 23 | type HostCollector interface { |
8 | 24 | Title() string |
9 | 25 | IsExcluded() (bool, error) |
10 | 26 | Collect(progressChan chan<- interface{}) (map[string][]byte, error) |
11 | | - RemoteCollect(progressChan chan<- interface{}) (map[string][]byte, error) // RemoteCollect is used to priviledge pods to collect data from different nodes |
| 27 | +} |
| 28 | + |
| 29 | +type RemoteCollectParams struct { |
| 30 | + ProgressChan chan<- interface{} |
| 31 | + HostCollector *troubleshootv1beta2.HostCollect |
| 32 | + BundlePath string |
| 33 | + ClientConfig *rest.Config // rest config used to build the Kubernetes client
| 34 | + Image string |
| 35 | + PullPolicy string // image pull policy for the remote collector pods
| 36 | + Timeout time.Duration
| 37 | + LabelSelector string |
| 38 | + NamePrefix string |
| 39 | + Namespace string |
| 40 | + Title string |
12 | 41 | } |
13 | 42 |
|
14 | 43 | func GetHostCollector(collector *troubleshootv1beta2.HostCollect, bundlePath string) (HostCollector, bool) { |
@@ -81,3 +110,131 @@ func hostCollectorTitleOrDefault(meta troubleshootv1beta2.HostCollectorMeta, def |
81 | 110 | } |
82 | 111 | return defaultTitle |
83 | 112 | } |
| 113 | + |
| 114 | +func RemoteHostCollect(ctx context.Context, params RemoteCollectParams) (map[string][]byte, error) { |
| 115 | + scheme := runtime.NewScheme() |
| 116 | + if err := corev1.AddToScheme(scheme); err != nil { |
| 117 | + return nil, errors.Wrap(err, "failed to add corev1 scheme")
| 118 | + } |
| 119 | + |
| 120 | + client, err := kubernetes.NewForConfig(params.ClientConfig) |
| 121 | + if err != nil { |
| 122 | + return nil, errors.Wrap(err, "failed to create kubernetes client")
| 123 | + } |
| 124 | + |
| 125 | + runner := &podRunner{ |
| 126 | + client: client, |
| 127 | + scheme: scheme, |
| 128 | + image: params.Image, |
| 129 | + pullPolicy: params.PullPolicy, |
| 130 | + waitInterval: remoteCollectorDefaultInterval, |
| 131 | + } |
| 132 | + |
| 133 | + // Get all the nodes where we should run. |
| 134 | + nodes, err := listNodesNamesInSelector(ctx, client, params.LabelSelector) |
| 135 | + if err != nil { |
| 136 | + return nil, errors.Wrap(err, "failed to list nodes matching the label selector")
| 137 | + } |
| 138 | + |
| 139 | + if params.NamePrefix == "" { |
| 140 | + params.NamePrefix = remoteCollectorNamePrefix |
| 141 | + } |
| 142 | + |
| 143 | + result, err := runRemote(ctx, runner, nodes, params.HostCollector, names.SimpleNameGenerator, params.NamePrefix, params.Namespace) |
| 144 | + if err != nil { |
| 145 | + return nil, errors.Wrap(err, "failed to run collector remotely") |
| 146 | + } |
| 147 | + |
| 148 | + allCollectedData := mapCollectorResultToOutput(result, params) |
| 149 | + output := NewResult() |
| 150 | + |
| 151 | + // iterate over each node's results and save every file under a node-scoped path
| 152 | + for node, result := range allCollectedData { |
| 153 | + var nodeResult map[string]string |
| 154 | + if err := json.Unmarshal(result, &nodeResult); err != nil { |
| 155 | + return nil, errors.Wrap(err, "failed to unmarshal node results")
| 156 | + } |
| 157 | + |
| 158 | + for file, collectorResult := range nodeResult { |
| 159 | + directory := filepath.Dir(file) |
| 160 | + fileName := filepath.Base(file) |
| 161 | + // remote collector results keep their original directory and file name, with the node name inserted in between (<dir>/<node>/<file>)
| 162 | + output.SaveResult(params.BundlePath, fmt.Sprintf("%s/%s/%s", directory, node, fileName), bytes.NewBufferString(collectorResult)) |
| 163 | + } |
| 164 | + } |
| 165 | + |
| 166 | + // check whether the node list file already exists
| 167 | + _, err = os.Stat(constants.NODE_LIST_FILE)
| 168 | + // if it does not exist yet, save the list of nodes
| 169 | + if err != nil { |
| 170 | + nodesBytes, err := json.MarshalIndent(HostOSInfoNodes{Nodes: nodes}, "", " ") |
| 171 | + if err != nil { |
| 172 | + return nil, errors.Wrap(err, "failed to marshal host os info nodes") |
| 173 | + } |
| 174 | + output.SaveResult(params.BundlePath, constants.NODE_LIST_FILE, bytes.NewBuffer(nodesBytes)) |
| 175 | + } |
| 176 | + return output, nil |
| 177 | +} |
| 178 | + |
| 179 | +func runRemote(ctx context.Context, runner runner, nodes []string, collector *troubleshootv1beta2.HostCollect, nameGenerator names.NameGenerator, namePrefix string, namespace string) (map[string][]byte, error) { |
| 180 | + g, ctx := errgroup.WithContext(ctx) |
| 181 | + results := make(chan map[string][]byte, len(nodes)) |
| 182 | + |
| 183 | + for _, node := range nodes { |
| 184 | + node := node |
| 185 | + g.Go(func() error { |
| 186 | + // TODO: consider logging the error as a warning instead of returning it;
| 187 | + // returning an error here cancels the shared context, so the other
| 188 | + // goroutines are stopped and no results are returned.
| 189 | + return runner.run(ctx, collector, namespace, nameGenerator.GenerateName(namePrefix+"-"), node, results) |
| 190 | + }) |
| 191 | + } |
| 192 | + |
| 193 | + // Wait for all collectors to complete or return the first error. |
| 194 | + if err := g.Wait(); err != nil { |
| 195 | + return nil, errors.Wrap(err, "failed remote collection") |
| 196 | + } |
| 197 | + close(results) |
| 198 | + |
| 199 | + output := make(map[string][]byte) |
| 200 | + for result := range results { |
| 201 | + r := result |
| 202 | + for k, v := range r { |
| 203 | + output[k] = v |
| 204 | + } |
| 205 | + } |
| 206 | + |
| 207 | + return output, nil |
| 208 | +} |
| 209 | + |
| 210 | +func mapCollectorResultToOutput(result map[string][]byte, params RemoteCollectParams) map[string][]byte { |
| 211 | + allCollectedData := make(map[string][]byte) |
| 212 | + |
| 213 | + for k, v := range result { |
| 214 | + if curBytes, ok := allCollectedData[k]; ok { |
| 215 | + var curResults map[string]string |
| 216 | + if err := json.Unmarshal(curBytes, &curResults); err != nil { |
| 217 | + params.ProgressChan <- errors.Errorf("failed to read existing results for collector %s: %v\n", params.Title, err) |
| 218 | + continue |
| 219 | + } |
| 220 | + var newResults map[string]string |
| 221 | + if err := json.Unmarshal(v, &newResults); err != nil { |
| 222 | + params.ProgressChan <- errors.Errorf("failed to read new results for collector %s: %v\n", params.Title, err) |
| 223 | + continue |
| 224 | + } |
| 225 | + for file, data := range newResults { |
| 226 | + curResults[file] = data |
| 227 | + } |
| 228 | + combinedResults, err := json.Marshal(curResults) |
| 229 | + if err != nil { |
| 230 | + params.ProgressChan <- errors.Errorf("failed to combine results for collector %s: %v\n", params.Title, err) |
| 231 | + continue |
| 232 | + } |
| 233 | + allCollectedData[k] = combinedResults |
| 234 | + } else { |
| 235 | + allCollectedData[k] = v |
| 236 | + } |
| 237 | + |
| 238 | + } |
| 239 | + return allCollectedData |
| 240 | +} |
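
Below is a minimal caller sketch showing how the new RemoteHostCollect entry point and RemoteCollectParams might be wired together. It assumes the collect package lives at github.com/replicatedhq/troubleshoot/pkg/collect, builds a *rest.Config from the default kubeconfig, and uses placeholder values for the bundle path, image, pull policy, timeout, label selector, namespace, and title; none of those values come from this change.

package main

import (
	"context"
	"fmt"
	"time"

	troubleshootv1beta2 "github.com/replicatedhq/troubleshoot/pkg/apis/troubleshoot/v1beta2"
	"github.com/replicatedhq/troubleshoot/pkg/collect"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a *rest.Config from the local kubeconfig (assumption: running
	// outside the cluster; an in-cluster config would also work).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}

	// Drain progress/warning messages so the collector never blocks on sends.
	progressChan := make(chan interface{})
	go func() {
		for msg := range progressChan {
			fmt.Println("progress:", msg)
		}
	}()

	// Populate exactly one host collector here (e.g. host OS info); left
	// empty in this sketch.
	hostCollector := &troubleshootv1beta2.HostCollect{}

	// All values below are illustrative placeholders.
	results, err := collect.RemoteHostCollect(context.Background(), collect.RemoteCollectParams{
		ProgressChan:  progressChan,
		HostCollector: hostCollector,
		BundlePath:    "/tmp/support-bundle",
		ClientConfig:  config,
		Image:         "replicated/troubleshoot:latest",
		PullPolicy:    "IfNotPresent",
		Timeout:       2 * time.Minute,
		LabelSelector: "kubernetes.io/os=linux",
		Namespace:     "default",
		Title:         "host-os-info",
	})
	if err != nil {
		panic(err)
	}
	for file := range results {
		fmt.Println("collected:", file)
	}
}

With this wiring, a file that a host collector would normally write as <dir>/<file> lands in the bundle as <dir>/<node-name>/<file>, which is exactly what the directory/node/fileName formatting inside RemoteHostCollect produces.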
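
And a small, hypothetical post-processing sketch that reads back the node list RemoteHostCollect writes under constants.NODE_LIST_FILE, assuming a disk-backed bundle rooted at bundlePath and the same collect import path as above:

package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"

	"github.com/replicatedhq/troubleshoot/pkg/collect"
	"github.com/replicatedhq/troubleshoot/pkg/constants"
)

// listCollectedNodes prints the node names recorded by RemoteHostCollect.
func listCollectedNodes(bundlePath string) error {
	raw, err := os.ReadFile(filepath.Join(bundlePath, constants.NODE_LIST_FILE))
	if err != nil {
		return err
	}
	var nodeList collect.HostOSInfoNodes
	if err := json.Unmarshal(raw, &nodeList); err != nil {
		return err
	}
	for _, node := range nodeList.Nodes {
		fmt.Println("collected from node:", node)
	}
	return nil
}

func main() {
	// Bundle path is an illustrative placeholder.
	if err := listCollectedNodes("/tmp/support-bundle"); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}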