#!/usr/bin/env python3 """ 02_etl_angebote_zuschlaege.py ----------------------------- Beladung aus DREI Quell-Ordnern (je Entität ein Ordner; pro Ordner können MEHRERE Dateien liegen, die zunächst eingelesen und vereinigt werden): input/taifun_export/ Taifun-Angebotsexporte (Blatt "Tabelle 1"). Mehrere Dateien werden vereinigt; bei gleicher Angebotsnummer gewinnt die Zeile aus dem JÜNGSTEN Export (Reihenfolge = Datei-Änderungsdatum, das der DAG je Datei in input/_dateistand.csv ablegt; fehlt diese Info, gilt die mtime der Datei). input/pflege/ Pflege-Status-Historie (Blatt "Status_Historie"). Mehrere Dateien -> UNION aller Status-Events, exakte Dubletten (Nummer+Datum+Status) entfernt. Gültiger Status je Angebot = neuestes status_datum, bei Gleichstand höchster Status-Rang. input/archiv/ Eingefrorene Stammdaten der Alt-Angebote, die im Export fehlen (Blatt "Archiv_Stammdaten"). Erzeugt in output/ die Tabellen/Views als CSV: staging_angebote.csv alle Angebote (Stammdaten + gültiger Status) staging_status_historie.csv komplette Status-Historie (für Verlaufs- auswertungen in Superset) staging_zuschlaege.csv EEG-Zuschläge (Detail-Blatt) v_angebote_karte.csv Kartenfertige View (ALLE Angebote) v_angebote_karte_aktuell.csv NUR aktuellstes Angebot je Kunde v_kunden_pipeline.csv Pipeline je Kunde (Top-10-Basis) v_zuschlaege_karte.csv Lead-View (Ampel: lila) Status & Ampel sind deckungsgleich: Kontakt/Lead -> lila | Angebot -> gelb | Verhandlung -> orange Auftrag -> gruen | Abgelehnt -> schwarz Voraussetzung: vorher 01_geodaten_holen.py (erzeugt output/geo_plz_koordinaten.csv). Fehlt die Geo-Datei, laufen die Views ohne Koordinaten durch. Aufruf: python 02_etl_angebote_zuschlaege.py """ import re from datetime import datetime from pathlib import Path import pandas as pd BASIS = Path(__file__).parent INPUT = BASIS / "input" ORD_EXPORT = INPUT / "taifun_export" ORD_PFLEGE = INPUT / "pflege" ORD_ARCHIV = INPUT / "archiv" DATEISTAND_CSV = INPUT / "_dateistand.csv" # vom DAG geschrieben (optional) OUTPUT = BASIS / "output" GEO_CSV = OUTPUT / "geo_plz_koordinaten.csv" BLATT_EXPORT = "Tabelle 1" BLATT_PFLEGE = "Status_Historie" BLATT_ARCHIV = "Archiv_Stammdaten" # Status-Rangordnung (für "weiter vorangeschritten" bei Datumsgleichstand) STATUS_RANG = {"Kontakt/Lead": 1, "Angebot": 2, "Verhandlung": 3, "Auftrag": 4, "Abgelehnt": 5} AMPEL = {"Kontakt/Lead": "lila", "Angebot": "gelb", "Verhandlung": "orange", "Auftrag": "gruen", "Abgelehnt": "schwarz"} # Export-Spalten (Exportname -> intern) EXPORT_SPALTEN = { "Nummer": "nummer", "Datum": "datum", "Kunde": "kunde", "Debitor": "debitor", "Beschreibung": "beschreibung", "Bearbeiter": "bearbeiter", "Status": "status_export", "Projekt": "projekt_roh", "Gültig bis": "gueltig_bis", "Kunden-Adresse: PLZ, Ort": "kunde_plz_ort", "Kunden-Adresse: Straße, Nr.": "strasse", "Kunden-Adresse: Land (ISO-Ländercode)": "land", "Kunden-Adresse: Telefon": "telefon", "Kunden-Adresse: Mobiltelefon": "mobil", "Standort-Adresse: PLZ, Ort": "standort_plz_ort", "Wiedervorlage: Datum": "wiedervorlage_export", } PROJEKTNUMMER_MUSTER = re.compile(r"^\s*P[A-Z]{0,3}[-\s/]?\d{4,6}\s*$", re.I) # ---------------------------------------------------------------------- # Hilfsfunktionen # ---------------------------------------------------------------------- def _als_text(wert) -> str | None: if pd.isna(wert): return None if isinstance(wert, float) and wert.is_integer(): wert = int(wert) s = str(wert).strip() return s or None def _gefuellt(wert) -> bool: return pd.notna(wert) and str(wert).strip() != "" def split_plz_ort(wert) -> tuple[str | None, str | None]: if pd.isna(wert): return None, None s = str(wert).strip() m = re.match(r"^(\d{5})\s+(.*)$", s) if m: return m.group(1), m.group(2) return None, (s or None) def _dateien(ordner: Path) -> list[Path]: if not ordner.exists(): return [] return sorted(p for p in ordner.glob("*.xlsx") if not p.name.startswith("~$")) def _stand_je_datei() -> dict[str, float]: """Liest die vom DAG bereitgestellten Datei-Änderungsdaten (WebDAV). Format: Spalten 'datei','stand_epoch'. Fehlt die Datei, wird später auf die lokale mtime zurückgegriffen.""" if not DATEISTAND_CSV.exists(): return {} df = pd.read_csv(DATEISTAND_CSV) return dict(zip(df["datei"].astype(str), df["stand_epoch"].astype(float))) # ---------------------------------------------------------------------- # Stammdaten: Taifun-Export (mehrere Dateien, jüngste gewinnt je Nummer) # ---------------------------------------------------------------------- def lade_export() -> pd.DataFrame: dateien = _dateien(ORD_EXPORT) if not dateien: raise SystemExit(f"FEHLER: keine Export-Dateien in {ORD_EXPORT}") stand = _stand_je_datei() teile = [] for pfad in dateien: roh = pd.read_excel(pfad, sheet_name=BLATT_EXPORT, header=0) fehlend = [s for s in EXPORT_SPALTEN if s not in roh.columns] if fehlend: raise SystemExit(f"FEHLER: Spalten {fehlend} fehlen in {pfad.name}") df = roh[list(EXPORT_SPALTEN)].rename(columns=EXPORT_SPALTEN).copy() df["_quelldatei"] = pfad.name df["_stand"] = stand.get(pfad.name, pfad.stat().st_mtime) teile.append(df) alle = pd.concat(teile, ignore_index=True) alle = alle.dropna(subset=["nummer"]).copy() alle["nummer"] = alle["nummer"].map(_als_text) # Bei gleicher Nummer gewinnt der jüngste Export (höchster _stand). vorher = len(alle) alle = (alle.sort_values("_stand", ascending=False) .drop_duplicates("nummer", keep="first") .reset_index(drop=True)) print(f"Export: {len(dateien)} Datei(en), {vorher} Zeilen -> " f"{len(alle)} eindeutige Angebote (jüngste je Nummer)") return alle # ---------------------------------------------------------------------- # Status: Pflege-Historie (mehrere Dateien -> UNION, gültigen Status bilden) # ---------------------------------------------------------------------- def lade_pflege_historie() -> pd.DataFrame: dateien = _dateien(ORD_PFLEGE) if not dateien: print(f"WARNUNG: keine Pflege-Dateien in {ORD_PFLEGE} — Status kommt " f"nur aus Export-Projektnummer/Default.") return pd.DataFrame(columns=["nummer", "status_datum", "status"]) teile = [] for pfad in dateien: df = pd.read_excel(pfad, sheet_name=BLATT_PFLEGE, header=0, dtype={"Nummer": str}) df = df.rename(columns={ "Nummer": "nummer", "Status-Datum": "status_datum", "Status": "status", "Netto (EUR)": "netto", "Kontaktart": "kontaktart", "Zuständig": "zustaendig", "Wettbewerber": "wettbewerber", "Abbruchgrund": "abbruch_grund", "Wiedervorlage": "wiedervorlage", "Bemerkungen": "bemerkungen", }) df["_quelldatei"] = pfad.name teile.append(df) hist = pd.concat(teile, ignore_index=True) hist = hist.dropna(subset=["nummer", "status"]).copy() hist["nummer"] = hist["nummer"].map(_als_text) hist["status_datum"] = pd.to_datetime(hist["status_datum"], errors="coerce") if "netto" in hist: hist["netto"] = pd.to_numeric(hist["netto"], errors="coerce") # Exakte Dubletten (z. B. dieselbe Datei zweimal hochgeladen) entfernen vorher = len(hist) hist = hist.drop_duplicates(subset=["nummer", "status_datum", "status"]) print(f"Pflege: {len(dateien)} Datei(en), {vorher} Status-Events -> " f"{len(hist)} nach Dedup, {hist['nummer'].nunique()} Angebote") return hist.reset_index(drop=True) def gueltiger_status(hist: pd.DataFrame) -> pd.DataFrame: """Je Angebot der Eintrag mit neuestem status_datum, bei Gleichstand höchstem Status-Rang.""" if hist.empty: return hist h = hist.copy() h["_rang"] = h["status"].map(STATUS_RANG).fillna(0) h = h.sort_values(["status_datum", "_rang"], ascending=[False, False], na_position="last") gueltig = h.drop_duplicates("nummer", keep="first").drop(columns="_rang") return gueltig.reset_index(drop=True) # ---------------------------------------------------------------------- # Archiv: eingefrorene Stammdaten (mehrere Dateien -> einfache UNION) # ---------------------------------------------------------------------- def lade_archiv() -> pd.DataFrame: teile = [] for pfad in _dateien(ORD_ARCHIV): df = pd.read_excel(pfad, sheet_name=BLATT_ARCHIV, header=0, dtype={"Nummer": str, "Debitor": str}) df = df.rename(columns={ "Nummer": "nummer", "Kunde": "kunde", "Debitor": "debitor", "Datum": "datum", "PLZ, Ort": "kunde_plz_ort", "Beschreibung": "beschreibung", "Telefon": "telefon", "Mobil": "mobil", "E-Mail": "email", "Ansprechpartner": "ansprechpartner", }) teile.append(df) if not teile: return pd.DataFrame(columns=["nummer"]) arch = pd.concat(teile, ignore_index=True).dropna(subset=["nummer"]) arch["nummer"] = arch["nummer"].map(_als_text) arch = arch.drop_duplicates("nummer", keep="first") print(f"Archiv: {len(arch)} eingefrorene Alt-Angebote geladen") return arch.reset_index(drop=True) # ---------------------------------------------------------------------- # Angebote zusammenführen (Stammdaten + gültiger Status) # ---------------------------------------------------------------------- def baue_angebote() -> tuple[pd.DataFrame, pd.DataFrame]: export = lade_export() archiv = lade_archiv() hist = lade_pflege_historie() gueltig = gueltiger_status(hist) # Stammdaten = Export, ergänzt um Archiv-Angebote ohne Export-Zeile nur_archiv = archiv[~archiv["nummer"].isin(set(export["nummer"]))].copy() nur_archiv["herkunft_stamm"] = "archiv" export["herkunft_stamm"] = "export" ang = pd.concat([export, nur_archiv], ignore_index=True) print(f"Stammdaten gesamt: {len(ang)} Angebote " f"({(ang['herkunft_stamm'] == 'export').sum()} Export, " f"{(ang['herkunft_stamm'] == 'archiv').sum()} nur Archiv)") # Adresse: deutscher Standort (Projektort) hat Vorrang vor Kundenadresse ks = ang["kunde_plz_ort"].map(split_plz_ort) ang["plz_kunde"] = ks.map(lambda t: t[0]) ang["ort_kunde"] = ks.map(lambda t: t[1]) if "standort_plz_ort" in ang: ss = ang["standort_plz_ort"].map(split_plz_ort) ang["plz_standort"] = ss.map(lambda t: t[0]) ang["ort_standort"] = ss.map(lambda t: t[1]) else: ang["plz_standort"] = None ang["ort_standort"] = None hat_standort = ang["plz_standort"].notna() ang["plz"] = ang["plz_standort"].where(hat_standort, ang["plz_kunde"]) ang["ort"] = ang["ort_standort"].where(hat_standort, ang["ort_kunde"]) ang["adress_quelle"] = hat_standort.map({True: "standort", False: "kunde"}) if "land" not in ang: ang["land"] = None ang["land"] = ang["land"].fillna("DE") # Pufferspeichergröße aus der Beschreibung ang["puffer_m3"] = (ang["beschreibung"].astype(str) .str.extract(r"(\d{2,5})\s*[m,./]", expand=False) .astype("Float64")) for spalte in ("datum", "gueltig_bis"): if spalte in ang: ang[spalte] = pd.to_datetime(ang[spalte], errors="coerce") # Projektnummer aus Export interpretieren def interp(wert): s = _als_text(wert) if s is None: return pd.Series([None, None]) if PROJEKTNUMMER_MUSTER.match(s): return pd.Series([s, None]) return pd.Series([None, s]) if "projekt_roh" in ang: ang[["projektnummer", "projekt_abbruch"]] = ang["projekt_roh"].apply(interp) else: ang["projektnummer"] = None ang["projekt_abbruch"] = None # ---- Gültigen Status zuspielen und Status-Quelle bestimmen ---- spalten_pflege = ["nummer", "status", "status_datum", "netto", "kontaktart", "zustaendig", "wettbewerber", "abbruch_grund", "wiedervorlage", "bemerkungen"] vorhanden = [s for s in spalten_pflege if s in gueltig.columns] ang = ang.merge(gueltig[vorhanden], on="nummer", how="left") def finaler_status(z): if _gefuellt(z.get("status")): return z["status"], "pflege" if _gefuellt(z.get("projektnummer")): return "Auftrag", "export_projekt" if _gefuellt(z.get("projekt_abbruch")): return "Abgelehnt", "export_projekt" return "Angebot", "default" res = ang.apply(lambda z: pd.Series(finaler_status(z), index=["status_final", "status_quelle"]), axis=1) ang[["status_final", "status_quelle"]] = res # Widerspruchs-Kennzeichen: Pflege sagt nicht-Auftrag, Export hat Projektnr. ang["status_konflikt"] = ( (ang["status_quelle"] == "pflege") & ang["projektnummer"].map(_gefuellt) & (ang["status"] != "Auftrag") ) if ang["status_konflikt"].any(): print(f" WARNUNG: {ang['status_konflikt'].sum()} Angebote mit " f"Status-Konflikt (Pflege ≠ Auftrag trotz Projektnummer)") ang["status"] = ang["status_final"] ang["ampel"] = ang["status"].map(AMPEL).fillna("gelb") # Abbruchgrund/Wettbewerber: Pflege bevorzugt, sonst Export-Projekttext if "abbruch_grund" not in ang: ang["abbruch_grund"] = None ang["abbruch_grund"] = ang["abbruch_grund"].fillna(ang["projekt_abbruch"]) if "wettbewerber" not in ang: ang["wettbewerber"] = None # ---- letzter Kontakt / Tage seit letztem Kontakt ---- # status_datum (aus Pflege) gilt als letzter Kontakt, sonst Angebotsdatum ang["letzter_kontakt"] = ang.get("status_datum") if "letzter_kontakt" not in ang or ang["letzter_kontakt"].isna().all(): ang["letzter_kontakt"] = pd.NaT ang["letzter_kontakt"] = pd.to_datetime( ang["letzter_kontakt"], errors="coerce").fillna(ang["datum"]) heute = pd.Timestamp(datetime.now().date()) ang["tage_seit_kontakt"] = (heute - ang["letzter_kontakt"]).dt.days # ---- Kunden-Deduplizierung: (kunde, debitor), Name des neuesten Angebots ---- ang["debitor"] = ang["debitor"].map(_als_text) ang["kunde_schluessel"] = ( ang["kunde"].astype(str).str.strip().str.lower() + "|" + ang["debitor"].fillna("")) ang = ang.sort_values(["kunde_schluessel", "datum", "nummer"], ascending=[True, False, False], na_position="last" ).reset_index(drop=True) ang["ist_aktuellstes_angebot"] = ~ang.duplicated("kunde_schluessel") ang["anzahl_angebote_kunde"] = ( ang.groupby("kunde_schluessel")["nummer"].transform("count")) print(f"Kunden (dedupliziert): {ang['kunde_schluessel'].nunique()} " f"bei {len(ang)} Angeboten") print("Status-/Ampel-Verteilung:") print(ang["status"].value_counts().to_string()) return ang, hist # ---------------------------------------------------------------------- # Zuschläge # ---------------------------------------------------------------------- def baue_zuschlaege() -> pd.DataFrame: dateien = _dateien(INPUT / "zuschlaege") if not dateien: print("Hinweis: keine Zuschläge-Dateien — Zuschlags-View entfällt.") return pd.DataFrame() teile = [] for pfad in dateien: zus = pd.read_excel(pfad, sheet_name="Zuschläge_Detailliert", header=3, dtype={"Postleitzahl": str}) zus.columns = ["bieter", "gebot_nr", "zuschlag_nr", "bundesland", "landkreis", "plz", "gemeinde", "gemarkung", "flurstueck"] zus = zus.dropna(subset=["zuschlag_nr"]).copy() zus["plz"] = zus["plz"].astype(str).str.strip().str.zfill(5) komp = pd.read_excel(pfad, sheet_name="Zuschläge_Kompakt", header=3) komp = komp.rename(columns={"Zuschlags-Nr": "zuschlag_nr", "Gebotsmenge": "gebotsmenge_kw"}) komp = komp[["zuschlag_nr", "gebotsmenge_kw"]] teile.append(zus.merge(komp, on="zuschlag_nr", how="left")) zus = pd.concat(teile, ignore_index=True).drop_duplicates( subset=["zuschlag_nr", "flurstueck"]) print(f"Zuschläge: {len(zus)} Standortzeilen, " f"{zus['zuschlag_nr'].nunique()} Zuschläge") return zus # ---------------------------------------------------------------------- # Geo-Join und Views # ---------------------------------------------------------------------- def lade_geo() -> pd.DataFrame | None: if not GEO_CSV.exists(): print(f"WARNUNG: {GEO_CSV} fehlt — Views ohne Koordinaten.") return None geo = pd.read_csv(GEO_CSV, dtype={"plz": str}) if "kreis" not in geo.columns: geo["kreis"] = None return geo[["plz", "lat", "lon", "bundesland", "kreis"]].rename( columns={"bundesland": "geo_bundesland", "kreis": "geo_kreis"}) def baue_view_angebote(ang: pd.DataFrame, geo: pd.DataFrame | None) -> pd.DataFrame: spalten = [ "nummer", "datum", "kunde", "debitor", "kunde_schluessel", "bearbeiter", "beschreibung", "puffer_m3", "netto", "status", "ampel", "status_quelle", "status_konflikt", "projektnummer", "abbruch_grund", "wettbewerber", "kontaktart", "zustaendig", "anzahl_angebote_kunde", "ist_aktuellstes_angebot", "letzter_kontakt", "tage_seit_kontakt", "wiedervorlage", "gueltig_bis", "plz", "ort", "land", "adress_quelle", "herkunft_stamm", "telefon", "mobil", ] view = ang[[s for s in spalten if s in ang.columns]].copy() view["jahr"] = view["datum"].dt.year view["netto_fehlt"] = view["netto"].isna() if "netto" in view else True if "netto" in view: view["netto"] = view["netto"].fillna(0).round(2) if geo is not None: view = view.merge(geo, on="plz", how="left") view = view.rename(columns={"geo_bundesland": "bundesland", "geo_kreis": "landkreis"}) falsch = view["land"].ne("DE") & view["adress_quelle"].ne("standort") view.loc[falsch, ["lat", "lon", "bundesland", "landkreis"]] = None print(f"v_angebote_karte: {view['lat'].notna().sum()}/{len(view)} " f"mit Koordinaten ({falsch.sum()} Auslands-Angebote ausgenommen)") return view def baue_view_kunden_pipeline(view: pd.DataFrame) -> pd.DataFrame: pipeline = (view.groupby("kunde_schluessel", as_index=False).agg( anzahl_angebote=("nummer", "count"), davon_offen=("status", lambda s: int((s == "Angebot").sum())), summe_netto=("netto", "sum"), erstes_angebot=("datum", "min"), letztes_angebot=("datum", "max"), min_tage_seit_kontakt=("tage_seit_kontakt", "min"), )) pipeline["summe_netto"] = pipeline["summe_netto"].round(2) neueste = view[view["ist_aktuellstes_angebot"]] kand = ["kunde_schluessel", "kunde", "debitor", "nummer", "status", "ampel", "bearbeiter", "plz", "ort", "lat", "lon", "bundesland", "landkreis"] vorh = [s for s in kand if s in neueste.columns] pipeline = pipeline.merge( neueste[vorh].rename(columns={ "kunde": "kunde_anzeige", # Name des neuesten Angebots "nummer": "aktuellste_nummer", "status": "aktueller_status", "ampel": "aktuelle_ampel"}), on="kunde_schluessel", how="left") top = pipeline.nlargest(3, "anzahl_angebote")[["kunde_anzeige", "anzahl_angebote"]] print(f"v_kunden_pipeline: {len(pipeline)} Kunden; Top 3: " f"{top.to_dict('records')}") return pipeline def baue_view_zuschlaege(zus: pd.DataFrame, ang: pd.DataFrame, geo: pd.DataFrame | None) -> pd.DataFrame: if zus.empty: return zus view = zus[["zuschlag_nr", "bieter", "bundesland", "landkreis", "plz", "gemeinde", "gebotsmenge_kw"]].copy() view["ampel"] = "lila" eigene = set(ang["plz"].dropna()) view["plz_mit_eigenem_angebot"] = view["plz"].isin(eigene) if geo is not None: view = view.merge(geo.drop(columns=["geo_bundesland", "geo_kreis"]), on="plz", how="left") print(f"v_zuschlaege_karte: {view['lat'].notna().sum()}/{len(view)} " f"mit Koordinaten") return view # ---------------------------------------------------------------------- def main() -> None: OUTPUT.mkdir(exist_ok=True) ang, hist = baue_angebote() zus = baue_zuschlaege() geo = lade_geo() v_ang = baue_view_angebote(ang, geo) v_ang_aktuell = v_ang[v_ang["ist_aktuellstes_angebot"]].copy() v_pipeline = baue_view_kunden_pipeline(v_ang) v_zus = baue_view_zuschlaege(zus, ang, geo) print(f"v_angebote_karte_aktuell: {len(v_ang_aktuell)} Kunden " f"(dedupliziert aus {len(v_ang)})") exporte = { "staging_angebote.csv": ang, "staging_status_historie.csv": hist, "staging_zuschlaege.csv": zus, "v_angebote_karte.csv": v_ang, "v_angebote_karte_aktuell.csv": v_ang_aktuell, "v_kunden_pipeline.csv": v_pipeline, "v_zuschlaege_karte.csv": v_zus, } for name, df in exporte.items(): if df is None or df.empty: print(f"übersprungen (leer): {name}") continue pfad = OUTPUT / name df.to_csv(pfad, index=False, encoding="utf-8-sig", date_format="%Y-%m-%d") print(f"geschrieben: {pfad} ({len(df)} Zeilen)") if __name__ == "__main__": main()