Skip to content

Commit f17c748

Browse files
authored
CMR-10393: Allow only one sqs client queue to subscribe to one collection for NRT (ingest) subscriptions (#2216)
* refactor funcs, add unit tests * update delete subscription func and add unit tests * updated changes to subscriptions * updating cache, not working yet * update cache structure, stabilized create, update, delete sub pathways * update subscripion message attributes * add func comments * add debug printlns * fix unit tests * clean up * fix refresh sub cache endpoint * add unit tests * remove all println statements * add new transmit lib method and metadata-db api for subscription * saving work so far * fix mode map get * testing subscription check works * add unit tests * update documentation to reflect the edge case * fix spelling errors and new line * block subscription endpoint http url from being localhost if not local env * fix spacing * do not allow non-local env to allow local URLs * cosmetic changes to metadata_db2 file
1 parent 577c000 commit f17c748

File tree

9 files changed

+211
-52
lines changed

9 files changed

+211
-52
lines changed

ingest-app/docs/api.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1169,6 +1169,10 @@ For NRT Notification subscriptions to be used there are three new fields that ar
11691169
</ul>
11701170
</ul>
11711171

1172+
NOTE: The same SQS endpoint cannot be used for the same collection more than once.
1173+
For example if you, USER 1, create a subscription with an SQS queue: SQS1 endpoint to filter UPDATE granule events from Collection 1, another user, USER 2, cannot create a new subscription with the same SQS1 queue endpoint to filter granules from Collection 1 again.
1174+
Ultimately, only ONE user can perform CRUD operations on the specific SQS endpoint used.
1175+
11721176
##### NRT Notification Subscription POST Request
11731177

11741178
```

ingest-app/src/cmr/ingest/api/subscriptions.clj

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@
1111
[cmr.common.services.errors :as errors]
1212
[cmr.common.util :as util]
1313
[cmr.ingest.api.core :as api-core]
14+
[cmr.ingest.config :as ingest-config]
1415
[cmr.ingest.services.ingest-service :as ingest]
1516
[cmr.ingest.services.subscriptions-helper :as jobs]
1617
[cmr.ingest.validation.validation :as v]
1718
[cmr.transmit.access-control :as access-control]
1819
[cmr.transmit.config :as config]
1920
[cmr.transmit.metadata-db :as mdb]
21+
[cmr.transmit.metadata-db2 :as metadata-db2]
2022
[cmr.transmit.search :as search]
2123
[cmr.transmit.urs :as urs])
2224
(:import
@@ -83,14 +85,43 @@
8385
[subscription-concept]
8486
(let [method (:Method subscription-concept)
8587
endpoint (:EndPoint subscription-concept)
86-
default-url-validator (UrlValidator. UrlValidator/ALLOW_LOCAL_URLS)]
88+
curr-env (ingest-config/app-environment)
89+
url-validator (if (= curr-env "local")
90+
(UrlValidator. UrlValidator/ALLOW_LOCAL_URLS)
91+
(UrlValidator.))]
8792

8893
(if (= method "ingest")
89-
(if-not (or (some? (re-matches #"arn:aws:sqs:.*" endpoint)) (.isValid default-url-validator endpoint))
94+
(if-not (or (some? (re-matches #"arn:aws:sqs:.*" endpoint))
95+
(.isValid url-validator endpoint))
9096
(errors/throw-service-error
9197
:bad-request
92-
"Subscription creation failed - Method was ingest, but the endpoint given was not valid SQS ARN or HTTP/S URL.
93-
If it is a URL, make sure to give the full URL path like so: https://www.google.com.")))))
98+
"Subscription creation failed - Method was ingest, but the endpoint given was not valid SQS ARN or HTTP/S URL."
99+
"If it is a URL, make sure to give the full URL path like so: https://www.google.com. We do not accept local URLs.")))))
100+
101+
(defn- check-endpoint-queue-for-collection-not-already-exist
102+
"Validates that the subscription with the same collection and same SQS endpoint does not already exist.
103+
Throws error if the same collection and same SQS endpoint already exists because creating duplicate collection
104+
with same SQS endpoint from a different user is not allowed."
105+
[context subscription-concept]
106+
(let [curr-endpoint (:EndPoint subscription-concept)]
107+
(if (and (= "ingest" (:Method subscription-concept))
108+
(or (some? (re-matches #"arn:aws:sqs:.*" curr-endpoint))
109+
(some? (re-matches #"http://localhost:9324.*" curr-endpoint))))
110+
(let [collection-concept-id (:CollectionConceptId subscription-concept)
111+
cache-content (metadata-db2/get-subscription-cache-content context collection-concept-id)]
112+
(if cache-content
113+
;; cache-content format expected is something like: {:Mode {:Delete [sqs1 sqs2], :New [url1], :Update [url1]}}
114+
(let [mode-map (get cache-content :Mode)]
115+
;; check if any of the endpoints in each mode type is equal to the curr sqs endpoint, if so throw the error
116+
(doseq [modetoendpointset mode-map]
117+
(if (some #{curr-endpoint} (val modetoendpointset))
118+
(errors/throw-service-error
119+
:conflict
120+
(format (str "The collection [%s] has already subscribed to the given sqs arn by another user. "
121+
"Each Near Real Time subscription to a collection must have a unique sqs arn endpoint."
122+
"You cannot have the same SQS queue subscribed to the same collection by multiple users,"
123+
"only one user can crate/update/delete subscription to the same end client queue to the same collection.")
124+
(util/html-escape collection-concept-id)))))))))))
94125

95126
(defn- check-subscription-limit
96127
"Given the configuration for subscription limit, this valdiates that the user has no more than
@@ -163,6 +194,7 @@
163194
(check-duplicate-subscription request-context concept parsed)
164195
(check-subscription-limit request-context concept parsed)
165196
(check-subscriber-collection-permission request-context parsed)
197+
(check-endpoint-queue-for-collection-not-already-exist request-context parsed)
166198
(let [concept-with-user-id (api-core/set-user-id concept
167199
request-context
168200
headers)

ingest-app/src/cmr/ingest/config.clj

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,3 +154,8 @@
154154
(defconfig validate-umm-var-keywords
155155
"Flag for whether or not to validate UMM-Var against KMS keywords."
156156
{:default false :type Boolean})
157+
158+
(declare app-environment)
159+
(defconfig app-environment
160+
"The environment in which the application is running in NGAP (wl, sit, uat, ops)"
161+
{:default "local"})

ingest-app/test/cmr/ingest/api/subscriptions_test.clj

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
(ns cmr.ingest.api.subscriptions-test
22
(:require
3-
[clojure.string :as string]
4-
[clojure.test :refer :all]
5-
[cmr.common.util :as util]
6-
[cmr.ingest.api.subscriptions :as subscriptions]))
3+
[clojure.string :as string]
4+
[clojure.test :refer :all]
5+
[cmr.common.util :as util]
6+
[cmr.ingest.api.subscriptions :as subscriptions]
7+
[cmr.ingest.config :as ingest-config]))
78

89
(deftest generate-native-id-test
910
(let [parsed {:Name "the_beginning"
@@ -28,19 +29,55 @@
2829
"given method is search and endpoint not given -- endpoint ignored"
2930
{:EndPoint "", :Method "search"}
3031

31-
"given method is ingest and sqs arn is valid"
32+
"given method is ingest, env is local, and endpoint is sqs arn"
3233
{:EndPoint "arn:aws:sqs:us-east-1:000000000:Test-Queue", :Method "ingest"}
3334

34-
"given method is ingest and url is valid"
35-
{:EndPoint "https://testwebsite.com", :Method "ingest"}))
35+
"given method is ingest, env is local, and url is non-local"
36+
{:EndPoint "https://testwebsite.com", :Method "ingest"}
37+
38+
"given method is ingest, env is local, url is local"
39+
{:EndPoint "http://localhost:8080/localllllll", :Method "ingest"}))
3640

3741
(testing "validate subscription endpoint str -- expected invalid"
42+
(with-redefs [ingest-config/app-environment (fn [] "non-local")]
43+
(let [fun #'cmr.ingest.api.subscriptions/validate-subscription-endpoint]
44+
;; given method is ingest, env is non-local and sqs arn -- throws error
45+
(is (thrown? Exception (fun {:EndPoint "iaminvalidendpoint", :Method "ingest"})))
46+
47+
;; given method is ingest, env is non-local and endpoint is empty -- throws error
48+
(is (thrown? Exception (fun {:EndPoint "", :Method "ingest"})))
49+
50+
;; ;; given method is ingest, env is non-local and endpoint is local endpoint -- throws error
51+
(is (thrown? Exception (fun {:EndPoint "http://localhost:8080", :Method "ingest"})))))))
52+
53+
(deftest check-subscription-for-collection-not-already-exist-test
54+
(let [fun #'cmr.ingest.api.subscriptions/check-endpoint-queue-for-collection-not-already-exist
55+
context nil]
3856
(util/are3 [subscription-concept]
39-
(let [fun #'cmr.ingest.api.subscriptions/validate-subscription-endpoint]
40-
(is (thrown? Exception (fun subscription-concept))))
57+
(is (= nil (fun context subscription-concept)))
58+
59+
"subscription concept not ingest type -- does nothing"
60+
{:EndPoint "", :Method "search"}
61+
62+
"subscription concept not sqs arn nor local queue arn -- does nothing"
63+
{:EndPoint "http://www.something.com", :Method "ingest"})
64+
65+
(let [subscription-concept {:EndPoint "arn:aws:sqs:blahblah" :Method "ingest" :CollectionConceptId "C123-PROV1"}
66+
returned-cache-content {:Mode {:Delete ["sqs1" "sqs2"], :New ["url1"], :Update ["url1"]}}
67+
returned-cache-content-with-duplicates {:Mode {:Delete ["sqs1" "sqs2"], :New ["url1" "arn:aws:sqs:blahblah"], :Update ["url1"]}}]
68+
69+
;; method for getting cache-content returns error -- this method should bubble up that error
70+
(with-redefs [cmr.transmit.metadata-db2/get-subscription-cache-content (fn [context collection-concept-id] (throw (Exception. "Exception was thrown from cache-content func")))]
71+
(is (thrown? Exception (fun context subscription-concept))))
72+
73+
;; returns nil cache-content -- does nothing
74+
(with-redefs [cmr.transmit.metadata-db2/get-subscription-cache-content (fn [context collection-concept-id] nil)]
75+
(is (nil? (fun context subscription-concept))))
4176

42-
"given method is ingest and sqs arn is invalid"
43-
{:EndPoint "iaminvalidendpoint", :Method "ingest"}
77+
;; duplication collection to sqs queue not found -- does nothing
78+
(with-redefs [cmr.transmit.metadata-db2/get-subscription-cache-content (fn [context collection-concept-id] returned-cache-content)]
79+
(is (nil? (fun context subscription-concept))))
4480

45-
"given method is ingest and endpoint is empty is invalid"
46-
{:Endpoint "", :Method "ingest"})))
81+
;; duplicate collection to sqs queue found -- throws error
82+
(with-redefs [cmr.transmit.metadata-db2/get-subscription-cache-content (fn [context collection-concept-id] returned-cache-content-with-duplicates)]
83+
(is (thrown? Exception (fun context subscription-concept)))))))

metadata-db-app/src/cmr/metadata_db/api/subscriptions.clj

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
[clojure.string :as string]
66
[cmr.metadata-db.services.sub-notifications :as sub-note]
77
[cmr.metadata-db.services.subscriptions :as subscriptions]
8-
[compojure.core :refer [PUT POST context]]))
8+
[compojure.core :refer [PUT POST GET context]]))
99

1010
(defn- update-subscription-notification-time
1111
"Update a subscription notification time"
@@ -28,4 +28,7 @@
2828
(update-subscription-notification-time request-context params body))
2929
(POST "/refresh-subscription-cache"
3030
{request-context :request-context}
31-
(subscriptions/refresh-subscription-cache request-context))))
31+
(subscriptions/refresh-subscription-cache request-context))
32+
;; get ingest subscription cache content for a specific collection
33+
(GET "/cache-content" {:keys [params request-context]}
34+
(subscriptions/get-cache-content request-context params))))

metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
(:require
44
[cheshire.core :as json]
55
[cmr.common.log :refer [debug info]]
6+
[cmr.common.services.errors :as errors]
67
[cmr.message-queue.topic.topic-protocol :as topic-protocol]
8+
[cmr.metadata-db.api.route-helpers :as rh]
79
[cmr.metadata-db.config :as mdb-config]
810
[cmr.metadata-db.services.search-service :as mdb-search]
911
[cmr.metadata-db.services.subscription-cache :as subscription-cache]
@@ -326,6 +328,15 @@
326328
(swap! result-array (fn [n] (conj @result-array result)))))))
327329
@result-array)))))
328330

331+
(defn get-cache-content
332+
[context params]
333+
(if-let [collection-concept-id (:collection-concept-id params)]
334+
(let [subscription-object (subscription-cache/get-value context collection-concept-id)]
335+
{:status 200
336+
:body (json/generate-string subscription-object)
337+
:headers rh/json-header})
338+
(errors/throw-service-error :bad-request "A collection concept id query parameter was required but was not provided.")))
339+
329340
(comment
330341
(let [system (get-in user/system [:apps :metadata-db])]
331342
(refresh-subscription-cache {:system system})))

metadata-db-app/test/cmr/metadata_db/test/services/subscriptions_test.clj

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
[cmr.message-queue.pub-sub :as pub-sub]
99
[cmr.message-queue.test.test-util :as sqs-test-util]
1010
[cmr.message-queue.topic.topic-protocol :as topic-protocol]
11+
[cmr.metadata-db.api.route-helpers :as rh]
1112
[cmr.metadata-db.config :as mdb-config]
1213
[cmr.metadata-db.services.subscription-cache :as subscription-cache]
1314
[cmr.metadata-db.services.subscriptions :as subscriptions]
@@ -817,3 +818,14 @@
817818
(with-redefs [topic-protocol/subscribe (fn [topic concept-edn] nil)]
818819
(testing "subscribe fails, will return concept without the aws-arn in extra fields and will NOT throw exception"
819820
(is (= ingest-concept (subscriptions/attach-subscription-to-topic context ingest-concept)))))))
821+
822+
(deftest get-cache-content-test
823+
;; cache returns some value -- return 200 with cache content
824+
(with-redefs [subscription-cache/get-value (fn [context collection-concept-id] {})]
825+
(is (= {:status 200
826+
:body (json/generate-string {})
827+
:headers rh/json-header}
828+
(subscriptions/get-cache-content nil {:collection-concept-id "C123-PROV1"}))))
829+
;; collection concept id not in params -- returns bad request exception
830+
(with-redefs [subscription-cache/get-value (fn [context collection-concept-id] {})]
831+
(is (thrown? Exception (subscriptions/get-cache-content nil {})))))

0 commit comments

Comments
 (0)