|
84 | 84 | result |
85 | 85 | (recur (rest subs) (add-to-existing-mode result (get-in sub [:metadata :Mode]))))))) |
86 | 86 |
|
87 | | -" |
88 | | -({ |
89 | | -:revision-id 1, |
90 | | -:deleted false, |
91 | | -:format application/vnd.nasa.cmr.umm+json;version=1.1.1, |
92 | | -:provider-id JM_PROV1, |
93 | | -:subscription-type granule, |
94 | | -:user-id ECHO_SYS, |
95 | | -:transaction-id 5, |
96 | | -:native-id jyna_ingest_subscription_23, |
97 | | -:normalized-query bca0dee3632df33a114bd85059accb71, |
98 | | -:concept-id SUB1200000003-JM_PROV1, |
99 | | -:created-at 2025-01-31T17:41:16.480Z, |
100 | | -:metadata { |
101 | | - :SubscriberId user2, |
102 | | - :CollectionConceptId C1200000001-JM_PROV1, |
103 | | - :EndPoint http://localhost:9324/000000000000/cmr-internal-subscriptions-queue-local, |
104 | | - :Mode [Update], |
105 | | - :EmailAddress jyna.maeng@nasa.gov, |
106 | | - :Query collection-concept-id=C1200000001-JM_PROV1, |
107 | | - :Name Ingest-Subscription-Test, |
108 | | - :Method ingest, |
109 | | - :Type granule, |
110 | | - :MetadataSpecification {:URL https://cdn.earthdata.nasa.gov/umm/subscription/v1.1.1, :Name UMM-Sub, :Version 1.1.1} |
111 | | - }, |
112 | | -:revision-date 2025-01-31T17:41:16.480Z, |
113 | | -:extra-fields { |
114 | | - :aws-arn SUB1200000003-JM_PROV1, |
115 | | - :subscription-name Ingest-Subscription-Test, |
116 | | - :method ingest, |
117 | | - :collection-concept-id C1200000001-JM_PROV1, |
118 | | - :subscriber-id user2, |
119 | | - :subscription-type granule, |
120 | | - :mode [Update], |
121 | | - :normalized-query bca0dee3632df33a114bd85059accb71, |
122 | | - :endpoint http://localhost:9324/000000000000/cmr-internal-subscriptions-queue-local |
123 | | - }, |
124 | | -:concept-type :subscription |
125 | | -}" |
126 | 87 |
|
127 | 88 | (use 'clojure.set) |
128 | 89 | (defn create-mode-to-endpoints-map |
|
334 | 295 | (:revision-id concept)) |
335 | 296 | "\"")) |
336 | 297 |
|
337 | | -(defn create-notification |
| 298 | +(defn create-notification-message-body |
338 | 299 | "Create the notification when a subscription exists. Returns either a notification message or nil." |
339 | 300 | [concept] |
340 | 301 | (let [concept-edn (convert-concept-to-edn concept) |
|
356 | 317 | [mode] |
357 | 318 | (str mode " Notification")) |
358 | 319 |
|
359 | | -(defn create-attributes-and-subject-map |
360 | | - "Determine based on the passed in concept if the granule is new, is an update |
361 | | - or delete. Use the passed in mode to determine if any subscription is interested |
362 | | - in a notification. If they are then return the message attributes and subject, otherwise |
363 | | - return nil." |
364 | | - [concept mode coll-concept-id] |
| 320 | +;(defn create-attributes-and-subject-map |
| 321 | +; "Determine based on the passed in concept if the granule is new, is an update |
| 322 | +; or delete. Use the passed in mode to determine if any subscription is interested |
| 323 | +; in a notification. If they are then return the message attributes and subject, otherwise |
| 324 | +; return nil." |
| 325 | +; [concept mode coll-concept-id] |
| 326 | +; (cond |
| 327 | +; ;; Mode = Delete. |
| 328 | +; (and (:deleted concept) |
| 329 | +; (some #(= "Delete" %) mode)) |
| 330 | +; {:attributes (create-message-attributes coll-concept-id "Delete") |
| 331 | +; :subject (create-message-subject "Delete")} |
| 332 | +; |
| 333 | +; ;; Mode = New |
| 334 | +; (and (not (:deleted concept)) |
| 335 | +; (= 1 (:revision-id concept)) |
| 336 | +; (some #(= "New" %) mode)) |
| 337 | +; {:attributes (create-message-attributes coll-concept-id "New") |
| 338 | +; :subject (create-message-subject "New")} |
| 339 | +; |
| 340 | +; ;; Mode = Update |
| 341 | +; (and (not (:deleted concept)) |
| 342 | +; (pos? (compare (:revision-id concept) 1)) |
| 343 | +; (some #(= "Update" %) mode)) |
| 344 | +; {:attributes (create-message-attributes coll-concept-id "Update") |
| 345 | +; :subject (create-message-subject "Update")})) |
| 346 | + |
| 347 | +;(defn publish-subscription-notification-if-applicable |
| 348 | +; "Publish a notification to the topic if the passed-in concept is a granule |
| 349 | +; and a subscription is interested in being informed of the granule's actions." |
| 350 | +; [context concept] |
| 351 | +; (when (granule-concept? (:concept-type concept)) |
| 352 | +; (let [start (System/currentTimeMillis) |
| 353 | +; coll-concept-id (:parent-collection-id (:extra-fields concept)) |
| 354 | +; sub-cache-map (subscription-cache/get-value context coll-concept-id)] |
| 355 | +; ;; if this granule's collection is found in subscription cache that means it has a subscription attached to it |
| 356 | +; (when sub-cache-map |
| 357 | +; ;; Check the mode to see if the granule notification needs to be pushed. Mode examples are 'new', 'update', 'delete'. |
| 358 | +; (let [topic (get-in context [:system :sns :internal]) |
| 359 | +; message (create-notification-message-body concept) |
| 360 | +; ;; TODO Jyna will need to update this attributes and subject map for URL endpoints |
| 361 | +; {:keys [attributes subject]} (create-attributes-and-subject-map concept sub-cache-map coll-concept-id)] |
| 362 | +; (when (and attributes subject) |
| 363 | +; (let [result (topic-protocol/publish topic message attributes subject) |
| 364 | +; duration (- (System/currentTimeMillis) start)] |
| 365 | +; (debug (format "Work potential subscription publish took %d ms." duration)) |
| 366 | +; result))))))) |
| 367 | + |
| 368 | + |
| 369 | +(defn- get-gran-concept-mode |
| 370 | + [concept] |
365 | 371 | (cond |
366 | | - ;; Mode = Delete. |
367 | | - (and (:deleted concept) |
368 | | - (some #(= "Delete" %) mode)) |
369 | | - {:attributes (create-message-attributes coll-concept-id "Delete") |
370 | | - :subject (create-message-subject "Delete")} |
371 | | - |
| 372 | + (:deleted concept) "Delete" |
372 | 373 | ;; Mode = New |
373 | | - (and (not (:deleted concept)) |
374 | | - (= 1 (:revision-id concept)) |
375 | | - (some #(= "New" %) mode)) |
376 | | - {:attributes (create-message-attributes coll-concept-id "New") |
377 | | - :subject (create-message-subject "New")} |
378 | | - |
| 374 | + (and (not (:deleted concept)) (= 1 (:revision-id concept))) "New" |
379 | 375 | ;; Mode = Update |
380 | | - (and (not (:deleted concept)) |
381 | | - (pos? (compare (:revision-id concept) 1)) |
382 | | - (some #(= "Update" %) mode)) |
383 | | - {:attributes (create-message-attributes coll-concept-id "Update") |
384 | | - :subject (create-message-subject "Update")})) |
| 376 | + (and (not (:deleted concept)) (pos? (compare (:revision-id concept) 1))) "Update" |
| 377 | + )) |
| 378 | + |
| 379 | +(defn- create-message-attributes-map |
| 380 | + [endpoint mode coll-concept-id] |
| 381 | + (cond |
| 382 | + (or (is-valid-sqs-arn endpoint) (is-local-test-queue endpoint)) (cond |
| 383 | + (= "Delete" mode) {:attributes (create-message-attributes coll-concept-id "Delete")} |
| 384 | + (= "New" mode) {:attributes (create-message-attributes coll-concept-id "New")} |
| 385 | + (= "Update" mode) {:attributes (create-message-attributes coll-concept-id "Update")}) |
| 386 | + (is-valid-subscription-endpoint-url endpoint) {:attributes {"endpoint" endpoint |
| 387 | + "endpoint-type" "url" |
| 388 | + "mode" mode}})) |
385 | 389 |
|
386 | 390 | (defn publish-subscription-notification-if-applicable |
387 | 391 | "Publish a notification to the topic if the passed-in concept is a granule |
388 | 392 | and a subscription is interested in being informed of the granule's actions." |
389 | 393 | [context concept] |
| 394 | + (println "***** INSIDE publish-subscription-notification-if-applicable") |
390 | 395 | (when (granule-concept? (:concept-type concept)) |
391 | 396 | (let [start (System/currentTimeMillis) |
392 | 397 | coll-concept-id (:parent-collection-id (:extra-fields concept)) |
| 398 | + _ (println "coll-concept-id = " coll-concept-id) |
393 | 399 | sub-cache-map (subscription-cache/get-value context coll-concept-id)] |
394 | | - ;; if this granule's collection is found in subscription cache |
| 400 | + ;; if this granule's collection is found in subscription cache that means it has a subscription attached to it |
395 | 401 | (when sub-cache-map |
396 | | - ;; Check the mode to see if the granule notification needs to be pushed. Mode examples are 'new', 'update', 'delete'. |
397 | | - (let [topic (get-in context [:system :sns :internal]) |
398 | | - message (create-notification concept) |
399 | | - ;; TODO Jyna will need to update this attributes and subject map for URL endpoints |
400 | | - {:keys [attributes subject]} (create-attributes-and-subject-map concept sub-cache-map coll-concept-id)] |
401 | | - (when (and attributes subject) |
402 | | - (let [result (topic-protocol/publish topic message attributes subject) |
403 | | - duration (- (System/currentTimeMillis) start)] |
404 | | - (debug (format "Work potential subscription publish took %d ms." duration)) |
405 | | - result))))))) |
| 402 | + (let [gran-concept-mode (get-gran-concept-mode concept) |
| 403 | + _ (println "gran-concept-mode = " gran-concept-mode) |
| 404 | + endpoint-list (get sub-cache-map gran-concept-mode) |
| 405 | + _ (println "endpoint-list = " endpoint-list)] |
| 406 | + ;; for every endpoint in the list create a attributes/subject map and send it along its way |
| 407 | + (doseq [endpoint endpoint-list] |
| 408 | + (let [topic (get-in context [:system :sns :internal]) |
| 409 | + coll-concept-id (:parent-collection-id (:extra-fields concept)) |
| 410 | + message (create-notification-message-body concept) |
| 411 | + message-attributes-map (create-message-attributes-map endpoint gran-concept-mode coll-concept-id) |
| 412 | + _ (println "message-attributes-map = " message-attributes-map) |
| 413 | + subject-map {:subject (create-message-subject gran-concept-mode)} |
| 414 | + _ (println "subject map = " subject-map)] |
| 415 | + (when (and message-attributes-map subject-map) |
| 416 | + (let [result (topic-protocol/publish topic message message-attributes-map subject-map) |
| 417 | + duration (- (System/currentTimeMillis) start)] |
| 418 | + (debug (format "Subscription publish for endpoint %s took %d ms." endpoint duration)) |
| 419 | + result))) |
| 420 | + )))))) |
406 | 421 |
|
407 | 422 | (comment |
408 | 423 | (let [system (get-in user/system [:apps :metadata-db])] |
|
0 commit comments