有个需求:需要调用宝塔自带的 API,根据策略自动封禁 IP。但没有找到相关的 API 接口,只能自己研究、手写一个了。
宝塔 WAF 的全局黑名单封禁规则文件有两个:ip_black.json(IPv4)和 ip_black_v6.json(IPv6)。
封禁规则以 JSON 格式保存在上述文件中:
IPv4:每条规则由一个起始 IP、一个结束 IP 以及备注组成,其中 IP 值需要从点分十进制表示转换为整数形式(例如 1.2.3.4 对应 16909060)。
IPv6:每条规则由一个 IPv6 地址以及备注组成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| from flask import Flask, request, jsonify import json import struct import socket import os from datetime import datetime
# Flask application object; the route decorator in this file registers on it.
app = Flask(__name__)

# BT-panel WAF global blacklist rule files, one per address family.
IPV4_BLACKLIST_PATH = '/www/server/btwaf/rule/ip_black.json'
IPV6_BLACKLIST_PATH = '/www/server/btwaf/rule/ip_black_v6.json'

# Directory for the per-day ban logs; relative to the process working directory.
LOG_DIR = './logs/'
def ip_to_int(ip):
    """Return the unsigned 32-bit integer value of an IPv4 address string.

    The address is packed to its 4-byte network-order form and that byte
    string is read as a big-endian integer, e.g. ``"1.2.3.4"`` -> ``16909060``.

    Args:
        ip: IPv4 address in a form accepted by ``socket.inet_aton``.

    Returns:
        The address as an ``int`` in the range 0..2**32-1.

    Raises:
        OSError: If ``socket.inet_aton`` rejects the string.
    """
    packed = socket.inet_aton(ip)
    # Network byte order is big-endian, so the most significant octet is first.
    return int.from_bytes(packed, byteorder="big")
def is_ipv4(ip):
    """Return True only if *ip* is a complete dotted-quad IPv4 address string.

    Uses ``socket.inet_pton(AF_INET, ...)`` rather than ``inet_aton``:
    ``inet_aton`` also accepts abbreviated forms such as ``"1"`` or
    ``"127.1"``, which would let a malformed value be treated as a valid
    address and banned as a different IP than the caller intended.

    Args:
        ip: Candidate value; anything that is not a valid address string
            (including ``None`` or non-string objects) yields ``False``.

    Returns:
        ``True`` for a valid four-octet IPv4 address, ``False`` otherwise.
    """
    try:
        socket.inet_pton(socket.AF_INET, ip)
    except (OSError, TypeError, ValueError):
        # OSError: not a valid address; TypeError: not a string;
        # ValueError: string contains an embedded NUL character.
        return False
    return True
def is_ipv6(ip):
    """Return True if *ip* is a valid IPv6 address string.

    Validation is delegated to ``socket.inet_pton(AF_INET6, ...)``.

    Args:
        ip: Candidate value; anything that is not a valid address string
            (including ``None`` or non-string objects) yields ``False``.

    Returns:
        ``True`` for a valid IPv6 address, ``False`` otherwise.
    """
    try:
        socket.inet_pton(socket.AF_INET6, ip)
    except (OSError, TypeError, ValueError):
        # OSError: not a valid address; TypeError: not a string;
        # ValueError: string contains an embedded NUL character.
        return False
    return True
def load_blacklist(file_path):
    """Load a blacklist rule file and return its parsed JSON content.

    Args:
        file_path: Path of the JSON rule file.

    Returns:
        The decoded JSON value, or an empty list when the file does not
        exist or contains only whitespace.

    Raises:
        json.JSONDecodeError: If the file has content that is not valid
            JSON. This is deliberately not swallowed: returning ``[]`` here
            would let the caller overwrite the existing rules.
    """
    # Open directly instead of checking os.path.exists() first, so a file
    # removed between the check and the open cannot cause a crash.
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            raw = f.read()
    except FileNotFoundError:
        return []

    # An empty (e.g. freshly created) file means "no rules yet".
    if not raw.strip():
        return []
    return json.loads(raw)
def save_blacklist(file_path, blacklist):
    """Write *blacklist* to *file_path* as indented, non-ASCII-escaped JSON.

    Args:
        file_path: Destination path; the file is created or overwritten.
        blacklist: JSON-serializable rule list.

    Raises:
        TypeError: If *blacklist* contains a value JSON cannot serialize.
            In that case the existing file is left untouched.
        OSError: If the file cannot be opened or written.
    """
    # Serialize before opening: mode 'w' truncates on open, so a
    # serialization error raised afterwards would wipe the existing rules.
    serialized = json.dumps(blacklist, ensure_ascii=False, indent=4)
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(serialized)
def log_ban(ip, ipaddr):
    """Append one ban record to the current day's log file under LOG_DIR.

    The record has the form ``<timestamp> - IP: <ip>, Address: <ipaddr>``
    and goes to ``LOG_DIR/<YYYY-MM-DD>.log``. Logging is best-effort: any
    failure is printed and never propagated to the caller.

    Args:
        ip: The banned IP address.
        ipaddr: Value written after ``Address:`` in the record.
    """
    try:
        # exist_ok avoids the check-then-create race of a separate
        # os.path.exists() test.
        os.makedirs(LOG_DIR, exist_ok=True)

        # Take the clock once so the file name and the record timestamp
        # cannot disagree when the call straddles midnight.
        now = datetime.now()
        date_str = now.strftime('%Y-%m-%d')
        time_str = now.strftime('%Y-%m-%d %H:%M:%S')

        log_file_path = os.path.join(LOG_DIR, f"{date_str}.log")
        with open(log_file_path, 'a', encoding='utf-8') as log_file:
            log_file.write(f"{time_str} - IP: {ip}, Address: {ipaddr}\n")
    except Exception as e:
        # Deliberately broad: a logging failure must not fail the ban itself.
        print(f"Error writing to log file: {e}")
@app.route('/add_ip', methods=['POST'])
def add_ip():
    """Add the IP from the JSON request body to the matching WAF blacklist.

    Expects a JSON object with an ``ip`` field and an optional ``ipaddr``
    field (only used in the ban log). An IPv4 address is stored as
    ``[start_int, end_int, remark]`` with start equal to end; an IPv6
    address is stored as ``[address, remark]``. An entry that is already
    present is not added again.

    Returns:
        ``{"message": "ok"}`` with status 200 when the IP is (or already
        was) blacklisted; ``{"error": "error"}`` with status 400 when the
        body is not a JSON object or ``ip`` is missing or not a valid
        IPv4/IPv6 address string.

    NOTE(review): no authentication is performed in this handler, and the
    load/append/save sequence is not locked against concurrent requests.
    """
    data = request.get_json()
    # A well-formed JSON body can still be null, a list or a scalar, none of
    # which has .get(); reject it instead of failing with a 500.
    if not isinstance(data, dict):
        return jsonify({"error": "error"}), 400

    ip = data.get('ip', '')
    ipaddr = data.get('ipaddr', '')
    # The validators below only make sense for strings.
    if not ip or not isinstance(ip, str):
        return jsonify({"error": "error"}), 400

    # The remark literal is Chinese for "automatic ban due to complaint".
    if is_ipv4(ip):
        file_path = IPV4_BLACKLIST_PATH
        ip_int = ip_to_int(ip)
        # Single-address range: start and end are the same integer.
        new_entry = [ip_int, ip_int, "\u6295\u8bc9\u81ea\u52a8\u5c01\u7981"]
    elif is_ipv6(ip):
        file_path = IPV6_BLACKLIST_PATH
        new_entry = [ip, "\u6295\u8bc9\u81ea\u52a8\u5c01\u7981"]
    else:
        return jsonify({"error": "error"}), 400

    blacklist = load_blacklist(file_path)
    if new_entry not in blacklist:
        blacklist.append(new_entry)
        save_blacklist(file_path, blacklist)
        log_ban(ip, ipaddr)

    # Same response whether the entry was added now or already existed.
    return jsonify({"message": "ok"}), 200
if __name__ == '__main__':
    # Start Flask's built-in server when the file is executed directly.
    # NOTE(review): '0.0.0.0' listens on every network interface, so the
    # /add_ip endpoint is reachable from wherever port 5000 is reachable.
    listen_host = '0.0.0.0'
    listen_port = 5000
    app.run(host=listen_host, port=listen_port)
|