88import threading
99import time
1010import uuid
11- from typing import Callable , Dict , List , Optional , Union
11+ from typing import Any , Callable , Dict , List , Optional , Union
1212
1313import pika
1414from pika .exceptions import NackError , UnroutableError
15+ from prometheus_client import start_http_server , Counter , Enum
1516from sseclient import SSEClient
1617
1718from pycti .api .opencti_api_client import OpenCTIApiClient
@@ -27,16 +28,16 @@ def get_config_variable(
2728 yaml_path : List ,
2829 config : Dict = {},
2930 isNumber : Optional [bool ] = False ,
30- default = None ,
31+ default : Optional [ Any ] = None ,
3132) -> Union [bool , int , None , str ]:
3233 """[summary]
3334
3435 :param env_var: environnement variable name
3536 :param yaml_path: path to yaml config
3637 :param config: client config dict, defaults to {}
3738 :param isNumber: specify if the variable is a number, defaults to False
39+ :param default: default value
3840 """
39-
4041 if os .getenv (env_var ) is not None :
4142 result = os .getenv (env_var )
4243 elif yaml_path is not None :
@@ -62,7 +63,6 @@ def create_ssl_context() -> ssl.SSLContext:
6263
6364 `ssl` uses bitwise operations to specify context `<enum 'Options'>`
6465 """
65-
6666 ssl_context_options : List [int ] = [
6767 ssl .OP_NO_COMPRESSION ,
6868 ssl .OP_NO_TICKET , # pylint: disable=no-member
@@ -122,7 +122,6 @@ def _process_message(self, channel, method, properties, body) -> None:
122122 :param body: message body (data)
123123 :type body: str or bytes or bytearray
124124 """
125-
126125 json_data = json .loads (body )
127126 self .thread = threading .Thread (target = self ._data_handler , args = [json_data ])
128127 self .thread .start ()
@@ -430,12 +429,54 @@ def __init__(self, config: Dict) -> None:
430429 False ,
431430 False ,
432431 )
432+ # Start up the server to expose the metrics.
433+ self .expose_metrics = get_config_variable (
434+ "CONNECTOR_EXPOSE_METRICS" ,
435+ ["connector" , "expose_metrics" ],
436+ config ,
437+ False ,
438+ False ,
439+ )
440+ metrics_port = get_config_variable (
441+ "CONNECTOR_METRICS_PORT" , ["connector" , "metrics_port" ], config , True , 9095
442+ )
443+ self .metrics = None
444+ if self .expose_metrics :
445+ self .log_info (f"Exposing metrics on port { metrics_port } " )
446+ start_http_server (metrics_port )
447+
448+ self .metrics = {
449+ "bundle_send" : Counter (
450+ "bundle_send" ,
451+ "Number of bundle send" ,
452+ ),
453+ "record_send" : Counter (
454+ "record_send" ,
455+ "Number of record (objects per bundle) send" ,
456+ ),
457+ "run_count" : Counter (
458+ "run_count" ,
459+ "Number of run" ,
460+ ),
461+ "error_count" : Counter (
462+ "error_count" ,
463+ "Number of error" ,
464+ ),
465+ "client_error_count" : Counter (
466+ "client_error_count" ,
467+ "Number of client error" ,
468+ ),
469+ "state" : Enum (
470+ "state" , "State of connector" , states = ["idle" , "running" , "stopped" ]
471+ ),
472+ }
433473
434474 # Configure logger
435475 numeric_level = getattr (
436476 logging , self .log_level .upper () if self .log_level else "INFO" , None
437477 )
438478 if not isinstance (numeric_level , int ):
479+ self .metric_inc ("error_count" )
439480 raise ValueError (f"Invalid log level: { self .log_level } " )
440481 logging .basicConfig (level = numeric_level )
441482
@@ -473,6 +514,26 @@ def __init__(self, config: Dict) -> None:
473514 # self.listen_stream = None
474515 self .listen_queue = None
475516
517+ def metric_inc (self , metric : str , n : int = 1 ):
518+ """Increase the given metric counter by `n`.
519+
520+ :param metric: metric name
521+ :type metric: str
522+ :param n: increase the counter by `n`
523+ :type n: int
524+ """
525+ if self .metrics is not None :
526+ self .metrics [metric ].inc (n )
527+
528+ def metric_state (self , state : str ):
529+ """Set the state of the `state` metric.
530+
531+ :param state: new `state` to set
532+ :type state: str
533+ """
534+ if self .metrics is not None :
535+ self .metrics ["state" ].state (state )
536+
476537 def stop (self ) -> None :
477538 if self .listen_queue :
478539 self .listen_queue .stop ()
@@ -493,7 +554,6 @@ def set_state(self, state) -> None:
493554 :param state: state object
494555 :type state: Dict
495556 """
496-
497557 self .connector_state = json .dumps (state )
498558
499559 def get_state (self ) -> Optional [Dict ]:
@@ -502,7 +562,6 @@ def get_state(self) -> Optional[Dict]:
502562 :return: returns the current state of the connector if there is any
503563 :rtype:
504564 """
505-
506565 try :
507566 if self .connector_state :
508567 state = json .loads (self .connector_state )
@@ -518,7 +577,6 @@ def listen(self, message_callback: Callable[[Dict], str]) -> None:
518577 :param message_callback: callback function to process messages
519578 :type message_callback: Callable[[Dict], str]
520579 """
521-
522580 self .listen_queue = ListenQueue (self , self .config , message_callback )
523581 self .listen_queue .start ()
524582
@@ -535,7 +593,6 @@ def listen_stream(
535593
536594 :param message_callback: callback function to process messages
537595 """
538-
539596 self .listen_stream = ListenStream (
540597 self ,
541598 message_callback ,
@@ -687,8 +744,10 @@ def _send_bundle(self, channel, bundle, **kwargs) -> None:
687744 ),
688745 )
689746 logging .info ("Bundle has been sent" )
747+ self .metric_inc ("bundle_send" )
690748 except (UnroutableError , NackError ) as e :
691749 logging .error ("Unable to send bundle, retry...%s" , e )
750+ self .metric_inc ("error_count" )
692751 self ._send_bundle (channel , bundle , ** kwargs )
693752
694753 def split_stix2_bundle (self , bundle ) -> list :
@@ -700,12 +759,12 @@ def split_stix2_bundle(self, bundle) -> list:
700759 :return: returns a list of bundles
701760 :rtype: list
702761 """
703-
704762 self .cache_index = {}
705763 self .cache_added = []
706764 try :
707765 bundle_data = json .loads (bundle )
708766 except Exception as e :
767+ self .metric_inc ("error_count" )
709768 raise Exception ("File data is not a valid JSON" ) from e
710769
711770 # validation = validate_parsed_json(bundle_data)
@@ -781,7 +840,6 @@ def stix2_get_entity_objects(self, entity) -> list:
781840 :return: entity objects as list
782841 :rtype: list
783842 """
784-
785843 items = [entity ]
786844 # Get embedded objects
787845 embedded_objects = self .stix2_get_embedded_objects (entity )
@@ -802,7 +860,6 @@ def stix2_get_relationship_objects(self, relationship) -> list:
802860 :return: list of relations objects
803861 :rtype: list
804862 """
805-
806863 items = [relationship ]
807864 # Get source ref
808865 if relationship ["source_ref" ] in self .cache_index :
@@ -831,7 +888,6 @@ def stix2_get_report_objects(self, report) -> list:
831888 :return: list of items for a stix2 report object
832889 :rtype: list
833890 """
834-
835891 items = [report ]
836892 # Add all object refs
837893 for object_ref in report ["object_refs" ]:
@@ -852,7 +908,6 @@ def stix2_deduplicate_objects(items) -> list:
852908 :return: de-duplicated list of items
853909 :rtype: list
854910 """
855-
856911 ids = []
857912 final_items = []
858913 for item in items :
@@ -870,7 +925,6 @@ def stix2_create_bundle(items) -> Optional[str]:
870925 :return: JSON of the stix2 bundle
871926 :rtype:
872927 """
873-
874928 bundle = {
875929 "type" : "bundle" ,
876930 "id" : f"bundle--{ uuid .uuid4 ()} " ,
@@ -890,7 +944,6 @@ def check_max_tlp(tlp: str, max_tlp: str) -> bool:
890944 :return: TLP level in allowed TLPs
891945 :rtype: bool
892946 """
893-
894947 allowed_tlps : Dict [str , List [str ]] = {
895948 "TLP:RED" : ["TLP:WHITE" , "TLP:GREEN" , "TLP:AMBER" , "TLP:RED" ],
896949 "TLP:AMBER" : ["TLP:WHITE" , "TLP:GREEN" , "TLP:AMBER" ],
0 commit comments