1+ # node.py
2+ import uuid
3+ import time
4+ import threading
5+ import random
6+ import json
7+ from datetime import datetime
8+ from colorama import Fore , Style , init
9+ from threading import Lock
10+ from fluent import sender
11+ from kafka_utils import KafkaWrapper
12+
13+ # Initialize colorama for colored terminal output
14+ init (autoreset = True )
15+
16+ # Global lock for synchronized printing
17+ print_lock = Lock ()
18+
19+ class Node :
20+ # Colors for message types
21+ message_colors = {
22+ "registration" : Fore .CYAN ,
23+ "heartbeat" : Fore .RED ,
24+ "Log" : Fore .GREEN
25+ }
26+ # Colors for keys and values within messages
27+ key_color = Fore .LIGHTMAGENTA_EX
28+ value_color = Fore .LIGHTYELLOW_EX
29+
30+ def __init__ (self , service_name ):
31+ self .node_id = str (uuid .uuid4 ())
32+ self .service_name = service_name
33+ self .status = "UP"
34+
35+ # Initialize Fluentd sender
36+ self .fluent_sender = sender .FluentSender (
37+ 'microservice' ,
38+ host = 'localhost' ,
39+ port = 24224
40+ )
41+
42+ # Initialize Kafka wrapper
43+ self .kafka = KafkaWrapper ()
44+
45+ self .register_node ()
46+
47+ def format_message (self , message ):
48+ """
49+ Format JSON message to color keys and values distinctly.
50+ """
51+ formatted_message = ""
52+ for key , value in message .items ():
53+ formatted_message += f"{ Node .key_color } { key } { Style .RESET_ALL } : { Node .value_color } { value } { Style .RESET_ALL } , "
54+ return "{" + formatted_message .rstrip (", " ) + "}"
55+
56+ def print_message (self , message_type , message_content ):
57+ with print_lock :
58+ color = Node .message_colors [message_type ]
59+ formatted_content = self .format_message (json .loads (message_content ))
60+ print (color + f"{ message_type .capitalize ()} :" + Style .RESET_ALL , formatted_content )
61+
62+ def send_to_fluentd (self , tag , message ):
63+ """Send message to Fluentd"""
64+ try :
65+ self .fluent_sender .emit (tag , message )
66+ except Exception as e :
67+ print (f"Error sending to Fluentd: { e } " )
68+
69+ def send_to_kafka (self , topic , message ):
70+ """Send message to Kafka"""
71+ try :
72+ self .kafka .send_message (topic , message )
73+ except Exception as e :
74+ print (f"Error sending to Kafka: { e } " )
75+
76+ def register_node (self ):
77+ registration_message = {
78+ "node_id" : self .node_id ,
79+ "message_type" : "REGISTRATION" ,
80+ "service_name" : self .service_name ,
81+ "timestamp" : datetime .now ().isoformat ()
82+ }
83+
84+ # Send to both console and message brokers
85+ self .print_message ("registration" , json .dumps (registration_message ))
86+ self .send_to_fluentd ('registration' , registration_message )
87+ self .send_to_kafka ('microservice_registration' , registration_message )
88+
89+ def generate_log (self , log_level , message , extra_data = None ):
90+ log_message = {
91+ "log_id" : str (uuid .uuid4 ()),
92+ "node_id" : self .node_id ,
93+ "log_level" : log_level ,
94+ "message_type" : "LOG" ,
95+ "message" : message ,
96+ "service_name" : self .service_name ,
97+ "timestamp" : datetime .now ().isoformat ()
98+ }
99+ if extra_data :
100+ log_message .update (extra_data )
101+
102+ # Send to both console and message brokers
103+ self .print_message ("Log" , json .dumps (log_message ))
104+ self .send_to_fluentd (f'log.{ log_level .lower ()} ' , log_message )
105+ self .send_to_kafka ('microservice_logs' , log_message )
106+
107+ def send_heartbeat (self ):
108+ heartbeat_message = {
109+ "node_id" : self .node_id ,
110+ "message_type" : "HEARTBEAT" ,
111+ "service_name" : self .service_name , # Added service_name
112+ "status" : self .status ,
113+ "timestamp" : datetime .now ().isoformat ()
114+ }
115+
116+ # Send to both console and message brokers
117+ self .print_message ("heartbeat" , json .dumps (heartbeat_message ))
118+ self .send_to_fluentd ('heartbeat' , heartbeat_message )
119+ self .send_to_kafka ('microservice_heartbeats' , heartbeat_message )
120+
121+ def start_heartbeat (self , interval = 5 ):
122+ def heartbeat ():
123+ while self .status == "UP" :
124+ self .send_heartbeat ()
125+ time .sleep (interval )
126+ threading .Thread (target = heartbeat ).start ()
127+
128+ def start_log_generation (self , interval = 3 ):
129+ def generate_logs ():
130+ log_levels = ["INFO" , "WARN" , "ERROR" ]
131+ while self .status == "UP" :
132+ log_level = random .choice (log_levels )
133+ if log_level == "INFO" :
134+ self .generate_log ("INFO" , "This is an info log." )
135+ elif log_level == "WARN" :
136+ self .generate_log ("WARN" , "This is a warning log." , {
137+ "response_time_ms" : random .randint (100 , 500 ),
138+ "threshold_limit_ms" : 300
139+ })
140+ elif log_level == "ERROR" :
141+ self .generate_log ("ERROR" , "This is an error log." , {
142+ "error_details" : {
143+ "error_code" : "500" ,
144+ "error_message" : "Internal Server Error"
145+ }
146+ })
147+ time .sleep (interval )
148+ threading .Thread (target = generate_logs ).start ()
149+
150+ def __del__ (self ):
151+ if hasattr (self , 'fluent_sender' ):
152+ self .fluent_sender .close ()
153+ if hasattr (self , 'kafka' ):
154+ self .kafka .close ()
0 commit comments