2020from typing import List , Optional , Set , Dict , Any
2121
2222from confluent_kafka .schema_registry import RegisteredSchema
23+ from confluent_kafka .schema_registry .common .schema_registry_client import \
24+ RulePhase
2325from confluent_kafka .schema_registry .common .serde import ErrorAction , \
2426 FieldTransformer , Migration , NoneAction , RuleAction , \
2527 RuleConditionError , RuleContext , RuleError , SchemaId
@@ -59,6 +61,17 @@ def _execute_rules(
5961 source : Optional [Schema ], target : Optional [Schema ],
6062 message : Any , inline_tags : Optional [Dict [str , Set [str ]]],
6163 field_transformer : Optional [FieldTransformer ]
64+ ) -> Any :
65+ return self ._execute_rules_with_phase (
66+ ser_ctx , subject , RulePhase .DOMAIN , rule_mode ,
67+ source , target , message , inline_tags , field_transformer )
68+
69+ def _execute_rules_with_phase (
70+ self , ser_ctx : SerializationContext , subject : str ,
71+ rule_phase : RulePhase , rule_mode : RuleMode ,
72+ source : Optional [Schema ], target : Optional [Schema ],
73+ message : Any , inline_tags : Optional [Dict [str , Set [str ]]],
74+ field_transformer : Optional [FieldTransformer ]
6275 ) -> Any :
6376 if message is None or target is None :
6477 return message
@@ -73,7 +86,10 @@ def _execute_rules(
7386 rules .reverse ()
7487 else :
7588 if target is not None and target .rule_set is not None :
76- rules = target .rule_set .domain_rules
89+ if rule_phase == RulePhase .ENCODING :
90+ rules = target .rule_set .encoding_rules
91+ else :
92+ rules = target .rule_set .domain_rules
7793 if rule_mode == RuleMode .READ :
7894 # Execute read rules in reverse order for symmetry
7995 rules = rules [:] if rules else []
@@ -197,19 +213,25 @@ async def _get_writer_schema(
197213 else :
198214 raise SerializationError ("Schema ID or GUID is not set" )
199215
200- def _has_rules (self , rule_set : RuleSet , mode : RuleMode ) -> bool :
216+ def _has_rules (self , rule_set : RuleSet , phase : RulePhase , mode : RuleMode ) -> bool :
201217 if rule_set is None :
202218 return False
219+ if phase == RulePhase .MIGRATION :
220+ rules = rule_set .migration_rules
221+ elif phase == RulePhase .DOMAIN :
222+ rules = rule_set .domain_rules
223+ elif phase == RulePhase .ENCODING :
224+ rules = rule_set .encoding_rules
203225 if mode in (RuleMode .UPGRADE , RuleMode .DOWNGRADE ):
204226 return any (rule .mode == mode or rule .mode == RuleMode .UPDOWN
205- for rule in rule_set . migration_rules or [])
227+ for rule in rules or [])
206228 elif mode == RuleMode .UPDOWN :
207- return any (rule .mode == mode for rule in rule_set . migration_rules or [])
229+ return any (rule .mode == mode for rule in rules or [])
208230 elif mode in (RuleMode .WRITE , RuleMode .READ ):
209231 return any (rule .mode == mode or rule .mode == RuleMode .WRITEREAD
210- for rule in rule_set . domain_rules or [])
232+ for rule in rules or [])
211233 elif mode == RuleMode .WRITEREAD :
212- return any (rule .mode == mode for rule in rule_set . migration_rules or [])
234+ return any (rule .mode == mode for rule in rules or [])
213235 return False
214236
215237 async def _get_migrations (
@@ -235,7 +257,8 @@ async def _get_migrations(
235257 if i == 0 :
236258 previous = version
237259 continue
238- if version .schema .rule_set is not None and self ._has_rules (version .schema .rule_set , migration_mode ):
260+ if version .schema .rule_set is not None and self ._has_rules (
261+ version .schema .rule_set , RulePhase .MIGRATION , migration_mode ):
239262 if migration_mode == RuleMode .UPGRADE :
240263 migration = Migration (migration_mode , previous , version )
241264 else :
@@ -265,7 +288,8 @@ def _execute_migrations(
265288 migrations : List [Migration ], message : Any
266289 ) -> Any :
267290 for migration in migrations :
268- message = self ._execute_rules (ser_ctx , subject , migration .rule_mode ,
269- migration .source .schema , migration .target .schema ,
270- message , None , None )
291+ message = self ._execute_rules_with_phase (
292+ ser_ctx , subject , RulePhase .MIGRATION , migration .rule_mode ,
293+ migration .source .schema , migration .target .schema ,
294+ message , None , None )
271295 return message
0 commit comments