diff --git a/ingest-app/docs/api.md b/ingest-app/docs/api.md index a7272c825f..d065f3a75b 100644 --- a/ingest-app/docs/api.md +++ b/ingest-app/docs/api.md @@ -1098,7 +1098,7 @@ There are two kinds of subscriptions: Batch Notification and Near-Real-Time Noti
  • granule subscription for users to be notified when granules are created/update
  • -
  • Near-Real-Time (NRT) Notification subscriptions are processed on ingest and are only for granules. When a user subscribes, notifications are sent out via the provided notification endpoint, such as an AWS SQS messaging queue. +
  • Near-Real-Time (NRT) Notification subscriptions are processed on ingest and are only for granules. When a user subscribes, notifications are sent out via the provided notification endpoint, such as an AWS SQS messaging queue or a URL. ### Create a Subscription diff --git a/message-queue-lib/src/cmr/message_queue/topic/aws_topic.clj b/message-queue-lib/src/cmr/message_queue/topic/aws_topic.clj index 3d0a0b6ff0..16fc5d2f71 100644 --- a/message-queue-lib/src/cmr/message_queue/topic/aws_topic.clj +++ b/message-queue-lib/src/cmr/message_queue/topic/aws_topic.clj @@ -79,7 +79,8 @@ (:Mode subscription-metadata)) (let [filters (util/remove-nil-keys {:collection-concept-id [(:CollectionConceptId subscription-metadata)] - :mode (:Mode subscription-metadata)}) + :mode (:Mode subscription-metadata) + :subscriber [(:SubscriberId subscription-metadata)]}) filter-json (json/generate-string filters) sub-filter-request (-> (SetSubscriptionAttributesRequest/builder) (.subscriptionArn subscription-arn) diff --git a/message-queue-lib/src/cmr/message_queue/topic/local_topic.clj b/message-queue-lib/src/cmr/message_queue/topic/local_topic.clj index 845a4dc16d..da6172306d 100644 --- a/message-queue-lib/src/cmr/message_queue/topic/local_topic.clj +++ b/message-queue-lib/src/cmr/message_queue/topic/local_topic.clj @@ -74,7 +74,8 @@ :filter (when (or (:CollectionConceptId metadata) (:Mode metadata)) {:collection-concept-id (:CollectionConceptId metadata) - :mode (:Mode metadata)}) + :mode (:Mode metadata) + :subscriber (:SubscriberId metadata)}) :queue-url (:EndPoint metadata) :dead-letter-queue-url (queue/create-queue sqs-client (config/cmr-subscriptions-dead-letter-queue-name)) :concept-id (:concept-id subscription)}] diff --git a/metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj b/metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj index b193a3da3d..3469e35305 100644 --- a/metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj +++ b/metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj @@ -60,22 +60,31 @@ (use 'clojure.set) (defn create-mode-to-endpoints-map - "Creates a mode-to-endpoints map given a list of subscriptions all for the same one collection. - Returns a map in this structure: + "Creates a list of endpoint sets associated to a mode. For each ingest subscription, + an endpoint set is created consisting first of the subscription's endpoint followed + by the subscriber-id. This set is then put on one or more lists that is associated + to each mode described in the subscription. The mode lists are merged together per + each collection concept id that exist in all of the ingest subscriptions. + This function returns a map in this structure: { - new: ['sqs:arn:111', 'https://www.url1.com', 'https://www.url2.com'], - update: ['sqs:arn:111'], - delete: ['https://www.url1.com'] - }" + new: [['sqs:arn:111', 'user1'], ['https://www.url1.com', 'user2'], ['https://www.url2.com', 'user3']], + update: [['sqs:arn:111', 'user1']], + delete: [['https://www.url1.com', 'user2']] + } + + This structure is used for fast lookups by mode. For a mode, each endpoint set is iterated over + using the subscriber id to check if the subscriber has read access. If they do then a notification + is sent to the endpoint." [subscriptions-of-same-collection] (let [final-map (atom {})] (doseq [sub subscriptions-of-same-collection] (let [temp-map (atom {}) modes (get-in sub [:metadata :Mode]) - endpoint (get-in sub [:metadata :EndPoint])] + endpoint (get-in sub [:metadata :EndPoint]) + subscriber (get-in sub [:metadata :SubscriberId])] (doseq [mode modes] - (swap! temp-map assoc mode #{endpoint})) + (swap! temp-map assoc mode #{[endpoint, subscriber]})) (let [merged-map (merge-with union @final-map @temp-map)] (swap! final-map (fn [n] merged-map))))) @final-map)) @@ -188,8 +197,8 @@ and put those into a map. The end result looks like: { coll_1: { Mode: { - New: #(URL1, SQS1), - Update: #(URL2) + New: #([URL1, user1], [SQS1, user2]), + Update: #([URL2, user3]) } }, coll_2: {...}" @@ -266,9 +275,10 @@ (defn create-message-attributes "Create the notification message attributes so that the notifications can be filtered to the correct subscribing endpoint." - [collection-concept-id mode] + [collection-concept-id mode subscriber] {"collection-concept-id" collection-concept-id - "mode" mode}) + "mode" mode + "subscriber" subscriber}) (defn create-message-subject "Creates the message subject." @@ -288,16 +298,21 @@ (defn- create-message-attributes-map "Create message attribute map that SQS uses to filter out messages from the SNS topic." - [endpoint mode coll-concept-id] - (cond - (or (is-valid-sqs-arn endpoint) - (is-local-test-queue endpoint)) (cond - (= "Delete" mode) (create-message-attributes coll-concept-id "Delete") - (= "New" mode) (create-message-attributes coll-concept-id "New") - (= "Update" mode) (create-message-attributes coll-concept-id "Update")) - (is-valid-subscription-endpoint-url endpoint) {"endpoint" endpoint - "endpoint-type" "url" - "mode" mode})) + [endpoint-set mode coll-concept-id] + (let [endpoint (first endpoint-set) + subscriber (second endpoint-set)] + (cond + (or (is-valid-sqs-arn endpoint) + (is-local-test-queue endpoint)) (cond + (= "Delete" mode) (create-message-attributes coll-concept-id "Delete" subscriber) + (= "New" mode) (create-message-attributes coll-concept-id "New" subscriber) + (= "Update" mode) (create-message-attributes coll-concept-id "Update" subscriber)) + (is-valid-subscription-endpoint-url endpoint) {"endpoint" endpoint + "endpoint-type" "url" + "mode" mode + "subscriber" subscriber + "collection-concept-id" coll-concept-id})) + ) (defn publish-subscription-notification-if-applicable "Publish a notification to the topic if the passed-in concept is a granule @@ -313,16 +328,16 @@ endpoint-list (get-in sub-cache-map ["Mode" gran-concept-mode]) result-array (atom [])] ;; for every endpoint in the list create an attributes/subject map and send it along its way - (doseq [endpoint endpoint-list] + (doseq [endpoint-set endpoint-list] (let [topic (get-in context [:system :sns :internal]) coll-concept-id (:parent-collection-id (:extra-fields concept)) message (create-notification-message-body concept) - message-attributes-map (create-message-attributes-map endpoint gran-concept-mode coll-concept-id) + message-attributes-map (create-message-attributes-map endpoint-set gran-concept-mode coll-concept-id) subject (create-message-subject gran-concept-mode)] (when (and message-attributes-map subject) (let [result (topic-protocol/publish topic message message-attributes-map subject) duration (- (System/currentTimeMillis) start)] - (debug (format "Subscription publish for endpoint %s took %d ms." endpoint duration)) + (debug (format "Subscription publish for endpoint %s took %d ms." (first endpoint-set) duration)) (swap! result-array (fn [n] (conj @result-array result))))))) @result-array))))) diff --git a/metadata-db-app/test/cmr/metadata_db/test/services/subscriptions_test.clj b/metadata-db-app/test/cmr/metadata_db/test/services/subscriptions_test.clj index 386dab7bbb..ad078aef4a 100644 --- a/metadata-db-app/test/cmr/metadata_db/test/services/subscriptions_test.clj +++ b/metadata-db-app/test/cmr/metadata_db/test/services/subscriptions_test.clj @@ -52,14 +52,16 @@ :metadata "{\"CollectionConceptId\":\"C12345-PROV1\", \"EndPoint\":\"some-endpoint\", \"Mode\":[\"New\"], - \"Method\":\"ingest\"}", + \"Method\":\"ingest\", + \"SubscriberId\":\"user1\"}", :extra-fields {:collection-concept-id "C12345-PROV1"}}))} (is (= 1 (subscriptions/change-subscription-in-cache test-context {:concept-type :subscription :deleted false :metadata {:CollectionConceptId "C12345-PROV1" :EndPoint "some-endpoint" :Mode ["New"] - :Method "ingest"} + :Method "ingest" + :SubscriberId "user1"} :extra-fields {:collection-concept-id "C12345-PROV1"}}))))) (testing "Delete a subscription from the cache" (with-bindings {#'subscriptions/get-subscriptions-from-db (fn [_context _coll-concept-id] '({:concept-type :subscription @@ -67,14 +69,16 @@ :metadata "{\"CollectionConceptId\":\"C12345-PROV1\", \"EndPoint\":\"some-endpoint\", \"Mode\":[\"New\"], - \"Method\":\"ingest\"}", + \"Method\":\"ingest\", + \"SubscriberId\":\"user1\"}", :extra-fields {:collection-concept-id "C12345-PROV1"}}))} (is (= 1 (subscriptions/change-subscription-in-cache test-context {:concept-type :subscription :deleted true :metadata {:CollectionConceptId "C12345-PROV1" :EndPoint "some-endpoint" :Mode ["New"] - :Method "ingest"} + :Method "ingest" + :SubscriberId "user1"} :extra-fields {:collection-concept-id "C12345-PROV1"}}))))) (testing "adding and removing subscriptions from the cache." @@ -86,75 +90,81 @@ "Adding 1 subscription" - {"C12345-PROV1" {"Mode" {"Update" (set ["some-endpoint"])}}} + {"C12345-PROV1" {"Mode" {"Update" (set [["some-endpoint" "user1"]])}}} {:metadata {:CollectionConceptId "C12345-PROV1"}} '({:metadata "{\"CollectionConceptId\":\"C12345-PROV1\", \"EndPoint\":\"some-endpoint\", \"Mode\":[\"Update\"], - \"Method\":\"ingest\"}", + \"Method\":\"ingest\", + \"SubscriberId\":\"user1\"}", :extra-fields {:collection-concept-id "C12345-PROV1"} :deleted false :concept-type :subscription}) "Adding duplicate subscription" - {"C12345-PROV1" {"Mode" {"Update" (set ["some-endpoint"])}}} + {"C12345-PROV1" {"Mode" {"Update" (set [["some-endpoint" "user1"]])}}} {:metadata {:CollectionConceptId "C12345-PROV1"}} '({:metadata "{\"CollectionConceptId\":\"C12345-PROV1\", \"EndPoint\":\"some-endpoint\", \"Mode\":[\"Update\"], - \"Method\":\"ingest\"}", + \"Method\":\"ingest\", + \"SubscriberId\":\"user1\"}", :extra-fields {:collection-concept-id "C12345-PROV1"} :deleted false :concept-type :subscription}) "Adding override subscription" - {"C12345-PROV1" {"Mode" {"New" (set ["some-endpoint"]) - "Delete" (set ["some-endpoint"])}}} + {"C12345-PROV1" {"Mode" {"New" (set [["some-endpoint" "user1"]]) + "Delete" (set [["some-endpoint" "user1"]])}}} {:metadata {:CollectionConceptId "C12345-PROV1"}} '({:metadata "{\"CollectionConceptId\":\"C12345-PROV1\", \"EndPoint\":\"some-endpoint\", \"Mode\":[\"New\", \"Delete\"], - \"Method\":\"ingest\"}" + \"Method\":\"ingest\", + \"SubscriberId\":\"user1\"}" :extra-fields {:collection-concept-id "C12345-PROV1"} :deleted false :concept-type :subscription}) "Adding new subscription that matches the one from before." - {"C12345-PROV1" {"Mode" {"New" (set ["some-endpoint"]) - "Delete" (set ["some-endpoint"])}} - "C12346-PROV1" {"Mode" {"New" (set ["some-endpoint"]) - "Delete" (set ["some-endpoint"])}}} + {"C12345-PROV1" {"Mode" {"New" (set [["some-endpoint" "user1"]]) + "Delete" (set [["some-endpoint" "user1"]])}} + "C12346-PROV1" {"Mode" {"New" (set [["some-endpoint" "user1"]]) + "Delete" (set [["some-endpoint" "user1"]])}}} {:metadata {:CollectionConceptId "C12346-PROV1"}} '({:metadata "{\"CollectionConceptId\":\"C12346-PROV1\", \"EndPoint\":\"some-endpoint\", \"Mode\":[\"New\", \"Delete\"], - \"Method\":\"ingest\"}", + \"Method\":\"ingest\", + \"SubscriberId\":\"user1\"}", :extra-fields {:collection-concept-id "C12346-PROV1"} :deleted false :concept-type :subscription}) "Removing 1 subscription" - {"C12346-PROV1" {"Mode" {"New" (set ["some-endpoint"]) - "Delete" (set ["some-endpoint"])}}} + {"C12346-PROV1" {"Mode" {"New" (set [["some-endpoint" "user1"]]) + "Delete" (set [["some-endpoint" "user1"]])}}} {:metadata {:CollectionConceptId "C12345-PROV1"}} ;; even though C12346-PROV1 is in the db, we are search only for ;; concepts with the collection-concept-id. '({:metadata "{\"CollectionConceptId\":\"C12345-PROV1\", \"EndPoint\":\"some-endpoint\", \"Mode\":[\"New\", \"Delete\"], - \"Method\":\"ingest\"}" + \"Method\":\"ingest\", + \"SubscriberId\":\"user1\"}" :extra-fields {:collection-concept-id "C12345-PROV1"} :deleted true :concept-type :subscription}) "Removing same subscription" - {"C12346-PROV1" {"Mode" {"New" (set ["some-endpoint"]) - "Delete" (set ["some-endpoint"])}}} + {"C12346-PROV1" {"Mode" {"New" (set [["some-endpoint" "user1"]]) + "Delete" (set [["some-endpoint" "user1"]])}}} {:metadata {:CollectionConceptId "C12345-PROV1"}} '({:metadata "{\"CollectionConceptId\":\"C12345-PROV1\", - \"EndPoint\":\"some-endpoint\", - \"Mode\":[\"New\", \"Delete\"], - \"Method\":\"ingest\"}" + \"EndPoint\":\"some-endpoint\", + \"Mode\":[\"New\", \"Delete\"], + \"Method\":\"ingest\", + \"SubscriberId\":\"user1\"}" :extra-fields {:collection-concept-id "C12345-PROV1"} :deleted true :concept-type :subscription}) @@ -163,9 +173,10 @@ nil {:metadata {:CollectionConceptId "C12346-PROV1"}} '({:metadata "{\"CollectionConceptId\":\"C12346-PROV1\", - \"EndPoint\":\"some-endpoint\", - \"Mode\":[\"New\", \"Delete\"], - \"Method\":\"ingest\"}", + \"EndPoint\":\"some-endpoint\", + \"Mode\":[\"New\", \"Delete\"], + \"Method\":\"ingest\", + \"SubscriberId\":\"user1\"}", :extra-fields {:collection-concept-id "C12346-PROV1"} :deleted true :concept-type :subscription}) @@ -181,13 +192,17 @@ (is (= {:metadata {:CollectionConceptId "C12345-PROV1" :EndPoint "ARN" :Mode ["New" "Delete"] - :Method "ingest"} + :Method "ingest" + :SubscriberId "user1"} :concept-type :subscription} - (subscriptions/add-or-delete-ingest-subscription-in-cache test-context {:metadata "{\"CollectionConceptId\":\"C12345-PROV1\", - \"EndPoint\":\"ARN\", - \"Mode\":[\"New\", \"Delete\"], - \"Method\":\"ingest\"}" - :concept-type :subscription})))))))) + (subscriptions/add-or-delete-ingest-subscription-in-cache + test-context + {:metadata "{\"CollectionConceptId\":\"C12345-PROV1\", + \"EndPoint\":\"ARN\", + \"Mode\":[\"New\", \"Delete\"], + \"Method\":\"ingest\", + \"SubscriberId\":\"user1\"}" + :concept-type :subscription})))))))) (def db-result-1 '({:revision-id 1 @@ -198,7 +213,7 @@ :transaction-id "2000000009M" :native-id "erichs_ingest_subscription" :concept-id "SUB1200000005-PROV1" - :metadata "{\"SubscriberId\":\"eereiter\", + :metadata "{\"SubscriberId\":\"user1\", \"CollectionConceptId\":\"C1200000002-PROV1\", \"EndPoint\":\"some-endpoint\", \"Mode\":[\"New\",\"Delete\"], @@ -212,7 +227,7 @@ :extra-fields {:normalized-query "76c6d7a828ef81efb3720638f335f65c" :subscription-type "granule" :subscription-name "Ingest-Subscription-Test" - :subscriber-id "eereiter" + :subscriber-id "user1" :collection-concept-id "C1200000002-PROV1"} :concept-type :subscription})) @@ -225,7 +240,7 @@ :transaction-id "2000000010M" :native-id "erichs_ingest_subscription2" :concept-id "SUB1200000006-PROV1" - :metadata "{\"SubscriberId\":\"eereiter\", + :metadata "{\"SubscriberId\":\"user1\", \"CollectionConceptId\":\"C12346-PROV1\", \"EndPoint\":\"some-endpoint\", \"Mode\":[\"New\",\"Update\"], @@ -239,7 +254,7 @@ :extra-fields {:normalized-query "76c6d7a828ef81efb3720638f335f65c" :subscription-type "granule" :subscription-name "Ingest-Subscription-Test" - :subscriber-id "eereiter" + :subscriber-id "user1" :collection-concept-id "C12346-PROV1"} :concept-type :subscription})) @@ -252,7 +267,7 @@ :transaction-id "2000000010M" :native-id "erichs_ingest_subscription2" :concept-id "SUB1200000006-PROV1" - :metadata "{\"SubscriberId\":\"eereiter\", + :metadata "{\"SubscriberId\":\"user1\", \"CollectionConceptId\":\"C12346-PROV1\", \"EndPoint\":\"some-endpoint-2\", \"Mode\":[\"New\"], @@ -266,7 +281,7 @@ :extra-fields {:normalized-query "76c6d7a828ef81efb3720638f335f65c" :subscription-type "granule" :subscription-name "Ingest-Subscription-Test" - :subscriber-id "eereiter" + :subscriber-id "user1" :collection-concept-id "C12346-PROV1"} :concept-type :subscription})) @@ -281,7 +296,7 @@ :transaction-id "2000000011M" :native-id "erichs_ingest_subscription3" :concept-id "SUB1200000008-PROV1" - :metadata "{\"SubscriberId\":\"eereiter\", + :metadata "{\"SubscriberId\":\"user1\", \"CollectionConceptId\":\"C1200000002-PROV1\", \"EndPoint\":\"some-endpoint\", \"Mode\":[\"Update\"], @@ -295,7 +310,7 @@ :extra-fields {:normalized-query "76c6d7a828ef81efb3720638f335f65c" :subscription-type "granule" :subscription-name "Ingest-Subscription-Test" - :subscriber-id "eereiter" + :subscriber-id "user1" :collection-concept-id "C1200000002-PROV1"} :concept-type :subscription}))) @@ -308,7 +323,7 @@ :transaction-id "20000000013M" :native-id "erichs_ingest_subscription9" :concept-id "SUB1200000009-PROV1" - :metadata "{\"SubscriberId\":\"eereiter\", + :metadata "{\"SubscriberId\":\"user1\", \"CollectionConceptId\":\"C1200000003-PROV1\", \"EndPoint\":\"some-endpoint\", \"Mode\":[\"New\",\"Delete\"], @@ -322,7 +337,7 @@ :extra-fields {:normalized-query "76c6d7a828ef81efb3720638f335f65c" :subscription-type "granule" :subscription-name "Ingest-Subscription-Test" - :subscriber-id "eereiter" + :subscriber-id "user1" :collection-concept-id "C1200000003-PROV1"} :concept-type :subscription})) @@ -339,25 +354,25 @@ (subscriptions/change-subscription-in-cache test-context {:metadata {:CollectionConceptId "C1200000003-PROV1"}})) (testing "What is in the cache" (is (= {"C1200000002-PROV1" {"Mode" - {"New" (set ["some-endpoint"]) - "Delete" (set ["some-endpoint"])}} + {"New" (set [["some-endpoint" "user1"]]) + "Delete" (set [["some-endpoint" "user1"]])}} "C12346-PROV1" {"Mode" - {"New" (set ["some-endpoint"]) - "Update" (set ["some-endpoint"])}} + {"New" (set [["some-endpoint" "user1"]]) + "Update" (set [["some-endpoint" "user1"]])}} "C1200000003-PROV1" {"Mode" - {"New" (set ["some-endpoint"]) - "Delete" (set ["some-endpoint"])}}} + {"New" (set [["some-endpoint" "user1"]]) + "Delete" (set [["some-endpoint" "user1"]])}}} (hash-cache/get-map cache-client cache-key)))) (testing "Cache needs to be updated." (with-bindings {#'subscriptions/get-subscriptions-from-db (fn [_context] db-result-3)} (subscriptions/refresh-subscription-cache test-context)) ;; Update includes C1200000003-PROV1 was completely removed, and C1200000002-PROV1 mode Update was added and C12346-PROV1 lost its Update Mode and New Mode has a new endpoint (is (= {"C1200000002-PROV1" {"Mode" - {"New" (set ["some-endpoint"]) - "Update" (set ["some-endpoint"]) - "Delete" (set ["some-endpoint"])}} + {"New" (set [["some-endpoint" "user1"]]) + "Update" (set [["some-endpoint" "user1"]]) + "Delete" (set [["some-endpoint" "user1"]])}} "C12346-PROV1" {"Mode" - {"New" (set ["some-endpoint-2"])}}} + {"New" (set [["some-endpoint-2" "user1"]])}}} (hash-cache/get-map cache-client cache-key)))) (testing "Testing no subscriptions" (with-bindings {#'subscriptions/get-subscriptions-from-db (fn [_context] '())} @@ -399,10 +414,12 @@ (deftest create-message-attributes-test (testing "Creating the message attributes." (let [collection-concept-id "C12345-PROV1" - mode "New"] + mode "New" + subscriber "user1"] (is {"collection-concept-id" "C12345-PROV1" - "mode" "New"} - (subscriptions/create-message-attributes collection-concept-id mode))))) + "mode" "New" + "subsciber" "user1"} + (subscriptions/create-message-attributes collection-concept-id mode subscriber))))) (deftest create-message-subject-test (testing "Creating the message subject." @@ -424,7 +441,7 @@ :native-id "erichs_ingest_subscription" :concept-id "SUB1200000005-PROV1" :metadata (format - "{\"SubscriberId\":\"eereiter\", + "{\"SubscriberId\":\"user1\", \"CollectionConceptId\":\"C1200000002-PROV1\", \"EndPoint\":\"%s\", \"Mode\":[\"New\",\"Delete\"], @@ -439,7 +456,7 @@ :extra-fields {:normalized-query "76c6d7a828ef81efb3720638f335f65c" :subscription-type "granule" :subscription-name "Ingest-Subscription-Test" - :subscriber-id "eereiter" + :subscriber-id "user1" :collection-concept-id "C1200000002-PROV1"} :concept-type :subscription})) @@ -491,7 +508,8 @@ concept-metadata (format "{\"CollectionConceptId\": \"C1200000002-PROV1\", \"EndPoint\": \"%s\", \"Mode\":[\"New\", \"Delete\"], - \"Method\":\"ingest\"}" + \"Method\":\"ingest\", + \"SubscriberId\":\"user1\"}" queue-url)] (testing "Concept not a granule" @@ -586,7 +604,8 @@ concept-metadata (format "{\"CollectionConceptId\": \"C1200000002-PROV1\", \"EndPoint\": \"%s\", \"Mode\":[\"New\", \"Delete\"], - \"Method\":\"ingest\"}" + \"Method\":\"ingest\", + \"SubscriberId\":\"user1\"}" queue-arn)] (testing "Concept will get published." (with-bindings {#'subscriptions/get-subscriptions-from-db (fn [_context _coll-concept-id] db-result)} @@ -602,17 +621,18 @@ \"Identifier\": \"Algorithm-1\"}]}}" :extra-fields {:parent-collection-id "C1200000002-PROV1"}} _ (subscriptions/add-or-delete-ingest-subscription-in-cache test-context sub-concept) - subscription-arn (subscriptions/attach-subscription-to-topic test-context sub-concept) - sub-concept (assoc-in sub-concept [:extra-fields :aws-arn] subscription-arn)] + sub-concept (subscriptions/attach-subscription-to-topic test-context sub-concept) + subscription-arn (get-in sub-concept [:extra-fields :aws-arn])] (is (some? subscription-arn)) - (when subscription-arn - (is (some? (subscriptions/delete-ingest-subscription test-context sub-concept)))) ;; publish message. this should publish to the internal queue (is (some? (subscriptions/publish-subscription-notification-if-applicable test-context granule-concept))) - (let [internal-queue-url "https://sqs.us-east-1.amazonaws.com/832706493240/cmr-internal-subscriptions-queue-sit" + (when subscription-arn + (is (some? (subscriptions/delete-ingest-subscription test-context sub-concept)))) + + (let [internal-queue-url "https://sqs.us-east-1.amazonaws.com/832706493240/cmr-internal-subscriptions-test-queue" messages (queue/receive-messages sqs-client internal-queue-url) message-str (.body (first messages)) message (json/decode message-str true) @@ -747,6 +767,7 @@ :Mode ["New", "Update"], :Method "ingest", :Type "granule", + :SubscriberId "user-1" }, :extra-fields {:aws-arn "sqs:arn:1"}, :concept-type :subscription}, @@ -759,6 +780,7 @@ :Mode ["New", "Delete"], :Method "ingest", :Type "granule", + :SubscriberId "user-2" }, :extra-fields {:aws-arn "https://www.url1.com"}, :concept-type :subscription}, @@ -771,6 +793,7 @@ :Mode ["New"], :Method "ingest", :Type "granule", + :SubscriberId "user-2" }, :extra-fields {:aws-arn "https://www.url2.com"}, :concept-type :subscription}, @@ -783,19 +806,21 @@ :Mode ["New"], :Method "ingest", :Type "granule", + :SubscriberId "user-3" }, :extra-fields {:aws-arn "https://www.url2.com"}, :concept-type :subscription}] - {"New" #{"sqs:arn:1", "https://www.url1.com", "https://www.url2.com"} - "Update" #{"sqs:arn:1"} - "Delete" #{"https://www.url1.com"}} + {"New" #{["sqs:arn:1" "user-1"], ["https://www.url1.com" "user-2"], ["https://www.url2.com" "user-2"], ["https://www.url2.com" "user-3"]} + "Update" #{["sqs:arn:1" "user-1"]} + "Delete" #{["https://www.url1.com" "user-2"]}} ))) (deftest attach-subscription-to-topic (let [concept-metadata "{\"CollectionConceptId\": \"C1200000002-PROV1\", - \"EndPoint\": \"some-endpoint\", - \"Mode\":[\"New\", \"Delete\"], - \"Method\":\"ingest\"}" + \"EndPoint\": \"some-endpoint\", + \"Mode\":[\"New\", \"Delete\"], + \"Method\":\"ingest\", + \"SubscriberId\":\"user1\"}" ingest-concept {:metadata concept-metadata :concept-type :subscription :concept-id "SUB1200000005-PROV1"} diff --git a/subscription/Dockerfile b/subscription/Dockerfile index 4c0b74aa94..2025a7536e 100644 --- a/subscription/Dockerfile +++ b/subscription/Dockerfile @@ -6,6 +6,7 @@ ARG DEAD_LETTER_QUEUE_URL ARG SNS_NAME ARG SUB_DEAD_LETTER_QUEUE_URL ARG ENVIRONMENT_NAME +ARG LOG_LEVEL #Set environment variables ENV AWS_REGION=$AWS_REGION @@ -15,6 +16,7 @@ ENV LONG_POLL_TIME=10 ENV SNS_NAME=$SNS_NAME ENV SUB_DEAD_LETTER_QUEUE_URL=$SUB_DEAD_LETTER_QUEUE_URL ENV ENVIRONMENT_NAME=$ENVIRONMENT_NAME +ENV LOG_LEVEL=$LOG_LEVEL #Set working directory WORKDIR /app diff --git a/subscription/src/access_control.py b/subscription/src/access_control.py index 9eaf392c8c..6a1abf4ecd 100644 --- a/subscription/src/access_control.py +++ b/subscription/src/access_control.py @@ -1,4 +1,5 @@ import os +import json import requests from env_vars import Env_Vars from sys import stdout @@ -6,12 +7,12 @@ class AccessControl: """Encapsulates Access Control API. - This class needs the following environment variables set: + This class needs the following environment variables set with an example value: For local development: ACCESS_CONTROL_URL=http://localhost:3011/access-control For AWS: - ENVIRONMENT_NAME=SIT + ENVIRONMENT_NAME=sit CMR_ACCESS_CONTROL_PROTOCOL=https CMR_ACCESS_CONTROL_PORT=3011 CMR_ACCESS_CONTROL_HOST=cmr.sit.earthdata.nasa.gov @@ -19,13 +20,12 @@ class AccessControl: Example Use of this class access_control = AccessControl() - response = access_control.get_permissions('eereiter', 'C1200484253-CMR_ONLY') - The call is the same as 'curl https://cmr.sit.earthdata.nasa.gov/access-control/permissions?user_id=eereiter&concept_id=C1200484253-CMR_ONLY' + response = access_control.get_permissions('user1', 'C1200484253-CMR_ONLY') + The call is the same as 'curl https://cmr.sit.earthdata.nasa.gov/access-control/permissions?user_id=user1&concept_id=C1200484253-CMR_ONLY' Return is either None (Null or Nil) (if check on response is false) or {"C1200484253-CMR_ONLY":["read","update","delete","order"]} """ - def __init__(self): """ Sets up a class variable of url.""" self.url = None @@ -40,6 +40,7 @@ def get_url_from_parameter_store(self): if access_control_url: self.url = access_control_url + logger.debug(f"Subscription Worker Access-Control URL: {self.url}") return else: # This block gets the access_control URL from the AWS parameter store. @@ -50,19 +51,22 @@ def get_url_from_parameter_store(self): raise ValueError("ENVIRONMENT_NAME environment variable is not set") # construct the access control parameter names from the environment variable - pre_fix = f"/{environment_name}/ingest/" + env_name = environment_name.lower() + pre_fix = f"/{env_name}/ingest/" protocol_param_name = f"{pre_fix}CMR_ACCESS_CONTROL_PROTOCOL" port_param_name = f"{pre_fix}CMR_ACCESS_CONTROL_PORT" host_param_name = f"{pre_fix}CMR_ACCESS_CONTROL_HOST" context_param_name = f"{pre_fix}CMR_ACCESS_CONTROL_RELATIVE_ROOT_URL" - env_vars = Env_Vars - protocol = env_vars.get_var(protocol_param_name) - port = env_vars.get_var(port_param_name) - host = env_vars.get_var(host_param_name) - context = env_vars.get_var(context_param_name) - self.url = f"{protocol}://{host}:{port}/{context}" - logger.debug("Subscription Worker Access-Control URL:" + self.url) + env_vars = Env_Vars() + protocol = env_vars.get_env_var_from_parameter_store(parameter_name=protocol_param_name) + port = env_vars.get_env_var_from_parameter_store(parameter_name=port_param_name) + host = env_vars.get_env_var_from_parameter_store(parameter_name=host_param_name) + context = env_vars.get_env_var_from_parameter_store(parameter_name=context_param_name) + + # The context already contains the forward / so we don't need it here. + self.url = f"{protocol}://{host}:{port}{context}" + logger.debug(f"Subscription Worker Access-Control URL: {self.url}") def get_url(self): """This function returns the access control URL if it has already been constructed, otherwise it constructs the URL and then returns it.""" @@ -91,8 +95,33 @@ def get_permissions(self, subscriber_id, concept_id): if response.status_code == 200: # Request was successful data = response.text - logger.debug("Response data:", data) + logger.debug(f"Response data: {data}") return data else: # Request failed logger.warning(f"Subscription Worker getting Access Control permissions request using URL {url} with parameters {params} failed with status code: {response.status_code}") + + def has_read_permission(self, subscriber_id, collection_concept_id): + """This function calls access control using a subscriber_id (a users earth data login name), and a CMR concept id. It gets the subscribers permission + set for a specific concept. access control returns None|Nil|Null back if the subscriber does not have any permissions for the concept. Access control + returns a map that contains a concept id followed by an array of permissions the user has on that concept: {"C1200484253-CMR_ONLY":["read","update","delete","order"]} + This function returns true if the read permission exists, false otherwise.""" + + try: + # Call the get_permissions function + permissions_str = self.get_permissions(subscriber_id, collection_concept_id) + + if permissions_str: + permissions = json.loads(permissions_str) + + # Check if the permissions is a dictionary and if + # the collection_concept_id is in the permissions dictionary + if isinstance(permissions, dict) and collection_concept_id in permissions: + # Check if "read" is in the list of permissions for the collection + return "read" in permissions[collection_concept_id] + return False + + except Exception as e: + # Handle any exceptions that may occur (e.g., network issues, API errors) + logger.error(f"Subscription Worker Access Control error getting permissions for subscriber {subscriber_id} on collection concept id {collection_concept_id}: {str(e)}") + return False diff --git a/subscription/src/env_vars.py b/subscription/src/env_vars.py index 0bc244461f..a273902edf 100644 --- a/subscription/src/env_vars.py +++ b/subscription/src/env_vars.py @@ -11,17 +11,22 @@ class Env_Vars: def __init__(self): self.ssm_client = boto3.client('ssm', region_name=os.getenv("AWS_REGION")) - def get_var(self, name, decryption=False): - value = os.getenv(name) + def get_env_var_from_parameter_store(self, parameter_name, decryption=False): + """The name parameter looks like /sit/ingest/ENVIRONMENT_VAR. To check if the environment + variable exists strip off everything except for the actual variable name. Otherwise + go to the AWS ParameterStore and get the values.""" + + logger.debug(f"Subscription worker: Getting the environment variable called {parameter_name}") + value = os.getenv(parameter_name.split('/')[-1]) if not value: try: # Get the parameter value from AWS Parameter Store - response = self.ssm_client.get_parameter(Name=name, WithDecryption=decryption) + response = self.ssm_client.get_parameter(Name=parameter_name, WithDecryption=decryption) return response['Parameter']['Value'] except ClientError as e: - logger.error(f"Error retrieving parameter from AWS Parameter Store: {e}") + logger.error(f"Error retrieving parameter {parameter_name} from AWS Parameter Store: {e}") raise else: return value diff --git a/subscription/src/logger.py b/subscription/src/logger.py index cd81298109..1baf9a7b33 100644 --- a/subscription/src/logger.py +++ b/subscription/src/logger.py @@ -2,9 +2,7 @@ import logging import sys -LOG_LEVEL = os.getenv("LOG_LEVEL") -if not LOG_LEVEL: - LOG_LEVEL = logging.INFO +LOG_LEVEL = os.getenv("LOG_LEVEL", logging.INFO) def setup_logger(name, log_file=None, level=logging.INFO): """Function to setup as many loggers as you want""" diff --git a/subscription/src/subscription_worker.py b/subscription/src/subscription_worker.py index b2b63ceece..b2444ff406 100644 --- a/subscription/src/subscription_worker.py +++ b/subscription/src/subscription_worker.py @@ -1,6 +1,7 @@ import boto3 import multiprocessing import os +import json from flask import Flask, jsonify from sns import Sns from botocore.exceptions import ClientError @@ -23,7 +24,7 @@ def receive_message(sqs_client, queue_url): WaitTimeSeconds=(int (LONG_POLL_TIME))) if len(response.get('Messages', [])) > 0: - logger.info(f"Number of messages received: {len(response.get('Messages', []))}") + logger.debug(f"Number of messages received: {len(response.get('Messages', []))}") return response def delete_message(sqs_client, queue_url, receipt_handle): @@ -39,18 +40,24 @@ def delete_messages(sqs_client, queue_url, messages): def process_messages(sns_client, topic, messages, access_control): """ Processes a list of messages that was received from a queue. Check to see if ACLs pass for the granule. If the checks pass then send the notification. """ - for message in messages.get("Messages", []): - - # Get the permission for the collection from access-control - # response = access_control.get_permissions(subscriber-id, collection-concept-id) - # Return is either None (Null or Nil) (if check on response is false) or - # {"C1200484253-CMR_ONLY":["read","update","delete","order"]} - #if response and if array contains read: - # publish message. - #else: - # log subscriber-id no longer has read access to collection-concept-id - sns_client.publish_message(topic, message) + for message in messages.get("Messages", []): + try: + message_body = json.loads(message["Body"]) + message_attributes = message_body["MessageAttributes"] + logger.debug(f"Subscription worker: Received message including attributes: {message_body}") + + subscriber = message_attributes['subscriber']['Value'] + collection_concept_id = message_attributes['collection-concept-id']['Value'] + + if(access_control.has_read_permission(subscriber, collection_concept_id)): + logger.debug(f"Subscription worker: {subscriber} has permission to receive granule notifications for {collection_concept_id}") + sns_client.publish_message(topic, message) + else: + logger.info(f"Subscription worker: {subscriber} does not have read permission to receive notifications for {collection_concept_id}.") + except Exception as e: + logger.error(f"Subscription worker: There is a problem process messages {message}. {e}") + def poll_queue(running): """ Poll the SQS queue and process messages. """ @@ -77,7 +84,7 @@ def poll_queue(running): delete_messages(sqs_client=sqs_client, queue_url=DEAD_LETTER_QUEUE_URL, messages=dl_messages) except Exception as e: - logger.warning(f"An error occurred receiving or deleting messages: {e}") + logger.error(f"An error occurred receiving or deleting messages: {e}") app = Flask(__name__) @app.route('/shutdown', methods=['POST']) diff --git a/subscription/test/access_control_test.py b/subscription/test/access_control_test.py index 43d3430f55..5ae5b50cf4 100644 --- a/subscription/test/access_control_test.py +++ b/subscription/test/access_control_test.py @@ -8,84 +8,104 @@ class TestAccessControl(unittest.TestCase): def setUp(self): - self.ac = AccessControl() + self.access_control = AccessControl() - @patch.dict(os.environ, {"ENVIRONMENT_NAME": "test_env"}) + @patch.dict(os.environ, {"ACCESS_CONTROL_URL": "http://localhost:3011/access-control"}) + def test_get_url_from_parameter_store_local(self): + self.access_control.get_url_from_parameter_store() + self.assertEqual(self.access_control.url, "http://localhost:3011/access-control") + + @patch.dict(os.environ, {"ENVIRONMENT_NAME": "SIT"}) @patch('access_control.Env_Vars') - def test_get_url_from_parameter_store(self, mock_env_vars): - mock_env_vars.get_var.side_effect = [ - "https", "443", "example.com", "api/v1" + def test_get_url_from_parameter_store_aws(self, mock_env_vars): + mock_env_vars_instance = MagicMock() + mock_env_vars_instance.get_env_var_from_parameter_store.side_effect = [ + "https", "3011", "cmr.sit.earthdata.nasa.gov", "/access-control" ] - - self.ac.get_url_from_parameter_store() - - self.assertEqual(self.ac.url, "https://example.com:443/api/v1") - - @patch.dict(os.environ, {}) - def test_get_url_from_parameter_store_no_env(self): - with self.assertRaises(ValueError): - self.ac.get_url_from_parameter_store() - - @patch('access_control.AccessControl.get_url_from_parameter_store') - def test_get_url(self, mock_get_url): - mock_get_url.return_value = None - self.ac.url = "https://test.com" - - # Exercise - result = self.ac.get_url() - - # Verify - self.assertEqual(result, "https://test.com") - mock_get_url.assert_not_called() - - @patch('access_control.AccessControl.get_url_from_parameter_store') - def test_get_url_not_set(self, mock_get_url): - mock_get_url.return_value = None - self.ac.url = None - - # Exercise - self.ac.get_url() - - # Verify - mock_get_url.assert_called_once() - - @patch('access_control.requests.get') - @patch('access_control.AccessControl.get_url') - def test_get_permissions_success(self, mock_get_url, mock_requests_get): - mock_get_url.return_value = "https://test.com" + mock_env_vars.return_value = mock_env_vars_instance + + self.access_control.get_url_from_parameter_store() + expected_url = "https://cmr.sit.earthdata.nasa.gov:3011/access-control" + self.assertEqual(self.access_control.url, expected_url) + + def test_get_url(self): + with patch.object(AccessControl, 'get_url_from_parameter_store') as mock_method: + self.access_control.url = "http://example.com" + result = self.access_control.get_url() + self.assertEqual(result, "http://example.com") + mock_method.assert_not_called() + + with patch.object(AccessControl, 'get_url_from_parameter_store') as mock_method: + self.access_control.url = None + self.access_control.get_url() + mock_method.assert_called_once() + + @patch('requests.get') + def test_get_permissions(self, mock_get): mock_response = MagicMock() mock_response.status_code = 200 - mock_response.text = '{"permissions": ["read", "write"]}' - mock_requests_get.return_value = mock_response - - # Exercise - result = self.ac.get_permissions("sub123", "concept456") + mock_response.text = '{"C1200484253-CMR_ONLY":["read","update","delete","order"]}' + mock_get.return_value = mock_response - # Verify - self.assertEqual(result, '{"permissions": ["read", "write"]}') - mock_requests_get.assert_called_once_with( - "https://test.com/permissions", - params={"user_id": "sub123", "concept_id": "concept456"} - ) + self.access_control.url = "http://example.com" + result = self.access_control.get_permissions("user1", "C1200484253-CMR_ONLY") + self.assertEqual(result, '{"C1200484253-CMR_ONLY":["read","update","delete","order"]}') - @patch('access_control.requests.get') - @patch('access_control.AccessControl.get_url') - def test_get_permissions_failure(self, mock_get_url, mock_requests_get): - mock_get_url.return_value = "https://test.com" + @patch('requests.get') + def test_get_permissions_failure(self, mock_get): mock_response = MagicMock() mock_response.status_code = 404 - mock_requests_get.return_value = mock_response - - # Exercise - result = self.ac.get_permissions("sub123", "concept456") + mock_get.return_value = mock_response - # Verify + self.access_control.url = "http://example.com" + result = self.access_control.get_permissions("user1", "C1200484253-CMR_ONLY") self.assertIsNone(result) - mock_requests_get.assert_called_once_with( - "https://test.com/permissions", - params={"user_id": "sub123", "concept_id": "concept456"} + + @patch.object(AccessControl, 'get_permissions') + def test_has_read_permission(self, mock_get_permissions): + # Test when user has read permission + mock_get_permissions.return_value = "{\"C1200484253-CMR_ONLY\": [\"read\", \"update\"]}" + result = self.access_control.has_read_permission("user1", "C1200484253-CMR_ONLY") + self.assertTrue(result) + + # Test when user doesn't have read permission + mock_get_permissions.return_value = "{\"C1200484253-CMR_ONLY\": [\"update\"]}" + result = self.access_control.has_read_permission("user1", "C1200484253-CMR_ONLY") + self.assertFalse(result) + + # Test when concept_id is not in permissions + mock_get_permissions.return_value = "{\"C1200484253-OTHER\": [\"read\"]}" + result = self.access_control.has_read_permission("user1", "C1200484253-CMR_ONLY") + self.assertFalse(result) + + # Test when permissions is not a dictionary + mock_get_permissions.return_value = None + result = self.access_control.has_read_permission("user1", "C1200484253-CMR_ONLY") + self.assertFalse(result) + + # Test when get_permissions raises an exception + mock_get_permissions.side_effect = Exception("API Error") + result = self.access_control.has_read_permission("user1", "C1200484253-CMR_ONLY") + self.assertFalse(result) + + @patch('access_control.logger') + @patch.object(AccessControl, 'get_permissions') + def test_has_read_permission_logging(self, mock_get_permissions, mock_logger): + # Test logging when an exception occurs + mock_get_permissions.side_effect = Exception("API Error") + self.access_control.has_read_permission("user1", "C1200484253-CMR_ONLY") + mock_logger.error.assert_called_once_with( + "Subscription Worker Access Control error getting permissions for subscriber user1 on collection concept id C1200484253-CMR_ONLY: API Error" ) + def test_has_read_permission_integration(self): + # This is an integration test that calls the actual get_permissions method + # Note: This test depends on the actual API and may fail if the API is not available + with patch.object(AccessControl, 'get_url', return_value='https://cmr.earthdata.nasa.gov/access-control'): + result = self.access_control.has_read_permission("test_user", "C1234567-TEST") + # Assert based on expected behavior. This might be True or False depending on the actual permissions + self.assertIsInstance(result, bool) + if __name__ == '__main__': unittest.main() diff --git a/subscription/test/env_vars_test.py b/subscription/test/env_vars_test.py index 3da300c5bc..47d60b7516 100644 --- a/subscription/test/env_vars_test.py +++ b/subscription/test/env_vars_test.py @@ -10,7 +10,7 @@ def setUp(self): @patch.dict(os.environ, {"TEST_VAR": "test_value"}) def test_get_var_from_os(self): - value = self.env_vars.get_var("TEST_VAR") + value = self.env_vars.get_env_var_from_parameter_store("TEST_VAR") self.assertEqual(value, "test_value") @patch.dict(os.environ, {}, clear=True) @@ -26,7 +26,7 @@ def test_get_var_from_ssm(self, mock_boto3_client): self.env_vars.ssm_client = mock_ssm - value = self.env_vars.get_var("SOME_VAR") + value = self.env_vars.get_env_var_from_parameter_store("SOME_VAR") self.assertEqual(value, "some_value") mock_ssm.get_parameter.assert_called_once_with(Name="SOME_VAR", WithDecryption=False) @@ -43,7 +43,7 @@ def test_get_var_with_decryption(self, mock_boto3_client): self.env_vars.ssm_client = mock_ssm - value = self.env_vars.get_var('ENCRYPTED_VAR', decryption=True) + value = self.env_vars.get_env_var_from_parameter_store('ENCRYPTED_VAR', decryption=True) self.assertEqual(value, 'encrypted_value') mock_ssm.get_parameter.assert_called_once_with(Name='ENCRYPTED_VAR', WithDecryption=True) @@ -60,7 +60,7 @@ def test_get_var_ssm_error(self, mock_boto3_client): self.env_vars.ssm_client = mock_ssm with self.assertRaises(ClientError): - self.env_vars.get_var('NONEXISTENT_VAR') + self.env_vars.get_env_var_from_parameter_store('NONEXISTENT_VAR') if __name__ == '__main__': unittest.main() diff --git a/subscription/test/subscription_worker_test.py b/subscription/test/subscription_worker_test.py index 95c472d7b7..6c597f6888 100644 --- a/subscription/test/subscription_worker_test.py +++ b/subscription/test/subscription_worker_test.py @@ -1,3 +1,4 @@ +import json import unittest from unittest.mock import patch, MagicMock import boto3 @@ -53,10 +54,38 @@ def test_process_messages(self, mock_access_control, mock_sns): mock_access_control_instance = MagicMock() mock_access_control.return_value = mock_access_control_instance - messages = {'Messages': [{'Body': 'test message'}]} + mock_access_control_instance.has_read_permission.return_value = True + + messages = { + 'Messages': [{ + 'Body': json.dumps({ + 'Type': 'Notification', + 'MessageId': 'dfb70dfe-6f63-5cfc-9a5f-6dc731b504de', + 'TopicArn': 'arn:name', + 'Subject': 'Update Notification', + 'Message': '{"concept-id": "G1200484365-PROV", "granule-ur": "gnss.rnx.gz.json", "producer-granule-id": "gnss.rnx.gz", "location": "http://localhost:3003/concepts/G1200484365-PROV/4"}', + 'Timestamp': '2025-02-26T18:25:26.951Z', + 'SignatureVersion': '1', + 'Signature': 'HIQ==', + 'SigningCertURL': 'https://sns.region.amazonaws.com/SNS-9.pem', + 'UnsubscribeURL': 'https://sns.region.amazonaws.com/?Ac', + 'MessageAttributes': { + 'mode': {'Type': 'String', 'Value': 'Update'}, + 'collection-concept-id': {'Type': 'String', 'Value': 'C1200484363-PROV'}, + 'endpoint': {'Type': 'String', 'Value': 'http://notification/tester'}, + 'subscriber': {'Type': 'String', 'Value': 'user1_test'}, + 'endpoint-type': {'Type': 'String', 'Value': 'url'} + } + }) + }] + } + process_messages(mock_sns_instance, 'test-topic', messages, mock_access_control_instance) + + # Check if has_read_permission was called with correct arguments + mock_access_control_instance.has_read_permission.assert_called_once_with('user1_test', 'C1200484363-PROV') - mock_sns_instance.publish_message.assert_called_once_with('test-topic', {'Body': 'test message'}) + mock_sns_instance.publish_message.assert_called_once_with('test-topic', messages['Messages'][0]) if __name__ == '__main__': unittest.main()