Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* @aMAAmina
39 changes: 39 additions & 0 deletions .github/workflows/cd.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
name: CD Pipeline

on:
  push:
    branches:
      - main
  workflow_dispatch: {}

# Version is derived from the run number so every main build gets a unique tag.
env:
  VERSION: v1.${{ github.run_number }}

# Pushing a git tag with the default GITHUB_TOKEN requires write access to
# repository contents; the default token may be read-only depending on the
# repository settings, so request it explicitly.
permissions:
  contents: write

jobs:
  release-main:
    # Guard: workflow_dispatch can be run from any ref; only release from main.
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - name: Build Docker (prod)
        run: docker build -t nifipulse-prod .

      - name: Log in to Docker Hub
        run: |
          echo "${{ secrets.DOCKER_PASSWORD }}" \
            | docker login -u "${{ secrets.DOCKER_USERNAME }}" --password-stdin

      # Push the image BEFORE creating the git tag, so a failed image push
      # does not leave a version tag pointing at an unpublished artifact.
      # NOTE(review): "myregistry" looks like a placeholder — confirm the real
      # registry/namespace matches DOCKER_USERNAME.
      - name: Push Docker image
        run: |
          docker tag nifipulse-prod myregistry/nifipulse:${{ env.VERSION }}
          docker push myregistry/nifipulse:${{ env.VERSION }}
          docker tag nifipulse-prod myregistry/nifipulse:latest
          docker push myregistry/nifipulse:latest

      # Lightweight tag: no git user identity configuration required.
      - name: Create and push git tag
        run: |
          echo "Tagging version: $VERSION"
          git tag "$VERSION"
          git push origin "$VERSION"
69 changes: 69 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
name: CI Pipeline

on:
  push:
    branches:
      - dev
  pull_request:
    branches:
      - staging
      - main

# Shared DB connection settings for the app/tests; NIFIPULSE_AUTO_ENV makes
# the package select its config from the current git branch.
env:
  NIFIPULSE_AUTO_ENV: "1"
  PGUSER: postgres
  PGPASSWORD: postgres
  PGHOST: localhost
  PGPORT: "5432"
  PGDATABASE: metrics_db

jobs:
  # Lint + docker build on every push to dev.
  build-test-dev:
    if: github.ref == 'refs/heads/dev'
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      # Install dependencies (change this depending on your stack)
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install -U pip
          pip install -e .
          pip install flake8

      # flake8 is installed once in the step above; the duplicate
      # "pip install flake8" that used to live here has been removed.
      - name: Lint
        run: flake8 .

      - name: Docker build (dev)
        run: docker build -t nifipulse-dev .

  # Full docker-compose stack + integration tests for PRs targeting staging.
  integration-test-staging:
    if: github.event.pull_request.base.ref == 'staging'
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - name: Docker build
        run: docker build -t nifipulse-staging .

      - name: Docker compose up
        run: docker compose -f docker-compose.yml up -d

      # TODO(review): replace the fixed sleep with a real readiness probe
      # (container healthchecks / curl retry loop) to avoid flaky timing.
      - name: Wait for services
        run: sleep 15

      - name: Integration tests
        run: |
          pip install -r tests/requirements.txt
          pytest tests/integration

      # Always tear the stack down (even on failure) so the runner is clean
      # and no containers/volumes leak between steps.
      - name: Docker compose down
        if: always()
        run: docker compose -f docker-compose.yml down -v

74 changes: 74 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
name: Tests

on:
  push:
    branches: [ dev ]
  pull_request:
    branches: [ dev, staging, main ]

jobs:
  unit-integration:
    runs-on: ubuntu-latest

    # Throwaway Postgres for the run; the health check gates the job's steps
    # until the database accepts connections.
    services:
      postgres:
        image: postgres:15-alpine
        env:
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: postgres
          POSTGRES_DB: metrics_db
        ports:
          # Quoted: an unquoted digits-and-colon scalar is a classic YAML 1.1
          # trap (sexagesimal / implicit-mapping misparse on some parsers).
          - "5432:5432"
        options: >-
          --health-cmd="pg_isready -U postgres -d metrics_db"
          --health-interval=5s
          --health-timeout=5s
          --health-retries=12

    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install project and test deps
        run: |
          pip install -U pip
          pip install -e .
          pip install pytest "sqlalchemy>=2" psycopg2-binary pandas requests

      # Belt-and-braces: the service health check above should already
      # guarantee readiness, but polling here keeps the job robust on slow
      # runners. Up to 30 attempts x 2s = 60s budget.
      - name: Wait for Postgres
        run: |
          python - <<'PY'
          import time, psycopg2
          for i in range(30):
              try:
                  psycopg2.connect(host="localhost", port=5432, user="postgres", password="postgres", dbname="metrics_db").close()
                  print("Postgres ready"); break
              except Exception as e:
                  print("Waiting...", e); time.sleep(2)
          else:
              raise SystemExit("Postgres not ready")
          PY

      # Create the star-schema tables idempotently (CREATE TABLE IF NOT
      # EXISTS) before the test suite runs.
      - name: Initialize schema (SQLAlchemy)
        run: |
          python - <<'PY'
          from sqlalchemy import create_engine, text
          ddl = """
          CREATE TABLE IF NOT EXISTS dim_instance (instance_id SERIAL PRIMARY KEY, instance_name TEXT UNIQUE NOT NULL);
          CREATE TABLE IF NOT EXISTS dim_metric (metric_id SERIAL PRIMARY KEY, metric_name TEXT UNIQUE NOT NULL, original_unit TEXT);
          CREATE TABLE IF NOT EXISTS dim_component (component_id SERIAL PRIMARY KEY, component_name TEXT NOT NULL, component_type TEXT NOT NULL, UNIQUE (component_name, component_type));
          CREATE TABLE IF NOT EXISTS dim_date (date_id SERIAL PRIMARY KEY, timestamp_utc TIMESTAMPTZ UNIQUE NOT NULL, year INT, month INT, day INT, hour INT, minute INT, second INT);
          CREATE TABLE IF NOT EXISTS fact_metrics (fact_id SERIAL PRIMARY KEY, date_id INT REFERENCES dim_date(date_id), instance_id INT REFERENCES dim_instance(instance_id), metric_id INT REFERENCES dim_metric(metric_id), component_id INT REFERENCES dim_component(component_id), value DOUBLE PRECISION, UNIQUE (date_id, instance_id, metric_id, component_id));
          """
          eng = create_engine("postgresql+psycopg2://postgres:postgres@localhost:5432/metrics_db")
          # Naive split on ';' is safe here: the DDL contains no semicolons
          # inside string literals.
          with eng.begin() as c:
              for stmt in [s.strip() for s in ddl.split(';') if s.strip()]:
                  c.execute(text(stmt))
          print("Schema ready")
          PY

      - name: Run tests
        run: pytest -q
3 changes: 2 additions & 1 deletion Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ After running `docker compose up` run:
- Verify tables:
1. Ubuntu: `docker exec -it postgres sh -c 'psql -U postgres -d metrics_db -c "\dt"'`
2. Windows: `docker exec -it postgres psql -U postgres -d metrics_db -c "\dt"`
- Run: `pip install -r requirements.txt`
- Run: `pip install -e .`
- Run ETL: `nifipulse --poll <number_of_polls_10_by_default_0_for_infinite>`
- Run quick sanity check:
Expand Down Expand Up @@ -80,4 +81,4 @@ NiFi Registry config

This project is licensed under the Apache License 2.0. See the `LICENSE` file for the full license text.

Copyright (c) 2025 Amina BOUHAMRA, Fadwa EL AMRAOUI, Nawar TOUMI, Soukaina BOUCETTA
Copyright (c) 2025 Amina BOUHAMRA, Fadwa EL AMRAOUI, Nawar TOUMI, Soukayna BOUCETTA
39 changes: 35 additions & 4 deletions nifipulse/config.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,22 @@
import os
from urllib.parse import quote_plus

class BaseConfig:
# Shared defaults
RESULTS_DIR = "results"
PROM_URL = "http://localhost:9090/api/v1/query"
CSV_SINK = "results/prometheus_metrics_log.csv"
CLEAN_DATA = "results/nifi_metrics_propre.csv"
RESULTS_DIR = os.getenv("RESULTS_DIR", "results")
CSV_SINK = os.getenv("CSV_SINK", os.path.join(RESULTS_DIR, "prometheus_metrics_log.csv"))
CLEAN_DATA = os.getenv("CLEAN_DATA", os.path.join(RESULTS_DIR, "nifi_metrics_clean.csv"))
FACT_METRICS = os.getenv("FACT_METRICS", os.path.join(RESULTS_DIR, "fact_metrics_export.csv"))
# Build DSN from env (override PGHOST to 'postgres' when running inside Docker)
PW = quote_plus(os.getenv('PGPASSWORD', 'postgres'))
PG_DSN = os.getenv(
"PG_DSN",
"postgresql+psycopg2://"
f"{os.getenv('PGUSER','postgres')}:{PW}@"
f"{os.getenv('PGHOST','localhost')}:{os.getenv('PGPORT','5432')}/"
f"{os.getenv('PGDATABASE','metrics_db')}"
)
class DevConfig(BaseConfig):
pass

Expand All @@ -26,4 +39,22 @@ def set_env(env_name):
try:
env = ENV_MAP[env_name]
except KeyError:
raise ValueError(f"Unknown environment: {env_name}")
raise ValueError(f"Unknown environment: {env_name}")

def set_env_from_branch(branch_name: str):
"""
Map Git branch to environment: main->prod, staging->staged, dev->dev
"""
mapping = {"main": "prod", "staging": "staged", "dev": "dev"}
set_env(mapping.get(branch_name, "prod"))

def auto_set_env():
name = os.getenv("NIFIPULSE_ENV")
if name:
set_env(name)
return
branch = os.getenv("GITHUB_REF_NAME") or (os.getenv("GITHUB_REF","").split("/")[-1] or "main")
set_env_from_branch(branch)

if os.getenv("NIFIPULSE_AUTO_ENV") == "1":
auto_set_env()
15 changes: 8 additions & 7 deletions nifipulse/data_normalisation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
}

def process_data():
print(f" Démarrage du traitement de {config.env.CSV_SINK} ")
print(f"Starting normalization: {config.env.CSV_SINK} ")

stats = {"total": 0, "kept": 0, "filtered": 0}

Expand Down Expand Up @@ -88,11 +88,12 @@ def process_data():
stats["kept"] += 1

except FileNotFoundError:
print(f"Erreur : Le fichier {config.env.CSV_SINK} n'existe pas.")
print(f"Error: source CSV not found: {config.env.CSV_SINK}")
return

print(" Traitement terminé ")
print(f"Lignes lues : {stats['total']}")
print(f"Lignes filtrées : {stats['filtered']} (Zeros inutiles)")
print(f"Lignes gardées : {stats['kept']} (Sauvegardées dans {config.env.CLEAN_DATA})")
print(f"Taux de réduction: {round((stats['filtered']/stats['total'])*100, 1)}%")
print("Normalization complete:")
print(f"- Rows read : {stats['total']}")
print(f"- Rows filtered : {stats['filtered']} (zero-value amount metrics)")
print(f"- Rows kept : {stats['kept']} (written to {config.env.CLEAN_DATA})")
reduction = (stats['filtered'] / stats['total'] * 100) if stats['total'] else 0.0
print(f"- Reduction rate : {round(reduction, 1)}%")
2 changes: 1 addition & 1 deletion nifipulse/extract_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def nifipulse(poll_count=10, interval=5):

#Only load to Postgres if cleaned data exists
if _csv_has_rows(config.env.CLEAN_DATA):
load_postgres(config.env.CLEAN_DATA)
load_postgres(config.env.CLEAN_DATA, config.env.FACT_METRICS)
else:
print("No cleaned data to load into Postgres; skipping load.")
return
Expand Down
25 changes: 16 additions & 9 deletions nifipulse/load_postgres.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
import pandas as pd
from sqlalchemy import create_engine, text
from nifipulse import config

def load_postgres(clean_data):
def load_postgres(clean_data: str, fact_metrics: str | None = None):
print(" inserting into PostgreSQL ... ")
df = pd.read_csv(clean_data, parse_dates=['timestamp_utc'])
df['timestamp_utc'] = pd.to_datetime(df['timestamp_utc'], utc=True)

# Basic schema validation
required = {"timestamp_utc","instance","metric_name","original_unit","component_name","component_type","value"}
missing = required - set(df.columns)
if missing:
raise ValueError(f"Missing columns in cleaned CSV: {sorted(missing)}")

# Connexion PostgreSQL

# Use DSN from config (works on host and in Docker; override via env)
engine = create_engine(
"postgresql+psycopg2://postgres:postgres@localhost:5432/metrics_db"
config.env.PG_DSN,
connect_args={"options": "-c client_encoding=UTF8"}
)


print("- Connexion PostgreSQL OK")
print("- PostgreSQL connection OK")

# Sanity check: schema exists
with engine.begin() as conn:
Expand Down Expand Up @@ -58,7 +65,7 @@ def load_postgres(clean_data):
"h": ts2.hour, "min": ts2.minute, "s": ts2.second
})

print("- Dimensions insérées")
print("- Dimensions inserted successfully")

# CHARGER LES IDS
dim_instance = pd.read_sql("SELECT * FROM dim_instance;", engine)
Expand Down Expand Up @@ -87,7 +94,7 @@ def load_postgres(clean_data):
"v": row['value']
})

print("- Faits insérés avec succès")
print("- Facts inserted")

# EXPORT CSV
fact_df = pd.read_sql("""
Expand All @@ -101,5 +108,5 @@ def load_postgres(clean_data):
ORDER BY d.timestamp_utc;
""", engine)

fact_df.to_csv("metrics_star_schema.csv", index=False)
print("- Export fichier csv")
fact_df.to_csv(fact_metrics, index=False)
print(f"- CSV exported: {fact_metrics}")
12 changes: 11 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,14 @@ build-backend = "setuptools.build_meta"
include = ["nifipulse*"]

[tool.setuptools.package-data]
nifipulse = ["metrics_list.txt"]
nifipulse = ["metrics_list.txt"]

[project.optional-dependencies]
dev = [
"pytest",
"flake8",
"black",
"mypy",
"types-requests",
"python-dotenv"
]
16 changes: 16 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
certifi==2025.10.5
charset-normalizer==3.4.4
greenlet==3.3.0
idna==3.11
iniconfig==2.3.0
-e git+https://github.com/DXC-DP-Monitoring/NiFiPulse.git@1d86f5d89bc746cb1f2087a95fb264e3b2e7b53a#egg=nifipulse
numpy==2.3.5
packaging==25.0
pandas==2.3.3
pluggy==1.6.0
psycopg2-binary==2.9.11
Pygments==2.19.2
pytest==9.0.2
python-dateutil==2.9.0.post0
pytz==2025.2
requests==2.32.5
six==1.17.0
SQLAlchemy==2.0.44
typing_extensions==4.15.0
tzdata==2025.2
urllib3==2.5.0
Loading
Loading