Skip to content
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
e65dc51
refactor funcs, add unit tests
jmaeng72 Jan 29, 2025
656c73e
update delete subscription func and add unit tests
jmaeng72 Jan 30, 2025
44eb3e9
updated changes to subscriptions
jmaeng72 Jan 30, 2025
68e37da
updating cache, not working yet
jmaeng72 Jan 31, 2025
9d49d71
update cache structure, stabilized create, update, delete sub pathways
jmaeng72 Jan 31, 2025
7ba4da3
update subscripion message attributes
jmaeng72 Feb 3, 2025
c91d0fb
add func comments
jmaeng72 Feb 3, 2025
851c162
add debug printlns
jmaeng72 Feb 4, 2025
7102dc8
fix unit tests
jmaeng72 Feb 4, 2025
85eec19
fix unit tests
jmaeng72 Feb 5, 2025
36985aa
Merge branch 'master' of https://github.com/nasa/Common-Metadata-Repo…
jmaeng72 Feb 5, 2025
7c322c3
clean up
jmaeng72 Feb 5, 2025
91ba9ab
fix refresh sub cache endpoint
jmaeng72 Feb 5, 2025
11f5fb5
add unit tests
jmaeng72 Feb 5, 2025
869dec5
remove all println statements
jmaeng72 Feb 5, 2025
2acc940
add new transmit lib method and metadata-db api for subscription
jmaeng72 Feb 12, 2025
9e5b0de
saving work so far
jmaeng72 Feb 13, 2025
2c87a04
Merge branch 'master' of https://github.com/nasa/Common-Metadata-Repo…
jmaeng72 Feb 13, 2025
90b9747
fix mode map get
jmaeng72 Feb 13, 2025
668f709
testing subscription check works
jmaeng72 Feb 13, 2025
b574637
Merge branch 'master' of https://github.com/nasa/Common-Metadata-Repo…
jmaeng72 Feb 13, 2025
25a8943
add unit tests
jmaeng72 Feb 13, 2025
53db25a
update documentation to reflect the edge case
jmaeng72 Feb 13, 2025
4841e94
fix spelling errors and new line
jmaeng72 Feb 14, 2025
7bf006b
block subscription endpoint http url from being localhost if not loca…
jmaeng72 Feb 14, 2025
c035b3c
fix spacing
jmaeng72 Feb 14, 2025
77d5029
do not allow non-local env to allow local URLs
jmaeng72 Feb 14, 2025
60c9dfc
cosmetic changes to metadata_db2 file
jmaeng72 Feb 19, 2025
9747f70
Merge branch 'master' of https://github.com/nasa/Common-Metadata-Repo…
jmaeng72 Feb 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ingest-app/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,10 @@ For NRT Notification subscriptions to be used there are three new fields that ar
</ul>
</ul>

NOTE: The same SQS endpoint cannot be used for the same collection more than once.
For example if you, USER 1, create a subscription with an SQS queue: SQS1 endpoint to filter UPDATE granule events from Collection 1, another user, USER 2, cannot create a new subscription with the same SQS1 queue endpoint to filter granules from Collection 1 again.
Ultimately, only ONE user can perform CRUD operations on the specific SQS endpoint used.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope this makes sense... please double check during your reviews if this is clear

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its clear to me.

##### NRT Notification Subscription POST Request

```
Expand Down
40 changes: 36 additions & 4 deletions ingest-app/src/cmr/ingest/api/subscriptions.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
[cmr.common.services.errors :as errors]
[cmr.common.util :as util]
[cmr.ingest.api.core :as api-core]
[cmr.ingest.config :as ingest-config]
[cmr.ingest.services.ingest-service :as ingest]
[cmr.ingest.services.subscriptions-helper :as jobs]
[cmr.ingest.validation.validation :as v]
[cmr.transmit.access-control :as access-control]
[cmr.transmit.config :as config]
[cmr.transmit.metadata-db :as mdb]
[cmr.transmit.metadata-db2 :as metadata-db2]
[cmr.transmit.search :as search]
[cmr.transmit.urs :as urs])
(:import
Expand Down Expand Up @@ -83,14 +85,43 @@
[subscription-concept]
(let [method (:Method subscription-concept)
endpoint (:EndPoint subscription-concept)
default-url-validator (UrlValidator. UrlValidator/ALLOW_LOCAL_URLS)]
curr-env (ingest-config/app-environment)
url-validator (if (= curr-env "local")
(UrlValidator. UrlValidator/ALLOW_LOCAL_URLS)
(UrlValidator.))]

(if (= method "ingest")
(if-not (or (some? (re-matches #"arn:aws:sqs:.*" endpoint)) (.isValid default-url-validator endpoint))
(if-not (or (some? (re-matches #"arn:aws:sqs:.*" endpoint))
(.isValid url-validator endpoint))
(errors/throw-service-error
:bad-request
"Subscription creation failed - Method was ingest, but the endpoint given was not valid SQS ARN or HTTP/S URL.
If it is a URL, make sure to give the full URL path like so: https://www.google.com.")))))
"Subscription creation failed - Method was ingest, but the endpoint given was not valid SQS ARN or HTTP/S URL."
"If it is a URL, make sure to give the full URL path like so: https://www.google.com. We do not accept local URLs.")))))

(defn- check-endpoint-queue-for-collection-not-already-exist
"Validates that the subscription with the same collection and same SQS endpoint does not already exist.
Throws error if the same collection and same SQS endpoint already exists because creating duplicate collection
with same SQS endpoint from a different user is not allowed."
[context subscription-concept]
(let [curr-endpoint (:EndPoint subscription-concept)]
(if (and (= "ingest" (:Method subscription-concept))
(or (some? (re-matches #"arn:aws:sqs:.*" curr-endpoint))
(some? (re-matches #"http://localhost:9324.*" curr-endpoint))))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we matching this specific URL? I though the issue was only with SQS?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the local dev SQS ARN (which doesn't start with arn:aws:sqs, but I still wanted to capture it and have it act like it was an sqs arn subscription concept endpoint)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not directed at this specific PR, but are there/do we need guards against localhost subscription URLs?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain further? You mean, do we need to distinguish between localhost URL and localhost SQS?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or are you saying that we might not need line 104 because we don't care about distinguishing between local sqs and local http endpoint?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nah, like when someone sends in a subscription with url endpoint, do we block them/do we even need to block them from sending in a subscription that is for localhost (since that would point to our subscription processor). Again, not aimed directly at this PR, just occurred to me while looking at the localhost there

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ooo I see what you are saying. I think we could. It doesn't hurt to just limit them to non-local URLs for http endpoints

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated with new blocker for this

(let [collection-concept-id (:CollectionConceptId subscription-concept)
cache-content (metadata-db2/get-subscription-cache-content context collection-concept-id)]
Copy link
Copy Markdown
Contributor

@eereiter eereiter Feb 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering why you created an API call into metadata-db, instead of just reading the cache directly?

Not asking you to change anything.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the cache funcs are in the metadata-db app which is typically accessed from the transmit lib that acts like a middle-man. Which is why I created a metadata-db API

(if cache-content
;; cache-content format expected is something like: {:Mode {:Delete [sqs1 sqs2], :New [url1], :Update [url1]}}
(let [mode-map (get cache-content :Mode)]
;; check if any of the endpoints in each mode type is equal to the curr sqs endpoint, if so throw the error
(doseq [modetoendpointset mode-map]
(if (some #{curr-endpoint} (val modetoendpointset))
(errors/throw-service-error
:conflict
(format (str "The collection [%s] has already subscribed to the given sqs arn by another user. "
"Each Near Real Time subscription to a collection must have a unique sqs arn endpoint."
"You cannot have the same SQS queue subscribed to the same collection by multiple users,"
"only one user can crate/update/delete subscription to the same end client queue to the same collection.")
(util/html-escape collection-concept-id)))))))))))

(defn- check-subscription-limit
"Given the configuration for subscription limit, this valdiates that the user has no more than
Expand Down Expand Up @@ -163,6 +194,7 @@
(check-duplicate-subscription request-context concept parsed)
(check-subscription-limit request-context concept parsed)
(check-subscriber-collection-permission request-context parsed)
(check-endpoint-queue-for-collection-not-already-exist request-context parsed)
(let [concept-with-user-id (api-core/set-user-id concept
request-context
headers)
Expand Down
5 changes: 5 additions & 0 deletions ingest-app/src/cmr/ingest/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,8 @@
(defconfig validate-umm-var-keywords
"Flag for whether or not to validate UMM-Var against KMS keywords."
{:default false :type Boolean})

(declare app-environment)
(defconfig app-environment
"The environment in which the application is running in NGAP (wl, sit, uat, ops)"
{:default "local"})
63 changes: 50 additions & 13 deletions ingest-app/test/cmr/ingest/api/subscriptions_test.clj
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
(ns cmr.ingest.api.subscriptions-test
(:require
[clojure.string :as string]
[clojure.test :refer :all]
[cmr.common.util :as util]
[cmr.ingest.api.subscriptions :as subscriptions]))
[clojure.string :as string]
[clojure.test :refer :all]
[cmr.common.util :as util]
[cmr.ingest.api.subscriptions :as subscriptions]
[cmr.ingest.config :as ingest-config]))

(deftest generate-native-id-test
(let [parsed {:Name "the_beginning"
Expand All @@ -28,19 +29,55 @@
"given method is search and endpoint not given -- endpoint ignored"
{:EndPoint "", :Method "search"}

"given method is ingest and sqs arn is valid"
"given method is ingest, env is local, and endpoint is sqs arn"
{:EndPoint "arn:aws:sqs:us-east-1:000000000:Test-Queue", :Method "ingest"}

"given method is ingest and url is valid"
{:EndPoint "https://testwebsite.com", :Method "ingest"}))
"given method is ingest, env is local, and url is non-local"
{:EndPoint "https://testwebsite.com", :Method "ingest"}

"given method is ingest, env is local, url is local"
{:EndPoint "http://localhost:8080/localllllll", :Method "ingest"}))

(testing "validate subscription endpoint str -- expected invalid"
(with-redefs [ingest-config/app-environment (fn [] "non-local")]
(let [fun #'cmr.ingest.api.subscriptions/validate-subscription-endpoint]
;; given method is ingest, env is non-local and sqs arn -- throws error
(is (thrown? Exception (fun {:EndPoint "iaminvalidendpoint", :Method "ingest"})))

;; given method is ingest, env is non-local and endpoint is empty -- throws error
(is (thrown? Exception (fun {:EndPoint "", :Method "ingest"})))

;; ;; given method is ingest, env is non-local and endpoint is local endpoint -- throws error
(is (thrown? Exception (fun {:EndPoint "http://localhost:8080", :Method "ingest"})))))))

(deftest check-subscription-for-collection-not-already-exist-test
(let [fun #'cmr.ingest.api.subscriptions/check-endpoint-queue-for-collection-not-already-exist
context nil]
(util/are3 [subscription-concept]
(let [fun #'cmr.ingest.api.subscriptions/validate-subscription-endpoint]
(is (thrown? Exception (fun subscription-concept))))
(is (= nil (fun context subscription-concept)))

"subscription concept not ingest type -- does nothing"
{:EndPoint "", :Method "search"}

"subscription concept not sqs arn nor local queue arn -- does nothing"
{:EndPoint "http://www.something.com", :Method "ingest"})

(let [subscription-concept {:EndPoint "arn:aws:sqs:blahblah" :Method "ingest" :CollectionConceptId "C123-PROV1"}
returned-cache-content {:Mode {:Delete ["sqs1" "sqs2"], :New ["url1"], :Update ["url1"]}}
returned-cache-content-with-duplicates {:Mode {:Delete ["sqs1" "sqs2"], :New ["url1" "arn:aws:sqs:blahblah"], :Update ["url1"]}}]

;; method for getting cache-content returns error -- this method should bubble up that error
(with-redefs [cmr.transmit.metadata-db2/get-subscription-cache-content (fn [context collection-concept-id] (throw (Exception. "Exception was thrown from cache-content func")))]
(is (thrown? Exception (fun context subscription-concept))))

;; returns nil cache-content -- does nothing
(with-redefs [cmr.transmit.metadata-db2/get-subscription-cache-content (fn [context collection-concept-id] nil)]
(is (nil? (fun context subscription-concept))))

"given method is ingest and sqs arn is invalid"
{:EndPoint "iaminvalidendpoint", :Method "ingest"}
;; duplication collection to sqs queue not found -- does nothing
(with-redefs [cmr.transmit.metadata-db2/get-subscription-cache-content (fn [context collection-concept-id] returned-cache-content)]
(is (nil? (fun context subscription-concept))))

"given method is ingest and endpoint is empty is invalid"
{:Endpoint "", :Method "ingest"})))
;; duplicate collection to sqs queue found -- throws error
(with-redefs [cmr.transmit.metadata-db2/get-subscription-cache-content (fn [context collection-concept-id] returned-cache-content-with-duplicates)]
(is (thrown? Exception (fun context subscription-concept)))))))
7 changes: 5 additions & 2 deletions metadata-db-app/src/cmr/metadata_db/api/subscriptions.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
[clojure.string :as string]
[cmr.metadata-db.services.sub-notifications :as sub-note]
[cmr.metadata-db.services.subscriptions :as subscriptions]
[compojure.core :refer [PUT POST context]]))
[compojure.core :refer [PUT POST GET context]]))

(defn- update-subscription-notification-time
"Update a subscription notification time"
Expand All @@ -28,4 +28,7 @@
(update-subscription-notification-time request-context params body))
(POST "/refresh-subscription-cache"
{request-context :request-context}
(subscriptions/refresh-subscription-cache request-context))))
(subscriptions/refresh-subscription-cache request-context))
;; get ingest subscription cache content for a specific collection
(GET "/cache-content" {:keys [params request-context]}
(subscriptions/get-cache-content request-context params))))
11 changes: 11 additions & 0 deletions metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
(:require
[cheshire.core :as json]
[cmr.common.log :refer [debug info]]
[cmr.common.services.errors :as errors]
[cmr.message-queue.topic.topic-protocol :as topic-protocol]
[cmr.metadata-db.api.route-helpers :as rh]
[cmr.metadata-db.config :as mdb-config]
[cmr.metadata-db.services.search-service :as mdb-search]
[cmr.metadata-db.services.subscription-cache :as subscription-cache]
Expand Down Expand Up @@ -326,6 +328,15 @@
(swap! result-array (fn [n] (conj @result-array result)))))))
@result-array)))))

(defn get-cache-content
[context params]
(if-let [collection-concept-id (:collection-concept-id params)]
(let [subscription-object (subscription-cache/get-value context collection-concept-id)]
{:status 200
:body (json/generate-string subscription-object)
:headers rh/json-header})
(errors/throw-service-error :bad-request "A collection concept id query parameter was required but was not provided.")))

(comment
(let [system (get-in user/system [:apps :metadata-db])]
(refresh-subscription-cache {:system system})))
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
[cmr.message-queue.pub-sub :as pub-sub]
[cmr.message-queue.test.test-util :as sqs-test-util]
[cmr.message-queue.topic.topic-protocol :as topic-protocol]
[cmr.metadata-db.api.route-helpers :as rh]
[cmr.metadata-db.config :as mdb-config]
[cmr.metadata-db.services.subscription-cache :as subscription-cache]
[cmr.metadata-db.services.subscriptions :as subscriptions]
Expand Down Expand Up @@ -817,3 +818,14 @@
(with-redefs [topic-protocol/subscribe (fn [topic concept-edn] nil)]
(testing "subscribe fails, will return concept without the aws-arn in extra fields and will NOT throw exception"
(is (= ingest-concept (subscriptions/attach-subscription-to-topic context ingest-concept)))))))

(deftest get-cache-content-test
;; cache returns some value -- return 200 with cache content
(with-redefs [subscription-cache/get-value (fn [context collection-concept-id] {})]
(is (= {:status 200
:body (json/generate-string {})
:headers rh/json-header}
(subscriptions/get-cache-content nil {:collection-concept-id "C123-PROV1"}))))
;; collection concept id not in params -- returns bad request exception
(with-redefs [subscription-cache/get-value (fn [context collection-concept-id] {})]
(is (thrown? Exception (subscriptions/get-cache-content nil {})))))
39 changes: 35 additions & 4 deletions transmit-lib/src/cmr/transmit/metadata_db2.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
"This contains functions for interacting with the metadata db API. It uses the newer transmit namespace
style that concepts, and access control use"
(:require
[cmr.transmit.config :as config]
[cmr.transmit.connection :as conn]
[cmr.transmit.http-helper :as h]
[ring.util.codec :as codec]))
[cheshire.core :as json]
[clj-http.client :as client]
[cmr.common.api.context :as ch]
[cmr.common.services.errors :as errors]
[cmr.transmit.config :as config]
[cmr.transmit.connection :as conn]
[cmr.transmit.http-helper :as h]
[ring.util.codec :as codec]))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; URL functions
Expand Down Expand Up @@ -156,3 +160,30 @@
;; Defines health check function
(declare get-metadata-db-health)
(h/defhealther get-metadata-db-health :metadata-db {:timeout-secs 2})


(defn get-subscription-cache-content
"Retrieves the cache contents of the ingest subscription cache."
([context coll-concept-id]
(get-subscription-cache-content context coll-concept-id nil))
([context coll-concept-id {:keys [raw? http-options]}]
(let [
conn (config/context->app-connection context :metadata-db)
request-url (str (conn/root-url conn) "/subscription/cache-content")
params (merge
(config/conn-params conn)
{:accept :json
:query-params {:collection-concept-id coll-concept-id}
:headers (merge
(ch/context->http-headers context)
{:client-id config/cmr-client-id})
:throw-exceptions false
:http-options (h/include-request-id context {})})
response (client/get request-url params)
{:keys [status body]} response
status (int status)]
(case status
200 (json/decode body true)
;; default
(errors/internal-error!
(format "Get subscription cache content failed for collection %s. status: %s body: %s" coll-concept-id status body))))))
25 changes: 25 additions & 0 deletions transmit-lib/test/cmr/transmit/test/metadata_db2_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
(ns cmr.transmit.test.metadata-db2-test
"These tests will check the functions in cmr.transmit.metadata-db2."
(:require
[cheshire.core :as json]
[clojure.test :refer [deftest is]]
[clj-http.client :as client]
[cmr.transmit.metadata-db2 :as mdb2]))

(deftest get-subscription-cache-content
(let [context {:system :metadata-db}
coll-concept-id "C123-PROV1"
response {:status 200
:headers {},
:body (json/encode {:Mode {:New ["url1"]
:Update ["url2"]}})
:request-time 6,
:trace-redirects []}
expected-content-cache {:Mode {:New ["url1"]
:Update ["url2"]}}]
;; successful response returns the json decoded body
(with-redefs [client/get (fn [request-url params] response)]
(is (= expected-content-cache (mdb2/get-subscription-cache-content context coll-concept-id))))
;; unsuccessful response throws an error
(with-redefs [client/get (fn [request-url params] {:status 500})]
(is (thrown? Exception (mdb2/get-subscription-cache-content context coll-concept-id))))))