Skip to content
Merged
11 changes: 11 additions & 0 deletions docs/modules/components/pages/outputs/redpanda_migrator.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ output:
translate_ids: false
normalize: false
strict: false
max_parallel_http_requests: 10
consumer_groups:
enabled: true
interval: 1m
Expand Down Expand Up @@ -139,6 +140,7 @@ output:
translate_ids: false
normalize: false
strict: false
max_parallel_http_requests: 10
consumer_groups:
enabled: true
interval: 1m
Expand Down Expand Up @@ -1425,6 +1427,15 @@ Error on unknown schema IDs. Only relevant when translate_ids is true. When fals

*Default*: `false`

=== `schema_registry.max_parallel_http_requests`

Maximum number of parallel HTTP requests to the schema registry. Controls concurrency when syncing multiple schemas.


*Type*: `int`

*Default*: `10`

=== `consumer_groups`

Sorry! This field is missing documentation.
Expand Down
13 changes: 12 additions & 1 deletion internal/impl/redpanda/migrator/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func NewSchemaRegistryMigratorForTesting(t *testing.T, conf SchemaRegistryMigrat
t.Cleanup(func() {
t.Log(buf.String())
})
conf.MaxParallelHTTPRequests = 2
return &schemaRegistryMigrator{
conf: conf,
src: src,
Expand All @@ -80,11 +81,21 @@ func NewSchemaRegistryMigratorForTesting(t *testing.T, conf SchemaRegistryMigrat
log: service.NewLoggerFromSlog(slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{
Level: slog.LevelDebug,
}))),
knownSubjects: make(map[schemaInfo]struct{}),
knownSubjects: make(map[schemaSubjectVersion]struct{}),
knownSchemas: make(map[int]schemaInfo),
}
}

func (m *schemaRegistryMigrator) DfsSubjectSchemasFunc(
ctx context.Context,
client *sr.Client,
root sr.SubjectSchema,
filter func(subject string, version int) bool,
cb func(sr.SubjectSchema) error,
) error {
return m.dfsSubjectSchemasFunc(ctx, client, root, filter, cb)
}

func NewGroupsMigratorForTesting(
t *testing.T,
conf GroupsMigratorConfig,
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/redpanda/migrator/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func NewMigrator(mgr *service.Resources) *Migrator {
sr: schemaRegistryMigrator{
metrics: newSchemaRegistryMetrics(mgr.Metrics()),
log: log,
knownSubjects: make(map[schemaInfo]struct{}),
knownSubjects: make(map[schemaSubjectVersion]struct{}),
knownSchemas: make(map[int]schemaInfo),
},
groups: groupsMigrator{
Expand Down
Loading