diff --git a/dag_hvb_dashboard_etl_1.py b/dag_hvb_dashboard_etl_1.py new file mode 100644 index 0000000..10777c3 --- /dev/null +++ b/dag_hvb_dashboard_etl_1.py @@ -0,0 +1,633 @@ +""" +DAG: hvb_dashboard_etl +====================== +ETL-Lauf alle 15 Minuten für das Projekt-Dashboard (Superset auf DuckDB). + +Änderungserkennung: Da sich die Excel-Dateien deutlich seltener ändern +als alle 15 Minuten, merkt sich der DAG den Quell-Stand (alle Dateinamen ++ Änderungsdaten über alle Ordner). Ist nichts Neues da, wird der Lauf +nach dem Listing übersprungen (Skip) — Transformation und DB-Tausch +passieren nur bei tatsächlich geänderten Quelldaten. + +Ablauf +------ + extract_nextcloud ──► geodaten_bereitstellen ──► transform ──► load_duckdb + +1. extract_nextcloud: Lädt ALLE Excel-Dateien aus den Entitäts- + Unterordnern per WebDAV (siehe Ordnerstruktur). +2. geodaten_bereitstellen: Stellt die PLZ-Geokodierungsreferenz bereit + (Cache; GeoNames wird höchstens alle 30 Tage + neu geladen — Skript 01_geodaten_holen.py). +3. transform: Führt 02_etl_angebote_zuschlaege.py aus; das + Skript liest je Ordner alle Dateien ein und + vereinigt sie zu den Ziel-CSVs. +4. load_duckdb: Schreibt die CSVs als Tabellen in eine neue + DuckDB-Datei und tauscht sie atomar gegen die + produktive Datei aus (Superset liest read-only). + +Nextcloud-Ordnerstruktur (Basisordner = hvb_nextcloud_ordner): + /taifun_export/ Taifun-Exporte (mehrere möglich; jüngste je + Angebotsnummer gewinnt — Reihenfolge aus dem + WebDAV-Änderungsdatum) + /pflege/ Pflege-Status-Historie (mehrere möglich; + UNION aller Status-Events, gültiger Status = + neuestes Datum, dann höchster Rang) + /archiv/ eingefrorene Alt-Stammdaten (optional) + /zuschlaege/ BNetzA-Zuschläge (optional) +Pflicht-Ordner sind taifun_export und pflege; archiv und zuschlaege +dürfen fehlen. + +Design-Entscheidungen +--------------------- +* TaskFlow API statt klassischer Operators: Die gesamte Logik ist reines + Python (WebDAV, Subprozess, DuckDB). TaskFlow spart Boilerplate, macht + die Datenübergabe zwischen Tasks (Arbeitsverzeichnis-Pfad via XCom) + explizit und typisiert, und es gibt keinen Mehrwert durch spezialisierte + Operators (einen gepflegten Nextcloud-/DuckDB-Provider gibt es nicht). +* Idempotenz: + - Pro logischem Datum ein festes Arbeitsverzeichnis; ein erneuter Lauf + desselben Tages überschreibt dieselben Dateien statt zu duplizieren. + - Der Load ist ein Voll-Refresh ins Staging-Schema — mehrfache + Ausführung erzeugt nie Duplikate. + - Das produktive Schema wird erst nach erfolgreicher Validierung in + EINER Transaktion per Schema-Rename getauscht; ein fehlgeschlagener + Lauf lässt das Prod-Schema unangetastet. +* Superset-Kompatibilität: Superset verbindet sich per Postgres-Login auf + dieselbe pg_duckdb-Instanz und liest aus dem Prod-Schema. Da der Tausch + atomar (Schema-Rename in einer Transaktion) erfolgt, sieht Superset nie + einen halbfertigen Zustand. + +Benötigte Airflow-Konfiguration +------------------------------- +Connection "nextcloud_webdav" (Typ HTTP): + host = https://cloud.example.de # Nextcloud-Basis-URL + login = hvb_admin + password = + +Connection "duckdb_postgres" (Typ Postgres) — pg_duckdb-Server: + host = duckdb-host # Postgres mit pg_duckdb-Extension + port = 5432 + schema = dashboard # Datenbankname (NICHT das SQL-Schema) + login = + password = + Diese Instanz ist NICHT die Airflow-Metadaten-DB. + +Variables: + hvb_nextcloud_ordner = Hans van Bebber (optional, Default s.u.) + hvb_geo_cache = /data/etl_cache/geo_plz_koordinaten.csv + hvb_log_pfad = /data/etl_cache/etl_laufprotokoll.csv + hvb_skripte_pfad = /opt/airflow/include/hvb (Ablageort der 2 Skripte) + +Dashboard-Tabellen im Prod-Schema 'dashboard' (zusätzlich zu den Daten): + etl_metadaten 1 Zeile: geladen_am, Dateianzahl + neuester Stand + je Quell-Ordner — für die "Datenstand"-Anzeige + etl_laufprotokoll Historie aller Läufe inkl. Prüfergebnis + +Superset-Datenbank-Verbindung (Postgres-Login, Prod-Schema dashboard): + postgresql://:@duckdb-host:5432/dashboard + +Python-Abhängigkeiten im Airflow-Environment: + pandas, openpyxl, requests, sqlalchemy, + apache-airflow-providers-postgres (PostgresHook) +""" + +from __future__ import annotations + +import csv +import shutil +import subprocess +import sys +import time +import xml.etree.ElementTree as ET +from datetime import datetime, timedelta +from pathlib import Path +from urllib.parse import quote, unquote + +import pendulum +import requests +from airflow.decorators import dag, task +from airflow.exceptions import AirflowFailException, AirflowSkipException +from airflow.hooks.base import BaseHook +from airflow.models import Variable + +# ---------------------------------------------------------------------- +# Konfiguration +# ---------------------------------------------------------------------- +TZ = pendulum.timezone("Europe/Berlin") + +CONN_ID_NEXTCLOUD = "nextcloud_webdav" +# pg_duckdb: DuckDB läuft als Extension in einem Postgres-Server. Der Lade- +# Task schreibt per SQL (nicht mehr in eine Datei). Die Zugangsdaten liegen +# in dieser Airflow-Connection (Typ postgres), NICHT im Code. +CONN_ID_DUCKDB = "duckdb_postgres" + +# Schema-Namen für den atomaren Tausch: geladen wird ins Staging-Schema, +# danach wird in einer Transaktion gegen das produktive Schema getauscht. +# Superset liest aus DB_SCHEMA_PROD und sieht nie einen Zwischenzustand. +DB_SCHEMA_PROD = "dashboard" +DB_SCHEMA_STAGING = "dashboard_staging" +DB_SCHEMA_ALT = "dashboard_alt" + +# Nextcloud-Ordnerstruktur: je Entität ein Unterordner unterhalb des +# Basisordners (Variable hvb_nextcloud_ordner, Default "Hans van Bebber"). +# Pro Ordner können MEHRERE Dateien liegen; alle werden heruntergeladen +# und vom ETL-Skript eingelesen und vereinigt. Der Typ ergibt sich aus +# dem Ordner, nicht mehr aus dem Dateinamen. +# taifun_export/ -> Stammdaten (jüngste je Nummer gewinnt) +# pflege/ -> Status-Historie (UNION aller Events) +# archiv/ -> eingefrorene Alt-Stammdaten +# zuschlaege/ -> BNetzA-Zuschläge (optional) +QUELL_ORDNER = ("taifun_export", "pflege", "archiv", "zuschlaege") + +SKRIPT_GEODATEN = "01_geodaten_holen.py" +SKRIPT_ETL = "02_etl_angebote_zuschlaege.py" + +# CSV-Ausgaben des ETL-Skripts -> Zieltabellen in DuckDB. +# Spalten, die als Text erzwungen werden müssen (führende Nullen, IDs); +# je Datei wird nur angewendet, was im CSV-Header tatsächlich vorkommt: +TEXT_SPALTEN = ("plz", "plz_kunde", "plz_standort", "nummer", "debitor", + "zuschlag_nr", "gebot_nr", "projektnummer", + "aktuellste_nummer", "kunde_schluessel") +CSV_TABELLEN = { + "staging_angebote.csv": "staging_angebote", + "staging_status_historie.csv": "staging_status_historie", + "staging_zuschlaege.csv": "staging_zuschlaege", + "v_angebote_karte.csv": "v_angebote_karte", + "v_angebote_karte_aktuell.csv": "v_angebote_karte_aktuell", + "v_kunden_pipeline.csv": "v_kunden_pipeline", + "v_zuschlaege_karte.csv": "v_zuschlaege_karte", +} + +# Sicherheitsnetz: Wird weniger geliefert, wird die produktive DB NICHT +# überschrieben (Schutz vor leeren/kaputten Exporten). Nur Tabellen, die +# immer Daten haben müssen (Zuschläge sind optional). +MINDEST_ZEILEN = { + "staging_angebote": 1, + "v_angebote_karte_aktuell": 1, + "v_kunden_pipeline": 1, +} + +GEO_CACHE_MAX_ALTER_TAGE = 30 + +# Airflow-Variable, in der der zuletzt erfolgreich geladene Quell-Stand +# (Dateiname + Änderungsdatum) gespeichert wird — Basis der +# Änderungserkennung beim 15-Minuten-Takt: +VAR_QUELLSTAND = "hvb_letzter_quellstand" + +# Kurzschlüssel je Zieldatei (für XCom/Protokoll): + +# Spalten des Laufprotokolls. Das Protokoll lebt als persistente CSV +# (Variable hvb_log_pfad) und wird bei jedem Load vollständig als +# Tabelle 'etl_laufprotokoll' in die DuckDB übernommen — so überlebt +# die Historie den atomaren Austausch der DB-Datei. +LOG_SPALTEN = [ + "lauf_zeitpunkt", "airflow_run_id", "ereignis", + "datei_angebote", "stand_angebote", + "datei_zuschlaege", "stand_zuschlaege", + "zeilen_staging_angebote", "zeilen_staging_zuschlaege", + "zeilen_v_angebote_karte", "zeilen_v_zuschlaege_karte", + "bemerkung", +] + + +def _log_pfad() -> Path: + return Path(Variable.get( + "hvb_log_pfad", + default_var="/data/etl_cache/etl_laufprotokoll.csv", + )) + + +def _protokolliere(**felder) -> None: + """Hängt eine Zeile an das persistente Laufprotokoll (CSV) an.""" + pfad = _log_pfad() + pfad.parent.mkdir(parents=True, exist_ok=True) + neu = not pfad.exists() + with open(pfad, "a", newline="", encoding="utf-8") as f: + w = csv.DictWriter(f, fieldnames=LOG_SPALTEN) + if neu: + w.writeheader() + w.writerow({k: felder.get(k, "") for k in LOG_SPALTEN}) + + +# ---------------------------------------------------------------------- +# WebDAV-Hilfsfunktionen (bewusst ohne Zusatz-Library, nur requests) +# ---------------------------------------------------------------------- +def _webdav_basis(conn) -> tuple[str, tuple[str, str]]: + """Baut Basis-URL und Auth aus der Airflow-Connection.""" + host = (conn.host or "").rstrip("/") + if not host.startswith("http"): + host = f"https://{host}" + basis = f"{host}/remote.php/dav/files/{conn.login}" + return basis, (conn.login, conn.password) + + +def _webdav_liste(basis: str, auth, ordner: str) -> list[dict]: + """PROPFIND (Tiefe 1) — listet Dateien eines Nextcloud-Ordners.""" + url = f"{basis}/{quote(ordner)}" + body = ( + '' + '' + "" + "" + ) + resp = requests.request( + "PROPFIND", url, data=body, auth=auth, + headers={"Depth": "1", "Content-Type": "application/xml"}, + timeout=60, + ) + resp.raise_for_status() + + ns = {"d": "DAV:"} + eintraege = [] + for response in ET.fromstring(resp.content).findall("d:response", ns): + href = unquote(response.findtext("d:href", default="", namespaces=ns)) + prop = response.find(".//d:prop", ns) + ist_ordner = prop.find(".//d:collection", ns) is not None + if ist_ordner: + continue # den Ordner-Eintrag selbst überspringen + geaendert = prop.findtext("d:getlastmodified", default="", namespaces=ns) + eintraege.append({ + "name": Path(href).name, + "href": href, + "geaendert": pendulum.from_format( + geaendert, "ddd, DD MMM YYYY HH:mm:ss z" + ) if geaendert else pendulum.datetime(1970, 1, 1), + }) + return eintraege + + +# ---------------------------------------------------------------------- +# DAG +# ---------------------------------------------------------------------- +@dag( + dag_id="hvb_dashboard_etl", + description="Nextcloud-Excel-Exporte -> Python-ETL -> DuckDB (Superset)", + # Alle 15 Minuten; dank Änderungserkennung wird nur dann wirklich + # gerechnet, wenn sich in Nextcloud etwas geändert hat: + schedule="*/15 * * * *", + start_date=pendulum.datetime(2026, 6, 1, tz=TZ), + catchup=False, # keine Nachholläufe — es zählt nur der aktuelle Stand + max_active_runs=1, # keine parallelen Läufe auf dieselbe DB-Datei + default_args={ + "owner": "kalle", + "retries": 2, # Fehlerbehandlung: 2 Wiederholungen ... + "retry_delay": timedelta(minutes=5), # ... mit 5 min Abstand ... + "retry_exponential_backoff": True, # ... und wachsendem Backoff + "execution_timeout": timedelta(minutes=30), + }, + tags=["hvb", "dashboard", "duckdb", "superset"], +) +def hvb_dashboard_etl(): + + # ------------------------------------------------------------------ + # 1) EXTRACT — Excel-Dateien aus Nextcloud holen + # ------------------------------------------------------------------ + @task(retries=4, multiple_outputs=True) # Netzwerk-Task: großzügigere Retries + def extract_nextcloud(ds_nodash: str | None = None, + run_id: str | None = None) -> dict: + """ + Lädt ALLE Excel-Dateien aus den Entitäts-Unterordnern + (taifun_export/, pflege/, archiv/, zuschlaege/) herunter und legt + sie spiegelbildlich unter /input// ab. Pro Ordner + können mehrere Dateien liegen — das ETL-Skript liest sie ein und + vereinigt sie (Export: jüngste je Nummer; Pflege: UNION). + + Die Datei-Änderungsdaten (WebDAV getlastmodified) werden je Datei + in /input/_dateistand.csv abgelegt — daraus leitet das + ETL die Reihenfolge "jünger gewinnt" für die Exporte ab. + + Änderungserkennung: Ein Stand-String über ALLE Dateien aller Ordner + wird mit dem zuletzt erfolgreich geladenen Stand verglichen. + Unverändert => Skip (alle Folge-Tasks werden übersprungen). + + Idempotenz: Arbeitsverzeichnis an das logische Datum gebunden — + Re-Run desselben Tages überschreibt, statt zu duplizieren. + """ + conn = BaseHook.get_connection(CONN_ID_NEXTCLOUD) + basis, auth = _webdav_basis(conn) + basis_ordner = Variable.get("hvb_nextcloud_ordner", + default_var="Hans van Bebber") + + # Je Unterordner alle Excel-Dateien auflisten + gefunden: dict[str, list[dict]] = {} + for unterordner in QUELL_ORDNER: + pfad = f"{basis_ordner}/{unterordner}" + try: + dateien = _webdav_liste(basis, auth, pfad) + except requests.HTTPError as exc: + # archiv/ und zuschlaege/ dürfen fehlen + if unterordner in ("taifun_export", "pflege"): + raise AirflowFailException( + f"Pflicht-Ordner '{pfad}' nicht erreichbar: {exc}") + print(f"Ordner '{pfad}' nicht vorhanden — übersprungen.") + gefunden[unterordner] = [] + continue + excels = [d for d in dateien + if d["name"].lower().endswith((".xlsx", ".xlsm"))] + gefunden[unterordner] = excels + print(f"{unterordner}/: {len(excels)} Datei(en)") + + if not gefunden.get("taifun_export"): + raise AirflowFailException( + f"Keine Export-Dateien in '{basis_ordner}/taifun_export'.") + + # Änderungserkennung über ALLE Dateien (Name@Stand, sortiert). + # Wird erst am Ende von load_duckdb persistiert -> Fehlläufe retryen. + eintraege = [] + for unterordner, dateien in gefunden.items(): + for d in dateien: + eintraege.append( + f"{unterordner}/{d['name']}@{d['geaendert'].isoformat()}") + stand = "|".join(sorted(eintraege)) + + # Quell-Übersicht fürs Protokoll/Metadaten (kompakt je Ordner) + quellen = { + unterordner: { + "anzahl": len(dateien), + "dateien": ", ".join(sorted(d["name"] for d in dateien)) or "—", + "neuester_stand": ( + max(d["geaendert"] for d in dateien).in_tz(TZ) + .to_datetime_string() if dateien else ""), + } + for unterordner, dateien in gefunden.items() + } + + if stand == Variable.get(VAR_QUELLSTAND, default_var=None): + _protokolliere( + lauf_zeitpunkt=pendulum.now(TZ).to_datetime_string(), + airflow_run_id=run_id, + ereignis="uebersprungen_unveraendert", + datei_angebote=quellen["taifun_export"]["dateien"], + stand_angebote=quellen["taifun_export"]["neuester_stand"], + datei_zuschlaege=quellen.get("pflege", {}).get("dateien", "—"), + stand_zuschlaege=quellen.get("pflege", {}).get("neuester_stand", ""), + bemerkung="Quelldaten unverändert — kein Load", + ) + raise AirflowSkipException( + "Quelldaten unverändert seit dem letzten erfolgreichen Lauf.") + + run_dir = Path("/tmp/hvb_etl") / ds_nodash + if run_dir.exists(): + shutil.rmtree(run_dir) # sauberer Neustart bei Re-Run + (run_dir / "input").mkdir(parents=True) + + # Alle Dateien herunterladen + Dateistand-CSV schreiben + stand_zeilen = [] + for unterordner, dateien in gefunden.items(): + ziel = run_dir / "input" / unterordner + ziel.mkdir(parents=True, exist_ok=True) + for d in dateien: + url = (f"{basis}/{quote(basis_ordner)}/{quote(unterordner)}/" + f"{quote(d['name'])}") + with requests.get(url, auth=auth, stream=True, timeout=300) as r: + r.raise_for_status() + with open(ziel / d["name"], "wb") as f: + for chunk in r.iter_content(chunk_size=1 << 20): + f.write(chunk) + stand_zeilen.append({ + "datei": d["name"], + "stand_epoch": d["geaendert"].timestamp(), + }) + print(f" {unterordner}/{d['name']} geladen") + + import csv as _csv + with open(run_dir / "input" / "_dateistand.csv", "w", + newline="", encoding="utf-8") as f: + w = _csv.DictWriter(f, fieldnames=["datei", "stand_epoch"]) + w.writeheader() + w.writerows(stand_zeilen) + + return {"run_dir": str(run_dir), "stand": stand, "quellen": quellen} + + # ------------------------------------------------------------------ + # 2a) Geodaten-Referenz bereitstellen (Cache, Skript 01) + # ------------------------------------------------------------------ + @task(retries=4) + def geodaten_bereitstellen(run_dir: str) -> str: + """ + Stellt output/geo_plz_koordinaten.csv im Arbeitsverzeichnis bereit. + PLZ-Koordinaten ändern sich praktisch nie — GeoNames wird daher + nur angefragt, wenn der Cache fehlt oder älter als 30 Tage ist. + """ + run_pfad = Path(run_dir) + skripte = Path(Variable.get("hvb_skripte_pfad", + default_var="/opt/airflow/include/hvb")) + cache = Path(Variable.get("hvb_geo_cache", + default_var="/data/etl_cache/geo_plz_koordinaten.csv")) + ziel = run_pfad / "output" / "geo_plz_koordinaten.csv" + ziel.parent.mkdir(exist_ok=True) + + # Cache gilt nur als brauchbar, wenn er aktuell ist UND bereits + # die Spalte 'kreis' enthält (Schema-Erweiterung Landkreis) — + # ein alter Cache ohne Kreis wird automatisch neu aufgebaut. + def _hat_kreis_spalte(pfad: Path) -> bool: + with open(pfad, encoding="utf-8") as f: + return "kreis" in f.readline().split(",") + + cache_ok = ( + cache.exists() + and (time.time() - cache.stat().st_mtime) + < GEO_CACHE_MAX_ALTER_TAGE * 86400 + and _hat_kreis_spalte(cache) + ) + if cache_ok: + print(f"Geo-Cache aktuell ({cache}) — kein GeoNames-Download nötig.") + shutil.copy2(cache, ziel) + return run_dir + + # Cache fehlt/veraltet: Skript 01 im Arbeitsverzeichnis ausführen. + # (Die Skripte arbeiten relativ zu __file__ und werden deshalb in + # den Run-Ordner kopiert — so bleiben sie selbst unverändert.) + shutil.copy2(skripte / SKRIPT_GEODATEN, run_pfad / SKRIPT_GEODATEN) + subprocess.run( + [sys.executable, SKRIPT_GEODATEN], + cwd=run_pfad, check=True, + ) + if not ziel.exists(): + raise AirflowFailException("01_geodaten_holen.py hat keine " + "geo_plz_koordinaten.csv erzeugt.") + # Cache aktualisieren + cache.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(ziel, cache) + return run_dir + + # ------------------------------------------------------------------ + # 2b) TRANSFORM — Skript 02 ausführen + # ------------------------------------------------------------------ + @task + def transform(run_dir: str) -> str: + """ + Führt 02_etl_angebote_zuschlaege.py aus. Eingaben liegen unter + /input// (taifun_export, pflege, archiv, + zuschlaege) — je Ordner ggf. mehrere Dateien, die das Skript + einliest und vereinigt. Ausgaben: CSVs in /output/. + """ + run_pfad = Path(run_dir) + skripte = Path(Variable.get("hvb_skripte_pfad", + default_var="/opt/airflow/include/hvb")) + shutil.copy2(skripte / SKRIPT_ETL, run_pfad / SKRIPT_ETL) + + ergebnis = subprocess.run( + [sys.executable, SKRIPT_ETL], + cwd=run_pfad, check=False, capture_output=True, text=True, + ) + print(ergebnis.stdout) + if ergebnis.returncode != 0: + print(ergebnis.stderr, file=sys.stderr) + raise AirflowFailException( + f"ETL-Skript fehlgeschlagen (Exit {ergebnis.returncode})." + ) + + # Pflicht-Ausgaben prüfen (Zuschläge/Historie sind optional und + # entfallen, wenn keine Quelldateien vorliegen). + pflicht = ("staging_angebote.csv", "v_angebote_karte.csv", + "v_angebote_karte_aktuell.csv", "v_kunden_pipeline.csv") + fehlend = [n for n in pflicht + if not (run_pfad / "output" / n).exists()] + if fehlend: + raise AirflowFailException(f"ETL-Pflichtausgaben fehlen: {fehlend}") + return run_dir + + # ------------------------------------------------------------------ + # 3) LOAD — CSVs nach DuckDB, atomarer Austausch + # ------------------------------------------------------------------ + @task + def load_duckdb(run_dir: str, stand: str, quellen: dict, + run_id: str | None = None) -> None: + """ + Schreibt die CSVs als Tabellen in ein STAGING-Schema des + pg_duckdb-Postgres und tauscht nach erfolgreicher Validierung in + EINER Transaktion das produktive Schema um (Schema-Rename). Vorteile: + * Voll-Refresh => idempotent, keine Duplikate möglich. + * Superset (liest aus dem Prod-Schema) sieht nie einen halb- + fertigen Zustand — der Tausch ist atomar. + Zusätzlich entstehen zwei Dashboard-Tabellen: + * etl_metadaten: 1 Zeile — Ladezeitpunkt + Quell-Stände + * etl_laufprotokoll: vollständige Laufhistorie + Erst nach erfolgreichem Tausch wird der Quell-Stand gespeichert, + damit künftige Läufe bei unveränderten Daten skippen. + + Zugang über Airflow-Connection CONN_ID_DUCKDB (Typ postgres) — + keine Credentials im Code. + """ + import pandas as pd + from airflow.providers.postgres.hooks.postgres import PostgresHook + from sqlalchemy import text + + ausgabe = Path(run_dir) / "output" + hook = PostgresHook(postgres_conn_id=CONN_ID_DUCKDB) + engine = hook.get_sqlalchemy_engine() + + geladen_am = pendulum.now(TZ) + q_export = quellen.get("taifun_export", {}) + q_pflege = quellen.get("pflege", {}) + protokoll_basis = dict( + lauf_zeitpunkt=geladen_am.to_datetime_string(), + airflow_run_id=run_id, + datei_angebote=q_export.get("dateien", "—"), + stand_angebote=q_export.get("neuester_stand", ""), + datei_zuschlaege=q_pflege.get("dateien", "—"), + stand_zuschlaege=q_pflege.get("neuester_stand", ""), + ) + + # Spalten, die zwingend als Text geladen werden (führende Nullen, IDs) + def _text_dtypes(spalten: list[str]) -> dict: + return {s: "string" for s in TEXT_SPALTEN if s in spalten} + + # ---- 1) Staging-Schema frisch aufbauen ---- + with engine.begin() as con: + con.execute(text(f"DROP SCHEMA IF EXISTS {DB_SCHEMA_STAGING} CASCADE")) + con.execute(text(f"CREATE SCHEMA {DB_SCHEMA_STAGING}")) + + # ---- 2) CSVs ins Staging schreiben + validieren ---- + zeilen: dict[str, int] = {} + for csv_name, tabelle in CSV_TABELLEN.items(): + pfad = ausgabe / csv_name + if not pfad.exists(): + # Optionale Ausgabe fehlt -> leere Tabelle anlegen + with engine.begin() as con: + con.execute(text( + f"CREATE TABLE {DB_SCHEMA_STAGING}.{tabelle} " + f"(leer BOOLEAN)")) + zeilen[tabelle] = 0 + print(f"{tabelle}: keine Quelldatei — leere Tabelle") + continue + + kopf = pd.read_csv(pfad, nrows=0, encoding="utf-8-sig").columns.tolist() + df = pd.read_csv(pfad, encoding="utf-8-sig", + dtype=_text_dtypes(kopf)) + df.to_sql(tabelle, engine, schema=DB_SCHEMA_STAGING, + if_exists="replace", index=False, chunksize=5000, + method="multi") + n = len(df) + zeilen[tabelle] = n + print(f"{tabelle}: {n} Zeilen") + + if n < MINDEST_ZEILEN.get(tabelle, 0): + _protokolliere( + **protokoll_basis, + ereignis="validierung_fehlgeschlagen", + bemerkung=f"{tabelle} hat nur {n} Zeilen — " + f"Prod-Schema nicht getauscht", + ) + raise AirflowFailException( + f"Validierung fehlgeschlagen: {tabelle} hat nur {n} " + f"Zeilen — Prod-Schema wird NICHT getauscht." + ) + + # ---- 3) etl_metadaten (1 Zeile) ---- + meta = pd.DataFrame([{ + "geladen_am": geladen_am.to_datetime_string(), + "anzahl_export_dateien": q_export.get("anzahl", 0), + "export_dateien": q_export.get("dateien", "—"), + "export_neuester_stand": q_export.get("neuester_stand") or None, + "anzahl_pflege_dateien": q_pflege.get("anzahl", 0), + "pflege_dateien": q_pflege.get("dateien", "—"), + "pflege_neuester_stand": q_pflege.get("neuester_stand") or None, + }]) + meta["geladen_am"] = pd.to_datetime(meta["geladen_am"]) + meta.to_sql("etl_metadaten", engine, schema=DB_SCHEMA_STAGING, + if_exists="replace", index=False) + + # ---- 4) etl_laufprotokoll: aktuellen Lauf protokollieren + laden ---- + _protokolliere( + **protokoll_basis, + ereignis="daten_geladen", + zeilen_staging_angebote=zeilen.get("staging_angebote", 0), + zeilen_staging_zuschlaege=zeilen.get("staging_zuschlaege", 0), + zeilen_v_angebote_karte=zeilen.get("v_angebote_karte", 0), + zeilen_v_zuschlaege_karte=zeilen.get("v_zuschlaege_karte", 0), + bemerkung="OK", + ) + log = pd.read_csv(_log_pfad(), dtype={"airflow_run_id": "string"}) + log.to_sql("etl_laufprotokoll", engine, schema=DB_SCHEMA_STAGING, + if_exists="replace", index=False, chunksize=5000, + method="multi") + + # ---- 5) ATOMARER Schema-Tausch in EINER Transaktion ---- + # prod -> alt, staging -> prod, alt verwerfen. Superset sieht + # entweder komplett alt oder komplett neu, nie dazwischen. + with engine.begin() as con: + con.execute(text(f"DROP SCHEMA IF EXISTS {DB_SCHEMA_ALT} CASCADE")) + con.execute(text( + f"ALTER SCHEMA {DB_SCHEMA_PROD} RENAME TO {DB_SCHEMA_ALT}")) + con.execute(text( + f"ALTER SCHEMA {DB_SCHEMA_STAGING} RENAME TO {DB_SCHEMA_PROD}")) + # Aufräumen außerhalb der kritischen Transaktion + with engine.begin() as con: + con.execute(text(f"DROP SCHEMA IF EXISTS {DB_SCHEMA_ALT} CASCADE")) + print(f"pg_duckdb aktualisiert: Schema {DB_SCHEMA_PROD} getauscht") + + # Quell-Stand merken => Änderungserkennung für die nächsten Läufe + Variable.set(VAR_QUELLSTAND, stand) + + # ------------------------------------------------------------------ + # Abhängigkeiten (TaskFlow leitet sie aus den Datenflüssen ab) + # ------------------------------------------------------------------ + quelle = extract_nextcloud() + nach_geo = geodaten_bereitstellen(quelle["run_dir"]) + nach_transform = transform(nach_geo) + load_duckdb(nach_transform, quelle["stand"], quelle["quellen"]) + + +hvb_dashboard_etl()