Skip to content

Commit 14e1721

Browse files
committed
Move all top level ingestion functions into Ingestion namespace, rename files
1 parent 806a366 commit 14e1721

File tree

8 files changed

+95
-99
lines changed

8 files changed

+95
-99
lines changed

Sources/App/Commands/Ingest.swift renamed to Sources/App/Commands/Ingestion.swift

Lines changed: 70 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import Vapor
1919

2020

2121
enum Ingestion {
22+
2223
struct Error: ProcessingError {
2324
var packageId: Package.Id
2425
var underlyingError: UnderlyingError
@@ -73,100 +74,95 @@ enum Ingestion {
7374
}
7475
}
7576
}
76-
}
7777

7878

79-
struct IngestCommand: AsyncCommand {
80-
typealias Signature = SPICommand.Signature
79+
struct Command: AsyncCommand {
80+
typealias Signature = SPICommand.Signature
8181

82-
var help: String { "Run package ingestion (fetching repository metadata)" }
82+
var help: String { "Run package ingestion (fetching repository metadata)" }
8383

84-
func run(using context: CommandContext, signature: SPICommand.Signature) async {
85-
let client = context.application.client
86-
let db = context.application.db
87-
Current.setLogger(Logger(component: "ingest"))
84+
func run(using context: CommandContext, signature: SPICommand.Signature) async {
85+
let client = context.application.client
86+
let db = context.application.db
87+
Current.setLogger(Logger(component: "ingest"))
8888

89-
Self.resetMetrics()
89+
Self.resetMetrics()
9090

91-
do {
92-
try await ingest(client: client, database: db, mode: .init(signature: signature))
93-
} catch {
94-
Current.logger().error("\(error.localizedDescription)")
95-
}
91+
do {
92+
try await ingest(client: client, database: db, mode: .init(signature: signature))
93+
} catch {
94+
Current.logger().error("\(error.localizedDescription)")
95+
}
9696

97-
do {
98-
try await AppMetrics.push(client: client,
99-
jobName: "ingest")
100-
} catch {
101-
Current.logger().warning("\(error.localizedDescription)")
97+
do {
98+
try await AppMetrics.push(client: client,
99+
jobName: "ingest")
100+
} catch {
101+
Current.logger().warning("\(error.localizedDescription)")
102+
}
102103
}
103-
}
104-
}
105104

106-
107-
extension IngestCommand {
108-
static func resetMetrics() {
109-
AppMetrics.ingestMetadataSuccessCount?.set(0)
110-
AppMetrics.ingestMetadataFailureCount?.set(0)
105+
static func resetMetrics() {
106+
AppMetrics.ingestMetadataSuccessCount?.set(0)
107+
AppMetrics.ingestMetadataFailureCount?.set(0)
108+
}
111109
}
112-
}
113110

114111

115-
/// Ingest via a given mode: either one `Package` identified by its `Id` or a limited number of `Package`s.
116-
/// - Parameters:
117-
/// - client: `Client` object
118-
/// - database: `Database` object
119-
/// - mode: process a single `Package.Id` or a `limit` number of packages
120-
/// - Returns: future
121-
func ingest(client: Client,
122-
database: Database,
123-
mode: SPICommand.Mode) async throws {
124-
let start = DispatchTime.now().uptimeNanoseconds
125-
defer { AppMetrics.ingestDurationSeconds?.time(since: start) }
126-
127-
switch mode {
128-
case .id(let id):
129-
Current.logger().info("Ingesting (id: \(id)) ...")
130-
let pkg = try await Package.fetchCandidate(database, id: id)
131-
await ingest(client: client, database: database, packages: [pkg])
132-
133-
case .limit(let limit):
134-
Current.logger().info("Ingesting (limit: \(limit)) ...")
135-
let packages = try await Package.fetchCandidates(database, for: .ingestion, limit: limit)
136-
Current.logger().info("Candidate count: \(packages.count)")
137-
await ingest(client: client, database: database, packages: packages)
138-
139-
case .url(let url):
140-
Current.logger().info("Ingesting (url: \(url)) ...")
141-
let pkg = try await Package.fetchCandidate(database, url: url)
142-
await ingest(client: client, database: database, packages: [pkg])
112+
/// Ingest via a given mode: either one `Package` identified by its `Id` or a limited number of `Package`s.
113+
/// - Parameters:
114+
/// - client: `Client` object
115+
/// - database: `Database` object
116+
/// - mode: process a single `Package.Id` or a `limit` number of packages
117+
/// - Returns: future
118+
static func ingest(client: Client,
119+
database: Database,
120+
mode: SPICommand.Mode) async throws {
121+
let start = DispatchTime.now().uptimeNanoseconds
122+
defer { AppMetrics.ingestDurationSeconds?.time(since: start) }
123+
124+
switch mode {
125+
case .id(let id):
126+
Current.logger().info("Ingesting (id: \(id)) ...")
127+
let pkg = try await Package.fetchCandidate(database, id: id)
128+
await ingest(client: client, database: database, packages: [pkg])
129+
130+
case .limit(let limit):
131+
Current.logger().info("Ingesting (limit: \(limit)) ...")
132+
let packages = try await Package.fetchCandidates(database, for: .ingestion, limit: limit)
133+
Current.logger().info("Candidate count: \(packages.count)")
134+
await ingest(client: client, database: database, packages: packages)
135+
136+
case .url(let url):
137+
Current.logger().info("Ingesting (url: \(url)) ...")
138+
let pkg = try await Package.fetchCandidate(database, url: url)
139+
await ingest(client: client, database: database, packages: [pkg])
140+
}
143141
}
144-
}
145142

146143

147-
/// Main ingestion function. Fetched package metadata from hosting provider and updates `Repositoy` and `Package`s.
148-
/// - Parameters:
149-
/// - client: `Client` object
150-
/// - database: `Database` object
151-
/// - packages: packages to be ingested
152-
/// - Returns: future
153-
func ingest(client: Client,
154-
database: Database,
155-
packages: [Joined<Package, Repository>]) async {
156-
Current.logger().debug("Ingesting \(packages.compactMap {$0.model.id})")
157-
AppMetrics.ingestCandidatesCount?.set(packages.count)
158-
159-
await withTaskGroup(of: Void.self) { group in
160-
for pkg in packages {
161-
group.addTask {
162-
await Ingestion.ingest(client: client, database: database, package: pkg)
144+
/// Main ingestion function. Fetched package metadata from hosting provider and updates `Repositoy` and `Package`s.
145+
/// - Parameters:
146+
/// - client: `Client` object
147+
/// - database: `Database` object
148+
/// - packages: packages to be ingested
149+
/// - Returns: future
150+
static func ingest(client: Client,
151+
database: Database,
152+
packages: [Joined<Package, Repository>]) async {
153+
Current.logger().debug("Ingesting \(packages.compactMap {$0.model.id})")
154+
AppMetrics.ingestCandidatesCount?.set(packages.count)
155+
156+
await withTaskGroup(of: Void.self) { group in
157+
for pkg in packages {
158+
group.addTask {
159+
await ingest(client: client, database: database, package: pkg)
160+
}
163161
}
164162
}
165163
}
166-
}
167164

168165

169-
extension Ingestion {
170166
static func ingest(client: Client, database: Database, package: Joined<Package, Repository>) async {
171167
let result = await Result { () async throws(Ingestion.Error) -> Joined<Package, Repository> in
172168
@Dependency(\.environment) var environment

Sources/App/configure.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ public func configure(_ app: Application) async throws -> String {
354354
app.asyncCommands.use(Analyze.Command(), as: "analyze")
355355
app.asyncCommands.use(CreateRestfileCommand(), as: "create-restfile")
356356
app.asyncCommands.use(DeleteBuildsCommand(), as: "delete-builds")
357-
app.asyncCommands.use(IngestCommand(), as: "ingest")
357+
app.asyncCommands.use(Ingestion.Command(), as: "ingest")
358358
app.asyncCommands.use(ReconcileCommand(), as: "reconcile")
359359
app.asyncCommands.use(TriggerBuildsCommand(), as: "trigger-builds")
360360
app.asyncCommands.use(ReAnalyzeVersions.Command(), as: "re-analyze-versions")

Tests/AppTests/ErrorReportingTests.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class ErrorReportingTests: AppTestCase {
3131
}
3232
}
3333

34-
func test_Ingestor_error_reporting() async throws {
34+
func test_Ingestion_error_reporting() async throws {
3535
// setup
3636
try await Package(id: .id0, url: "1", processingStage: .reconciliation).save(on: app.db)
3737
Current.fetchMetadata = { _, _, _ throws(Github.Error) in throw Github.Error.invalidURL("1") }
@@ -40,7 +40,7 @@ class ErrorReportingTests: AppTestCase {
4040
$0.date.now = .now
4141
} operation: {
4242
// MUT
43-
try await ingest(client: app.client, database: app.db, mode: .limit(10))
43+
try await Ingestion.ingest(client: app.client, database: app.db, mode: .limit(10))
4444
}
4545

4646
// validation

Tests/AppTests/IngestorTests.swift renamed to Tests/AppTests/IngestionTests.swift

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import S3Store
2222
import Vapor
2323

2424

25-
class IngestorTests: AppTestCase {
25+
class IngestionTests: AppTestCase {
2626

2727
func test_ingest_basic() async throws {
2828
// setup
@@ -38,7 +38,7 @@ class IngestorTests: AppTestCase {
3838
$0.date.now = .now
3939
} operation: {
4040
// MUT
41-
try await ingest(client: app.client, database: app.db, mode: .limit(10))
41+
try await Ingestion.ingest(client: app.client, database: app.db, mode: .limit(10))
4242
}
4343

4444
// validate
@@ -77,7 +77,7 @@ class IngestorTests: AppTestCase {
7777
Current.fetchLicense = { _, _, _ in Github.License(htmlUrl: "license") }
7878

7979
// MUT
80-
await ingest(client: app.client, database: app.db, packages: packages)
80+
await Ingestion.ingest(client: app.client, database: app.db, packages: packages)
8181

8282
do {
8383
// validate the second package's license is updated
@@ -310,7 +310,7 @@ class IngestorTests: AppTestCase {
310310
$0.date.now = .now
311311
} operation: {
312312
// MUT
313-
try await ingest(client: app.client, database: app.db, mode: .limit(testUrls.count))
313+
try await Ingestion.ingest(client: app.client, database: app.db, mode: .limit(testUrls.count))
314314
}
315315

316316
// validate
@@ -337,7 +337,7 @@ class IngestorTests: AppTestCase {
337337
$0.date.now = .now
338338
} operation: {
339339
// MUT
340-
try await ingest(client: app.client, database: app.db, mode: .limit(10))
340+
try await Ingestion.ingest(client: app.client, database: app.db, mode: .limit(10))
341341
}
342342

343343
// validate
@@ -389,7 +389,7 @@ class IngestorTests: AppTestCase {
389389
$0.date.now = .now
390390
} operation: {
391391
// MUT
392-
try await ingest(client: app.client, database: app.db, mode: .limit(10))
392+
try await Ingestion.ingest(client: app.client, database: app.db, mode: .limit(10))
393393
}
394394

395395
// validate repositories (single element pointing to the ingested package)
@@ -490,7 +490,7 @@ class IngestorTests: AppTestCase {
490490

491491
do { // first ingestion, no readme has been saved
492492
// MUT
493-
try await ingest(client: app.client, database: app.db, mode: .limit(1))
493+
try await Ingestion.ingest(client: app.client, database: app.db, mode: .limit(1))
494494

495495
// validate
496496
try await XCTAssertEqualAsync(await Repository.query(on: app.db).count(), 1)
@@ -506,7 +506,7 @@ class IngestorTests: AppTestCase {
506506
try await pkg.save(on: app.db)
507507

508508
// MUT
509-
try await ingest(client: app.client, database: app.db, mode: .limit(1))
509+
try await Ingestion.ingest(client: app.client, database: app.db, mode: .limit(1))
510510

511511
// validate
512512
try await XCTAssertEqualAsync(await Repository.query(on: app.db).count(), 1)
@@ -522,7 +522,7 @@ class IngestorTests: AppTestCase {
522522
try await pkg.save(on: app.db)
523523

524524
// MUT
525-
try await ingest(client: app.client, database: app.db, mode: .limit(1))
525+
try await Ingestion.ingest(client: app.client, database: app.db, mode: .limit(1))
526526

527527
// validate
528528
try await XCTAssertEqualAsync(await Repository.query(on: app.db).count(), 1)
@@ -573,7 +573,7 @@ class IngestorTests: AppTestCase {
573573
$0.date.now = .now
574574
} operation: {
575575
// MUT
576-
try await ingest(client: app.client, database: app.db, mode: .limit(1))
576+
try await Ingestion.ingest(client: app.client, database: app.db, mode: .limit(1))
577577
}
578578

579579
// There should only be one call as `storeS3ReadmeImages` takes the array of images.
@@ -604,7 +604,7 @@ class IngestorTests: AppTestCase {
604604
} operation: {
605605
// MUT
606606
let app = self.app!
607-
try await ingest(client: app.client, database: app.db, mode: .limit(1))
607+
try await Ingestion.ingest(client: app.client, database: app.db, mode: .limit(1))
608608
}
609609

610610
// validate

Tests/AppTests/MastodonTests.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ final class MastodonTests: AppTestCase {
6666
} operation: {
6767
// run first two processing steps
6868
try await reconcile(client: app.client, database: app.db)
69-
try await ingest(client: app.client, database: app.db, mode: .limit(10))
69+
try await Ingestion.ingest(client: app.client, database: app.db, mode: .limit(10))
7070

7171
// MUT - analyze, triggering the post
7272
try await Analyze.analyze(client: app.client,
@@ -86,7 +86,7 @@ final class MastodonTests: AppTestCase {
8686
try await withDependencies {
8787
$0.date.now = .now.addingTimeInterval(Constants.reIngestionDeadtime)
8888
} operation: {
89-
try await ingest(client: app.client, database: app.db, mode: .limit(10))
89+
try await Ingestion.ingest(client: app.client, database: app.db, mode: .limit(10))
9090

9191
// MUT - analyze, triggering posts if any
9292
try await Analyze.analyze(client: app.client,
@@ -104,7 +104,7 @@ final class MastodonTests: AppTestCase {
104104
// fast forward our clock by the deadtime interval again (*2) and re-ingest
105105
$0.date.now = .now.addingTimeInterval(Constants.reIngestionDeadtime * 2)
106106
} operation: {
107-
try await ingest(client: app.client, database: app.db, mode: .limit(10))
107+
try await Ingestion.ingest(client: app.client, database: app.db, mode: .limit(10))
108108
// MUT - analyze again
109109
try await Analyze.analyze(client: app.client,
110110
database: app.db,

Tests/AppTests/MetricsTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ class MetricsTests: AppTestCase {
120120
let pkg = try await savePackage(on: app.db, "1")
121121

122122
// MUT
123-
try await ingest(client: app.client, database: app.db, mode: .id(pkg.id!))
123+
try await Ingestion.ingest(client: app.client, database: app.db, mode: .id(pkg.id!))
124124

125125
// validation
126126
XCTAssert((AppMetrics.ingestDurationSeconds?.get()) ?? 0 > 0)

Tests/AppTests/PackageTests.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -350,8 +350,8 @@ final class PackageTests: AppTestCase {
350350
}
351351

352352
// run ingestion to progress package through pipeline
353-
try await ingest(client: app.client, database: app.db, mode: .limit(10))
354-
353+
try await Ingestion.ingest(client: app.client, database: app.db, mode: .limit(10))
354+
355355
// MUT & validate
356356
do {
357357
let pkg = try await XCTUnwrapAsync(try await Package.query(on: app.db).first())
@@ -378,7 +378,7 @@ final class PackageTests: AppTestCase {
378378
try await withDependencies {
379379
$0.date.now = .now.addingTimeInterval(Constants.reIngestionDeadtime)
380380
} operation: {
381-
try await ingest(client: app.client, database: app.db, mode: .limit(10))
381+
try await Ingestion.ingest(client: app.client, database: app.db, mode: .limit(10))
382382

383383
do {
384384
let pkg = try await XCTUnwrapAsync(try await Package.query(on: app.db).first())

Tests/AppTests/PipelineTests.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,8 @@ class PipelineTests: AppTestCase {
201201
}
202202

203203
// MUT - second stage
204-
try await ingest(client: app.client, database: app.db, mode: .limit(10))
205-
204+
try await Ingestion.ingest(client: app.client, database: app.db, mode: .limit(10))
205+
206206
do { // validate
207207
let packages = try await Package.query(on: app.db).sort(\.$url).all()
208208
XCTAssertEqual(packages.map(\.url), ["1", "2", "3"].asGithubUrls)
@@ -240,7 +240,7 @@ class PipelineTests: AppTestCase {
240240
}
241241

242242
// MUT - ingest again
243-
try await ingest(client: app.client, database: app.db, mode: .limit(10))
243+
try await Ingestion.ingest(client: app.client, database: app.db, mode: .limit(10))
244244

245245
do { // validate - only new package moves to .ingestion stage
246246
let packages = try await Package.query(on: app.db).sort(\.$url).all()
@@ -270,7 +270,7 @@ class PipelineTests: AppTestCase {
270270
$0.date.now = .now.addingTimeInterval(Constants.reIngestionDeadtime)
271271
} operation: {
272272
// MUT - ingest yet again
273-
try await ingest(client: app.client, database: app.db, mode: .limit(10))
273+
try await Ingestion.ingest(client: app.client, database: app.db, mode: .limit(10))
274274

275275
do { // validate - now all three packages should have been updated
276276
let packages = try await Package.query(on: app.db).sort(\.$url).all()

0 commit comments

Comments
 (0)