# app.py
# Author: Christian Elberfeld
from flask import Flask, render_template, jsonify, Response, url_for
import requests
import json
from datetime import datetime
import logging
import os
from requests.auth import HTTPBasicAuth
import sys
from marshmallow import Schema, fields
from flask_apispec import FlaskApiSpec, marshal_with, doc
from apispec import APISpec
from apispec.ext.marshmallow import MarshmallowPlugin
from flask_swagger_ui import get_swaggerui_blueprint
# Flask application setup
app = Flask(__name__)

# Route every log record (DEBUG and above) to stdout, prefixed with a
# timestamp and the level name.
logging.basicConfig(
    format='%(asctime)s %(levelname)s %(message)s',
    stream=sys.stdout,
    level=logging.DEBUG,
)
# Kea configuration
# NOTE(review): the {{ ... }} placeholders look like template variables that
# are substituted before this file is executed (presumably by a Jinja2-style
# deployment step) - confirm against the deployment tooling.
KEA_CONTROL_AGENT_URL = "http://127.0.0.1:{{ kea_ctrl_agent_port }}"
KEA_SERVICE = "dhcp4" # or "dhcp6" for IPv6
KEA_USERNAME = "{{ kea_ctrl_agent_user }}"
KEA_PASSWORD = "{{ kea_api_password }}"
# Web server configuration
WEB_HOST = "0.0.0.0"
WEB_PORT = {{ dhcpinfo_web_port }}
WEB_DEBUG = False
# Auto-refresh interval in seconds (0 = no auto-refresh)
AUTO_REFRESH = 30
# Marshmallow schemas (defined once, reused everywhere)
class LeaseSchema(Schema):
    """Describe a single DHCP lease entry as returned by Kea.

    Kea's lease dictionaries use hyphenated keys (``ip-address``,
    ``hw-address``, ``subnet-id``, ``valid-lft``).  Python attribute names
    cannot contain hyphens, so each such field sets both:

    * ``data_key``  - the key used in the serialized/documented JSON, and
    * ``attribute`` - the key read from the source dictionary when dumping,

    so that a raw Kea lease dictionary can be dumped without losing fields.
    """

    ip_address = fields.Str(attribute='ip-address', data_key='ip-address')
    hw_address = fields.Str(attribute='hw-address', data_key='hw-address')
    hostname = fields.Str()
    state = fields.Int()
    # Client last transaction time (Unix timestamp)
    cltt = fields.Int()
    # Valid lifetime in seconds; Kea names this key "valid-lft"
    valid_lft = fields.Int(attribute='valid-lft', data_key='valid-lft')
    subnet_id = fields.Int(attribute='subnet-id', data_key='subnet-id')
class StatusSchema(Schema):
    """Shape of the status document produced by the ``/api/status`` endpoint."""

    # Outcome of the Kea connection test and its accompanying message
    connected = fields.Boolean()
    message = fields.String()
    # Connection parameters the application was configured with
    url = fields.String()
    service = fields.String()
    username = fields.String()
    # Time at which the status document was generated
    timestamp = fields.String()
class KeaDHCPClient:
    """Minimal HTTP client for the JSON command API of a Kea Control Agent."""

    def __init__(self, url, service, username=None, password=None):
        """Store the connection parameters.

        Args:
            url: URL the commands are POSTed to.
            service: Kea service name placed in every command
                (``"dhcp4"`` or ``"dhcp6"``).
            username: Optional user name for HTTP basic authentication.
            password: Optional password for HTTP basic authentication.
        """
        self.url = url
        self.service = service
        self.username = username
        self.password = password
        # Basic auth is only attached when both credentials are non-empty.
        self.auth = HTTPBasicAuth(username, password) if username and password else None

    def send_command(self, command, arguments=None):
        """Send a command to the Kea Control Agent using basic auth.

        Args:
            command: Name of the Kea command.
            arguments: Optional dictionary of command arguments; omitted
                from the payload when empty.

        Returns:
            The decoded JSON response, or ``None`` when the HTTP request
            failed or the body was not valid JSON.  Kea-level errors
            (non-zero ``result``) are logged but the response is still
            returned so callers can inspect it.
        """
        payload = {
            "command": command,
            "service": [self.service],
        }
        if arguments:
            payload["arguments"] = arguments
        headers = {'Content-Type': 'application/json'}

        # Step 1: transport.  Only the request itself lives in this try block
        # so that each handler maps to exactly one failure class.
        try:
            response = requests.post(
                self.url,
                json=payload,
                headers=headers,
                auth=self.auth,
                timeout=10,
                verify=True,  # verify the TLS certificate when using HTTPS
            )
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            # e.response may be None in principle; do not crash while logging.
            status_code = e.response.status_code if e.response is not None else None
            if status_code == 401:
                logging.error("Authentifizierung fehlgeschlagen - Benutzername/Passwort prüfen")
            elif status_code == 403:
                logging.error("Zugriff verweigert - Benutzer hat keine Berechtigung")
            else:
                logging.error(f"HTTP-Fehler bei Kea API Request: {e}")
            return None
        except requests.exceptions.ConnectionError as e:
            logging.error(f"Verbindungsfehler zu Kea Control Agent: {e}")
            return None
        except requests.exceptions.Timeout as e:
            logging.error(f"Timeout bei Kea API Request: {e}")
            return None
        except requests.exceptions.RequestException as e:
            logging.error(f"Allgemeiner Fehler bei Kea API Request: {e}")
            return None

        # Step 2: decoding.  Handled separately because recent versions of
        # requests raise a JSON error that is *also* a RequestException; in
        # the original handler order it was reported as a generic request
        # failure and the dedicated JSON handler was never reached.
        # ValueError is the common base of all JSON decode errors.
        try:
            json_response = response.json()
        except ValueError as e:
            logging.error(f"Fehler beim Parsen der JSON-Antwort: {e}")
            return None

        # Step 3: log Kea-specific errors reported inside a well-formed reply.
        if isinstance(json_response, list) and json_response:
            first = json_response[0]
            if isinstance(first, dict):
                result_code = first.get("result", -1)
                if result_code != 0:
                    error_text = first.get("text", "Unbekannter Kea-Fehler")
                    logging.error(f"Kea API Fehler (Code {result_code}): {error_text}")
        return json_response

    def test_connection(self):
        """Test connectivity and authentication via ``list-commands``.

        Returns:
            A ``(ok, message)`` tuple; ``ok`` is ``True`` only when the first
            response entry reports ``result == 0``.
        """
        try:
            response = self.send_command("list-commands")
            if response and len(response) > 0 and response[0].get("result") == 0:
                return True, "Verbindung erfolgreich"
            else:
                return False, "Kea API Fehler"
        except Exception as e:
            # Boundary handler: an unexpectedly shaped response must be
            # reported as a failed check rather than raised to the caller.
            return False, f"Verbindungstest fehlgeschlagen: {str(e)}"

    def get_all_leases(self):
        """Fetch all DHCP leases for the configured service."""
        command = "lease4-get-all" if self.service == "dhcp4" else "lease6-get-all"
        return self.send_command(command)

    def get_server_config(self):
        """Fetch the server configuration (``config-get``)."""
        return self.send_command("config-get")

    def get_statistics(self):
        """Fetch all server statistics (``statistic-get-all``)."""
        return self.send_command("statistic-get-all")

    def get_server_info(self):
        """Fetch server build information (``build-report``)."""
        return self.send_command("build-report")
def format_timestamp(timestamp):
    """Format a Unix timestamp as a human-readable local date string.

    Args:
        timestamp: Seconds since the epoch.

    Returns:
        The timestamp formatted as ``DD.MM.YYYY HH:MM:SS`` in local time, or
        ``"N/A"`` when the value is missing, not positive, not numeric, or
        outside the range the platform can convert.
    """
    try:
        if timestamp and timestamp > 0:
            return datetime.fromtimestamp(timestamp).strftime('%d.%m.%Y %H:%M:%S')
    except (TypeError, ValueError, OverflowError, OSError):
        # Non-numeric or out-of-range input must not break page rendering.
        pass
    return "N/A"
def format_lease_state(state):
    """Translate a Kea lease state code into a display label.

    The labels follow Kea's lease state numbering: 0 is the default
    (assigned) state, 1 is declined, 2 is expired-reclaimed and 3 is
    released.  The previous mapping labelled state 0 as "available", which
    made every active lease appear unassigned.

    Args:
        state: Numeric lease state from a Kea lease entry.

    Returns:
        The label for the state, or ``"Unbekannt (<state>)"`` for codes
        that are not in the table.
    """
    states = {
        0: "Vergeben",
        1: "Abgelehnt",
        2: "Abgelaufen",
        3: "Zurückgegeben",
    }
    return states.get(state, f"Unbekannt ({state})")
@app.route('/')
def index():
    """Render the main page with the lease overview.

    Performs a connection test first and renders an error page when it
    fails.  Otherwise fetches leases, statistics and build information from
    Kea and passes them to the ``leases.html`` template.
    """
    kea_client = KeaDHCPClient(KEA_CONTROL_AGENT_URL, KEA_SERVICE, KEA_USERNAME, KEA_PASSWORD)

    # Test the connection before issuing any further commands.
    connection_ok, connection_message = kea_client.test_connection()
    if not connection_ok:
        return render_template('leases.html',
                               leases=[],
                               error=f"Verbindungsfehler: {connection_message}",
                               connection_status="Fehler",
                               last_update=datetime.now().strftime('%d.%m.%Y %H:%M:%S'))

    # Fetch the leases.
    lease_response = kea_client.get_all_leases()
    leases = []
    error_message = None
    # Kea result 0 means success and 3 means "empty" (the command succeeded
    # but there is nothing to return); neither is an error.
    if lease_response and lease_response[0].get("result") in (0, 3):
        lease_data = lease_response[0].get("arguments", {})
        leases = lease_data.get("leases", [])
        # Add display-ready fields to every lease.
        for lease in leases:
            cltt = lease.get('cltt', 0)
            # Kea names the lifetime key "valid-lft"; the underscore spelling
            # is kept as a fallback for backward compatibility.
            valid_lft = lease.get('valid-lft', lease.get('valid_lft', 0))
            lease['valid_lft_formatted'] = format_timestamp(cltt + valid_lft)
            lease['cltt_formatted'] = format_timestamp(cltt)
            lease['state_formatted'] = format_lease_state(lease.get('state', 0))
    else:
        error_message = "Fehler beim Abrufen der Leases von Kea DHCP Server"
        if lease_response:
            error_message += f": {lease_response[0].get('text', 'Unbekannter Fehler')}"

    # Fetch statistics (optional).
    stats_response = kea_client.get_statistics()
    logging.debug(f"Raw statistics response: {stats_response}")
    statistics = {}
    if stats_response and stats_response[0].get("result") == 0:
        stats_data = stats_response[0].get("arguments", {})
        logging.debug(f"Parsed statistics data: {stats_data}")
        for stat_name, stat_values in stats_data.items():
            # The code expects a list of samples, each itself a list whose
            # first element is the value; anything else is logged and skipped
            # instead of raising while rendering the page.
            first_sample = stat_values[0] if isinstance(stat_values, list) and stat_values else None
            if isinstance(first_sample, (list, tuple)) and first_sample:
                statistics[stat_name] = first_sample[0]
            else:
                logging.warning(f"Unexpected format for statistic {stat_name}: {stat_values}")

    # Fetch server build information.
    server_info = {}
    info_response = kea_client.get_server_info()
    if info_response and info_response[0].get("result") == 0:
        server_info = info_response[0].get("arguments", {})

    return render_template('leases.html',
                           leases=leases,
                           error=error_message,
                           statistics=statistics,
                           server_info=server_info,
                           total_leases=len(leases),
                           connection_status="Verbunden",
                           auto_refresh=AUTO_REFRESH,
                           last_update=datetime.now().strftime('%d.%m.%Y %H:%M:%S'))
# NOTE(review): @app.route is the innermost decorator here, so Flask appears
# to register the undecorated function as the view and @marshal_with likely
# only influences the generated API documentation - confirm before relying
# on schema-based marshalling of the served response.
@doc(description='Liste aller DHCP-Leases', tags=['leases'])
@marshal_with(LeaseSchema(many=True))
@app.route('/api/leases')
def api_leases():
    """JSON API returning the list of current leases.

    Returns:
        A JSON array of lease objects on success (an empty array when Kea
        reports that there are no leases), or a JSON error object with HTTP
        status 500 when the leases could not be fetched.
    """
    kea_client = KeaDHCPClient(KEA_CONTROL_AGENT_URL, KEA_SERVICE, KEA_USERNAME, KEA_PASSWORD)
    lease_response = kea_client.get_all_leases()
    # Kea result 3 ("empty") means the command succeeded without data; it
    # must yield an empty list rather than an HTTP 500.
    if lease_response and lease_response[0].get("result") in (0, 3):
        lease_data = lease_response[0].get("arguments", {})
        # jsonify() is used explicitly because Flask only accepts a bare
        # list as a view return value from version 2.2 onwards.
        return jsonify(lease_data.get("leases", []))
    return {"error": "Fehler beim Abrufen der Leases"}, 500
@doc(description='Status und Verbindung zur Kea API', tags=['status'])
@marshal_with(StatusSchema)
@app.route('/api/status')
def api_status():
    """API endpoint reporting the state of the connection to Kea.

    Runs a connection test and returns its outcome together with the
    configured URL, service and user name plus the current time.
    """
    client = KeaDHCPClient(KEA_CONTROL_AGENT_URL, KEA_SERVICE, KEA_USERNAME, KEA_PASSWORD)
    is_connected, status_text = client.test_connection()

    status = {}
    status["connected"] = is_connected
    status["message"] = status_text
    status["url"] = KEA_CONTROL_AGENT_URL
    status["service"] = KEA_SERVICE
    status["username"] = KEA_USERNAME
    status["timestamp"] = datetime.now().isoformat()
    return status
@app.route('/metrics')
def metrics():
    """Prometheus metrics endpoint exposing Kea statistics.

    Emits ``kea_connection_ok``, ``kea_total_leases`` and one ``kea_<name>``
    line per Kea statistic whose value is numeric.

    Returns:
        A ``(body, status, headers)`` tuple in the Prometheus text
        exposition format.
    """
    kea_client = KeaDHCPClient(KEA_CONTROL_AGENT_URL, KEA_SERVICE, KEA_USERNAME, KEA_PASSWORD)
    stats_response = kea_client.get_statistics()
    output = []

    connection_ok, _ = kea_client.test_connection()
    output.append(f"kea_connection_ok {1 if connection_ok else 0}")

    # Determine the number of leases.  Kea result 3 ("empty") is a
    # successful answer that simply contains no leases.
    lease_response = kea_client.get_all_leases()
    total_leases = 0
    if lease_response and lease_response[0].get("result") in (0, 3):
        lease_data = lease_response[0].get("arguments", {})
        total_leases = len(lease_data.get("leases", []))
    output.append(f"kea_total_leases {total_leases}")

    if stats_response and stats_response[0].get("result") == 0:
        stats_data = stats_response[0].get("arguments", {})
        for stat_name, stat_values in stats_data.items():
            # Expect a non-empty list of samples, each a non-empty list whose
            # first element is the value; skip anything shaped differently.
            if not (isinstance(stat_values, list) and stat_values):
                continue
            first_sample = stat_values[0]
            if not (isinstance(first_sample, (list, tuple)) and first_sample):
                continue
            value = first_sample[0]
            # A non-numeric sample would make the whole exposition unparsable
            # for the scraper, so only real numbers are emitted (bool is
            # excluded because it would be rendered as "True"/"False").
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                continue
            prom_name = stat_name.lower().replace('.', '_').replace('-', '_').replace('[', '_').replace(']', '')
            output.append(f"kea_{prom_name} {value}")

    return '\n'.join(output) + '\n', 200, {'Content-Type': 'text/plain; version=0.0.4; charset=utf-8'}
# OpenAPI/Swagger configuration
app.config.update(
    APISPEC_SPEC=APISpec(
        title='Warpzone DHCP API',
        version='1.0.0',
        openapi_version='3.0.2',
        plugins=[MarshmallowPlugin()],
    ),
    APISPEC_SWAGGER_URL='/swagger.yaml',
)

# Register the documented API views with the spec generator.
docs = FlaskApiSpec(app)
docs.register(api_leases)
docs.register(api_status)

# Swagger UI blueprint, mounted under /swagger and pointed at the spec URL
# configured above.
swaggerui_blueprint = get_swaggerui_blueprint(
    '/swagger',
    '/swagger.yaml',
    config={'app_name': "Warpzone DHCP API"},
)
app.register_blueprint(swaggerui_blueprint, url_prefix='/swagger')
@app.route('/rss')
def rss_feed():
    """RSS feed listing the current DHCP leases.

    Returns:
        A ``Response`` with mimetype ``application/rss+xml`` containing one
        ``<item>`` per lease.
    """
    # Imported locally so this function does not depend on changes to the
    # module-level import block.
    from email.utils import formatdate
    from xml.sax.saxutils import escape

    kea_client = KeaDHCPClient(KEA_CONTROL_AGENT_URL, KEA_SERVICE, KEA_USERNAME, KEA_PASSWORD)
    lease_response = kea_client.get_all_leases()
    leases = []
    # Kea result 3 ("empty") is a successful answer without leases.
    if lease_response and lease_response[0].get("result") in (0, 3):
        lease_data = lease_response[0].get("arguments", {})
        leases = lease_data.get("leases", [])

    # RSS requires RFC 822 dates; formatdate() yields them in GMT regardless
    # of the server's locale or time zone.
    now = formatdate(usegmt=True)

    rss_items = []
    for lease in leases:
        # Host names are supplied by DHCP clients, so every interpolated
        # value is XML-escaped to keep the feed well-formed.
        hostname = escape(str(lease.get('hostname', 'N/A')))
        ip = escape(str(lease.get('ip-address', 'N/A')))
        mac = escape(str(lease.get('hw-address', 'N/A')))

        cltt = lease.get('cltt', 0)
        # Kea names the lifetime key "valid-lft"; the underscore spelling is
        # kept as a fallback for backward compatibility.
        valid_lft = lease.get('valid-lft', lease.get('valid_lft', 0))
        start = escape(format_timestamp(cltt))
        end = escape(format_timestamp(cltt + valid_lft))

        # Emit <pubDate> only when the lease carries a usable timestamp.
        pub_date = ""
        if isinstance(cltt, (int, float)) and cltt > 0:
            pub_date = f"\n      <pubDate>{formatdate(cltt, usegmt=True)}</pubDate>"

        rss_items.append(f"""
    <item>
      <title>DHCP Lease: {hostname} ({ip})</title>
      <description>MAC: {mac}, Lease Start: {start}, Lease Ende: {end}</description>{pub_date}
      <guid isPermaLink="false">{ip}-{mac}</guid>
    </item>""")

    rss = f"""<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0">
  <channel>
    <title>Warpzone DHCP Leases</title>
    <link>{escape(url_for('index', _external=True))}</link>
    <description>Aktuelle DHCP-Leases</description>
    <lastBuildDate>{now}</lastBuildDate>{''.join(rss_items)}
  </channel>
</rss>"""
    return Response(rss, mimetype='application/rss+xml')
if __name__ == '__main__':
    # Print the effective configuration (never the password itself).
    print("🚀 Starte Kea DHCP Web Interface")
    print(f"🐞 Debug-Modus: {WEB_DEBUG}")
    print(f"📡 Kea Control Agent URL: {KEA_CONTROL_AGENT_URL}")
    print(f"🔧 Kea Service: {KEA_SERVICE}")
    print(f"👤 Benutzername: {KEA_USERNAME}")
    # A fixed-width mask is printed so that the output does not reveal the
    # length of the password.
    print(f"🔒 Passwort: {'********' if KEA_PASSWORD else 'NICHT GESETZT'}")
    print(f"🔄 Auto-Refresh: {AUTO_REFRESH} Sekunden")
    print(f"🌐 Web Interface: http://{WEB_HOST}:{WEB_PORT}")
    print(f"📄 Swagger UI: http://{WEB_HOST}:{WEB_PORT}/swagger")
    app.run(debug=WEB_DEBUG, host=WEB_HOST, port=WEB_PORT)