@@ -162,28 +162,98 @@ Tells Dagster where to find the code.
162162
163163## Assets (Data Products)
164164
165- **6 assets, one per package ecosystem:**
166-
167- | Asset Name | Registry | Schedule | Description |
168- |------------|----------|----------|-------------|
169- | ` pypi_packages` | PyPI | Daily 10:00 AM | Python packages |
170- | `npm_packages` | NPM | Daily 12:00 PM | Node.js packages |
171- | `maven_packages` | Maven Central | Daily 2:00 PM | Java packages |
172- | `cargo_packages` | crates.io | Daily 4:00 PM | Rust packages |
173- | `rubygems_packages` | RubyGems | Daily 6:00 PM | Ruby packages |
174- | `nuget_packages` | NuGet | Daily 8:00 PM | .NET packages |
175-
176- **Asset Structure** (example):
165+ **12 assets total: 6 ingestion assets + 6 update assets (one per package ecosystem):**
166+
167+ ### Ingestion Assets (Weekly - Sundays)
168+ Process new packages that don't exist in the graph. State: **STOPPED** (manual activation required).
169+
170+ | Asset Name | Registry | Schedule | Time | Description |
171+ |------------|----------|----------|------|-------------|
172+ | ` pypi_package_ingestion` | PyPI | Weekly | 2:00 AM | Ingests new Python packages (~500k packages) |
173+ | `npm_package_ingestion` | NPM | Weekly | 3:00 AM | Ingests new Node.js packages (~3M packages) |
174+ | `maven_package_ingestion` | Maven Central | Weekly | 4:00 AM | Ingests new Java packages (~500k-1M unique) |
175+
176+ # ## Update Assets (Daily)
177+ Update existing packages with new versions. State : **RUNNING** (active by default).
178+
179+ | Asset Name | Registry | Schedule | Time | Description |
180+ |------------|----------|----------|------|-------------|
181+ | `pypi_packages_updates` | PyPI | Daily | 10:00 AM | Updates Python package versions |
182+ | `npm_packages_updates` | NPM | Daily | 12:00 PM | Updates Node.js package versions |
183+ | `maven_packages` | Maven Central | Daily | 2:00 PM | Updates Java package versions |
184+ | `cargo_packages` | crates.io | Daily | 4:00 PM | Updates Rust package versions |
185+ | `rubygems_packages` | RubyGems | Daily | 6:00 PM | Updates Ruby package versions |
186+ | `nuget_packages` | NuGet | Daily | 8:00 PM | Updates .NET package versions |
187+
188+ # ## Ingestion Asset Architecture
189+
190+ **Purpose**: Discover and extract ALL packages from registries, adding only those that don't exist in the graph.
191+
192+ **Process Flow**:
193+ ```
194+ Registry API → Fetch All Package Names → Check Graph → Extract if New → Store
195+ ↓ ↓ ↓ ↓ ↓
196+ PyPI/NPM/ List of all read_package Extractor Neo4j
197+ Maven package names _ by_name() .run() Graph
198+ (~ 500k-3M)
199+ ```
200+
201+ **Key Features**:
202+ - **Incremental**: Only processes packages not in graph
203+ - **Efficient**: Uses caching (1 hour TTL) and set-based deduplication
204+ - **Observable**: Logs every 100 new packages, every 1000 skipped
205+ - **Resilient**: Continues on errors, reports statistics
206+ - **Resource-aware**: STOPPED by default due to intensive nature
207+
208+ **Ingestion Asset Structure** (example - PyPI):
209+ ```python
210+ @asset(
211+ description="Ingests new PyPI packages from the Python Package Index",
212+ group_name="pypi",
213+ compute_kind="python",
214+ )
215+ def pypi_package_ingestion(
216+ context: AssetExecutionContext,
217+ pypi_service: PyPIServiceResource,
218+ package_service: PackageServiceResource,
219+ version_service: VersionServiceResource,
220+ attributor: AttributorResource,
221+ ) -> Output[dict[str, Any]]:
222+ # 1. Fetch all package names from registry
223+ all_package_names = await pypi_svc.fetch_all_package_names()
224+
225+ # 2. Check each package
226+ for package_name in all_package_names:
227+ existing = await package_svc.read_package_by_name("PyPIPackage", package_name)
228+
229+ if not existing:
230+ # 3. Create and run extractor for new packages
231+ extractor = PyPIPackageExtractor(...)
232+ await extractor.run()
233+
234+ # 4. Return metrics
235+ return Output(value=stats, metadata={...})
236+ ```
237+
238+ ** Ingestion Metrics** :
239+ - ` total_in_registry ` : Total packages in the registry
240+ - ` new_packages_ingested ` : New packages added to graph
241+ - ` skipped_existing ` : Packages already in graph
242+ - ` errors ` : Errors encountered
243+ - ` ingestion_rate ` : Percentage of new packages
244+
245+ ### Update Asset Structure
246+
247+ ** Update Asset Structure** (example - PyPI):
177248``` python
178249@asset (
179250 description = " Updates PyPI package versions" ,
180251 group_name = " pypi"
181252)
182- def pypi_packages (
253+ def pypi_packages_updates (
183254 pypi_service : PyPIServiceResource,
184255 package_service : PackageServiceResource,
185256 version_service : VersionServiceResource,
186- vulnerability_service: VulnerabilityServiceResource,
187257 attributor : AttributorResource
188258) -> Output[dict ]:
189259 # Business logic here
@@ -192,12 +262,81 @@ def pypi_packages(
192262 return Output(result, metadata = {... })
193263```
194264
195- **Each asset returns metadata **:
265+ ** Update Metrics ** :
196266- ` packages_processed ` : Number of packages updated
197267- ` total_versions ` : Total versions in system
198268- ` errors ` : Errors encountered
199269- ` success_rate ` : Percentage of successful updates
200270
271+ ## Registry-Specific Implementation Details
272+
273+ ### PyPI Ingestion
274+ - ** Endpoint** : ` https://pypi.org/simple/ `
275+ - ** Method** : HTML parsing with regex extraction
276+ - ** Volume** : ~ 500,000 packages
277+ - ** Deduplication** : Not needed (Simple index returns unique packages)
278+ - ** Cache Key** : ` all_pypi_packages `
279+
280+ ### NPM Ingestion
281+ - ** Endpoint** : ` https://replicate.npmjs.com/_all_docs `
282+ - ** Method** : JSON document listing
283+ - ** Volume** : ~ 3,000,000 packages
284+ - ** Deduplication** : Filters ` _design/ ` documents
285+ - ** Normalization** : Converts to lowercase
286+ - ** Cache Key** : ` all_npm_packages `
287+
288+ ### Maven Ingestion
289+ - ** Endpoint** : ` https://search.maven.org/solrsearch/select?q=*:* `
290+ - ** Method** : Solr pagination (1000 per batch)
291+ - ** Volume** : ~ 10,000,000 artifacts → ~ 500,000-1,000,000 unique packages
292+ - ** Deduplication** : Uses ` set ` for O(1) lookup (group_id: artifact_id combinations)
293+ - ** Note** : Each version is a separate artifact, we extract unique group_id: artifact_id pairs
294+ - ** Optimization** :
295+ - Set-based deduplication (O(1) vs O(n))
296+ - Progress logs every 10k artifacts
297+ - 0.1s delay between batches to avoid rate limiting
298+ - ** Cache Key** : ` all_mvn_packages `
299+
300+ ** Maven Deduplication Example** :
301+ ``` python
302+ seen_packages = set () # O(1) lookup
303+ for doc in docs:
304+ package_key = f " { group_id} : { artifact_id} "
305+ if package_key not in seen_packages:
306+ seen_packages.add(package_key)
307+ all_packages.append({... })
308+ ```
309+
310+ ### NuGet Ingestion
311+ - ** Endpoint** : ` https://azuresearch-usnc.nuget.org/query `
312+ - ** Method** : Search API with pagination (1000 per batch, skip-based)
313+ - ** Volume** : ~ 400,000 packages
314+ - ** Deduplication** : Not needed (Search API returns unique packages)
315+ - ** Rate Limiting** : 0.5s delay between requests
316+ - ** Special Features** :
317+ - Extracts vendor from ` authors ` field (first author)
318+ - Fetches version-specific metadata via catalog service
319+ - Supports version listing and requirements extraction
320+ - ** Cache Key** : ` all_nuget_packages `
321+
322+ ### Cargo Ingestion
323+ - ** Endpoint** : ` https://crates.io/api/v1/crates `
324+ - ** Method** : Page-based pagination (100 crates per page)
325+ - ** Volume** : ~ 150,000 crates
326+ - ** Deduplication** : Not needed (API returns unique crates)
327+ - ** Rate Limiting** : Crates.io requires User-Agent header
328+ - ** Note** : Uses page parameter instead of skip/limit
329+ - ** Cache Key** : ` all_cargo_packages `
330+
331+ ### RubyGems Ingestion
332+ - ** Endpoint** : ` https://rubygems.org/api/v1/gems.json `
333+ - ** Method** : Sequential page-based pagination
334+ - ** Volume** : ~ 180,000 gems
335+ - ** Deduplication** : Not needed (API returns unique gems)
336+ - ** Rate Limiting** : 0.2s delay between requests
337+ - ** Termination** : Continues until empty response
338+ - ** Cache Key** : ` all_rubygems_packages `
339+
201340## Resources (Dependency Injection)
202341
203342** 10 ConfigurableResource classes** in ` src/dagster_app/resources/__init__.py ` :
@@ -215,6 +354,63 @@ def pypi_packages(
215354
216355Resources are configured in ` defs ` and injected into assets.
217356
357+ ## API Services Enhancement
358+
359+ ### New Methods for Package Ingestion
360+
361+ ** PyPI Service** (` src/services/apis/pypi_api.py ` ):
362+ ``` python
363+ async def fetch_all_package_names (self ) -> list[str ]:
364+ """ Fetches all package names from PyPI Simple index"""
365+ # Returns ~500k package names
366+ # Uses HTML parsing + regex
367+ # Cache: 1 hour
368+ ```
369+
370+ ** NPM Service** (` src/services/apis/npm_api.py ` ):
371+ ``` python
372+ async def fetch_all_package_names (self ) -> list[str ]:
373+ """ Fetches all package names from NPM registry"""
374+ # Returns ~3M package names
375+ # Uses _all_docs endpoint
376+ # Cache: 1 hour
377+
378+ async def get_versions (self , metadata : dict ) -> list[dict ]:
379+ """ Extract ordered versions from metadata"""
380+
381+ async def fetch_package_version_metadata (self , package_name : str , version : str ) -> dict :
382+ """ Fetch metadata for specific version"""
383+
384+ async def get_package_requirements (self , version_metadata : dict ) -> dict[str , str ]:
385+ """ Get dependencies from version metadata"""
386+ ```
387+
388+ ** Maven Service** (` src/services/apis/maven_api.py ` ):
389+ ``` python
390+ async def fetch_all_packages (self ) -> list[dict[str , str ]]:
391+ """
392+ Fetches all unique packages (group_id:artifact_id) from Maven Central.
393+
394+ Key Points:
395+ - Maven has ~10M artifacts (each version counts)
396+ - Returns ~500k-1M unique packages
397+ - Uses set-based deduplication (O(1) lookup)
398+ - Batch size: 1000 per request
399+ - No artificial limit (processes all)
400+ - Progress logs every 10k artifacts
401+ - Cache: 1 hour
402+
403+ Returns: [{"group_id": "...", "artifact_id": "...", "name": "..."}]
404+ """
405+ ```
406+
407+ ** Performance Optimizations** :
408+ - ** Caching** : 1-hour TTL reduces repeated API calls
409+ - ** Set-based deduplication** : O(1) vs O(n) for Maven uniqueness
410+ - ** Batch processing** : 1000 items per request for optimal throughput
411+ - ** Rate limiting** : 0.1s delay between Maven requests
412+ - ** Progress logging** : Every 10k items for observability
413+
218414## Environment Variables
219415
220416** Required in .env:**
@@ -230,6 +426,14 @@ VULN_DB_URI='mongodb://user:pass@mongo:27017/admin'
230426VULN_DB_USER=' mongoSecureChain'
231427VULN_DB_PASSWORD=' your-password'
232428
429+ # Redis Configuration (Queue Management)
430+ REDIS_HOST=localhost
431+ REDIS_PORT=6379
432+ REDIS_DB=0
433+ REDIS_STREAM=package_extraction
434+ REDIS_GROUP=extractors
435+ REDIS_CONSUMER=package-consumer
436+
233437# Dagster PostgreSQL
234438POSTGRES_USER=dagster
235439POSTGRES_PASSWORD=your-password
@@ -244,6 +448,8 @@ DAGSTER_HOME=/opt/dagster/dagster_home
244448PYTHONPATH=/opt/dagster/app
245449```
246450
451+ ** Note** : All environment variables are managed through the ` Settings ` class in ` src/settings.py ` using Pydantic Settings for validation and type safety.
452+
247453## Common Operations
248454
249455### Starting Services
@@ -304,15 +510,48 @@ docker compose down -v # Remove data
304510
305511### When Adding New Package Ecosystem
306512
307- 1. Create API service in `src/services/apis/new_registry_service.py`
308- 2. Create schema in `src/schemas/new_package_schema.py`
309- 3. Create extractor in `src/processes/extractors/new_extractor.py`
310- 4. Create updater in `src/processes/updaters/new_updater.py`
311- 5. Create asset in `src/dagster_app/assets/new_assets.py`
312- 6. Create resource in `src/dagster_app/resources/__init__.py`
313- 7. Create schedule in `src/dagster_app/schedules.py`
314- 8. Import asset in `src/dagster_app/assets/__init__.py`
315- 9. Register resource and schedule in `src/dagster_app/__init__.py`
513+ 1 . ** Create API service** in ` src/services/apis/new_registry_service.py `
514+ - Implement ` fetch_all_package_names() ` or ` fetch_all_packages() ` for ingestion
515+ - Implement version fetching and metadata retrieval methods
516+ - Add caching with appropriate TTL
517+
518+ 2 . ** Create schema** in ` src/schemas/new_package_schema.py `
519+ - Use Pydantic BaseModel
520+ - Include all required fields (name, vendor, repository_url, etc.)
521+
522+ 3 . ** Create extractor** in ` src/processes/extractors/new_extractor.py `
523+ - Extend ` PackageExtractor ` base class
524+ - Implement package creation and dependency extraction
525+
526+ 4 . ** Create updater** in ` src/processes/updaters/new_updater.py `
527+ - Implement version update logic
528+ - Handle package metadata updates
529+
530+ 5 . ** Create ingestion asset** in ` src/dagster_app/assets/new_assets.py `
531+ - Create ` new_package_ingestion ` for initial bulk ingestion
532+ - Follow the pattern: fetch all → check existence → extract if new
533+ - Return ingestion metrics (total, new, skipped, errors)
534+
535+ 6 . ** Create update asset** in ` src/dagster_app/assets/new_assets.py `
536+ - Create ` new_packages_updates ` for daily version updates
537+ - Follow the pattern: batch read → update → report metrics
538+
539+ 7 . ** Create resource** in ` src/dagster_app/resources/__init__.py `
540+ - Extend ` ConfigurableResource `
541+ - Create factory method to instantiate service
542+
543+ 8 . ** Create schedules** in ` src/dagster_app/schedules.py `
544+ - Create ingestion schedule (weekly, STOPPED by default)
545+ - Create update schedule (daily, RUNNING by default)
546+ - Space out timing to avoid conflicts
547+
548+ 9 . ** Import assets** in ` src/dagster_app/assets/__init__.py `
549+ - Import both ingestion and update assets
550+ - Add to ` __all__ ` list
551+
552+ 10 . ** Register in main module** in ` src/dagster_app/__init__.py `
553+ - Add resource to resources dict
554+ - Schedules auto-discovered from ` all_schedules `
316555
317556## Troubleshooting
318557
@@ -392,6 +631,7 @@ docker compose exec dagster-webserver \
392631
393632---
394633
395- **Last Updated**: October 8 , 2025
634+ **Last Updated**: October 9 , 2025
396635**Dagster Version**: 1.11.13
397- **Python Version**: 3.12
636+ **Python Version**: 3.12
637+ **New Features**: Package ingestion assets for PyPI, NPM, and Maven with optimized deduplication and caching
0 commit comments