Skip to content

Commit 2b8190e

Browse files
authored
Add describe_log_streams and filter_log_events to the CloudWatch module (#1785)
* Added describe_log_streams
1 parent b981a20 commit 2b8190e

File tree

3 files changed

+278
-0
lines changed

3 files changed

+278
-0
lines changed

awswrangler/cloudwatch.py

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,3 +245,213 @@ def read_logs(
245245
if "timestamp" in df.columns:
246246
df["timestamp"] = pd.to_datetime(df["timestamp"])
247247
return df
248+
249+
250+
def describe_log_streams(
    log_group_name: str,
    log_stream_name_prefix: Optional[str] = None,
    order_by: Optional[str] = "LogStreamName",
    descending: Optional[bool] = False,
    limit: Optional[int] = 50,
    boto3_session: Optional[boto3.Session] = None,
) -> pd.DataFrame:
    """List the log streams for the specified log group, return results as a Pandas DataFrame.

    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.describe_log_streams

    Parameters
    ----------
    log_group_name : str
        The name of the log group.
    log_stream_name_prefix : str
        The prefix to match log streams' name.
    order_by : str
        If the value is LogStreamName, the results are ordered by log stream name.
        If the value is LastEventTime, the results are ordered by the event time.
        The default value is LogStreamName.
    descending : bool
        If True, results are returned in descending order;
        if False (the default), results are returned in ascending order.
    limit : Optional[int]
        The maximum number of items returned. The default is up to 50 items.
    boto3_session : boto3.Session(), optional
        Boto3 Session. The default boto3 session will be used if boto3_session receive None.

    Returns
    -------
    pandas.DataFrame
        Result as a Pandas DataFrame.

    Examples
    --------
    >>> import awswrangler as wr
    >>> df = wr.cloudwatch.describe_log_streams(
    ...     log_group_name="aws_sdk_pandas_log_group",
    ...     log_stream_name_prefix="aws_sdk_pandas_log_stream",
    ... )

    """
    client_logs: boto3.client = _utils.client(service_name="logs", session=boto3_session)
    request_args: Dict[str, Any] = {
        "logGroupName": log_group_name,
        "descending": descending,
        "orderBy": order_by,
        "limit": limit,
    }
    if log_stream_name_prefix and order_by == "LogStreamName":
        request_args["logStreamNamePrefix"] = log_stream_name_prefix
    elif log_stream_name_prefix and order_by == "LastEventTime":
        # The CloudWatch Logs API does not allow combining a name prefix
        # with ordering by event time, so fail fast with a clear error.
        raise exceptions.InvalidArgumentCombination(
            "Cannot call describe_log_streams with both `log_stream_name_prefix` and order_by equal 'LastEventTime'"
        )
    # Follow nextToken until the service reports no further pages.
    streams: List[Dict[str, Any]] = []
    response: Dict[str, Any] = client_logs.describe_log_streams(**request_args)
    streams.extend(response["logStreams"])
    while "nextToken" in response:
        response = client_logs.describe_log_streams(
            **request_args,
            nextToken=response["nextToken"],
        )
        streams.extend(response["logStreams"])
    if not streams:
        return pd.DataFrame()
    df: pd.DataFrame = pd.DataFrame(streams)
    # Stamp the source log group so rows remain traceable after concatenation.
    df["logGroupName"] = log_group_name
    return df
323+
324+
325+
def _filter_log_events(
    log_group_name: str,
    log_stream_names: List[str],
    start_timestamp: Optional[int] = None,
    end_timestamp: Optional[int] = None,
    filter_pattern: Optional[str] = None,
    limit: Optional[int] = 10000,
    boto3_session: Optional[boto3.Session] = None,
) -> List[Dict[str, Any]]:
    """Collect every matching log event for the given streams, following pagination."""
    client_logs: boto3.client = _utils.client(service_name="logs", session=boto3_session)
    request_args: Dict[str, Any] = {
        "logGroupName": log_group_name,
        "logStreamNames": log_stream_names,
        "limit": limit,
    }
    # Optional filters are only forwarded when the caller supplied them.
    if start_timestamp:
        request_args["startTime"] = start_timestamp
    if end_timestamp:
        request_args["endTime"] = end_timestamp
    if filter_pattern:
        request_args["filterPattern"] = filter_pattern
    collected: List[Dict[str, Any]] = []
    response: Dict[str, Any] = client_logs.filter_log_events(**request_args)
    collected.extend(response["events"])
    # Keep requesting pages while the service returns a continuation token.
    while "nextToken" in response:
        response = client_logs.filter_log_events(
            **request_args,
            nextToken=response["nextToken"],
        )
        collected.extend(response["events"])
    return collected
356+
357+
358+
def filter_log_events(
    log_group_name: str,
    log_stream_name_prefix: Optional[str] = None,
    log_stream_names: Optional[List[str]] = None,
    filter_pattern: Optional[str] = None,
    start_time: Optional[datetime.datetime] = None,
    end_time: Optional[datetime.datetime] = None,
    boto3_session: Optional[boto3.Session] = None,
) -> pd.DataFrame:
    """List log events from the specified log group. The results are returned as Pandas DataFrame.

    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.filter_log_events

    Note
    ----
    Cannot call filter_log_events with both log_stream_names and log_stream_name_prefix.

    Parameters
    ----------
    log_group_name : str
        The name of the log group.
    log_stream_name_prefix : str
        Filters the results to include only events from log streams that have names starting with this prefix.
    log_stream_names: List[str]
        Filters the results to only logs from the log streams in this list.
    filter_pattern : str
        The filter pattern to use. If not provided, all the events are matched.
    start_time : datetime.datetime
        Events with a timestamp before this time are not returned.
    end_time : datetime.datetime
        Events with a timestamp later than this time are not returned.
    boto3_session : boto3.Session(), optional
        Boto3 Session. The default boto3 session will be used if boto3_session receive None.

    Returns
    -------
    pandas.DataFrame
        Result as a Pandas DataFrame.

    Examples
    --------
    Get all log events from log group 'aws_sdk_pandas_log_group' that have log stream prefix 'aws_sdk_pandas_log_stream'

    >>> import awswrangler as wr
    >>> df = wr.cloudwatch.filter_log_events(
    ...     log_group_name="aws_sdk_pandas_log_group",
    ...     log_stream_name_prefix="aws_sdk_pandas_log_stream",
    ... )

    Get all log events contains 'REPORT' from log stream
    'aws_sdk_pandas_log_stream_one' and 'aws_sdk_pandas_log_stream_two'
    from log group 'aws_sdk_pandas_log_group'

    >>> import awswrangler as wr
    >>> df = wr.cloudwatch.filter_log_events(
    ...     log_group_name="aws_sdk_pandas_log_group",
    ...     log_stream_names=["aws_sdk_pandas_log_stream_one","aws_sdk_pandas_log_stream_two"],
    ...     filter_pattern='REPORT',
    ... )

    """
    if log_stream_name_prefix and log_stream_names:
        raise exceptions.InvalidArgumentCombination(
            "Cannot call filter_log_events with both log_stream_names and log_stream_name_prefix"
        )
    _logger.debug("log_group_name: %s", log_group_name)

    # When no explicit stream list was given, discover the streams in the
    # group (optionally narrowed by prefix) via describe_log_streams.
    if not log_stream_names:
        describe_args: Dict[str, Any] = {
            "log_group_name": log_group_name,
        }
        if boto3_session:
            describe_args["boto3_session"] = boto3_session
        if log_stream_name_prefix:
            describe_args["log_stream_name_prefix"] = log_stream_name_prefix
        streams_df = describe_log_streams(**describe_args)
        log_stream_names = streams_df["logStreamName"].tolist() if len(streams_df.index) else []

    filter_args: Dict[str, Any] = {
        "log_group_name": log_group_name,
    }
    # Datetimes are converted to epoch milliseconds, as the API expects.
    if start_time:
        filter_args["start_timestamp"] = int(1000 * start_time.timestamp())
    if end_time:
        filter_args["end_timestamp"] = int(1000 * end_time.timestamp())
    if filter_pattern:
        filter_args["filter_pattern"] = filter_pattern
    if boto3_session:
        filter_args["boto3_session"] = boto3_session

    # Query the streams in batches of 50 names per _filter_log_events call.
    chunk_size: int = 50
    events: List[Dict[str, Any]] = []
    for offset in range(0, len(log_stream_names), chunk_size):
        batch = log_stream_names[offset : offset + chunk_size]
        events.extend(_filter_log_events(**filter_args, log_stream_names=batch))
    if not events:
        return pd.DataFrame()
    df: pd.DataFrame = pd.DataFrame(events)
    df["logGroupName"] = log_group_name
    return df

docs/source/api.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,8 @@ Amazon CloudWatch Logs
338338
run_query
339339
start_query
340340
wait_query
341+
describe_log_streams
342+
filter_log_events
341343

342344
Amazon QuickSight
343345
-----------------

tests/test_cloudwatch.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from datetime import datetime
23

34
import boto3
45
import pytest
@@ -49,3 +50,68 @@ def test_read_logs(loggroup):
4950
)
5051
assert len(df.index) == 5
5152
assert len(df.columns) == 3
53+
54+
55+
def test_describe_log_streams_and_filter_log_events(loggroup):
    # Create four streams up front; existing ones are tolerated so the test is rerunnable.
    client = boto3.client("logs")
    stream_names = [
        "aws_sdk_pandas_log_stream_one",
        "aws_sdk_pandas_log_stream_two",
        "aws_sdk_pandas_log_stream_three",
        "aws_sdk_pandas_log_stream_four",
    ]
    for name in stream_names:
        try:
            client.create_log_stream(logGroupName=loggroup, logStreamName=name)
        except client.exceptions.ResourceAlreadyExistsException:
            continue

    # Prefix + LastEventTime ordering is an invalid combination.
    with pytest.raises(exceptions.InvalidArgumentCombination):
        wr.cloudwatch.describe_log_streams(
            log_group_name=loggroup,
            log_stream_name_prefix="aws_sdk_pandas_log_stream",
            order_by="LastEventTime",
            descending=False,
        )

    log_streams_df = wr.cloudwatch.describe_log_streams(
        log_group_name=loggroup, order_by="LastEventTime", descending=False
    )

    assert len(log_streams_df.index) >= 4
    assert "logGroupName" in log_streams_df.columns

    # Push a fixed batch of events into every fully-described stream.
    log_streams_df.dropna(inplace=True)
    for stream in log_streams_df.to_dict("records"):
        events = []
        for i, event_message in enumerate(["REPORT", "DURATION", "key:value", "START", "END"]):
            events.append({"timestamp": int(1000 * datetime.now().timestamp()), "message": f"{i}_{event_message}"})
        put_args = {
            "logGroupName": stream["logGroupName"],
            "logStreamName": stream["logStreamName"],
            "logEvents": events,
        }
        token = stream.get("uploadSequenceToken")
        if token:
            put_args["sequenceToken"] = token
        try:
            client.put_log_events(**put_args)
        except client.exceptions.DataAlreadyAcceptedException:
            pass

    # Explicit stream names + prefix is an invalid combination.
    with pytest.raises(exceptions.InvalidArgumentCombination):
        wr.cloudwatch.filter_log_events(
            log_group_name=loggroup,
            log_stream_name_prefix="aws_sdk_pandas_log_stream",
            log_stream_names=log_streams_df["logStreamName"].tolist(),
        )

    log_events_df = wr.cloudwatch.filter_log_events(log_group_name=loggroup)
    assert len(log_events_df.index) >= 4
    assert "logGroupName" in log_events_df.columns

    filtered_log_events_df = wr.cloudwatch.filter_log_events(
        log_group_name=loggroup, log_stream_names=log_streams_df["logStreamName"].tolist(), filter_pattern="REPORT"
    )
    assert len(filtered_log_events_df.index) >= 4
    assert "logStreamName" in log_events_df.columns

0 commit comments

Comments
 (0)