Skip to content

Commit 4783dc1

Browse files
yurii-hunteryurii.myslyvets
andauthored
fixes ProtobufSerializer race condition (#1524)
Co-authored-by: yurii.myslyvets <[email protected]>
1 parent 979e0d8 commit 4783dc1

File tree

1 file changed

+5
-8
lines changed

1 file changed

+5
-8
lines changed

src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -168,27 +168,24 @@ private static byte[] createIndexArray(MessageDescriptor md)
168168
/// </remarks>
169169
private async Task<List<SchemaReference>> RegisterOrGetReferences(FileDescriptor fd, SerializationContext context, bool autoRegisterSchema)
170170
{
171-
var result = new List<SchemaReference>();
172-
173-
var tasks = new Task[fd.Dependencies.Count];
171+
var tasks = new Task<SchemaReference>[fd.Dependencies.Count];
174172
for (int i=0; i<fd.Dependencies.Count; ++i)
175173
{
176-
var dependency = fd.Dependencies[i];
177-
Func<Task> t = async () => {
174+
Func<FileDescriptor, Task<SchemaReference>> t = async (FileDescriptor dependency) => {
178175
var dependencyReferences = await RegisterOrGetReferences(dependency, context, autoRegisterSchema).ConfigureAwait(continueOnCapturedContext: false);
179176
var subject = referenceSubjectNameStrategy(context, dependency.Name);
180177
var schema = new Schema(dependency.SerializedData.ToBase64(), dependencyReferences, SchemaType.Protobuf);
181178
var schemaId = autoRegisterSchema
182179
? await schemaRegistryClient.RegisterSchemaAsync(subject, schema).ConfigureAwait(continueOnCapturedContext: false)
183180
: await schemaRegistryClient.GetSchemaIdAsync(subject, schema).ConfigureAwait(continueOnCapturedContext: false);
184181
var registeredDependentSchema = await schemaRegistryClient.LookupSchemaAsync(subject, schema, true).ConfigureAwait(continueOnCapturedContext: false);
185-
result.Add(new SchemaReference(dependency.Name, subject, registeredDependentSchema.Version));
182+
return new SchemaReference(dependency.Name, subject, registeredDependentSchema.Version);
186183
};
187-
tasks[i] = t();
184+
tasks[i] = t(fd.Dependencies[i]);
188185
}
189186
await Task.WhenAll(tasks.ToArray()).ConfigureAwait(continueOnCapturedContext: false);
190187

191-
return result;
188+
return tasks.Select(t => t.Result).ToList();
192189
}
193190

194191

0 commit comments

Comments
 (0)