@@ -17,9 +17,6 @@ import PostgresKit
1717import Vapor
1818
1919
20- enum Common { }
21-
22-
2320#warning("move")
2421extension Analyze {
2522 /// Update packages (in the `[Result<Joined<Package, Repository>, Error>]` array).
@@ -29,27 +26,25 @@ extension Analyze {
2926 /// - database: `Database` object
3027 /// - results: `Joined<Package, Repository>` results to update
3128 /// - stage: Processing stage
32- #warning("drop stage parameter")
3329 static func updatePackages( client: Client ,
3430 database: Database ,
35- results: [ Result < Joined < Package , Repository > , Error > ] ,
36- stage: Package . ProcessingStage ) async throws {
31+ results: [ Result < Joined < Package , Repository > , Error > ] ) async throws {
3732 do {
3833 let total = results. count
3934 let errors = results. filter ( \. isError) . count
4035 let errorRate = total > 0 ? 100.0 * Double( errors) / Double( total) : 0.0
4136 switch errorRate {
4237 case 0 :
43- Current . logger ( ) . info ( " Updating \( total) packages for stage ' \( stage ) ' " )
38+ Current . logger ( ) . info ( " Updating \( total) packages for stage 'analysis ' " )
4439 case 0 ..< 20 :
45- Current . logger ( ) . info ( " Updating \( total) packages for stage ' \( stage ) ' (errors: \( errors) ) " )
40+ Current . logger ( ) . info ( " Updating \( total) packages for stage 'analysis ' (errors: \( errors) ) " )
4641 default :
4742 Current . logger ( ) . critical ( " updatePackages: unusually high error rate: \( errors) / \( total) = \( errorRate) % " )
4843 }
4944 }
5045 for result in results {
5146 do {
52- try await updatePackage ( client: client, database: database, result: result, stage : stage )
47+ try await updatePackage ( client: client, database: database, result: result)
5348 } catch {
5449 Current . logger ( ) . critical ( " updatePackage failed: \( error) " )
5550 }
@@ -58,26 +53,12 @@ extension Analyze {
5853 Current . logger ( ) . debug ( " updateStatus ops: \( results. count) " )
5954 }
6055
61- #warning("drop stage parameter")
6256 static func updatePackage( client: Client ,
6357 database: Database ,
64- result: Result < Joined < Package , Repository > , Error > ,
65- stage: Package . ProcessingStage ) async throws {
58+ result: Result < Joined < Package , Repository > , Error > ) async throws {
6659 switch result {
6760 case . success( let res) :
68- let pkg = res. package
69- if stage == . ingestion && pkg. status == . new {
70- // newly ingested package: leave status == .new for fast-track
71- // analysis
72- } else {
73- pkg. status = . ok
74- }
75- pkg. processingStage = stage
76- do {
77- try await pkg. update ( on: database)
78- } catch {
79- Current . logger ( ) . report ( error: error)
80- }
61+ try await res. package . update ( on: database, status: . ok, stage: . analysis)
8162
8263 // PSQLError also conforms to DatabaseError but we want to intercept it specifically,
8364 // because it allows us to log more concise error messages via serverInfo[.message]
@@ -86,16 +67,16 @@ extension Analyze {
8667 let error = error as! PSQLError
8768 let msg = error. serverInfo ? [ . message] ?? String ( reflecting: error)
8869 Current . logger ( ) . critical ( " \( msg) " )
89- try await recordError ( database: database, error: error, stage : stage )
70+ try await recordError ( database: database, error: error)
9071
9172 case let . failure( error) where error is DatabaseError :
9273 // Escalate database errors to critical
9374 Current . logger ( ) . critical ( " \( String ( reflecting: error) ) " )
94- try await recordError ( database: database, error: error, stage : stage )
75+ try await recordError ( database: database, error: error)
9576
9677 case let . failure( error) :
9778 Current . logger ( ) . report ( error: error)
98- try await recordError ( database: database, error: error, stage : stage )
79+ try await recordError ( database: database, error: error)
9980 }
10081 }
10182}
@@ -108,7 +89,9 @@ extension Ingestion {
10889 stage: Package . ProcessingStage ) async throws {
10990 switch result {
11091 case . success( let res) :
111- try await Common . updatePackage ( database: database, package : res. package , stage: stage)
92+ // for newly ingested package leave status == .new in order to fast-track analysis
93+ let updatedStatus : Package . Status = res. package . status == . new ? . new : . ok
94+ try await res. package . update ( on: database, status: updatedStatus, stage: stage)
11295 case . failure( let failure) :
11396 switch failure. underlyingError {
11497 case . fetchMetadataFailed:
@@ -130,61 +113,34 @@ extension Ingestion {
130113}
131114
132115
133- extension Common {
134- @available ( * , deprecated)
135- static func updatePackage( database: Database ,
136- package : Package ,
137- stage: Package . ProcessingStage ) async throws {
138- if stage == . ingestion && package . status == . new {
139- // newly ingested package: leave status == .new for fast-track
140- // analysis
141- } else {
142- package . status = . ok
143- }
144- package . processingStage = stage
145- do {
146- try await package . update ( on: database)
147- } catch {
148- Current . logger ( ) . report ( error: error)
149- }
150- }
151- }
152-
153-
154116extension Analyze {
155- static func recordError( database: Database ,
156- error: Error ,
157- stage: Package . ProcessingStage ) async throws {
158- if let error = error as? Ingestion . Error {
159- try await Ingestion . recordError ( database: database, error: error)
160- } else {
161- func setStatus( id: Package . Id ? , status: Package . Status ) async throws {
162- guard let id = id else { return }
163- try await Package . query ( on: database)
164- . filter ( \. $id == id)
165- . set ( \. $processingStage, to: stage)
166- . set ( \. $status, to: status)
167- . update ( )
168- }
117+ static func recordError( database: Database , error: Error ) async throws {
118+ func setStatus( id: Package . Id ? , status: Package . Status ) async throws {
119+ guard let id = id else { return }
120+ try await Package . query ( on: database)
121+ . filter ( \. $id == id)
122+ . set ( \. $processingStage, to: . analysis)
123+ . set ( \. $status, to: status)
124+ . update ( )
125+ }
169126
170- guard let error = error as? AppError else { return }
171-
172- switch error {
173- case let . analysisError( id, _) :
174- try await setStatus ( id: id, status: . analysisFailed)
175- case . envVariableNotSet, . shellCommandFailed:
176- break
177- case let . genericError( id, _) :
178- try await setStatus ( id: id, status: . ingestionFailed)
179- case let . invalidPackageCachePath( id, _) :
180- try await setStatus ( id: id, status: . invalidCachePath)
181- case let . cacheDirectoryDoesNotExist( id, _) :
182- try await setStatus ( id: id, status: . cacheDirectoryDoesNotExist)
183- case let . invalidRevision( id, _) :
184- try await setStatus ( id: id, status: . analysisFailed)
185- case let . noValidVersions( id, _) :
186- try await setStatus ( id: id, status: . noValidVersions)
187- }
127+ guard let error = error as? AppError else { return }
128+
129+ switch error {
130+ case let . analysisError( id, _) :
131+ try await setStatus ( id: id, status: . analysisFailed)
132+ case . envVariableNotSet, . shellCommandFailed:
133+ break
134+ case let . genericError( id, _) :
135+ try await setStatus ( id: id, status: . ingestionFailed)
136+ case let . invalidPackageCachePath( id, _) :
137+ try await setStatus ( id: id, status: . invalidCachePath)
138+ case let . cacheDirectoryDoesNotExist( id, _) :
139+ try await setStatus ( id: id, status: . cacheDirectoryDoesNotExist)
140+ case let . invalidRevision( id, _) :
141+ try await setStatus ( id: id, status: . analysisFailed)
142+ case let . noValidVersions( id, _) :
143+ try await setStatus ( id: id, status: . noValidVersions)
188144 }
189145 }
190146}
@@ -219,4 +175,10 @@ extension Package {
219175 . set ( \. $status, to: status)
220176 . update ( )
221177 }
178+
179+ func update( on database: Database , status: Status , stage: ProcessingStage ) async throws {
180+ self . status = status
181+ self . processingStage = stage
182+ try await update ( on: database)
183+ }
222184}
0 commit comments