Skip to content

Commit d988f59

Browse files
authored
- Remove the temporary gRPC log channel (#148)
- Add consideration for log filterers - Modify all affected Docs Signed-off-by: YihaoChen <[email protected]>
1 parent 61f018c commit d988f59

File tree

10 files changed

+80
-153
lines changed

10 files changed

+80
-153
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ Alternatively, you can also pass the configurations via environment variables (s
5656
All supported environment variables can be found [here](docs/EnvVars.md)
5757

5858
## Report logs with Python Agent
59-
The Python agent is capable of reporting collected logs to the backend(SkyWalking OAP/ [SkyWalking Satellite Sidecar](https://github.com/apache/skywalking-satellite)), enabling Log & Trace Correlation.
59+
The Python agent is capable of reporting collected logs to the backend(SkyWalking OAP), enabling Log & Trace Correlation.
6060

6161
Please refer to the [Log Reporter Doc](docs/LogReporter.md) for a detailed guide.
6262

docs/EnvVars.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,8 @@ Environment Variable | Description | Default
3333
| `SW_AGENT_PROFILE_ACTIVE` | If `True`, Python agent will enable profile when user create a new profile task. Otherwise disable profile. | `False` |
3434
| `SW_PROFILE_TASK_QUERY_INTERVAL` | The number of seconds between two profile task query. | `20` |
3535
| `SW_AGENT_LOG_REPORTER_ACTIVE` | If `True`, Python agent will report collected logs to the OAP or Satellite. Otherwise, it disables the feature. | `False` |
36-
| `SW_AGENT_LOG_COLLECTOR_BACKEND_SERVICES` | The log reporter will use a separate gRPC channel until the [Satellite](https://github.com/apache/skywalking-satellite) project is ready. | `127.0.0.1:11800` |
3736
| `SW_AGENT_LOG_REPORTER_BUFFER_SIZE` | The maximum queue backlog size for sending log data to backend, logs beyond this are silently dropped. | `10000` |
38-
| `SW_AGENT_LOG_REPORTER_MESSAGE_SIZE` | Max message size allowed for log transmission. | `10485760` |
3937
| `SW_AGENT_LOG_REPORTER_LEVEL` | This config specifies the logger levels of concern, any logs with a level below the config will be ignored. | `WARNING` |
38+
| `SW_AGENT_LOG_IGNORE_FILTER` | This config customizes whether to ignore the application-defined logger filters, if `True`, all logs are reported disregarding any filter rules. | `False` |
4039
| `SW_AGENT_LOG_REPORTER_FORMATTED` | If `True`, the log reporter will transmit the logs as formatted. Otherwise, puts logRecord.msg and logRecord.args into message content and tags(`argument.n`), respectively. Along with an `exception` tag if an exception was raised. | `True` |
4140
| `SW_AGENT_LOG_REPORTER_LAYOUT` | The log reporter formats the logRecord message based on the layout given. | `%(asctime)s [%(threadName)s] %(levelname)s %(name)s - %(message)s` |

docs/LogReporter.md

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,13 @@ To utilize this feature, you will need to add some new configurations to the age
99
from skywalking import agent, config
1010

1111
config.init(collector_address='127.0.0.1:11800', service_name='your awesome service',
12-
log_grpc_reporter_active=True, log_grpc_collector_address='127.0.0.1:11800')
12+
log_grpc_reporter_active=True)
1313
agent.start()
1414
```
1515

1616
`log_grpc_reporter_active=True` - Enables the log reporter.
1717

18-
`log_grpc_collector_address` - For now, the log reporter uses a separate gRPC channel(will be merged upon the [SkyWalking Satellite Sidecar](https://github.com/apache/skywalking-satellite) project matures).
19-
If you would like to use the Satellite sidecar, you will need to configure an address pointing to its gatherer. Otherwise, you can simply keep the address the same as the OAP.
20-
21-
`log_grpc_reporter_max_buffer_size` and `log_grpc_reporter_max_message_size` - Used to limit the reporting overhead.
18+
`log_grpc_reporter_max_buffer_size` - The maximum queue backlog size for sending log data to backend, logs beyond this are silently dropped.
2219

2320
Alternatively, you can pass configurations through environment variables.
2421
Please refer to [EnvVars.md](EnvVars.md) for the list of environment variables associated with the log reporter.
@@ -31,6 +28,21 @@ In other words, the agent ignores some unwanted logs based on your level thresho
3128

3229
Note that it also works with your custom logger levels, simply specify its string name in the config.
3330

31+
### Ignore log filters
32+
The following config is disabled by default. When enabled, the log reporter will collect logs disregarding your custom log filters.
33+
34+
For example, if you attach the filter below to the logger - the default behavior of log reporting aligns with the filter
35+
(not reporting any logs with a message starting with `SW test`)
36+
```python
37+
class AppFilter(logging.Filter):
38+
def filter(self, record):
39+
return not record.getMessage().startswith('SW test')
40+
41+
logger.addFilter(AppFilter())
42+
```
43+
However, if you do would like to report those filtered logs, set the `log_grpc_reporter_ignore_filter` to `True`.
44+
45+
3446
## Formatting
3547
Note that regardless of the formatting, Python agent will always report the following three tags -
3648

@@ -46,7 +58,7 @@ If not set, the agent uses the layout below by default, else the agent uses your
4658

4759
`'%(asctime)s [%(threadName)s] %(levelname)s %(name)s - %(message)s'`
4860

49-
If the layout is set to `None`, the reported log content will only contain the pre-formatted `LogRecord.message`(`msg % args`) without any additional styles and information.
61+
If the layout is set to `None`, the reported log content will only contain the pre-formatted `LogRecord.message`(`msg % args`) without any additional styles, information or extra fields.
5062

5163
### Transmit un-formatted logs
5264
You can also choose to report the log messages without any formatting.
@@ -55,7 +67,7 @@ It separates the raw log msg `logRecord.msg` and `logRecord.args`, then puts the
5567
Note when you set `log_grpc_reporter_formatted` to False, it ignores your custom layout introduced above.
5668

5769
As an example, the following code:
58-
```Python
70+
```python
5971
logger.info("SW test log %s %s %s", 'arg0', 'arg1', 'arg2')
6072
```
6173

skywalking/agent/__init__.py

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
from skywalking import config, plugins
2727
from skywalking import loggings
2828
from skywalking.agent.protocol import Protocol
29-
from skywalking.agent.protocol.grpc_log import GrpcLogProtocol
3029
from skywalking.command import command_service
3130
from skywalking.config import profile_active, profile_task_query_interval
3231
from skywalking.loggings import logger
@@ -35,7 +34,7 @@
3534
from skywalking.trace.context import Segment
3635

3736
__started = False
38-
__protocol = __log_protocol = Protocol() # type: Protocol
37+
__protocol = Protocol() # type: Protocol
3938
__heartbeat_thread = __report_thread = __log_report_thread = __query_profile_thread = __command_dispatch_thread \
4039
= __queue = __log_queue = __finished = None
4140

@@ -60,20 +59,10 @@ def __report():
6059
__finished.wait(0)
6160

6261

63-
def __log_heartbeat():
62+
def __report_log():
6463
while not __finished.is_set():
6564
try:
66-
__log_protocol.heartbeat()
67-
except Exception as exc:
68-
logger.error(str(exc))
69-
70-
__finished.wait(30)
71-
72-
73-
def __log_report():
74-
while not __finished.is_set():
75-
try:
76-
__log_protocol.report(__log_queue)
65+
__protocol.report_log(__log_queue)
7766
except Exception as exc:
7867
logger.error(str(exc))
7968

@@ -112,17 +101,15 @@ def __init_threading():
112101

113102
if config.log_grpc_reporter_active:
114103
__log_queue = Queue(maxsize=config.log_grpc_reporter_max_buffer_size)
115-
__log_heartbeat_thread = Thread(name='LogHeartbeatThread', target=__log_heartbeat, daemon=True)
116-
__log_report_thread = Thread(name='LogReportThread', target=__log_report, daemon=True)
117-
__log_heartbeat_thread.start()
104+
__log_report_thread = Thread(name='LogReportThread', target=__report_log, daemon=True)
118105
__log_report_thread.start()
119106

120107
if profile_active:
121108
__query_profile_thread.start()
122109

123110

124111
def __init():
125-
global __protocol, __log_protocol
112+
global __protocol
126113

127114
if config.protocol == 'grpc':
128115
from skywalking.agent.protocol.grpc import GrpcProtocol
@@ -135,9 +122,8 @@ def __init():
135122
__protocol = KafkaProtocol()
136123

137124
plugins.install()
138-
if config.log_grpc_reporter_active:
125+
if config.log_grpc_reporter_active: # todo - Add support for printing traceID/ context in logs
139126
from skywalking import log
140-
__log_protocol = GrpcLogProtocol()
141127
log.install()
142128

143129
__init_threading()
@@ -147,7 +133,7 @@ def __fini():
147133
__protocol.report(__queue, False)
148134
__queue.join()
149135
if config.log_grpc_reporter_active:
150-
__log_protocol.report(__log_queue, False)
136+
__protocol.report_log(__log_queue, False)
151137
__log_queue.join()
152138
__finished.set()
153139

skywalking/agent/protocol/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,8 @@ def heartbeat(self):
3535
def report(self, queue: Queue, block: bool = True):
3636
raise NotImplementedError()
3737

38+
def report_log(self, queue: Queue, block: bool = True):
39+
raise NotImplementedError()
40+
3841
def query_profile_commands(self):
3942
pass

skywalking/agent/protocol/grpc.py

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,13 @@
2323
import grpc
2424
from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
2525
from skywalking.protocol.language_agent.Tracing_pb2 import SegmentObject, SpanObject, Log, SegmentReference
26+
from skywalking.protocol.logging.Logging_pb2 import LogData
2627

2728
from skywalking import config
2829
from skywalking.agent import Protocol
2930
from skywalking.agent.protocol.interceptors import header_adder_interceptor
3031
from skywalking.client.grpc import GrpcServiceManagementClient, GrpcTraceSegmentReportService, \
31-
GrpcProfileTaskChannelService
32+
GrpcProfileTaskChannelService, GrpcLogDataReportService
3233
from skywalking.loggings import logger
3334
from skywalking.trace.segment import Segment
3435

@@ -52,6 +53,7 @@ def __init__(self):
5253
self.service_management = GrpcServiceManagementClient(self.channel)
5354
self.traces_reporter = GrpcTraceSegmentReportService(self.channel)
5455
self.profile_query = GrpcProfileTaskChannelService(self.channel)
56+
self.log_reporter = GrpcLogDataReportService(self.channel)
5557

5658
def _cb(self, state):
5759
logger.debug('grpc channel connectivity changed, [%s -> %s]', self.state, state)
@@ -67,11 +69,6 @@ def heartbeat(self):
6769
self.service_management.send_instance_props()
6870
self.properties_sent = True
6971

70-
logger.debug(
71-
'segment reporter service heart beats, [%s], [%s]',
72-
config.service_name,
73-
config.service_instance,
74-
)
7572
self.service_management.send_heart_beat()
7673

7774
except grpc.RpcError:
@@ -142,3 +139,27 @@ def generator():
142139
self.traces_reporter.report(generator())
143140
except grpc.RpcError:
144141
self.on_error()
142+
143+
def report_log(self, queue: Queue, block: bool = True):
144+
start = time()
145+
146+
def generator():
147+
while True:
148+
try:
149+
timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
150+
if timeout <= 0:
151+
return
152+
log_data = queue.get(block=block, timeout=timeout) # type: LogData
153+
except Empty:
154+
return
155+
156+
queue.task_done()
157+
158+
logger.debug('Reporting Log')
159+
160+
yield log_data
161+
162+
try:
163+
self.log_reporter.report(generator())
164+
except grpc.RpcError:
165+
self.on_error()

skywalking/agent/protocol/grpc_log.py

Lines changed: 0 additions & 103 deletions
This file was deleted.

skywalking/client/grpc.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from skywalking.client import ServiceManagementClient, TraceSegmentReportService, ProfileTaskChannelService, \
2929
LogDataReportService
3030
from skywalking.command import command_service
31+
from skywalking.loggings import logger
3132
from skywalking.profile import profile_task_execution_service
3233

3334

@@ -43,6 +44,11 @@ def send_instance_props(self):
4344
))
4445

4546
def send_heart_beat(self):
47+
logger.debug(
48+
'service heart beats, [%s], [%s]',
49+
config.service_name,
50+
config.service_instance,
51+
)
4652
self.service_stub.keepAlive(InstancePingPkg(
4753
service=config.service_name,
4854
serviceInstance=config.service_instance,
@@ -57,6 +63,14 @@ def report(self, generator):
5763
self.report_stub.collect(generator)
5864

5965

66+
class GrpcLogDataReportService(LogDataReportService):
67+
def __init__(self, channel: grpc.Channel):
68+
self.report_stub = LogReportServiceStub(channel)
69+
70+
def report(self, generator):
71+
self.report_stub.collect(generator)
72+
73+
6074
class GrpcProfileTaskChannelService(ProfileTaskChannelService):
6175
def __init__(self, channel: grpc.Channel):
6276
self.task_stub = ProfileTaskStub(channel)
@@ -71,11 +85,3 @@ def do_query(self):
7185

7286
commands = self.task_stub.getProfileTaskCommands(query)
7387
command_service.receive_command(commands)
74-
75-
76-
class GrpcLogDataReportService(LogDataReportService):
77-
def __init__(self, channel: grpc.Channel):
78-
self.report_stub = LogReportServiceStub(channel)
79-
80-
def report(self, generator):
81-
self.report_stub.collect(generator)

skywalking/config.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,10 @@
7070
# NOTE - Log reporting requires a separate channel, will merge in the future.
7171
log_grpc_reporter_active = True if os.getenv('SW_AGENT_LOG_REPORTER_ACTIVE') and \
7272
os.getenv('SW_AGENT_LOG_REPORTER_ACTIVE') == 'True' else False # type: bool
73-
log_grpc_collector_address = os.getenv('SW_AGENT_LOG_COLLECTOR_BACKEND_SERVICES') or '127.0.0.1:11800' # type: str
7473
log_grpc_reporter_max_buffer_size = int(os.getenv('SW_AGENT_LOG_REPORTER_BUFFER_SIZE') or '10000') # type: int
75-
log_grpc_reporter_max_message_size = int(os.getenv('SW_AGENT_LOG_REPORTER_MESSAGE_SIZE') or '10485760') # type: int
7674
log_grpc_reporter_level = os.getenv('SW_AGENT_LOG_REPORTER_LEVEL') or 'WARNING' # type: str
75+
log_grpc_reporter_ignore_filter = True if os.getenv('SW_AGENT_LOG_IGNORE_FILTER') and \
76+
os.getenv('SW_AGENT_LOG_REPORTER_FORMATTED') == 'True' else False # type: bool
7777
log_grpc_reporter_formatted = False if os.getenv('SW_AGENT_LOG_REPORTER_FORMATTED') and \
7878
os.getenv('SW_AGENT_LOG_REPORTER_FORMATTED') == 'False' else True # type: bool
7979
log_grpc_reporter_layout = os.getenv('SW_AGENT_LOG_REPORTER_LAYOUT') or \

0 commit comments

Comments
 (0)