Skip to content

Commit 93dd9b3

Browse files
authored
Fix: no attribute '_SkyWalkingAgent__log_queue' using kafka plain text (#343)
1 parent 02dc53c commit 93dd9b3

File tree

2 files changed

+21
-10
lines changed

2 files changed

+21
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
- **Tentative**: Set upper bound <=5.9.5 for psutil package due to test failure. (#326)
1616
- Remove `DeprecationWarning` from `pkg_resources` by replace it with `importlib_metadata` (#329)
1717
- Fix unexpected 'decode' AttributeError when MySQLdb module is mapped by PyMySQL (#336)
18+
- Fix SkyWalking agent failed to start if using kafka protocol with sasl_mechanism=PLAIN. (#343)
1819

1920
### 1.0.1
2021

skywalking/agent/__init__.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,10 @@ def __init__(self):
119119
def __bootstrap(self):
120120
# when forking, already instrumented modules must not be instrumented again
121121
# otherwise it will cause double instrumentation! (we should provide an un-instrument method)
122+
123+
# Initialize queues for segment, log, meter and profiling snapshots
124+
self.__init_queues()
125+
122126
if config.agent_protocol == 'grpc':
123127
from skywalking.agent.protocol.grpc import GrpcProtocol
124128
self.__protocol = GrpcProtocol()
@@ -129,18 +133,29 @@ def __bootstrap(self):
129133
from skywalking.agent.protocol.kafka import KafkaProtocol
130134
self.__protocol = KafkaProtocol()
131135

132-
# Initialize queues for segment, log, meter and profiling snapshots
133-
self.__segment_queue: Optional[Queue] = None
136+
# Start reporter threads and register queues
137+
self.__init_threading()
138+
139+
def __init_queues(self) -> None:
140+
"""
141+
This method initializes all the queues for the agent and reporters.
142+
"""
143+
self.__segment_queue = Queue(maxsize=config.agent_trace_reporter_max_buffer_size)
134144
self.__log_queue: Optional[Queue] = None
135145
self.__meter_queue: Optional[Queue] = None
136146
self.__snapshot_queue: Optional[Queue] = None
137147

138-
# Start reporter threads and register queues
139-
self.__init_threading()
148+
if config.agent_meter_reporter_active:
149+
self.__meter_queue = Queue(maxsize=config.agent_meter_reporter_max_buffer_size)
150+
if config.agent_log_reporter_active:
151+
self.__log_queue = Queue(maxsize=config.agent_log_reporter_max_buffer_size)
152+
if config.agent_profile_active:
153+
self.__snapshot_queue = Queue(maxsize=config.agent_profile_snapshot_transport_buffer_size)
154+
140155

141156
def __init_threading(self) -> None:
142157
"""
143-
This method initializes all the queues and threads for the agent and reporters.
158+
This method initializes all the threads for the agent and reporters.
144159
Upon os.fork(), callback will reinitialize threads and queues by calling this method
145160
146161
Heartbeat thread is started by default.
@@ -152,12 +167,10 @@ def __init_threading(self) -> None:
152167
__heartbeat_thread = Thread(name='HeartbeatThread', target=self.__heartbeat, daemon=True)
153168
__heartbeat_thread.start()
154169

155-
self.__segment_queue = Queue(maxsize=config.agent_trace_reporter_max_buffer_size)
156170
__segment_report_thread = Thread(name='SegmentReportThread', target=self.__report_segment, daemon=True)
157171
__segment_report_thread.start()
158172

159173
if config.agent_meter_reporter_active:
160-
self.__meter_queue = Queue(maxsize=config.agent_meter_reporter_max_buffer_size)
161174
__meter_report_thread = Thread(name='MeterReportThread', target=self.__report_meter, daemon=True)
162175
__meter_report_thread.start()
163176

@@ -173,7 +186,6 @@ def __init_threading(self) -> None:
173186
ThreadDataSource().register()
174187

175188
if config.agent_log_reporter_active:
176-
self.__log_queue = Queue(maxsize=config.agent_log_reporter_max_buffer_size)
177189
__log_report_thread = Thread(name='LogReportThread', target=self.__report_log, daemon=True)
178190
__log_report_thread.start()
179191

@@ -183,8 +195,6 @@ def __init_threading(self) -> None:
183195
daemon=True)
184196
__command_dispatch_thread.start()
185197

186-
self.__snapshot_queue = Queue(maxsize=config.agent_profile_snapshot_transport_buffer_size)
187-
188198
__query_profile_thread = Thread(name='QueryProfileCommandThread', target=self.__query_profile_command,
189199
daemon=True)
190200
__query_profile_thread.start()

0 commit comments

Comments
 (0)