Skip to content

Commit d1f286f

Browse files
authored
Enable Kafka log reporting protocol (#154)
1 parent 750e405 commit d1f286f

File tree

8 files changed

+65
-19
lines changed

8 files changed

+65
-19
lines changed

docs/EnvVars.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,15 @@ Environment Variable | Description | Default
2828
| `SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. It is in the form host1:port1,host2:port2,... | `localhost:9092` |
2929
| `SW_KAFKA_REPORTER_TOPIC_MANAGEMENT` | Specifying Kafka topic name for service instance reporting and registering. | `skywalking-managements` |
3030
| `SW_KAFKA_REPORTER_TOPIC_SEGMENT` | Specifying Kafka topic name for Tracing data. | `skywalking-segments` |
31+
| `SW_KAFKA_REPORTER_TOPIC_LOG` | Specifying Kafka topic name for Log data. | `skywalking-logs` |
3132
| `SW_KAFKA_REPORTER_CONFIG_key` | The configs to init KafkaProducer. it support the basic arguments (whose type is either `str`, `bool`, or `int`) listed [here](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer) | unset |
3233
| `SW_CELERY_PARAMETERS_LENGTH`| The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off | `512` |
3334
| `SW_AGENT_PROFILE_ACTIVE` | If `True`, Python agent will enable profile when user create a new profile task. Otherwise disable profile. | `False` |
3435
| `SW_PROFILE_TASK_QUERY_INTERVAL` | The number of seconds between two profile task query. | `20` |
3536
| `SW_AGENT_LOG_REPORTER_ACTIVE` | If `True`, Python agent will report collected logs to the OAP or Satellite. Otherwise, it disables the feature. | `False` |
3637
| `SW_AGENT_LOG_REPORTER_BUFFER_SIZE` | The maximum queue backlog size for sending log data to backend, logs beyond this are silently dropped. | `10000` |
3738
| `SW_AGENT_LOG_REPORTER_LEVEL` | This config specifies the logger levels of concern, any logs with a level below the config will be ignored. | `WARNING` |
38-
| `SW_AGENT_LOG_IGNORE_FILTER` | This config customizes whether to ignore the application-defined logger filters, if `True`, all logs are reported disregarding any filter rules. | `False` |
39+
| `SW_AGENT_LOG_REPORTER_IGNORE_FILTER` | This config customizes whether to ignore the application-defined logger filters, if `True`, all logs are reported disregarding any filter rules. | `False` |
3940
| `SW_AGENT_LOG_REPORTER_FORMATTED` | If `True`, the log reporter will transmit the logs as formatted. Otherwise, puts logRecord.msg and logRecord.args into message content and tags(`argument.n`), respectively. Along with an `exception` tag if an exception was raised. | `True` |
4041
| `SW_AGENT_LOG_REPORTER_LAYOUT` | The log reporter formats the logRecord message based on the layout given. | `%(asctime)s [%(threadName)s] %(levelname)s %(name)s - %(message)s` |
4142
| `SW_AGENT_CAUSE_EXCEPTION_DEPTH` | This config limits agent to report up to `limit` stacktrace, please refer to [Python traceback](https://docs.python.org/3/library/traceback.html#traceback.print_tb) for more explanations. | `5` |

docs/LogReporter.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,17 @@ To utilize this feature, you will need to add some new configurations to the age
99
from skywalking import agent, config
1010

1111
config.init(collector_address='127.0.0.1:11800', service_name='your awesome service',
12-
log_reporter_active=True)
12+
log_reporter_active=True) # defaults to grpc protocol
1313
agent.start()
1414
```
1515

16-
Note, if chosen `HTTP` protocol instead of `gRPC`/`Kafka`, the logs will be batch-reported to the collector REST endpoint.
16+
Log reporter supports all three protocols including `grpc`, `http` and `kafka`, which shares the same config `protocol` with trace reporter.
17+
18+
If chosen `http` protocol, the logs will be batch-reported to the collector REST endpoint `oap/v3/logs`.
19+
20+
If chosen `kafka` protocol, please make sure to config
21+
[kafka-fetcher](https://skywalking.apache.org/docs/main/v8.4.0/en/setup/backend/backend-fetcher/#kafka-fetcher)
22+
on the OAP side, and make sure Python agent config `kafka_bootstrap_servers` points to your Kafka brokers.
1723

1824
`log_reporter_active=True` - Enables the log reporter.
1925

skywalking/agent/protocol/kafka.py

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,18 @@
1616
#
1717

1818
import logging
19-
from skywalking.loggings import logger, getLogger
2019
from queue import Queue, Empty
2120
from time import time
2221

23-
from skywalking import config
24-
from skywalking.agent import Protocol
25-
from skywalking.client.kafka import KafkaServiceManagementClient, KafkaTraceSegmentReportService
2622
from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
2723
from skywalking.protocol.language_agent.Tracing_pb2 import SegmentObject, SpanObject, Log, SegmentReference
24+
from skywalking.protocol.logging.Logging_pb2 import LogData
25+
26+
from skywalking import config
27+
from skywalking.agent import Protocol
28+
from skywalking.client.kafka import KafkaServiceManagementClient, KafkaTraceSegmentReportService, \
29+
KafkaLogDataReportService
30+
from skywalking.loggings import logger, getLogger
2831
from skywalking.trace.segment import Segment
2932

3033
# avoid too many kafka logs
@@ -36,6 +39,7 @@ class KafkaProtocol(Protocol):
3639
def __init__(self):
3740
self.service_management = KafkaServiceManagementClient()
3841
self.traces_reporter = KafkaTraceSegmentReportService()
42+
self.log_reporter = KafkaLogDataReportService()
3943

4044
def heartbeat(self):
4145
self.service_management.send_heart_beat()
@@ -97,3 +101,23 @@ def generator():
97101
yield s
98102

99103
self.traces_reporter.report(generator())
104+
105+
def report_log(self, queue: Queue, block: bool = True):
106+
start = time()
107+
108+
def generator():
109+
while True:
110+
try:
111+
timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
112+
if timeout <= 0:
113+
return
114+
log_data = queue.get(block=block, timeout=timeout) # type: LogData
115+
except Empty:
116+
return
117+
queue.task_done()
118+
119+
logger.debug('Reporting Log')
120+
121+
yield log_data
122+
123+
self.log_reporter.report(generator=generator())

skywalking/client/http.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from google.protobuf import json_format
2121

2222
from skywalking import config
23-
from skywalking.client import ServiceManagementClient, TraceSegmentReportService
23+
from skywalking.client import ServiceManagementClient, TraceSegmentReportService, LogDataReportService
2424
from skywalking.loggings import logger
2525

2626

@@ -112,7 +112,7 @@ def report(self, generator):
112112
logger.debug('report traces response: %s', res)
113113

114114

115-
class HttpLogDataReportService(TraceSegmentReportService):
115+
class HttpLogDataReportService(LogDataReportService):
116116
def __init__(self):
117117
proto = 'https://' if config.force_tls else 'http://'
118118
self.url_report = proto + config.collector_address.rstrip('/') + '/v3/logs'

skywalking/client/kafka.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,16 @@
1515
# limitations under the License.
1616
#
1717

18-
import os
1918
import ast
20-
from skywalking.loggings import logger
19+
import os
2120

22-
from skywalking import config
23-
from skywalking.client import ServiceManagementClient, TraceSegmentReportService
2421
from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
2522
from skywalking.protocol.management.Management_pb2 import InstancePingPkg, InstanceProperties
2623

2724
from kafka import KafkaProducer
25+
from skywalking import config
26+
from skywalking.client import ServiceManagementClient, TraceSegmentReportService, LogDataReportService
27+
from skywalking.loggings import logger
2828

2929
kafka_configs = {}
3030

@@ -106,6 +106,18 @@ def report(self, generator):
106106
self.producer.send(topic=self.topic, key=key, value=value)
107107

108108

109+
class KafkaLogDataReportService(LogDataReportService):
110+
def __init__(self):
111+
self.producer = KafkaProducer(**kafka_configs)
112+
self.topic = config.kafka_topic_log
113+
114+
def report(self, generator):
115+
for log_data in generator:
116+
key = bytes(log_data.traceContext.traceSegmentId, encoding="utf-8")
117+
value = bytes(log_data.SerializeToString())
118+
self.producer.send(topic=self.topic, key=key, value=value)
119+
120+
109121
class KafkaConfigDuplicated(Exception):
110122
def __init__(self, key):
111123
self.key = key

skywalking/config.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
kafka_bootstrap_servers = os.getenv('SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS') or "localhost:9092" # type: str
6363
kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "skywalking-managements" # type: str
6464
kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments" # type: str
65+
kafka_topic_log = os.getenv('SW_KAFKA_REPORTER_TOPIC_LOG') or "skywalking-logs" # type: str
6566
celery_parameters_length = int(os.getenv('SW_CELERY_PARAMETERS_LENGTH') or '512')
6667
profile_active = True if os.getenv('SW_AGENT_PROFILE_ACTIVE') and \
6768
os.getenv('SW_AGENT_PROFILE_ACTIVE') == 'True' else False # type: bool
@@ -72,8 +73,8 @@
7273
os.getenv('SW_AGENT_LOG_REPORTER_ACTIVE') == 'True' else False # type: bool
7374
log_reporter_max_buffer_size = int(os.getenv('SW_AGENT_LOG_REPORTER_BUFFER_SIZE') or '10000') # type: int
7475
log_reporter_level = os.getenv('SW_AGENT_LOG_REPORTER_LEVEL') or 'WARNING' # type: str
75-
log_reporter_ignore_filter = True if os.getenv('SW_AGENT_LOG_IGNORE_FILTER') and \
76-
os.getenv('SW_AGENT_LOG_REPORTER_FORMATTED') == 'True' else False # type: bool
76+
log_reporter_ignore_filter = True if os.getenv('SW_AGENT_LOG_REPORTER_IGNORE_FILTER') and \
77+
os.getenv('SW_AGENT_LOG_REPORTER_IGNORE_FILTER') == 'True' else False # type: bool
7778
log_reporter_formatted = False if os.getenv('SW_AGENT_LOG_REPORTER_FORMATTED') and \
7879
os.getenv('SW_AGENT_LOG_REPORTER_FORMATTED') == 'False' else True # type: bool
7980
log_reporter_layout = os.getenv('SW_AGENT_LOG_REPORTER_LAYOUT') or \

skywalking/log/sw_logging.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,5 +96,5 @@ def transform(record) -> str:
9696
if config.log_reporter_formatted:
9797
if layout:
9898
return formatter.format(record=record)
99-
return record.getMessage() + '\n' + sw_traceback()
99+
return record.getMessage() + ('\n' + sw_traceback() if record.exc_info else '')
100100
return str(record.msg) # convert possible exception to str

skywalking/plugins/sw_kafka.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
# limitations under the License.
1616
#
1717

18-
from skywalking import config
1918
from skywalking import Layer, Component
19+
from skywalking import config
2020
from skywalking.trace.carrier import Carrier
2121
from skywalking.trace.context import get_context
2222
from skywalking.trace.tags import TagMqBroker, TagMqTopic
@@ -64,8 +64,10 @@ def _sw__poll_once(this, timeout_ms, max_records, update_offsets=True):
6464

6565
def _sw_send_func(_send):
6666
def _sw_send(this, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
67-
# ignore trace skywalking self request
68-
if config.protocol == 'kafka' and config.kafka_topic_segment == topic or config.kafka_topic_management == topic:
67+
# ignore trace & log reporter - skywalking self request
68+
if config.protocol == 'kafka' and config.kafka_topic_segment == topic \
69+
or config.kafka_topic_log == topic \
70+
or config.kafka_topic_management == topic:
6971
return _send(this, topic, value=value, key=key, headers=headers, partition=partition,
7072
timestamp_ms=timestamp_ms)
7173

0 commit comments

Comments
 (0)