Enhanced Aircan with refactored codebase and v3 api support #40
base: master
Changes from 8 commits
6a916dc
42dc360
af23623
8f0af76
89b9afa
5f61f9e
7cc24ea
d5b0ffb
7826898
@@ -2,4 +2,4 @@
omit =
    */site-packages/*
    */python?.?/*
    ckan/*
@@ -0,0 +1,48 @@
name: Tests
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    container:
      # The CKAN version tag of the Solr and Postgres containers should match
      # the one of the container the tests run on.
      # You can switch this base image with a custom image tailored to your project
      image: ckan/ckan-dev:2.11
    services:
      solr:
        image: ckan/ckan-solr:2.11-solr9
      postgres:
        image: ckan/ckan-postgres-dev:2.11
        env:
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: postgres
          POSTGRES_DB: postgres
        options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5
      redis:
        image: redis:3

    env:
      CKAN_SQLALCHEMY_URL: postgresql://ckan_default:pass@postgres/ckan_test
      CKAN_DATASTORE_WRITE_URL: postgresql://datastore_write:pass@postgres/datastore_test
      CKAN_DATASTORE_READ_URL: postgresql://datastore_read:pass@postgres/datastore_test
      CKAN_SOLR_URL: http://solr:8983/solr/ckan
      CKAN_REDIS_URL: redis://redis:6379/1

    steps:
      - uses: actions/checkout@v4
      - name: Install requirements
        # Install any extra requirements your extension has here (dev requirements, other extensions etc)
        run: |
          pip install -r requirements.txt
          pip install -r dev-requirements.txt
          pip install -e .
      - name: Setup extension
        # Extra initialization steps
        run: |
          # Replace default path to CKAN core config file with the one on the container
          sed -i -e 's/use = config:.*/use = config:\/srv\/app\/src\/ckan\/test-core.ini/' test.ini
          ckan -c test.ini db init
      - name: Run tests
        run: pytest --ckan-ini=test.ini --cov=ckanext.aircan --disable-warnings ckanext/aircan
This file was deleted.
@@ -1,4 +1,6 @@
include README.rst
include LICENSE
include requirements.txt
recursive-include ckanext/aircan_connector *.html *.json *.js *.less *.css *.mo
recursive-include ckanext/aircan *.html *.json *.js *.less *.css *.mo *.yml
recursive-include ckanext/aircan/migration *.ini *.py *.mako
recursive-include ckanext/aircan/public *.*
@@ -1,134 +1,121 @@
[](https://github.com/Datopian/ckanext-aircan/actions)

# ckanext-aircan
A CKAN extension that integrates Airflow orchestrating with CKAN. This extension allows you to trigger, monitor, and display the status and logs of Aiflow data ingestion flows directly from the CKAN interface.
**Review comment:** Fix typo: "Aiflow" → "Airflow".

Proposed fix:

```diff
-A CKAN extension that integrates Airflow orchestrating with CKAN. This extension allows you to trigger, monitor, and display the status and logs of Aiflow data ingestion flows directly from the CKAN interface.
+A CKAN extension that integrates Airflow orchestration with CKAN. This extension allows you to trigger, monitor, and display the status and logs of Airflow data ingestion flows directly from the CKAN interface.
```
A CKAN extension for integrating the [AirFlow-based AirCan Data Factory into CKAN][aircan]. Specifically, this extension provides:

[aircan]: https://tech.datopian.com/flows/#ckan-v3

## Features
- **Trigger Prefect Flows**: Automatically or manually submit CKAN resources for processing via Prefect.
- **Status & Logs**: View the current status and logs of Prefect flow runs associated with CKAN resources.
Comment on lines +7 to +9

**Review comment:** Inconsistent terminology: Features mention "Prefect" but the extension is for Airflow. The Features section references "Prefect Flows" and "Prefect flow runs," but the rest of the documentation consistently refers to Airflow. This creates confusion about which orchestration system the extension actually supports.

Proposed fix:

```diff
 ## Features
-- **Trigger Prefect Flows**: Automatically or manually submit CKAN resources for processing via Prefect.
-- **Status & Logs**: View the current status and logs of Prefect flow runs associated with CKAN resources.
+- **Trigger Airflow DAGs**: Automatically or manually submit CKAN resources for processing via Airflow.
+- **Status & Logs**: View the current status and logs of Airflow DAG runs associated with CKAN resources.
```
* New APIs in CKAN for triggering and monitoring workflows (DAGs) in AirFlow
* Hooking key events in CKAN into the AirFlow instance. Specifically, resource creation and update trigger DAGs in AirFlow to load resource data into the DataStore. See https://tech.datopian.com/load/#ckan-v3

## Requirements
- CKAN 2.11 or later (not tested on earlier versions)
- Python 3.8+
- A running [Airflow](https://airflow.apache.org/) server or Prefect Cloud
**Review comment:** The Requirements section also references "Prefect Cloud". This is another instance of the Prefect/Airflow terminology inconsistency. The extension is for Airflow, so "Prefect Cloud" should be removed or replaced.

Proposed fix:

```diff
 ## Requirements
 - CKAN 2.11 or later (not tested on earlier versions)
 - Python 3.8+
-- A running [Airflow](https://airflow.apache.org/) server or Prefect Cloud
+- A running [Airflow](https://airflow.apache.org/) server
```
## Installation

### Basic Setup

There are two potential cases:

**TODO:** Add any additional install steps to the list below.
For example installing any non-Python dependencies or adding any required
config settings.
Comment on lines +18 to +20

**Review comment:** Remove or complete the TODO placeholder. Installation instructions should not contain placeholder text for users. Either complete the additional install steps or remove this block.

Proposed fix:

```diff
-**TODO:** Add any additional install steps to the list below.
-For example installing any non-Python dependencies or adding any required
-config settings.
-
 To install ckanext-aircan:
```
* Docker for Deployment: Install this extension in the usual way, see https://tech.datopian.com/ckan/install-extension.html
* Local Development: Install manually by cloning the desired commit into the docker-ckan/src directory: `git@github.com:datopian/ckanext-aircan.git`

To install ckanext-aircan:
### Configuration
1. Activate your CKAN virtual environment, for example:

* Enable the extension in CKAN by adding the plugin `aircan_connector` to the `CKAN__PLUGINS` list in your `.env`. Make sure to disable `datapusher` and `xloader` if you have them there, as AirCan replaces them.
* Add details of the AirFlow instance -- details below for the Local case and the Cloud case.

       . /usr/lib/ckan/default/bin/activate
### Local Airflow instance

* In your `.env` file add `CKAN__AIRFLOW__URL` according to the [Apache AirFlow REST API Reference](https://airflow.apache.org/docs/stable/rest-api-ref#post--api-experimental-dags--DAG_ID--dag_runs). If you are running CKAN in a Docker container, make sure to specify the Airflow URL with `host.docker.internal` instead of `localhost`: `CKAN__AIRFLOW__URL=http://host.docker.internal:8080/api/experimental/dags/ckan_api_load_multiple_steps/dag_runs`. Also note that Airflow requires, by default, the endpoint `api/experimental/dags/DAG_ID` for API access.
* Add the Airflow admin username and password for authorization:
  `CKAN__AIRFLOW__USERNAME=airflow_admin_username` and `CKAN__AIRFLOW__PASSWORD=airflow_admin_password`
* Also in your `.env` file, specify a temporary directory for files: `CKAN__AIRFLOW__STORAGE_PATH=/tmp/` and `CKAN__AIRFLOW__CLOUD=local`.

2. Clone the source and install it on the virtualenv
|
||||||||||||||
| Assuming you already have a Google Cloud Composer properly set up, it is possible to trigger a DAG on GoogleCloud Platform following these steps: | ||||||||||||||
| 3. Add `aircan` to the `ckan.plugins` setting in your CKAN | ||||||||||||||
| config file (by default the config file is located at | ||||||||||||||
| `/etc/ckan/default/ckan.ini`). | ||||||||||||||
|
|
||||||||||||||
| * Download your credentials file (a JSON file) from Google Cloud Platform. Convert it to a single-line JSON. | ||||||||||||||
| * Set up the following environment variables on your `.env` file: | ||||||||||||||
| 4. Restart CKAN. For example if you've deployed CKAN with Apache on Ubuntu: | ||||||||||||||
|
|
||||||||||||||
| ```bash | ||||||||||||||
| CKAN__AIRFLOW__CLOUD=GCP # this line activates the integration with GCP | ||||||||||||||
| CKAN__AIRFLOW__CLOUD__PROJECT_ID=YOUR_PROJECT_ID_ON_COMPOSER | ||||||||||||||
| CKAN__AIRFLOW__CLOUD__LOCATION=us-east1_OR_OTHER | ||||||||||||||
| CKAN__AIRFLOW__CLOUD__COMPOSER_ENVIRONMENT=NAME_OF_COMPOSER_ENVIRONMENT | ||||||||||||||
| CKAN__AIRFLOW__CLOUD__WEB_UI_ID=ID_FROM_AIRFLOW_UI_ON_COMPOSER | ||||||||||||||
| CKAN__AIRFLOW__CLOUD__GOOGLE_APPLICATION_CREDENTIALS={ YOUR SINGLE LINE CREDENTIALS JSON FILE } | ||||||||||||||
| ``` | ||||||||||||||
| sudo service apache2 reload | ||||||||||||||
|
|
||||||||||||||
## Getting Started
## Config settings

### Triggering a Workflow (DAG)

```
ckanext.aircan.endpoint = http://localhost:8080
ckanext.aircan.server = gcp | local
ckanext.aircan.dag_id = example_dag
ckanext.aircan.api_version = v1 | v2

# If GCP server is used
ckanext.aircan.google_credentials_json =

# If local server is used
ckanext.aircan.airflow_password =
ckanext.aircan.airflow_username =
```

Make a request to `http://YOUR-CKAN:5000/api/3/action/aircan_submit?dag_name=DAG_NAME`, specifying your `CKAN_API_KEY` in the header, and send the following information in the body of the request, replacing the values accordingly:

```json
{
  "package_id": "YOUR_PACKAGE_ID",
  "url": "http://url.for.your.resource.com",
  "description": "This is the best resource ever!",
  "schema": {
    "fields": [
      {
        "name": "FID",
        "type": "int",
        "format": "default"
      },
      {
        "name": "another-field",
        "type": "float",
        "format": "default"
      }
    ]
  }
}
```
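The config keys above would typically be read through CKAN's config toolkit; a minimal sketch using a plain dict as a stand-in for the config object (the default values here are illustrative assumptions, not the extension's documented defaults):

```python
# Sketch: read the ckanext.aircan.* settings shown above, falling back to
# assumed defaults when a key is absent.
def get_aircan_settings(config):
    return {
        "endpoint": config.get("ckanext.aircan.endpoint", "http://localhost:8080"),
        "server": config.get("ckanext.aircan.server", "local"),
        "dag_id": config.get("ckanext.aircan.dag_id", "example_dag"),
        "api_version": config.get("ckanext.aircan.api_version", "v1"),
    }

settings = get_aircan_settings({"ckanext.aircan.server": "gcp"})
print(settings["server"])    # gcp
print(settings["endpoint"])  # http://localhost:8080 (fallback)
```

In a real plugin the `config` argument would be `ckan.plugins.toolkit.config`.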
|
|
||||||||||||||
| Replace `dag_name` with the DAG you want to invoke, for example, `http://YOUR-CKAN:5000/api/3/action/aircan_submit?dag_name=ckan_api_load_gcp`. This will trigger the DAG `ckan_api_load_gcp`. | ||||||||||||||
| ## Developer installation | ||||||||||||||
| To install ckanext-aircan for development, activate your CKAN virtualenv and | ||||||||||||||
| do: | ||||||||||||||
|
|
||||||||||||||
| NB: the DAG `ckan_api_load_gcp` is designed for Google Cloud Composer AirFlow instance and will load a resource into the DataStore. | ||||||||||||||
| git clone https://github.com/Datopian/ckanext-aircan.git | ||||||||||||||
| cd ckanext-aircan | ||||||||||||||
| pip install -e . | ||||||||||||||
| pip install -r dev-requirements.txt | ||||||||||||||
|
|
||||||||||||||
| The endpoint `http://YOUR-CKAN:5000/api/3/action/resource_create` produces the same effect of `http://YOUR-CKAN:5000/api/3/action/aircan_submit?dag_name=DAG_NAME`. Make sure you set up an extra variable on your `.env` file specifying the DAG you want to trigger: | ||||||||||||||
|
|
||||||||||||||
| ``` | ||||||||||||||
| # .env | ||||||||||||||
| # all other variables | ||||||||||||||
| CKAN__AIRFLOW__CLOUD__DAG_NAME=DAG_YOU_WANT_TO_TRIGGER | ||||||||||||||
| ``` | ||||||||||||||
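A hedged sketch of assembling the `aircan_submit` request described above. The host, API key, and payload values are placeholders; sending the CKAN API key in the `Authorization` header follows the usual CKAN Action API convention:

```python
import json

# Hypothetical helper (not part of the extension) that builds the URL,
# headers, and body for an aircan_submit call.
def build_aircan_submit_request(ckan_url, api_key, dag_name, payload):
    url = f"{ckan_url}/api/3/action/aircan_submit?dag_name={dag_name}"
    headers = {"Authorization": api_key, "Content-Type": "application/json"}
    return url, headers, json.dumps(payload)

url, headers, body = build_aircan_submit_request(
    "http://YOUR-CKAN:5000",
    "CKAN_API_KEY",
    "ckan_api_load_gcp",
    {"package_id": "YOUR_PACKAGE_ID", "url": "http://url.for.your.resource.com"},
)
print(url)  # http://YOUR-CKAN:5000/api/3/action/aircan_submit?dag_name=ckan_api_load_gcp
```

The resulting triple can then be passed to an HTTP client, e.g. `requests.post(url, headers=headers, data=body)`.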
## For CKAN Datastore data loader dag
## Endpoints

* Add `ckanext.aircan.load_with_postgres_copy=True` to load with the Postgres COPY loader. By default it loads with the DataStore API.
* Add `ckanext.aircan.datastore_chunk_insert_rows_size=300` to configure the number of records sent per request to the DataStore. The default is 250 rows.
* Add `append_or_update_datastore = true` to append or update data when the new data schema matches the old schema; otherwise a new table is created.
* Add `ckanext.aircan.enable_datastore_upload_configuration=true` to enable the upload configuration UI option.
* Add `ckanext.aircan.notification_to = author, maintainer, editor, someone@gmail.com` to configure who failure email notifications are sent to.
* Add `ckanext.aircan.notification_from = sender@gmail.com` to configure the sender address for failure notifications.
* Add `ckanext.aircan.notification_subject` to configure the notification subject.

This extension adds three CKAN Action API endpoints:

* **`/api/3/action/aircan_submit`**
  Triggers an Airflow DAG run for a specified CKAN **resource**.
### Update aircan run status
The `aircan_status_update` API can be used to store or update the run status for a given resource. It accepts a POST request from an authorized user.
```json
{
  "resource_id": "a4a520aa-c790-4b53-93aa-de61e1a2813c",
  "state": "progress",
  "message": "Pushing dataset records.",
  "dag_run_id": "394a1f0f-d8b3-47f2-9a51-08732349b785",
  "error": {
    "message": "Failed to push data records."
  }
}
```
* **`/api/3/action/aircan_status`**
  Returns the status of the **most recent** Airflow DAG run for a specified CKAN **resource**.

* **`/api/3/action/aircan_status_logs`**
  Updates (or appends) progress logs on CKAN for a resource based on messages emitted during an Airflow DAG run.
### Retrieving aircan run status
Use the `aircan_status` API to get the aircan run status for a given resource by providing its resource id, e.g.
`http://YOUR-CKAN:5000/api/3/action/aircan_status`
Example request body used by `aircan_status_logs` (and/or as the shape of a status/log record):

```json
{
  "dag_run_id": "6591d0db-053e-4d9d-98d3-a0ce8f9a004d",
  "resource_id": "63b3d77e-032f-4ef0-8790-cc81d0509d5f",
  "state": "running",
  "message": "Queued for processing (dag_run_id=6591d0db-053e-4d9d-98d3-a0ce8f9a004d).",
  "type": "info",
  "error": null,
  "clear_logs": false
}
```
# Tests with Cypress
### Field descriptions
* **`resource_id`** *(string, required)*: CKAN resource UUID.
* **`dag_run_id`** *(string, required)*: Airflow DAG run identifier.
* **`state`** *(string, optional)*: Current run state (e.g. `queued`, `running`, `success`, `failed`).
* **`message`** *(string, optional)*: Human-readable progress or status message. It will be time-stamped and stored in CKAN, appended to previous messages unless `clear_logs` is set to `true`.
* **`type`** *(string, optional)*: Log level/category, e.g. `info`, `warning`, `error`. Default is `info`.
* **`error`** *(string|null, optional)*: Error details (use `null` when not applicable).
* **`clear_logs`** *(boolean, optional)*: If `true`, clears existing logs; otherwise appends to them to keep a history.
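The append-vs-clear semantics of `message` and `clear_logs` can be sketched as follows. The timestamp format and log-entry shape here are assumptions for illustration, not the extension's actual storage format:

```python
from datetime import datetime, timezone

# Sketch: each call adds one timestamped entry; clear_logs=True drops the
# history and keeps only the new entry, mirroring the field semantics above.
def update_logs(existing_logs, message, log_type="info", clear_logs=False):
    entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "type": log_type,
        "message": message,
    }
    if clear_logs:
        return [entry]              # history cleared
    return existing_logs + [entry]  # history kept

logs = update_logs([], "Queued for processing.")
logs = update_logs(logs, "Pushing dataset records.")
print(len(logs))  # 2
logs = update_logs(logs, "Restarted.", clear_logs=True)
print(len(logs))  # 1
```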
Test the aircan-connector with cypress.

## Extending Aircan payload
You can extend the payload sent to Airflow by implementing the `IAircan` interface in your own CKAN plugin.

## Installation

`npm install`
## Running

```python
# Imports are an assumption of where these names come from; adjust the
# interfaces import to wherever IAircan is defined in ckanext-aircan.
import ckan.plugins as plugins
import ckan.plugins.toolkit as tk
from ckanext.aircan import interfaces


class ExamplePlugin(plugins.SingletonPlugin):
    plugins.implements(plugins.IConfigurer)
    plugins.implements(interfaces.IAircan)

    def update_payload(self, context, payload):
        payload['new_field'] = tk.config.get('ckanext.example.new_field')
        return payload
```
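To illustrate how such hooks compose, here is a self-contained sketch of how the extension might apply every registered `update_payload` implementation in turn. The classes and function are simplified stand-ins; the real extension would iterate plugins via `ckan.plugins.PluginImplementations`:

```python
# Stand-in for a plugin implementing the IAircan hook (no CKAN dependency).
class ExamplePlugin:
    def update_payload(self, context, payload):
        payload["new_field"] = "example-value"  # hypothetical config value
        return payload

def apply_update_payload_hooks(plugin_list, context, payload):
    # Each registered plugin gets a chance to modify the payload in turn,
    # and the (possibly replaced) payload is threaded through the chain.
    for plugin in plugin_list:
        payload = plugin.update_payload(context, payload)
    return payload

result = apply_update_payload_hooks([ExamplePlugin()], {}, {"resource_id": "abc"})
print(result)  # {'resource_id': 'abc', 'new_field': 'example-value'}
```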
Opens up the cypress app and you can choose the specs to run.

## License

`npm test`

[AGPL](https://www.gnu.org/licenses/agpl-3.0.en.html)
This file was deleted.
This file was deleted.
This file was deleted.
@@ -0,0 +1,10 @@
ckan.module("aircan-module", function ($, _) {
  "use strict";
  return {
    options: {
      debug: false,
    },

    initialize: function () {},
  };
});
**Review comment:** Remove the duplicate `__pycache__/` entry. Line 6 already includes `__pycache__/` in the "Byte-compiled / optimized / DLL files" section. The duplicate entry on line 22 is redundant and should be removed to maintain clarity.

Proposed fix:

```diff
 *.spec

 # Installer logs
 pip-log.txt
 pip-delete-this-directory.txt

 # Unit test / coverage reports
 htmlcov/
 .tox/
 .coverage
 .cache
 nosetests.xml
 coverage.xml

 # Sphinx documentation
-__pycache__/
 docs/_build/
```
**Review comment:** @copilot In `.gitignore` at line 22, remove the duplicate `__pycache__/` entry (the second occurrence, around line 22) since it is already listed earlier in the "Byte-compiled / optimized / DLL files" section; delete the redundant line and scan the rest of `.gitignore` for any other duplicate entries to keep the file concise.