77
88import confluent_kafka
99import confluent_kafka .schema_registry .avro
10+ import requests
11+
1012from jinja2 import Template
1113from slack import WebClient
1214from slack .errors import SlackApiError
1315
1416from . import config , message_schemas , common
1517
1618
17- logger = logging .getLogger ('plan_monitor.notify_slack ' )
19+ logger = logging .getLogger ('plan_monitor.notify ' )
1820
1921
20- def notify (slack_client : WebClient , msg : confluent_kafka .Message ) -> None :
22+ def notify_slack (slack_client : WebClient , msg : confluent_kafka .Message ) -> None :
2123 prior_plans_count , prior_times_sum , prior_reads_sum , prior_execs_sum , prior_last_execution = 0 , 0 , 0 , 0 , 0
2224 msg_val = dict (msg .value ())
2325 for pp in msg_val ['prior_plans' ]:
@@ -54,15 +56,30 @@ def notify(slack_client: WebClient, msg: confluent_kafka.Message) -> None:
5456 logger .warning (f"Error sending message to Slack: { e .response .get ('error' , '<none>' )} " )
5557
5658
57- def notify_slack () -> None :
59+ def notify_http (msg : confluent_kafka .Message ) -> None :
60+ if config .HTTP_NOTIFY_TEMPLATE :
61+ template = Template (config .HTTP_NOTIFY_TEMPLATE )
62+ body = template .render (msg = msg )
63+ else :
64+ body = json .dumps (msg )
65+
66+ headers = json .loads (config .HTTP_NOTIFY_HEADERS ) if config .HTTP_NOTIFY_HEADERS else {}
67+ resp = requests .post (config .HTTP_NOTIFY_URL , data = body , headers = headers , timeout = 5.0 )
68+ resp .raise_for_status ()
69+
70+ logger .debug ('Posted eviction notification to %s with code %s and response: %s' , config .HTTP_NOTIFY_URL ,
71+ resp .status_code , resp .text )
72+
73+
74+ def notify () -> None :
5875 schema_registry = confluent_kafka .schema_registry .SchemaRegistryClient ({'url' : config .SCHEMA_REGISTRY_URL })
5976 key_deserializer = confluent_kafka .schema_registry .avro .AvroDeserializer (
6077 message_schemas .EVICTED_PLANS_MESSAGE_KEY_AVRO_SCHEMA , schema_registry )
6178 value_deserializer = confluent_kafka .schema_registry .avro .AvroDeserializer (
6279 message_schemas .EVICTED_PLANS_MESSAGE_VALUE_AVRO_SCHEMA , schema_registry )
6380
6481 consumer_config = {'bootstrap.servers' : config .KAFKA_BOOTSTRAP_SERVERS ,
65- 'group.id' : f'sqlserver_plan_regression_monitor_notify_slack_ { socket .getfqdn ()} ' ,
82+ 'group.id' : f'sqlserver_plan_regression_monitor_notify_ { socket .getfqdn ()} ' ,
6683 'key.deserializer' : key_deserializer ,
6784 'value.deserializer' : value_deserializer ,
6885 'enable.auto.commit' : True ,
@@ -72,10 +89,12 @@ def notify_slack() -> None:
7289
7390 kafka_consumer = confluent_kafka .DeserializingConsumer (consumer_config )
7491 kafka_consumer .subscribe ([config .EVICTED_PLANS_TOPIC ])
75- slack_client = WebClient (token = config .SLACK_API_TOKEN )
7692 last_log_heartbeat = datetime .utcnow ()
7793 log_heartbeat_interval_seconds = 60
7894
95+ if config .SLACK_NOTIFY_CHANNEL :
96+ slack_client = WebClient (token = config .SLACK_API_TOKEN )
97+
7998 try :
8099 while True :
81100 if (datetime .utcnow () - last_log_heartbeat ).total_seconds () > log_heartbeat_interval_seconds :
@@ -88,10 +107,17 @@ def notify_slack() -> None:
88107 if msg is None :
89108 continue
90109
91- logger .info (f'Notifying Slack channel { config .SLACK_NOTIFY_CHANNEL } for message '
92- f'@ { common .msg_coordinates (msg )} ' )
110+ if config .SLACK_NOTIFY_CHANNEL :
111+ logger .info (f'Notifying Slack channel { config .SLACK_NOTIFY_CHANNEL } for message '
112+ f'@ { common .msg_coordinates (msg )} ' )
113+ notify_slack (slack_client , msg )
114+
115+ if config .HTTP_NOTIFY_URL :
116+ logger .info (f'Notifying via HTTP POST for message '
117+ f'@ { common .msg_coordinates (msg )} ' )
118+ notify_http (msg )
119+
93120 last_log_heartbeat = datetime .utcnow ()
94- notify (slack_client , msg )
95121 kafka_consumer .store_offsets (msg )
96122 except KeyboardInterrupt :
97123 logger .info ('Received interrupt request; shutting down...' )
@@ -102,4 +128,4 @@ def notify_slack() -> None:
102128
103129if __name__ == "__main__" :
104130 logger .info ('Starting...' )
105- notify_slack ()
131+ notify ()
0 commit comments