Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions Sources/App/Commands/Alerting.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,15 @@ enum Alerting {
}

func run(using context: CommandContext, signature: Signature) async throws {
Current.setLogger(Logger(component: "alerting"))
@Dependency(\.logger) var logger
logger.set(to: Logger(component: "alerting"))

Current.logger().info("Running alerting...")
logger.info("Running alerting...")

let timePeriod = signature.duration
let limit = signature.limit ?? Alerting.defaultLimit

Current.logger().info("Validation time interval: \(timePeriod.hours)h, limit: \(limit)")
logger.info("Validation time interval: \(timePeriod.hours)h, limit: \(limit)")

let builds = try await Alerting.fetchBuilds(on: context.application.db, timePeriod: timePeriod, limit: limit)
try await Alerting.runBuildChecks(for: builds)
Expand Down Expand Up @@ -94,12 +95,14 @@ extension Alerting {
// to do
// - [ ] doc gen is configured but it failed

Current.logger().info("Build records selected: \(builds.count)")
@Dependency(\.logger) var logger

logger.info("Build records selected: \(builds.count)")
if let oldest = builds.last {
Current.logger().info("Oldest selected: \(oldest.createdAt)")
logger.info("Oldest selected: \(oldest.createdAt)")
}
if let mostRecent = builds.first {
Current.logger().info("Most recent selected: \(mostRecent.createdAt)")
logger.info("Most recent selected: \(mostRecent.createdAt)")
}
builds.validateBuildsPresent().log(check: "CHECK_BUILDS_PRESENT")
builds.validatePlatformsPresent().log(check: "CHECK_BUILDS_PLATFORMS_PRESENT")
Expand All @@ -114,9 +117,11 @@ extension Alerting {
}

static func fetchBuilds(on database: Database, timePeriod: TimeAmount, limit: Int) async throws -> [Alerting.BuildInfo] {
@Dependency(\.logger) var logger

let start = Date.now
defer {
Current.logger().debug("fetchBuilds elapsed: \(Date.now.timeIntervalSince(start).rounded(decimalPlaces: 2))s")
logger.debug("fetchBuilds elapsed: \(Date.now.timeIntervalSince(start).rounded(decimalPlaces: 2))s")
}
@Dependency(\.date.now) var now
let cutoff = now.addingTimeInterval(-timePeriod.timeInterval)
Expand Down Expand Up @@ -187,15 +192,16 @@ extension Alerting {
case failed(reasons: [String])

func log(check: String) {
@Dependency(\.logger) var logger
switch self {
case .ok:
Current.logger().debug("\(check) passed")
logger.debug("\(check) passed")
case .failed(let reasons):
if reasons.count >= 5 {
Current.logger().critical("\(check) failures: \(reasons.count)")
logger.critical("\(check) failures: \(reasons.count)")
}
for reason in reasons {
Current.logger().critical("\(check) failed: \(reason)")
logger.critical("\(check) failed: \(reason)")
}
}
}
Expand Down
55 changes: 35 additions & 20 deletions Sources/App/Commands/Analyze.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ enum Analyze {
func run(using context: CommandContext, signature: SPICommand.Signature) async throws {
let client = context.application.client
let db = context.application.db
Current.setLogger(Logger(component: "analyze"))
@Dependency(\.logger) var logger
logger.set(to: Logger(component: "analyze"))

Analyze.resetMetrics()

Expand All @@ -39,20 +40,20 @@ enum Analyze {
database: db,
mode: .init(signature: signature))
} catch {
Current.logger().error("\(error.localizedDescription)")
logger.error("\(error.localizedDescription)")
}

do {
try Analyze.trimCheckouts()
} catch {
Current.logger().error("\(error.localizedDescription)")
logger.error("\(error.localizedDescription)")
}

do {
try await AppMetrics.push(client: client,
jobName: "analyze")
} catch {
Current.logger().warning("\(error.localizedDescription)")
logger.warning("\(error.localizedDescription)")
}
}
}
Expand Down Expand Up @@ -110,19 +111,21 @@ extension Analyze {
let start = DispatchTime.now().uptimeNanoseconds
defer { AppMetrics.analyzeDurationSeconds?.time(since: start) }

@Dependency(\.logger) var logger

switch mode {
case .id(let id):
Current.logger().info("Analyzing (id: \(id)) ...")
logger.info("Analyzing (id: \(id)) ...")
let pkg = try await Package.fetchCandidate(database, id: id)
try await analyze(client: client, database: database, packages: [pkg])

case .limit(let limit):
Current.logger().info("Analyzing (limit: \(limit)) ...")
logger.info("Analyzing (limit: \(limit)) ...")
let packages = try await Package.fetchCandidates(database, for: .analysis, limit: limit)
try await analyze(client: client, database: database, packages: packages)

case .url(let url):
Current.logger().info("Analyzing (url: \(url)) ...")
logger.info("Analyzing (url: \(url)) ...")
let pkg = try await Package.fetchCandidate(database, url: url)
try await analyze(client: client, database: database, packages: [pkg])
}
Expand All @@ -140,10 +143,12 @@ extension Analyze {
packages: [Joined<Package, Repository>]) async throws {
AppMetrics.analyzeCandidatesCount?.set(packages.count)

// get or create directory
@Dependency(\.fileManager) var fileManager
@Dependency(\.logger) var logger

// get or create directory
let checkoutDir = fileManager.checkoutsDirectory()
Current.logger().info("Checkout directory: \(checkoutDir)")
logger.info("Checkout directory: \(checkoutDir)")
if !fileManager.fileExists(atPath: checkoutDir) {
try await createCheckoutsDirectory(client: client, path: checkoutDir)
}
Expand All @@ -170,6 +175,8 @@ extension Analyze {
package: Joined<Package, Repository>) async throws {
try await refreshCheckout(package: package)

@Dependency(\.logger) var logger

// 2024-10-05 sas: We need to explicitly weave dependencies into the `transaction` closure, because escaping closures strip them.
// https://github.com/pointfreeco/swift-dependencies/discussions/283#discussioncomment-10846172
// This might not be needed in Vapor 5 / FluentKit 2
Expand All @@ -183,7 +190,7 @@ extension Analyze {
package: package)
let netDeleteCount = versionDelta.toDelete.count - versionDelta.toAdd.count
if netDeleteCount > 1 {
Current.logger().warning("Suspicious loss of \(netDeleteCount) versions for package \(package.model.id)")
logger.warning("Suspicious loss of \(netDeleteCount) versions for package \(package.model.id)")
}

try await applyVersionDelta(on: tx, delta: versionDelta)
Expand Down Expand Up @@ -235,15 +242,16 @@ extension Analyze {

static func createCheckoutsDirectory(client: Client,
path: String) async throws {
Current.logger().info("Creating checkouts directory at path: \(path)")
@Dependency(\.logger) var logger
logger.info("Creating checkouts directory at path: \(path)")
do {
@Dependency(\.fileManager) var fileManager
try fileManager.createDirectory(atPath: path,
withIntermediateDirectories: false,
attributes: nil)
} catch {
let error = AppError.genericError(nil, "Failed to create checkouts directory: \(error.localizedDescription)")
Current.logger().report(error: error)
logger.logger.report(error: error)
}
}

Expand All @@ -254,7 +262,8 @@ extension Analyze {
/// - url: url to clone from
/// - Throws: Shell errors
static func clone(cacheDir: String, url: String) async throws {
Current.logger().info("cloning \(url) to \(cacheDir)")
@Dependency(\.logger) var logger
logger.info("cloning \(url) to \(cacheDir)")
@Dependency(\.fileManager) var fileManager
@Dependency(\.shell) var shell
try await shell.run(command: .gitClone(url: URL(string: url)!, to: cacheDir),
Expand All @@ -270,13 +279,14 @@ extension Analyze {
/// - Throws: Shell errors
static func fetch(cacheDir: String, branch: String, url: String) async throws {
@Dependency(\.fileManager) var fileManager
@Dependency(\.logger) var logger
@Dependency(\.shell) var shell
Current.logger().info("pulling \(url) in \(cacheDir)")
logger.info("pulling \(url) in \(cacheDir)")
// clean up stray lock files that might have remained from aborted commands
for fileName in ["HEAD.lock", "index.lock"] {
let filePath = cacheDir + "/.git/\(fileName)"
if fileManager.fileExists(atPath: filePath) {
Current.logger().info("Removing stale \(fileName) at path: \(filePath)")
logger.info("Removing stale \(fileName) at path: \(filePath)")
try await shell.run(command: .removeFile(from: filePath), at: .cwd)
}
}
Expand All @@ -294,6 +304,7 @@ extension Analyze {
/// - package: `Package` to refresh
static func refreshCheckout(package: Joined<Package, Repository>) async throws {
@Dependency(\.fileManager) var fileManager
@Dependency(\.logger) var logger
@Dependency(\.shell) var shell

guard let cacheDir = fileManager.cacheDirectoryPath(for: package.model) else {
Expand All @@ -313,7 +324,7 @@ extension Analyze {
branch: package.repository?.defaultBranch ?? "master",
url: package.model.url)
} catch {
Current.logger().info("fetch failed: \(error.localizedDescription)")
logger.info("fetch failed: \(error.localizedDescription)")
try await shell.run(command: .removeFile(from: cacheDir, arguments: ["-r", "-f"]), at: .cwd)
try await clone(cacheDir: cacheDir, url: package.model.url)
}
Expand Down Expand Up @@ -357,6 +368,8 @@ extension Analyze {
static func diffVersions(client: Client,
transaction: Database,
package: Joined<Package, Repository>) async throws -> VersionDelta {
@Dependency(\.logger) var logger

guard let pkgId = package.model.id else {
throw AppError.genericError(nil, "PANIC: package id nil for package \(package.model.url)")
}
Expand All @@ -374,7 +387,7 @@ extension Analyze {
let newDiff = Version.diff(local: existing, incoming: throttled)
let delta = origDiff.toAdd.count - newDiff.toAdd.count
if delta > 0 {
Current.logger().info("throttled \(delta) incoming revisions")
logger.info("throttled \(delta) incoming revisions")
AppMetrics.buildThrottleCount?.inc(delta)
}
return newDiff
Expand Down Expand Up @@ -506,12 +519,13 @@ extension Analyze {
/// have processed the new version.
/// - Parameter versionDelta: The version change
static func carryOverDefaultBranchData(versionDelta: VersionDelta) {
@Dependency(\.logger) var logger
guard versionDelta.toDelete.filter(\.isBranch).count <= 1 else {
Current.logger().warning("versionDelta.toDelete has more than one branch version")
logger.warning("versionDelta.toDelete has more than one branch version")
return
}
guard versionDelta.toAdd.filter(\.isBranch).count <= 1 else {
Current.logger().warning("versionDelta.toAdd has more than one branch version")
logger.warning("versionDelta.toAdd has more than one branch version")
return
}
guard let oldDefaultBranch = versionDelta.toDelete.first(where: \.isBranch),
Expand Down Expand Up @@ -745,7 +759,8 @@ extension Analyze {
package: package,
versions: versions)
} catch {
Current.logger().warning("Social.postToFirehose failed: \(error.localizedDescription)")
@Dependency(\.logger) var logger
logger.warning("Social.postToFirehose failed: \(error.localizedDescription)")
}
}

Expand Down
22 changes: 13 additions & 9 deletions Sources/App/Commands/Common.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import Dependencies
import Fluent
import PostgresKit
import Vapor
Expand Down Expand Up @@ -39,33 +40,35 @@ extension Analyze {
static func updatePackages(client: Client,
database: Database,
results: [Result<Joined<Package, Repository>, Error>]) async throws {
@Dependency(\.logger) var logger
do {
let total = results.count
let errors = results.filter(\.isError).count
let errorRate = total > 0 ? 100.0 * Double(errors) / Double(total) : 0.0
switch errorRate {
case 0:
Current.logger().info("Updating \(total) packages for stage 'analysis'")
logger.info("Updating \(total) packages for stage 'analysis'")
case 0..<20:
Current.logger().info("Updating \(total) packages for stage 'analysis' (errors: \(errors))")
logger.info("Updating \(total) packages for stage 'analysis' (errors: \(errors))")
default:
Current.logger().critical("updatePackages: unusually high error rate: \(errors)/\(total) = \(errorRate)%")
logger.critical("updatePackages: unusually high error rate: \(errors)/\(total) = \(errorRate)%")
}
}
for result in results {
do {
try await updatePackage(client: client, database: database, result: result)
} catch {
Current.logger().critical("updatePackage failed: \(error)")
logger.critical("updatePackage failed: \(error)")
}
}

Current.logger().debug("updateStatus ops: \(results.count)")
logger.debug("updateStatus ops: \(results.count)")
}

static func updatePackage(client: Client,
database: Database,
result: Result<Joined<Package, Repository>, Error>) async throws {
@Dependency(\.logger) var logger
switch result {
case .success(let res):
try await res.package.update(on: database, status: .ok, stage: .analysis)
Expand All @@ -76,16 +79,16 @@ extension Analyze {
// Escalate database errors to critical
let error = error as! PSQLError
let msg = error.serverInfo?[.message] ?? String(reflecting: error)
Current.logger().critical("\(msg)")
logger.critical("\(msg)")
try await recordError(database: database, error: error)

case let .failure(error) where error is DatabaseError:
// Escalate database errors to critical
Current.logger().critical("\(String(reflecting: error))")
logger.critical("\(String(reflecting: error))")
try await recordError(database: database, error: error)

case let .failure(error):
Current.logger().report(error: error)
logger.report(error: error)
try await recordError(database: database, error: error)
}
}
Expand Down Expand Up @@ -128,13 +131,14 @@ extension Ingestion {
database: Database,
result: Result<Joined<Package, Repository>, Ingestion.Error>,
stage: Package.ProcessingStage) async throws {
@Dependency(\.logger) var logger
switch result {
case .success(let res):
// for newly ingested package leave status == .new in order to fast-track analysis
let updatedStatus: Package.Status = res.package.status == .new ? .new : .ok
try await res.package.update(on: database, status: updatedStatus, stage: stage)
case .failure(let failure):
Current.logger().log(level: failure.level, "\(failure)")
logger.log(level: failure.level, "\(failure)")
try await Package.update(for: failure.packageId, on: database, status: failure.status, stage: stage)
}
}
Expand Down
Loading
Loading