diff --git a/ingest-app/docs/api.md b/ingest-app/docs/api.md index a7272c825f..d9ff3e26a8 100644 --- a/ingest-app/docs/api.md +++ b/ingest-app/docs/api.md @@ -1169,6 +1169,10 @@ For NRT Notification subscriptions to be used there are three new fields that ar +NOTE: The same SQS endpoint cannot be used for the same collection more than once. +For example if you, USER 1, create a subscription with an SQS queue: SQS1 endpoint to filter UPDATE granule events from Collection 1, another user, USER 2, cannot create a new subscription with the same SQS1 queue endpoint to filter granules from Collection 1 again. +Ultimately, only ONE user can perform CRUD operations on the specific SQS endpoint used. + ##### NRT Notification Subscription POST Request ``` diff --git a/ingest-app/src/cmr/ingest/api/subscriptions.clj b/ingest-app/src/cmr/ingest/api/subscriptions.clj index aa5f7e6a5a..045c1e641a 100644 --- a/ingest-app/src/cmr/ingest/api/subscriptions.clj +++ b/ingest-app/src/cmr/ingest/api/subscriptions.clj @@ -11,12 +11,14 @@ [cmr.common.services.errors :as errors] [cmr.common.util :as util] [cmr.ingest.api.core :as api-core] + [cmr.ingest.config :as ingest-config] [cmr.ingest.services.ingest-service :as ingest] [cmr.ingest.services.subscriptions-helper :as jobs] [cmr.ingest.validation.validation :as v] [cmr.transmit.access-control :as access-control] [cmr.transmit.config :as config] [cmr.transmit.metadata-db :as mdb] + [cmr.transmit.metadata-db2 :as metadata-db2] [cmr.transmit.search :as search] [cmr.transmit.urs :as urs]) (:import @@ -83,14 +85,43 @@ [subscription-concept] (let [method (:Method subscription-concept) endpoint (:EndPoint subscription-concept) - default-url-validator (UrlValidator. UrlValidator/ALLOW_LOCAL_URLS)] + curr-env (ingest-config/app-environment) + url-validator (if (= curr-env "local") + (UrlValidator. UrlValidator/ALLOW_LOCAL_URLS) + (UrlValidator.))] (if (= method "ingest") - (if-not (or (some? (re-matches #"arn:aws:sqs:.*" endpoint)) (.isValid default-url-validator endpoint)) + (if-not (or (some? (re-matches #"arn:aws:sqs:.*" endpoint)) + (.isValid url-validator endpoint)) (errors/throw-service-error :bad-request - "Subscription creation failed - Method was ingest, but the endpoint given was not valid SQS ARN or HTTP/S URL. - If it is a URL, make sure to give the full URL path like so: https://www.google.com."))))) + "Subscription creation failed - Method was ingest, but the endpoint given was not valid SQS ARN or HTTP/S URL." + "If it is a URL, make sure to give the full URL path like so: https://www.google.com. We do not accept local URLs."))))) + +(defn- check-endpoint-queue-for-collection-not-already-exist + "Validates that the subscription with the same collection and same SQS endpoint does not already exist. + Throws error if the same collection and same SQS endpoint already exists because creating duplicate collection + with same SQS endpoint from a different user is not allowed." + [context subscription-concept] + (let [curr-endpoint (:EndPoint subscription-concept)] + (if (and (= "ingest" (:Method subscription-concept)) + (or (some? (re-matches #"arn:aws:sqs:.*" curr-endpoint)) + (some? (re-matches #"http://localhost:9324.*" curr-endpoint)))) + (let [collection-concept-id (:CollectionConceptId subscription-concept) + cache-content (metadata-db2/get-subscription-cache-content context collection-concept-id)] + (if cache-content + ;; cache-content format expected is something like: {:Mode {:Delete [sqs1 sqs2], :New [url1], :Update [url1]}} + (let [mode-map (get cache-content :Mode)] + ;; check if any of the endpoints in each mode type is equal to the curr sqs endpoint, if so throw the error + (doseq [modetoendpointset mode-map] + (if (some #{curr-endpoint} (val modetoendpointset)) + (errors/throw-service-error + :conflict + (format (str "The collection [%s] has already subscribed to the given sqs arn by another user. " + "Each Near Real Time subscription to a collection must have a unique sqs arn endpoint." + "You cannot have the same SQS queue subscribed to the same collection by multiple users," + "only one user can crate/update/delete subscription to the same end client queue to the same collection.") + (util/html-escape collection-concept-id))))))))))) (defn- check-subscription-limit "Given the configuration for subscription limit, this valdiates that the user has no more than @@ -163,6 +194,7 @@ (check-duplicate-subscription request-context concept parsed) (check-subscription-limit request-context concept parsed) (check-subscriber-collection-permission request-context parsed) + (check-endpoint-queue-for-collection-not-already-exist request-context parsed) (let [concept-with-user-id (api-core/set-user-id concept request-context headers) diff --git a/ingest-app/src/cmr/ingest/config.clj b/ingest-app/src/cmr/ingest/config.clj index 471c8f42cb..1bf92a84f1 100644 --- a/ingest-app/src/cmr/ingest/config.clj +++ b/ingest-app/src/cmr/ingest/config.clj @@ -154,3 +154,8 @@ (defconfig validate-umm-var-keywords "Flag for whether or not to validate UMM-Var against KMS keywords." {:default false :type Boolean}) + +(declare app-environment) +(defconfig app-environment + "The environment in which the application is running in NGAP (wl, sit, uat, ops)" + {:default "local"}) diff --git a/ingest-app/test/cmr/ingest/api/subscriptions_test.clj b/ingest-app/test/cmr/ingest/api/subscriptions_test.clj index c865b763cc..edccf1876c 100644 --- a/ingest-app/test/cmr/ingest/api/subscriptions_test.clj +++ b/ingest-app/test/cmr/ingest/api/subscriptions_test.clj @@ -1,9 +1,10 @@ (ns cmr.ingest.api.subscriptions-test (:require - [clojure.string :as string] - [clojure.test :refer :all] - [cmr.common.util :as util] - [cmr.ingest.api.subscriptions :as subscriptions])) + [clojure.string :as string] + [clojure.test :refer :all] + [cmr.common.util :as util] + [cmr.ingest.api.subscriptions :as subscriptions] + [cmr.ingest.config :as ingest-config])) (deftest generate-native-id-test (let [parsed {:Name "the_beginning" @@ -28,19 +29,55 @@ "given method is search and endpoint not given -- endpoint ignored" {:EndPoint "", :Method "search"} - "given method is ingest and sqs arn is valid" + "given method is ingest, env is local, and endpoint is sqs arn" {:EndPoint "arn:aws:sqs:us-east-1:000000000:Test-Queue", :Method "ingest"} - "given method is ingest and url is valid" - {:EndPoint "https://testwebsite.com", :Method "ingest"})) + "given method is ingest, env is local, and url is non-local" + {:EndPoint "https://testwebsite.com", :Method "ingest"} + + "given method is ingest, env is local, url is local" + {:EndPoint "http://localhost:8080/localllllll", :Method "ingest"})) (testing "validate subscription endpoint str -- expected invalid" + (with-redefs [ingest-config/app-environment (fn [] "non-local")] + (let [fun #'cmr.ingest.api.subscriptions/validate-subscription-endpoint] + ;; given method is ingest, env is non-local and sqs arn -- throws error + (is (thrown? Exception (fun {:EndPoint "iaminvalidendpoint", :Method "ingest"}))) + + ;; given method is ingest, env is non-local and endpoint is empty -- throws error + (is (thrown? Exception (fun {:EndPoint "", :Method "ingest"}))) + + ;; ;; given method is ingest, env is non-local and endpoint is local endpoint -- throws error + (is (thrown? Exception (fun {:EndPoint "http://localhost:8080", :Method "ingest"}))))))) + +(deftest check-subscription-for-collection-not-already-exist-test + (let [fun #'cmr.ingest.api.subscriptions/check-endpoint-queue-for-collection-not-already-exist + context nil] (util/are3 [subscription-concept] - (let [fun #'cmr.ingest.api.subscriptions/validate-subscription-endpoint] - (is (thrown? Exception (fun subscription-concept)))) + (is (= nil (fun context subscription-concept))) + + "subscription concept not ingest type -- does nothing" + {:EndPoint "", :Method "search"} + + "subscription concept not sqs arn nor local queue arn -- does nothing" + {:EndPoint "http://www.something.com", :Method "ingest"}) + + (let [subscription-concept {:EndPoint "arn:aws:sqs:blahblah" :Method "ingest" :CollectionConceptId "C123-PROV1"} + returned-cache-content {:Mode {:Delete ["sqs1" "sqs2"], :New ["url1"], :Update ["url1"]}} + returned-cache-content-with-duplicates {:Mode {:Delete ["sqs1" "sqs2"], :New ["url1" "arn:aws:sqs:blahblah"], :Update ["url1"]}}] + + ;; method for getting cache-content returns error -- this method should bubble up that error + (with-redefs [cmr.transmit.metadata-db2/get-subscription-cache-content (fn [context collection-concept-id] (throw (Exception. "Exception was thrown from cache-content func")))] + (is (thrown? Exception (fun context subscription-concept)))) + + ;; returns nil cache-content -- does nothing + (with-redefs [cmr.transmit.metadata-db2/get-subscription-cache-content (fn [context collection-concept-id] nil)] + (is (nil? (fun context subscription-concept)))) - "given method is ingest and sqs arn is invalid" - {:EndPoint "iaminvalidendpoint", :Method "ingest"} + ;; duplication collection to sqs queue not found -- does nothing + (with-redefs [cmr.transmit.metadata-db2/get-subscription-cache-content (fn [context collection-concept-id] returned-cache-content)] + (is (nil? (fun context subscription-concept)))) - "given method is ingest and endpoint is empty is invalid" - {:Endpoint "", :Method "ingest"}))) + ;; duplicate collection to sqs queue found -- throws error + (with-redefs [cmr.transmit.metadata-db2/get-subscription-cache-content (fn [context collection-concept-id] returned-cache-content-with-duplicates)] + (is (thrown? Exception (fun context subscription-concept))))))) diff --git a/metadata-db-app/src/cmr/metadata_db/api/subscriptions.clj b/metadata-db-app/src/cmr/metadata_db/api/subscriptions.clj index 5f0b11a3e8..a9d6aa5471 100644 --- a/metadata-db-app/src/cmr/metadata_db/api/subscriptions.clj +++ b/metadata-db-app/src/cmr/metadata_db/api/subscriptions.clj @@ -5,7 +5,7 @@ [clojure.string :as string] [cmr.metadata-db.services.sub-notifications :as sub-note] [cmr.metadata-db.services.subscriptions :as subscriptions] - [compojure.core :refer [PUT POST context]])) + [compojure.core :refer [PUT POST GET context]])) (defn- update-subscription-notification-time "Update a subscription notification time" @@ -28,4 +28,7 @@ (update-subscription-notification-time request-context params body)) (POST "/refresh-subscription-cache" {request-context :request-context} - (subscriptions/refresh-subscription-cache request-context)))) + (subscriptions/refresh-subscription-cache request-context)) + ;; get ingest subscription cache content for a specific collection + (GET "/cache-content" {:keys [params request-context]} + (subscriptions/get-cache-content request-context params)))) diff --git a/metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj b/metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj index b193a3da3d..8a42ae5f92 100644 --- a/metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj +++ b/metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj @@ -3,7 +3,9 @@ (:require [cheshire.core :as json] [cmr.common.log :refer [debug info]] + [cmr.common.services.errors :as errors] [cmr.message-queue.topic.topic-protocol :as topic-protocol] + [cmr.metadata-db.api.route-helpers :as rh] [cmr.metadata-db.config :as mdb-config] [cmr.metadata-db.services.search-service :as mdb-search] [cmr.metadata-db.services.subscription-cache :as subscription-cache] @@ -326,6 +328,15 @@ (swap! result-array (fn [n] (conj @result-array result))))))) @result-array))))) +(defn get-cache-content + [context params] + (if-let [collection-concept-id (:collection-concept-id params)] + (let [subscription-object (subscription-cache/get-value context collection-concept-id)] + {:status 200 + :body (json/generate-string subscription-object) + :headers rh/json-header}) + (errors/throw-service-error :bad-request "A collection concept id query parameter was required but was not provided."))) + (comment (let [system (get-in user/system [:apps :metadata-db])] (refresh-subscription-cache {:system system}))) diff --git a/metadata-db-app/test/cmr/metadata_db/test/services/subscriptions_test.clj b/metadata-db-app/test/cmr/metadata_db/test/services/subscriptions_test.clj index 386dab7bbb..c0c18e914c 100644 --- a/metadata-db-app/test/cmr/metadata_db/test/services/subscriptions_test.clj +++ b/metadata-db-app/test/cmr/metadata_db/test/services/subscriptions_test.clj @@ -8,6 +8,7 @@ [cmr.message-queue.pub-sub :as pub-sub] [cmr.message-queue.test.test-util :as sqs-test-util] [cmr.message-queue.topic.topic-protocol :as topic-protocol] + [cmr.metadata-db.api.route-helpers :as rh] [cmr.metadata-db.config :as mdb-config] [cmr.metadata-db.services.subscription-cache :as subscription-cache] [cmr.metadata-db.services.subscriptions :as subscriptions] @@ -817,3 +818,14 @@ (with-redefs [topic-protocol/subscribe (fn [topic concept-edn] nil)] (testing "subscribe fails, will return concept without the aws-arn in extra fields and will NOT throw exception" (is (= ingest-concept (subscriptions/attach-subscription-to-topic context ingest-concept))))))) + +(deftest get-cache-content-test + ;; cache returns some value -- return 200 with cache content + (with-redefs [subscription-cache/get-value (fn [context collection-concept-id] {})] + (is (= {:status 200 + :body (json/generate-string {}) + :headers rh/json-header} + (subscriptions/get-cache-content nil {:collection-concept-id "C123-PROV1"})))) + ;; collection concept id not in params -- returns bad request exception + (with-redefs [subscription-cache/get-value (fn [context collection-concept-id] {})] + (is (thrown? Exception (subscriptions/get-cache-content nil {}))))) diff --git a/transmit-lib/src/cmr/transmit/metadata_db2.clj b/transmit-lib/src/cmr/transmit/metadata_db2.clj index a2b928cf0f..0f99f7b7b3 100644 --- a/transmit-lib/src/cmr/transmit/metadata_db2.clj +++ b/transmit-lib/src/cmr/transmit/metadata_db2.clj @@ -2,10 +2,14 @@ "This contains functions for interacting with the metadata db API. It uses the newer transmit namespace style that concepts, and access control use" (:require - [cmr.transmit.config :as config] - [cmr.transmit.connection :as conn] - [cmr.transmit.http-helper :as h] - [ring.util.codec :as codec])) + [cheshire.core :as json] + [clj-http.client :as client] + [cmr.common.api.context :as ch] + [cmr.common.services.errors :as errors] + [cmr.transmit.config :as config] + [cmr.transmit.connection :as conn] + [cmr.transmit.http-helper :as http-helper] + [ring.util.codec :as codec])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; URL functions @@ -52,14 +56,14 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Request functions (declare reset) -(h/defresetter reset :metadata-db) +(http-helper/defresetter reset :metadata-db) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Provider functions (declare create-provider update-provider delete-provider) -(h/defcreator create-provider :metadata-db providers-url {:use-system-token? true}) -(h/defupdater update-provider :metadata-db provider-url {:use-system-token? true}) -(h/defdestroyer delete-provider :metadata-db provider-url {:use-system-token? true}) +(http-helper/defcreator create-provider :metadata-db providers-url {:use-system-token? true}) +(http-helper/defupdater update-provider :metadata-db provider-url {:use-system-token? true}) +(http-helper/defdestroyer delete-provider :metadata-db provider-url {:use-system-token? true}) (defn get-providers "Returns the list of providers configured in the metadata db. Valid options are @@ -73,30 +77,30 @@ ([context {:keys [raw? http-options token]}] (let [token (or token (:token context)) headers (when token {config/token-header token})] - (h/request context :metadata-db - {:url-fn providers-url + (http-helper/request context :metadata-db + {:url-fn providers-url :method :get :raw? raw? - :http-options (h/include-request-id context (merge {:accept :json - :headers headers} - http-options))})))) + :http-options (http-helper/include-request-id context (merge {:accept :json + :headers headers} + http-options))})))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Concept functions (declare save-concept) -(h/defcreator save-concept :metadata-db concepts-url {:use-system-token? true}) +(http-helper/defcreator save-concept :metadata-db concepts-url {:use-system-token? true}) (defn get-concept-id "Returns a concept id for the given concept type, provider, and native id" ([context concept-type provider-id native-id] (get-concept-id context concept-type provider-id native-id nil)) ([context concept-type provider-id native-id {:keys [raw? http-options]}] - (let [response (h/request context :metadata-db - {:url-fn #(concept-id-url % concept-type provider-id native-id) - :method :get - :raw? raw? - :use-system-token? true - :http-options (h/include-request-id context (merge {:accept :json} http-options))})] + (let [response (http-helper/request context :metadata-db + {:url-fn #(concept-id-url % concept-type provider-id native-id) + :method :get + :raw? raw? + :use-system-token? true + :http-options (http-helper/include-request-id context (merge {:accept :json} http-options))})] (if raw? response (:concept-id response))))) @@ -111,12 +115,12 @@ (find-concepts context params concept-type nil)) ([context params concept-type {:keys [raw? http-options]}] (-> context - (h/request :metadata-db - {:url-fn #(concept-search-url % concept-type) - :method :get - :raw? raw? - :use-system-token? true - :http-options (h/include-request-id context (merge {:accept :json} http-options params))}) + (http-helper/request :metadata-db + {:url-fn #(concept-search-url % concept-type) + :method :get + :raw? raw? + :use-system-token? true + :http-options (http-helper/include-request-id context (merge {:accept :json} http-options params))}) finish-parse-concept))) (defn get-concept @@ -128,12 +132,12 @@ (get-concept context concept-id revision-id nil)) ([context concept-id revision-id {:keys [raw? http-options]}] (-> context - (h/request :metadata-db - {:url-fn #(concept-revision-url % concept-id revision-id) + (http-helper/request :metadata-db + {:url-fn #(concept-revision-url % concept-id revision-id) :method :get :raw? raw? :use-system-token? true - :http-options (h/include-request-id context (merge {:accept :json} http-options))}) + :http-options (http-helper/include-request-id context (merge {:accept :json} http-options))}) finish-parse-concept))) (defn get-latest-concept @@ -145,14 +149,40 @@ (get-latest-concept context concept-id nil)) ([context concept-id {:keys [raw? http-options]}] (-> context - (h/request :metadata-db - {:url-fn #(latest-concept-url % concept-id) + (http-helper/request :metadata-db + {:url-fn #(latest-concept-url % concept-id) :method :get :raw? raw? :use-system-token? true - :http-options (h/include-request-id context (merge {:accept :json} http-options))}) + :http-options (http-helper/include-request-id context (merge {:accept :json} http-options))}) finish-parse-concept))) ;; Defines health check function (declare get-metadata-db-health) -(h/defhealther get-metadata-db-health :metadata-db {:timeout-secs 2}) +(http-helper/defhealther get-metadata-db-health :metadata-db {:timeout-secs 2}) + + +(defn get-subscription-cache-content + "Retrieves the cache contents of the ingest subscription cache." + ([context coll-concept-id] + (get-subscription-cache-content context coll-concept-id nil)) + ([context coll-concept-id {:keys [raw? http-options]}] + (let [conn (config/context->app-connection context :metadata-db) + request-url (str (conn/root-url conn) "/subscription/cache-content") + params (merge + (config/conn-params conn) + {:accept :json + :query-params {:collection-concept-id coll-concept-id} + :headers (merge + (ch/context->http-headers context) + {:client-id config/cmr-client-id}) + :throw-exceptions false + :http-options (http-helper/include-request-id context {})}) + response (client/get request-url params) + {:keys [status body]} response + status (int status)] + (case status + 200 (json/decode body true) + ;; default + (errors/internal-error! + (format "Get subscription cache content failed for collection %s. status: %s body: %s" coll-concept-id status body)))))) diff --git a/transmit-lib/test/cmr/transmit/test/metadata_db2_test.clj b/transmit-lib/test/cmr/transmit/test/metadata_db2_test.clj new file mode 100644 index 0000000000..a8a9ee0bb7 --- /dev/null +++ b/transmit-lib/test/cmr/transmit/test/metadata_db2_test.clj @@ -0,0 +1,25 @@ +(ns cmr.transmit.test.metadata-db2-test + "These tests will check the functions in cmr.transmit.metadata-db2." + (:require + [cheshire.core :as json] + [clojure.test :refer [deftest is]] + [clj-http.client :as client] + [cmr.transmit.metadata-db2 :as mdb2])) + +(deftest get-subscription-cache-content + (let [context {:system :metadata-db} + coll-concept-id "C123-PROV1" + response {:status 200 + :headers {}, + :body (json/encode {:Mode {:New ["url1"] + :Update ["url2"]}}) + :request-time 6, + :trace-redirects []} + expected-content-cache {:Mode {:New ["url1"] + :Update ["url2"]}}] + ;; successful response returns the json decoded body + (with-redefs [client/get (fn [request-url params] response)] + (is (= expected-content-cache (mdb2/get-subscription-cache-content context coll-concept-id)))) + ;; unsuccessful response throws an error + (with-redefs [client/get (fn [request-url params] {:status 500})] + (is (thrown? Exception (mdb2/get-subscription-cache-content context coll-concept-id))))))