Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "ssb-konjunk"
version = "2.1.0"
description = "SSB Konjunk 422"
authors = [
{name = "Johanne Saxegaard", email = "jox@ssb.no"},
{name = "Johanne Saxegaard", email = "jox@ssb.no"},
{name = "Halvor Steffenssen", email = "hvr@ssb.no"}
]
license = "MIT"
Expand Down
120 changes: 46 additions & 74 deletions src/ssb_konjunk/dash/calculations/calc_data.py

Large diffs are not rendered by default.

102 changes: 41 additions & 61 deletions src/ssb_konjunk/dash/calculations/helper_functions.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import datetime
import locale
from datetime import date
from datetime import datetime
from typing import Literal

import pendulum
import polars as pl
from datetime import datetime, date


def monthdelta(d1: datetime, d2: datetime):
"""Finner differansen mellom to måneder"""
Expand All @@ -16,13 +18,13 @@ def parse_period(period: str) -> datetime:
"""Parser en periode til datetime"""
return datetime.strptime(period, "%Y-%m")


def multi_join(
dfs: list[pl.DataFrame],
on: str,
how: Literal["left", "right", "inner", "cross", "semi", "anti"] = "left",
):
"""
Slår sammen flere Polars DataFrames sekvensielt på en spesifisert kolonne.
"""Slår sammen flere Polars DataFrames sekvensielt på en spesifisert kolonne.

Utfører en kjedet sammenslåing (join) av flere DataFrames i en gitt liste basert på én felles kolonne.
Hver DataFrame legges til én etter én, med spesifisert join-type og automatisk suffiks for overlappende kolonnenavn.
Expand All @@ -35,7 +37,6 @@ def multi_join(
Returns:
pl.DataFrame: Ett samlet DataFrame etter sekvensiell join av alle inputtabellene.
"""

df = dfs[0]

for idx, i in enumerate(dfs[1:]):
Expand All @@ -53,8 +54,7 @@ def __init__(
internal_col: str = "avg",
dt_out_format: str = "%b %Y",
) -> None:
"""
Representerer en datakilde tilrettelagt for tidsbasert analyse og gruppering.
"""Representerer en datakilde tilrettelagt for tidsbasert analyse og gruppering.

Denne klassen organiserer et Polars DataFrame ved å sortere på en datokolonne
og gjør det mulig å gruppere og analysere utvalgte kolonner.
Expand All @@ -75,7 +75,6 @@ def __init__(
internal_col (str, valgfritt): Internt kolonnenavn brukt for aggregering. Standard er "avg".
dt_out_format (str, valgfritt): Format for datoer ved visning. Standard er "%b %Y".
"""

self.data = data.sort(date_col)
self._date = date_col
self._col = col_name
Expand All @@ -84,25 +83,22 @@ def __init__(
self._dt_out_format = dt_out_format

def latest_date(self) -> None | datetime:
"""
Henter den siste datoen fra datakolonnen.
"""Henter den siste datoen fra datakolonnen.

Returns:
datetime | None: Den siste datoen som finnes i datasettet,
datetime | None: Den siste datoen som finnes i datasettet,
eller None dersom verdien ikke er en gyldig datetime.
"""

latest = self.data.get_column(self._date).last()
if isinstance(latest, datetime):
return latest
elif isinstance(latest, date): # <- use date directly
elif isinstance(latest, date): # <- use date directly
return datetime.combine(latest, datetime.min.time())
else:
return None

def _percent_change(self, series_1: pl.Expr, series_2: pl.Expr):
"""
Beregner prosentvis endring mellom to serier.
"""Beregner prosentvis endring mellom to serier.

Args:
series_1 (pl.Expr): Første uttrykk / serie.
Expand All @@ -111,25 +107,21 @@ def _percent_change(self, series_1: pl.Expr, series_2: pl.Expr):
Returns:
pl.Expr: Et uttrykk som representerer prosentvis endring.
"""

return ((series_1 - series_2) / series_2) * 100

def _create_date(self, date: datetime) -> str:
"""
Formaterer en datetime til en str med definert utdataformat.
"""Formaterer en datetime til en str med definert utdataformat.

Args:
date (datetime): Datoen som skal formateres.

Returns:
str: Formatert og kapitalisert dato som tekst.
"""

return date.strftime(self._dt_out_format).capitalize()

def _gen_header(self, n: int, skip: int = 0):
"""
Genererer en overskrift som viser datoperioden basert på antall perioder og hopp.
"""Genererer en overskrift som viser datoperioden basert på antall perioder og hopp.

Args:
n (int): Antall måneder per periode.
Expand All @@ -138,15 +130,13 @@ def _gen_header(self, n: int, skip: int = 0):
Returns:
str: En str som representerer datoperioden (eks. "Jan 2023 - Mar 2023").
"""

dates = self.data.get_column(self._date).unique().sort()
latest: datetime = dates[-1 - (n * skip)]
oldest: datetime = dates[-1 - ((n * skip) + n)]
return f"{self._create_date(oldest)} - {self._create_date(latest)}"

def _base(self, n: int, *agg, **named_aggs):
"""
Utfører aggregering over dynamiske tidsvinduer og grupper.
"""Utfører aggregering over dynamiske tidsvinduer og grupper.

Args:
n (int): Størrelsen på tidsvinduet i måneder.
Expand All @@ -166,8 +156,7 @@ def _base(self, n: int, *agg, **named_aggs):
)

def _base_w_header(self, n: int, skip: int = 0, *agg, **named_aggs):
"""
Genererer en overskrift for perioden og returnerer resultatet fra baseaggregering.
"""Genererer en overskrift for perioden og returnerer resultatet fra baseaggregering.

Kombinerer datoperiode-headeren med aggregerte resultater fra `_base`.

Expand All @@ -185,8 +174,7 @@ def _base_w_header(self, n: int, skip: int = 0, *agg, **named_aggs):
return header, result

def n_month(self, n: int, skip: int = 0) -> tuple[str, pl.DataFrame]:
"""
Henter siste tilgjengelige verdi for gjennomsnittskolonnen for en periode.
"""Henter siste tilgjengelige verdi for gjennomsnittskolonnen for en periode.

Args:
n (int): Antall måneder i perioden.
Expand All @@ -195,14 +183,12 @@ def n_month(self, n: int, skip: int = 0) -> tuple[str, pl.DataFrame]:
Returns:
tuple[str, pl.DataFrame]: En tuple med overskrift og filtrert datasett med siste verdi.
"""

return self._base_w_header(
n, skip, **{self._avg: pl.col(self._avg).get(-1 - skip)}
)

def n_month_percent(self, n: int, skip: int = 0) -> tuple[str, pl.DataFrame]:
"""
Beregner prosentvis endring mellom to perioder og returnerer med overskrift.
"""Beregner prosentvis endring mellom to perioder og returnerer med overskrift.

Args:
n (int): Antall måneder i perioden.
Expand All @@ -211,7 +197,6 @@ def n_month_percent(self, n: int, skip: int = 0) -> tuple[str, pl.DataFrame]:
Returns:
tuple[str, pl.DataFrame]: En tuple med periodebeskrivelse og datasett med prosentendringer.
"""

return self._base_w_header(
n,
skip,
Expand All @@ -223,8 +208,7 @@ def n_month_percent(self, n: int, skip: int = 0) -> tuple[str, pl.DataFrame]:
)

def n_percent_rolling(self, n: int, skip: int = 0):
"""
Beregner rullerende prosentvis endring over en periode og returnerer med datoperiode-header.
"""Beregner rullerende prosentvis endring over en periode og returnerer med datoperiode-header.

Denne metoden bruker en rullerende tidsvinduanalyse for å beregne prosentvis endring
mellom første og siste verdi i hvert vindu, og gir samtidig en datoperiodebeskrivelse.
Expand All @@ -238,15 +222,16 @@ def n_percent_rolling(self, n: int, skip: int = 0):
tuple[str, pl.DataFrame]: En tuple med en tekstlig overskrift for datoperioden og et DataFrame
med prosentvis endring for hver gruppe.
"""

def _gen_header(n: int, skip: int = 0):
'''Lager overskrift for hver perioden'''
"""Lager overskrift for hver perioden"""
dates = self.data.get_column(self._date).unique().sort()
latest: datetime = dates[-1 - (skip)]
oldest: datetime = dates[-1 - ((skip) + n - 1)]
return f"{self._create_date(oldest)} - {self._create_date(latest)}"

def map_test(x: pl.DataFrame):
'''Lager det rullende gjennomsnittet for hver periodegruppe'''
"""Lager det rullende gjennomsnittet for hver periodegruppe"""
if x.shape[0] != n:
x = x.with_columns(
**{self._avg: pl.col(self._col).fill_null(strategy="backward")}
Expand Down Expand Up @@ -276,8 +261,7 @@ def map_test(x: pl.DataFrame):
)

def n_mean_rolling(self, n: int, skip: int = 0):
"""
Beregner et rullerende gjennomsnitt for hver gruppe i datasettet og returnerer med datoperiode-header.
"""Beregner et rullerende gjennomsnitt for hver gruppe i datasettet og returnerer med datoperiode-header.

Denne metoden beregner gjennomsnittet av verdiene innenfor et rullerende vindu på `n` måneder.
Hvis et vindu inneholder færre enn `n` datapunkter, blir manglende verdier fylt bakover.
Expand All @@ -293,25 +277,22 @@ def n_mean_rolling(self, n: int, skip: int = 0):
"""

def _gen_header(n: int, skip: int = 0):
'''Lager overskrift for hver perioden'''
"""Lager overskrift for hver perioden"""
dates = self.data.get_column(self._date).unique().sort()
latest: datetime = dates[-1 - (skip)]
oldest: datetime = dates[-1 - ((skip) + n - 1)]
return f"{self._create_date(oldest)} - {self._create_date(latest)}"

def map_test(x: pl.DataFrame):
'''Lager det rullende gjennomsnittet for hver periodegruppe'''
"""Lager det rullende gjennomsnittet for hver periodegruppe"""
if x.shape[0] != n:
x = x.with_columns(
**{self._avg: pl.col(self._col).fill_null(strategy="backward")}
)
else:
x = x.with_columns(
**{
self._avg: pl.col(self._col).mean()
}
)
x = x.with_columns(**{self._avg: pl.col(self._col).mean()})
return x

return _gen_header(n, skip), (
self.data.rolling(
pl.col(self._date),
Expand All @@ -329,11 +310,10 @@ def map_test(x: pl.DataFrame):
def n_month_rolling_percent_compare(
self, n: int, skip: int = 0, skip_1: int = 1
) -> tuple[str, pl.DataFrame]:
"""
Sammenligner rullerende gjennomsnitt mellom to perioder og beregner prosentvis endring.
"""Sammenligner rullerende gjennomsnitt mellom to perioder og beregner prosentvis endring.

Denne metoden sammenligner det rullerende gjennomsnittet for én periode mot en annen forskjøvet
periode, og returnerer en prosentvis endring for hver gruppe. Resultatet inkluderer også en
Denne metoden sammenligner det rullerende gjennomsnittet for én periode mot en annen forskjøvet
periode, og returnerer en prosentvis endring for hver gruppe. Resultatet inkluderer også en
overskrift som beskriver begge periodene.

Args:
Expand All @@ -342,10 +322,9 @@ def n_month_rolling_percent_compare(
skip_1 (int, valgfritt): Antall perioder å hoppe over for den sammenlignende perioden. Standard er 1.

Returns:
tuple[str, pl.DataFrame]: En tuple bestående av en sammensatt overskrift og et DataFrame
tuple[str, pl.DataFrame]: En tuple bestående av en sammensatt overskrift og et DataFrame
med prosentvis endring mellom de to periodene for hver gruppe.
"""

header_1, series_1 = self.n_mean_rolling(n, skip)
header_2, series_2 = self.n_mean_rolling(n, skip_1)

Expand All @@ -363,8 +342,7 @@ def n_month_rolling_percent_compare(
def n_month_percent_compare(
self, n: int, skip: int = 0, skip_1: int = 1
) -> tuple[str, pl.DataFrame]:
"""
Sammenligner prosentvis endring i verdier mellom to definerte perioder.
"""Sammenligner prosentvis endring i verdier mellom to definerte perioder.

Denne metoden henter to distinkte perioder (hver på `n` måneder), basert på ulike skipverdier,
og beregner den prosentvise forskjellen i verdier mellom dem for hver gruppe i datasettet.
Expand All @@ -379,12 +357,11 @@ def n_month_percent_compare(
tuple[str, pl.DataFrame]: En tuple med beskrivelse av periodeparene og et DataFrame
med prosentvis endring mellom verdiene for hver gruppe.
"""

header_1, series_1 = self.n_month(n, skip)
header_2, series_2 = self.n_month(n, skip_1)

header = f"{header_2} / {header_1}"

joined = series_1.join(series_2, on=self._group).select(
**{
self._group: pl.col(self._group),
Expand All @@ -395,28 +372,31 @@ def n_month_percent_compare(
)
return header, joined


def rounded_average(df: pd.DataFrame, ordered_columns: list[str]):
df_copy = df[ordered_columns]
df_copy = df_copy.round(1)
res = df_copy.sum(axis="columns").div(len(ordered_columns))
return res.round(1)


def calc_change_rate(df: pd.DataFrame, ordered_columns: list[str], n: int = 1):
results = []
for i in range(n, len(ordered_columns), 1):
col_present = ordered_columns[i]
previous_period = ordered_columns[i-n]
chg_rate = (df[col_present] - df[previous_period])*100/df[previous_period]
previous_period = ordered_columns[i - n]
chg_rate = (df[col_present] - df[previous_period]) * 100 / df[previous_period]
results.append(chg_rate)

return pd.concat(results, axis = "columns", keys = ordered_columns[n:])
return pd.concat(results, axis="columns", keys=ordered_columns[n:])


def rolling_change_rate(df: pd.DataFrame, step: int = 1):
results = []
for i in range(step, len(df.columns), step):
col_present = df.columns[i]
previous_period = df.columns[i-step]
chg_rate = (df[col_present] - df[previous_period])*100/df[previous_period]
previous_period = df.columns[i - step]
chg_rate = (df[col_present] - df[previous_period]) * 100 / df[previous_period]
results.append(chg_rate)

return pd.concat(results, axis = "columns", keys = df.columns[step:])
return pd.concat(results, axis="columns", keys=df.columns[step:])
4 changes: 1 addition & 3 deletions src/ssb_konjunk/dash/calculations/period_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

import pendulum

from from ssb_konjunk.dash.utils import period_parser


@total_ordering
class Period:
Expand Down Expand Up @@ -178,4 +176,4 @@ def create_period_range_list(year: int, month: int, n_months: int) -> list[str]:
selected_period = dt.subtract(months=i)
period = period_parser(selected_period.year, selected_period.month)
result.append(period)
return result
return result
13 changes: 6 additions & 7 deletions src/ssb_konjunk/dash/components/__init__.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@

try:
from .page_aio import Tables, AnalyticsPageAIO # type: ignore
from .page_aio import AnalyticsPageAIO # type: ignore
from .page_aio import Tables # type: ignore
except ImportError:

class Tables:
def __init__(self, *args, **kwargs):
raise ImportError(
"Tables requires 'dash'. Install it with: pip install ssb-konjunk['dash']"
)

class AnalyticsPageAIO:
def __init__(self, *args, **kwargs):
raise ImportError(
"AnalyticsPageAIO requires 'dash'. Install it with: pip install ssb-konjunk['dash']"
)

__all__ = [
"Tables",
"AnalyticsPageAIO"
]

__all__ = ["AnalyticsPageAIO", "Tables"]
Loading
Loading