Skip to content

Conversation

@sb2k16
Copy link
Contributor

@sb2k16 sb2k16 commented Jul 2, 2024

Fixes #1055

The purpose of this pull request is to provide an example on how OpenSearch Ingestion Pipeline could be used to ingest CloudWatch logs into Opensearch using a CloudWatch Lambda subscription filter.

This example creates the following resources:

  • OpenSearch Serverless collection in a VPC where the logs would eventually be written to by the OpenSearch Ingestion Pipeline sink.
  • OpenSearch Ingestion pipeline
  • CloudWatch Subscription Filter with Lambda function to call the OSI pipeline endpoint to push logs data received from an incoming log stream.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

for logEvent in logEvents:
request = {}
request['@id'] = logEvent['id']
request['@timestamp'] = str(datetime.now().year) + '0' + str(datetime.now().month) + '0' + str(datetime.now().day)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each log event has a timestamp property. Why not use that?

logEvents = cwLogs['logEvents']
for logEvent in logEvents:
request = {}
request['@id'] = logEvent['id']

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you include @ in here? I think this may be confusing and require additional processing to remove. I think we would be fine without the @.

Maybe keep it on @timestamp if anything.

def cw_subscription_handler(event, context):

"""Extract the data from the event"""
data = jmespath.search("awslogs.data", event)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any limit to the amount of data in each call? OSI only accepts sizes of 10mb or less by default.


// Create a dashboard access role
const dashboardAccessRole = new Role(this, `${this.STACK_RESOURCE_NAMING_PREFIX}DashboardAccessRole`, {
assumedBy: new ServicePrincipal('ec2.amazonaws.com'),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this role assumable by EC2? Wouldn't we have the account be the one to assume it?

id = str(randrange(10000))
source['id'] = id
source['timestamp'] = str(datetime.now())
source['message'] = 'Hello world'

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be nice to make this more interesting and look like some application log.

source:
http:
path: /logs/ingest
sink:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this pipeline example would be more helpful if we ran grok. See my comment above about application logs. We could possibly have a log that you could grok here.

@kaiz-io kaiz-io merged commit ec0aab1 into aws-samples:main Nov 2, 2024
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add an example for ingesting CloudWatch logs into OpenSearch using OpenSearch Ingestion pipeline.

4 participants