@@ -77,76 +77,136 @@ defmodule Hexdocs.Queue do
7777 message
7878 end
7979
80- @ impl true
81- def handle_batch ( _batcher , messages , _batch_info , _context ) do
82- messages
80+ def handle_message ( % { data: % { "hexdocs:upload" => key } } = message ) do
81+ process_docs ( key , :upload )
82+ message
8383 end
8484
85- defp handle_record ( % { "eventName" => "ObjectCreated:" <> _ , "s3" => s3 } ) do
85+ def handle_message ( % { data: % { "hexdocs:search" => key } } = message ) do
86+ process_docs ( key , :search )
87+ message
88+ end
89+
90+ defp process_docs ( key , type ) do
8691 start = System . os_time ( :millisecond )
87- key = s3 [ "object" ] [ "key" ]
88- Logger . info ( "OBJECT CREATED #{ key } " )
92+ event_name = if type == :upload , do: "hexdocs:upload" , else: "hexdocs:search"
93+ log_prefix = if type == :upload , do: "UPLOAD" , else: "SEARCH INDEX"
94+
95+ Sentry.Context . set_extra_context ( % { queue_event: event_name } )
96+ Logger . info ( "#{ log_prefix } #{ key } " )
8997
9098 case key_components ( key ) do
9199 { :ok , repository , package , version } ->
92100 Sentry.Context . set_extra_context ( % {
93- queue_event: "ObjectCreated" ,
101+ queue_event: event_name ,
94102 repository: repository ,
95103 package: package ,
96104 version: version
97105 } )
98106
99107 body = Hexdocs.Store . get ( :repo_bucket , key )
100108
101- { version , all_versions } =
102- if package in @ special_package_names do
103- version =
104- case Version . parse ( version ) do
105- { :ok , version } ->
106- version
107-
108- # main or MAJOR.MINOR
109- :error ->
110- version
111- end
112-
113- all_versions = Hexdocs.SourceRepo . versions! ( Map . fetch! ( @ special_packages , package ) )
114- { version , all_versions }
115- else
116- version = Version . parse! ( version )
117- all_versions = all_versions ( repository , package )
118- { version , all_versions }
109+ case type do
110+ :upload ->
111+ process_upload ( key , repository , package , version , body , start )
112+
113+ :search ->
114+ process_search ( key , package , version , body , start )
115+ end
116+
117+ :error ->
118+ Logger . info ( "#{ key } : skip" )
119+ end
120+ end
121+
122+ defp process_upload ( key , repository , package , version , body , start ) do
123+ { version , all_versions } =
124+ if package in @ special_package_names do
125+ version =
126+ case Version . parse ( version ) do
127+ { :ok , version } ->
128+ version
129+
130+ # main or MAJOR.MINOR
131+ :error ->
132+ version
119133 end
120134
121- case Hexdocs.Tar . unpack ( body , repository: repository , package: package , version: version ) do
122- { :ok , files } ->
123- files = rewrite_files ( files )
135+ all_versions = Hexdocs.SourceRepo . versions! ( Map . fetch! ( @ special_packages , package ) )
136+ { version , all_versions }
137+ else
138+ version = Version . parse! ( version )
139+ all_versions = all_versions ( repository , package )
140+ { version , all_versions }
141+ end
142+
143+ case Hexdocs.Tar . unpack ( body , repository: repository , package: package , version: version ) do
144+ { :ok , files } ->
145+ files = rewrite_files ( files )
146+
147+ Hexdocs.Bucket . upload (
148+ repository ,
149+ package ,
150+ version ,
151+ all_versions ,
152+ files
153+ )
154+
155+ if Hexdocs.Utils . latest_version? ( package , version , all_versions ) do
156+ update_index_sitemap ( repository , key )
157+ update_package_sitemap ( repository , key , package , files )
158+ update_package_names_csv ( repository )
159+ end
124160
125- Hexdocs.Bucket . upload (
126- repository ,
127- package ,
128- version ,
129- all_versions ,
130- files
131- )
161+ elapsed = System . os_time ( :millisecond ) - start
162+ Logger . info ( "FINISHED UPLOADING DOCS #{ key } #{ elapsed } ms" )
132163
133- if Hexdocs.Utils . latest_version? ( package , version , all_versions ) do
134- update_index_sitemap ( repository , key )
135- update_package_sitemap ( repository , key , package , files )
136- update_package_names_csv ( repository )
137- end
164+ { :error , reason } ->
165+ Logger . error ( "Failed unpack #{ repository } /#{ package } #{ version } : #{ reason } " )
166+ end
167+ end
138168
139- if repository == "hexpm" do
140- update_search_index ( key , package , version , files )
141- end
169+ defp process_search ( key , package , version , body , start ) do
170+ version = Version . parse! ( version )
142171
143- elapsed = System . os_time ( :millisecond ) - start
144- Logger . info ( "FINISHED UPLOADING AND INDEXING DOCS #{ key } #{ elapsed } ms" )
172+ case Hexdocs.Tar . unpack ( body , package: package , version: version ) do
173+ { :ok , files } ->
174+ update_search_index ( key , package , version , files )
145175
146- { :error , reason } ->
147- Logger . error ( "Failed unpack #{ repository } /#{ package } #{ version } : #{ reason } " )
176+ elapsed = System . os_time ( :millisecond ) - start
177+ Logger . info ( "FINISHED INDEXING DOCS #{ key } #{ elapsed } ms" )
178+
179+ { :error , reason } ->
180+ Logger . error ( "Failed unpack #{ package } #{ version } : #{ reason } " )
181+ end
182+ end
183+
184+ @ impl true
185+ def handle_batch ( _batcher , messages , _batch_info , _context ) do
186+ messages
187+ end
188+
189+ defp handle_record ( % { "eventName" => "ObjectCreated:" <> _ , "s3" => s3 } ) do
190+ key = s3 [ "object" ] [ "key" ]
191+ Logger . info ( "OBJECT CREATED #{ key } " )
192+
193+ case key_components ( key ) do
194+ { :ok , repository , _package , _version } ->
195+ Sentry.Context . set_extra_context ( % {
196+ queue_event: "ObjectCreated" ,
197+ repository: repository
198+ } )
199+
200+ # Publish upload message
201+ publish_message ( % { "hexdocs:upload" => key } )
202+
203+ # Publish search message for hexpm repository
204+ if repository == "hexpm" do
205+ publish_message ( % { "hexdocs:search" => key } )
148206 end
149207
208+ Logger . info ( "PUBLISHED MESSAGES FOR #{ key } " )
209+
150210 :error ->
151211 :skip
152212 end
@@ -281,6 +341,14 @@ defmodule Hexdocs.Queue do
281341 end
282342 end
283343
344+ defp publish_message ( map ) do
345+ queue = Application . fetch_env! ( :hexdocs , :queue_id )
346+ message = Jason . encode! ( map )
347+
348+ ExAws.SQS . send_message ( queue , message )
349+ |> ExAws . request! ( )
350+ end
351+
284352 @ doc false
285353 def paths_for_sitemaps ( ) do
286354 key_regex = ~r" docs/(.*)-(.*).tar.gz$"
0 commit comments