Skip to content

Commit 5dc97ff

Browse files
committed
Disentangle updatePackage/recordError into Common, Analyze, Ingestion parts
1 parent 583128c commit 5dc97ff

File tree

6 files changed

+189
-220
lines changed

6 files changed

+189
-220
lines changed

Sources/App/Commands/Analyze.swift

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,10 +152,11 @@ extension Analyze {
152152
}
153153
}
154154

155-
try await updatePackages(client: client,
156-
database: database,
157-
results: packageResults,
158-
stage: .analysis)
155+
#warning("drop prefix")
156+
try await Analyze.updatePackages(client: client,
157+
database: database,
158+
results: packageResults,
159+
stage: .analysis)
159160

160161
try await RecentPackage.refresh(on: database)
161162
try await RecentRelease.refresh(on: database)

Sources/App/Commands/Common.swift

Lines changed: 158 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -17,176 +17,192 @@ import PostgresKit
1717
import Vapor
1818

1919

20-
/// Update packages (in the `[Result<Joined<Package, Repository>, Error>]` array).
21-
///
22-
/// - Parameters:
23-
/// - client: `Client` object
24-
/// - database: `Database` object
25-
/// - results: `Joined<Package, Repository>` results to update
26-
/// - stage: Processing stage
27-
func updatePackages(client: Client,
28-
database: Database,
29-
results: [Result<Joined<Package, Repository>, Error>],
30-
stage: Package.ProcessingStage) async throws {
31-
do {
32-
let total = results.count
33-
let errors = results.filter(\.isError).count
34-
let errorRate = total > 0 ? 100.0 * Double(errors) / Double(total) : 0.0
35-
switch errorRate {
36-
case 0:
37-
Current.logger().info("Updating \(total) packages for stage '\(stage)'")
38-
case 0..<20:
39-
Current.logger().info("Updating \(total) packages for stage '\(stage)' (errors: \(errors))")
40-
default:
41-
Current.logger().critical("updatePackages: unusually high error rate: \(errors)/\(total) = \(errorRate)%")
42-
}
43-
}
44-
for result in results {
20+
enum Common { }
21+
22+
23+
#warning("move")
24+
extension Analyze {
25+
/// Update packages (in the `[Result<Joined<Package, Repository>, Error>]` array).
26+
///
27+
/// - Parameters:
28+
/// - client: `Client` object
29+
/// - database: `Database` object
30+
/// - results: `Joined<Package, Repository>` results to update
31+
/// - stage: Processing stage
32+
#warning("drop stage parameter")
33+
static func updatePackages(client: Client,
34+
database: Database,
35+
results: [Result<Joined<Package, Repository>, Error>],
36+
stage: Package.ProcessingStage) async throws {
4537
do {
46-
try await updatePackage(client: client, database: database, result: result, stage: stage)
47-
} catch {
48-
Current.logger().critical("updatePackage failed: \(error)")
49-
}
50-
}
51-
52-
Current.logger().debug("updateStatus ops: \(results.count)")
53-
}
54-
55-
56-
func updatePackage(client: Client,
57-
database: Database,
58-
result: Result<Joined<Package, Repository>, Error>,
59-
stage: Package.ProcessingStage) async throws {
60-
switch result {
61-
case .success(let res):
62-
let pkg = res.package
63-
if stage == .ingestion && pkg.status == .new {
64-
// newly ingested package: leave status == .new for fast-track
65-
// analysis
66-
} else {
67-
pkg.status = .ok
38+
let total = results.count
39+
let errors = results.filter(\.isError).count
40+
let errorRate = total > 0 ? 100.0 * Double(errors) / Double(total) : 0.0
41+
switch errorRate {
42+
case 0:
43+
Current.logger().info("Updating \(total) packages for stage '\(stage)'")
44+
case 0..<20:
45+
Current.logger().info("Updating \(total) packages for stage '\(stage)' (errors: \(errors))")
46+
default:
47+
Current.logger().critical("updatePackages: unusually high error rate: \(errors)/\(total) = \(errorRate)%")
6848
}
69-
pkg.processingStage = stage
49+
}
50+
for result in results {
7051
do {
71-
try await pkg.update(on: database)
52+
try await updatePackage(client: client, database: database, result: result, stage: stage)
7253
} catch {
73-
Current.logger().report(error: error)
54+
Current.logger().critical("updatePackage failed: \(error)")
7455
}
56+
}
7557

76-
// PSQLError also conforms to DatabaseError but we want to intercept it specifically,
77-
// because it allows us to log more concise error messages via serverInfo[.message]
78-
case let .failure(error) where error is PSQLError:
79-
// Escalate database errors to critical
80-
let error = error as! PSQLError
81-
let msg = error.serverInfo?[.message] ?? String(reflecting: error)
82-
Current.logger().critical("\(msg)")
83-
try await recordError(database: database, error: error, stage: stage)
84-
85-
case let .failure(error) where error is DatabaseError:
86-
// Escalate database errors to critical
87-
Current.logger().critical("\(String(reflecting: error))")
88-
try await recordError(database: database, error: error, stage: stage)
89-
90-
case let .failure(error):
91-
Current.logger().report(error: error)
92-
try await recordError(database: database, error: error, stage: stage)
58+
Current.logger().debug("updateStatus ops: \(results.count)")
59+
}
60+
61+
#warning("drop stage parameter")
62+
static func updatePackage(client: Client,
63+
database: Database,
64+
result: Result<Joined<Package, Repository>, Error>,
65+
stage: Package.ProcessingStage) async throws {
66+
switch result {
67+
case .success(let res):
68+
let pkg = res.package
69+
if stage == .ingestion && pkg.status == .new {
70+
// newly ingested package: leave status == .new for fast-track
71+
// analysis
72+
} else {
73+
pkg.status = .ok
74+
}
75+
pkg.processingStage = stage
76+
do {
77+
try await pkg.update(on: database)
78+
} catch {
79+
Current.logger().report(error: error)
80+
}
81+
82+
// PSQLError also conforms to DatabaseError but we want to intercept it specifically,
83+
// because it allows us to log more concise error messages via serverInfo[.message]
84+
case let .failure(error) where error is PSQLError:
85+
// Escalate database errors to critical
86+
let error = error as! PSQLError
87+
let msg = error.serverInfo?[.message] ?? String(reflecting: error)
88+
Current.logger().critical("\(msg)")
89+
try await recordError(database: database, error: error, stage: stage)
90+
91+
case let .failure(error) where error is DatabaseError:
92+
// Escalate database errors to critical
93+
Current.logger().critical("\(String(reflecting: error))")
94+
try await recordError(database: database, error: error, stage: stage)
95+
96+
case let .failure(error):
97+
Current.logger().report(error: error)
98+
try await recordError(database: database, error: error, stage: stage)
99+
}
93100
}
94101
}
95102

96103

97-
func updatePackage(client: Client,
98-
database: Database,
99-
result: Result<Joined<Package, Repository>, Ingestion.Error>,
100-
stage: Package.ProcessingStage) async throws {
101-
switch result {
102-
case .success(let res):
103-
try await updatePackage(database: database, package: res.package, stage: stage)
104-
case .failure(let failure):
105-
switch failure.underlyingError {
106-
case .fetchMetadataFailed:
107-
Current.logger().warning("\(failure)")
104+
extension Ingestion {
105+
static func updatePackage(client: Client,
106+
database: Database,
107+
result: Result<Joined<Package, Repository>, Ingestion.Error>,
108+
stage: Package.ProcessingStage) async throws {
109+
switch result {
110+
case .success(let res):
111+
try await Common.updatePackage(database: database, package: res.package, stage: stage)
112+
case .failure(let failure):
113+
switch failure.underlyingError {
114+
case .fetchMetadataFailed:
115+
Current.logger().warning("\(failure)")
108116

109-
case .findOrCreateRepositoryFailed:
110-
Current.logger().critical("\(failure)")
117+
case .findOrCreateRepositoryFailed:
118+
Current.logger().critical("\(failure)")
111119

112-
case .invalidURL, .noRepositoryMetadata:
113-
Current.logger().warning("\(failure)")
120+
case .invalidURL, .noRepositoryMetadata:
121+
Current.logger().warning("\(failure)")
114122

115-
case .repositorySaveFailed, .repositorySaveUniqueViolation:
116-
Current.logger().critical("\(failure)")
117-
}
123+
case .repositorySaveFailed, .repositorySaveUniqueViolation:
124+
Current.logger().critical("\(failure)")
125+
}
118126

119-
try await recordIngestionError(database: database, error: failure)
127+
try await Ingestion.recordError(database: database, error: failure)
128+
}
120129
}
121130
}
122131

123132

124-
func updatePackage(database: Database,
125-
package: Package,
126-
stage: Package.ProcessingStage) async throws {
127-
if stage == .ingestion && package.status == .new {
128-
// newly ingested package: leave status == .new for fast-track
129-
// analysis
130-
} else {
131-
package.status = .ok
132-
}
133-
package.processingStage = stage
134-
do {
135-
try await package.update(on: database)
136-
} catch {
137-
Current.logger().report(error: error)
133+
extension Common {
134+
@available(*, deprecated)
135+
static func updatePackage(database: Database,
136+
package: Package,
137+
stage: Package.ProcessingStage) async throws {
138+
if stage == .ingestion && package.status == .new {
139+
// newly ingested package: leave status == .new for fast-track
140+
// analysis
141+
} else {
142+
package.status = .ok
143+
}
144+
package.processingStage = stage
145+
do {
146+
try await package.update(on: database)
147+
} catch {
148+
Current.logger().report(error: error)
149+
}
138150
}
139151
}
140152

141153

142-
func recordError(database: Database,
143-
error: Error,
144-
stage: Package.ProcessingStage) async throws {
145-
if let error = error as? Ingestion.Error {
146-
try await recordIngestionError(database: database, error: error)
147-
} else {
148-
func setStatus(id: Package.Id?, status: Package.Status) async throws {
149-
guard let id = id else { return }
150-
try await Package.query(on: database)
151-
.filter(\.$id == id)
152-
.set(\.$processingStage, to: stage)
153-
.set(\.$status, to: status)
154-
.update()
155-
}
154+
extension Analyze {
155+
static func recordError(database: Database,
156+
error: Error,
157+
stage: Package.ProcessingStage) async throws {
158+
if let error = error as? Ingestion.Error {
159+
try await Ingestion.recordError(database: database, error: error)
160+
} else {
161+
func setStatus(id: Package.Id?, status: Package.Status) async throws {
162+
guard let id = id else { return }
163+
try await Package.query(on: database)
164+
.filter(\.$id == id)
165+
.set(\.$processingStage, to: stage)
166+
.set(\.$status, to: status)
167+
.update()
168+
}
156169

157-
guard let error = error as? AppError else { return }
158-
159-
switch error {
160-
case let .analysisError(id, _):
161-
try await setStatus(id: id, status: .analysisFailed)
162-
case .envVariableNotSet, .shellCommandFailed:
163-
break
164-
case let .genericError(id, _):
165-
try await setStatus(id: id, status: .ingestionFailed)
166-
case let .invalidPackageCachePath(id, _):
167-
try await setStatus(id: id, status: .invalidCachePath)
168-
case let .cacheDirectoryDoesNotExist(id, _):
169-
try await setStatus(id: id, status: .cacheDirectoryDoesNotExist)
170-
case let .invalidRevision(id, _):
171-
try await setStatus(id: id, status: .analysisFailed)
172-
case let .noValidVersions(id, _):
173-
try await setStatus(id: id, status: .noValidVersions)
170+
guard let error = error as? AppError else { return }
171+
172+
switch error {
173+
case let .analysisError(id, _):
174+
try await setStatus(id: id, status: .analysisFailed)
175+
case .envVariableNotSet, .shellCommandFailed:
176+
break
177+
case let .genericError(id, _):
178+
try await setStatus(id: id, status: .ingestionFailed)
179+
case let .invalidPackageCachePath(id, _):
180+
try await setStatus(id: id, status: .invalidCachePath)
181+
case let .cacheDirectoryDoesNotExist(id, _):
182+
try await setStatus(id: id, status: .cacheDirectoryDoesNotExist)
183+
case let .invalidRevision(id, _):
184+
try await setStatus(id: id, status: .analysisFailed)
185+
case let .noValidVersions(id, _):
186+
try await setStatus(id: id, status: .noValidVersions)
187+
}
174188
}
175189
}
176190
}
177191

178192

179-
func recordIngestionError(database: Database, error: Ingestion.Error) async throws {
180-
switch error.underlyingError {
181-
case .fetchMetadataFailed, .findOrCreateRepositoryFailed, .noRepositoryMetadata, .repositorySaveFailed:
182-
try await Package
183-
.update(for: error.packageId, on: database, status: .ingestionFailed, stage: .ingestion)
184-
case .invalidURL:
185-
try await Package
186-
.update(for: error.packageId, on: database, status: .invalidUrl, stage: .ingestion)
187-
case .repositorySaveUniqueViolation:
188-
try await Package
189-
.update(for: error.packageId, on: database, status: .ingestionFailed, stage: .ingestion)
193+
extension Ingestion {
194+
static func recordError(database: Database, error: Ingestion.Error) async throws {
195+
switch error.underlyingError {
196+
case .fetchMetadataFailed, .findOrCreateRepositoryFailed, .noRepositoryMetadata, .repositorySaveFailed:
197+
try await Package
198+
.update(for: error.packageId, on: database, status: .ingestionFailed, stage: .ingestion)
199+
case .invalidURL:
200+
try await Package
201+
.update(for: error.packageId, on: database, status: .invalidUrl, stage: .ingestion)
202+
case .repositorySaveUniqueViolation:
203+
try await Package
204+
.update(for: error.packageId, on: database, status: .ingestionFailed, stage: .ingestion)
205+
}
190206
}
191207
}
192208

0 commit comments

Comments
 (0)