Skip to content
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3659c86
CMR-10388: Adding access control to the subscription workflow.
eereiter Feb 18, 2025
8cc6483
CMR-10388: stubbing out for now.
eereiter Feb 18, 2025
e6c3156
CMR-10388: testing
eereiter Feb 18, 2025
3387ff5
CMR-10388: testing
eereiter Feb 18, 2025
424e15a
CMR-10388: Adding in access control.
eereiter Feb 19, 2025
aff7d61
CMR-10388: Adding in access control.
eereiter Feb 19, 2025
73e9e33
CMR-10388: Adding subscriber to the message attributes
eereiter Feb 19, 2025
af39ef6
CMR-10388: Fixing calling function.
eereiter Feb 19, 2025
01b07c4
CMR-10388: Fixing adding subscriber to filter parameters.
eereiter Feb 20, 2025
d2806fa
CMR-10388: Fixing parameter names
eereiter Feb 25, 2025
c3965d1
CMR-10388: Fixing logger
eereiter Feb 25, 2025
843fd5d
CMR-10388: Fixing env vars.
eereiter Feb 26, 2025
1f43772
CMR-10388: Fixing unit tests
eereiter Feb 26, 2025
fa07d0f
CMR-10388: Fixing getting parameter store values.
eereiter Feb 26, 2025
1fd483a
CMR-10388: Converting environment name to lower case
eereiter Feb 26, 2025
ab761d9
CMR-10388: Add more debugging
eereiter Feb 26, 2025
8d42c5d
CMR-10388: Fixing logging
eereiter Feb 26, 2025
743bcba
CMR-10388: Checking return type from access control
eereiter Feb 26, 2025
8ece2eb
CMR-10388: Checking return type from access control
eereiter Feb 26, 2025
b4a8c25
CMR-10388: Checking return type from access control
eereiter Feb 26, 2025
81bb5bc
CMR-10388: Checking return type from access control
eereiter Feb 26, 2025
bf3da75
CMR-10388: Finalizing code
eereiter Feb 26, 2025
b7dfa46
CMR-10388: Fixing commented out test.
eereiter Feb 26, 2025
d07621c
Merge branch 'master' into CMR-10388
eereiter Feb 26, 2025
afea885
CMR-10388: Fixing typo.
eereiter Feb 26, 2025
202b5be
CMR-10388: Removing commented out code.
eereiter Feb 27, 2025
a324622
CMR-10388: Updating test URL.
eereiter Feb 27, 2025
b255ba0
CMR-10388: adding the removal of the subscription back into a manual …
eereiter Feb 28, 2025
4587b8c
CMR-10388: Fixing documentation and changing function name per PR req…
eereiter Feb 28, 2025
7b92d30
CMR-10388: updating ingest documentation.
eereiter Feb 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion message-queue-lib/src/cmr/message_queue/topic/aws_topic.clj
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@
(:Mode subscription-metadata))
(let [filters (util/remove-nil-keys
{:collection-concept-id [(:CollectionConceptId subscription-metadata)]
:mode (:Mode subscription-metadata)})
:mode (:Mode subscription-metadata)
:subscriber [(:SubscriberId subscription-metadata)]})
filter-json (json/generate-string filters)
sub-filter-request (-> (SetSubscriptionAttributesRequest/builder)
(.subscriptionArn subscription-arn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@
:filter (when (or (:CollectionConceptId metadata)
(:Mode metadata))
{:collection-concept-id (:CollectionConceptId metadata)
:mode (:Mode metadata)})
:mode (:Mode metadata)
:subscriber (:SubscriberId metadata)})
:queue-url (:EndPoint metadata)
:dead-letter-queue-url (queue/create-queue sqs-client (config/cmr-subscriptions-dead-letter-queue-name))
:concept-id (:concept-id subscription)}]
Expand Down
51 changes: 29 additions & 22 deletions metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,20 @@
"Creates a mode-to-endpoints map given a list of subscriptions all for the same one collection.
Comment thread
eereiter marked this conversation as resolved.
Outdated
Returns a map in this structure:
{
new: ['sqs:arn:111', 'https://www.url1.com', 'https://www.url2.com'],
Comment thread
eereiter marked this conversation as resolved.
update: ['sqs:arn:111'],
delete: ['https://www.url1.com']
new: [['sqs:arn:111', 'user1'], ['https://www.url1.com', 'user2'], ['https://www.url2.com', 'user3']],
update: [['sqs:arn:111', 'user1']],
delete: [['https://www.url1.com', 'user2']]
}"
[subscriptions-of-same-collection]

(let [final-map (atom {})]
(doseq [sub subscriptions-of-same-collection]
(let [temp-map (atom {})
modes (get-in sub [:metadata :Mode])
endpoint (get-in sub [:metadata :EndPoint])]
endpoint (get-in sub [:metadata :EndPoint])
subscriber (get-in sub [:metadata :SubscriberId])]
(doseq [mode modes]
(swap! temp-map assoc mode #{endpoint}))
(swap! temp-map assoc mode #{[endpoint, subscriber]}))
(let [merged-map (merge-with union @final-map @temp-map)]
(swap! final-map (fn [n] merged-map)))))
@final-map))
Expand Down Expand Up @@ -188,8 +189,8 @@
and put those into a map. The end result looks like:
{ coll_1: {
Mode: {
New: #(URL1, SQS1),
Update: #(URL2)
New: #([URL1, user1], [SQS1, user2]),
Update: #([URL2, user3])
}
},
coll_2: {...}"
Expand Down Expand Up @@ -266,9 +267,10 @@
(defn create-message-attributes
"Create the notification message attributes so that the notifications can be
filtered to the correct subscribing endpoint."
[collection-concept-id mode]
[collection-concept-id mode subscriber]
{"collection-concept-id" collection-concept-id
"mode" mode})
"mode" mode
"subscriber" subscriber})

(defn create-message-subject
"Creates the message subject."
Expand All @@ -288,16 +290,21 @@

(defn- create-message-attributes-map
"Create message attribute map that SQS uses to filter out messages from the SNS topic."
[endpoint mode coll-concept-id]
(cond
(or (is-valid-sqs-arn endpoint)
(is-local-test-queue endpoint)) (cond
(= "Delete" mode) (create-message-attributes coll-concept-id "Delete")
(= "New" mode) (create-message-attributes coll-concept-id "New")
(= "Update" mode) (create-message-attributes coll-concept-id "Update"))
(is-valid-subscription-endpoint-url endpoint) {"endpoint" endpoint
"endpoint-type" "url"
"mode" mode}))
[endpoint-set mode coll-concept-id]
(let [endpoint (first endpoint-set)
subscriber (second endpoint-set)]
(cond
(or (is-valid-sqs-arn endpoint)
(is-local-test-queue endpoint)) (cond
(= "Delete" mode) (create-message-attributes coll-concept-id "Delete" subscriber)
(= "New" mode) (create-message-attributes coll-concept-id "New" subscriber)
(= "Update" mode) (create-message-attributes coll-concept-id "Update" subscriber))
(is-valid-subscription-endpoint-url endpoint) {"endpoint" endpoint
"endpoint-type" "url"
"mode" mode
"subscriber" subscriber
"collection-concept-id" coll-concept-id}))
)

(defn publish-subscription-notification-if-applicable
"Publish a notification to the topic if the passed-in concept is a granule
Expand All @@ -313,16 +320,16 @@
endpoint-list (get-in sub-cache-map ["Mode" gran-concept-mode])
result-array (atom [])]
;; for every endpoint in the list create an attributes/subject map and send it along its way
(doseq [endpoint endpoint-list]
(doseq [endpoint-set endpoint-list]
(let [topic (get-in context [:system :sns :internal])
coll-concept-id (:parent-collection-id (:extra-fields concept))
message (create-notification-message-body concept)
message-attributes-map (create-message-attributes-map endpoint gran-concept-mode coll-concept-id)
message-attributes-map (create-message-attributes-map endpoint-set gran-concept-mode coll-concept-id)
subject (create-message-subject gran-concept-mode)]
(when (and message-attributes-map subject)
(let [result (topic-protocol/publish topic message message-attributes-map subject)
duration (- (System/currentTimeMillis) start)]
(debug (format "Subscription publish for endpoint %s took %d ms." endpoint duration))
(debug (format "Subscription publish for endpoint %s took %d ms." (first endpoint-set) duration))
(swap! result-array (fn [n] (conj @result-array result)))))))
@result-array)))))

Expand Down
Loading