Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions subscription/notify-lambda/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Use an official Python runtime as a parent image
FROM python:3.13-slim

# Install zip utility
RUN apt-get update && apt-get install -y zip

# Set the working directory in the container
WORKDIR /app

# Copy the current directory contents into the container at /app
COPY . /app

# Make sure build.sh is executable
RUN chmod +x build.sh

# Run build.sh when the container launches
CMD ["./build.sh"]

20 changes: 20 additions & 0 deletions subscription/notify-lambda/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash
# This script is used by the bamboo build project.

mkdir -p package

pip3 install --target ./package -r requirements.txt --no-compile

# Zip dependencies
cd package
zip -r ../notify_lambda_deployment_package.zip . -x "__pycache__/*"
cd ..

# Add contents of src directory to the zip file
cd src
zip -r ../notify_lambda_deployment_package.zip . -x "__pycache__/*"

# Return to the top directory
cd ..

rm -r package
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
requests

6 changes: 6 additions & 0 deletions subscription/notify-lambda/run-tests-cicd.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/bin/bash

export PYTHONPATH=src

pip3 install requests
python3 -m unittest discover -v -s ./test -p "*_test.py"
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
import sys
from typing import Optional

LOG_LEVEL: int = int(os.getenv("LOG_LEVEL"))
if not LOG_LEVEL:
LOG_LEVEL = logging.INFO
LOG_LEVEL: int = int(os.getenv("LOG_LEVEL", logging.INFO))

def setup_logger(name: str, log_file: Optional[str] = None, level: int = logging.INFO) -> logging.Logger:
"""Function to setup as many loggers as you want"""
Expand All @@ -28,3 +26,4 @@ def setup_logger(name: str, log_file: Optional[str] = None, level: int = logging

# Create a default logger
logger = setup_logger(name='default_logger', level=LOG_LEVEL)

54 changes: 54 additions & 0 deletions subscription/notify-lambda/src/notification_lambda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import json
import requests
from logger import logger

# This lambda is triggered through a subscription to the cmr-internal-subscription-<env> SNS topic. It processes the events which are notifications that get sent
# to an external URL.

def handler(event, context):
"""The handler is the starting point that is triggered by an SNS topic subscription with a filter that designates tha the notification sent is a URL notification.
Input: event: Dict[str, Any], context: Any
Returns: None"""

logger.debug(f"Ingest notification lambda received event: {json.dumps(event, indent=2)}")
for record in event['Records']:
process_message(record)

def process_message(record):
"""Processes the record in the event.
Input: record: Dict[str, Any]
Returns: None"""

try:
logger.info(f"Ingest notification lambda processing message - record: {record}")
message = record['Sns']
message_attributes = record['Sns']['MessageAttributes']
url = message_attributes['endpoint']['Value']
send_message(url, message)

except Exception as e:
logger.error(f"Ingest notification lambda an error occurred {e} while trying to send the record: {record}")
raise e

def send_message(url, message):
"""Sends the passed message to the external URL. If not successful the message is put onto a dead letter queue.
Input: url: str, message: Dict[str, Any]
Returns: None"""

# Prepare the data to be sent

try:
# Send a POST request to the URL with the message data
headers = {'Content-Type': 'application/json'}
logger.info(f"Ingest notification lambda sending message ID: {message['MessageId']} to URL: {url}")
response = requests.post(url, headers=headers, json=message)

# Check if the request was successful
if response.status_code == 200:
logger.info(f"Ingest notification lambda successfully sent message ID: {message['MessageId']}")
else:
logger.error(f"Ingest notification lambda failed to send message ID: {message['MessageId']}. Status code: {response.status_code}. Response: {response.text}")

except requests.exceptions.RequestException as e:
logger.error(f"Ingest notification lambda an error occurred while sending the message id {message['MessageId']} to URL: {url} {e}")

94 changes: 94 additions & 0 deletions subscription/notify-lambda/test/notification_lambda_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import unittest
from unittest.mock import patch, MagicMock
import json
import requests
from notification_lambda import handler, process_message, send_message

class TestNotificationHandler(unittest.TestCase):

@patch('notification_lambda.process_message')
def test_handler(self, mock_process_message):
event = {
'Records': [
{'some': 'data1'},
{'some': 'data2'}
]
}
context = {}

handler(event, context)

self.assertEqual(mock_process_message.call_count, 2)
mock_process_message.assert_any_call({'some': 'data1'})
mock_process_message.assert_any_call({'some': 'data2'})

@patch('notification_lambda.send_message')
def test_process_message(self, mock_send_message):
record = {
'Sns': {
'MessageId': '12345',
'MessageAttributes': {
'endpoint': {
'Value': 'http://example.com'
}
}
}
}

process_message(record)

mock_send_message.assert_called_once_with('http://example.com', record['Sns'])

@patch('notification_lambda.send_message')
def test_process_message_exception(self, mock_send_message):
record = {
'Sns': {
'MessageId': '12345',
'MessageAttributes': {} # Missing 'endpoint' to cause an exception
}
}

with self.assertRaises(Exception):
process_message(record)

@patch('requests.post')
def test_send_message_success(self, mock_post):
mock_response = MagicMock()
mock_response.status_code = 200
mock_post.return_value = mock_response

url = 'http://example.com'
message = {'MessageId': '12345', 'some': 'data'}

send_message(url, message)

mock_post.assert_called_once_with(url, headers={'Content-Type': 'application/json'}, json=message)

@patch('requests.post')
def test_send_message_failure(self, mock_post):
mock_response = MagicMock()
mock_response.status_code = 400
mock_response.text = 'Bad Request'
mock_post.return_value = mock_response

url = 'http://example.com'
message = {'MessageId': '12345', 'some': 'data'}

send_message(url, message)

mock_post.assert_called_once_with(url, headers={'Content-Type': 'application/json'}, json=message)

@patch('requests.post')
def test_send_message_exception(self, mock_post):
mock_post.side_effect = requests.exceptions.RequestException('Network error')

url = 'http://example.com'
message = {'MessageId': '12345', 'some': 'data'}

send_message(url, message)

mock_post.assert_called_once_with(url, headers={'Content-Type': 'application/json'}, json=message)

if __name__ == '__main__':
unittest.main()

13 changes: 0 additions & 13 deletions subscription/notify_lambda/Dockerfile

This file was deleted.

1 change: 0 additions & 1 deletion subscription/notify_lambda/build.sh

This file was deleted.

41 changes: 0 additions & 41 deletions subscription/notify_lambda/notification-lambda.py

This file was deleted.

7 changes: 0 additions & 7 deletions subscription/notify_lambda/run.local.sh

This file was deleted.

5 changes: 5 additions & 0 deletions subscription/src/subscription_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
SNS_NAME = os.getenv("SNS_NAME")

def receive_message(sqs_client, queue_url):
""" Calls the queue to get one message from it to process the message. """
response = sqs_client.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=1,
Expand All @@ -26,14 +27,18 @@ def receive_message(sqs_client, queue_url):
return response

def delete_message(sqs_client, queue_url, receipt_handle):
""" Calls the queue to delete a processed message. """
sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)

def delete_messages(sqs_client, queue_url, messages):
""" Calls the queue to delete a list of processed messages. """
for message in messages.get("Messages", []):
receipt_handle = message['ReceiptHandle']
delete_message(sqs_client=sqs_client, queue_url=queue_url, receipt_handle=receipt_handle)

def process_messages(sns_client, topic, messages, access_control):
""" Processes a list of messages that was received from a queue. Check to see if ACLs pass for the granule.
If the checks pass then send the notification. """
for message in messages.get("Messages", []):

# Get the permission for the collection from access-control
Expand Down
18 changes: 18 additions & 0 deletions subscription/url-endpoint-test-lambda/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Use an official Python runtime as a parent image
FROM python:3.13-slim

# Install zip utility
RUN apt-get update && apt-get install -y zip

# Set the working directory in the container
WORKDIR /app

# Copy the current directory contents into the container at /app
COPY . /app

# Make sure build.sh is executable
RUN chmod +x build.sh

# Run build.sh when the container launches
CMD ["./build.sh"]

56 changes: 56 additions & 0 deletions subscription/url-endpoint-test-lambda/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
url-endpoint-test-lambda function is a test lambda meant to test subscriptions with a URL endpoint.
The test lambda will run in SIT and WL and it provides an endpoint that testers can use to test URL endpoint based subscriptions.

A user creates an ingest granule subscription. An example follows:
{"Name": "Ingest-Subscription-Test-Sit-http",
"Type": "granule",
"SubscriberId": "user1",
"CollectionConceptId": "C1200463968-CMR_ONLY",
"EndPoint": "http://<the-internal-loadbalancer-url>/notification/tester",
"Mode": ["New", "Update", "Delete"],
"Method": "ingest",
"MetadataSpecification": {
"URL": "https://cdn.earthdata.nasa.gov/umm/subscription/v1.1.1",
"Name": "UMM-Sub",
"Version": "1.1.1"
}
}

Make sure the URL is the CMR internal load balancer followed by /notification/tester.

Ingest a granule.

To verify that the notification is correct and that it was sent through the tunnel into the CMR internal load balancer,
issue a get request to the load balancer with the correct tunnel port number such as

curl http://localhost:8081/notification/tester

The contents of the notification will be sent back to the caller and can then be verified.

To send a test notification to the test tool send a post request. An example follows:

curl -XPOST -H "Content-Type: application/json" http://localhost:8081/notification/tester -d '{
"Records": [
{
"EventSource": "aws:sns",
"EventVersion": "1.0",
"EventSubscriptionArn": "arn:aws:sns:<region>:<account>:<SNS name>:<unique ID>",
"Sns": {
"Type": "Notification",
"MessageId": "ed8c7ee0-c70a-5050-8ef9-1effe57d3dde",
"TopicArn": "arn:aws:sns:<region>:<account>:<sns name>",
"Subject": "testing again",
"Message": "testing again",
"Timestamp": "2025-02-06T20:48:55.564Z",
"SignatureVersion": "1",
"Signature": "iN...TXas7iBEoT5Nw==",
"SigningCertUrl": "https://sns.<region>.amazonaws.com/SimpleNotificationService-9...6.pem",
"UnsubscribeUrl": "https://sns.<region>.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=<subscription arn>",
"MessageAttributes": {
"endpoint": "URL"
}
}
}
]
}'

Loading