88import threading
99import time
1010import uuid
11- from typing import Any , Callable , Dict , List , Optional , Union
11+ from typing import Callable , Dict , List , Optional , Union
1212
1313import pika
1414from pika .exceptions import NackError , UnroutableError
15- from prometheus_client import start_http_server , Counter , Enum
1615from sseclient import SSEClient
1716
1817from pycti .api .opencti_api_client import OpenCTIApiClient
@@ -28,16 +27,16 @@ def get_config_variable(
2827 yaml_path : List ,
2928 config : Dict = {},
3029 isNumber : Optional [bool ] = False ,
31- default : Optional [ Any ] = None ,
30+ default = None ,
3231) -> Union [bool , int , None , str ]:
3332 """[summary]
3433
3534 :param env_var: environnement variable name
3635 :param yaml_path: path to yaml config
3736 :param config: client config dict, defaults to {}
3837 :param isNumber: specify if the variable is a number, defaults to False
39- :param default: default value
4038 """
39+
4140 if os .getenv (env_var ) is not None :
4241 result = os .getenv (env_var )
4342 elif yaml_path is not None :
@@ -63,6 +62,7 @@ def create_ssl_context() -> ssl.SSLContext:
6362
6463 `ssl` uses bitwise operations to specify context `<enum 'Options'>`
6564 """
65+
6666 ssl_context_options : List [int ] = [
6767 ssl .OP_NO_COMPRESSION ,
6868 ssl .OP_NO_TICKET , # pylint: disable=no-member
@@ -122,6 +122,7 @@ def _process_message(self, channel, method, properties, body) -> None:
122122 :param body: message body (data)
123123 :type body: str or bytes or bytearray
124124 """
125+
125126 json_data = json .loads (body )
126127 self .thread = threading .Thread (target = self ._data_handler , args = [json_data ])
127128 self .thread .start ()
@@ -429,54 +430,12 @@ def __init__(self, config: Dict) -> None:
429430 False ,
430431 False ,
431432 )
432- # Start up the server to expose the metrics.
433- self .expose_metrics = get_config_variable (
434- "CONNECTOR_EXPOSE_METRICS" ,
435- ["connector" , "expose_metrics" ],
436- config ,
437- False ,
438- False ,
439- )
440- metrics_port = get_config_variable (
441- "CONNECTOR_METRICS_PORT" , ["connector" , "metrics_port" ], config , True , 9095
442- )
443- self .metrics = None
444- if self .expose_metrics :
445- self .log_info (f"Exposing metrics on port { metrics_port } " )
446- start_http_server (metrics_port )
447-
448- self .metrics = {
449- "bundle_send" : Counter (
450- "bundle_send" ,
451- "Number of bundle send" ,
452- ),
453- "record_send" : Counter (
454- "record_send" ,
455- "Number of record (objects per bundle) send" ,
456- ),
457- "run_count" : Counter (
458- "run_count" ,
459- "Number of run" ,
460- ),
461- "error_count" : Counter (
462- "error_count" ,
463- "Number of error" ,
464- ),
465- "client_error_count" : Counter (
466- "client_error_count" ,
467- "Number of client error" ,
468- ),
469- "state" : Enum (
470- "state" , "State of connector" , states = ["idle" , "running" , "stopped" ]
471- ),
472- }
473433
474434 # Configure logger
475435 numeric_level = getattr (
476436 logging , self .log_level .upper () if self .log_level else "INFO" , None
477437 )
478438 if not isinstance (numeric_level , int ):
479- self .metric_inc ("error_count" )
480439 raise ValueError (f"Invalid log level: { self .log_level } " )
481440 logging .basicConfig (level = numeric_level )
482441
@@ -514,26 +473,6 @@ def __init__(self, config: Dict) -> None:
514473 # self.listen_stream = None
515474 self .listen_queue = None
516475
517- def metric_inc (self , metric : str , n : int = 1 ):
518- """Increase the given metric counter by `n`.
519-
520- :param metric: metric name
521- :type metric: str
522- :param n: increase the counter by `n`
523- :type n: int
524- """
525- if self .metrics is not None :
526- self .metrics [metric ].inc (n )
527-
528- def metric_state (self , state : str ):
529- """Set the state of the `state` metric.
530-
531- :param state: new `state` to set
532- :type state: str
533- """
534- if self .metrics is not None :
535- self .metrics ["state" ].state (state )
536-
537476 def stop (self ) -> None :
538477 if self .listen_queue :
539478 self .listen_queue .stop ()
@@ -554,6 +493,7 @@ def set_state(self, state) -> None:
554493 :param state: state object
555494 :type state: Dict
556495 """
496+
557497 self .connector_state = json .dumps (state )
558498
559499 def get_state (self ) -> Optional [Dict ]:
@@ -562,6 +502,7 @@ def get_state(self) -> Optional[Dict]:
562502 :return: returns the current state of the connector if there is any
563503 :rtype:
564504 """
505+
565506 try :
566507 if self .connector_state :
567508 state = json .loads (self .connector_state )
@@ -577,6 +518,7 @@ def listen(self, message_callback: Callable[[Dict], str]) -> None:
577518 :param message_callback: callback function to process messages
578519 :type message_callback: Callable[[Dict], str]
579520 """
521+
580522 self .listen_queue = ListenQueue (self , self .config , message_callback )
581523 self .listen_queue .start ()
582524
@@ -593,6 +535,7 @@ def listen_stream(
593535
594536 :param message_callback: callback function to process messages
595537 """
538+
596539 self .listen_stream = ListenStream (
597540 self ,
598541 message_callback ,
@@ -744,10 +687,8 @@ def _send_bundle(self, channel, bundle, **kwargs) -> None:
744687 ),
745688 )
746689 logging .info ("Bundle has been sent" )
747- self .metric_inc ("bundle_send" )
748690 except (UnroutableError , NackError ) as e :
749691 logging .error ("Unable to send bundle, retry...%s" , e )
750- self .metric_inc ("error_count" )
751692 self ._send_bundle (channel , bundle , ** kwargs )
752693
753694 def split_stix2_bundle (self , bundle ) -> list :
@@ -759,12 +700,12 @@ def split_stix2_bundle(self, bundle) -> list:
759700 :return: returns a list of bundles
760701 :rtype: list
761702 """
703+
762704 self .cache_index = {}
763705 self .cache_added = []
764706 try :
765707 bundle_data = json .loads (bundle )
766708 except Exception as e :
767- self .metric_inc ("error_count" )
768709 raise Exception ("File data is not a valid JSON" ) from e
769710
770711 # validation = validate_parsed_json(bundle_data)
@@ -840,6 +781,7 @@ def stix2_get_entity_objects(self, entity) -> list:
840781 :return: entity objects as list
841782 :rtype: list
842783 """
784+
843785 items = [entity ]
844786 # Get embedded objects
845787 embedded_objects = self .stix2_get_embedded_objects (entity )
@@ -860,6 +802,7 @@ def stix2_get_relationship_objects(self, relationship) -> list:
860802 :return: list of relations objects
861803 :rtype: list
862804 """
805+
863806 items = [relationship ]
864807 # Get source ref
865808 if relationship ["source_ref" ] in self .cache_index :
@@ -888,6 +831,7 @@ def stix2_get_report_objects(self, report) -> list:
888831 :return: list of items for a stix2 report object
889832 :rtype: list
890833 """
834+
891835 items = [report ]
892836 # Add all object refs
893837 for object_ref in report ["object_refs" ]:
@@ -908,6 +852,7 @@ def stix2_deduplicate_objects(items) -> list:
908852 :return: de-duplicated list of items
909853 :rtype: list
910854 """
855+
911856 ids = []
912857 final_items = []
913858 for item in items :
@@ -925,6 +870,7 @@ def stix2_create_bundle(items) -> Optional[str]:
925870 :return: JSON of the stix2 bundle
926871 :rtype:
927872 """
873+
928874 bundle = {
929875 "type" : "bundle" ,
930876 "id" : f"bundle--{ uuid .uuid4 ()} " ,
@@ -944,6 +890,7 @@ def check_max_tlp(tlp: str, max_tlp: str) -> bool:
944890 :return: TLP level in allowed TLPs
945891 :rtype: bool
946892 """
893+
947894 allowed_tlps : Dict [str , List [str ]] = {
948895 "TLP:RED" : ["TLP:WHITE" , "TLP:GREEN" , "TLP:AMBER" , "TLP:RED" ],
949896 "TLP:AMBER" : ["TLP:WHITE" , "TLP:GREEN" , "TLP:AMBER" ],
0 commit comments