一文带你学会机器学习在Web入侵检测中的应用
1. 引言
1.1 为什么需要Web入侵检测?
想象你开了一家网店,顾客通过网站下单。但有些坏家伙想搞乱:他们可能输入奇怪的代码偷数据(SQL注入),或者塞进恶意脚本吓跑顾客(XSS)。传统的防护工具像Web应用防火墙(WAF),靠人工写规则挡攻击,但新花样一出来就挡不住了。
1.2 机器学习能干啥?
机器学习就像一个聪明助手,它看过去的请求记录,学会“正常”和“坏蛋”的区别,不用你一条条写规则,就能自动揪出可疑行为。
1.3 本文目标
这篇文章带你从零开始,用机器学习做Web入侵检测。零基础也能看懂,会写代码更好,但不会也没关系,我会一步步讲清楚。
2. Web入侵检测超简单入门
2.1 啥是Web入侵检测?
就是检查网站收到的请求,看看有没有坏蛋。比如:
- 正常请求:/shop?item=book(买本书)。
- 恶意请求:/login?user=admin' OR 1=1(想无凭证偷偷登录)。
2.2 数据从哪来?
我们用Web服务器的日志,像记账本,记下每个请求。比如Apache的access.log,里面有:
- 请求的URL(地址)。
- 时间、IP啥的。
3. 特征工程:把请求变成数字
3.1 啥是特征工程?
机器学习不懂文字,得把请求变成数字给它看。这些数字叫“特征”。比如:
- 请求长度:URL有多长。
- 特殊字符比例:像
'、;、=
这些怪符号占多少。
3.2 数据从哪来?
真实场景里,数据在日志文件。假设有个训练文件train_log.txt:
GET /shop?item=book HTTP/1.1
GET /login?user=admin HTTP/1.1
GET /login?user=admin' OR 1=1 HTTP/1.1
GET /page?script=<alert> HTTP/1.1
还有个待检测文件test_log.txt(通过参数指定位置):
GET /shop?page=2 HTTP/1.1
GET /login?user=admin' OR 1=1 HTTP/1.1
GET /page?script=<script>alert(1)</script> HTTP/1.1
3.3 咋处理?
从URL算特征:
- 长度:数字符个数。
- /shop?item=book → 16。
- /login?user=admin' OR 1=1 → 25。
- 特殊字符比例:数'、;、=、<、>等。
- /shop?item=book:1个=,比例1/16 ≈ 0.06。
- /login?user=admin' OR 1=1:2个=、1个',比例3/25 = 0.12。
代码会自动算,还会试着猜攻击类型(SQL注入、XSS等)。
4. 算法选择:挑个好帮手
这里介绍5个算法,像5个侦探,帮你找坏蛋。每个我都用生活例子解释。
4.1 决策树(Decision Tree)
- 介绍:像玩“猜猜我是谁”。它问问题,比如“长度大于20?”“特殊字符多吗?”一步步猜出是好是坏。
- 优点:
- 简单:就像画个流程图,小孩都能看懂。
- 能抓重点:比如发现坏请求都含',它就记住这点。
- 缺点:
- 太死板:比如你教它“苹果是红的”,它可能连橙子都认成苹果。这叫“过拟合”,学得太死,碰到新东西懵了。
- 爱出错:数据里有错(比如正常请求写错了),它就瞎猜。
- 适合场景:
- 检查SQL注入这种明显坏蛋(含'或;)。
4.2 随机森林(Random Forest)
- 介绍:不是一个侦探,而是一群侦探开会。每个看问题不一样,最后投票决定。比如一个说“长度长”,另一个说“字符怪”,一起判断。
- 优点:
- 靠谱:一群人商量,不容易被单个错误忽悠,不像决策树那么容易过拟合。
- 抓复杂案子:能看很多线索,不怕数据乱。
- 缺点:
- 慢:一群人讨论,总比一个人快不了。
- 看不懂咋想的:不像决策树能画出来,它像个神秘团队。
- 适合场景:
- 找XSS这种花样多的攻击。
4.3 支持向量机(SVM)
- 介绍:像在纸上画条线,把好人和坏人分开。它找的线尽量离两边远点,确保分得清楚。
- 优点:
- 精准:小范围案子查得准。
- 不乱猜:不像决策树那么容易过拟合。
- 缺点:
- 费劲:案子多(数据多)时,它画线画到累死。
- 挑剔:得调好“线怎么画”,新手容易搞砸。
- 适合场景:
- 小规模检查,比如CSRF这种简单坏蛋。
4.4 K均值聚类(K-Means)
- 介绍:像分朋友圈,把相似的请求放一堆。大多数在一堆的算正常,落单的是坏蛋。不用提前告诉它谁是坏的。
- 优点:
- 不用教:没标签也能干活。
- 快:分堆很简单。
- 缺点:
- 得猜堆数:你得说“分3堆还是4堆”,猜错了就乱套。
- 怕乱七八糟:数据乱糟糟时,分不清谁是谁。
- 适合场景:
- 找怪请求,比如有人猛点页面。
4.5 孤立森林(Isolation Forest)
- 介绍:像找人群里的怪人。正常请求扎堆,坏请求落单,它随便划几刀,落单的先被挑出来。
- 优点:
- 专抓怪人:找坏蛋特别快。
- 不怕多:数据多也能干。
- 缺点:
- 不细致:只管找怪的,不管正常人咋样。
- 神秘:不好解释为啥挑这个。
- 适合场景:
- 查稀奇古怪的攻击,比如偷偷登录。
5. 动手实践:使用随机森林检测SQL注入
5.1 数据准备
接下来我们要从两个训练文件里提取数据,准备给机器学习用。训练数据来自安全设备提供的两个文件:white_log.txt(正常请求的白样本)和black_log.txt(恶意请求的黑样本),每个文件每行是一个URL(GET的URL或POST的body),比如/shop?item=book。我们的任务是读取这两个文件,计算特征(长度和特殊字符比例),给白样本标记0(正常),黑样本标记1(坏蛋),然后合并存成CSV文件供训练使用。
训练文件格式很简单,每行就是一个URL。例如:
white_log.txt 示例:
black_log.txt 示例:
应该怎么做呢?我们写一个脚本prepare_data.py,通过命令行参数指定两个训练文件(比如python prepare_data.py white_log.txt black_log.txt)。脚本会分别读取白样本和黑样本文件,每行是一个URL,计算两个特征:一是URL的字符数(长度),二是特殊字符(如'、;、=、<、>、[])的比例。白样本标记为0,黑样本标记为1,然后把两部分数据合并,存成train_data.csv,包含URL、特征和标签。
代码如下:
import pandas as pd
import sys
def extract_features(url):
special_chars = "';=<>[]"
length = len(url)
special_count = sum(url.count(c) for c in special_chars)
special_ratio = special_count / length if length > 0 else 0
return [length, special_ratio]
if len(sys.argv) != 3:
print("用法: python prepare_data.py <white_log_file> <black_log_file>")
sys.exit(1)
white_file = sys.argv[1]
black_file = sys.argv[2]
train_data = []
# 读白样本
try:
with open(white_file, 'r') as file:
lines = file.readlines()
except FileNotFoundError:
print(f"错误:找不到 {white_file}")
sys.exit(1)
for line in lines:
url = line.strip()
if not url:
continue
features = extract_features(url)
train_data.append([url] + features + [0]) # 0 表示正常
# 读黑样本
try:
with open(black_file, 'r') as file:
lines = file.readlines()
except FileNotFoundError:
print(f"错误:找不到 {black_file}")
sys.exit(1)
for line in lines:
url = line.strip()
if not url:
continue
features = extract_features(url)
train_data.append([url] + features + [1]) # 1 表示坏蛋
df = pd.DataFrame(train_data, columns=['url', 'length', 'special_char_ratio', 'label'])
df.to_csv('train_data.csv', index=False)
print("训练数据已保存到 train_data.csv")
运行这个脚本(python prepare_data.py white_log.txt black_log.txt),会生成train_data.csv,内容如下:
url,length,special_char_ratio,label
/shop?item=book,16,0.0625,0
/login?user=admin,19,0.0526,0
/shop?page=2,12,0.0833,0
/login?user=admin' OR 1=1,25,0.1200,1
/page?script=<alert>,20,0.2000,1
/page?id=1; DROP TABLE,22,0.0909,1
5.2 训练模型
现在我们要用准备好的数据训练模型,并检查效果。目标是让机器学会区分正常请求和恶意请求,我们选择随机森林算法,因为它简单又有效。接下来写一个脚本train_model.py,从train_data.csv读取数据,标准化特征,训练模型,检查效果,然后保存模型和标准化器。
应该怎么做呢?脚本会先读取train_data.csv,提取特征(长度和特殊字符比例)和标签。特征需要标准化,让数值范围一致,然后分成70%训练集和30%测试集。接着用随机森林训练,训练完后用测试集预测,计算精确率(预测坏蛋的准确率)、召回率(坏蛋的抓获率)和F1分数(综合评价),打印出来。最后,把模型和标准化器保存成model.pkl和scaler.pkl,供验证使用。
代码如下:
import pandas as pd
import pickle
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score
try:
df = pd.read_csv('train_data.csv')
except FileNotFoundError:
print("错误:找不到 train_data.csv,请先运行 prepare_data.py")
sys.exit(1)
X = df[['length', 'special_char_ratio']]
y = df['label']
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.3, random_state=42)
rf = RandomForestClassifier(n_estimators=10, random_state=42)
rf.fit(X_train, y_train)
y_pred = rf.predict(X_test)
print("精确率(抓坏蛋准不准):", precision_score(y_test, y_pred))
print("召回率(坏蛋漏没漏):", recall_score(y_test, y_pred))
print("F1分数(综合成绩):", f1_score(y_test, y_pred))
with open('model.pkl', 'wb') as f:
pickle.dump(rf, f)
with open('scaler.pkl', 'wb') as f:
pickle.dump(scaler, f)
print("模型和标准化器已保存到 model.pkl 和 scaler.pkl")
运行这个脚本(python train_model.py),会输出类似:
5.3 检查效果
检查效果已经包含在train_model.py中,训练完模型后直接用测试集预测并打印结果。精确率表示预测为坏蛋的请求中有多少是真的坏蛋,召回率表示所有坏蛋中有多少被抓到,F1分数是两者的平衡。小数据集可能分数偏高,真实场景用更多数据会更稳定。
5.4 用它查新请求
接下来我们要用训练好的模型检查新请求,输出结果到文件。新请求来自安全设备提供的文件(比如test_log.txt),每行是一个URL,我们要预测它是正常还是坏蛋,并标注攻击类型(SQL注入、XSS等),结果写到results.txt。
应该怎么做呢?我们写一个脚本check_requests.py,通过命令行参数指定测试文件(比如python check_requests.py test_log.txt)。脚本会加载保存的模型和标准化器,逐行读取测试文件,每行算特征,预测是否为坏蛋。如果是坏蛋,用简单规则猜类型:含'或;算SQL注入,含<或>算XSS,否则是其他恶意。最后把每行请求和结果写到results.txt。
test_log.txt 示例:
代码如下:
import sys
import pickle
import pandas as pd
def extract_features(url):
special_chars = "';=<>[]"
length = len(url)
special_count = sum(url.count(c) for c in special_chars)
special_ratio = special_count / length if length > 0 else 0
return [length, special_ratio]
def predict_and_classify(url, scaler, model):
features = extract_features(url)
scaled_features = scaler.transform([features])
prediction = model.predict(scaled_features)[0]
if prediction == 0:
return "正常"
if "'" in url or ";" in url:
return "SQL注入"
if "<" in url or ">" in url:
return "XSS"
return "其他恶意"
try:
with open('model.pkl', 'rb') as f:
rf = pickle.load(f)
with open('scaler.pkl', 'rb') as f:
scaler = pickle.load(f)
except FileNotFoundError:
print("错误:找不到 model.pkl 或 scaler.pkl,请先运行 train_model.py")
sys.exit(1)
if len(sys.argv) != 2:
print("用法: python check_requests.py <test_log_file>")
sys.exit(1)
test_file = sys.argv[1]
try:
with open(test_file, 'r') as file:
test_lines = file.readlines()
except FileNotFoundError:
print(f"错误:找不到 {test_file}")
sys.exit(1)
results = []
for line in test_lines:
url = line.strip()
if not url:
continue
result = predict_and_classify(url, scaler, rf)
results.append(f"{url} -> {result}")
with open('results.txt', 'w') as out_file:
out_file.write("\n".join(results))
print("结果已写入 results.txt")
运行这个脚本(python check_requests.py test_log.txt),会生成results.txt:
5.5 数据量大咋办
如果训练或测试文件有几万行,我们得优化处理方式。在prepare_data.py和check_requests.py中,已经用逐行读取的方式,不用一次加载整个文件,内存占用低。对于train_model.py,如果随机森林跑得慢,可以减少树的数量(从10棵改成5棵),或者换成更轻量的决策树。如果数据量特别大,可以把CSV存到数据库(比如SQLite),分块读取,或者用大数据工具(像Spark)。当前脚本能处理几万行,日常够用。
在train_model.py里改树数量的示例:
6. 挑战与应对
6.1 数据不平衡怎么办
有时候白样本多,黑样本少,模型会偏向正常请求,漏掉坏蛋。我们可以用SMOTE技术生成更多黑样本,让数据平衡。怎么做呢?在train_model.py里加SMOTE,读取数据后用它平衡特征和标签,再训练。这样模型对坏蛋更敏感。
更新后的train_model.py片段:
from imblearn.over_sampling import SMOTE
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
smote = SMOTE(random_state=42)
X_balanced, y_balanced = smote.fit_resample(X_scaled, y)
X_train, X_test, y_train, y_test = train_test_split(X_balanced, y_balanced, test_size=0.3, random_state=42)
6.2 模型跑得慢怎么办
如果数据多,随机森林跑得慢,我们可以换成决策树,简单又快。怎么做呢?在train_model.py里把RandomForestClassifier改成DecisionTreeClassifier,其他保持不变。决策树训练快,适合快速测试。
更新后的train_model.py片段:
from sklearn.tree import DecisionTreeClassifier
dt = DecisionTreeClassifier(random_state=42)
dt.fit(X_train, y_train)
with open('model.pkl', 'wb') as f:
pickle.dump(dt, f)
6.3 攻击者会绕过检测吗
坏蛋可能伪装正常请求,比如用编码绕过特殊字符检测。我们可以定期更新white_log.txt和black_log.txt,加入新样本,让模型学习最新花招。还能在check_requests.py加规则,比如检查drop table这种SQL关键字,直接标记为坏蛋。加更多特征(像请求频率)也能提高检测率。
更新后的check_requests.py片段:
def predict_and_classify(url, scaler, model):
features = extract_features(url)
scaled_features = scaler.transform([features])
prediction = model.predict(scaled_features)[0]
if prediction == 0:
return "正常"
if 'drop table' in url.lower():
return "SQL注入"
if "'" in url or ";" in url:
return "SQL注入"
if "<" in url or ">" in url:
return "XSS"
return "其他恶意"
7. 案例实战
7.1 用随机森林检测WebShell
现在我们用机器学习检测WebShell,展示一个实际应用场景。WebShell是黑客上传到服务器的恶意脚本,我们有两个目录:white_dir(正常PHP文件)和black_dir(WebShell文件),每个目录下有多个文件,每个文件是一个样本。我们用随机森林算法检测,输出结果到文件。
white_dir 示例(正常PHP文件):
- index.php:<?php echo "Hello World"; ?>
- login.php:<?php session_start(); $user = $_POST['user']; ?>
black_dir 示例(WebShell文件):
- shell1.php:<?php system($_GET['cmd']); ?>
- shell2.php:<?php eval(base64_decode($_POST['code'])); ?>
应该怎么做呢?我们写一个新脚本prepare_webshell_data.py,通过命令行参数指定两个目录(比如python prepare_webshell_data.py white_dir black_dir)。脚本会遍历两个目录,读取每个PHP文件内容,计算特征:一是文件内容的长度,二是特殊字符比例(这里加了$_、(, ),因为WebShell常含命令执行)。白样本标记0,黑样本标记1,存成webshell_train_data.csv。然后用train_model.py训练(稍改特征名),最后写check_webshell.py检测新目录(比如test_dir),结果写到webshell_results.txt。
prepare_webshell_data.py:
import pandas as pd
import sys
import os
def extract_features(content):
special_chars = "';=<>[]$_()"
length = len(content)
special_count = sum(content.count(c) for c in special_chars)
special_ratio = special_count / length if length > 0 else 0
return [length, special_ratio]
if len(sys.argv) != 3:
print("用法: python prepare_webshell_data.py <white_dir> <black_dir>")
sys.exit(1)
white_dir = sys.argv[1]
black_dir = sys.argv[2]
train_data = []
for dir_path, label in [(white_dir, 0), (black_dir, 1)]:
try:
for filename in os.listdir(dir_path):
if not filename.endswith('.php'):
continue
filepath = os.path.join(dir_path, filename)
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read().strip()
if not content:
continue
features = extract_features(content)
train_data.append([filepath, content] + features + [label])
except FileNotFoundError:
print(f"错误:找不到目录 {dir_path}")
sys.exit(1)
df = pd.DataFrame(train_data, columns=['filepath', 'content', 'length', 'special_char_ratio', 'label'])
df.to_csv('webshell_train_data.csv', index=False)
print("训练数据已保存到 webshell_train_data.csv")
运行(python prepare_webshell_data.py white_dir black_dir),生成webshell_train_data.csv,类似:
filepath,content,length,special_char_ratio,label
white_dir/index.php,<?php echo "Hello World"; ?>,26,0.1538,0
white_dir/login.php,<?php session_start(); $user = $_POST['user']; ?>,48,0.2083,0
black_dir/shell1.php,<?php system($_GET['cmd']); ?>,29,0.3448,1
black_dir/shell2.php,<?php eval(base64_decode($_POST['code'])); ?>,44,0.3182,1
训练用 train_model.py:
import pandas as pd
import pickle
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score
try:
df = pd.read_csv('webshell_train_data.csv')
except FileNotFoundError:
print("错误:找不到 webshell_train_data.csv,请先运行 prepare_webshell_data.py")
sys.exit(1)
X = df[['length', 'special_char_ratio']]
y = df['label']
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.3, random_state=42)
rf = RandomForestClassifier(n_estimators=10, random_state=42)
rf.fit(X_train, y_train)
y_pred = rf.predict(X_test)
print("精确率(抓WebShell准不准):", precision_score(y_test, y_pred))
print("召回率(WebShell漏没漏):", recall_score(y_test, y_pred))
print("F1分数(综合成绩):", f1_score(y_test, y_pred))
with open('webshell_model.pkl', 'wb') as f:
pickle.dump(rf, f)
with open('webshell_scaler.pkl', 'wb') as f:
pickle.dump(scaler, f)
print("模型和标准化器已保存到 webshell_model.pkl 和 webshell_scaler.pkl")
check_webshell.py:
import sys
import pickle
import pandas as pd
import os
def extract_features(content):
special_chars = "';=<>[]$_()"
length = len(content)
special_count = sum(content.count(c) for c in special_chars)
special_ratio = special_count / length if length > 0 else 0
return [length, special_ratio]
def predict_and_classify(content, scaler, model):
features = extract_features(content)
scaled_features = scaler.transform([features])
prediction = model.predict(scaled_features)[0]
return "WebShell" if prediction == 1 else "正常"
try:
with open('webshell_model.pkl', 'rb') as f:
rf = pickle.load(f)
with open('webshell_scaler.pkl', 'rb') as f:
scaler = pickle_load(f)
except FileNotFoundError:
print("错误:找不到 webshell_model.pkl 或 webshell_scaler.pkl,请先运行 train_model.py")
sys.exit(1)
if len(sys.argv) != 2:
print("用法: python check_webshell.py <test_dir>")
sys.exit(1)
test_dir = sys.argv[1]
try:
files = os.listdir(test_dir)
except FileNotFoundError:
print(f"错误:找不到 {test_dir}")
sys.exit(1)
results = []
for filename in files:
if not filename.endswith('.php'):
continue
filepath = os.path.join(test_dir, filename)
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read().strip()
if not content:
continue
result = predict_and_classify(content, scaler, rf)
results.append(f"{filepath} -> {result}")
with open('webshell_results.txt', 'w') as out_file:
out_file.write("\n".join(results))
print("结果已写入 webshell_results.txt")
test_dir 示例:
- normal.php:
<?php echo "Test"; ?>
- shell.php:
<?php passthru($_GET['cmd']); ?>
运行流程:
- python prepare_webshell_data.py white_dir black_dir
- python train_model.py
- python check_webshell.py test_dir
webshell_results.txt:
7.2 用孤立森林检测异常文件
我们再用孤立森林检测异常文件,不依赖标签,直接找可疑的。怎么做呢?用webshell_train_data.csv训练孤立森林,然后检测test_dir,结果写到iso_webshell_results.txt。
check_webshell_iso.py:
import sys
import pandas as pd
import os
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
def extract_features(content):
special_chars = "';=<>[]$_()"
length = len(content)
special_count = sum(content.count(c) for c in special_chars)
special_ratio = special_count / length if length > 0 else 0
return [length, special_ratio]
try:
df = pd.read_csv('webshell_train_data.csv')
except FileNotFoundError:
print("错误:找不到 webshell_train_data.csv,请先运行 prepare_webshell_data.py")
sys.exit(1)
X = df[['length', 'special_char_ratio']]
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
iso = IsolationForest(contamination=0.25, random_state=42)
iso.fit(X_scaled)
if len(sys.argv) != 2:
print("用法: python check_webshell_iso.py <test_dir>")
sys.exit(1)
test_dir = sys.argv[1]
try:
files = os.listdir(test_dir)
except FileNotFoundError:
print(f"错误:找不到 {test_dir}")
sys.exit(1)
results = []
for filename in files:
if not filename.endswith('.php'):
continue
filepath = os.path.join(test_dir, filename)
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read().strip()
if not content:
continue
features = extract_features(content)
scaled_features = scaler.transform([features])
prediction = iso.predict(scaled_features)[0]
result = "异常" if prediction == -1 else "正常"
results.append(f"{filepath} -> {result}")
with open('webshell_results.txt', 'w') as out_file:
out_file.write("\n".join(results))
print("结果已写入 webshell_results.txt")
运行(python check_webshell.py test_dir),生成webshell_results.txt:
8.写在最后
8.1 总结一下
这篇文章都干了啥
-
为啥用机器学习,有哪些知识点
-
常用算法有哪些,应该咋选
-
从头撸了数据预处理
-
提取了攻击特征
-
模型是怎么训练的
-
怎么调优
等等等等。。。。
大多数时候,其实机器学习并没想的那么复杂,没人去改什么算法,你看到的“模型优化”“精度提升”——说到底,全靠取特征、调参数、调数据,把思路捋顺了,数据量上来比啥都管用
8.3接下来可以做啥
- 换个数据集,自己试试;
- 把代码封装成一个检测脚本;
- 加入模型训练、调参、可视化;
- 或者,用你自己的攻击数据来跑一轮。
无论是哪一种,都是进阶的开始。