|
10 | 10 | import time |
11 | 11 | import base64 |
12 | 12 | import os |
| 13 | +import ssl |
13 | 14 |
|
14 | 15 | from typing import Callable, Dict, Optional, Union |
15 | 16 | from sseclient import SSEClient |
@@ -76,6 +77,7 @@ def __init__(self, helper, config: dict, callback): |
76 | 77 | self.helper = helper |
77 | 78 | self.callback = callback |
78 | 79 | self.host = config["connection"]["host"] |
| 80 | + self.use_ssl = config["connection"]["use_ssl"] |
79 | 81 | self.port = config["connection"]["port"] |
80 | 82 | self.user = config["connection"]["user"] |
81 | 83 | self.password = config["connection"]["pass"] |
@@ -135,9 +137,23 @@ def run(self): |
135 | 137 | try: |
136 | 138 | # Connect the broker |
137 | 139 | self.pika_credentials = pika.PlainCredentials(self.user, self.password) |
138 | | - self.pika_parameters = pika.ConnectionParameters( |
139 | | - self.host, self.port, "/", self.pika_credentials |
140 | | - ) |
| 140 | + if self.use_ssl: |
| 141 | + context = ssl.create_default_context() |
| 142 | + ssl_options = pika.SSLOptions(context, self.host) |
| 143 | + self.pika_parameters = pika.ConnectionParameters( |
| 144 | + host=self.host, |
| 145 | + port=self.port, |
| 146 | + virtual_host="/", |
| 147 | + credentials=self.pika_credentials, |
| 148 | + ssl_options=ssl_options, |
| 149 | + ) |
| 150 | + else: |
| 151 | + self.pika_parameters = pika.ConnectionParameters( |
| 152 | + host=self.host, |
| 153 | + port=self.port, |
| 154 | + virtual_host="/", |
| 155 | + credentials=self.pika_credentials, |
| 156 | + ) |
141 | 157 | self.pika_connection = pika.BlockingConnection(self.pika_parameters) |
142 | 158 | self.channel = self.pika_connection.channel() |
143 | 159 | self.channel.basic_consume( |
@@ -531,12 +547,24 @@ def send_stix2_bundle(self, bundle, **kwargs) -> list: |
531 | 547 | pika_credentials = pika.PlainCredentials( |
532 | 548 | self.config["connection"]["user"], self.config["connection"]["pass"] |
533 | 549 | ) |
534 | | - pika_parameters = pika.ConnectionParameters( |
535 | | - self.config["connection"]["host"], |
536 | | - self.config["connection"]["port"], |
537 | | - "/", |
538 | | - pika_credentials, |
539 | | - ) |
| 550 | + if self.config["connection"]["use_ssl"]: |
| 551 | + context = ssl.create_default_context() |
| 552 | + ssl_options = pika.SSLOptions(context, self.config["connection"]["host"]) |
| 553 | + pika_parameters = pika.ConnectionParameters( |
| 554 | + host=self.config["connection"]["host"], |
| 555 | + port=self.config["connection"]["port"], |
| 556 | + virtual_host="/", |
| 557 | + credentials=pika_credentials, |
| 558 | + ssl_options=ssl_options, |
| 559 | + ) |
| 560 | + else: |
| 561 | + pika_parameters = pika.ConnectionParameters( |
| 562 | + host=self.config["connection"]["host"], |
| 563 | + port=self.config["connection"]["port"], |
| 564 | + virtual_host="/", |
| 565 | + credentials=pika_credentials, |
| 566 | + ) |
| 567 | + |
540 | 568 | pika_connection = pika.BlockingConnection(pika_parameters) |
541 | 569 | channel = pika_connection.channel() |
542 | 570 | for sequence, bundle in enumerate(bundles, start=1): |
|
0 commit comments