|
| 1 | +from lib.logging_utils import init_logger |
| 2 | +from lib.vcon_redis import VconRedis |
| 3 | +from redis_mgr import redis |
| 4 | + |
| 5 | +logger = init_logger(__name__) |
| 6 | + |
| 7 | +default_options = { |
| 8 | + # Dictionary mapping tags to target Redis lists |
| 9 | + # e.g., {"important": "important_vcons", "urgent": "urgent_vcons"} |
| 10 | + "tag_routes": {}, |
| 11 | + # Whether to continue normal processing after routing |
| 12 | + "forward_original": True, |
| 13 | +} |
| 14 | + |
| 15 | +def run(vcon_uuid, link_name, opts=default_options): |
| 16 | + """Tag Router link that routes vCons to different Redis lists based on tags in attachments. |
| 17 | + |
| 18 | + Args: |
| 19 | + vcon_uuid: UUID of the vCon to process |
| 20 | + link_name: Name of this link instance |
| 21 | + opts: Link options containing: |
| 22 | + tag_routes: Dictionary mapping tags to target Redis lists |
| 23 | + forward_original: Whether to continue normal processing after routing |
| 24 | + |
| 25 | + Returns: |
| 26 | + vcon_uuid if the vCon should continue normal processing, None otherwise |
| 27 | + """ |
| 28 | + logger.debug(f"Starting {__name__}::run") |
| 29 | + |
| 30 | + # Merge options |
| 31 | + merged_opts = default_options.copy() |
| 32 | + merged_opts.update(opts) |
| 33 | + opts = merged_opts |
| 34 | + |
| 35 | + # Get the vCon |
| 36 | + vcon_redis = VconRedis() |
| 37 | + vcon = vcon_redis.get_vcon(vcon_uuid) |
| 38 | + if not vcon: |
| 39 | + logger.error(f"Could not find vCon {vcon_uuid}") |
| 40 | + return None |
| 41 | + |
| 42 | + # Check if there are any tag routes configured |
| 43 | + if not opts.get("tag_routes"): |
| 44 | + logger.warning(f"No tag routes configured for {link_name}, skipping") |
| 45 | + return vcon_uuid |
| 46 | + |
| 47 | + # Extract all tags from attachments |
| 48 | + tags = [] |
| 49 | + for attachment in vcon.attachments: |
| 50 | + # Handle only plural "tags" format |
| 51 | + if attachment['type'] == "tags" and 'body' in attachment: |
| 52 | + if isinstance(attachment['body'], list): |
| 53 | + # Process each tag string in the list |
| 54 | + for tag_str in attachment['body']: |
| 55 | + if isinstance(tag_str, str) and ":" in tag_str: |
| 56 | + # Split on first colon to get tag name |
| 57 | + tag_name = tag_str.split(":", 1)[0] |
| 58 | + if tag_name: |
| 59 | + tags.append(tag_name) |
| 60 | + elif isinstance(attachment['body'], dict): |
| 61 | + # If body is a dict, use the keys as tags |
| 62 | + for tag_name in attachment['body'].keys(): |
| 63 | + if tag_name: |
| 64 | + tags.append(tag_name) |
| 65 | + |
| 66 | + if not tags: |
| 67 | + logger.debug(f"No tags found in vCon {vcon_uuid}") |
| 68 | + return vcon_uuid if opts.get("forward_original") else None |
| 69 | + |
| 70 | + # Route the vCon to the appropriate Redis lists based on tags |
| 71 | + routed = False |
| 72 | + for tag in tags: |
| 73 | + if tag in opts["tag_routes"]: |
| 74 | + target_list = opts["tag_routes"][tag] |
| 75 | + logger.info(f"Routing vCon {vcon_uuid} to list '{target_list}' based on tag '{tag}'") |
| 76 | + # Push the vCon UUID to the target Redis list |
| 77 | + redis.rpush(target_list, str(vcon_uuid)) |
| 78 | + routed = True |
| 79 | + else: |
| 80 | + logger.debug(f"No route configured for tag '{tag}'") |
| 81 | + |
| 82 | + if routed: |
| 83 | + logger.info(f"Successfully routed vCon {vcon_uuid} based on tags") |
| 84 | + else: |
| 85 | + logger.info(f"No applicable routes found for vCon {vcon_uuid}") |
| 86 | + |
| 87 | + # Return based on forward_original setting |
| 88 | + return vcon_uuid if opts.get("forward_original") else None |
0 commit comments