Skip to content
Snippets Groups Projects
app.py 13.51 KiB
from flask import Flask, render_template, jsonify, Response, url_for
import requests
import json
from datetime import datetime
import logging
import os
from requests.auth import HTTPBasicAuth
import sys
from marshmallow import Schema, fields
from flask_apispec import FlaskApiSpec, marshal_with, doc
from apispec import APISpec
from apispec.ext.marshmallow import MarshmallowPlugin
from flask_swagger_ui import get_swaggerui_blueprint

# Flask App Setup
app = Flask(__name__)
logging.basicConfig(
    level=logging.DEBUG,
    stream=sys.stdout,
    format='%(asctime)s %(levelname)s %(message)s'
)

# Kea Configuration 
KEA_CONTROL_AGENT_URL = "http://127.0.0.1:{{ kea_ctrl_agent_port }}"
KEA_SERVICE = "dhcp4"  # oder "dhcp6" für IPv6
KEA_USERNAME = "{{ kea_ctrl_agent_user }}"
KEA_PASSWORD = "{{ kea_api_password }}"

# Web Server Configuration
WEB_HOST = "0.0.0.0"
WEB_PORT = {{ dhcpinfo_web_port }}
WEB_DEBUG = False

# Auto-Refresh Intervall in Sekunden (0 = kein Auto-Refresh)
AUTO_REFRESH = 30


# Marshmallow-Schemas (nur einmal definiert, werden überall verwendet)
class LeaseSchema(Schema):
    ip_address = fields.Str(data_key='ip-address')
    hw_address = fields.Str(data_key='hw-address')
    hostname = fields.Str()
    state = fields.Int()
    cltt = fields.Int()
    valid_lft = fields.Int()
    subnet_id = fields.Int(data_key='subnet-id')

class StatusSchema(Schema):
    connected = fields.Bool()
    message = fields.Str()
    url = fields.Str()
    service = fields.Str()
    username = fields.Str()
    timestamp = fields.Str()

class KeaDHCPClient:
    def __init__(self, url, service, username=None, password=None):
        self.url = url
        self.service = service
        self.username = username
        self.password = password
        self.auth = HTTPBasicAuth(username, password) if username and password else None
    
    def send_command(self, command, arguments=None):
        """Sendet einen Befehl an den Kea Control Agent mit Basic Auth"""
        payload = {
            "command": command,
            "service": [self.service]
        }
        
        if arguments:
            payload["arguments"] = arguments
        
        headers = {'Content-Type': 'application/json'}
        
        try:
            # Request mit Basic Authentication
            response = requests.post(
                self.url,
                json=payload,
                headers=headers,
                auth=self.auth,
                timeout=10,
                verify=True  # SSL-Zertifikat verifizieren bei HTTPS
            )
            
            # HTTP-Status prüfen
            response.raise_for_status()
            
            # JSON-Response parsen
            json_response = response.json()
            
            # Kea-spezifische Fehler prüfen
            if isinstance(json_response, list) and len(json_response) > 0:
                result_code = json_response[0].get("result", -1)
                if result_code != 0:
                    error_text = json_response[0].get("text", "Unbekannter Kea-Fehler")
                    logging.error(f"Kea API Fehler (Code {result_code}): {error_text}")
                    
            return json_response
            
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 401:
                logging.error("Authentifizierung fehlgeschlagen - Benutzername/Passwort prüfen")
            elif e.response.status_code == 403:
                logging.error("Zugriff verweigert - Benutzer hat keine Berechtigung")
            else:
                logging.error(f"HTTP-Fehler bei Kea API Request: {e}")
            return None
        except requests.exceptions.ConnectionError as e:
            logging.error(f"Verbindungsfehler zu Kea Control Agent: {e}")
            return None
        except requests.exceptions.Timeout as e:
            logging.error(f"Timeout bei Kea API Request: {e}")
            return None
        except requests.exceptions.RequestException as e:
            logging.error(f"Allgemeiner Fehler bei Kea API Request: {e}")
            return None
        except json.JSONDecodeError as e:
            logging.error(f"Fehler beim Parsen der JSON-Antwort: {e}")
            return None
    
    def test_connection(self):
        """Testet die Verbindung und Authentifizierung"""
        try:
            response = self.send_command("list-commands")
            if response and len(response) > 0 and response[0].get("result") == 0:
                return True, "Verbindung erfolgreich"
            else:
                return False, "Kea API Fehler"
        except Exception as e:
            return False, f"Verbindungstest fehlgeschlagen: {str(e)}"
    
    def get_all_leases(self):
        """Ruft alle DHCP-Leases ab"""
        command = "lease4-get-all" if self.service == "dhcp4" else "lease6-get-all"
        return self.send_command(command)
    
    def get_server_config(self):
        """Ruft die Server-Konfiguration ab"""
        return self.send_command("config-get")
    
    def get_statistics(self):
        """Ruft Server-Statistiken ab"""
        return self.send_command("statistic-get-all")
    
    def get_server_info(self):
        """Ruft Server-Informationen ab"""
        return self.send_command("build-report")

def format_timestamp(timestamp):
    """Formatiert Unix-Timestamp zu lesbarem Datum"""
    if timestamp and timestamp > 0:
        return datetime.fromtimestamp(timestamp).strftime('%d.%m.%Y %H:%M:%S')
    return "N/A"

def format_lease_state(state):
    """Formatiert Lease-Status"""
    states = {
        0: "Verfügbar",
        1: "Vergeben", 
        2: "Abgelaufen",
        3: "Zurückgegeben"
    }
    return states.get(state, f"Unbekannt ({state})")

@app.route('/')
def index():
    """Hauptseite mit Lease-Übersicht"""
    kea_client = KeaDHCPClient(KEA_CONTROL_AGENT_URL, KEA_SERVICE, KEA_USERNAME, KEA_PASSWORD)
    
    # Verbindung testen
    connection_ok, connection_message = kea_client.test_connection()
    if not connection_ok:
        return render_template('leases.html', 
                                leases=[], 
                                error=f"Verbindungsfehler: {connection_message}",
                                connection_status="Fehler",
                                last_update=datetime.now().strftime('%d.%m.%Y %H:%M:%S'))
    
    # Leases abrufen
    lease_response = kea_client.get_all_leases()
    leases = []
    error_message = None
    
    if lease_response and lease_response[0].get("result") == 0:
        # Erfolgreich - Leases verarbeiten
        lease_data = lease_response[0].get("arguments", {})
        leases = lease_data.get("leases", [])
        
        # Leases für die Anzeige formatieren
        for lease in leases:
            lease['valid_lft_formatted'] = format_timestamp(lease.get('cltt', 0) + lease.get('valid_lft', 0))
            lease['cltt_formatted'] = format_timestamp(lease.get('cltt', 0))
            lease['state_formatted'] = format_lease_state(lease.get('state', 0))
    else:
        error_message = "Fehler beim Abrufen der Leases von Kea DHCP Server"
        if lease_response:
            error_message += f": {lease_response[0].get('text', 'Unbekannter Fehler')}"
    
    # Statistiken abrufen (optional)
    stats_response = kea_client.get_statistics()
    logging.debug(f"Raw statistics response: {stats_response}")
    statistics = {}
    if stats_response and stats_response[0].get("result") == 0:
        stats_data = stats_response[0].get("arguments", {})
        logging.debug(f"Parsed statistics data: {stats_data}")
        for stat_name, stat_values in stats_data.items():
            if isinstance(stat_values, list) and len(stat_values) > 0:
                statistics[stat_name] = stat_values[0][0]  # Extract the actual statistic value
            else:
                logging.warning(f"Unexpected format for statistic {stat_name}: {stat_values}")
    
    # Server-Info abrufen
    server_info = {}
    info_response = kea_client.get_server_info()
    if info_response and info_response[0].get("result") == 0:
        server_info = info_response[0].get("arguments", {})
    
    return render_template('leases.html', 
                            leases=leases, 
                            error=error_message,
                            statistics=statistics,
                            server_info=server_info,
                            total_leases=len(leases),
                            connection_status="Verbunden",
                            auto_refresh=AUTO_REFRESH,
                            last_update=datetime.now().strftime('%d.%m.%Y %H:%M:%S'))

@doc(description='Liste aller DHCP-Leases', tags=['leases'])
@marshal_with(LeaseSchema(many=True))
@app.route('/api/leases')
def api_leases():
    """JSON API für Leases"""
    kea_client = KeaDHCPClient(KEA_CONTROL_AGENT_URL, KEA_SERVICE, KEA_USERNAME, KEA_PASSWORD)
    lease_response = kea_client.get_all_leases()
    
    if lease_response and lease_response[0].get("result") == 0:
        lease_data = lease_response[0].get("arguments", {})
        return lease_data.get("leases", [])
    else:
        return {"error": "Fehler beim Abrufen der Leases"}, 500

@doc(description='Status und Verbindung zur Kea API', tags=['status'])
@marshal_with(StatusSchema)
@app.route('/api/status')
def api_status():
    """API-Endpunkt für Verbindungsstatus"""
    kea_client = KeaDHCPClient(KEA_CONTROL_AGENT_URL, KEA_SERVICE, KEA_USERNAME, KEA_PASSWORD)
    connection_ok, message = kea_client.test_connection()
    
    return {
        "connected": connection_ok,
        "message": message,
        "url": KEA_CONTROL_AGENT_URL,
        "service": KEA_SERVICE,
        "username": KEA_USERNAME,
        "timestamp": datetime.now().isoformat()
    }

@app.route('/metrics')
def metrics():
    """Prometheus Metrics Endpoint für Kea Statistiken"""
    kea_client = KeaDHCPClient(KEA_CONTROL_AGENT_URL, KEA_SERVICE, KEA_USERNAME, KEA_PASSWORD)
    stats_response = kea_client.get_statistics()
    output = []
    connection_ok, _ = kea_client.test_connection()
    output.append(f"kea_connection_ok {1 if connection_ok else 0}")
    # Lease-Anzahl ermitteln
    lease_response = kea_client.get_all_leases()
    total_leases = 0
    if lease_response and lease_response[0].get("result") == 0:
        lease_data = lease_response[0].get("arguments", {})
        total_leases = len(lease_data.get("leases", []))
    output.append(f"kea_total_leases {total_leases}")
    if stats_response and stats_response[0].get("result") == 0:
        stats_data = stats_response[0].get("arguments", {})
        for stat_name, stat_values in stats_data.items():
            if isinstance(stat_values, list) and len(stat_values) > 0:
                value = stat_values[0][0]
                prom_name = stat_name.lower().replace('.', '_').replace('-', '_').replace('[', '_').replace(']', '')
                output.append(f"kea_{prom_name} {value}")
    return '\n'.join(output) + '\n', 200, {'Content-Type': 'text/plain; version=0.0.4; charset=utf-8'}

# OpenAPI/Swagger-Konfiguration
app.config.update({
    'APISPEC_SPEC': APISpec(
        title='Warpzone DHCP API',
        version='1.0.0',
        openapi_version='3.0.2',
        plugins=[MarshmallowPlugin()],
    ),
    'APISPEC_SWAGGER_URL': '/swagger.yaml',
})

docs = FlaskApiSpec(app)
docs.register(api_leases)
docs.register(api_status)

swaggerui_blueprint = get_swaggerui_blueprint(
    '/swagger',
    '/swagger.yaml',
    config={'app_name': "Warpzone DHCP API"}
)
app.register_blueprint(swaggerui_blueprint, url_prefix='/swagger')

@app.route('/rss')
def rss_feed():
    """RSS Feed für aktuelle DHCP-Leases"""
    kea_client = KeaDHCPClient(KEA_CONTROL_AGENT_URL, KEA_SERVICE, KEA_USERNAME, KEA_PASSWORD)
    lease_response = kea_client.get_all_leases()
    leases = []
    if lease_response and lease_response[0].get("result") == 0:
        lease_data = lease_response[0].get("arguments", {})
        leases = lease_data.get("leases", [])
    now = datetime.now().strftime('%a, %d %b %Y %H:%M:%S +0000')
    rss_items = []
    for lease in leases:
        hostname = lease.get('hostname', 'N/A')
        ip = lease.get('ip-address', 'N/A')
        mac = lease.get('hw-address', 'N/A')
        start = format_timestamp(lease.get('cltt', 0))
        end = format_timestamp(lease.get('cltt', 0) + lease.get('valid_lft', 0))
        rss_items.append(f"""
        <item>
            <title>DHCP Lease: {hostname} ({ip})</title>
            <description>MAC: {mac}, Lease Start: {start}, Lease Ende: {end}</description>
            <pubDate>{start}</pubDate>
            <guid>{ip}-{mac}</guid>
        </item>
        """)
    rss = f"""<?xml version="1.0" encoding="UTF-8"?>
    <rss version="2.0">
    <channel>
        <title>Warpzone DHCP Leases</title>
        <link>{url_for('index', _external=True)}</link>
        <description>Aktuelle DHCP-Leases</description>
        <lastBuildDate>{now}</lastBuildDate>
        {''.join(rss_items)}
    </channel>
    </rss>"""
    return Response(rss, mimetype='application/rss+xml')

if __name__ == '__main__':
    # Konfiguration ausgeben (ohne Passwort)
    print(f"🚀 Starte Kea DHCP Web Interface")
    print(f"🐞 Debug-Modus: {WEB_DEBUG}")
    print(f"📡 Kea Control Agent URL: {KEA_CONTROL_AGENT_URL}")
    print(f"🔧 Kea Service: {KEA_SERVICE}")
    print(f"👤 Benutzername: {KEA_USERNAME}")
    print(f"🔒 Passwort: {'*' * len(KEA_PASSWORD) if KEA_PASSWORD else 'NICHT GESETZT'}")
    print(f"🔄 Auto-Refresh: {AUTO_REFRESH} Sekunden")
    print(f"🌐 Web Interface: http://{WEB_HOST}:{WEB_PORT}")
    print(f"📄 Swagger UI: http://{WEB_HOST}:{WEB_PORT}/swagger")

    app.run(debug=WEB_DEBUG, host=WEB_HOST, port=WEB_PORT)