""" DAG: hvb_dashboard_etl ====================== ETL-Lauf alle 15 Minuten für das Projekt-Dashboard (Superset auf DuckDB). Änderungserkennung: Da sich die Excel-Dateien deutlich seltener ändern als alle 15 Minuten, merkt sich der DAG den Quell-Stand (alle Dateinamen + Änderungsdaten über alle Ordner). Ist nichts Neues da, wird der Lauf nach dem Listing übersprungen (Skip) — Transformation und DB-Tausch passieren nur bei tatsächlich geänderten Quelldaten. Ablauf ------ extract_nextcloud ──► geodaten_bereitstellen ──► transform ──► load_duckdb 1. extract_nextcloud: Lädt ALLE Excel-Dateien aus den Entitäts- Unterordnern per WebDAV (siehe Ordnerstruktur). 2. geodaten_bereitstellen: Stellt die PLZ-Geokodierungsreferenz bereit (Cache; GeoNames wird höchstens alle 30 Tage neu geladen — Skript 01_geodaten_holen.py). 3. transform: Führt 02_etl_angebote_zuschlaege.py aus; das Skript liest je Ordner alle Dateien ein und vereinigt sie zu den Ziel-CSVs. 4. load_duckdb: Schreibt die CSVs als Tabellen in eine neue DuckDB-Datei und tauscht sie atomar gegen die produktive Datei aus (Superset liest read-only). Nextcloud-Ordnerstruktur (Basisordner = hvb_nextcloud_ordner): /taifun_export/ Taifun-Exporte (mehrere möglich; jüngste je Angebotsnummer gewinnt — Reihenfolge aus dem WebDAV-Änderungsdatum) /pflege/ Pflege-Status-Historie (mehrere möglich; UNION aller Status-Events, gültiger Status = neuestes Datum, dann höchster Rang) /archiv/ eingefrorene Alt-Stammdaten (optional) /zuschlaege/ BNetzA-Zuschläge (optional) Pflicht-Ordner sind taifun_export und pflege; archiv und zuschlaege dürfen fehlen. Design-Entscheidungen --------------------- * TaskFlow API statt klassischer Operators: Die gesamte Logik ist reines Python (WebDAV, Subprozess, DuckDB). TaskFlow spart Boilerplate, macht die Datenübergabe zwischen Tasks (Arbeitsverzeichnis-Pfad via XCom) explizit und typisiert, und es gibt keinen Mehrwert durch spezialisierte Operators (einen gepflegten Nextcloud-/DuckDB-Provider gibt es nicht). * Idempotenz: - Pro logischem Datum ein festes Arbeitsverzeichnis; ein erneuter Lauf desselben Tages überschreibt dieselben Dateien statt zu duplizieren. - Der Load ist ein Voll-Refresh ins Staging-Schema — mehrfache Ausführung erzeugt nie Duplikate. - Das produktive Schema wird erst nach erfolgreicher Validierung in EINER Transaktion per Schema-Rename getauscht; ein fehlgeschlagener Lauf lässt das Prod-Schema unangetastet. * Superset-Kompatibilität: Superset verbindet sich per Postgres-Login auf dieselbe pg_duckdb-Instanz und liest aus dem Prod-Schema. Da der Tausch atomar (Schema-Rename in einer Transaktion) erfolgt, sieht Superset nie einen halbfertigen Zustand. Benötigte Airflow-Konfiguration ------------------------------- Connection "nextcloud_webdav" (Typ HTTP): host = https://cloud.example.de # Nextcloud-Basis-URL login = hvb_admin password = Connection "duckdb_postgres" (Typ Postgres) — pg_duckdb-Server: host = duckdb-host # Postgres mit pg_duckdb-Extension port = 5432 schema = dashboard # Datenbankname (NICHT das SQL-Schema) login = password = Diese Instanz ist NICHT die Airflow-Metadaten-DB. Variables: hvb_nextcloud_ordner = Hans van Bebber (optional, Default s.u.) hvb_geo_cache = /opt/airflow/git/current/etl_cache/geo_plz_koordinaten.csv hvb_log_pfad = /opt/airflow/git/current/etl_cache/etl_laufprotokoll.csv hvb_skripte_pfad = /opt/airflow/git/current/include (Ablageort der 2 Skripte) Dashboard-Tabellen im Prod-Schema 'dashboard' (zusätzlich zu den Daten): etl_metadaten 1 Zeile: geladen_am, Dateianzahl + neuester Stand je Quell-Ordner — für die "Datenstand"-Anzeige etl_laufprotokoll Historie aller Läufe inkl. Prüfergebnis Superset-Datenbank-Verbindung (Postgres-Login, Prod-Schema dashboard): postgresql://:@duckdb-host:5432/dashboard Python-Abhängigkeiten im Airflow-Environment: pandas, openpyxl, requests, sqlalchemy, apache-airflow-providers-postgres (PostgresHook) """ from __future__ import annotations import csv import shutil import subprocess import sys import time import xml.etree.ElementTree as ET from datetime import datetime, timedelta from pathlib import Path from urllib.parse import quote, unquote import pendulum import requests from airflow.decorators import dag, task from airflow.exceptions import AirflowFailException, AirflowSkipException from airflow.hooks.base import BaseHook from airflow.models import Variable # ---------------------------------------------------------------------- # Konfiguration # ---------------------------------------------------------------------- TZ = pendulum.timezone("Europe/Berlin") CONN_ID_NEXTCLOUD = "nextcloud_webdav" # pg_duckdb: DuckDB läuft als Extension in einem Postgres-Server. Der Lade- # Task schreibt per SQL (nicht mehr in eine Datei). Die Zugangsdaten liegen # in dieser Airflow-Connection (Typ postgres), NICHT im Code. CONN_ID_DUCKDB = "duckdb_postgres" # Schema-Namen für den atomaren Tausch: geladen wird ins Staging-Schema, # danach wird in einer Transaktion gegen das produktive Schema getauscht. # Superset liest aus DB_SCHEMA_PROD und sieht nie einen Zwischenzustand. DB_SCHEMA_PROD = "dashboard" DB_SCHEMA_STAGING = "dashboard_staging" DB_SCHEMA_ALT = "dashboard_alt" # Nextcloud-Ordnerstruktur: je Entität ein Unterordner unterhalb des # Basisordners (Variable hvb_nextcloud_ordner, Default "Hans van Bebber"). # Pro Ordner können MEHRERE Dateien liegen; alle werden heruntergeladen # und vom ETL-Skript eingelesen und vereinigt. Der Typ ergibt sich aus # dem Ordner, nicht mehr aus dem Dateinamen. # taifun_export/ -> Stammdaten (jüngste je Nummer gewinnt) # pflege/ -> Status-Historie (UNION aller Events) # archiv/ -> eingefrorene Alt-Stammdaten # zuschlaege/ -> BNetzA-Zuschläge (optional) QUELL_ORDNER = ("taifun_export", "pflege", "archiv", "zuschlaege") SKRIPT_GEODATEN = "01_geodaten_holen.py" SKRIPT_ETL = "02_etl_angebote_zuschlaege.py" # CSV-Ausgaben des ETL-Skripts -> Zieltabellen in DuckDB. # Spalten, die als Text erzwungen werden müssen (führende Nullen, IDs); # je Datei wird nur angewendet, was im CSV-Header tatsächlich vorkommt: TEXT_SPALTEN = ("plz", "plz_kunde", "plz_standort", "nummer", "debitor", "zuschlag_nr", "gebot_nr", "projektnummer", "aktuellste_nummer", "kunde_schluessel") CSV_TABELLEN = { "staging_angebote.csv": "staging_angebote", "staging_status_historie.csv": "staging_status_historie", "staging_zuschlaege.csv": "staging_zuschlaege", "v_angebote_karte.csv": "v_angebote_karte", "v_angebote_karte_aktuell.csv": "v_angebote_karte_aktuell", "v_kunden_pipeline.csv": "v_kunden_pipeline", "v_zuschlaege_karte.csv": "v_zuschlaege_karte", } # Sicherheitsnetz: Wird weniger geliefert, wird die produktive DB NICHT # überschrieben (Schutz vor leeren/kaputten Exporten). Nur Tabellen, die # immer Daten haben müssen (Zuschläge sind optional). MINDEST_ZEILEN = { "staging_angebote": 1, "v_angebote_karte_aktuell": 1, "v_kunden_pipeline": 1, } GEO_CACHE_MAX_ALTER_TAGE = 30 # Airflow-Variable, in der der zuletzt erfolgreich geladene Quell-Stand # (Dateiname + Änderungsdatum) gespeichert wird — Basis der # Änderungserkennung beim 15-Minuten-Takt: VAR_QUELLSTAND = "hvb_letzter_quellstand" # Kurzschlüssel je Zieldatei (für XCom/Protokoll): # Spalten des Laufprotokolls. Das Protokoll lebt als persistente CSV # (Variable hvb_log_pfad) und wird bei jedem Load vollständig als # Tabelle 'etl_laufprotokoll' in die DuckDB übernommen — so überlebt # die Historie den atomaren Austausch der DB-Datei. LOG_SPALTEN = [ "lauf_zeitpunkt", "airflow_run_id", "ereignis", "datei_angebote", "stand_angebote", "datei_zuschlaege", "stand_zuschlaege", "zeilen_staging_angebote", "zeilen_staging_zuschlaege", "zeilen_v_angebote_karte", "zeilen_v_zuschlaege_karte", "bemerkung", ] def _log_pfad() -> Path: return Path(Variable.get( "hvb_log_pfad", default_var="/opt/airflow/git/current/etl_cache/etl_laufprotokoll.csv", )) def _protokolliere(**felder) -> None: """Hängt eine Zeile an das persistente Laufprotokoll (CSV) an.""" pfad = _log_pfad() pfad.parent.mkdir(parents=True, exist_ok=True) neu = not pfad.exists() with open(pfad, "a", newline="", encoding="utf-8") as f: w = csv.DictWriter(f, fieldnames=LOG_SPALTEN) if neu: w.writeheader() w.writerow({k: felder.get(k, "") for k in LOG_SPALTEN}) # ---------------------------------------------------------------------- # WebDAV-Hilfsfunktionen (bewusst ohne Zusatz-Library, nur requests) # ---------------------------------------------------------------------- def _webdav_basis(conn) -> tuple[str, tuple[str, str]]: """Baut Basis-URL und Auth aus der Airflow-Connection.""" host = (conn.host or "").rstrip("/") if not host.startswith("http"): host = f"https://{host}" basis = f"{host}/remote.php/dav/files/{conn.login}" return basis, (conn.login, conn.password) def _webdav_liste(basis: str, auth, ordner: str) -> list[dict]: """PROPFIND (Tiefe 1) — listet Dateien eines Nextcloud-Ordners.""" url = f"{basis}/{quote(ordner)}" body = ( '' '' "" "" ) resp = requests.request( "PROPFIND", url, data=body, auth=auth, headers={"Depth": "1", "Content-Type": "application/xml"}, timeout=60, ) resp.raise_for_status() ns = {"d": "DAV:"} eintraege = [] for response in ET.fromstring(resp.content).findall("d:response", ns): href = unquote(response.findtext("d:href", default="", namespaces=ns)) prop = response.find(".//d:prop", ns) ist_ordner = prop.find(".//d:collection", ns) is not None if ist_ordner: continue # den Ordner-Eintrag selbst überspringen geaendert = prop.findtext("d:getlastmodified", default="", namespaces=ns) eintraege.append({ "name": Path(href).name, "href": href, "geaendert": pendulum.from_format( geaendert, "ddd, DD MMM YYYY HH:mm:ss z" ) if geaendert else pendulum.datetime(1970, 1, 1), }) return eintraege # ---------------------------------------------------------------------- # DAG # ---------------------------------------------------------------------- @dag( dag_id="hvb_dashboard_etl", description="Nextcloud-Excel-Exporte -> Python-ETL -> DuckDB (Superset)", # Alle 15 Minuten; dank Änderungserkennung wird nur dann wirklich # gerechnet, wenn sich in Nextcloud etwas geändert hat: schedule="*/15 * * * *", start_date=pendulum.datetime(2026, 6, 1, tz=TZ), catchup=False, # keine Nachholläufe — es zählt nur der aktuelle Stand max_active_runs=1, # keine parallelen Läufe auf dieselbe DB-Datei default_args={ "owner": "kalle", "retries": 2, # Fehlerbehandlung: 2 Wiederholungen ... "retry_delay": timedelta(minutes=5), # ... mit 5 min Abstand ... "retry_exponential_backoff": True, # ... und wachsendem Backoff "execution_timeout": timedelta(minutes=30), }, tags=["hvb", "dashboard", "duckdb", "superset"], ) def hvb_dashboard_etl(): # ------------------------------------------------------------------ # 1) EXTRACT — Excel-Dateien aus Nextcloud holen # ------------------------------------------------------------------ @task(retries=4, multiple_outputs=True) # Netzwerk-Task: großzügigere Retries def extract_nextcloud(ds_nodash: str | None = None, run_id: str | None = None) -> dict: """ Lädt ALLE Excel-Dateien aus den Entitäts-Unterordnern (taifun_export/, pflege/, archiv/, zuschlaege/) herunter und legt sie spiegelbildlich unter /input// ab. Pro Ordner können mehrere Dateien liegen — das ETL-Skript liest sie ein und vereinigt sie (Export: jüngste je Nummer; Pflege: UNION). Die Datei-Änderungsdaten (WebDAV getlastmodified) werden je Datei in /input/_dateistand.csv abgelegt — daraus leitet das ETL die Reihenfolge "jünger gewinnt" für die Exporte ab. Änderungserkennung: Ein Stand-String über ALLE Dateien aller Ordner wird mit dem zuletzt erfolgreich geladenen Stand verglichen. Unverändert => Skip (alle Folge-Tasks werden übersprungen). Idempotenz: Arbeitsverzeichnis an das logische Datum gebunden — Re-Run desselben Tages überschreibt, statt zu duplizieren. """ conn = BaseHook.get_connection(CONN_ID_NEXTCLOUD) basis, auth = _webdav_basis(conn) basis_ordner = Variable.get("hvb_nextcloud_ordner", default_var="Hans van Bebber") # Je Unterordner alle Excel-Dateien auflisten gefunden: dict[str, list[dict]] = {} for unterordner in QUELL_ORDNER: pfad = f"{basis_ordner}/{unterordner}" try: dateien = _webdav_liste(basis, auth, pfad) except requests.HTTPError as exc: # archiv/ und zuschlaege/ dürfen fehlen if unterordner in ("taifun_export", "pflege"): raise AirflowFailException( f"Pflicht-Ordner '{pfad}' nicht erreichbar: {exc}") print(f"Ordner '{pfad}' nicht vorhanden — übersprungen.") gefunden[unterordner] = [] continue excels = [d for d in dateien if d["name"].lower().endswith((".xlsx", ".xlsm"))] gefunden[unterordner] = excels print(f"{unterordner}/: {len(excels)} Datei(en)") if not gefunden.get("taifun_export"): raise AirflowFailException( f"Keine Export-Dateien in '{basis_ordner}/taifun_export'.") # Änderungserkennung über ALLE Dateien (Name@Stand, sortiert). # Wird erst am Ende von load_duckdb persistiert -> Fehlläufe retryen. eintraege = [] for unterordner, dateien in gefunden.items(): for d in dateien: eintraege.append( f"{unterordner}/{d['name']}@{d['geaendert'].isoformat()}") stand = "|".join(sorted(eintraege)) # Quell-Übersicht fürs Protokoll/Metadaten (kompakt je Ordner) quellen = { unterordner: { "anzahl": len(dateien), "dateien": ", ".join(sorted(d["name"] for d in dateien)) or "—", "neuester_stand": ( max(d["geaendert"] for d in dateien).in_tz(TZ) .to_datetime_string() if dateien else ""), } for unterordner, dateien in gefunden.items() } if stand == Variable.get(VAR_QUELLSTAND, default_var=None): _protokolliere( lauf_zeitpunkt=pendulum.now(TZ).to_datetime_string(), airflow_run_id=run_id, ereignis="uebersprungen_unveraendert", datei_angebote=quellen["taifun_export"]["dateien"], stand_angebote=quellen["taifun_export"]["neuester_stand"], datei_zuschlaege=quellen.get("pflege", {}).get("dateien", "—"), stand_zuschlaege=quellen.get("pflege", {}).get("neuester_stand", ""), bemerkung="Quelldaten unverändert — kein Load", ) raise AirflowSkipException( "Quelldaten unverändert seit dem letzten erfolgreichen Lauf.") run_dir = Path("/tmp/hvb_etl") / ds_nodash if run_dir.exists(): shutil.rmtree(run_dir) # sauberer Neustart bei Re-Run (run_dir / "input").mkdir(parents=True) # Alle Dateien herunterladen + Dateistand-CSV schreiben stand_zeilen = [] for unterordner, dateien in gefunden.items(): ziel = run_dir / "input" / unterordner ziel.mkdir(parents=True, exist_ok=True) for d in dateien: url = (f"{basis}/{quote(basis_ordner)}/{quote(unterordner)}/" f"{quote(d['name'])}") with requests.get(url, auth=auth, stream=True, timeout=300) as r: r.raise_for_status() with open(ziel / d["name"], "wb") as f: for chunk in r.iter_content(chunk_size=1 << 20): f.write(chunk) stand_zeilen.append({ "datei": d["name"], "stand_epoch": d["geaendert"].timestamp(), }) print(f" {unterordner}/{d['name']} geladen") import csv as _csv with open(run_dir / "input" / "_dateistand.csv", "w", newline="", encoding="utf-8") as f: w = _csv.DictWriter(f, fieldnames=["datei", "stand_epoch"]) w.writeheader() w.writerows(stand_zeilen) return {"run_dir": str(run_dir), "stand": stand, "quellen": quellen} # ------------------------------------------------------------------ # 2a) Geodaten-Referenz bereitstellen (Cache, Skript 01) # ------------------------------------------------------------------ @task(retries=4) def geodaten_bereitstellen(run_dir: str) -> str: """ Stellt output/geo_plz_koordinaten.csv im Arbeitsverzeichnis bereit. PLZ-Koordinaten ändern sich praktisch nie — GeoNames wird daher nur angefragt, wenn der Cache fehlt oder älter als 30 Tage ist. """ run_pfad = Path(run_dir) skripte = Path(Variable.get("hvb_skripte_pfad", default_var="/opt/airflow/git/current/include")) cache = Path(Variable.get("hvb_geo_cache", default_var="/opt/airflow/git/current/etl_cache/geo_plz_koordinaten.csv")) ziel = run_pfad / "output" / "geo_plz_koordinaten.csv" ziel.parent.mkdir(exist_ok=True) # Cache gilt nur als brauchbar, wenn er aktuell ist UND bereits # die Spalte 'kreis' enthält (Schema-Erweiterung Landkreis) — # ein alter Cache ohne Kreis wird automatisch neu aufgebaut. def _hat_kreis_spalte(pfad: Path) -> bool: with open(pfad, encoding="utf-8") as f: return "kreis" in f.readline().split(",") cache_ok = ( cache.exists() and (time.time() - cache.stat().st_mtime) < GEO_CACHE_MAX_ALTER_TAGE * 86400 and _hat_kreis_spalte(cache) ) if cache_ok: print(f"Geo-Cache aktuell ({cache}) — kein GeoNames-Download nötig.") shutil.copy2(cache, ziel) return run_dir # Cache fehlt/veraltet: Skript 01 im Arbeitsverzeichnis ausführen. # (Die Skripte arbeiten relativ zu __file__ und werden deshalb in # den Run-Ordner kopiert — so bleiben sie selbst unverändert.) shutil.copy2(skripte / SKRIPT_GEODATEN, run_pfad / SKRIPT_GEODATEN) subprocess.run( [sys.executable, SKRIPT_GEODATEN], cwd=run_pfad, check=True, ) if not ziel.exists(): raise AirflowFailException("01_geodaten_holen.py hat keine " "geo_plz_koordinaten.csv erzeugt.") # Cache aktualisieren cache.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(ziel, cache) return run_dir # ------------------------------------------------------------------ # 2b) TRANSFORM — Skript 02 ausführen # ------------------------------------------------------------------ @task def transform(run_dir: str) -> str: """ Führt 02_etl_angebote_zuschlaege.py aus. Eingaben liegen unter /input// (taifun_export, pflege, archiv, zuschlaege) — je Ordner ggf. mehrere Dateien, die das Skript einliest und vereinigt. Ausgaben: CSVs in /output/. """ run_pfad = Path(run_dir) skripte = Path(Variable.get("hvb_skripte_pfad", default_var="/opt/airflow/git/current/include")) shutil.copy2(skripte / SKRIPT_ETL, run_pfad / SKRIPT_ETL) ergebnis = subprocess.run( [sys.executable, SKRIPT_ETL], cwd=run_pfad, check=False, capture_output=True, text=True, ) print(ergebnis.stdout) if ergebnis.returncode != 0: print(ergebnis.stderr, file=sys.stderr) raise AirflowFailException( f"ETL-Skript fehlgeschlagen (Exit {ergebnis.returncode})." ) # Pflicht-Ausgaben prüfen (Zuschläge/Historie sind optional und # entfallen, wenn keine Quelldateien vorliegen). pflicht = ("staging_angebote.csv", "v_angebote_karte.csv", "v_angebote_karte_aktuell.csv", "v_kunden_pipeline.csv") fehlend = [n for n in pflicht if not (run_pfad / "output" / n).exists()] if fehlend: raise AirflowFailException(f"ETL-Pflichtausgaben fehlen: {fehlend}") return run_dir # ------------------------------------------------------------------ # 3) LOAD — CSVs nach DuckDB, atomarer Austausch # ------------------------------------------------------------------ @task def load_duckdb(run_dir: str, stand: str, quellen: dict, run_id: str | None = None) -> None: """ Schreibt die CSVs als Tabellen in ein STAGING-Schema des pg_duckdb-Postgres und tauscht nach erfolgreicher Validierung in EINER Transaktion das produktive Schema um (Schema-Rename). Vorteile: * Voll-Refresh => idempotent, keine Duplikate möglich. * Superset (liest aus dem Prod-Schema) sieht nie einen halb- fertigen Zustand — der Tausch ist atomar. Zusätzlich entstehen zwei Dashboard-Tabellen: * etl_metadaten: 1 Zeile — Ladezeitpunkt + Quell-Stände * etl_laufprotokoll: vollständige Laufhistorie Erst nach erfolgreichem Tausch wird der Quell-Stand gespeichert, damit künftige Läufe bei unveränderten Daten skippen. Zugang über Airflow-Connection CONN_ID_DUCKDB (Typ postgres) — keine Credentials im Code. """ import pandas as pd from airflow.providers.postgres.hooks.postgres import PostgresHook from sqlalchemy import text ausgabe = Path(run_dir) / "output" hook = PostgresHook(postgres_conn_id=CONN_ID_DUCKDB) engine = hook.get_sqlalchemy_engine() geladen_am = pendulum.now(TZ) q_export = quellen.get("taifun_export", {}) q_pflege = quellen.get("pflege", {}) protokoll_basis = dict( lauf_zeitpunkt=geladen_am.to_datetime_string(), airflow_run_id=run_id, datei_angebote=q_export.get("dateien", "—"), stand_angebote=q_export.get("neuester_stand", ""), datei_zuschlaege=q_pflege.get("dateien", "—"), stand_zuschlaege=q_pflege.get("neuester_stand", ""), ) # Spalten, die zwingend als Text geladen werden (führende Nullen, IDs) def _text_dtypes(spalten: list[str]) -> dict: return {s: "string" for s in TEXT_SPALTEN if s in spalten} # ---- 1) Staging-Schema frisch aufbauen ---- with engine.begin() as con: con.execute(text(f"DROP SCHEMA IF EXISTS {DB_SCHEMA_STAGING} CASCADE")) con.execute(text(f"CREATE SCHEMA {DB_SCHEMA_STAGING}")) # ---- 2) CSVs ins Staging schreiben + validieren ---- zeilen: dict[str, int] = {} for csv_name, tabelle in CSV_TABELLEN.items(): pfad = ausgabe / csv_name if not pfad.exists(): # Optionale Ausgabe fehlt -> leere Tabelle anlegen with engine.begin() as con: con.execute(text( f"CREATE TABLE {DB_SCHEMA_STAGING}.{tabelle} " f"(leer BOOLEAN)")) zeilen[tabelle] = 0 print(f"{tabelle}: keine Quelldatei — leere Tabelle") continue kopf = pd.read_csv(pfad, nrows=0, encoding="utf-8-sig").columns.tolist() df = pd.read_csv(pfad, encoding="utf-8-sig", dtype=_text_dtypes(kopf)) df.to_sql(tabelle, engine, schema=DB_SCHEMA_STAGING, if_exists="replace", index=False, chunksize=5000, method="multi") n = len(df) zeilen[tabelle] = n print(f"{tabelle}: {n} Zeilen") if n < MINDEST_ZEILEN.get(tabelle, 0): _protokolliere( **protokoll_basis, ereignis="validierung_fehlgeschlagen", bemerkung=f"{tabelle} hat nur {n} Zeilen — " f"Prod-Schema nicht getauscht", ) raise AirflowFailException( f"Validierung fehlgeschlagen: {tabelle} hat nur {n} " f"Zeilen — Prod-Schema wird NICHT getauscht." ) # ---- 3) etl_metadaten (1 Zeile) ---- meta = pd.DataFrame([{ "geladen_am": geladen_am.to_datetime_string(), "anzahl_export_dateien": q_export.get("anzahl", 0), "export_dateien": q_export.get("dateien", "—"), "export_neuester_stand": q_export.get("neuester_stand") or None, "anzahl_pflege_dateien": q_pflege.get("anzahl", 0), "pflege_dateien": q_pflege.get("dateien", "—"), "pflege_neuester_stand": q_pflege.get("neuester_stand") or None, }]) meta["geladen_am"] = pd.to_datetime(meta["geladen_am"]) meta.to_sql("etl_metadaten", engine, schema=DB_SCHEMA_STAGING, if_exists="replace", index=False) # ---- 4) etl_laufprotokoll: aktuellen Lauf protokollieren + laden ---- _protokolliere( **protokoll_basis, ereignis="daten_geladen", zeilen_staging_angebote=zeilen.get("staging_angebote", 0), zeilen_staging_zuschlaege=zeilen.get("staging_zuschlaege", 0), zeilen_v_angebote_karte=zeilen.get("v_angebote_karte", 0), zeilen_v_zuschlaege_karte=zeilen.get("v_zuschlaege_karte", 0), bemerkung="OK", ) log = pd.read_csv(_log_pfad(), dtype={"airflow_run_id": "string"}) log.to_sql("etl_laufprotokoll", engine, schema=DB_SCHEMA_STAGING, if_exists="replace", index=False, chunksize=5000, method="multi") # ---- 5) ATOMARER Schema-Tausch in EINER Transaktion ---- # prod -> alt, staging -> prod, alt verwerfen. Superset sieht # entweder komplett alt oder komplett neu, nie dazwischen. with engine.begin() as con: con.execute(text(f"DROP SCHEMA IF EXISTS {DB_SCHEMA_ALT} CASCADE")) con.execute(text( f"ALTER SCHEMA {DB_SCHEMA_PROD} RENAME TO {DB_SCHEMA_ALT}")) con.execute(text( f"ALTER SCHEMA {DB_SCHEMA_STAGING} RENAME TO {DB_SCHEMA_PROD}")) # Aufräumen außerhalb der kritischen Transaktion with engine.begin() as con: con.execute(text(f"DROP SCHEMA IF EXISTS {DB_SCHEMA_ALT} CASCADE")) print(f"pg_duckdb aktualisiert: Schema {DB_SCHEMA_PROD} getauscht") # Quell-Stand merken => Änderungserkennung für die nächsten Läufe Variable.set(VAR_QUELLSTAND, stand) # ------------------------------------------------------------------ # Abhängigkeiten (TaskFlow leitet sie aus den Datenflüssen ab) # ------------------------------------------------------------------ quelle = extract_nextcloud() nach_geo = geodaten_bereitstellen(quelle["run_dir"]) nach_transform = transform(nach_geo) load_duckdb(nach_transform, quelle["stand"], quelle["quellen"]) hvb_dashboard_etl()