Skip to content

Commit 8039d8b

Browse files
authored
Enable HTTP log reporting protocol (#149)
1 parent 9abe90d commit 8039d8b

File tree

3 files changed

+50
-5
lines changed

3 files changed

+50
-5
lines changed

docs/LogReporter.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Python agent gRPC log reporter
1+
# Python agent log reporter
22

33
This functionality reports logs collected from the Python logging module(in theory, also logging libraries depending on the core logging module).
44

@@ -13,6 +13,8 @@ config.init(collector_address='127.0.0.1:11800', service_name='your awesome serv
1313
agent.start()
1414
```
1515

16+
Note, if chosen `HTTP` protocol instead of `gRPC`/`Kafka`, the logs will be batch-reported to the collector REST endpoint.
17+
1618
`log_reporter_active=True` - Enables the log reporter.
1719

1820
`log_reporter_max_buffer_size` - The maximum queue backlog size for sending log data to backend, logs beyond this are silently dropped.

skywalking/agent/protocol/http.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,14 @@
1515
# limitations under the License.
1616
#
1717

18-
from skywalking.loggings import logger
1918
from queue import Queue, Empty
2019
from time import time
2120

2221
from skywalking import config
2322
from skywalking.agent import Protocol
24-
from skywalking.client.http import HttpServiceManagementClient, HttpTraceSegmentReportService
23+
from skywalking.client.http import HttpServiceManagementClient, HttpTraceSegmentReportService, HttpLogDataReportService
24+
from skywalking.loggings import logger
25+
from skywalking.protocol.logging.Logging_pb2 import LogData
2526
from skywalking.trace.segment import Segment
2627

2728

@@ -30,6 +31,7 @@ def __init__(self):
3031
self.properties_sent = False
3132
self.service_management = HttpServiceManagementClient()
3233
self.traces_reporter = HttpTraceSegmentReportService()
34+
self.log_reporter = HttpLogDataReportService()
3335

3436
def fork_after_in_child(self):
3537
self.service_management.fork_after_in_child()
@@ -64,3 +66,26 @@ def generator():
6466
self.traces_reporter.report(generator=generator())
6567
except Exception:
6668
pass
69+
70+
def report_log(self, queue: Queue, block: bool = True):
71+
start = time()
72+
73+
def generator():
74+
while True:
75+
try:
76+
timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
77+
if timeout <= 0:
78+
return
79+
log_data = queue.get(block=block, timeout=timeout) # type: LogData
80+
except Empty:
81+
return
82+
queue.task_done()
83+
84+
logger.debug('Reporting Log')
85+
86+
yield log_data
87+
88+
try:
89+
self.log_reporter.report(generator=generator())
90+
except Exception:
91+
pass

skywalking/client/http.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,14 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616
#
17-
18-
from skywalking.loggings import logger
17+
import json
1918

2019
import requests
20+
from google.protobuf import json_format
2121

2222
from skywalking import config
2323
from skywalking.client import ServiceManagementClient, TraceSegmentReportService
24+
from skywalking.loggings import logger
2425

2526

2627
class HttpServiceManagementClient(ServiceManagementClient):
@@ -109,3 +110,20 @@ def report(self, generator):
109110
} for span in segment.spans]
110111
})
111112
logger.debug('report traces response: %s', res)
113+
114+
115+
class HttpLogDataReportService(TraceSegmentReportService):
116+
def __init__(self):
117+
proto = 'https://' if config.force_tls else 'http://'
118+
self.url_report = proto + config.collector_address.rstrip('/') + '/v3/logs'
119+
self.session = requests.Session()
120+
121+
def fork_after_in_child(self):
122+
self.session.close()
123+
self.session = requests.Session()
124+
125+
def report(self, generator):
126+
log_batch = [json.loads(json_format.MessageToJson(log_data)) for log_data in generator]
127+
if log_batch: # prevent empty batches
128+
res = self.session.post(self.url_report, json=log_batch)
129+
logger.debug('report batch log response: %s', res)

0 commit comments

Comments
 (0)