Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions ingest-app/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1169,10 +1169,6 @@ For NRT Notification subscriptions to be used there are three new fields that ar
</ul>
</ul>

NOTE: The same SQS endpoint cannot be used for the same collection more than once.
For example if you, USER 1, create a subscription with an SQS queue: SQS1 endpoint to filter UPDATE granule events from Collection 1, another user, USER 2, cannot create a new subscription with the same SQS1 queue endpoint to filter granules from Collection 1 again.
Ultimately, only ONE user can perform CRUD operations on the specific SQS endpoint used.

##### NRT Notification Subscription POST Request

```
Expand Down
40 changes: 4 additions & 36 deletions ingest-app/src/cmr/ingest/api/subscriptions.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@
[cmr.common.services.errors :as errors]
[cmr.common.util :as util]
[cmr.ingest.api.core :as api-core]
[cmr.ingest.config :as ingest-config]
[cmr.ingest.services.ingest-service :as ingest]
[cmr.ingest.services.subscriptions-helper :as jobs]
[cmr.ingest.validation.validation :as v]
[cmr.transmit.access-control :as access-control]
[cmr.transmit.config :as config]
[cmr.transmit.metadata-db :as mdb]
[cmr.transmit.metadata-db2 :as metadata-db2]
[cmr.transmit.search :as search]
[cmr.transmit.urs :as urs])
(:import
Expand Down Expand Up @@ -85,43 +83,14 @@
[subscription-concept]
(let [method (:Method subscription-concept)
endpoint (:EndPoint subscription-concept)
curr-env (ingest-config/app-environment)
url-validator (if (= curr-env "local")
(UrlValidator. UrlValidator/ALLOW_LOCAL_URLS)
(UrlValidator.))]
default-url-validator (UrlValidator. UrlValidator/ALLOW_LOCAL_URLS)]

(if (= method "ingest")
(if-not (or (some? (re-matches #"arn:aws:sqs:.*" endpoint))
(.isValid url-validator endpoint))
(if-not (or (some? (re-matches #"arn:aws:sqs:.*" endpoint)) (.isValid default-url-validator endpoint))
(errors/throw-service-error
:bad-request
"Subscription creation failed - Method was ingest, but the endpoint given was not valid SQS ARN or HTTP/S URL."
"If it is a URL, make sure to give the full URL path like so: https://www.google.com. We do not accept local URLs.")))))

(defn- check-endpoint-queue-for-collection-not-already-exist
"Validates that the subscription with the same collection and same SQS endpoint does not already exist.
Throws error if the same collection and same SQS endpoint already exists because creating duplicate collection
with same SQS endpoint from a different user is not allowed."
[context subscription-concept]
(let [curr-endpoint (:EndPoint subscription-concept)]
(if (and (= "ingest" (:Method subscription-concept))
(or (some? (re-matches #"arn:aws:sqs:.*" curr-endpoint))
(some? (re-matches #"http://localhost:9324.*" curr-endpoint))))
(let [collection-concept-id (:CollectionConceptId subscription-concept)
cache-content (metadata-db2/get-subscription-cache-content context collection-concept-id)]
(if cache-content
;; cache-content format expected is something like: {:Mode {:Delete [sqs1 sqs2], :New [url1], :Update [url1]}}
(let [mode-map (get cache-content :Mode)]
;; check if any of the endpoints in each mode type is equal to the curr sqs endpoint, if so throw the error
(doseq [modetoendpointset mode-map]
(if (some #{curr-endpoint} (val modetoendpointset))
(errors/throw-service-error
:conflict
(format (str "The collection [%s] has already subscribed to the given sqs arn by another user. "
"Each Near Real Time subscription to a collection must have a unique sqs arn endpoint."
"You cannot have the same SQS queue subscribed to the same collection by multiple users,"
"only one user can crate/update/delete subscription to the same end client queue to the same collection.")
(util/html-escape collection-concept-id)))))))))))
"Subscription creation failed - Method was ingest, but the endpoint given was not valid SQS ARN or HTTP/S URL.
If it is a URL, make sure to give the full URL path like so: https://www.google.com.")))))

(defn- check-subscription-limit
"Given the configuration for subscription limit, this valdiates that the user has no more than
Expand Down Expand Up @@ -194,7 +163,6 @@
(check-duplicate-subscription request-context concept parsed)
(check-subscription-limit request-context concept parsed)
(check-subscriber-collection-permission request-context parsed)
(check-endpoint-queue-for-collection-not-already-exist request-context parsed)
(let [concept-with-user-id (api-core/set-user-id concept
request-context
headers)
Expand Down
5 changes: 0 additions & 5 deletions ingest-app/src/cmr/ingest/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,3 @@
(defconfig validate-umm-var-keywords
"Flag for whether or not to validate UMM-Var against KMS keywords."
{:default false :type Boolean})

(declare app-environment)
(defconfig app-environment
"The environment in which the application is running in NGAP (wl, sit, uat, ops)"
{:default "local"})
63 changes: 13 additions & 50 deletions ingest-app/test/cmr/ingest/api/subscriptions_test.clj
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
(ns cmr.ingest.api.subscriptions-test
(:require
[clojure.string :as string]
[clojure.test :refer :all]
[cmr.common.util :as util]
[cmr.ingest.api.subscriptions :as subscriptions]
[cmr.ingest.config :as ingest-config]))
[clojure.string :as string]
[clojure.test :refer :all]
[cmr.common.util :as util]
[cmr.ingest.api.subscriptions :as subscriptions]))

(deftest generate-native-id-test
(let [parsed {:Name "the_beginning"
Expand All @@ -29,55 +28,19 @@
"given method is search and endpoint not given -- endpoint ignored"
{:EndPoint "", :Method "search"}

"given method is ingest, env is local, and endpoint is sqs arn"
"given method is ingest and sqs arn is valid"
{:EndPoint "arn:aws:sqs:us-east-1:000000000:Test-Queue", :Method "ingest"}

"given method is ingest, env is local, and url is non-local"
{:EndPoint "https://testwebsite.com", :Method "ingest"}

"given method is ingest, env is local, url is local"
{:EndPoint "http://localhost:8080/localllllll", :Method "ingest"}))
"given method is ingest and url is valid"
{:EndPoint "https://testwebsite.com", :Method "ingest"}))

(testing "validate subscription endpoint str -- expected invalid"
(with-redefs [ingest-config/app-environment (fn [] "non-local")]
(let [fun #'cmr.ingest.api.subscriptions/validate-subscription-endpoint]
;; given method is ingest, env is non-local and sqs arn -- throws error
(is (thrown? Exception (fun {:EndPoint "iaminvalidendpoint", :Method "ingest"})))

;; given method is ingest, env is non-local and endpoint is empty -- throws error
(is (thrown? Exception (fun {:EndPoint "", :Method "ingest"})))

;; ;; given method is ingest, env is non-local and endpoint is local endpoint -- throws error
(is (thrown? Exception (fun {:EndPoint "http://localhost:8080", :Method "ingest"})))))))

(deftest check-subscription-for-collection-not-already-exist-test
(let [fun #'cmr.ingest.api.subscriptions/check-endpoint-queue-for-collection-not-already-exist
context nil]
(util/are3 [subscription-concept]
(is (= nil (fun context subscription-concept)))

"subscription concept not ingest type -- does nothing"
{:EndPoint "", :Method "search"}

"subscription concept not sqs arn nor local queue arn -- does nothing"
{:EndPoint "http://www.something.com", :Method "ingest"})

(let [subscription-concept {:EndPoint "arn:aws:sqs:blahblah" :Method "ingest" :CollectionConceptId "C123-PROV1"}
returned-cache-content {:Mode {:Delete ["sqs1" "sqs2"], :New ["url1"], :Update ["url1"]}}
returned-cache-content-with-duplicates {:Mode {:Delete ["sqs1" "sqs2"], :New ["url1" "arn:aws:sqs:blahblah"], :Update ["url1"]}}]

;; method for getting cache-content returns error -- this method should bubble up that error
(with-redefs [cmr.transmit.metadata-db2/get-subscription-cache-content (fn [context collection-concept-id] (throw (Exception. "Exception was thrown from cache-content func")))]
(is (thrown? Exception (fun context subscription-concept))))

;; returns nil cache-content -- does nothing
(with-redefs [cmr.transmit.metadata-db2/get-subscription-cache-content (fn [context collection-concept-id] nil)]
(is (nil? (fun context subscription-concept))))
(let [fun #'cmr.ingest.api.subscriptions/validate-subscription-endpoint]
(is (thrown? Exception (fun subscription-concept))))

;; duplication collection to sqs queue not found -- does nothing
(with-redefs [cmr.transmit.metadata-db2/get-subscription-cache-content (fn [context collection-concept-id] returned-cache-content)]
(is (nil? (fun context subscription-concept))))
"given method is ingest and sqs arn is invalid"
{:EndPoint "iaminvalidendpoint", :Method "ingest"}

;; duplicate collection to sqs queue found -- throws error
(with-redefs [cmr.transmit.metadata-db2/get-subscription-cache-content (fn [context collection-concept-id] returned-cache-content-with-duplicates)]
(is (thrown? Exception (fun context subscription-concept)))))))
"given method is ingest and endpoint is empty is invalid"
{:Endpoint "", :Method "ingest"})))
7 changes: 2 additions & 5 deletions metadata-db-app/src/cmr/metadata_db/api/subscriptions.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
[clojure.string :as string]
[cmr.metadata-db.services.sub-notifications :as sub-note]
[cmr.metadata-db.services.subscriptions :as subscriptions]
[compojure.core :refer [PUT POST GET context]]))
[compojure.core :refer [PUT POST context]]))

(defn- update-subscription-notification-time
"Update a subscription notification time"
Expand All @@ -28,7 +28,4 @@
(update-subscription-notification-time request-context params body))
(POST "/refresh-subscription-cache"
{request-context :request-context}
(subscriptions/refresh-subscription-cache request-context))
;; get ingest subscription cache content for a specific collection
(GET "/cache-content" {:keys [params request-context]}
(subscriptions/get-cache-content request-context params))))
(subscriptions/refresh-subscription-cache request-context))))
11 changes: 0 additions & 11 deletions metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
(:require
[cheshire.core :as json]
[cmr.common.log :refer [debug info]]
[cmr.common.services.errors :as errors]
[cmr.message-queue.topic.topic-protocol :as topic-protocol]
[cmr.metadata-db.api.route-helpers :as rh]
[cmr.metadata-db.config :as mdb-config]
[cmr.metadata-db.services.search-service :as mdb-search]
[cmr.metadata-db.services.subscription-cache :as subscription-cache]
Expand Down Expand Up @@ -328,15 +326,6 @@
(swap! result-array (fn [n] (conj @result-array result)))))))
@result-array)))))

(defn get-cache-content
[context params]
(if-let [collection-concept-id (:collection-concept-id params)]
(let [subscription-object (subscription-cache/get-value context collection-concept-id)]
{:status 200
:body (json/generate-string subscription-object)
:headers rh/json-header})
(errors/throw-service-error :bad-request "A collection concept id query parameter was required but was not provided.")))

(comment
(let [system (get-in user/system [:apps :metadata-db])]
(refresh-subscription-cache {:system system})))
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
[cmr.message-queue.pub-sub :as pub-sub]
[cmr.message-queue.test.test-util :as sqs-test-util]
[cmr.message-queue.topic.topic-protocol :as topic-protocol]
[cmr.metadata-db.api.route-helpers :as rh]
[cmr.metadata-db.config :as mdb-config]
[cmr.metadata-db.services.subscription-cache :as subscription-cache]
[cmr.metadata-db.services.subscriptions :as subscriptions]
Expand Down Expand Up @@ -818,14 +817,3 @@
(with-redefs [topic-protocol/subscribe (fn [topic concept-edn] nil)]
(testing "subscribe fails, will return concept without the aws-arn in extra fields and will NOT throw exception"
(is (= ingest-concept (subscriptions/attach-subscription-to-topic context ingest-concept)))))))

(deftest get-cache-content-test
;; cache returns some value -- return 200 with cache content
(with-redefs [subscription-cache/get-value (fn [context collection-concept-id] {})]
(is (= {:status 200
:body (json/generate-string {})
:headers rh/json-header}
(subscriptions/get-cache-content nil {:collection-concept-id "C123-PROV1"}))))
;; collection concept id not in params -- returns bad request exception
(with-redefs [subscription-cache/get-value (fn [context collection-concept-id] {})]
(is (thrown? Exception (subscriptions/get-cache-content nil {})))))
Loading