diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ed8ebf5 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__ \ No newline at end of file diff --git a/Week2 experimental(working)/Kafka Output.png b/Week2 experimental(working)/Kafka Output.png new file mode 100644 index 0000000..8161dec Binary files /dev/null and b/Week2 experimental(working)/Kafka Output.png differ diff --git a/Week2 experimental(working)/Logs_generating.png b/Week2 experimental(working)/Logs_generating.png new file mode 100644 index 0000000..722c345 Binary files /dev/null and b/Week2 experimental(working)/Logs_generating.png differ diff --git a/Week2 experimental(working)/fluent.conf b/Week2 experimental(working)/fluent.conf new file mode 100644 index 0000000..1cf2467 --- /dev/null +++ b/Week2 experimental(working)/fluent.conf @@ -0,0 +1,28 @@ +# fluent.conf + + @type forward + port 24224 + bind 0.0.0.0 + + + + @type kafka2 + + # Kafka configuration + brokers localhost:9092 + topic_key topic + default_topic microservice_logs + + # Buffer configuration + + @type memory + chunk_limit_size 1m + queue_limit_length 128 + flush_interval 1s + + + # Data format + + @type json + + \ No newline at end of file diff --git a/Week2 experimental(working)/kafka_utils.py b/Week2 experimental(working)/kafka_utils.py new file mode 100644 index 0000000..71d6dda --- /dev/null +++ b/Week2 experimental(working)/kafka_utils.py @@ -0,0 +1,71 @@ +# kafka_utils.py +from kafka import KafkaProducer, KafkaConsumer +import json +from datetime import datetime +import threading +from typing import Callable, Optional +import logging + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +class KafkaWrapper: + def __init__(self, bootstrap_servers: str = 'localhost:9092'): + self.bootstrap_servers = bootstrap_servers + self.producer = None + self.consumer = None + self._setup_producer() + + def _setup_producer(self): + try: + self.producer = KafkaProducer( + bootstrap_servers=self.bootstrap_servers, + value_serializer=lambda v: json.dumps(v).encode('utf-8') + ) + logger.info("Kafka producer initialized successfully") + except Exception as e: + logger.error(f"Failed to initialize Kafka producer: {e}") + raise + + def send_message(self, topic: str, message: dict): + try: + future = self.producer.send(topic, value=message) + future.get(timeout=10) # Wait for message to be sent + logger.debug(f"Message sent to topic {topic}: {message}") + except Exception as e: + logger.error(f"Failed to send message to Kafka: {e}") + raise + + def start_consumer(self, topic: str, message_handler: Callable, + group_id: Optional[str] = None): + def consume(): + try: + self.consumer = KafkaConsumer( + topic, + bootstrap_servers=self.bootstrap_servers, + group_id=group_id or f'group-{datetime.now().timestamp()}', + value_deserializer=lambda m: json.loads(m.decode('utf-8')), + auto_offset_reset='latest' + ) + + logger.info(f"Started consuming from topic: {topic}") + for message in self.consumer: + try: + message_handler(message.value) + except Exception as e: + logger.error(f"Error processing message: {e}") + + except Exception as e: + logger.error(f"Consumer error: {e}") + raise + + # Start consumer in a separate thread + consumer_thread = threading.Thread(target=consume, daemon=True) + consumer_thread.start() + return consumer_thread + + def close(self): + if self.producer: + self.producer.close() + if self.consumer: + self.consumer.close() \ No newline at end of file diff --git a/Week2 experimental(working)/log_consumer.py b/Week2 experimental(working)/log_consumer.py new file mode 100644 index 0000000..ac4da8a --- /dev/null +++ b/Week2 experimental(working)/log_consumer.py @@ -0,0 +1,90 @@ +# log_consumer.py +from kafka_utils import KafkaWrapper +import logging +from colorama import Fore, Style, init +from datetime import datetime + +# Initialize colorama and logging +init(autoreset=True) +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +class LogConsumer: + def __init__(self): + self.kafka = KafkaWrapper() + + def format_timestamp(self, timestamp_str): + """Format ISO timestamp to a more readable format""" + try: + dt = datetime.fromisoformat(timestamp_str) + return dt.strftime("%Y-%m-%d %H:%M:%S") + except: + return timestamp_str + + def handle_log(self, message): + """Handle incoming log messages""" + log_level = message.get('log_level', 'UNKNOWN') + color = { + 'INFO': Fore.GREEN, + 'WARN': Fore.YELLOW, + 'ERROR': Fore.RED + }.get(log_level, Fore.WHITE) + + timestamp = self.format_timestamp(message.get('timestamp', '')) + service_name = message.get('service_name', 'Unknown') + node_id = message.get('node_id', 'Unknown')[:8] # Show first 8 chars of node_id + msg = message.get('message', '') + + # Add extra details for WARN and ERROR logs + extra_info = "" + if log_level == 'WARN': + response_time = message.get('response_time_ms', '') + threshold = message.get('threshold_limit_ms', '') + if response_time and threshold: + extra_info = f" [Response: {response_time}ms, Threshold: {threshold}ms]" + elif log_level == 'ERROR': + error_details = message.get('error_details', {}) + if error_details: + error_code = error_details.get('error_code', '') + error_msg = error_details.get('error_message', '') + extra_info = f" [Code: {error_code}, Details: {error_msg}]" + + print(f"{color}[{timestamp}] [{log_level}] {service_name} ({node_id}): {msg}{extra_info}{Style.RESET_ALL}") + + def handle_heartbeat(self, message): + """Handle incoming heartbeat messages""" + timestamp = self.format_timestamp(message.get('timestamp', '')) + service_name = message.get('service_name', 'Unknown') + node_id = message.get('node_id', 'Unknown')[:8] + status = message.get('status', 'UNKNOWN') + + color = Fore.GREEN if status == 'UP' else Fore.RED + print(f"{color}[{timestamp}] [HEARTBEAT] {service_name} ({node_id}): Status: {status}{Style.RESET_ALL}") + + def handle_registration(self, message): + """Handle incoming registration messages""" + timestamp = self.format_timestamp(message.get('timestamp', '')) + service_name = message.get('service_name', 'Unknown') + node_id = message.get('node_id', 'Unknown')[:8] + + print(f"{Fore.MAGENTA}[{timestamp}] [REGISTRATION] New service registered: {service_name} ({node_id}){Style.RESET_ALL}") + + def start(self): + """Start consuming messages from all topics""" + self.kafka.start_consumer('microservice_logs', self.handle_log, 'log-consumer') + self.kafka.start_consumer('microservice_heartbeats', self.handle_heartbeat, 'heartbeat-consumer') + self.kafka.start_consumer('microservice_registration', self.handle_registration, 'registration-consumer') + + logger.info("Started consuming messages from all topics") + + # Keep the main thread running + try: + while True: + pass + except KeyboardInterrupt: + logger.info("Shutting down consumers...") + self.kafka.close() + +if __name__ == "__main__": + consumer = LogConsumer() + consumer.start() \ No newline at end of file diff --git a/Week2 experimental(working)/node.py b/Week2 experimental(working)/node.py new file mode 100644 index 0000000..22fd43a --- /dev/null +++ b/Week2 experimental(working)/node.py @@ -0,0 +1,154 @@ +# node.py +import uuid +import time +import threading +import random +import json +from datetime import datetime +from colorama import Fore, Style, init +from threading import Lock +from fluent import sender +from kafka_utils import KafkaWrapper + +# Initialize colorama for colored terminal output +init(autoreset=True) + +# Global lock for synchronized printing +print_lock = Lock() + +class Node: + # Colors for message types + message_colors = { + "registration": Fore.CYAN, + "heartbeat": Fore.RED, + "Log": Fore.GREEN + } + # Colors for keys and values within messages + key_color = Fore.LIGHTMAGENTA_EX + value_color = Fore.LIGHTYELLOW_EX + + def __init__(self, service_name): + self.node_id = str(uuid.uuid4()) + self.service_name = service_name + self.status = "UP" + + # Initialize Fluentd sender + self.fluent_sender = sender.FluentSender( + 'microservice', + host='localhost', + port=24224 + ) + + # Initialize Kafka wrapper + self.kafka = KafkaWrapper() + + self.register_node() + + def format_message(self, message): + """ + Format JSON message to color keys and values distinctly. + """ + formatted_message = "" + for key, value in message.items(): + formatted_message += f"{Node.key_color}{key}{Style.RESET_ALL}: {Node.value_color}{value}{Style.RESET_ALL}, " + return "{" + formatted_message.rstrip(", ") + "}" + + def print_message(self, message_type, message_content): + with print_lock: + color = Node.message_colors[message_type] + formatted_content = self.format_message(json.loads(message_content)) + print(color + f"{message_type.capitalize()}:" + Style.RESET_ALL, formatted_content) + + def send_to_fluentd(self, tag, message): + """Send message to Fluentd""" + try: + self.fluent_sender.emit(tag, message) + except Exception as e: + print(f"Error sending to Fluentd: {e}") + + def send_to_kafka(self, topic, message): + """Send message to Kafka""" + try: + self.kafka.send_message(topic, message) + except Exception as e: + print(f"Error sending to Kafka: {e}") + + def register_node(self): + registration_message = { + "node_id": self.node_id, + "message_type": "REGISTRATION", + "service_name": self.service_name, + "timestamp": datetime.now().isoformat() + } + + # Send to both console and message brokers + self.print_message("registration", json.dumps(registration_message)) + self.send_to_fluentd('registration', registration_message) + self.send_to_kafka('microservice_registration', registration_message) + + def generate_log(self, log_level, message, extra_data=None): + log_message = { + "log_id": str(uuid.uuid4()), + "node_id": self.node_id, + "log_level": log_level, + "message_type": "LOG", + "message": message, + "service_name": self.service_name, + "timestamp": datetime.now().isoformat() + } + if extra_data: + log_message.update(extra_data) + + # Send to both console and message brokers + self.print_message("Log", json.dumps(log_message)) + self.send_to_fluentd(f'log.{log_level.lower()}', log_message) + self.send_to_kafka('microservice_logs', log_message) + + def send_heartbeat(self): + heartbeat_message = { + "node_id": self.node_id, + "message_type": "HEARTBEAT", + "service_name": self.service_name, # Added service_name + "status": self.status, + "timestamp": datetime.now().isoformat() + } + + # Send to both console and message brokers + self.print_message("heartbeat", json.dumps(heartbeat_message)) + self.send_to_fluentd('heartbeat', heartbeat_message) + self.send_to_kafka('microservice_heartbeats', heartbeat_message) + + def start_heartbeat(self, interval=5): + def heartbeat(): + while self.status == "UP": + self.send_heartbeat() + time.sleep(interval) + threading.Thread(target=heartbeat).start() + + def start_log_generation(self, interval=3): + def generate_logs(): + log_levels = ["INFO", "WARN", "ERROR"] + while self.status == "UP": + log_level = random.choice(log_levels) + if log_level == "INFO": + self.generate_log("INFO", "This is an info log.") + elif log_level == "WARN": + self.generate_log("WARN", "This is a warning log.", { + "response_time_ms": random.randint(100, 500), + "threshold_limit_ms": 300 + }) + elif log_level == "ERROR": + self.generate_log("ERROR", "This is an error log.", { + "error_details": { + "error_code": "500", + "error_message": "Internal Server Error" + } + }) + time.sleep(interval) + threading.Thread(target=generate_logs).start() + + def __del__(self): + if hasattr(self, 'fluent_sender'): + self.fluent_sender.close() + if hasattr(self, 'kafka'): + self.kafka.close() \ No newline at end of file diff --git a/Week2 experimental(working)/run_nodes.py b/Week2 experimental(working)/run_nodes.py new file mode 100644 index 0000000..72259af --- /dev/null +++ b/Week2 experimental(working)/run_nodes.py @@ -0,0 +1,11 @@ +# run_nodes.py +from node import Node + +if __name__ == "__main__": + services = ["PaymentService", "OrderService", "InventoryService"] + nodes = [Node(service) for service in services] + + # Start log generation and heartbeat for each node + for node in nodes: + node.start_heartbeat() + node.start_log_generation()