|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
3 | | -from typing import TYPE_CHECKING |
| 3 | +from typing import Iterator, Sequence |
4 | 4 |
|
5 | | -if TYPE_CHECKING: |
6 | | - from mypy_boto3_lambda import LambdaClient |
7 | | - from mypy_boto3_s3 import S3ServiceResource |
8 | | - from mypy_boto3_sqs import SQSServiceResource |
9 | | - from mypy_boto3_ssm import SSMClient |
| 5 | +from mypy_boto3_s3 import S3ServiceResource |
| 6 | +from mypy_boto3_s3.service_resource import Bucket, Object |
| 7 | +from mypy_boto3_sqs import SQSServiceResource |
| 8 | +from mypy_boto3_sqs.service_resource import Message, Queue |
| 9 | +from mypy_boto3_ssm import SSMClient |
10 | 10 |
|
11 | 11 |
|
12 | 12 | def test_notification( |
13 | | - lambda_: LambdaClient, |
14 | 13 | s3: S3ServiceResource, |
15 | 14 | sqs: SQSServiceResource, |
16 | 15 | ssm: SSMClient, |
17 | 16 | ) -> None: |
18 | 17 | # Get source bucket |
19 | 18 | bucket_name = ssm_param_value(ssm, "/hls/tests/forward-bucket-name") |
20 | 19 | bucket = s3.Bucket(bucket_name) |
21 | | - |
22 | | - # Get forward notification queue |
23 | 20 | forward_queue_name = ssm_param_value(ssm, "/hls/tests/forward-queue-name") |
24 | 21 | forward_queue = sqs.get_queue_by_name(QueueName=forward_queue_name) |
25 | | - |
26 | | - # Get tiler queue |
27 | 22 | tiler_queue_name = ssm_param_value(ssm, "/hls/tests/tiler-queue-name") |
28 | 23 | tiler_queue = sqs.get_queue_by_name(QueueName=tiler_queue_name) |
29 | 24 |
|
30 | | - # Write S3 Object with .v2.0.json suffix to source bucket to trigger notification. |
31 | 25 | body = '{ "greeting": "hello world!" }' |
32 | | - json_key = "greeting.v2.0.json" |
33 | | - obj = bucket.Object(json_key) |
34 | | - obj.put(Body=body) |
35 | | - obj.wait_until_exists() |
| 26 | + objects = write_objects(bucket, body) |
36 | 27 |
|
37 | 28 | try: |
38 | | - # Wait for lambda function to succeed, which should be triggered by S3 |
39 | | - # notification of object created in bucket above. |
40 | | - name = ssm_param_value(ssm, "/hls/tests/forward-function-name") |
41 | | - waiter = lambda_.get_waiter("function_active_v2") |
42 | | - waiter.wait(FunctionName=name, WaiterConfig={"Delay": 5, "MaxAttempts": 20}) |
43 | | - |
44 | | - # Receive message from destination queue, which should be sent by Lambda |
45 | | - # function above. |
46 | | - forward_messages = forward_queue.receive_messages( |
47 | | - MaxNumberOfMessages=10, WaitTimeSeconds=20 |
48 | | - ) |
49 | | - tiler_messages = tiler_queue.receive_messages( |
50 | | - MaxNumberOfMessages=10, WaitTimeSeconds=20 |
51 | | - ) |
| 29 | + forward_messages = list(fetch_messages(forward_queue)) |
| 30 | + tiler_messages = list(fetch_messages(tiler_queue)) |
52 | 31 | finally: |
53 | 32 | # Cleanup S3 Object with .v2.0.json suffix from source bucket. |
54 | | - obj.delete() |
55 | | - obj.wait_until_not_exists() |
| 33 | + for obj in objects: |
| 34 | + obj.delete() |
| 35 | + obj.wait_until_not_exists() |
56 | 36 |
|
57 | | - # Assert message contents == S3 Object contents (written above) |
58 | | - assert len(forward_messages) == 1 |
| 37 | + # We expect 4 messages, 2 for regular and 2 for VI |
| 38 | + assert len(forward_messages) == 4 |
59 | 39 | assert forward_messages[0].body == body |
60 | 40 |
|
61 | | - # Assert message contents == S3 Object contents (written above) |
62 | | - assert len(tiler_messages) == 1 |
63 | | - assert ( |
64 | | - tiler_messages[0].body |
65 | | - == f"s3://{bucket_name}/{json_key.replace('.json', '_stac.json')}" |
66 | | - ) |
| 41 | + # We expect only 2 messages for the 2 non-VI objects written |
| 42 | + assert len(tiler_messages) == 2 |
| 43 | + expected_bodies = [ |
| 44 | + f"s3://{bucket_name}/{obj.key.replace('.json', '_stac.json')}" |
| 45 | + for obj in objects |
| 46 | + if "_VI" in obj.key |
| 47 | + ] |
| 48 | + assert all(message.body in expected_bodies for message in tiler_messages) |
67 | 49 |
|
68 | 50 |
|
69 | 51 | def ssm_param_value(ssm: SSMClient, name: str) -> str: |
70 | 52 | value = ssm.get_parameter(Name=name)["Parameter"].get("Value") |
71 | 53 | assert value is not None # make type checker happy |
72 | 54 |
|
73 | 55 | return value |
| 56 | + |
| 57 | + |
| 58 | +def write_objects(bucket: Bucket, body: str) -> Sequence[Object]: |
| 59 | + # Write S3 Objects with .v2.0.json suffix to source bucket to trigger notification. |
| 60 | + # We expect 4 messages in the forward queue and 2 in the tiler queue because the |
| 61 | + # tiler queue should not send messages for VI. |
| 62 | + |
| 63 | + objects = [ |
| 64 | + bucket.Object(f"{prefix}/greeting.v2.0.json") |
| 65 | + for prefix in ("L30", "S30", "L30_VI", "S30_VI") |
| 66 | + ] |
| 67 | + |
| 68 | + for obj in objects: |
| 69 | + obj.put(Body=body) |
| 70 | + obj.wait_until_exists() |
| 71 | + |
| 72 | + return objects |
| 73 | + |
| 74 | + |
| 75 | +def fetch_messages(queue: Queue) -> Iterator[Message]: |
| 76 | + while messages := queue.receive_messages( |
| 77 | + MaxNumberOfMessages=10, WaitTimeSeconds=20 |
| 78 | + ): |
| 79 | + queue.delete_messages( |
| 80 | + Entries=[ |
| 81 | + {"Id": message.message_id, "ReceiptHandle": message.receipt_handle} |
| 82 | + for message in messages |
| 83 | + ] |
| 84 | + ) |
| 85 | + |
| 86 | + yield from messages |
0 commit comments