633 lines
28 KiB
Python
633 lines
28 KiB
Python
"""
|
|
DAG: hvb_dashboard_etl
|
|
======================
|
|
ETL-Lauf alle 15 Minuten für das Projekt-Dashboard (Superset auf DuckDB).
|
|
|
|
Änderungserkennung: Da sich die Excel-Dateien deutlich seltener ändern
|
|
als alle 15 Minuten, merkt sich der DAG den Quell-Stand (alle Dateinamen
|
|
+ Änderungsdaten über alle Ordner). Ist nichts Neues da, wird der Lauf
|
|
nach dem Listing übersprungen (Skip) — Transformation und DB-Tausch
|
|
passieren nur bei tatsächlich geänderten Quelldaten.
|
|
|
|
Ablauf
|
|
------
|
|
extract_nextcloud ──► geodaten_bereitstellen ──► transform ──► load_duckdb
|
|
|
|
1. extract_nextcloud: Lädt ALLE Excel-Dateien aus den Entitäts-
|
|
Unterordnern per WebDAV (siehe Ordnerstruktur).
|
|
2. geodaten_bereitstellen: Stellt die PLZ-Geokodierungsreferenz bereit
|
|
(Cache; GeoNames wird höchstens alle 30 Tage
|
|
neu geladen — Skript 01_geodaten_holen.py).
|
|
3. transform: Führt 02_etl_angebote_zuschlaege.py aus; das
|
|
Skript liest je Ordner alle Dateien ein und
|
|
vereinigt sie zu den Ziel-CSVs.
|
|
4. load_duckdb: Schreibt die CSVs als Tabellen in eine neue
|
|
DuckDB-Datei und tauscht sie atomar gegen die
|
|
produktive Datei aus (Superset liest read-only).
|
|
|
|
Nextcloud-Ordnerstruktur (Basisordner = hvb_nextcloud_ordner):
|
|
<Basis>/taifun_export/ Taifun-Exporte (mehrere möglich; jüngste je
|
|
Angebotsnummer gewinnt — Reihenfolge aus dem
|
|
WebDAV-Änderungsdatum)
|
|
<Basis>/pflege/ Pflege-Status-Historie (mehrere möglich;
|
|
UNION aller Status-Events, gültiger Status =
|
|
neuestes Datum, dann höchster Rang)
|
|
<Basis>/archiv/ eingefrorene Alt-Stammdaten (optional)
|
|
<Basis>/zuschlaege/ BNetzA-Zuschläge (optional)
|
|
Pflicht-Ordner sind taifun_export und pflege; archiv und zuschlaege
|
|
dürfen fehlen.
|
|
|
|
Design-Entscheidungen
|
|
---------------------
|
|
* TaskFlow API statt klassischer Operators: Die gesamte Logik ist reines
|
|
Python (WebDAV, Subprozess, DuckDB). TaskFlow spart Boilerplate, macht
|
|
die Datenübergabe zwischen Tasks (Arbeitsverzeichnis-Pfad via XCom)
|
|
explizit und typisiert, und es gibt keinen Mehrwert durch spezialisierte
|
|
Operators (einen gepflegten Nextcloud-/DuckDB-Provider gibt es nicht).
|
|
* Idempotenz:
|
|
- Pro logischem Datum ein festes Arbeitsverzeichnis; ein erneuter Lauf
|
|
desselben Tages überschreibt dieselben Dateien statt zu duplizieren.
|
|
- Der Load ist ein Voll-Refresh ins Staging-Schema — mehrfache
|
|
Ausführung erzeugt nie Duplikate.
|
|
- Das produktive Schema wird erst nach erfolgreicher Validierung in
|
|
EINER Transaktion per Schema-Rename getauscht; ein fehlgeschlagener
|
|
Lauf lässt das Prod-Schema unangetastet.
|
|
* Superset-Kompatibilität: Superset verbindet sich per Postgres-Login auf
|
|
dieselbe pg_duckdb-Instanz und liest aus dem Prod-Schema. Da der Tausch
|
|
atomar (Schema-Rename in einer Transaktion) erfolgt, sieht Superset nie
|
|
einen halbfertigen Zustand.
|
|
|
|
Benötigte Airflow-Konfiguration
|
|
-------------------------------
|
|
Connection "nextcloud_webdav" (Typ HTTP):
|
|
host = https://cloud.example.de # Nextcloud-Basis-URL
|
|
login = hvb_admin
|
|
password = <App-Passwort, kein Klartext-Login!>
|
|
|
|
Connection "duckdb_postgres" (Typ Postgres) — pg_duckdb-Server:
|
|
host = duckdb-host # Postgres mit pg_duckdb-Extension
|
|
port = 5432
|
|
schema = dashboard # Datenbankname (NICHT das SQL-Schema)
|
|
login = <rolle>
|
|
password = <passwort>
|
|
Diese Instanz ist NICHT die Airflow-Metadaten-DB.
|
|
|
|
Variables:
|
|
hvb_nextcloud_ordner = Hans van Bebber (optional, Default s.u.)
|
|
hvb_geo_cache = /opt/airflow/git/current/etl_cache/geo_plz_koordinaten.csv
|
|
hvb_log_pfad = /opt/airflow/git/current/etl_cache/etl_laufprotokoll.csv
|
|
hvb_skripte_pfad = /opt/airflow/git/current/include (Ablageort der 2 Skripte)
|
|
|
|
Dashboard-Tabellen im Prod-Schema 'dashboard' (zusätzlich zu den Daten):
|
|
etl_metadaten 1 Zeile: geladen_am, Dateianzahl + neuester Stand
|
|
je Quell-Ordner — für die "Datenstand"-Anzeige
|
|
etl_laufprotokoll Historie aller Läufe inkl. Prüfergebnis
|
|
|
|
Superset-Datenbank-Verbindung (Postgres-Login, Prod-Schema dashboard):
|
|
postgresql://<rolle>:<passwort>@duckdb-host:5432/dashboard
|
|
|
|
Python-Abhängigkeiten im Airflow-Environment:
|
|
pandas, openpyxl, requests, sqlalchemy,
|
|
apache-airflow-providers-postgres (PostgresHook)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import xml.etree.ElementTree as ET
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from urllib.parse import quote, unquote
|
|
|
|
import pendulum
|
|
import requests
|
|
from airflow.decorators import dag, task
|
|
from airflow.exceptions import AirflowFailException, AirflowSkipException
|
|
from airflow.hooks.base import BaseHook
|
|
from airflow.models import Variable
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Konfiguration
|
|
# ----------------------------------------------------------------------
|
|
TZ = pendulum.timezone("Europe/Berlin")
|
|
|
|
CONN_ID_NEXTCLOUD = "nextcloud_webdav"
|
|
# pg_duckdb: DuckDB läuft als Extension in einem Postgres-Server. Der Lade-
|
|
# Task schreibt per SQL (nicht mehr in eine Datei). Die Zugangsdaten liegen
|
|
# in dieser Airflow-Connection (Typ postgres), NICHT im Code.
|
|
CONN_ID_DUCKDB = "duckdb_postgres"
|
|
|
|
# Schema-Namen für den atomaren Tausch: geladen wird ins Staging-Schema,
|
|
# danach wird in einer Transaktion gegen das produktive Schema getauscht.
|
|
# Superset liest aus DB_SCHEMA_PROD und sieht nie einen Zwischenzustand.
|
|
DB_SCHEMA_PROD = "dashboard"
|
|
DB_SCHEMA_STAGING = "dashboard_staging"
|
|
DB_SCHEMA_ALT = "dashboard_alt"
|
|
|
|
# Nextcloud-Ordnerstruktur: je Entität ein Unterordner unterhalb des
|
|
# Basisordners (Variable hvb_nextcloud_ordner, Default "Hans van Bebber").
|
|
# Pro Ordner können MEHRERE Dateien liegen; alle werden heruntergeladen
|
|
# und vom ETL-Skript eingelesen und vereinigt. Der Typ ergibt sich aus
|
|
# dem Ordner, nicht mehr aus dem Dateinamen.
|
|
# taifun_export/ -> Stammdaten (jüngste je Nummer gewinnt)
|
|
# pflege/ -> Status-Historie (UNION aller Events)
|
|
# archiv/ -> eingefrorene Alt-Stammdaten
|
|
# zuschlaege/ -> BNetzA-Zuschläge (optional)
|
|
QUELL_ORDNER = ("taifun_export", "pflege", "archiv", "zuschlaege")
|
|
|
|
SKRIPT_GEODATEN = "01_geodaten_holen.py"
|
|
SKRIPT_ETL = "02_etl_angebote_zuschlaege.py"
|
|
|
|
# CSV-Ausgaben des ETL-Skripts -> Zieltabellen in DuckDB.
|
|
# Spalten, die als Text erzwungen werden müssen (führende Nullen, IDs);
|
|
# je Datei wird nur angewendet, was im CSV-Header tatsächlich vorkommt:
|
|
TEXT_SPALTEN = ("plz", "plz_kunde", "plz_standort", "nummer", "debitor",
|
|
"zuschlag_nr", "gebot_nr", "projektnummer",
|
|
"aktuellste_nummer", "kunde_schluessel")
|
|
CSV_TABELLEN = {
|
|
"staging_angebote.csv": "staging_angebote",
|
|
"staging_status_historie.csv": "staging_status_historie",
|
|
"staging_zuschlaege.csv": "staging_zuschlaege",
|
|
"v_angebote_karte.csv": "v_angebote_karte",
|
|
"v_angebote_karte_aktuell.csv": "v_angebote_karte_aktuell",
|
|
"v_kunden_pipeline.csv": "v_kunden_pipeline",
|
|
"v_zuschlaege_karte.csv": "v_zuschlaege_karte",
|
|
}
|
|
|
|
# Sicherheitsnetz: Wird weniger geliefert, wird die produktive DB NICHT
|
|
# überschrieben (Schutz vor leeren/kaputten Exporten). Nur Tabellen, die
|
|
# immer Daten haben müssen (Zuschläge sind optional).
|
|
MINDEST_ZEILEN = {
|
|
"staging_angebote": 1,
|
|
"v_angebote_karte_aktuell": 1,
|
|
"v_kunden_pipeline": 1,
|
|
}
|
|
|
|
GEO_CACHE_MAX_ALTER_TAGE = 30
|
|
|
|
# Airflow-Variable, in der der zuletzt erfolgreich geladene Quell-Stand
|
|
# (Dateiname + Änderungsdatum) gespeichert wird — Basis der
|
|
# Änderungserkennung beim 15-Minuten-Takt:
|
|
VAR_QUELLSTAND = "hvb_letzter_quellstand"
|
|
|
|
# Kurzschlüssel je Zieldatei (für XCom/Protokoll):
|
|
|
|
# Spalten des Laufprotokolls. Das Protokoll lebt als persistente CSV
|
|
# (Variable hvb_log_pfad) und wird bei jedem Load vollständig als
|
|
# Tabelle 'etl_laufprotokoll' in die DuckDB übernommen — so überlebt
|
|
# die Historie den atomaren Austausch der DB-Datei.
|
|
LOG_SPALTEN = [
|
|
"lauf_zeitpunkt", "airflow_run_id", "ereignis",
|
|
"datei_angebote", "stand_angebote",
|
|
"datei_zuschlaege", "stand_zuschlaege",
|
|
"zeilen_staging_angebote", "zeilen_staging_zuschlaege",
|
|
"zeilen_v_angebote_karte", "zeilen_v_zuschlaege_karte",
|
|
"bemerkung",
|
|
]
|
|
|
|
|
|
def _log_pfad() -> Path:
|
|
return Path(Variable.get(
|
|
"hvb_log_pfad",
|
|
default_var="/opt/airflow/git/current/etl_cache/etl_laufprotokoll.csv",
|
|
))
|
|
|
|
|
|
def _protokolliere(**felder) -> None:
|
|
"""Hängt eine Zeile an das persistente Laufprotokoll (CSV) an."""
|
|
pfad = _log_pfad()
|
|
pfad.parent.mkdir(parents=True, exist_ok=True)
|
|
neu = not pfad.exists()
|
|
with open(pfad, "a", newline="", encoding="utf-8") as f:
|
|
w = csv.DictWriter(f, fieldnames=LOG_SPALTEN)
|
|
if neu:
|
|
w.writeheader()
|
|
w.writerow({k: felder.get(k, "") for k in LOG_SPALTEN})
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# WebDAV-Hilfsfunktionen (bewusst ohne Zusatz-Library, nur requests)
|
|
# ----------------------------------------------------------------------
|
|
def _webdav_basis(conn) -> tuple[str, tuple[str, str]]:
|
|
"""Baut Basis-URL und Auth aus der Airflow-Connection."""
|
|
host = (conn.host or "").rstrip("/")
|
|
if not host.startswith("http"):
|
|
host = f"https://{host}"
|
|
basis = f"{host}/remote.php/dav/files/{conn.login}"
|
|
return basis, (conn.login, conn.password)
|
|
|
|
|
|
def _webdav_liste(basis: str, auth, ordner: str) -> list[dict]:
|
|
"""PROPFIND (Tiefe 1) — listet Dateien eines Nextcloud-Ordners."""
|
|
url = f"{basis}/{quote(ordner)}"
|
|
body = (
|
|
'<?xml version="1.0"?>'
|
|
'<d:propfind xmlns:d="DAV:">'
|
|
"<d:prop><d:getlastmodified/><d:getcontentlength/>"
|
|
"<d:resourcetype/></d:prop></d:propfind>"
|
|
)
|
|
resp = requests.request(
|
|
"PROPFIND", url, data=body, auth=auth,
|
|
headers={"Depth": "1", "Content-Type": "application/xml"},
|
|
timeout=60,
|
|
)
|
|
resp.raise_for_status()
|
|
|
|
ns = {"d": "DAV:"}
|
|
eintraege = []
|
|
for response in ET.fromstring(resp.content).findall("d:response", ns):
|
|
href = unquote(response.findtext("d:href", default="", namespaces=ns))
|
|
prop = response.find(".//d:prop", ns)
|
|
ist_ordner = prop.find(".//d:collection", ns) is not None
|
|
if ist_ordner:
|
|
continue # den Ordner-Eintrag selbst überspringen
|
|
geaendert = prop.findtext("d:getlastmodified", default="", namespaces=ns)
|
|
eintraege.append({
|
|
"name": Path(href).name,
|
|
"href": href,
|
|
"geaendert": pendulum.from_format(
|
|
geaendert, "ddd, DD MMM YYYY HH:mm:ss z"
|
|
) if geaendert else pendulum.datetime(1970, 1, 1),
|
|
})
|
|
return eintraege
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# DAG
|
|
# ----------------------------------------------------------------------
|
|
@dag(
|
|
dag_id="hvb_dashboard_etl",
|
|
description="Nextcloud-Excel-Exporte -> Python-ETL -> DuckDB (Superset)",
|
|
# Alle 15 Minuten; dank Änderungserkennung wird nur dann wirklich
|
|
# gerechnet, wenn sich in Nextcloud etwas geändert hat:
|
|
schedule="*/15 * * * *",
|
|
start_date=pendulum.datetime(2026, 6, 1, tz=TZ),
|
|
catchup=False, # keine Nachholläufe — es zählt nur der aktuelle Stand
|
|
max_active_runs=1, # keine parallelen Läufe auf dieselbe DB-Datei
|
|
default_args={
|
|
"owner": "kalle",
|
|
"retries": 2, # Fehlerbehandlung: 2 Wiederholungen ...
|
|
"retry_delay": timedelta(minutes=5), # ... mit 5 min Abstand ...
|
|
"retry_exponential_backoff": True, # ... und wachsendem Backoff
|
|
"execution_timeout": timedelta(minutes=30),
|
|
},
|
|
tags=["hvb", "dashboard", "duckdb", "superset"],
|
|
)
|
|
def hvb_dashboard_etl():
|
|
|
|
# ------------------------------------------------------------------
|
|
# 1) EXTRACT — Excel-Dateien aus Nextcloud holen
|
|
# ------------------------------------------------------------------
|
|
@task(retries=4, multiple_outputs=True) # Netzwerk-Task: großzügigere Retries
|
|
def extract_nextcloud(ds_nodash: str | None = None,
|
|
run_id: str | None = None) -> dict:
|
|
"""
|
|
Lädt ALLE Excel-Dateien aus den Entitäts-Unterordnern
|
|
(taifun_export/, pflege/, archiv/, zuschlaege/) herunter und legt
|
|
sie spiegelbildlich unter <run_dir>/input/<ordner>/ ab. Pro Ordner
|
|
können mehrere Dateien liegen — das ETL-Skript liest sie ein und
|
|
vereinigt sie (Export: jüngste je Nummer; Pflege: UNION).
|
|
|
|
Die Datei-Änderungsdaten (WebDAV getlastmodified) werden je Datei
|
|
in <run_dir>/input/_dateistand.csv abgelegt — daraus leitet das
|
|
ETL die Reihenfolge "jünger gewinnt" für die Exporte ab.
|
|
|
|
Änderungserkennung: Ein Stand-String über ALLE Dateien aller Ordner
|
|
wird mit dem zuletzt erfolgreich geladenen Stand verglichen.
|
|
Unverändert => Skip (alle Folge-Tasks werden übersprungen).
|
|
|
|
Idempotenz: Arbeitsverzeichnis an das logische Datum gebunden —
|
|
Re-Run desselben Tages überschreibt, statt zu duplizieren.
|
|
"""
|
|
conn = BaseHook.get_connection(CONN_ID_NEXTCLOUD)
|
|
basis, auth = _webdav_basis(conn)
|
|
basis_ordner = Variable.get("hvb_nextcloud_ordner",
|
|
default_var="Hans van Bebber")
|
|
|
|
# Je Unterordner alle Excel-Dateien auflisten
|
|
gefunden: dict[str, list[dict]] = {}
|
|
for unterordner in QUELL_ORDNER:
|
|
pfad = f"{basis_ordner}/{unterordner}"
|
|
try:
|
|
dateien = _webdav_liste(basis, auth, pfad)
|
|
except requests.HTTPError as exc:
|
|
# archiv/ und zuschlaege/ dürfen fehlen
|
|
if unterordner in ("taifun_export", "pflege"):
|
|
raise AirflowFailException(
|
|
f"Pflicht-Ordner '{pfad}' nicht erreichbar: {exc}")
|
|
print(f"Ordner '{pfad}' nicht vorhanden — übersprungen.")
|
|
gefunden[unterordner] = []
|
|
continue
|
|
excels = [d for d in dateien
|
|
if d["name"].lower().endswith((".xlsx", ".xlsm"))]
|
|
gefunden[unterordner] = excels
|
|
print(f"{unterordner}/: {len(excels)} Datei(en)")
|
|
|
|
if not gefunden.get("taifun_export"):
|
|
raise AirflowFailException(
|
|
f"Keine Export-Dateien in '{basis_ordner}/taifun_export'.")
|
|
|
|
# Änderungserkennung über ALLE Dateien (Name@Stand, sortiert).
|
|
# Wird erst am Ende von load_duckdb persistiert -> Fehlläufe retryen.
|
|
eintraege = []
|
|
for unterordner, dateien in gefunden.items():
|
|
for d in dateien:
|
|
eintraege.append(
|
|
f"{unterordner}/{d['name']}@{d['geaendert'].isoformat()}")
|
|
stand = "|".join(sorted(eintraege))
|
|
|
|
# Quell-Übersicht fürs Protokoll/Metadaten (kompakt je Ordner)
|
|
quellen = {
|
|
unterordner: {
|
|
"anzahl": len(dateien),
|
|
"dateien": ", ".join(sorted(d["name"] for d in dateien)) or "—",
|
|
"neuester_stand": (
|
|
max(d["geaendert"] for d in dateien).in_tz(TZ)
|
|
.to_datetime_string() if dateien else ""),
|
|
}
|
|
for unterordner, dateien in gefunden.items()
|
|
}
|
|
|
|
if stand == Variable.get(VAR_QUELLSTAND, default_var=None):
|
|
_protokolliere(
|
|
lauf_zeitpunkt=pendulum.now(TZ).to_datetime_string(),
|
|
airflow_run_id=run_id,
|
|
ereignis="uebersprungen_unveraendert",
|
|
datei_angebote=quellen["taifun_export"]["dateien"],
|
|
stand_angebote=quellen["taifun_export"]["neuester_stand"],
|
|
datei_zuschlaege=quellen.get("pflege", {}).get("dateien", "—"),
|
|
stand_zuschlaege=quellen.get("pflege", {}).get("neuester_stand", ""),
|
|
bemerkung="Quelldaten unverändert — kein Load",
|
|
)
|
|
raise AirflowSkipException(
|
|
"Quelldaten unverändert seit dem letzten erfolgreichen Lauf.")
|
|
|
|
run_dir = Path("/tmp/hvb_etl") / ds_nodash
|
|
if run_dir.exists():
|
|
shutil.rmtree(run_dir) # sauberer Neustart bei Re-Run
|
|
(run_dir / "input").mkdir(parents=True)
|
|
|
|
# Alle Dateien herunterladen + Dateistand-CSV schreiben
|
|
stand_zeilen = []
|
|
for unterordner, dateien in gefunden.items():
|
|
ziel = run_dir / "input" / unterordner
|
|
ziel.mkdir(parents=True, exist_ok=True)
|
|
for d in dateien:
|
|
url = (f"{basis}/{quote(basis_ordner)}/{quote(unterordner)}/"
|
|
f"{quote(d['name'])}")
|
|
with requests.get(url, auth=auth, stream=True, timeout=300) as r:
|
|
r.raise_for_status()
|
|
with open(ziel / d["name"], "wb") as f:
|
|
for chunk in r.iter_content(chunk_size=1 << 20):
|
|
f.write(chunk)
|
|
stand_zeilen.append({
|
|
"datei": d["name"],
|
|
"stand_epoch": d["geaendert"].timestamp(),
|
|
})
|
|
print(f" {unterordner}/{d['name']} geladen")
|
|
|
|
import csv as _csv
|
|
with open(run_dir / "input" / "_dateistand.csv", "w",
|
|
newline="", encoding="utf-8") as f:
|
|
w = _csv.DictWriter(f, fieldnames=["datei", "stand_epoch"])
|
|
w.writeheader()
|
|
w.writerows(stand_zeilen)
|
|
|
|
return {"run_dir": str(run_dir), "stand": stand, "quellen": quellen}
|
|
|
|
# ------------------------------------------------------------------
|
|
# 2a) Geodaten-Referenz bereitstellen (Cache, Skript 01)
|
|
# ------------------------------------------------------------------
|
|
@task(retries=4)
|
|
def geodaten_bereitstellen(run_dir: str) -> str:
|
|
"""
|
|
Stellt output/geo_plz_koordinaten.csv im Arbeitsverzeichnis bereit.
|
|
PLZ-Koordinaten ändern sich praktisch nie — GeoNames wird daher
|
|
nur angefragt, wenn der Cache fehlt oder älter als 30 Tage ist.
|
|
"""
|
|
run_pfad = Path(run_dir)
|
|
skripte = Path(Variable.get("hvb_skripte_pfad",
|
|
default_var="/opt/airflow/git/current/include"))
|
|
cache = Path(Variable.get("hvb_geo_cache",
|
|
default_var="/opt/airflow/git/current/etl_cache/geo_plz_koordinaten.csv"))
|
|
ziel = run_pfad / "output" / "geo_plz_koordinaten.csv"
|
|
ziel.parent.mkdir(exist_ok=True)
|
|
|
|
# Cache gilt nur als brauchbar, wenn er aktuell ist UND bereits
|
|
# die Spalte 'kreis' enthält (Schema-Erweiterung Landkreis) —
|
|
# ein alter Cache ohne Kreis wird automatisch neu aufgebaut.
|
|
def _hat_kreis_spalte(pfad: Path) -> bool:
|
|
with open(pfad, encoding="utf-8") as f:
|
|
return "kreis" in f.readline().split(",")
|
|
|
|
cache_ok = (
|
|
cache.exists()
|
|
and (time.time() - cache.stat().st_mtime)
|
|
< GEO_CACHE_MAX_ALTER_TAGE * 86400
|
|
and _hat_kreis_spalte(cache)
|
|
)
|
|
if cache_ok:
|
|
print(f"Geo-Cache aktuell ({cache}) — kein GeoNames-Download nötig.")
|
|
shutil.copy2(cache, ziel)
|
|
return run_dir
|
|
|
|
# Cache fehlt/veraltet: Skript 01 im Arbeitsverzeichnis ausführen.
|
|
# (Die Skripte arbeiten relativ zu __file__ und werden deshalb in
|
|
# den Run-Ordner kopiert — so bleiben sie selbst unverändert.)
|
|
shutil.copy2(skripte / SKRIPT_GEODATEN, run_pfad / SKRIPT_GEODATEN)
|
|
subprocess.run(
|
|
[sys.executable, SKRIPT_GEODATEN],
|
|
cwd=run_pfad, check=True,
|
|
)
|
|
if not ziel.exists():
|
|
raise AirflowFailException("01_geodaten_holen.py hat keine "
|
|
"geo_plz_koordinaten.csv erzeugt.")
|
|
# Cache aktualisieren
|
|
cache.parent.mkdir(parents=True, exist_ok=True)
|
|
shutil.copy2(ziel, cache)
|
|
return run_dir
|
|
|
|
# ------------------------------------------------------------------
|
|
# 2b) TRANSFORM — Skript 02 ausführen
|
|
# ------------------------------------------------------------------
|
|
@task
|
|
def transform(run_dir: str) -> str:
|
|
"""
|
|
Führt 02_etl_angebote_zuschlaege.py aus. Eingaben liegen unter
|
|
<run_dir>/input/<entität>/ (taifun_export, pflege, archiv,
|
|
zuschlaege) — je Ordner ggf. mehrere Dateien, die das Skript
|
|
einliest und vereinigt. Ausgaben: CSVs in <run_dir>/output/.
|
|
"""
|
|
run_pfad = Path(run_dir)
|
|
skripte = Path(Variable.get("hvb_skripte_pfad",
|
|
default_var="/opt/airflow/git/current/include"))
|
|
shutil.copy2(skripte / SKRIPT_ETL, run_pfad / SKRIPT_ETL)
|
|
|
|
ergebnis = subprocess.run(
|
|
[sys.executable, SKRIPT_ETL],
|
|
cwd=run_pfad, check=False, capture_output=True, text=True,
|
|
)
|
|
print(ergebnis.stdout)
|
|
if ergebnis.returncode != 0:
|
|
print(ergebnis.stderr, file=sys.stderr)
|
|
raise AirflowFailException(
|
|
f"ETL-Skript fehlgeschlagen (Exit {ergebnis.returncode})."
|
|
)
|
|
|
|
# Pflicht-Ausgaben prüfen (Zuschläge/Historie sind optional und
|
|
# entfallen, wenn keine Quelldateien vorliegen).
|
|
pflicht = ("staging_angebote.csv", "v_angebote_karte.csv",
|
|
"v_angebote_karte_aktuell.csv", "v_kunden_pipeline.csv")
|
|
fehlend = [n for n in pflicht
|
|
if not (run_pfad / "output" / n).exists()]
|
|
if fehlend:
|
|
raise AirflowFailException(f"ETL-Pflichtausgaben fehlen: {fehlend}")
|
|
return run_dir
|
|
|
|
# ------------------------------------------------------------------
|
|
# 3) LOAD — CSVs nach DuckDB, atomarer Austausch
|
|
# ------------------------------------------------------------------
|
|
@task
|
|
def load_duckdb(run_dir: str, stand: str, quellen: dict,
|
|
run_id: str | None = None) -> None:
|
|
"""
|
|
Schreibt die CSVs als Tabellen in ein STAGING-Schema des
|
|
pg_duckdb-Postgres und tauscht nach erfolgreicher Validierung in
|
|
EINER Transaktion das produktive Schema um (Schema-Rename). Vorteile:
|
|
* Voll-Refresh => idempotent, keine Duplikate möglich.
|
|
* Superset (liest aus dem Prod-Schema) sieht nie einen halb-
|
|
fertigen Zustand — der Tausch ist atomar.
|
|
Zusätzlich entstehen zwei Dashboard-Tabellen:
|
|
* etl_metadaten: 1 Zeile — Ladezeitpunkt + Quell-Stände
|
|
* etl_laufprotokoll: vollständige Laufhistorie
|
|
Erst nach erfolgreichem Tausch wird der Quell-Stand gespeichert,
|
|
damit künftige Läufe bei unveränderten Daten skippen.
|
|
|
|
Zugang über Airflow-Connection CONN_ID_DUCKDB (Typ postgres) —
|
|
keine Credentials im Code.
|
|
"""
|
|
import pandas as pd
|
|
from airflow.providers.postgres.hooks.postgres import PostgresHook
|
|
from sqlalchemy import text
|
|
|
|
ausgabe = Path(run_dir) / "output"
|
|
hook = PostgresHook(postgres_conn_id=CONN_ID_DUCKDB)
|
|
engine = hook.get_sqlalchemy_engine()
|
|
|
|
geladen_am = pendulum.now(TZ)
|
|
q_export = quellen.get("taifun_export", {})
|
|
q_pflege = quellen.get("pflege", {})
|
|
protokoll_basis = dict(
|
|
lauf_zeitpunkt=geladen_am.to_datetime_string(),
|
|
airflow_run_id=run_id,
|
|
datei_angebote=q_export.get("dateien", "—"),
|
|
stand_angebote=q_export.get("neuester_stand", ""),
|
|
datei_zuschlaege=q_pflege.get("dateien", "—"),
|
|
stand_zuschlaege=q_pflege.get("neuester_stand", ""),
|
|
)
|
|
|
|
# Spalten, die zwingend als Text geladen werden (führende Nullen, IDs)
|
|
def _text_dtypes(spalten: list[str]) -> dict:
|
|
return {s: "string" for s in TEXT_SPALTEN if s in spalten}
|
|
|
|
# ---- 1) Staging-Schema frisch aufbauen ----
|
|
with engine.begin() as con:
|
|
con.execute(text(f"DROP SCHEMA IF EXISTS {DB_SCHEMA_STAGING} CASCADE"))
|
|
con.execute(text(f"CREATE SCHEMA {DB_SCHEMA_STAGING}"))
|
|
|
|
# ---- 2) CSVs ins Staging schreiben + validieren ----
|
|
zeilen: dict[str, int] = {}
|
|
for csv_name, tabelle in CSV_TABELLEN.items():
|
|
pfad = ausgabe / csv_name
|
|
if not pfad.exists():
|
|
# Optionale Ausgabe fehlt -> leere Tabelle anlegen
|
|
with engine.begin() as con:
|
|
con.execute(text(
|
|
f"CREATE TABLE {DB_SCHEMA_STAGING}.{tabelle} "
|
|
f"(leer BOOLEAN)"))
|
|
zeilen[tabelle] = 0
|
|
print(f"{tabelle}: keine Quelldatei — leere Tabelle")
|
|
continue
|
|
|
|
kopf = pd.read_csv(pfad, nrows=0, encoding="utf-8-sig").columns.tolist()
|
|
df = pd.read_csv(pfad, encoding="utf-8-sig",
|
|
dtype=_text_dtypes(kopf))
|
|
df.to_sql(tabelle, engine, schema=DB_SCHEMA_STAGING,
|
|
if_exists="replace", index=False, chunksize=5000,
|
|
method="multi")
|
|
n = len(df)
|
|
zeilen[tabelle] = n
|
|
print(f"{tabelle}: {n} Zeilen")
|
|
|
|
if n < MINDEST_ZEILEN.get(tabelle, 0):
|
|
_protokolliere(
|
|
**protokoll_basis,
|
|
ereignis="validierung_fehlgeschlagen",
|
|
bemerkung=f"{tabelle} hat nur {n} Zeilen — "
|
|
f"Prod-Schema nicht getauscht",
|
|
)
|
|
raise AirflowFailException(
|
|
f"Validierung fehlgeschlagen: {tabelle} hat nur {n} "
|
|
f"Zeilen — Prod-Schema wird NICHT getauscht."
|
|
)
|
|
|
|
# ---- 3) etl_metadaten (1 Zeile) ----
|
|
meta = pd.DataFrame([{
|
|
"geladen_am": geladen_am.to_datetime_string(),
|
|
"anzahl_export_dateien": q_export.get("anzahl", 0),
|
|
"export_dateien": q_export.get("dateien", "—"),
|
|
"export_neuester_stand": q_export.get("neuester_stand") or None,
|
|
"anzahl_pflege_dateien": q_pflege.get("anzahl", 0),
|
|
"pflege_dateien": q_pflege.get("dateien", "—"),
|
|
"pflege_neuester_stand": q_pflege.get("neuester_stand") or None,
|
|
}])
|
|
meta["geladen_am"] = pd.to_datetime(meta["geladen_am"])
|
|
meta.to_sql("etl_metadaten", engine, schema=DB_SCHEMA_STAGING,
|
|
if_exists="replace", index=False)
|
|
|
|
# ---- 4) etl_laufprotokoll: aktuellen Lauf protokollieren + laden ----
|
|
_protokolliere(
|
|
**protokoll_basis,
|
|
ereignis="daten_geladen",
|
|
zeilen_staging_angebote=zeilen.get("staging_angebote", 0),
|
|
zeilen_staging_zuschlaege=zeilen.get("staging_zuschlaege", 0),
|
|
zeilen_v_angebote_karte=zeilen.get("v_angebote_karte", 0),
|
|
zeilen_v_zuschlaege_karte=zeilen.get("v_zuschlaege_karte", 0),
|
|
bemerkung="OK",
|
|
)
|
|
log = pd.read_csv(_log_pfad(), dtype={"airflow_run_id": "string"})
|
|
log.to_sql("etl_laufprotokoll", engine, schema=DB_SCHEMA_STAGING,
|
|
if_exists="replace", index=False, chunksize=5000,
|
|
method="multi")
|
|
|
|
# ---- 5) ATOMARER Schema-Tausch in EINER Transaktion ----
|
|
# prod -> alt, staging -> prod, alt verwerfen. Superset sieht
|
|
# entweder komplett alt oder komplett neu, nie dazwischen.
|
|
with engine.begin() as con:
|
|
con.execute(text(f"DROP SCHEMA IF EXISTS {DB_SCHEMA_ALT} CASCADE"))
|
|
con.execute(text(
|
|
f"ALTER SCHEMA {DB_SCHEMA_PROD} RENAME TO {DB_SCHEMA_ALT}"))
|
|
con.execute(text(
|
|
f"ALTER SCHEMA {DB_SCHEMA_STAGING} RENAME TO {DB_SCHEMA_PROD}"))
|
|
# Aufräumen außerhalb der kritischen Transaktion
|
|
with engine.begin() as con:
|
|
con.execute(text(f"DROP SCHEMA IF EXISTS {DB_SCHEMA_ALT} CASCADE"))
|
|
print(f"pg_duckdb aktualisiert: Schema {DB_SCHEMA_PROD} getauscht")
|
|
|
|
# Quell-Stand merken => Änderungserkennung für die nächsten Läufe
|
|
Variable.set(VAR_QUELLSTAND, stand)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Abhängigkeiten (TaskFlow leitet sie aus den Datenflüssen ab)
|
|
# ------------------------------------------------------------------
|
|
quelle = extract_nextcloud()
|
|
nach_geo = geodaten_bereitstellen(quelle["run_dir"])
|
|
nach_transform = transform(nach_geo)
|
|
load_duckdb(nach_transform, quelle["stand"], quelle["quellen"])
|
|
|
|
|
|
hvb_dashboard_etl()
|