@@ -17,111 +17,125 @@ import PostgresKit
1717import Vapor
1818
1919
20- /// Update packages (in the `[Result<Joined<Package, Repository>, Error>]` array).
21- ///
22- /// - Parameters:
23- /// - client: `Client` object
24- /// - database: `Database` object
25- /// - results: `Joined<Package, Repository>` results to update
26- /// - stage: Processing stage
27- func updatePackages( client: Client ,
28- database: Database ,
29- results: [ Result < Joined < Package , Repository > , Error > ] ,
30- stage: Package . ProcessingStage ) async throws {
31- do {
32- let total = results. count
33- let errors = results. filter ( \. isError) . count
34- let errorRate = total > 0 ? 100.0 * Double( errors) / Double( total) : 0.0
35- switch errorRate {
36- case 0 :
37- Current . logger ( ) . info ( " Updating \( total) packages for stage ' \( stage) ' " )
38- case 0 ..< 20 :
39- Current . logger ( ) . info ( " Updating \( total) packages for stage ' \( stage) ' (errors: \( errors) ) " )
40- default :
41- Current . logger ( ) . critical ( " updatePackages: unusually high error rate: \( errors) / \( total) = \( errorRate) % " )
42- }
43- }
44- for result in results {
45- do {
46- try await updatePackage ( client: client, database: database, result: result, stage: stage)
47- } catch {
48- Current . logger ( ) . critical ( " updatePackage failed: \( error) " )
49- }
50- }
51-
52- Current . logger ( ) . debug ( " updateStatus ops: \( results. count) " )
20+ // TODO: Adopt ProcessingError also in Analysis and then factor out generic parts back into Common
21+ protocol ProcessingError : Error , CustomStringConvertible {
22+ associatedtype UnderlyingError : Error & CustomStringConvertible
23+ var packageId : Package . Id { get }
24+ var underlyingError : UnderlyingError { get }
25+ var level : Logger . Level { get }
26+ var status : Package . Status { get }
5327}
5428
5529
56- func updatePackage( client: Client ,
57- database: Database ,
58- result: Result < Joined < Package , Repository > , Error > ,
59- stage: Package . ProcessingStage ) async throws {
60- switch result {
61- case . success( let res) :
62- let pkg = res. package
63- if stage == . ingestion && pkg. status == . new {
64- // newly ingested package: leave status == .new for fast-track
65- // analysis
66- } else {
67- pkg. status = . ok
30+ // TODO: Leaving this extension here for now in order to group the updating/error reporting in one place for both Ingestion and Analysis. Eventually these should either go to their respective files or move common parts into a Common namespace.
31+ extension Analyze {
32+ /// Update packages (in the `[Result<Joined<Package, Repository>, Error>]` array).
33+ ///
34+ /// - Parameters:
35+ /// - client: `Client` object
36+ /// - database: `Database` object
37+ /// - results: `Joined<Package, Repository>` results to update
38+ /// - stage: Processing stage
39+ static func updatePackages( client: Client ,
40+ database: Database ,
41+ results: [ Result < Joined < Package , Repository > , Error > ] ) async throws {
42+ do {
43+ let total = results. count
44+ let errors = results. filter ( \. isError) . count
45+ let errorRate = total > 0 ? 100.0 * Double( errors) / Double( total) : 0.0
46+ switch errorRate {
47+ case 0 :
48+ Current . logger ( ) . info ( " Updating \( total) packages for stage 'analysis' " )
49+ case 0 ..< 20 :
50+ Current . logger ( ) . info ( " Updating \( total) packages for stage 'analysis' (errors: \( errors) ) " )
51+ default :
52+ Current . logger ( ) . critical ( " updatePackages: unusually high error rate: \( errors) / \( total) = \( errorRate) % " )
6853 }
69- pkg. processingStage = stage
54+ }
55+ for result in results {
7056 do {
71- try await pkg . update ( on : database)
57+ try await updatePackage ( client : client , database: database , result : result )
7258 } catch {
73- Current . logger ( ) . report ( error : error)
59+ Current . logger ( ) . critical ( " updatePackage failed: \( error) " )
7460 }
61+ }
7562
76- // PSQLError also conforms to DatabaseError but we want to intercept it specifically,
77- // because it allows us to log more concise error messages via serverInfo[.message]
78- case let . failure( error) where error is PSQLError :
79- // Escalate database errors to critical
80- let error = error as! PSQLError
81- let msg = error. serverInfo ? [ . message] ?? String ( reflecting: error)
82- Current . logger ( ) . critical ( " \( msg) " )
83- try await recordError ( database: database, error: error, stage: stage)
63+ Current . logger ( ) . debug ( " updateStatus ops: \( results. count) " )
64+ }
8465
85- case let . failure( error) where error is DatabaseError :
86- // Escalate database errors to critical
87- Current . logger ( ) . critical ( " \( String ( reflecting: error) ) " )
88- try await recordError ( database: database, error: error, stage: stage)
66+ static func updatePackage( client: Client ,
67+ database: Database ,
68+ result: Result < Joined < Package , Repository > , Error > ) async throws {
69+ switch result {
70+ case . success( let res) :
71+ try await res. package . update ( on: database, status: . ok, stage: . analysis)
8972
90- case let . failure( error) :
91- Current . logger ( ) . report ( error: error)
92- try await recordError ( database: database, error: error, stage: stage)
73+ // PSQLError also conforms to DatabaseError but we want to intercept it specifically,
74+ // because it allows us to log more concise error messages via serverInfo[.message]
75+ case let . failure( error) where error is PSQLError :
76+ // Escalate database errors to critical
77+ let error = error as! PSQLError
78+ let msg = error. serverInfo ? [ . message] ?? String ( reflecting: error)
79+ Current . logger ( ) . critical ( " \( msg) " )
80+ try await recordError ( database: database, error: error)
81+
82+ case let . failure( error) where error is DatabaseError :
83+ // Escalate database errors to critical
84+ Current . logger ( ) . critical ( " \( String ( reflecting: error) ) " )
85+ try await recordError ( database: database, error: error)
86+
87+ case let . failure( error) :
88+ Current . logger ( ) . report ( error: error)
89+ try await recordError ( database: database, error: error)
90+ }
9391 }
94- }
9592
93+ static func recordError( database: Database , error: Error ) async throws {
94+ func setStatus( id: Package . Id ? , status: Package . Status ) async throws {
95+ guard let id = id else { return }
96+ try await Package . query ( on: database)
97+ . filter ( \. $id == id)
98+ . set ( \. $processingStage, to: . analysis)
99+ . set ( \. $status, to: status)
100+ . update ( )
101+ }
102+
103+ guard let error = error as? AppError else { return }
96104
97- func recordError( database: Database ,
98- error: Error ,
99- stage: Package . ProcessingStage ) async throws {
100- func setStatus( id: Package . Id ? , status: Package . Status ) async throws {
101- guard let id = id else { return }
102- try await Package . query ( on: database)
103- . filter ( \. $id == id)
104- . set ( \. $processingStage, to: stage)
105- . set ( \. $status, to: status)
106- . update ( )
105+ switch error {
106+ case let . analysisError( id, _) :
107+ try await setStatus ( id: id, status: . analysisFailed)
108+ case . envVariableNotSet, . shellCommandFailed:
109+ break
110+ case let . genericError( id, _) :
111+ try await setStatus ( id: id, status: . ingestionFailed)
112+ case let . invalidPackageCachePath( id, _) :
113+ try await setStatus ( id: id, status: . invalidCachePath)
114+ case let . cacheDirectoryDoesNotExist( id, _) :
115+ try await setStatus ( id: id, status: . cacheDirectoryDoesNotExist)
116+ case let . invalidRevision( id, _) :
117+ try await setStatus ( id: id, status: . analysisFailed)
118+ case let . noValidVersions( id, _) :
119+ try await setStatus ( id: id, status: . noValidVersions)
120+ }
107121 }
122+ }
108123
109- guard let error = error as? AppError else { return }
110124
111- switch error {
112- case let . analysisError ( id , _ ) :
113- try await setStatus ( id : id , status : . analysisFailed )
114- case . envVariableNotSet , . shellCommandFailed :
115- break
116- case let . genericError ( id , _ ) :
117- try await setStatus ( id : id , status : . ingestionFailed )
118- case let . invalidPackageCachePath ( id , _ ) :
119- try await setStatus ( id : id , status: . invalidCachePath )
120- case let . cacheDirectoryDoesNotExist ( id , _ ) :
121- try await setStatus ( id : id , status: . cacheDirectoryDoesNotExist )
122- case let . invalidRevision ( id , _ ) :
123- try await setStatus ( id : id , status : . analysisFailed )
124- case let . noValidVersions ( id , _ ) :
125- try await setStatus ( id : id , status : . noValidVersions )
125+ // TODO: Leaving this extension here for now in order to group the updating/ error reporting in one place for both Ingestion and Analysis. Eventually these should either go to their respective files or move common parts into a Common namespace.
126+ extension Ingestion {
127+ static func updatePackage ( client : Client ,
128+ database : Database ,
129+ result : Result < Joined < Package , Repository > , Ingestion . Error > ,
130+ stage : Package . ProcessingStage ) async throws {
131+ switch result {
132+ case . success ( let res ) :
133+ // for newly ingested package leave status == .new in order to fast-track analysis
134+ let updatedStatus : Package . Status = res . package . status == . new ? . new : . ok
135+ try await res . package . update ( on : database , status: updatedStatus , stage : stage )
136+ case . failure ( let failure ) :
137+ Current . logger ( ) . log ( level : failure . level , " \( failure ) " )
138+ try await Package . update ( for : failure . packageId , on : database , status : failure . status , stage : stage )
139+ }
126140 }
127141}
0 commit comments