ETL-QS-Suite, CI-Pipeline und korrigierte Pfade/DB
Some checks failed
ETL-QS / etl-tests (push) Failing after 44s

- Pfad-Defaults im DAG auf das Repo-Checkout /opt/airflow/git/current
  umgestellt (include-Skripte + etl_cache) und Ziel-DB auf
  analytics_pg_duckdb festgelegt
- tests/: Fixture-Generator (>=4 Dateien je Quell-Ordner mit Dubletten/
  Edge-Cases) und End-to-End-Runner mit 45 Pruefungen gegen erwartete
  Ergebnisse, inkl. README
- .forgejo/workflows: CI laeuft die ETL-QS bei Aenderungen an ETL-Skript,
  tests/ oder Geo-Referenz (python:3.12-Container, runs-on docker)
- .gitignore: .venv_test/ und generierte tests/fixtures/ ausgeschlossen

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Pascal Beyer 2026-06-14 14:11:55 +02:00
parent 9cacfd7ae2
commit 82c393408f
8 changed files with 691 additions and 0 deletions

View file

@ -0,0 +1,34 @@
name: ETL-QS
# Läuft bei Pushes und Pull Requests, sobald ETL-Logik, Tests oder die
# Geo-Referenz angefasst werden. So bleibt der Lauf schnell und gezielt.
on:
push:
paths:
- "include/02_etl_angebote_zuschlaege.py"
- "tests/**"
- "etl_cache/geo_plz_koordinaten.csv"
- ".forgejo/workflows/etl-tests.yml"
pull_request:
paths:
- "include/02_etl_angebote_zuschlaege.py"
- "tests/**"
- "etl_cache/geo_plz_koordinaten.csv"
- ".forgejo/workflows/etl-tests.yml"
jobs:
etl-tests:
runs-on: docker
# python:3.12 (Debian) bringt git für den Checkout mit; das ETL-Skript
# benötigt Python >= 3.10 (str | None-Syntax).
container:
image: python:3.12
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Test-Abhängigkeiten installieren
run: pip install --no-cache-dir -r tests/requirements.txt
- name: ETL-QS ausführen (Fixtures erzeugen, ETL laufen, prüfen)
run: python tests/run_tests.py

1
.gitignore vendored
View file

@ -125,6 +125,7 @@ celerybeat.pid
# Environments # Environments
.env .env
.venv .venv
.venv_test/
env/ env/
venv/ venv/
ENV/ ENV/

View file

@ -121,6 +121,11 @@ CONN_ID_NEXTCLOUD = "nextcloud_webdav"
# in dieser Airflow-Connection (Typ postgres), NICHT im Code. # in dieser Airflow-Connection (Typ postgres), NICHT im Code.
CONN_ID_DUCKDB = "duckdb_postgres" CONN_ID_DUCKDB = "duckdb_postgres"
# Datenbankname auf dem pg_duckdb-Server. Wird explizit gesetzt (überschreibt
# das in der Connection hinterlegte Default-Schema/Database), damit der Lade-
# Task unabhängig von der Connection-Konfiguration immer dieselbe DB anspricht.
DB_NAME = "analytics_pg_duckdb"
# Schema-Namen für den atomaren Tausch: geladen wird ins Staging-Schema, # Schema-Namen für den atomaren Tausch: geladen wird ins Staging-Schema,
# danach wird in einer Transaktion gegen das produktive Schema getauscht. # danach wird in einer Transaktion gegen das produktive Schema getauscht.
# Superset liest aus DB_SCHEMA_PROD und sieht nie einen Zwischenzustand. # Superset liest aus DB_SCHEMA_PROD und sieht nie einen Zwischenzustand.

2
tests/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
# Generierte Test-Eingabedaten (per generate_fixtures.py reproduzierbar)
fixtures/

101
tests/README.md Normal file
View file

@ -0,0 +1,101 @@
# Qualitätssicherung HVB-ETL
Tests für die Transformationslogik in
[`include/02_etl_angebote_zuschlaege.py`](../include/02_etl_angebote_zuschlaege.py).
Die Suite erzeugt deterministische Excel-Eingabedaten (≥4 Dateien je
Quell-Ordner, inkl. Dubletten und Edge-Cases), führt das ETL-Skript real aus
und prüft die erzeugten Output-CSVs gegen exakt berechnete Erwartungswerte.
## Ausführen
```bash
# Einmalig: Umgebung (Python ≥ 3.10, das ETL nutzt `str | None`-Syntax)
python3.12 -m venv .venv_test
.venv_test/bin/pip install pandas openpyxl
# Komplette QS (erzeugt Fixtures, läuft ETL, prüft Ergebnisse):
.venv_test/bin/python tests/run_tests.py
```
Exit-Code `0` = alle Prüfungen bestanden (CI-tauglich). Bei Fehlern bleibt das
temporäre Arbeitsverzeichnis zur Analyse erhalten; bei Erfolg wird es gelöscht.
Nur die Eingabedaten erzeugen (ohne ETL/Prüfung):
```bash
.venv_test/bin/python tests/generate_fixtures.py # -> tests/fixtures/input/
```
## Dateien
| Datei | Zweck |
|-------|-------|
| `generate_fixtures.py` | Erzeugt die Excel-Testdaten (4 Dateien/Ordner). |
| `run_tests.py` | Baut Workdir, führt ETL aus, 45 Prüfungen. |
| `fixtures/` | Generierte Eingabedaten (nicht versioniert). |
## Testdatensatz (Soll-Bild)
9 Angebote (6 aus Export, 3 nur aus Archiv), 8 eindeutige Kunden, 3 Zuschläge.
### `taifun_export/` — 4 Dateien, „jüngster Export gewinnt"
- **A-1001** liegt in 3 Dateien (Stände 500/2000/4000 via `_dateistand.csv`).
Erwartung: jüngste Version (Stand 4000) gewinnt → Beschreibung „1000 m³",
Kunde „Müller GmbH". 8 Rohzeilen → **6 eindeutige** Angebote.
- **A-1004**: Kundenadresse leer, nur Standort „80331 München" →
Standortadresse hat Vorrang (`adress_quelle = standort`).
- **A-1006**: Land `CH`, PLZ „1010 Wien" (kein dt. Format) → kein Geo-Treffer.
### `pflege/` — 4 Dateien, UNION + gültiger Status
- 8 Status-Events → **7 nach Dedup** (A-1003/Auftrag exakt doppelt).
- **A-1001**: neuestes Datum 2024-03-05 mit Gleichstand „Verhandlung" (Rang 3)
vs. „Angebot" (Rang 2) → **höherer Rang gewinnt: Verhandlung** (Ampel orange).
- **A-1003**: Pflege „Auftrag" überschreibt Export-Abbruchtext → Auftrag (grün).
- **A-9999**: Waisen-Status ohne Angebot → erscheint in `staging_status_historie`,
**nicht** in `staging_angebote`.
### `archiv/` — 4 Dateien, Alt-Angebote ergänzen
- **B-9001/9002/9003** nicht im Export → werden als `herkunft_stamm = archiv`
ergänzt. **B-9001** über zwei Dateien dupliziert → nur einmal.
- **A-1002** ist bereits im Export → Archiv-Version wird **verworfen**
(Export-Kunde „Bauer AG" bleibt, nicht „Bauer AG ARCHIV").
### Status-Ableitung (ohne Pflege)
- **A-1004**: gültige Projektnummer `P-67890`, keine Pflege → `Auftrag`
(`status_quelle = export_projekt`).
- **A-1006**: Projekttext „kein Interesse" (kein Muster) → `Abgelehnt` (schwarz).
- **A-1005**: weder Pflege noch Projekt → Default `Angebot` (gelb).
- **Status-Konflikt**: A-1002 hat Projektnummer `P-12345`, Pflege sagt aber
„Angebot" → `status_konflikt = True` (genau 1 Konflikt).
### Kunden-Dedup
- „Müller GmbH | 70001" = A-1001 + A-1005 → **1 Kunde**, aktuellstes Angebot =
A-1005 (jüngstes Datum). Gesamt **8 eindeutige Kunden**.
### `zuschlaege/` — 4 Dateien, zwei Blätter, Kopfzeile in Zeile 4
- Blätter `Zuschläge_Detailliert` + `Zuschläge_Kompakt` (jeweils `header=3`).
- 6 Detailzeilen → **5 nach Dedup** je (Zuschlags-Nr, Flurstück); Z-100/F1
doppelt. **3 eindeutige Zuschläge**.
- Gebotsmenge wird aus dem Kompakt-Blatt gemerged (Z-100 → 5000).
- PLZ **04109** behält die führende Null.
- `plz_mit_eigenem_angebot`: 10115 (eigenes Angebot) → True; 99999 → False.
- Lead-Ampel durchgängig `lila`.
## Erwartete Kennzahlen (Übersicht)
| Output-CSV | Zeilen | Kernaussage |
|------------|-------:|-------------|
| `staging_angebote` | 9 | 6 Export + 3 Archiv, Nummern eindeutig |
| `staging_status_historie` | 7 | inkl. Waise A-9999, Dublette entfernt |
| `staging_zuschlaege` | 5 | 3 eindeutige Zuschläge |
| `v_angebote_karte` | 9 | 8/9 mit Koordinaten (A-1006 Ausland ohne) |
| `v_angebote_karte_aktuell` | 8 | 1 Zeile je Kunde |
| `v_kunden_pipeline` | 8 | Top-Kunde Müller GmbH (2 Angebote) |
| `v_zuschlaege_karte` | 5 | 4/5 mit Koordinaten, Ampel lila |
Status-Verteilung: `Angebot 5, Auftrag 2, Verhandlung 1, Abgelehnt 1`.
> Hinweis: Die Geo-Prüfungen nutzen `etl_cache/geo_plz_koordinaten.csv`. Diese
> Datei hat keine Spalte `kreis`, daher bleibt `landkreis` im Test leer — das
> ist erwartet und wird vom ETL korrekt behandelt.

273
tests/generate_fixtures.py Normal file
View file

@ -0,0 +1,273 @@
#!/usr/bin/env python3
"""
generate_fixtures.py
--------------------
Erzeugt deterministische Test-Eingabedaten für den HVB-ETL-Prozess
(Skript include/02_etl_angebote_zuschlaege.py).
Pro Quell-Ordner werden MINDESTENS 4 Excel-Dateien erzeugt, die gezielt
folgende Fälle abdecken (vollständige Beschreibung in tests/README.md):
taifun_export/ - dieselbe Angebotsnummer in mehreren Dateien
(jüngster Export-Stand gewinnt), Standort- vs.
Kundenadresse, Auslands-Angebot, Projektnummer-Status.
pflege/ - UNION mehrerer Status-Historien, exakte Dubletten,
Tie-Break per Status-Rang, Waisen-Status (ohne Angebot).
archiv/ - Alt-Angebote ergänzen den Export; bereits im Export
vorhandene Nummern werden ignoriert; Datei-Dubletten.
zuschlaege/ - zwei Blätter (Detail/Kompakt) mit Kopfzeile in Zeile 4,
Dedup je (Zuschlags-Nr, Flurstück), PLZ ohne Geo-Treffer.
Aufruf: python tests/generate_fixtures.py
"""
from pathlib import Path
import pandas as pd
BASIS = Path(__file__).parent
INPUT = BASIS / "fixtures" / "input"
# Reale PLZ aus etl_cache/geo_plz_koordinaten.csv (haben Koordinaten):
# 10115 Berlin | 20095 Hamburg | 50667 Köln | 80331 München
# 04109 Leipzig (führende Null!) | 99999 = bewusst OHNE Geo-Treffer
EXPORT_COLS = [
"Nummer", "Datum", "Kunde", "Debitor", "Beschreibung", "Bearbeiter",
"Status", "Projekt", "Gültig bis",
"Kunden-Adresse: PLZ, Ort", "Kunden-Adresse: Straße, Nr.",
"Kunden-Adresse: Land (ISO-Ländercode)", "Kunden-Adresse: Telefon",
"Kunden-Adresse: Mobiltelefon", "Standort-Adresse: PLZ, Ort",
"Wiedervorlage: Datum",
]
def _exp(nummer, datum, kunde, debitor, beschreibung, projekt="",
kunden_plz_ort="", land="DE", standort_plz_ort=""):
return {
"Nummer": nummer, "Datum": datum, "Kunde": kunde, "Debitor": debitor,
"Beschreibung": beschreibung, "Bearbeiter": "Schmidt",
"Status": "", "Projekt": projekt, "Gültig bis": "2099-12-31",
"Kunden-Adresse: PLZ, Ort": kunden_plz_ort,
"Kunden-Adresse: Straße, Nr.": "Teststr. 1",
"Kunden-Adresse: Land (ISO-Ländercode)": land,
"Kunden-Adresse: Telefon": "030-123", "Kunden-Adresse: Mobiltelefon": "",
"Standort-Adresse: PLZ, Ort": standort_plz_ort,
"Wiedervorlage: Datum": "",
}
def schreibe_export():
ordner = INPUT / "taifun_export"
ordner.mkdir(parents=True, exist_ok=True)
# Datei 1 (ältester Stand, siehe _dateistand.csv): A-1001 v0 ("ALT")
df1 = pd.DataFrame([
_exp("A-1001", "2023-12-01", "Müller ALT GmbH", 70001,
"Altanlage 100 m³", kunden_plz_ort="10115 Berlin"),
])
# Datei 2: A-1001 v1, A-1002 (mit echter Projektnummer + Pflege -> Konflikt)
df2 = pd.DataFrame([
_exp("A-1001", "2024-01-10", "Müller GmbH", 70001,
"Pufferspeicher 800 m³", kunden_plz_ort="10115 Berlin"),
_exp("A-1002", "2024-01-20", "Bauer AG", 70002, "Heizung 500 m³",
projekt="P-12345", kunden_plz_ort="20095 Hamburg"),
])
# Datei 3: A-1003 (Pflege Auftrag), A-1004 (Projektnr ohne Pflege -> Auftrag)
df3 = pd.DataFrame([
_exp("A-1003", "2024-02-05", "Klein KG", 70003, "Solaranlage",
projekt="Auftrag verloren", kunden_plz_ort="50667 Köln"),
_exp("A-1004", "2024-02-12", "Lang GmbH", 70004, "Anlage 2000 m³",
projekt="P-67890", kunden_plz_ort="", standort_plz_ort="80331 München"),
])
# Datei 4 (jüngster Stand): A-1001 v2 (gewinnt!), A-1005 (gleicher Kunde
# wie A-1001 -> Kunden-Dedup), A-1006 (Ausland + Abbruch-Projekttext)
df4 = pd.DataFrame([
_exp("A-1001", "2024-03-02", "Müller GmbH", 70001,
"Pufferspeicher 1000 m³ neu", kunden_plz_ort="10115 Berlin"),
_exp("A-1005", "2024-03-15", "Müller GmbH", 70001,
"Erweiterung 1200 m³", kunden_plz_ort="10115 Berlin"),
_exp("A-1006", "2024-03-20", "Swiss AG", 70006, "Anlage CH",
projekt="kein Interesse", kunden_plz_ort="1010 Wien", land="CH"),
])
for name, df in [
("export_00_dup.xlsx", df1),
("export_2024_01.xlsx", df2),
("export_2024_02.xlsx", df3),
("export_2024_03.xlsx", df4),
]:
df[EXPORT_COLS].to_excel(ordner / name, sheet_name="Tabelle 1",
index=False)
# _dateistand.csv: legt die Reihenfolge "jüngster Export gewinnt" fest.
# (Im Echtbetrieb schreibt der DAG diese Datei aus den WebDAV-mtimes.)
stand = pd.DataFrame([
{"datei": "export_00_dup.xlsx", "stand_epoch": 500.0},
{"datei": "export_2024_01.xlsx", "stand_epoch": 2000.0},
{"datei": "export_2024_02.xlsx", "stand_epoch": 3000.0},
{"datei": "export_2024_03.xlsx", "stand_epoch": 4000.0},
])
stand.to_csv(INPUT / "_dateistand.csv", index=False)
print(f"taifun_export: 4 Dateien + _dateistand.csv")
def _pflege(nummer, datum, status, netto=None, kontaktart="Telefon",
zustaendig="Meier"):
return {
"Nummer": nummer, "Status-Datum": datum, "Status": status,
"Netto (EUR)": netto, "Kontaktart": kontaktart, "Zuständig": zustaendig,
"Wettbewerber": "", "Abbruchgrund": "", "Wiedervorlage": "",
"Bemerkungen": "",
}
def schreibe_pflege():
ordner = INPUT / "pflege"
ordner.mkdir(parents=True, exist_ok=True)
df1 = pd.DataFrame([
_pflege("A-1001", "2024-01-15", "Kontakt/Lead", 0),
_pflege("A-1001", "2024-02-20", "Angebot", 50000),
_pflege("A-1002", "2024-02-01", "Angebot", 30000),
])
df2 = pd.DataFrame([
_pflege("A-1001", "2024-03-05", "Verhandlung", 55000),
_pflege("A-1003", "2024-02-10", "Auftrag", 80000),
])
# Datei 3: exakte Dublette von A-1003 (wird entfernt) + Tie-Break-Test:
# A-1001 am selben Datum wie "Verhandlung", aber niedrigerer Rang -> verliert.
df3 = pd.DataFrame([
_pflege("A-1003", "2024-02-10", "Auftrag", 80000),
_pflege("A-1001", "2024-03-05", "Angebot", 52000),
])
# Datei 4: Waisen-Status ohne zugehöriges Angebot (nur in Historie sichtbar)
df4 = pd.DataFrame([
_pflege("A-9999", "2024-01-01", "Angebot", 9999),
])
for name, df in [
("pflege_2024_01.xlsx", df1),
("pflege_2024_02.xlsx", df2),
("pflege_2024_03_dup.xlsx", df3),
("pflege_2024_04_waise.xlsx", df4),
]:
df.to_excel(ordner / name, sheet_name="Status_Historie", index=False)
print("pflege: 4 Dateien")
def _archiv(nummer, kunde, debitor, datum, plz_ort, beschreibung="Alt-Projekt"):
return {
"Nummer": nummer, "Kunde": kunde, "Debitor": debitor, "Datum": datum,
"PLZ, Ort": plz_ort, "Beschreibung": beschreibung,
"Telefon": "0341-9", "Mobil": "", "E-Mail": "alt@example.de",
"Ansprechpartner": "Herr Alt",
}
def schreibe_archiv():
ordner = INPUT / "archiv"
ordner.mkdir(parents=True, exist_ok=True)
df1 = pd.DataFrame([
_archiv("B-9001", "Altkunde Nord", 80001, "2015-05-01", "10115 Berlin"),
])
df2 = pd.DataFrame([
_archiv("B-9002", "Altkunde Süd", 80002, "2016-06-01", "04109 Leipzig"),
])
# Datei 3: A-1002 ist bereits im Export -> wird ignoriert (Export gewinnt);
# B-9001 ist Dublette aus Datei 1 -> wird entfernt.
df3 = pd.DataFrame([
_archiv("A-1002", "Bauer AG ARCHIV", 70002, "2014-01-01", "20095 Hamburg"),
_archiv("B-9001", "Altkunde Nord", 80001, "2015-05-01", "10115 Berlin"),
])
df4 = pd.DataFrame([
_archiv("B-9003", "Altkunde West", 80003, "2017-07-01", "50667 Köln"),
])
for name, df in [
("archiv_1.xlsx", df1),
("archiv_2.xlsx", df2),
("archiv_3_dup.xlsx", df3),
("archiv_4.xlsx", df4),
]:
df.to_excel(ordner / name, sheet_name="Archiv_Stammdaten", index=False)
print("archiv: 4 Dateien")
# Detail-Spalten werden vom ETL positionsbasiert umbenannt; Kopfzeile in Zeile 4
# (header=3). Spaltennamen hier nur informativ — "Postleitzahl" steuert dtype=str.
DETAIL_COLS = ["Bieter", "Gebot-Nr", "Zuschlags-Nr", "Bundesland",
"Landkreis", "Postleitzahl", "Gemeinde", "Gemarkung", "Flurstück"]
def _detail(bieter, gebot, zuschlag, bundesland, kreis, plz, gemeinde,
gemarkung, flurstueck):
return dict(zip(DETAIL_COLS, [bieter, gebot, zuschlag, bundesland, kreis,
plz, gemeinde, gemarkung, flurstueck]))
def _schreibe_zuschlag_datei(pfad, detail_rows, kompakt_rows):
detail = pd.DataFrame(detail_rows, columns=DETAIL_COLS)
kompakt = pd.DataFrame(kompakt_rows, columns=["Zuschlags-Nr", "Gebotsmenge"])
with pd.ExcelWriter(pfad, engine="openpyxl") as xls:
# startrow=3 -> Kopfzeile landet in Zeile 4 (read_excel header=3)
detail.to_excel(xls, sheet_name="Zuschläge_Detailliert",
index=False, startrow=3)
kompakt.to_excel(xls, sheet_name="Zuschläge_Kompakt",
index=False, startrow=3)
def schreibe_zuschlaege():
ordner = INPUT / "zuschlaege"
ordner.mkdir(parents=True, exist_ok=True)
# Datei 1: Z-100 mit zwei Flurstücken (beide bleiben erhalten)
_schreibe_zuschlag_datei(
ordner / "zuschlaege_1.xlsx",
[_detail("WindCo", "G-01", "Z-100", "Berlin", "Kreis A", "10115",
"Berlin", "GemA", "F1"),
_detail("WindCo", "G-01", "Z-100", "Berlin", "Kreis A", "10115",
"Berlin", "GemA", "F2")],
[["Z-100", 5000]],
)
# Datei 2: Z-200 in München (PLZ hat eigenes Angebot -> Flag True)
_schreibe_zuschlag_datei(
ordner / "zuschlaege_2.xlsx",
[_detail("SolarCo", "G-02", "Z-200", "Bayern", "Kreis B", "80331",
"München", "GemB", "F3")],
[["Z-200", 3000]],
)
# Datei 3: exakte Dublette (Z-100, F1) -> wird per Dedup entfernt
_schreibe_zuschlag_datei(
ordner / "zuschlaege_3_dup.xlsx",
[_detail("WindCo", "G-01", "Z-100", "Berlin", "Kreis A", "10115",
"Berlin", "GemA", "F1")],
[["Z-100", 5000]],
)
# Datei 4: Z-300 mit Leipzig (führende Null!) + PLZ ohne Geo-Treffer (99999)
_schreibe_zuschlag_datei(
ordner / "zuschlaege_4.xlsx",
[_detail("HydroCo", "G-03", "Z-300", "Sachsen", "Kreis C", "04109",
"Leipzig", "GemC", "F4"),
_detail("HydroCo", "G-03", "Z-300", "Sachsen", "Kreis C", "99999",
"Nirgendwo", "GemD", "F5")],
[["Z-300", 7000]],
)
print("zuschlaege: 4 Dateien")
def main():
if INPUT.exists():
import shutil
shutil.rmtree(INPUT)
INPUT.mkdir(parents=True)
schreibe_export()
schreibe_pflege()
schreibe_archiv()
schreibe_zuschlaege()
print(f"\nFixtures erzeugt unter: {INPUT}")
if __name__ == "__main__":
main()

4
tests/requirements.txt Normal file
View file

@ -0,0 +1,4 @@
# Abhängigkeiten für die ETL-QS (tests/run_tests.py).
# Versionen wie im Airflow-Environment des ETL-Skripts.
pandas>=2.0
openpyxl>=3.1

271
tests/run_tests.py Normal file
View file

@ -0,0 +1,271 @@
#!/usr/bin/env python3
"""
run_tests.py
------------
End-to-End-Qualitätssicherung für den HVB-ETL-Prozess.
Ablauf:
1. Fixtures erzeugen (tests/generate_fixtures.py).
2. Arbeitsverzeichnis aufbauen: ETL-Skript + Geo-Cache + input/ kopieren
(genau die Struktur, die der DAG zur Laufzeit anlegt).
3. include/02_etl_angebote_zuschlaege.py ausführen.
4. Erzeugte output/*.csv laden und gegen ERWARTETE Ergebnisse prüfen.
Jede Prüfung gibt PASS/FAIL aus; am Ende ein Gesamtergebnis. Exit-Code 0
nur, wenn alle Prüfungen bestanden sind (CI-tauglich).
Aufruf: python tests/run_tests.py
"""
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
import pandas as pd
TESTS = Path(__file__).parent
REPO = TESTS.parent
ETL_SKRIPT = REPO / "include" / "02_etl_angebote_zuschlaege.py"
GEO_CSV = REPO / "etl_cache" / "geo_plz_koordinaten.csv"
FIXTURES_INPUT = TESTS / "fixtures" / "input"
# Textspalten, die beim Wieder-Einlesen der CSVs als String erzwungen werden
# (führende Nullen / IDs), analog zur DAG-Ladelogik (TEXT_SPALTEN):
TEXT_DTYPE = {c: str for c in (
"plz", "plz_kunde", "plz_standort", "nummer", "debitor", "zuschlag_nr",
"gebot_nr", "projektnummer", "aktuellste_nummer", "kunde_schluessel")}
# ----------------------------------------------------------------------
# Mini-Prüfgerüst
# ----------------------------------------------------------------------
class Pruefer:
def __init__(self):
self.bestanden = 0
self.fehler = 0
def pruefe(self, beschreibung, ist, erwartet):
ok = ist == erwartet
marke = "PASS" if ok else "FAIL"
print(f" [{marke}] {beschreibung}")
if not ok:
print(f" erwartet: {erwartet!r}")
print(f" erhalten: {ist!r}")
self.fehler += 1
else:
self.bestanden += 1
return ok
def abschnitt(self, titel):
print(f"\n=== {titel} ===")
def lade_csv(output: Path, name: str) -> pd.DataFrame:
return pd.read_csv(output / name, dtype=TEXT_DTYPE, encoding="utf-8-sig")
def main() -> int:
if not ETL_SKRIPT.exists():
sys.exit(f"ETL-Skript nicht gefunden: {ETL_SKRIPT}")
# 1) Fixtures erzeugen
print("Erzeuge Fixtures ...")
subprocess.run([sys.executable, str(TESTS / "generate_fixtures.py")],
check=True)
# 2) Arbeitsverzeichnis wie zur DAG-Laufzeit aufbauen
workdir = Path(tempfile.mkdtemp(prefix="hvb_etl_test_"))
(workdir / "output").mkdir()
shutil.copy2(ETL_SKRIPT, workdir / ETL_SKRIPT.name)
shutil.copytree(FIXTURES_INPUT, workdir / "input")
if GEO_CSV.exists():
shutil.copy2(GEO_CSV, workdir / "output" / "geo_plz_koordinaten.csv")
else:
print(f"WARNUNG: Geo-Cache fehlt ({GEO_CSV}) — Geo-Prüfungen entfallen.")
# 3) ETL ausführen
print("\nFühre ETL aus ...\n" + "-" * 60)
ergebnis = subprocess.run(
[sys.executable, ETL_SKRIPT.name], cwd=workdir,
capture_output=True, text=True)
print(ergebnis.stdout)
if ergebnis.returncode != 0:
print(ergebnis.stderr, file=sys.stderr)
sys.exit(f"ETL fehlgeschlagen (Exit {ergebnis.returncode})")
print("-" * 60)
output = workdir / "output"
ang = lade_csv(output, "staging_angebote.csv")
hist = lade_csv(output, "staging_status_historie.csv")
zus = lade_csv(output, "staging_zuschlaege.csv")
v_ang = lade_csv(output, "v_angebote_karte.csv")
v_akt = lade_csv(output, "v_angebote_karte_aktuell.csv")
v_pipe = lade_csv(output, "v_kunden_pipeline.csv")
v_zus = lade_csv(output, "v_zuschlaege_karte.csv")
p = Pruefer()
geo_da = (workdir / "output" / "geo_plz_koordinaten.csv").exists()
# ------------------------------------------------------------------
p.abschnitt("Stammdaten / Dedup (taifun_export)")
# 8 Roh-Zeilen -> 6 eindeutige Angebote; + 3 Archiv-only = 9 gesamt
p.pruefe("Angebote gesamt = 9 (6 Export + 3 Archiv)", len(ang), 9)
p.pruefe("herkunft_stamm == 'export' -> 6",
int((ang["herkunft_stamm"] == "export").sum()), 6)
p.pruefe("herkunft_stamm == 'archiv' -> 3",
int((ang["herkunft_stamm"] == "archiv").sum()), 3)
p.pruefe("Angebotsnummern eindeutig", ang["nummer"].is_unique, True)
a1001 = ang[ang["nummer"] == "A-1001"].iloc[0]
# Jüngster Export-Stand (export_2024_03) gewinnt -> puffer 1000, nicht 800/100
p.pruefe("A-1001: jüngste Version gewinnt (puffer_m3 = 1000)",
float(a1001["puffer_m3"]), 1000.0)
p.pruefe("A-1001: Kunde aus jüngstem Export ('Müller GmbH')",
a1001["kunde"], "Müller GmbH")
# ------------------------------------------------------------------
p.abschnitt("Archiv-Logik")
# A-1002 ist im Export -> Archiv-Version wird verworfen (Export-Kunde bleibt)
p.pruefe("A-1002 stammt aus Export (nicht Archiv)",
ang[ang["nummer"] == "A-1002"].iloc[0]["herkunft_stamm"], "export")
p.pruefe("A-1002 trägt Export-Kunde 'Bauer AG'",
ang[ang["nummer"] == "A-1002"].iloc[0]["kunde"], "Bauer AG")
# B-9001 nur einmal trotz Dublette über zwei Archiv-Dateien
p.pruefe("B-9001 genau einmal (Archiv-Dublette entfernt)",
int((ang["nummer"] == "B-9001").sum()), 1)
archiv_nummern = set(ang[ang["herkunft_stamm"] == "archiv"]["nummer"])
p.pruefe("Archiv-Angebote = {B-9001, B-9002, B-9003}",
archiv_nummern, {"B-9001", "B-9002", "B-9003"})
# ------------------------------------------------------------------
p.abschnitt("Status-Historie & gültiger Status (pflege)")
# 8 Roh-Events -> 7 nach Dedup (A-1003/Auftrag doppelt)
p.pruefe("Status-Events nach Dedup = 7", len(hist), 7)
p.pruefe("Status-Historie enthält Waise A-9999",
"A-9999" in set(hist["nummer"]), True)
p.pruefe("Waise A-9999 NICHT in Angeboten",
"A-9999" in set(ang["nummer"]), False)
def status(nr):
return ang[ang["nummer"] == nr].iloc[0]["status"]
def ampel(nr):
return ang[ang["nummer"] == nr].iloc[0]["ampel"]
# A-1001: neuestes Datum 2024-03-05, Tie zwischen Verhandlung(3)/Angebot(2)
# -> höherer Rang gewinnt: Verhandlung
p.pruefe("A-1001 gültiger Status = Verhandlung (Tie-Break per Rang)",
status("A-1001"), "Verhandlung")
p.pruefe("A-1001 Ampel = orange", ampel("A-1001"), "orange")
# A-1003: Pflege 'Auftrag' überschreibt Export-Abbruchtext
p.pruefe("A-1003 Status = Auftrag (Pflege schlägt Export)",
status("A-1003"), "Auftrag")
p.pruefe("A-1003 Ampel = gruen", ampel("A-1003"), "gruen")
# ------------------------------------------------------------------
p.abschnitt("Status aus Export-Projektnummer (ohne Pflege)")
# A-1004: gültige Projektnummer P-67890, keine Pflege -> Auftrag
p.pruefe("A-1004 Status = Auftrag (export_projekt)", status("A-1004"), "Auftrag")
p.pruefe("A-1004 status_quelle = export_projekt",
ang[ang["nummer"] == "A-1004"].iloc[0]["status_quelle"],
"export_projekt")
# A-1006: Projekttext ohne Muster -> Abbruch -> Abgelehnt
p.pruefe("A-1006 Status = Abgelehnt (export_projekt Abbruch)",
status("A-1006"), "Abgelehnt")
p.pruefe("A-1006 Ampel = schwarz", ampel("A-1006"), "schwarz")
# A-1005: weder Pflege noch Projekt -> Default Angebot
p.pruefe("A-1005 Status = Angebot (default)", status("A-1005"), "Angebot")
# Status-Verteilung gesamt
verteilung = ang["status"].value_counts().to_dict()
p.pruefe("Status-Verteilung",
{k: int(v) for k, v in verteilung.items()},
{"Angebot": 5, "Auftrag": 2, "Verhandlung": 1, "Abgelehnt": 1})
# ------------------------------------------------------------------
p.abschnitt("Status-Konflikt-Erkennung")
# A-1002: Pflege 'Angebot' trotz Projektnummer P-12345 -> Konflikt
konflikte = set(ang[ang["status_konflikt"].astype(str).str.lower()
== "true"]["nummer"])
p.pruefe("Genau A-1002 als Status-Konflikt markiert",
konflikte, {"A-1002"})
# ------------------------------------------------------------------
p.abschnitt("Adress-Auflösung (Standort vor Kunde)")
a1004 = ang[ang["nummer"] == "A-1004"].iloc[0]
p.pruefe("A-1004 nutzt Standort-PLZ 80331 (Kundenadresse leer)",
a1004["plz"], "80331")
p.pruefe("A-1004 adress_quelle = standort", a1004["adress_quelle"], "standort")
a1006 = ang[ang["nummer"] == "A-1006"].iloc[0]
p.pruefe("A-1006 Auslands-Land = CH", a1006["land"], "CH")
# ------------------------------------------------------------------
p.abschnitt("Kunden-Deduplizierung")
# Müller GmbH|70001: A-1001 + A-1005 -> 1 Kunde, aktuellstes = A-1005
mueller = ang[ang["kunde_schluessel"].str.startswith("müller gmbh|70001")]
p.pruefe("Müller GmbH: 2 Angebote (A-1001, A-1005)", len(mueller), 2)
p.pruefe("Müller: anzahl_angebote_kunde = 2",
int(mueller.iloc[0]["anzahl_angebote_kunde"]), 2)
aktuellstes = mueller[mueller["ist_aktuellstes_angebot"].astype(str)
.str.lower() == "true"]
p.pruefe("Müller: aktuellstes Angebot = A-1005 (jüngstes Datum)",
set(aktuellstes["nummer"]), {"A-1005"})
p.pruefe("Eindeutige Kunden gesamt = 8",
ang["kunde_schluessel"].nunique(), 8)
p.pruefe("v_angebote_karte_aktuell = 8 Zeilen (1 je Kunde)", len(v_akt), 8)
p.pruefe("v_kunden_pipeline = 8 Kunden", len(v_pipe), 8)
# ------------------------------------------------------------------
p.abschnitt("Views & Geo-Join")
p.pruefe("v_angebote_karte = 9 Zeilen (alle Angebote)", len(v_ang), 9)
if geo_da:
mit_geo = int(v_ang["lat"].notna().sum())
# A-1006 (Ausland, PLZ unauflösbar) ohne Koordinaten -> 8 von 9
p.pruefe("v_angebote_karte: 8/9 mit Koordinaten", mit_geo, 8)
a1004_v = v_ang[v_ang["nummer"] == "A-1004"].iloc[0]
p.pruefe("A-1004 Bundesland aus Geo = Bayern",
a1004_v["bundesland"], "Bayern")
# ------------------------------------------------------------------
p.abschnitt("Zuschläge (zwei Blätter, Dedup, Geo)")
# 6 Roh-Zeilen -> 5 nach Dedup (Z-100/F1 doppelt); 3 eindeutige Zuschläge
p.pruefe("Zuschlag-Standortzeilen nach Dedup = 5", len(zus), 5)
p.pruefe("Eindeutige Zuschlags-Nrn = 3", zus["zuschlag_nr"].nunique(), 3)
z100 = zus[zus["zuschlag_nr"] == "Z-100"]
p.pruefe("Z-100: 2 Flurstücke (F1, F2)", len(z100), 2)
p.pruefe("Z-100 Gebotsmenge aus Kompakt-Blatt = 5000",
float(z100.iloc[0]["gebotsmenge_kw"]), 5000.0)
p.pruefe("PLZ 04109 mit führender Null erhalten",
"04109" in set(zus["plz"]), True)
# Lead-Ampel immer lila
p.pruefe("v_zuschlaege_karte Ampel durchgängig 'lila'",
set(v_zus["ampel"]), {"lila"})
# Flag: PLZ mit eigenem Angebot (PLZ kann mehrfach vorkommen -> dedup)
flag = (v_zus.drop_duplicates("plz").set_index("plz")
["plz_mit_eigenem_angebot"].astype(str).str.lower())
p.pruefe("Z-100 PLZ 10115 hat eigenes Angebot -> True",
flag.get("10115"), "true")
p.pruefe("Z-300 PLZ 99999 ohne eigenes Angebot -> False",
flag.get("99999"), "false")
if geo_da:
p.pruefe("v_zuschlaege_karte: 4/5 mit Koordinaten (99999 ohne Geo)",
int(v_zus["lat"].notna().sum()), 4)
# ------------------------------------------------------------------
print("\n" + "=" * 60)
gesamt = p.bestanden + p.fehler
print(f"ERGEBNIS: {p.bestanden}/{gesamt} Prüfungen bestanden, "
f"{p.fehler} fehlgeschlagen.")
print(f"(Arbeitsverzeichnis: {workdir})")
if p.fehler == 0:
shutil.rmtree(workdir, ignore_errors=True)
print("Alle Prüfungen bestanden. ✓")
return 0
print("Es gab Fehler — Arbeitsverzeichnis bleibt zur Analyse erhalten.")
return 1
if __name__ == "__main__":
sys.exit(main())