Skip to content

Commit 7d7153a

Browse files
committed
chore: refactor provider
1 parent 379f3a6 commit 7d7153a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1585
-840
lines changed

api/adc/types.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -726,3 +726,24 @@ func (s *StringOrSlice) UnmarshalJSON(p []byte) error {
726726
}
727727
return json.Unmarshal(p, &s.StrVal)
728728
}
729+
730+
type Config struct {
731+
Name string
732+
ServerAddrs []string
733+
Token string
734+
TlsVerify bool
735+
}
736+
737+
// MarshalJSON implements custom JSON marshaling for adcConfig
738+
// It excludes the Token field for security reasons
739+
func (c Config) MarshalJSON() ([]byte, error) {
740+
return json.Marshal(struct {
741+
Name string `json:"name"`
742+
ServerAddrs []string `json:"serverAddrs"`
743+
TlsVerify bool `json:"tlsVerify"`
744+
}{
745+
Name: c.Name,
746+
ServerAddrs: c.ServerAddrs,
747+
TlsVerify: c.TlsVerify,
748+
})
749+
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

internal/provider/adc/store.go renamed to internal/adc/cache/store.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
package adc
18+
package cache
1919

2020
import (
2121
"fmt"
@@ -27,37 +27,36 @@ import (
2727

2828
adctypes "github.com/apache/apisix-ingress-controller/api/adc"
2929
"github.com/apache/apisix-ingress-controller/internal/controller/label"
30-
"github.com/apache/apisix-ingress-controller/internal/provider/adc/cache"
3130
)
3231

3332
type Store struct {
34-
cacheMap map[string]cache.Cache
33+
cacheMap map[string]Cache
3534
pluginMetadataMap map[string]adctypes.PluginMetadata
3635

3736
sync.Mutex
3837
}
3938

4039
func NewStore() *Store {
4140
return &Store{
42-
cacheMap: make(map[string]cache.Cache),
41+
cacheMap: make(map[string]Cache),
4342
pluginMetadataMap: make(map[string]adctypes.PluginMetadata),
4443
}
4544
}
4645

47-
func (s *Store) Insert(name string, resourceTypes []string, resources adctypes.Resources, Labels map[string]string) error {
46+
func (s *Store) Insert(name string, resourceTypes []string, resources *adctypes.Resources, Labels map[string]string) error {
4847
s.Lock()
4948
defer s.Unlock()
5049
targetCache, ok := s.cacheMap[name]
5150
if !ok {
52-
db, err := cache.NewMemDBCache()
51+
db, err := NewMemDBCache()
5352
if err != nil {
5453
return err
5554
}
5655
s.cacheMap[name] = db
5756
targetCache = s.cacheMap[name]
5857
}
5958
log.Debugw("Inserting resources into cache for", zap.String("name", name))
60-
selector := &cache.KindLabelSelector{
59+
selector := &KindLabelSelector{
6160
Kind: Labels[label.LabelKind],
6261
Name: Labels[label.LabelName],
6362
Namespace: Labels[label.LabelNamespace],
@@ -153,7 +152,7 @@ func (s *Store) Delete(name string, resourceTypes []string, Labels map[string]st
153152
if !ok {
154153
return nil
155154
}
156-
selector := &cache.KindLabelSelector{
155+
selector := &KindLabelSelector{
157156
Kind: Labels[label.LabelKind],
158157
Name: Labels[label.LabelName],
159158
Namespace: Labels[label.LabelNamespace],

internal/adc/client/client.go

Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package client
19+
20+
import (
21+
"context"
22+
"encoding/json"
23+
"fmt"
24+
"os"
25+
"slices"
26+
"strings"
27+
"time"
28+
29+
"github.com/api7/gopkg/pkg/log"
30+
"github.com/pkg/errors"
31+
"go.uber.org/zap"
32+
33+
adctypes "github.com/apache/apisix-ingress-controller/api/adc"
34+
"github.com/apache/apisix-ingress-controller/internal/adc/cache"
35+
"github.com/apache/apisix-ingress-controller/internal/types"
36+
pkgmetrics "github.com/apache/apisix-ingress-controller/pkg/metrics"
37+
)
38+
39+
type Client struct {
40+
*cache.Store
41+
42+
executor ADCExecutor
43+
BackendMode string
44+
}
45+
46+
func New(mode string) (*Client, error) {
47+
return &Client{
48+
Store: cache.NewStore(),
49+
executor: &DefaultADCExecutor{},
50+
BackendMode: mode,
51+
}, nil
52+
}
53+
54+
type Task struct {
55+
Name string
56+
Labels map[string]string
57+
Configs []adctypes.Config
58+
ResourceTypes []string
59+
Resources *adctypes.Resources
60+
}
61+
62+
func (d *Client) Insert(ctx context.Context, args Task) error {
63+
for _, config := range args.Configs {
64+
if err := d.Store.Insert(config.Name, args.ResourceTypes, args.Resources, args.Labels); err != nil {
65+
log.Errorw("failed to insert resources into store",
66+
zap.String("name", config.Name),
67+
zap.Error(err),
68+
)
69+
return err
70+
}
71+
}
72+
return nil
73+
}
74+
75+
func (d *Client) Remove(ctx context.Context, args Task) error {
76+
for _, config := range args.Configs {
77+
if err := d.Store.Delete(config.Name, args.ResourceTypes, args.Labels); err != nil {
78+
log.Errorw("failed to delete resources from store",
79+
zap.String("name", config.Name),
80+
zap.Error(err),
81+
)
82+
return err
83+
}
84+
}
85+
return nil
86+
}
87+
88+
func (d *Client) Update(ctx context.Context, args Task) error {
89+
return d.sync(ctx, args)
90+
}
91+
92+
func (c *Client) Sync(ctx context.Context, cfg map[string]adctypes.Config) (map[string]types.ADCExecutionErrors, error) {
93+
log.Debug("syncing all resources")
94+
95+
if len(cfg) == 0 {
96+
log.Warn("no adc configs provided")
97+
return nil, nil
98+
}
99+
100+
log.Debugw("syncing resources with multiple configs", zap.Any("configs", cfg))
101+
102+
failedMap := map[string]types.ADCExecutionErrors{}
103+
var failedConfigs []string
104+
for name, config := range cfg {
105+
resources, err := c.Store.GetResources(name)
106+
if err != nil {
107+
log.Errorw("failed to get resources from store", zap.String("name", name), zap.Error(err))
108+
failedConfigs = append(failedConfigs, name)
109+
continue
110+
}
111+
if resources == nil {
112+
continue
113+
}
114+
115+
if err := c.sync(ctx, Task{
116+
Name: name + "-sync",
117+
Configs: []adctypes.Config{config},
118+
Resources: resources,
119+
}); err != nil {
120+
log.Errorw("failed to sync resources", zap.String("name", name), zap.Error(err))
121+
failedConfigs = append(failedConfigs, name)
122+
var execErrs types.ADCExecutionErrors
123+
if errors.As(err, &execErrs) {
124+
failedMap[name] = execErrs
125+
}
126+
}
127+
}
128+
129+
if len(failedConfigs) > 0 {
130+
return failedMap, fmt.Errorf("failed to sync %d configs: %s",
131+
len(failedConfigs),
132+
strings.Join(failedConfigs, ", "))
133+
}
134+
return failedMap, nil
135+
}
136+
137+
func (c *Client) sync(ctx context.Context, task Task) error {
138+
log.Debugw("syncing resources", zap.Any("task", task))
139+
140+
if len(task.Configs) == 0 {
141+
log.Warnw("no adc configs provided", zap.Any("task", task))
142+
return nil
143+
}
144+
145+
var errs types.ADCExecutionErrors
146+
147+
// for global rules, we need to list all global rules and set it to the task resources
148+
if slices.Contains(task.ResourceTypes, "global_rule") {
149+
for _, config := range task.Configs {
150+
globalRules, err := c.Store.ListGlobalRules(config.Name)
151+
if err != nil {
152+
return err
153+
}
154+
var globalrule adctypes.GlobalRule
155+
if len(globalRules) > 0 {
156+
merged := make(adctypes.Plugins)
157+
for _, item := range globalRules {
158+
for k, v := range item.Plugins {
159+
merged[k] = v
160+
}
161+
}
162+
globalrule = adctypes.GlobalRule(merged)
163+
}
164+
165+
task.Resources.GlobalRules = globalrule
166+
log.Debugw("syncing resources global rules", zap.Any("globalRules", task.Resources.GlobalRules))
167+
168+
fileIOStart := time.Now()
169+
syncFilePath, cleanup, err := prepareSyncFile(task.Resources)
170+
if err != nil {
171+
pkgmetrics.RecordFileIODuration("prepare_sync_file", "failure", time.Since(fileIOStart).Seconds())
172+
return err
173+
}
174+
pkgmetrics.RecordFileIODuration("prepare_sync_file", "success", time.Since(fileIOStart).Seconds())
175+
defer cleanup()
176+
177+
args := BuildADCExecuteArgs(syncFilePath, task.Labels, task.ResourceTypes)
178+
179+
// Record sync duration for each config
180+
startTime := time.Now()
181+
resourceType := strings.Join(task.ResourceTypes, ",")
182+
if resourceType == "" {
183+
resourceType = "all"
184+
}
185+
186+
err = c.executor.Execute(ctx, "", config, args)
187+
duration := time.Since(startTime).Seconds()
188+
189+
status := "success"
190+
if err != nil {
191+
status = "failure"
192+
log.Errorw("failed to execute adc command", zap.Error(err), zap.Any("config", config))
193+
194+
var execErr types.ADCExecutionError
195+
if errors.As(err, &execErr) {
196+
errs.Errors = append(errs.Errors, execErr)
197+
pkgmetrics.RecordExecutionError(config.Name, execErr.Name)
198+
} else {
199+
pkgmetrics.RecordExecutionError(config.Name, "unknown")
200+
}
201+
}
202+
203+
// Record metrics
204+
pkgmetrics.RecordSyncDuration(config.Name, resourceType, status, duration)
205+
}
206+
207+
if len(errs.Errors) > 0 {
208+
return errs
209+
}
210+
return nil
211+
}
212+
213+
// Record file I/O duration
214+
fileIOStart := time.Now()
215+
// every task resources is the same, so we can use the first config to prepare the sync file
216+
syncFilePath, cleanup, err := prepareSyncFile(task.Resources)
217+
if err != nil {
218+
pkgmetrics.RecordFileIODuration("prepare_sync_file", "failure", time.Since(fileIOStart).Seconds())
219+
return err
220+
}
221+
pkgmetrics.RecordFileIODuration("prepare_sync_file", "success", time.Since(fileIOStart).Seconds())
222+
defer cleanup()
223+
224+
args := BuildADCExecuteArgs(syncFilePath, task.Labels, task.ResourceTypes)
225+
226+
for _, config := range task.Configs {
227+
// Record sync duration for each config
228+
startTime := time.Now()
229+
resourceType := strings.Join(task.ResourceTypes, ",")
230+
if resourceType == "" {
231+
resourceType = "all"
232+
}
233+
234+
err := c.executor.Execute(ctx, "", config, args)
235+
duration := time.Since(startTime).Seconds()
236+
237+
status := "success"
238+
if err != nil {
239+
status = "failure"
240+
log.Errorw("failed to execute adc command", zap.Error(err), zap.Any("config", config))
241+
242+
var execErr types.ADCExecutionError
243+
if errors.As(err, &execErr) {
244+
errs.Errors = append(errs.Errors, execErr)
245+
pkgmetrics.RecordExecutionError(config.Name, execErr.Name)
246+
} else {
247+
pkgmetrics.RecordExecutionError(config.Name, "unknown")
248+
}
249+
}
250+
251+
// Record metrics
252+
pkgmetrics.RecordSyncDuration(config.Name, resourceType, status, duration)
253+
}
254+
255+
if len(errs.Errors) > 0 {
256+
return errs
257+
}
258+
return nil
259+
}
260+
261+
func prepareSyncFile(resources any) (string, func(), error) {
262+
data, err := json.Marshal(resources)
263+
if err != nil {
264+
return "", nil, err
265+
}
266+
267+
tmpFile, err := os.CreateTemp("", "adc-task-*.json")
268+
if err != nil {
269+
return "", nil, err
270+
}
271+
cleanup := func() {
272+
_ = tmpFile.Close()
273+
_ = os.Remove(tmpFile.Name())
274+
}
275+
if _, err := tmpFile.Write(data); err != nil {
276+
cleanup()
277+
return "", nil, err
278+
}
279+
280+
log.Debugw("generated adc file", zap.String("filename", tmpFile.Name()), zap.String("json", string(data)))
281+
282+
return tmpFile.Name(), cleanup, nil
283+
}

0 commit comments

Comments
 (0)