2020import pika
2121import uvicorn
2222from fastapi import FastAPI , Request
23+ from fastapi .responses import JSONResponse
2324from filigran_sseclient import SSEClient
2425from pika .exceptions import NackError , UnroutableError
2526from pydantic import TypeAdapter
@@ -45,7 +46,7 @@ def start_loop(loop):
4546
4647
4748def get_config_variable (
48- env_var : Union [ str , List [ str ]] ,
49+ env_var : str ,
4950 yaml_path : List ,
5051 config : Dict = {},
5152 isNumber : Optional [bool ] = False ,
@@ -61,17 +62,8 @@ def get_config_variable(
6162 :param default: default value
6263 """
6364
64- # Get env var
65- env_result = None
66- env_vars = env_var if type (env_var ) is list else [env_var ]
67- for var in env_vars :
68- if os .getenv (var ) is not None :
69- env_result = os .getenv (var )
70- break
71-
72- # If env not found check in config file
73- if env_result is not None :
74- result = env_result
65+ if os .getenv (env_var ) is not None :
66+ result = os .getenv (env_var )
7567 elif yaml_path is not None :
7668 if yaml_path [0 ] in config and yaml_path [1 ] in config [yaml_path [0 ]]:
7769 result = config [yaml_path [0 ]][yaml_path [1 ]]
@@ -427,26 +419,37 @@ def _data_handler(self, json_data) -> None:
427419 "Failing reporting the processing"
428420 )
429421
430- async def _process_callback (self , request : Request ):
422+ async def _http_process_callback (self , request : Request ):
431423 # 01. Check the authentication
432- try :
433- authorization : str = request .headers .get ("Authorization" )
434- scheme , token = authorization .split ()
435- if scheme .lower () != "bearer" or token != self .opencti_token :
436- return {"error" : "Invalid credentials" }
437- except Exception :
438- return {"error" : "Invalid credentials" }
424+ authorization : str = request .headers .get ("Authorization" , "" )
425+ items = authorization .split () if isinstance (authorization , str ) else []
426+ if (
427+ len (items ) != 2
428+ or items [0 ].lower () != "bearer"
429+ or items [1 ] != self .opencti_token
430+ ):
431+ return JSONResponse (
432+ status_code = 401 , content = {"error" : "Invalid credentials" }
433+ )
439434 # 02. Parse the data and execute
440435 try :
441436 data = await request .json () # Get the JSON payload
442- except Exception as e :
443- return {"error" : "Invalid JSON payload" , "details" : str (e )}
437+ except json .JSONDecodeError as e :
438+ return JSONResponse (
439+ status_code = 400 ,
440+ content = {"error" : "Invalid JSON payload" , "details" : str (e )},
441+ )
444442 try :
445443 self ._data_handler (data )
446444 except Exception as e :
447- return {"error" : "Error processing message" , "details" : str (e )}
445+ return JSONResponse (
446+ status_code = 500 ,
447+ content = {"error" : "Error processing message" , "details" : str (e )},
448+ )
448449 # all good
449- return {"message" : "Message successfully processed" }
450+ return JSONResponse (
451+ status_code = 202 , content = {"message" : "Message successfully processed" }
452+ )
450453
451454 def run (self ) -> None :
452455 if self .listen_protocol == "AMQP" :
@@ -505,7 +508,9 @@ def run(self) -> None:
505508 elif self .listen_protocol == "API" :
506509 self .helper .connector_logger .info ("Starting Listen HTTP thread" )
507510 app .add_api_route (
508- self .listen_protocol_api_path , self ._process_callback , methods = ["POST" ]
511+ self .listen_protocol_api_path ,
512+ self ._http_process_callback ,
513+ methods = ["POST" ],
509514 )
510515 config = uvicorn .Config (
511516 app ,
@@ -927,12 +932,6 @@ def __init__(self, config: Dict, playbook_compatible=False) -> None:
927932 else "http://127.0.0.1:7070"
928933 ),
929934 )
930- self .queue_protocol = get_config_variable (
931- ["QUEUE_PROTOCOL" , "CONNECTOR_QUEUE_PROTOCOL" ],
932- ["connector" , "queue_protocol" ],
933- config ,
934- default = "amqp" ,
935- )
936935 self .connect_type = get_config_variable (
937936 "CONNECTOR_TYPE" , ["connector" , "type" ], config
938937 )
@@ -1113,6 +1112,25 @@ def __init__(self, config: Dict, playbook_compatible=False) -> None:
11131112 self .connector_state = connector_configuration ["connector_state" ]
11141113 self .connector_config = connector_configuration ["config" ]
11151114
1115+ # Configure the push information protocol
1116+ self .queue_protocol = get_config_variable (
1117+ env_var = "CONNECTOR_QUEUE_PROTOCOL" ,
1118+ yaml_path = ["connector" , "queue_protocol" ],
1119+ config = config ,
1120+ )
1121+ if not self .queue_protocol : # for backwards compatibility
1122+ self .queue_protocol = get_config_variable (
1123+ env_var = "QUEUE_PROTOCOL" ,
1124+ yaml_path = ["connector" , "queue_protocol" ],
1125+ config = config ,
1126+ )
1127+ if self .queue_protocol :
1128+ self .connector_logger .error (
1129+ "QUEUE_PROTOCOL is deprecated, please use CONNECTOR_QUEUE_PROTOCOL instead."
1130+ )
1131+ if not self .queue_protocol :
1132+ self .queue_protocol = "amqp"
1133+
11161134 # Overwrite connector config for RabbitMQ if given manually / in conf
11171135 self .connector_config ["connection" ]["host" ] = get_config_variable (
11181136 "MQ_HOST" ,
0 commit comments