Skip to content

Commit 8502af0

Browse files
authored
Add support for Avro logical types in rules (#35) (#2310)
1 parent 96e8d54 commit 8502af0

File tree

4 files changed

+49
-3
lines changed

4 files changed

+49
-3
lines changed

src/Confluent.SchemaRegistry.Rules/CelExecutor.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,8 @@ private static Google.Api.Expr.V1Alpha1.Type FindTypeForAvroType(Avro.Schema sch
246246
}
247247

248248
throw new ArgumentException("Unsupported union type");
249+
case Avro.Schema.Type.Logical:
250+
return FindTypeForAvroType((schema as LogicalSchema).BaseSchema);
249251
default:
250252
throw new ArgumentException("Unsupported type " + type);
251253
}

src/Confluent.SchemaRegistry.Rules/Confluent.SchemaRegistry.Rules.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
</ItemGroup>
3535

3636
<ItemGroup Condition="'$(TargetFramework)' == 'net6.0'">
37-
<PackageReference Include="Cel.NET" Version="0.3.1" />
37+
<PackageReference Include="Cel.NET" Version="0.3.2" />
3838
</ItemGroup>
3939

4040
<ItemGroup>

src/Confluent.SchemaRegistry.Serdes.Avro/AvroUtils.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,9 @@ private static RuleContext.Type GetType(Avro.Schema schema)
165165
case Avro.Schema.Type.Boolean:
166166
return RuleContext.Type.Boolean;
167167
case Avro.Schema.Type.Null:
168+
return RuleContext.Type.Null;
169+
case Avro.Schema.Type.Logical:
170+
return GetType((schema as LogicalSchema).BaseSchema);
168171
default:
169172
return RuleContext.Type.Null;
170173
}

test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -696,12 +696,12 @@ public void GenericRecordCELCondition()
696696
schema.RuleSet = new RuleSet(new List<Rule>(),
697697
new List<Rule>
698698
{
699-
new Rule("testCEL", RuleKind.Condition, RuleMode.Write, "CEL", null, null,
699+
new Rule("testCEL", RuleKind.Condition, RuleMode.Write, "CEL", null, null,
700700
"message.name == 'awesome'", null, null, false)
701701
}
702702
);
703703
store[schemaStr] = 1;
704-
subjectStore["topic-value"] = new List<RegisteredSchema> { schema };
704+
subjectStore["topic-value"] = new List<RegisteredSchema> { schema };
705705
var config = new AvroSerializerConfig
706706
{
707707
AutoRegisterSchemas = false,
@@ -724,6 +724,47 @@ public void GenericRecordCELCondition()
724724
Assert.Equal(user["favorite_number"], result["favorite_number"]);
725725
}
726726

727+
[Fact]
728+
public void GenericRecordCELConditionLogicalType()
729+
{
730+
var uuid = "550e8400-e29b-41d4-a716-446655440000";
731+
var schemaStr = "{\"type\":\"record\",\"name\":\"UserWithPic\",\"namespace\":\"Confluent.Kafka.Examples.AvroSpecific" +
732+
"\",\"fields\":[{\"name\":\"name\",\"type\":{\"type\":\"string\",\"logicalType\":\"uuid\"}},{\"name\":\"favorite_number\"," +
733+
"\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}," +
734+
"{\"name\":\"picture\",\"type\":[\"null\",\"bytes\"],\"default\":null}]}";
735+
736+
var schema = new RegisteredSchema("topic-value", 1, 1, schemaStr, SchemaType.Avro, null);
737+
schema.RuleSet = new RuleSet(new List<Rule>(),
738+
new List<Rule>
739+
{
740+
new Rule("testCEL", RuleKind.Condition, RuleMode.Write, "CEL", null, null,
741+
"message.name == '" + uuid + "'", null, null, false)
742+
}
743+
);
744+
store[schemaStr] = 1;
745+
subjectStore["topic-value"] = new List<RegisteredSchema> { schema };
746+
var config = new AvroSerializerConfig
747+
{
748+
AutoRegisterSchemas = false,
749+
UseLatestVersion = true
750+
};
751+
var serializer = new AvroSerializer<GenericRecord>(schemaRegistryClient, config);
752+
var deserializer = new AvroDeserializer<GenericRecord>(schemaRegistryClient, null);
753+
754+
var user = new GenericRecord((RecordSchema) Avro.Schema.Parse(schemaStr));
755+
user.Add("name", uuid);
756+
user.Add("favorite_number", 100);
757+
user.Add("favorite_color", "blue");
758+
759+
Headers headers = new Headers();
760+
var bytes = serializer.SerializeAsync(user, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result;
761+
var result = deserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result;
762+
763+
Assert.Equal(new Guid(uuid), result["name"]);
764+
Assert.Equal(user["favorite_color"], result["favorite_color"]);
765+
Assert.Equal(user["favorite_number"], result["favorite_number"]);
766+
}
767+
727768
[Fact]
728769
public void GenericRecordCELConditionFail()
729770
{

0 commit comments

Comments
 (0)