Some checks failed
ETL-QS / etl-tests (push) Failing after 44s
- Pfad-Defaults im DAG auf das Repo-Checkout /opt/airflow/git/current umgestellt (include-Skripte + etl_cache) und Ziel-DB auf analytics_pg_duckdb festgelegt - tests/: Fixture-Generator (>=4 Dateien je Quell-Ordner mit Dubletten/ Edge-Cases) und End-to-End-Runner mit 45 Pruefungen gegen erwartete Ergebnisse, inkl. README - .forgejo/workflows: CI laeuft die ETL-QS bei Aenderungen an ETL-Skript, tests/ oder Geo-Referenz (python:3.12-Container, runs-on docker) - .gitignore: .venv_test/ und generierte tests/fixtures/ ausgeschlossen Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
271 lines
12 KiB
Python
271 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
run_tests.py
|
|
------------
|
|
End-to-End-Qualitätssicherung für den HVB-ETL-Prozess.
|
|
|
|
Ablauf:
|
|
1. Fixtures erzeugen (tests/generate_fixtures.py).
|
|
2. Arbeitsverzeichnis aufbauen: ETL-Skript + Geo-Cache + input/ kopieren
|
|
(genau die Struktur, die der DAG zur Laufzeit anlegt).
|
|
3. include/02_etl_angebote_zuschlaege.py ausführen.
|
|
4. Erzeugte output/*.csv laden und gegen ERWARTETE Ergebnisse prüfen.
|
|
|
|
Jede Prüfung gibt PASS/FAIL aus; am Ende ein Gesamtergebnis. Exit-Code 0
|
|
nur, wenn alle Prüfungen bestanden sind (CI-tauglich).
|
|
|
|
Aufruf: python tests/run_tests.py
|
|
"""
|
|
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import pandas as pd
|
|
|
|
TESTS = Path(__file__).parent
|
|
REPO = TESTS.parent
|
|
ETL_SKRIPT = REPO / "include" / "02_etl_angebote_zuschlaege.py"
|
|
GEO_CSV = REPO / "etl_cache" / "geo_plz_koordinaten.csv"
|
|
FIXTURES_INPUT = TESTS / "fixtures" / "input"
|
|
|
|
# Textspalten, die beim Wieder-Einlesen der CSVs als String erzwungen werden
|
|
# (führende Nullen / IDs), analog zur DAG-Ladelogik (TEXT_SPALTEN):
|
|
TEXT_DTYPE = {c: str for c in (
|
|
"plz", "plz_kunde", "plz_standort", "nummer", "debitor", "zuschlag_nr",
|
|
"gebot_nr", "projektnummer", "aktuellste_nummer", "kunde_schluessel")}
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Mini-Prüfgerüst
|
|
# ----------------------------------------------------------------------
|
|
class Pruefer:
|
|
def __init__(self):
|
|
self.bestanden = 0
|
|
self.fehler = 0
|
|
|
|
def pruefe(self, beschreibung, ist, erwartet):
|
|
ok = ist == erwartet
|
|
marke = "PASS" if ok else "FAIL"
|
|
print(f" [{marke}] {beschreibung}")
|
|
if not ok:
|
|
print(f" erwartet: {erwartet!r}")
|
|
print(f" erhalten: {ist!r}")
|
|
self.fehler += 1
|
|
else:
|
|
self.bestanden += 1
|
|
return ok
|
|
|
|
def abschnitt(self, titel):
|
|
print(f"\n=== {titel} ===")
|
|
|
|
|
|
def lade_csv(output: Path, name: str) -> pd.DataFrame:
|
|
return pd.read_csv(output / name, dtype=TEXT_DTYPE, encoding="utf-8-sig")
|
|
|
|
|
|
def main() -> int:
|
|
if not ETL_SKRIPT.exists():
|
|
sys.exit(f"ETL-Skript nicht gefunden: {ETL_SKRIPT}")
|
|
|
|
# 1) Fixtures erzeugen
|
|
print("Erzeuge Fixtures ...")
|
|
subprocess.run([sys.executable, str(TESTS / "generate_fixtures.py")],
|
|
check=True)
|
|
|
|
# 2) Arbeitsverzeichnis wie zur DAG-Laufzeit aufbauen
|
|
workdir = Path(tempfile.mkdtemp(prefix="hvb_etl_test_"))
|
|
(workdir / "output").mkdir()
|
|
shutil.copy2(ETL_SKRIPT, workdir / ETL_SKRIPT.name)
|
|
shutil.copytree(FIXTURES_INPUT, workdir / "input")
|
|
if GEO_CSV.exists():
|
|
shutil.copy2(GEO_CSV, workdir / "output" / "geo_plz_koordinaten.csv")
|
|
else:
|
|
print(f"WARNUNG: Geo-Cache fehlt ({GEO_CSV}) — Geo-Prüfungen entfallen.")
|
|
|
|
# 3) ETL ausführen
|
|
print("\nFühre ETL aus ...\n" + "-" * 60)
|
|
ergebnis = subprocess.run(
|
|
[sys.executable, ETL_SKRIPT.name], cwd=workdir,
|
|
capture_output=True, text=True)
|
|
print(ergebnis.stdout)
|
|
if ergebnis.returncode != 0:
|
|
print(ergebnis.stderr, file=sys.stderr)
|
|
sys.exit(f"ETL fehlgeschlagen (Exit {ergebnis.returncode})")
|
|
print("-" * 60)
|
|
|
|
output = workdir / "output"
|
|
ang = lade_csv(output, "staging_angebote.csv")
|
|
hist = lade_csv(output, "staging_status_historie.csv")
|
|
zus = lade_csv(output, "staging_zuschlaege.csv")
|
|
v_ang = lade_csv(output, "v_angebote_karte.csv")
|
|
v_akt = lade_csv(output, "v_angebote_karte_aktuell.csv")
|
|
v_pipe = lade_csv(output, "v_kunden_pipeline.csv")
|
|
v_zus = lade_csv(output, "v_zuschlaege_karte.csv")
|
|
|
|
p = Pruefer()
|
|
geo_da = (workdir / "output" / "geo_plz_koordinaten.csv").exists()
|
|
|
|
# ------------------------------------------------------------------
|
|
p.abschnitt("Stammdaten / Dedup (taifun_export)")
|
|
# 8 Roh-Zeilen -> 6 eindeutige Angebote; + 3 Archiv-only = 9 gesamt
|
|
p.pruefe("Angebote gesamt = 9 (6 Export + 3 Archiv)", len(ang), 9)
|
|
p.pruefe("herkunft_stamm == 'export' -> 6",
|
|
int((ang["herkunft_stamm"] == "export").sum()), 6)
|
|
p.pruefe("herkunft_stamm == 'archiv' -> 3",
|
|
int((ang["herkunft_stamm"] == "archiv").sum()), 3)
|
|
p.pruefe("Angebotsnummern eindeutig", ang["nummer"].is_unique, True)
|
|
|
|
a1001 = ang[ang["nummer"] == "A-1001"].iloc[0]
|
|
# Jüngster Export-Stand (export_2024_03) gewinnt -> puffer 1000, nicht 800/100
|
|
p.pruefe("A-1001: jüngste Version gewinnt (puffer_m3 = 1000)",
|
|
float(a1001["puffer_m3"]), 1000.0)
|
|
p.pruefe("A-1001: Kunde aus jüngstem Export ('Müller GmbH')",
|
|
a1001["kunde"], "Müller GmbH")
|
|
|
|
# ------------------------------------------------------------------
|
|
p.abschnitt("Archiv-Logik")
|
|
# A-1002 ist im Export -> Archiv-Version wird verworfen (Export-Kunde bleibt)
|
|
p.pruefe("A-1002 stammt aus Export (nicht Archiv)",
|
|
ang[ang["nummer"] == "A-1002"].iloc[0]["herkunft_stamm"], "export")
|
|
p.pruefe("A-1002 trägt Export-Kunde 'Bauer AG'",
|
|
ang[ang["nummer"] == "A-1002"].iloc[0]["kunde"], "Bauer AG")
|
|
# B-9001 nur einmal trotz Dublette über zwei Archiv-Dateien
|
|
p.pruefe("B-9001 genau einmal (Archiv-Dublette entfernt)",
|
|
int((ang["nummer"] == "B-9001").sum()), 1)
|
|
archiv_nummern = set(ang[ang["herkunft_stamm"] == "archiv"]["nummer"])
|
|
p.pruefe("Archiv-Angebote = {B-9001, B-9002, B-9003}",
|
|
archiv_nummern, {"B-9001", "B-9002", "B-9003"})
|
|
|
|
# ------------------------------------------------------------------
|
|
p.abschnitt("Status-Historie & gültiger Status (pflege)")
|
|
# 8 Roh-Events -> 7 nach Dedup (A-1003/Auftrag doppelt)
|
|
p.pruefe("Status-Events nach Dedup = 7", len(hist), 7)
|
|
p.pruefe("Status-Historie enthält Waise A-9999",
|
|
"A-9999" in set(hist["nummer"]), True)
|
|
p.pruefe("Waise A-9999 NICHT in Angeboten",
|
|
"A-9999" in set(ang["nummer"]), False)
|
|
|
|
def status(nr):
|
|
return ang[ang["nummer"] == nr].iloc[0]["status"]
|
|
|
|
def ampel(nr):
|
|
return ang[ang["nummer"] == nr].iloc[0]["ampel"]
|
|
|
|
# A-1001: neuestes Datum 2024-03-05, Tie zwischen Verhandlung(3)/Angebot(2)
|
|
# -> höherer Rang gewinnt: Verhandlung
|
|
p.pruefe("A-1001 gültiger Status = Verhandlung (Tie-Break per Rang)",
|
|
status("A-1001"), "Verhandlung")
|
|
p.pruefe("A-1001 Ampel = orange", ampel("A-1001"), "orange")
|
|
# A-1003: Pflege 'Auftrag' überschreibt Export-Abbruchtext
|
|
p.pruefe("A-1003 Status = Auftrag (Pflege schlägt Export)",
|
|
status("A-1003"), "Auftrag")
|
|
p.pruefe("A-1003 Ampel = gruen", ampel("A-1003"), "gruen")
|
|
|
|
# ------------------------------------------------------------------
|
|
p.abschnitt("Status aus Export-Projektnummer (ohne Pflege)")
|
|
# A-1004: gültige Projektnummer P-67890, keine Pflege -> Auftrag
|
|
p.pruefe("A-1004 Status = Auftrag (export_projekt)", status("A-1004"), "Auftrag")
|
|
p.pruefe("A-1004 status_quelle = export_projekt",
|
|
ang[ang["nummer"] == "A-1004"].iloc[0]["status_quelle"],
|
|
"export_projekt")
|
|
# A-1006: Projekttext ohne Muster -> Abbruch -> Abgelehnt
|
|
p.pruefe("A-1006 Status = Abgelehnt (export_projekt Abbruch)",
|
|
status("A-1006"), "Abgelehnt")
|
|
p.pruefe("A-1006 Ampel = schwarz", ampel("A-1006"), "schwarz")
|
|
# A-1005: weder Pflege noch Projekt -> Default Angebot
|
|
p.pruefe("A-1005 Status = Angebot (default)", status("A-1005"), "Angebot")
|
|
|
|
# Status-Verteilung gesamt
|
|
verteilung = ang["status"].value_counts().to_dict()
|
|
p.pruefe("Status-Verteilung",
|
|
{k: int(v) for k, v in verteilung.items()},
|
|
{"Angebot": 5, "Auftrag": 2, "Verhandlung": 1, "Abgelehnt": 1})
|
|
|
|
# ------------------------------------------------------------------
|
|
p.abschnitt("Status-Konflikt-Erkennung")
|
|
# A-1002: Pflege 'Angebot' trotz Projektnummer P-12345 -> Konflikt
|
|
konflikte = set(ang[ang["status_konflikt"].astype(str).str.lower()
|
|
== "true"]["nummer"])
|
|
p.pruefe("Genau A-1002 als Status-Konflikt markiert",
|
|
konflikte, {"A-1002"})
|
|
|
|
# ------------------------------------------------------------------
|
|
p.abschnitt("Adress-Auflösung (Standort vor Kunde)")
|
|
a1004 = ang[ang["nummer"] == "A-1004"].iloc[0]
|
|
p.pruefe("A-1004 nutzt Standort-PLZ 80331 (Kundenadresse leer)",
|
|
a1004["plz"], "80331")
|
|
p.pruefe("A-1004 adress_quelle = standort", a1004["adress_quelle"], "standort")
|
|
a1006 = ang[ang["nummer"] == "A-1006"].iloc[0]
|
|
p.pruefe("A-1006 Auslands-Land = CH", a1006["land"], "CH")
|
|
|
|
# ------------------------------------------------------------------
|
|
p.abschnitt("Kunden-Deduplizierung")
|
|
# Müller GmbH|70001: A-1001 + A-1005 -> 1 Kunde, aktuellstes = A-1005
|
|
mueller = ang[ang["kunde_schluessel"].str.startswith("müller gmbh|70001")]
|
|
p.pruefe("Müller GmbH: 2 Angebote (A-1001, A-1005)", len(mueller), 2)
|
|
p.pruefe("Müller: anzahl_angebote_kunde = 2",
|
|
int(mueller.iloc[0]["anzahl_angebote_kunde"]), 2)
|
|
aktuellstes = mueller[mueller["ist_aktuellstes_angebot"].astype(str)
|
|
.str.lower() == "true"]
|
|
p.pruefe("Müller: aktuellstes Angebot = A-1005 (jüngstes Datum)",
|
|
set(aktuellstes["nummer"]), {"A-1005"})
|
|
p.pruefe("Eindeutige Kunden gesamt = 8",
|
|
ang["kunde_schluessel"].nunique(), 8)
|
|
p.pruefe("v_angebote_karte_aktuell = 8 Zeilen (1 je Kunde)", len(v_akt), 8)
|
|
p.pruefe("v_kunden_pipeline = 8 Kunden", len(v_pipe), 8)
|
|
|
|
# ------------------------------------------------------------------
|
|
p.abschnitt("Views & Geo-Join")
|
|
p.pruefe("v_angebote_karte = 9 Zeilen (alle Angebote)", len(v_ang), 9)
|
|
if geo_da:
|
|
mit_geo = int(v_ang["lat"].notna().sum())
|
|
# A-1006 (Ausland, PLZ unauflösbar) ohne Koordinaten -> 8 von 9
|
|
p.pruefe("v_angebote_karte: 8/9 mit Koordinaten", mit_geo, 8)
|
|
a1004_v = v_ang[v_ang["nummer"] == "A-1004"].iloc[0]
|
|
p.pruefe("A-1004 Bundesland aus Geo = Bayern",
|
|
a1004_v["bundesland"], "Bayern")
|
|
|
|
# ------------------------------------------------------------------
|
|
p.abschnitt("Zuschläge (zwei Blätter, Dedup, Geo)")
|
|
# 6 Roh-Zeilen -> 5 nach Dedup (Z-100/F1 doppelt); 3 eindeutige Zuschläge
|
|
p.pruefe("Zuschlag-Standortzeilen nach Dedup = 5", len(zus), 5)
|
|
p.pruefe("Eindeutige Zuschlags-Nrn = 3", zus["zuschlag_nr"].nunique(), 3)
|
|
z100 = zus[zus["zuschlag_nr"] == "Z-100"]
|
|
p.pruefe("Z-100: 2 Flurstücke (F1, F2)", len(z100), 2)
|
|
p.pruefe("Z-100 Gebotsmenge aus Kompakt-Blatt = 5000",
|
|
float(z100.iloc[0]["gebotsmenge_kw"]), 5000.0)
|
|
p.pruefe("PLZ 04109 mit führender Null erhalten",
|
|
"04109" in set(zus["plz"]), True)
|
|
# Lead-Ampel immer lila
|
|
p.pruefe("v_zuschlaege_karte Ampel durchgängig 'lila'",
|
|
set(v_zus["ampel"]), {"lila"})
|
|
# Flag: PLZ mit eigenem Angebot (PLZ kann mehrfach vorkommen -> dedup)
|
|
flag = (v_zus.drop_duplicates("plz").set_index("plz")
|
|
["plz_mit_eigenem_angebot"].astype(str).str.lower())
|
|
p.pruefe("Z-100 PLZ 10115 hat eigenes Angebot -> True",
|
|
flag.get("10115"), "true")
|
|
p.pruefe("Z-300 PLZ 99999 ohne eigenes Angebot -> False",
|
|
flag.get("99999"), "false")
|
|
if geo_da:
|
|
p.pruefe("v_zuschlaege_karte: 4/5 mit Koordinaten (99999 ohne Geo)",
|
|
int(v_zus["lat"].notna().sum()), 4)
|
|
|
|
# ------------------------------------------------------------------
|
|
print("\n" + "=" * 60)
|
|
gesamt = p.bestanden + p.fehler
|
|
print(f"ERGEBNIS: {p.bestanden}/{gesamt} Prüfungen bestanden, "
|
|
f"{p.fehler} fehlgeschlagen.")
|
|
print(f"(Arbeitsverzeichnis: {workdir})")
|
|
if p.fehler == 0:
|
|
shutil.rmtree(workdir, ignore_errors=True)
|
|
print("Alle Prüfungen bestanden. ✓")
|
|
return 0
|
|
print("Es gab Fehler — Arbeitsverzeichnis bleibt zur Analyse erhalten.")
|
|
return 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|