有个需求需要调用宝塔自带 API 去根据策略封禁 IP,没找到相关API 接口,只能研究手搓一个了。

宝塔 WAF 的全局黑名单封禁规则文件有两个 ip_black.jsonip_black_v6.json'

封禁规则为 JSON 文件规则
IPV4 为 一个起始 IP 和 一个结束 IP 以及备注 IP 值需要将点分十进制表示的IP地址转为整数形式
IPV6 为 一个 IPV6 地址 以及备注

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from flask import Flask, request, jsonify
import json
import struct
import socket
import os
from datetime import datetime

app = Flask(__name__)
IPV4_BLACKLIST_PATH = '/www/server/btwaf/rule/ip_black.json'
IPV6_BLACKLIST_PATH = '/www/server/btwaf/rule/ip_black_v6.json'
LOG_DIR = './logs/'

def ip_to_int(ip):
return struct.unpack("!I", socket.inet_aton(ip))[0]

def is_ipv4(ip):
try:
socket.inet_aton(ip)
return True
except socket.error:
return False

def is_ipv6(ip):
try:
socket.inet_pton(socket.AF_INET6, ip)
return True
except socket.error:
return False

def load_blacklist(file_path):
if os.path.exists(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
return []

def save_blacklist(file_path, blacklist):
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(blacklist, f, ensure_ascii=False, indent=4)

def log_ban(ip, ipaddr):
try:
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR)
date_str = datetime.now().strftime('%Y-%m-%d')
log_file_path = os.path.join(LOG_DIR, f"{date_str}.log")
time_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
with open(log_file_path, 'a', encoding='utf-8') as log_file:
log_file.write(f"{time_str} - IP: {ip}, Address: {ipaddr}\n")
except Exception as e:
print(f"Error writing to log file: {e}")

@app.route('/add_ip', methods=['POST'])
def add_ip():
data = request.get_json()
ip = data.get('ip', '')
ipaddr = data.get('ipaddr', '')
if not ip:
return jsonify({"error": "error"}), 400

if is_ipv4(ip):
file_path = IPV4_BLACKLIST_PATH
ip_int = ip_to_int(ip)
new_entry = [ip_int, ip_int, "\u6295\u8bc9\u81ea\u52a8\u5c01\u7981"]
elif is_ipv6(ip):
file_path = IPV6_BLACKLIST_PATH
new_entry = [ip, "\u6295\u8bc9\u81ea\u52a8\u5c01\u7981"]
else:
return jsonify({"error": "error"}), 400

blacklist = load_blacklist(file_path)

if new_entry not in blacklist:
blacklist.append(new_entry)
save_blacklist(file_path, blacklist)
log_ban(ip, ipaddr)
return jsonify({"message": "ok"}), 200
else:
return jsonify({"message": "ok"}), 200

if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)