Skip to content

Commit 4fe8bbd

Browse files
authored
Retrieve version for writer schema when getting migrations (#2358)
* Retrieve version for writer schema when getting migrations Fixes #2356 * Minor cleanup
1 parent 0923761 commit 4fe8bbd

File tree

6 files changed

+20
-9
lines changed

6 files changed

+20
-9
lines changed

src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public async Task<GenericRecord> Deserialize(string topic, Headers headers, byte
7272
}
7373

7474
string subject = GetSubjectName(topic, isKey, null);
75-
Schema latestSchema = null;
75+
RegisteredSchema latestSchema = null;
7676
if (subject != null)
7777
{
7878
latestSchema = await GetReaderSchema(subject)

src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public async Task<T> Deserialize(string topic, Headers headers, byte[] array, bo
126126
}
127127

128128
string subject = GetSubjectName(topic, isKey, null);
129-
Schema latestSchema = null;
129+
RegisteredSchema latestSchema = null;
130130
if (subject != null)
131131
{
132132
latestSchema = await GetReaderSchema(subject)

src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
175175
bool isKey = context.Component == MessageComponentType.Key;
176176
string topic = context.Topic;
177177
string subject = GetSubjectName(topic, isKey, null);
178-
Schema latestSchema = null;
178+
RegisteredSchema latestSchema = null;
179179
if (subject != null)
180180
{
181181
latestSchema = await GetReaderSchema(subject)

src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
128128
// Currently Protobuf does not support migration rules because of lack of support for DynamicMessage
129129
// See https://github.com/protocolbuffers/protobuf/issues/658
130130
/*
131-
Schema latestSchema = await SerdeUtils.GetReaderSchema(schemaRegistryClient, subject, useLatestWithMetadata, useLatestVersion)
131+
RegisteredSchema latestSchema = await SerdeUtils.GetReaderSchema(schemaRegistryClient, subject, useLatestWithMetadata, useLatestVersion)
132132
.ConfigureAwait(continueOnCapturedContext: false);
133133
*/
134134

src/Confluent.SchemaRegistry/AsyncSerde.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,11 +166,14 @@ await ResolveReferences(s, schemas, visited)
166166
return schemas;
167167
}
168168

169-
protected async Task<IList<Migration>> GetMigrations(string subject, Schema writerSchema, Schema readerSchema)
169+
protected async Task<IList<Migration>> GetMigrations(string subject, Schema writer, RegisteredSchema readerSchema)
170170
{
171+
var writerSchema = await schemaRegistryClient.LookupSchemaAsync(subject, writer, false, false)
172+
.ConfigureAwait(continueOnCapturedContext: false);
173+
171174
RuleMode migrationMode;
172-
Schema first;
173-
Schema last;
175+
RegisteredSchema first;
176+
RegisteredSchema last;
174177
IList<Migration> migrations = new List<Migration>();
175178
if (writerSchema.Version < readerSchema.Version)
176179
{
@@ -217,7 +220,7 @@ protected async Task<IList<Migration>> GetMigrations(string subject, Schema writ
217220
return migrations;
218221
}
219222

220-
private async Task<IList<Schema>> GetSchemasBetween(string subject, Schema first, Schema last)
223+
private async Task<IList<Schema>> GetSchemasBetween(string subject, RegisteredSchema first, RegisteredSchema last)
221224
{
222225
if (last.Version - first.Version <= 1)
223226
{

test/Confluent.SchemaRegistry.Serdes.UnitTests/BaseSerializeDeserialize.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,15 @@ public BaseSerializeDeserializeTests()
4545
var schemaRegistryMock = new Mock<ISchemaRegistryClient>();
4646
schemaRegistryMock.Setup(x => x.ConstructValueSubjectName(testTopic, It.IsAny<string>())).Returns($"{testTopic}-value");
4747
schemaRegistryMock.Setup(x => x.RegisterSchemaAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>())).ReturnsAsync(
48-
(string topic, string schema, bool normalize) => store.TryGetValue(schema, out int id) ? id : store[schema] = store.Count + 1
48+
(string subject, string schema, bool normalize) => store.TryGetValue(schema, out int id) ? id : store[schema] = store.Count + 1
49+
);
50+
schemaRegistryMock.Setup(x => x.LookupSchemaAsync(It.IsAny<string>(), It.IsAny<Schema>(), It.IsAny<bool>(), It.IsAny<bool>())).ReturnsAsync(
51+
(string subject, Schema schema, bool ignoreDeleted, bool normalize) =>
52+
{
53+
return subjectStore[subject].First(x =>
54+
x.SchemaString == schema.SchemaString
55+
);
56+
}
4957
);
5058
schemaRegistryMock.Setup(x => x.GetSchemaBySubjectAndIdAsync(It.IsAny<string>(), It.IsAny<int>(), It.IsAny<string>())).ReturnsAsync(
5159
(string subject, int id, string format) =>

0 commit comments

Comments
 (0)