airflow-dags/dag_hvb_dashboard_etl_1.py
Pascal Beyer 82c393408f
Some checks failed
ETL-QS / etl-tests (push) Failing after 44s
ETL-QS-Suite, CI-Pipeline und korrigierte Pfade/DB
- Pfad-Defaults im DAG auf das Repo-Checkout /opt/airflow/git/current
  umgestellt (include-Skripte + etl_cache) und Ziel-DB auf
  analytics_pg_duckdb festgelegt
- tests/: Fixture-Generator (>=4 Dateien je Quell-Ordner mit Dubletten/
  Edge-Cases) und End-to-End-Runner mit 45 Pruefungen gegen erwartete
  Ergebnisse, inkl. README
- .forgejo/workflows: CI laeuft die ETL-QS bei Aenderungen an ETL-Skript,
  tests/ oder Geo-Referenz (python:3.12-Container, runs-on docker)
- .gitignore: .venv_test/ und generierte tests/fixtures/ ausgeschlossen

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-14 14:11:55 +02:00

638 lines
29 KiB
Python

"""
DAG: hvb_dashboard_etl
======================
ETL-Lauf alle 15 Minuten für das Projekt-Dashboard (Superset auf DuckDB).
Änderungserkennung: Da sich die Excel-Dateien deutlich seltener ändern
als alle 15 Minuten, merkt sich der DAG den Quell-Stand (alle Dateinamen
+ Änderungsdaten über alle Ordner). Ist nichts Neues da, wird der Lauf
nach dem Listing übersprungen (Skip) — Transformation und DB-Tausch
passieren nur bei tatsächlich geänderten Quelldaten.
Ablauf
------
extract_nextcloud ──► geodaten_bereitstellen ──► transform ──► load_duckdb
1. extract_nextcloud: Lädt ALLE Excel-Dateien aus den Entitäts-
Unterordnern per WebDAV (siehe Ordnerstruktur).
2. geodaten_bereitstellen: Stellt die PLZ-Geokodierungsreferenz bereit
(Cache; GeoNames wird höchstens alle 30 Tage
neu geladen — Skript 01_geodaten_holen.py).
3. transform: Führt 02_etl_angebote_zuschlaege.py aus; das
Skript liest je Ordner alle Dateien ein und
vereinigt sie zu den Ziel-CSVs.
4. load_duckdb: Schreibt die CSVs als Tabellen in eine neue
DuckDB-Datei und tauscht sie atomar gegen die
produktive Datei aus (Superset liest read-only).
Nextcloud-Ordnerstruktur (Basisordner = hvb_nextcloud_ordner):
<Basis>/taifun_export/ Taifun-Exporte (mehrere möglich; jüngste je
Angebotsnummer gewinnt — Reihenfolge aus dem
WebDAV-Änderungsdatum)
<Basis>/pflege/ Pflege-Status-Historie (mehrere möglich;
UNION aller Status-Events, gültiger Status =
neuestes Datum, dann höchster Rang)
<Basis>/archiv/ eingefrorene Alt-Stammdaten (optional)
<Basis>/zuschlaege/ BNetzA-Zuschläge (optional)
Pflicht-Ordner sind taifun_export und pflege; archiv und zuschlaege
dürfen fehlen.
Design-Entscheidungen
---------------------
* TaskFlow API statt klassischer Operators: Die gesamte Logik ist reines
Python (WebDAV, Subprozess, DuckDB). TaskFlow spart Boilerplate, macht
die Datenübergabe zwischen Tasks (Arbeitsverzeichnis-Pfad via XCom)
explizit und typisiert, und es gibt keinen Mehrwert durch spezialisierte
Operators (einen gepflegten Nextcloud-/DuckDB-Provider gibt es nicht).
* Idempotenz:
- Pro logischem Datum ein festes Arbeitsverzeichnis; ein erneuter Lauf
desselben Tages überschreibt dieselben Dateien statt zu duplizieren.
- Der Load ist ein Voll-Refresh ins Staging-Schema — mehrfache
Ausführung erzeugt nie Duplikate.
- Das produktive Schema wird erst nach erfolgreicher Validierung in
EINER Transaktion per Schema-Rename getauscht; ein fehlgeschlagener
Lauf lässt das Prod-Schema unangetastet.
* Superset-Kompatibilität: Superset verbindet sich per Postgres-Login auf
dieselbe pg_duckdb-Instanz und liest aus dem Prod-Schema. Da der Tausch
atomar (Schema-Rename in einer Transaktion) erfolgt, sieht Superset nie
einen halbfertigen Zustand.
Benötigte Airflow-Konfiguration
-------------------------------
Connection "nextcloud_webdav" (Typ HTTP):
host = https://cloud.example.de # Nextcloud-Basis-URL
login = hvb_admin
password = <App-Passwort, kein Klartext-Login!>
Connection "duckdb_postgres" (Typ Postgres) — pg_duckdb-Server:
host = duckdb-host # Postgres mit pg_duckdb-Extension
port = 5432
schema = dashboard # Datenbankname (NICHT das SQL-Schema)
login = <rolle>
password = <passwort>
Diese Instanz ist NICHT die Airflow-Metadaten-DB.
Variables:
hvb_nextcloud_ordner = Hans van Bebber (optional, Default s.u.)
hvb_geo_cache = /opt/airflow/git/current/etl_cache/geo_plz_koordinaten.csv
hvb_log_pfad = /opt/airflow/git/current/etl_cache/etl_laufprotokoll.csv
hvb_skripte_pfad = /opt/airflow/git/current/include (Ablageort der 2 Skripte)
Dashboard-Tabellen im Prod-Schema 'dashboard' (zusätzlich zu den Daten):
etl_metadaten 1 Zeile: geladen_am, Dateianzahl + neuester Stand
je Quell-Ordner — für die "Datenstand"-Anzeige
etl_laufprotokoll Historie aller Läufe inkl. Prüfergebnis
Superset-Datenbank-Verbindung (Postgres-Login, Prod-Schema dashboard):
postgresql://<rolle>:<passwort>@duckdb-host:5432/dashboard
Python-Abhängigkeiten im Airflow-Environment:
pandas, openpyxl, requests, sqlalchemy,
apache-airflow-providers-postgres (PostgresHook)
"""
from __future__ import annotations
import csv
import shutil
import subprocess
import sys
import time
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from pathlib import Path
from urllib.parse import quote, unquote
import pendulum
import requests
from airflow.decorators import dag, task
from airflow.exceptions import AirflowFailException, AirflowSkipException
from airflow.hooks.base import BaseHook
from airflow.models import Variable
# ----------------------------------------------------------------------
# Konfiguration
# ----------------------------------------------------------------------
TZ = pendulum.timezone("Europe/Berlin")
CONN_ID_NEXTCLOUD = "nextcloud_webdav"
# pg_duckdb: DuckDB läuft als Extension in einem Postgres-Server. Der Lade-
# Task schreibt per SQL (nicht mehr in eine Datei). Die Zugangsdaten liegen
# in dieser Airflow-Connection (Typ postgres), NICHT im Code.
CONN_ID_DUCKDB = "duckdb_postgres"
# Datenbankname auf dem pg_duckdb-Server. Wird explizit gesetzt (überschreibt
# das in der Connection hinterlegte Default-Schema/Database), damit der Lade-
# Task unabhängig von der Connection-Konfiguration immer dieselbe DB anspricht.
DB_NAME = "analytics_pg_duckdb"
# Schema-Namen für den atomaren Tausch: geladen wird ins Staging-Schema,
# danach wird in einer Transaktion gegen das produktive Schema getauscht.
# Superset liest aus DB_SCHEMA_PROD und sieht nie einen Zwischenzustand.
DB_SCHEMA_PROD = "dashboard"
DB_SCHEMA_STAGING = "dashboard_staging"
DB_SCHEMA_ALT = "dashboard_alt"
# Nextcloud-Ordnerstruktur: je Entität ein Unterordner unterhalb des
# Basisordners (Variable hvb_nextcloud_ordner, Default "Hans van Bebber").
# Pro Ordner können MEHRERE Dateien liegen; alle werden heruntergeladen
# und vom ETL-Skript eingelesen und vereinigt. Der Typ ergibt sich aus
# dem Ordner, nicht mehr aus dem Dateinamen.
# taifun_export/ -> Stammdaten (jüngste je Nummer gewinnt)
# pflege/ -> Status-Historie (UNION aller Events)
# archiv/ -> eingefrorene Alt-Stammdaten
# zuschlaege/ -> BNetzA-Zuschläge (optional)
QUELL_ORDNER = ("taifun_export", "pflege", "archiv", "zuschlaege")
SKRIPT_GEODATEN = "01_geodaten_holen.py"
SKRIPT_ETL = "02_etl_angebote_zuschlaege.py"
# CSV-Ausgaben des ETL-Skripts -> Zieltabellen in DuckDB.
# Spalten, die als Text erzwungen werden müssen (führende Nullen, IDs);
# je Datei wird nur angewendet, was im CSV-Header tatsächlich vorkommt:
TEXT_SPALTEN = ("plz", "plz_kunde", "plz_standort", "nummer", "debitor",
"zuschlag_nr", "gebot_nr", "projektnummer",
"aktuellste_nummer", "kunde_schluessel")
CSV_TABELLEN = {
"staging_angebote.csv": "staging_angebote",
"staging_status_historie.csv": "staging_status_historie",
"staging_zuschlaege.csv": "staging_zuschlaege",
"v_angebote_karte.csv": "v_angebote_karte",
"v_angebote_karte_aktuell.csv": "v_angebote_karte_aktuell",
"v_kunden_pipeline.csv": "v_kunden_pipeline",
"v_zuschlaege_karte.csv": "v_zuschlaege_karte",
}
# Sicherheitsnetz: Wird weniger geliefert, wird die produktive DB NICHT
# überschrieben (Schutz vor leeren/kaputten Exporten). Nur Tabellen, die
# immer Daten haben müssen (Zuschläge sind optional).
MINDEST_ZEILEN = {
"staging_angebote": 1,
"v_angebote_karte_aktuell": 1,
"v_kunden_pipeline": 1,
}
GEO_CACHE_MAX_ALTER_TAGE = 30
# Airflow-Variable, in der der zuletzt erfolgreich geladene Quell-Stand
# (Dateiname + Änderungsdatum) gespeichert wird — Basis der
# Änderungserkennung beim 15-Minuten-Takt:
VAR_QUELLSTAND = "hvb_letzter_quellstand"
# Kurzschlüssel je Zieldatei (für XCom/Protokoll):
# Spalten des Laufprotokolls. Das Protokoll lebt als persistente CSV
# (Variable hvb_log_pfad) und wird bei jedem Load vollständig als
# Tabelle 'etl_laufprotokoll' in die DuckDB übernommen — so überlebt
# die Historie den atomaren Austausch der DB-Datei.
LOG_SPALTEN = [
"lauf_zeitpunkt", "airflow_run_id", "ereignis",
"datei_angebote", "stand_angebote",
"datei_zuschlaege", "stand_zuschlaege",
"zeilen_staging_angebote", "zeilen_staging_zuschlaege",
"zeilen_v_angebote_karte", "zeilen_v_zuschlaege_karte",
"bemerkung",
]
def _log_pfad() -> Path:
return Path(Variable.get(
"hvb_log_pfad",
default_var="/opt/airflow/git/current/etl_cache/etl_laufprotokoll.csv",
))
def _protokolliere(**felder) -> None:
"""Hängt eine Zeile an das persistente Laufprotokoll (CSV) an."""
pfad = _log_pfad()
pfad.parent.mkdir(parents=True, exist_ok=True)
neu = not pfad.exists()
with open(pfad, "a", newline="", encoding="utf-8") as f:
w = csv.DictWriter(f, fieldnames=LOG_SPALTEN)
if neu:
w.writeheader()
w.writerow({k: felder.get(k, "") for k in LOG_SPALTEN})
# ----------------------------------------------------------------------
# WebDAV-Hilfsfunktionen (bewusst ohne Zusatz-Library, nur requests)
# ----------------------------------------------------------------------
def _webdav_basis(conn) -> tuple[str, tuple[str, str]]:
"""Baut Basis-URL und Auth aus der Airflow-Connection."""
host = (conn.host or "").rstrip("/")
if not host.startswith("http"):
host = f"https://{host}"
basis = f"{host}/remote.php/dav/files/{conn.login}"
return basis, (conn.login, conn.password)
def _webdav_liste(basis: str, auth, ordner: str) -> list[dict]:
"""PROPFIND (Tiefe 1) — listet Dateien eines Nextcloud-Ordners."""
url = f"{basis}/{quote(ordner)}"
body = (
'<?xml version="1.0"?>'
'<d:propfind xmlns:d="DAV:">'
"<d:prop><d:getlastmodified/><d:getcontentlength/>"
"<d:resourcetype/></d:prop></d:propfind>"
)
resp = requests.request(
"PROPFIND", url, data=body, auth=auth,
headers={"Depth": "1", "Content-Type": "application/xml"},
timeout=60,
)
resp.raise_for_status()
ns = {"d": "DAV:"}
eintraege = []
for response in ET.fromstring(resp.content).findall("d:response", ns):
href = unquote(response.findtext("d:href", default="", namespaces=ns))
prop = response.find(".//d:prop", ns)
ist_ordner = prop.find(".//d:collection", ns) is not None
if ist_ordner:
continue # den Ordner-Eintrag selbst überspringen
geaendert = prop.findtext("d:getlastmodified", default="", namespaces=ns)
eintraege.append({
"name": Path(href).name,
"href": href,
"geaendert": pendulum.from_format(
geaendert, "ddd, DD MMM YYYY HH:mm:ss z"
) if geaendert else pendulum.datetime(1970, 1, 1),
})
return eintraege
# ----------------------------------------------------------------------
# DAG
# ----------------------------------------------------------------------
@dag(
dag_id="hvb_dashboard_etl",
description="Nextcloud-Excel-Exporte -> Python-ETL -> DuckDB (Superset)",
# Alle 15 Minuten; dank Änderungserkennung wird nur dann wirklich
# gerechnet, wenn sich in Nextcloud etwas geändert hat:
schedule="*/15 * * * *",
start_date=pendulum.datetime(2026, 6, 1, tz=TZ),
catchup=False, # keine Nachholläufe — es zählt nur der aktuelle Stand
max_active_runs=1, # keine parallelen Läufe auf dieselbe DB-Datei
default_args={
"owner": "kalle",
"retries": 2, # Fehlerbehandlung: 2 Wiederholungen ...
"retry_delay": timedelta(minutes=5), # ... mit 5 min Abstand ...
"retry_exponential_backoff": True, # ... und wachsendem Backoff
"execution_timeout": timedelta(minutes=30),
},
tags=["hvb", "dashboard", "duckdb", "superset"],
)
def hvb_dashboard_etl():
# ------------------------------------------------------------------
# 1) EXTRACT — Excel-Dateien aus Nextcloud holen
# ------------------------------------------------------------------
@task(retries=4, multiple_outputs=True) # Netzwerk-Task: großzügigere Retries
def extract_nextcloud(ds_nodash: str | None = None,
run_id: str | None = None) -> dict:
"""
Lädt ALLE Excel-Dateien aus den Entitäts-Unterordnern
(taifun_export/, pflege/, archiv/, zuschlaege/) herunter und legt
sie spiegelbildlich unter <run_dir>/input/<ordner>/ ab. Pro Ordner
können mehrere Dateien liegen — das ETL-Skript liest sie ein und
vereinigt sie (Export: jüngste je Nummer; Pflege: UNION).
Die Datei-Änderungsdaten (WebDAV getlastmodified) werden je Datei
in <run_dir>/input/_dateistand.csv abgelegt — daraus leitet das
ETL die Reihenfolge "jünger gewinnt" für die Exporte ab.
Änderungserkennung: Ein Stand-String über ALLE Dateien aller Ordner
wird mit dem zuletzt erfolgreich geladenen Stand verglichen.
Unverändert => Skip (alle Folge-Tasks werden übersprungen).
Idempotenz: Arbeitsverzeichnis an das logische Datum gebunden —
Re-Run desselben Tages überschreibt, statt zu duplizieren.
"""
conn = BaseHook.get_connection(CONN_ID_NEXTCLOUD)
basis, auth = _webdav_basis(conn)
basis_ordner = Variable.get("hvb_nextcloud_ordner",
default_var="Hans van Bebber")
# Je Unterordner alle Excel-Dateien auflisten
gefunden: dict[str, list[dict]] = {}
for unterordner in QUELL_ORDNER:
pfad = f"{basis_ordner}/{unterordner}"
try:
dateien = _webdav_liste(basis, auth, pfad)
except requests.HTTPError as exc:
# archiv/ und zuschlaege/ dürfen fehlen
if unterordner in ("taifun_export", "pflege"):
raise AirflowFailException(
f"Pflicht-Ordner '{pfad}' nicht erreichbar: {exc}")
print(f"Ordner '{pfad}' nicht vorhanden — übersprungen.")
gefunden[unterordner] = []
continue
excels = [d for d in dateien
if d["name"].lower().endswith((".xlsx", ".xlsm"))]
gefunden[unterordner] = excels
print(f"{unterordner}/: {len(excels)} Datei(en)")
if not gefunden.get("taifun_export"):
raise AirflowFailException(
f"Keine Export-Dateien in '{basis_ordner}/taifun_export'.")
# Änderungserkennung über ALLE Dateien (Name@Stand, sortiert).
# Wird erst am Ende von load_duckdb persistiert -> Fehlläufe retryen.
eintraege = []
for unterordner, dateien in gefunden.items():
for d in dateien:
eintraege.append(
f"{unterordner}/{d['name']}@{d['geaendert'].isoformat()}")
stand = "|".join(sorted(eintraege))
# Quell-Übersicht fürs Protokoll/Metadaten (kompakt je Ordner)
quellen = {
unterordner: {
"anzahl": len(dateien),
"dateien": ", ".join(sorted(d["name"] for d in dateien)) or "",
"neuester_stand": (
max(d["geaendert"] for d in dateien).in_tz(TZ)
.to_datetime_string() if dateien else ""),
}
for unterordner, dateien in gefunden.items()
}
if stand == Variable.get(VAR_QUELLSTAND, default_var=None):
_protokolliere(
lauf_zeitpunkt=pendulum.now(TZ).to_datetime_string(),
airflow_run_id=run_id,
ereignis="uebersprungen_unveraendert",
datei_angebote=quellen["taifun_export"]["dateien"],
stand_angebote=quellen["taifun_export"]["neuester_stand"],
datei_zuschlaege=quellen.get("pflege", {}).get("dateien", ""),
stand_zuschlaege=quellen.get("pflege", {}).get("neuester_stand", ""),
bemerkung="Quelldaten unverändert — kein Load",
)
raise AirflowSkipException(
"Quelldaten unverändert seit dem letzten erfolgreichen Lauf.")
run_dir = Path("/tmp/hvb_etl") / ds_nodash
if run_dir.exists():
shutil.rmtree(run_dir) # sauberer Neustart bei Re-Run
(run_dir / "input").mkdir(parents=True)
# Alle Dateien herunterladen + Dateistand-CSV schreiben
stand_zeilen = []
for unterordner, dateien in gefunden.items():
ziel = run_dir / "input" / unterordner
ziel.mkdir(parents=True, exist_ok=True)
for d in dateien:
url = (f"{basis}/{quote(basis_ordner)}/{quote(unterordner)}/"
f"{quote(d['name'])}")
with requests.get(url, auth=auth, stream=True, timeout=300) as r:
r.raise_for_status()
with open(ziel / d["name"], "wb") as f:
for chunk in r.iter_content(chunk_size=1 << 20):
f.write(chunk)
stand_zeilen.append({
"datei": d["name"],
"stand_epoch": d["geaendert"].timestamp(),
})
print(f" {unterordner}/{d['name']} geladen")
import csv as _csv
with open(run_dir / "input" / "_dateistand.csv", "w",
newline="", encoding="utf-8") as f:
w = _csv.DictWriter(f, fieldnames=["datei", "stand_epoch"])
w.writeheader()
w.writerows(stand_zeilen)
return {"run_dir": str(run_dir), "stand": stand, "quellen": quellen}
# ------------------------------------------------------------------
# 2a) Geodaten-Referenz bereitstellen (Cache, Skript 01)
# ------------------------------------------------------------------
@task(retries=4)
def geodaten_bereitstellen(run_dir: str) -> str:
"""
Stellt output/geo_plz_koordinaten.csv im Arbeitsverzeichnis bereit.
PLZ-Koordinaten ändern sich praktisch nie — GeoNames wird daher
nur angefragt, wenn der Cache fehlt oder älter als 30 Tage ist.
"""
run_pfad = Path(run_dir)
skripte = Path(Variable.get("hvb_skripte_pfad",
default_var="/opt/airflow/git/current/include"))
cache = Path(Variable.get("hvb_geo_cache",
default_var="/opt/airflow/git/current/etl_cache/geo_plz_koordinaten.csv"))
ziel = run_pfad / "output" / "geo_plz_koordinaten.csv"
ziel.parent.mkdir(exist_ok=True)
# Cache gilt nur als brauchbar, wenn er aktuell ist UND bereits
# die Spalte 'kreis' enthält (Schema-Erweiterung Landkreis) —
# ein alter Cache ohne Kreis wird automatisch neu aufgebaut.
def _hat_kreis_spalte(pfad: Path) -> bool:
with open(pfad, encoding="utf-8") as f:
return "kreis" in f.readline().split(",")
cache_ok = (
cache.exists()
and (time.time() - cache.stat().st_mtime)
< GEO_CACHE_MAX_ALTER_TAGE * 86400
and _hat_kreis_spalte(cache)
)
if cache_ok:
print(f"Geo-Cache aktuell ({cache}) — kein GeoNames-Download nötig.")
shutil.copy2(cache, ziel)
return run_dir
# Cache fehlt/veraltet: Skript 01 im Arbeitsverzeichnis ausführen.
# (Die Skripte arbeiten relativ zu __file__ und werden deshalb in
# den Run-Ordner kopiert — so bleiben sie selbst unverändert.)
shutil.copy2(skripte / SKRIPT_GEODATEN, run_pfad / SKRIPT_GEODATEN)
subprocess.run(
[sys.executable, SKRIPT_GEODATEN],
cwd=run_pfad, check=True,
)
if not ziel.exists():
raise AirflowFailException("01_geodaten_holen.py hat keine "
"geo_plz_koordinaten.csv erzeugt.")
# Cache aktualisieren
cache.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(ziel, cache)
return run_dir
# ------------------------------------------------------------------
# 2b) TRANSFORM — Skript 02 ausführen
# ------------------------------------------------------------------
@task
def transform(run_dir: str) -> str:
"""
Führt 02_etl_angebote_zuschlaege.py aus. Eingaben liegen unter
<run_dir>/input/<entität>/ (taifun_export, pflege, archiv,
zuschlaege) — je Ordner ggf. mehrere Dateien, die das Skript
einliest und vereinigt. Ausgaben: CSVs in <run_dir>/output/.
"""
run_pfad = Path(run_dir)
skripte = Path(Variable.get("hvb_skripte_pfad",
default_var="/opt/airflow/git/current/include"))
shutil.copy2(skripte / SKRIPT_ETL, run_pfad / SKRIPT_ETL)
ergebnis = subprocess.run(
[sys.executable, SKRIPT_ETL],
cwd=run_pfad, check=False, capture_output=True, text=True,
)
print(ergebnis.stdout)
if ergebnis.returncode != 0:
print(ergebnis.stderr, file=sys.stderr)
raise AirflowFailException(
f"ETL-Skript fehlgeschlagen (Exit {ergebnis.returncode})."
)
# Pflicht-Ausgaben prüfen (Zuschläge/Historie sind optional und
# entfallen, wenn keine Quelldateien vorliegen).
pflicht = ("staging_angebote.csv", "v_angebote_karte.csv",
"v_angebote_karte_aktuell.csv", "v_kunden_pipeline.csv")
fehlend = [n for n in pflicht
if not (run_pfad / "output" / n).exists()]
if fehlend:
raise AirflowFailException(f"ETL-Pflichtausgaben fehlen: {fehlend}")
return run_dir
# ------------------------------------------------------------------
# 3) LOAD — CSVs nach DuckDB, atomarer Austausch
# ------------------------------------------------------------------
@task
def load_duckdb(run_dir: str, stand: str, quellen: dict,
run_id: str | None = None) -> None:
"""
Schreibt die CSVs als Tabellen in ein STAGING-Schema des
pg_duckdb-Postgres und tauscht nach erfolgreicher Validierung in
EINER Transaktion das produktive Schema um (Schema-Rename). Vorteile:
* Voll-Refresh => idempotent, keine Duplikate möglich.
* Superset (liest aus dem Prod-Schema) sieht nie einen halb-
fertigen Zustand — der Tausch ist atomar.
Zusätzlich entstehen zwei Dashboard-Tabellen:
* etl_metadaten: 1 Zeile — Ladezeitpunkt + Quell-Stände
* etl_laufprotokoll: vollständige Laufhistorie
Erst nach erfolgreichem Tausch wird der Quell-Stand gespeichert,
damit künftige Läufe bei unveränderten Daten skippen.
Zugang über Airflow-Connection CONN_ID_DUCKDB (Typ postgres) —
keine Credentials im Code.
"""
import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook
from sqlalchemy import text
ausgabe = Path(run_dir) / "output"
hook = PostgresHook(postgres_conn_id=CONN_ID_DUCKDB)
engine = hook.get_sqlalchemy_engine()
geladen_am = pendulum.now(TZ)
q_export = quellen.get("taifun_export", {})
q_pflege = quellen.get("pflege", {})
protokoll_basis = dict(
lauf_zeitpunkt=geladen_am.to_datetime_string(),
airflow_run_id=run_id,
datei_angebote=q_export.get("dateien", ""),
stand_angebote=q_export.get("neuester_stand", ""),
datei_zuschlaege=q_pflege.get("dateien", ""),
stand_zuschlaege=q_pflege.get("neuester_stand", ""),
)
# Spalten, die zwingend als Text geladen werden (führende Nullen, IDs)
def _text_dtypes(spalten: list[str]) -> dict:
return {s: "string" for s in TEXT_SPALTEN if s in spalten}
# ---- 1) Staging-Schema frisch aufbauen ----
with engine.begin() as con:
con.execute(text(f"DROP SCHEMA IF EXISTS {DB_SCHEMA_STAGING} CASCADE"))
con.execute(text(f"CREATE SCHEMA {DB_SCHEMA_STAGING}"))
# ---- 2) CSVs ins Staging schreiben + validieren ----
zeilen: dict[str, int] = {}
for csv_name, tabelle in CSV_TABELLEN.items():
pfad = ausgabe / csv_name
if not pfad.exists():
# Optionale Ausgabe fehlt -> leere Tabelle anlegen
with engine.begin() as con:
con.execute(text(
f"CREATE TABLE {DB_SCHEMA_STAGING}.{tabelle} "
f"(leer BOOLEAN)"))
zeilen[tabelle] = 0
print(f"{tabelle}: keine Quelldatei — leere Tabelle")
continue
kopf = pd.read_csv(pfad, nrows=0, encoding="utf-8-sig").columns.tolist()
df = pd.read_csv(pfad, encoding="utf-8-sig",
dtype=_text_dtypes(kopf))
df.to_sql(tabelle, engine, schema=DB_SCHEMA_STAGING,
if_exists="replace", index=False, chunksize=5000,
method="multi")
n = len(df)
zeilen[tabelle] = n
print(f"{tabelle}: {n} Zeilen")
if n < MINDEST_ZEILEN.get(tabelle, 0):
_protokolliere(
**protokoll_basis,
ereignis="validierung_fehlgeschlagen",
bemerkung=f"{tabelle} hat nur {n} Zeilen — "
f"Prod-Schema nicht getauscht",
)
raise AirflowFailException(
f"Validierung fehlgeschlagen: {tabelle} hat nur {n} "
f"Zeilen — Prod-Schema wird NICHT getauscht."
)
# ---- 3) etl_metadaten (1 Zeile) ----
meta = pd.DataFrame([{
"geladen_am": geladen_am.to_datetime_string(),
"anzahl_export_dateien": q_export.get("anzahl", 0),
"export_dateien": q_export.get("dateien", ""),
"export_neuester_stand": q_export.get("neuester_stand") or None,
"anzahl_pflege_dateien": q_pflege.get("anzahl", 0),
"pflege_dateien": q_pflege.get("dateien", ""),
"pflege_neuester_stand": q_pflege.get("neuester_stand") or None,
}])
meta["geladen_am"] = pd.to_datetime(meta["geladen_am"])
meta.to_sql("etl_metadaten", engine, schema=DB_SCHEMA_STAGING,
if_exists="replace", index=False)
# ---- 4) etl_laufprotokoll: aktuellen Lauf protokollieren + laden ----
_protokolliere(
**protokoll_basis,
ereignis="daten_geladen",
zeilen_staging_angebote=zeilen.get("staging_angebote", 0),
zeilen_staging_zuschlaege=zeilen.get("staging_zuschlaege", 0),
zeilen_v_angebote_karte=zeilen.get("v_angebote_karte", 0),
zeilen_v_zuschlaege_karte=zeilen.get("v_zuschlaege_karte", 0),
bemerkung="OK",
)
log = pd.read_csv(_log_pfad(), dtype={"airflow_run_id": "string"})
log.to_sql("etl_laufprotokoll", engine, schema=DB_SCHEMA_STAGING,
if_exists="replace", index=False, chunksize=5000,
method="multi")
# ---- 5) ATOMARER Schema-Tausch in EINER Transaktion ----
# prod -> alt, staging -> prod, alt verwerfen. Superset sieht
# entweder komplett alt oder komplett neu, nie dazwischen.
with engine.begin() as con:
con.execute(text(f"DROP SCHEMA IF EXISTS {DB_SCHEMA_ALT} CASCADE"))
con.execute(text(
f"ALTER SCHEMA {DB_SCHEMA_PROD} RENAME TO {DB_SCHEMA_ALT}"))
con.execute(text(
f"ALTER SCHEMA {DB_SCHEMA_STAGING} RENAME TO {DB_SCHEMA_PROD}"))
# Aufräumen außerhalb der kritischen Transaktion
with engine.begin() as con:
con.execute(text(f"DROP SCHEMA IF EXISTS {DB_SCHEMA_ALT} CASCADE"))
print(f"pg_duckdb aktualisiert: Schema {DB_SCHEMA_PROD} getauscht")
# Quell-Stand merken => Änderungserkennung für die nächsten Läufe
Variable.set(VAR_QUELLSTAND, stand)
# ------------------------------------------------------------------
# Abhängigkeiten (TaskFlow leitet sie aus den Datenflüssen ab)
# ------------------------------------------------------------------
quelle = extract_nextcloud()
nach_geo = geodaten_bereitstellen(quelle["run_dir"])
nach_transform = transform(nach_geo)
load_duckdb(nach_transform, quelle["stand"], quelle["quellen"])
hvb_dashboard_etl()