Skip to content

Commit 9fa8bcd

Browse files
committed
refactor: added config options for schema registry to reuse caches
1 parent 50d923a commit 9fa8bcd

File tree

5 files changed

+57
-80
lines changed

5 files changed

+57
-80
lines changed

pkg/diff/diff.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,11 @@ type SyncerOpts struct {
190190

191191
// Prevents the Syncer from performing any Delete operations. Default is false (will delete).
192192
NoDeletes bool
193+
194+
// SchemaRegistry is an optional shared schema registry. When provided,
195+
// it is reused for schema fetching and caching. When nil, a new
196+
// registry is created internally.
197+
SchemaRegistry *schema.Registry
193198
}
194199

195200
// NewSyncer constructs a Syncer.
@@ -236,7 +241,11 @@ func NewSyncer(opts SyncerOpts) (*Syncer, error) {
236241
}
237242
s.resultChan = make(chan EntityAction, eventBuffer)
238243

239-
s.schemaRegistry = schema.NewRegistry(context.Background(), opts.KongClient, opts.IsKonnect)
244+
if opts.SchemaRegistry != nil {
245+
s.schemaRegistry = opts.SchemaRegistry
246+
} else {
247+
s.schemaRegistry = schema.NewRegistry(context.Background(), opts.KongClient, opts.IsKonnect)
248+
}
240249

241250
return s, nil
242251
}

pkg/dump/dump.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"sync"
99

1010
"github.com/blang/semver/v4"
11+
"github.com/kong/go-database-reconciler/pkg/schema"
1112
"github.com/kong/go-database-reconciler/pkg/utils"
1213
"github.com/kong/go-kong/kong"
1314
"github.com/kong/go-kong/kong/custom"
@@ -82,6 +83,11 @@ type Config struct {
8283

8384
// This flag is set to remove default values while dumping entities.
8485
SkipDefaults bool
86+
87+
// SchemaRegistry is an optional shared schema registry. When provided,
88+
// it is reused for schema fetching and caching (e.g. during SkipDefaults
89+
// processing). When nil, a new registry is created internally.
90+
SchemaRegistry *schema.Registry
8591
}
8692

8793
func deduplicate(stringSlice []string) []string {
@@ -730,14 +736,13 @@ func Get(ctx context.Context, client *kong.Client, config Config) (*utils.KongRa
730736

731737
if config.SkipDefaults {
732738
isKonnect := config.KonnectControlPlane != ""
733-
schemaFetcher := NewSchemaFetcher(ctx, client, isKonnect)
734-
735-
if schemaFetcher == nil {
736-
return nil, fmt.Errorf("schemaFetcher is nil")
739+
registry := config.SchemaRegistry
740+
if registry == nil {
741+
registry = schema.NewRegistry(ctx, client, isKonnect)
737742
}
738743

739744
group, newCtx := errgroup.WithContext(ctx)
740-
removeDefaultsFromState(newCtx, group, &state, schemaFetcher)
745+
removeDefaultsFromState(newCtx, group, &state, registry)
741746
err := group.Wait()
742747
if err != nil {
743748
return nil, err

pkg/dump/schemas.go

Lines changed: 0 additions & 37 deletions
This file was deleted.

pkg/dump/skip_defaults.go

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
)
1313

1414
func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
15-
state *utils.KongRawState, schemaFetcher *SchemaFetcher,
15+
state *utils.KongRawState, registry *schema_pkg.Registry,
1616
) {
1717
// Consumer Groups
1818
group.Go(func() error {
@@ -25,17 +25,17 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
2525
consumers := cg.Consumers
2626
plugins := cg.Plugins
2727

28-
err := processStateEntities([]interface{}{consumerGroup}, schemaFetcher, "consumer_groups")
28+
err := processStateEntities([]interface{}{consumerGroup}, registry, "consumer_groups")
2929
if err != nil {
3030
return fmt.Errorf("error removing defaults from consumer_groups: %w", err)
3131
}
3232

33-
err = processStateEntities(consumers, schemaFetcher, "consumers")
33+
err = processStateEntities(consumers, registry, "consumers")
3434
if err != nil {
3535
return fmt.Errorf("error removing defaults from consumers: %w", err)
3636
}
3737

38-
err = processStateEntities(plugins, schemaFetcher, "plugins")
38+
err = processStateEntities(plugins, registry, "plugins")
3939
if err != nil {
4040
return fmt.Errorf("error removing defaults from plugins: %w", err)
4141
}
@@ -50,7 +50,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
5050
if err := ctx.Err(); err != nil {
5151
return err
5252
}
53-
err := processStateEntities(state.Consumers, schemaFetcher, "consumers")
53+
err := processStateEntities(state.Consumers, registry, "consumers")
5454
if err != nil {
5555
return fmt.Errorf("error removing defaults from consumers: %w", err)
5656
}
@@ -59,7 +59,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
5959

6060
// Key Auth credentials
6161
group.Go(func() error {
62-
err := processStateEntities(state.KeyAuths, schemaFetcher, "keyauth_credentials")
62+
err := processStateEntities(state.KeyAuths, registry, "keyauth_credentials")
6363
if err != nil {
6464
return fmt.Errorf("error removing defaults from key auths: %w", err)
6565
}
@@ -68,7 +68,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
6868

6969
// HMAC Auth credentials
7070
group.Go(func() error {
71-
err := processStateEntities(state.HMACAuths, schemaFetcher, "hmacauth_credentials")
71+
err := processStateEntities(state.HMACAuths, registry, "hmacauth_credentials")
7272
if err != nil {
7373
return fmt.Errorf("error removing defaults from hmac auths: %w", err)
7474
}
@@ -77,7 +77,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
7777

7878
// JWT Auth credentials
7979
group.Go(func() error {
80-
err := processStateEntities(state.JWTAuths, schemaFetcher, "jwt_secrets")
80+
err := processStateEntities(state.JWTAuths, registry, "jwt_secrets")
8181
if err != nil {
8282
return fmt.Errorf("error removing defaults from jwt auths: %w", err)
8383
}
@@ -86,7 +86,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
8686

8787
// Basic Auth credentials
8888
group.Go(func() error {
89-
err := processStateEntities(state.BasicAuths, schemaFetcher, "basicauth_credentials")
89+
err := processStateEntities(state.BasicAuths, registry, "basicauth_credentials")
9090
if err != nil {
9191
return fmt.Errorf("error removing defaults from basic auths: %w", err)
9292
}
@@ -95,7 +95,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
9595

9696
// OAuth2 credentials
9797
group.Go(func() error {
98-
err := processStateEntities(state.Oauth2Creds, schemaFetcher, "oauth2_credentials")
98+
err := processStateEntities(state.Oauth2Creds, registry, "oauth2_credentials")
9999
if err != nil {
100100
return fmt.Errorf("error removing defaults from oauth2 creds: %w", err)
101101
}
@@ -104,7 +104,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
104104

105105
// ACL Groups
106106
group.Go(func() error {
107-
err := processStateEntities(state.ACLGroups, schemaFetcher, "acls")
107+
err := processStateEntities(state.ACLGroups, registry, "acls")
108108
if err != nil {
109109
return fmt.Errorf("error removing defaults from acl groups: %w", err)
110110
}
@@ -113,7 +113,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
113113

114114
// mTLS Auth credentials
115115
group.Go(func() error {
116-
err := processStateEntities(state.MTLSAuths, schemaFetcher, "mtls_auth_credentials")
116+
err := processStateEntities(state.MTLSAuths, registry, "mtls_auth_credentials")
117117
if err != nil {
118118
return fmt.Errorf("error removing defaults from mtls auths: %w", err)
119119
}
@@ -125,7 +125,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
125125
if err := ctx.Err(); err != nil {
126126
return err
127127
}
128-
err := processStateEntities(state.Services, schemaFetcher, "services")
128+
err := processStateEntities(state.Services, registry, "services")
129129
if err != nil {
130130
return fmt.Errorf("error removing defaults from services: %w", err)
131131
}
@@ -134,7 +134,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
134134

135135
// Routes
136136
group.Go(func() error {
137-
err := processStateEntities(state.Routes, schemaFetcher, "routes")
137+
err := processStateEntities(state.Routes, registry, "routes")
138138
if err != nil {
139139
return fmt.Errorf("error removing defaults from routes: %w", err)
140140
}
@@ -143,7 +143,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
143143

144144
// Plugins
145145
group.Go(func() error {
146-
err := processStateEntities(state.Plugins, schemaFetcher, "plugins")
146+
err := processStateEntities(state.Plugins, registry, "plugins")
147147
if err != nil {
148148
return fmt.Errorf("error removing defaults from plugins: %w", err)
149149
}
@@ -152,7 +152,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
152152

153153
// Filter Chains
154154
group.Go(func() error {
155-
err := processStateEntities(state.FilterChains, schemaFetcher, "filter_chains")
155+
err := processStateEntities(state.FilterChains, registry, "filter_chains")
156156
if err != nil {
157157
return fmt.Errorf("error removing defaults from filter chains: %w", err)
158158
}
@@ -161,7 +161,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
161161

162162
// Certificates
163163
group.Go(func() error {
164-
err := processStateEntities(state.Certificates, schemaFetcher, "certificates")
164+
err := processStateEntities(state.Certificates, registry, "certificates")
165165
if err != nil {
166166
return fmt.Errorf("error removing defaults from certificates: %w", err)
167167
}
@@ -170,7 +170,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
170170

171171
// CA Certificates
172172
group.Go(func() error {
173-
err := processStateEntities(state.CACertificates, schemaFetcher, "ca_certificates")
173+
err := processStateEntities(state.CACertificates, registry, "ca_certificates")
174174
if err != nil {
175175
return fmt.Errorf("error removing defaults from ca certificates: %w", err)
176176
}
@@ -179,7 +179,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
179179

180180
// SNIs
181181
group.Go(func() error {
182-
err := processStateEntities(state.SNIs, schemaFetcher, "snis")
182+
err := processStateEntities(state.SNIs, registry, "snis")
183183
if err != nil {
184184
return fmt.Errorf("error removing defaults from snis: %w", err)
185185
}
@@ -188,7 +188,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
188188

189189
// Upstreams
190190
group.Go(func() error {
191-
err := processStateEntities(state.Upstreams, schemaFetcher, "upstreams")
191+
err := processStateEntities(state.Upstreams, registry, "upstreams")
192192
if err != nil {
193193
return fmt.Errorf("error removing defaults from upstreams: %w", err)
194194
}
@@ -197,7 +197,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
197197

198198
// Targets
199199
group.Go(func() error {
200-
err := processStateEntities(state.Targets, schemaFetcher, "targets")
200+
err := processStateEntities(state.Targets, registry, "targets")
201201
if err != nil {
202202
return fmt.Errorf("error removing defaults from targets: %w", err)
203203
}
@@ -206,7 +206,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
206206

207207
// Vaults
208208
group.Go(func() error {
209-
err := processStateEntities(state.Vaults, schemaFetcher, "vaults")
209+
err := processStateEntities(state.Vaults, registry, "vaults")
210210
if err != nil {
211211
return fmt.Errorf("error removing defaults from vaults: %w", err)
212212
}
@@ -215,7 +215,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
215215

216216
// Partials
217217
group.Go(func() error {
218-
err := processStateEntities(state.Partials, schemaFetcher, "partials")
218+
err := processStateEntities(state.Partials, registry, "partials")
219219
if err != nil {
220220
return fmt.Errorf("error removing defaults from partials: %w", err)
221221
}
@@ -224,7 +224,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
224224

225225
// Keys
226226
group.Go(func() error {
227-
err := processStateEntities(state.Keys, schemaFetcher, "keys")
227+
err := processStateEntities(state.Keys, registry, "keys")
228228
if err != nil {
229229
return fmt.Errorf("error removing defaults from keys: %w", err)
230230
}
@@ -233,7 +233,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
233233

234234
// Key Sets
235235
group.Go(func() error {
236-
err := processStateEntities(state.KeySets, schemaFetcher, "key_sets")
236+
err := processStateEntities(state.KeySets, registry, "key_sets")
237237
if err != nil {
238238
return fmt.Errorf("error removing defaults from key sets: %w", err)
239239
}
@@ -242,7 +242,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
242242

243243
// Licenses
244244
group.Go(func() error {
245-
err := processStateEntities(state.Licenses, schemaFetcher, "licenses")
245+
err := processStateEntities(state.Licenses, registry, "licenses")
246246
if err != nil {
247247
return fmt.Errorf("error removing defaults from licenses: %w", err)
248248
}
@@ -251,7 +251,7 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
251251

252252
// RBAC Roles
253253
group.Go(func() error {
254-
err := processStateEntities(state.RBACRoles, schemaFetcher, "rbac_roles")
254+
err := processStateEntities(state.RBACRoles, registry, "rbac_roles")
255255
if err != nil {
256256
return fmt.Errorf("error removing defaults from rbac roles: %w", err)
257257
}
@@ -260,21 +260,21 @@ func removeDefaultsFromState(ctx context.Context, group *errgroup.Group,
260260

261261
// RBAC Endpoint Permissions
262262
group.Go(func() error {
263-
err := processStateEntities(state.RBACEndpointPermissions, schemaFetcher, "rbac_endpoint_permissions")
263+
err := processStateEntities(state.RBACEndpointPermissions, registry, "rbac_endpoint_permissions")
264264
if err != nil {
265265
return fmt.Errorf("error removing defaults from rbac endpoint permissions: %w", err)
266266
}
267267
return nil
268268
})
269269
}
270270

271-
func processStateEntities[T any](entities []T, schemaFetcher *SchemaFetcher, entityType string) error {
271+
func processStateEntities[T any](entities []T, registry *schema_pkg.Registry, entityType string) error {
272272
if len(entities) == 0 {
273273
return nil
274274
}
275275

276276
for _, e := range entities {
277-
err := removeDefaultsFromEntity(e, entityType, schemaFetcher)
277+
err := removeDefaultsFromEntity(e, entityType, registry)
278278
if err != nil {
279279
return err
280280
}
@@ -283,7 +283,7 @@ func processStateEntities[T any](entities []T, schemaFetcher *SchemaFetcher, ent
283283
return nil
284284
}
285285

286-
func removeDefaultsFromEntity(entity interface{}, entityType string, schemaFetcher *SchemaFetcher) error {
286+
func removeDefaultsFromEntity(entity interface{}, entityType string, registry *schema_pkg.Registry) error {
287287
ptr := reflect.ValueOf(entity)
288288
if ptr.Kind() != reflect.Ptr {
289289
return fmt.Errorf("entity is not a pointer")
@@ -296,7 +296,7 @@ func removeDefaultsFromEntity(entity interface{}, entityType string, schemaFetch
296296
return fmt.Errorf("error getting entity identifier for schema fetching: %w", err)
297297
}
298298

299-
schema, err := schemaFetcher.getSchema(entityType, entityIdentifier)
299+
schema, err := registry.GetSchema(entityType, entityIdentifier)
300300
if err != nil {
301301
return fmt.Errorf("error fetching schema for entity %s of type %s: %w", entityIdentifier, entityType, err)
302302
}

0 commit comments

Comments
 (0)