Skip to content

Commit 15e5ddd

Browse files
authored
Include deleted schemas when getting schemas by subject/version (#16) (#2288)
* Include deleted schemas when getting schemas by subect/version * Minor fix
1 parent 2b17c81 commit 15e5ddd

File tree

7 files changed

+13
-10
lines changed

7 files changed

+13
-10
lines changed

src/Confluent.SchemaRegistry.Serdes.Json/JsonSchemaResolver.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ private async Task CreateSchemaDictUtil(Schema root)
8989
{
9090
foreach (var reference in root.References)
9191
{
92-
Schema refSchemaRes = await schemaRegistryClient.GetRegisteredSchemaAsync(reference.Subject, reference.Version);
92+
Schema refSchemaRes = await schemaRegistryClient.GetRegisteredSchemaAsync(reference.Subject, reference.Version, false);
9393
await CreateSchemaDictUtil(refSchemaRes);
9494
}
9595
}

src/Confluent.SchemaRegistry/AsyncSerde.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ private async Task<IDictionary<string, string>> ResolveReferences(
124124
visited.Add(reference.Name);
125125
if (!schemas.ContainsKey(reference.Name))
126126
{
127-
Schema s = await schemaRegistryClient.GetRegisteredSchemaAsync(reference.Subject, reference.Version)
127+
Schema s = await schemaRegistryClient.GetRegisteredSchemaAsync(reference.Subject, reference.Version, false)
128128
.ConfigureAwait(continueOnCapturedContext: false);
129129
if (s == null)
130130
{
@@ -201,7 +201,7 @@ private async Task<IList<Schema>> GetSchemasBetween(string subject, Schema first
201201
int version1 = first.Version;
202202
int version2 = last.Version;
203203
for (int i = version1 + 1; i < version2; i++) {
204-
tasks.Add(schemaRegistryClient.GetRegisteredSchemaAsync(subject, i));
204+
tasks.Add(schemaRegistryClient.GetRegisteredSchemaAsync(subject, i, false));
205205
}
206206
RegisteredSchema[] schemas = await Task.WhenAll(tasks).ConfigureAwait(continueOnCapturedContext: false);
207207

src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -595,7 +595,7 @@ public async Task<Schema> GetSchemaBySubjectAndIdAsync(string subject, int id, s
595595

596596

597597
/// <inheritdoc/>
598-
public async Task<RegisteredSchema> GetRegisteredSchemaAsync(string subject, int version)
598+
public async Task<RegisteredSchema> GetRegisteredSchemaAsync(string subject, int version, bool ignoreDeletedSchemas = true)
599599
{
600600
await cacheMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
601601
try

src/Confluent.SchemaRegistry/ISchemaRegistryClient.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,10 +194,13 @@ public interface ISchemaRegistryClient : IDisposable
194194
/// <param name="version">
195195
/// The version number of schema to get.
196196
/// </param>
197+
/// <param name="ignoreDeletedSchemas">
198+
/// Whether or not to ignore deleted schemas.
199+
/// </param>
197200
/// <returns>
198201
/// The schema identified by the specified <paramref name="subject" /> and <paramref name="version" />.
199202
/// </returns>
200-
Task<RegisteredSchema> GetRegisteredSchemaAsync(string subject, int version);
203+
Task<RegisteredSchema> GetRegisteredSchemaAsync(string subject, int version, bool ignoreDeletedSchemas = true);
201204

202205

203206
/// <summary>

src/Confluent.SchemaRegistry/Rest/IRestService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ internal interface IRestService : IDisposable
3434
Task<RegisteredSchema> GetLatestWithMetadataAsync(string subject, IDictionary<string, string> metadata, bool ignoreDeletedSchemas);
3535
Task<Schema> GetSchemaAsync(int id, string format = null);
3636
Task<Schema> GetSchemaBySubjectAndIdAsync(string subject, int id, string format = null);
37-
Task<RegisteredSchema> GetSchemaAsync(string subject, int version);
37+
Task<RegisteredSchema> GetSchemaAsync(string subject, int version, bool ignoreDeletedSchemas = true);
3838
Task<List<string>> GetSubjectsAsync();
3939
Task<List<int>> GetSubjectVersionsAsync(string subject);
4040
Task<int> RegisterSchemaAsync(string subject, Schema schema, bool normalize);

src/Confluent.SchemaRegistry/Rest/RestService.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,9 +339,9 @@ public async Task<List<int>> GetSubjectVersionsAsync(string subject)
339339
=> await RequestListOfAsync<int>($"subjects/{Uri.EscapeDataString(subject)}/versions", HttpMethod.Get)
340340
.ConfigureAwait(continueOnCapturedContext: false);
341341

342-
public async Task<RegisteredSchema> GetSchemaAsync(string subject, int version)
342+
public async Task<RegisteredSchema> GetSchemaAsync(string subject, int version, bool ignoreDeletedSchemas = true)
343343
=> SanitizeRegisteredSchema(
344-
await RequestAsync<RegisteredSchema>($"subjects/{Uri.EscapeDataString(subject)}/versions/{version}",
344+
await RequestAsync<RegisteredSchema>($"subjects/{Uri.EscapeDataString(subject)}/versions/{version}?deleted={!ignoreDeletedSchemas}",
345345
HttpMethod.Get)
346346
.ConfigureAwait(continueOnCapturedContext: false));
347347

test/Confluent.SchemaRegistry.Serdes.UnitTests/BaseSerializeDeserialize.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@ public BaseSerializeDeserializeTests()
6161
return new Schema(store.Where(x => x.Value == id).First().Key, null, SchemaType.Avro);
6262
}
6363
});
64-
schemaRegistryMock.Setup(x => x.GetRegisteredSchemaAsync(It.IsAny<string>(), It.IsAny<int>())).ReturnsAsync(
65-
(string subject, int version) => subjectStore[subject].First(x => x.Version == version)
64+
schemaRegistryMock.Setup(x => x.GetRegisteredSchemaAsync(It.IsAny<string>(), It.IsAny<int>(), It.IsAny<bool>())).ReturnsAsync(
65+
(string subject, int version, bool ignoreDeletedSchemas) => subjectStore[subject].First(x => x.Version == version)
6666
);
6767
schemaRegistryMock.Setup(x => x.GetLatestSchemaAsync(It.IsAny<string>())).ReturnsAsync(
6868
(string subject) => subjectStore[subject].Last()

0 commit comments

Comments
 (0)