airflow-dags/include/01_geodaten_holen.py
2026-06-14 00:58:09 +02:00

116 lines
3.6 KiB
Python

#!/usr/bin/env python3
"""
01_geodaten_holen.py
--------------------
Lädt die deutschen PLZ-Koordinaten (Geokodierungs-Referenz) herunter und
erzeugt die Datei output/geo_plz_koordinaten.csv mit den Spalten:
plz, lat, lon, ort, bundesland
Primärquelle: GeoNames (https://download.geonames.org/export/zip/DE.zip)
Lizenz CC-BY 4.0 — Quellenangabe "GeoNames" genügt.
Fallback: OpenPLZ API (https://openplzapi.org), falls GeoNames
nicht erreichbar ist (liefert keine Koordinaten, daher
nur als Notnagel für Ort/Bundesland).
Aufruf: python 01_geodaten_holen.py
"""
import io
import sys
import zipfile
from pathlib import Path
import pandas as pd
import requests
GEONAMES_URL = "https://download.geonames.org/export/zip/DE.zip"
OUTPUT_DIR = Path(__file__).parent / "output"
OUTPUT_CSV = OUTPUT_DIR / "geo_plz_koordinaten.csv"
# Spalten der GeoNames-Postleitzahldatei (Tab-getrennt, ohne Header)
GEONAMES_COLS = [
"country_code", "plz", "ort", "bundesland", "bundesland_code",
"kreis", "kreis_code", "gemeinde", "gemeinde_code",
"lat", "lon", "accuracy",
]
def lade_geonames() -> pd.DataFrame:
print(f"Lade {GEONAMES_URL} ...")
resp = requests.get(GEONAMES_URL, timeout=120)
resp.raise_for_status()
with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
with zf.open("DE.txt") as f:
df = pd.read_csv(
f,
sep="\t",
header=None,
names=GEONAMES_COLS,
dtype={"plz": str}, # führende Nullen erhalten!
encoding="utf-8",
)
print(f" {len(df)} Zeilen geladen (eine Zeile je PLZ/Ortsteil).")
# Eine PLZ kann mehrere Orte/Ortsteile umfassen -> auf eine
# Koordinate je PLZ verdichten (Mittelpunkt), ersten Ort behalten.
geo = (
df.groupby("plz", as_index=False)
.agg(
lat=("lat", "mean"),
lon=("lon", "mean"),
ort=("ort", "first"),
bundesland=("bundesland", "first"),
)
)
geo["lat"] = geo["lat"].round(5)
geo["lon"] = geo["lon"].round(5)
return geo
def lade_openplz_fallback() -> pd.DataFrame:
"""Fallback ohne Koordinaten — nur damit der ETL-Lauf nicht scheitert."""
print("GeoNames nicht erreichbar — Fallback auf OpenPLZ (ohne Koordinaten).")
rows, page = [], 1
while True:
r = requests.get(
"https://openplzapi.org/de/Localities",
params={"page": page, "pageSize": 50},
timeout=60,
)
r.raise_for_status()
data = r.json()
if not data:
break
for item in data:
rows.append({
"plz": item.get("postalCode"),
"lat": None,
"lon": None,
"ort": item.get("name"),
"bundesland": (item.get("federalState") or {}).get("name"),
})
page += 1
if page > 400: # Sicherheitsbremse
break
return pd.DataFrame(rows).drop_duplicates(subset="plz")
def main() -> None:
OUTPUT_DIR.mkdir(exist_ok=True)
try:
geo = lade_geonames()
except Exception as exc:
print(f" Fehler: {exc}", file=sys.stderr)
geo = lade_openplz_fallback()
geo.to_csv(OUTPUT_CSV, index=False, encoding="utf-8")
print(f"OK: {len(geo)} PLZ geschrieben nach {OUTPUT_CSV}")
# Plausibilitätscheck: führende Nullen?
nullen = geo[geo["plz"].str.startswith("0")]
print(f" davon {len(nullen)} PLZ mit führender Null (z. B. {nullen['plz'].head(3).tolist()})")
if __name__ == "__main__":
main()