
需求說明:
安全漏掃出Oracle、MySQL等數據庫存在大幾百個CVE漏洞,如何快速分析出每個CVE漏洞影響的數據庫版本、詳細漏洞說明等。
解決方案:
通過python程序,按年下載json格式的cve漏洞說明,自動轉換為csv格式,將csv文件、掃描出的漏洞列表分別導入到mysql庫不同表,進行分析查詢(或者直接使用execl進行對比查詢)。
實施步驟:
1.將CVE庫下載到本地
默認下載的CVE庫是json格式,例如:
https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-2025.json.zip
需要通過python進行遠程下載并且轉換為csv格式。
詳細python代碼見文章末尾。
python代碼執行過程如下:
[root@cjc-db-03 cvelist]# python3.6 2025_csv.py
/usr/local/lib/python3.6/site-packages/requests/__init__.py:104: RequestsDependencyWarning: urllib3 (1.26.20) or chardet (5.0.0)/charset_normalizer (2.0.12) doesn't match a supported version!
RequestsDependencyWarning)
============================================================
CVE 數據導出工具 (CSV 格式)
============================================================
目標年份: 2025
============================================================
? pandas 已安裝
? tqdm 已安裝
? chardet 已安裝
? html 已安裝
所有必要包已安裝完成
正在下載 2025 年的 CVE 數據...
來源: https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-2025.json.zip
? 下載成功!
正在解析 JSON 數據...
找到 18602 個 CVE 條目
處理CVE條目: 8%|█████▎ | 1559/18602 [00:00<00:01, 15580.9處理CVE條目: 17%|██████████▌ | 3118/18602 [00:00<00:01, 14處理CVE條目: 26%|████████████████▌ | 4889/18602 [00:00<00:處理CVE條目: 37%|███████████████████████▏ | 6831/18602 [00處理CVE條目: 46%|████████████████████████████▉ | 8554/1860處理CVE條目: 58%|███████████████████████████████████▊ | 107處理CVE條目: 68%|██████████████████████████████████████████ 處理CVE條目: 77%|███████████████████████████████████████████████▉ 處理CVE條目: 88%|███████████████████████████████████████████████████處理CVE條目: 98%|███████████████████████████████████████████████████處理CVE條目: 100%|██████████████████████████████████████████████████████████████| 18602/18602 [00:01<00:00, 16854.35it/s]
成功處理 18602 個 CVE 條目
正在生成 CSV 文件...
? 轉換完成!CSV 文件已保存為: CVE_2025_Report.csv
============================================================
導出成功!CSV 文件路徑: /cvelist/CVE_2025_Report.csv
============================================================
CSV 文件包含以下列:
- CVE ID: 漏洞的唯一標識符
- 描述: 漏洞的英文描述
- 發布時間: CVE 的發布日期
- CVSS 版本: 使用的 CVSS 版本 (v3.0 或 v2.0)
- CVSS 分數: 漏洞的 CVSS 基本分數
- 嚴重性: 漏洞的嚴重等級
- 供應商: 受影響的供應商列表
- 產品: 受影響的產品列表
- 參考鏈接: 相關參考鏈接
============================================================
文件大小: 10.64 MB
后續操作建議:
1. 使用 Excel 或文本編輯器打開: /cvelist/CVE_2025_Report.csv
2. 導入到 MySQL 數據庫: 使用 LOAD DATA INFILE 或 MySQL Workbench
3. 使用 Python pandas 進行數據分析: pd.read_csv('/cvelist/CVE_2025_Report.csv')
按 Enter 退出...
2.將下載后的csv文件導入到MySQL數據庫里。
MySQL:新增庫、表
create database cve_database;
use cve_database;
CREATE TABLE `2025_cve` (
`id` int NOT NULL AUTO_INCREMENT,
`cve_id` varchar(50) DEFAULT NULL,
`description` text,
`published_date` varchar(50) DEFAULT NULL,
`cvss_version` varchar(50) DEFAULT NULL,
`cvss_score` varchar(50) DEFAULT NULL,
`severity` varchar(50) DEFAULT NULL,
`vendors` text,
`products` text,
`ref_links` text,
PRIMARY KEY (`id`));
將CSV數據加載進MySQL
LOAD DATA INFILE '/cvelist/CVE_2025_Report.csv'
INTO TABLE 2025_cve
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 ROWS -- 跳過標題行
(
cve_id,
description,
published_date, -- 原樣導入日期字符串
cvss_version,
cvss_score,
severity,
vendors,
products,
ref_links
);
相同的方法,每一年使用獨立的表,分別導入到 2024_cve,2023_cve,…,2018_cve表里。
創建視圖,方便查詢:
create view cvelist as select * from 2025_cve union all select * from 2024_cve union all select * from 2023_cve union all select * from 2022_cve union all select * from 2021_cve union all select * from 2020_cve union all select * from 2019_cve union all select * from 2018_cve;
3.將安全漏掃的數據導入到MySQL數據庫里
主要是序號和CVE編號兩列。
先創建對應表結構:
create table cve_check (id int,cve_id varchar(200));
ALTER TABLE cve_check CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci;
準備CSV文件
原文件是execlg格式,需要另存為CSV格式,逗號分隔。例如
1,Apache Tomcat 安全漏洞(CVE-202x-xxxxx)
2,OpenSSL 信任管理問題漏洞(CVE-202x-xxxxx)
3,OpenSSL 信任管理問題漏洞(CVE-202x-xxxxx)
4,Oracle MySQL Server存在未明漏洞(CVE-202x-xxxxx)
5,Oracle MySQL Server存在未明漏洞(CVE-202x-xxxxx)
......
將csv導入到cve_check表里
LOAD DATA INFILE '/cvelist/001.csv'
INTO TABLE cve_check
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 ROWS -- 跳過標題行
(
id,
cve_id
);
4.關聯查詢
安全漏掃表和CVE本地庫表cvelist進行關聯查詢,查詢出安全漏掃表里cve_id對應的漏洞描述和版本信息。
其中 安全漏掃表 記錄的cve_id格式如下:
Oracle MySQL Server存在未明漏洞(CVE-202x-xxxxx)
需要 SUBSTRING_INDEX 截取出括號內的CVE編號,輸出到cve_check01表里。
create table cve_check01 as select id,SUBSTRING_INDEX(SUBSTRING_INDEX(cve_id, '(', -1), ')', 1) AS cve_id from cve_check;
關聯查詢,得到需要的數據
SELECT t1.id,t1.cve_id,t2.description FROM cve_check01 t1 left join cvelist t2 on t1.cve_id=t2.cve_id order by t1.id;
導出數據:
SELECT t1.id, t1.cve_id, t2.description
FROM cve_check01 t1
LEFT JOIN cvelist t2 ON t1.cve_id = t2.cve_id
ORDER BY t1.id
INTO OUTFILE '/cvelist/xxx01.csv'
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n';
最終將導出xxx01.csv表的description列,全部復制粘貼到安全掃描表里的漏洞說明列。
但是 漏洞說明 里的描述信息太多,如何把漏洞說明里的版本信息篩選出來,觀察發現,版本信息都是 "affected"開頭, “prior.結尾的”,所以需要使用 REGEXP_SUBSTR 函數。
例如:
affected are 5.6.41 and prior, 5.7.23 and prior and 8.0.12 and prior.
最終加上 affected_versions 列,導出
SELECT t1.id, t1.cve_id,REGEXP_SUBSTR(description,'affected are.*?prior\\.') AS affected_versions,t2.description
FROM cve_check01 t1
LEFT JOIN cvelist t2 ON t1.cve_id = t2.cve_id
ORDER BY t1.id
INTO OUTFILE '/cvelist/xxx01.csv'
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n';
2025_csv.py 代碼如下:
[root@cjc-db-03 cvelist]# cat 2025_csv.py
import requests
import zipfile
import json
import pandas as pd
import os
import io
import re
import html
import sys
import subprocess
import time
import platform
from datetime import datetime
from tqdm import tqdm
def install_with_retry(package, retries=3, delay=5):
"""帶重試機制的包安裝函數"""
for attempt in range(retries):
try:
print(f"嘗試安裝 {package} (嘗試 {attempt+1}/{retries})")
# 使用國內鏡像源并增加超時時間
command = [
sys.executable, "-m", "pip", "install",
"--index-url", "https://pypi.tuna.tsinghua.edu.cn/simple",
"--trusted-host", "pypi.tuna.tsinghua.edu.cn",
"--timeout", "60", # 增加超時時間到60秒
"--retries", "3", # 設置重試次數
package
]
# 在 Windows 上禁用緩存以避免權限問題
if platform.system() == "Windows":
command.append("--no-cache-dir")
result = subprocess.run(
command,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
print(f"? 成功安裝 {package}")
return True
except subprocess.CalledProcessError as e:
print(f"安裝 {package} 失敗: {e.stderr.strip()}")
print(f"等待 {delay} 秒后重試...")
time.sleep(delay)
delay *= 2 # 指數退避策略
print(f"? 無法安裝 {package},已達到最大重試次數")
return False
def check_and_install_requirements():
"""檢查并安裝所有必要的包"""
required_packages = ['pandas', 'tqdm', 'chardet', 'html']
# 檢查是否已安裝
for package in required_packages:
try:
__import__(package)
print(f"? {package} 已安裝")
except ImportError:
print(f"{package} 未安裝,開始安裝...")
if not install_with_retry(package):
print("安裝失敗,請手動運行以下命令安裝:")
print(f"pip install {package} --index-url https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn --timeout 60")
sys.exit(1)
print("所有必要包已安裝完成")
def clean_text(text):
"""清理文本確保CSV兼容性"""
if not isinstance(text, str) or pd.isna(text):
return ""
# 1. 解碼HTML實體
text = html.unescape(text)
# 2. 移除非UTF-8字符
text = re.sub(r'[^\x00-\x7F\x80-\xFF]', '', text)
# 3. 移除控制字符
text = re.sub(r'[\x00-\x1F\x7F]', ' ', text)
# 4. 替換特殊引號
text = text.replace('"', "'").replace('\\', '')
# 5. 規范化空格
text = re.sub(r'\s+', ' ', text).strip()
return text
def download_nvd_json(year):
"""
下載指定年份的 NVD JSON 壓縮文件
"""
url = f"https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-{year}.json.zip"
print(f"正在下載 {year} 年的 CVE 數據...")
print(f"來源: {url}")
try:
response = requests.get(url, timeout=30)
response.raise_for_status() # 檢查HTTP錯誤
print("? 下載成功!")
return io.BytesIO(response.content)
except requests.exceptions.RequestException as e:
print(f"? 下載失敗: {str(e)}")
return None
def extract_and_convert_to_csv(zip_stream, year):
"""
從內存中的 ZIP 流提取 JSON 文件并轉換為 CSV
"""
try:
# 使用內存中的 ZIP 文件
with zipfile.ZipFile(zip_stream) as zip_file:
json_filename = f"nvdcve-1.1-{year}.json"
# 檢查 ZIP 中是否存在該文件
if json_filename not in zip_file.namelist():
print(f"? 在 ZIP 文件中未找到 {json_filename}")
return False
# 讀取 JSON 內容
with zip_file.open(json_filename) as json_file:
print("正在解析 JSON 數據...")
data = json.load(json_file)
# 提取 CVE 數據
cve_list = []
total_items = len(data["CVE_Items"])
print(f"找到 {total_items} 個 CVE 條目")
for i, item in enumerate(tqdm(data["CVE_Items"], desc="處理CVE條目")):
try:
# 提取基本 CVE 信息
cve_id = item["cve"]["CVE_data_meta"]["ID"]
# 獲取英文描述
descriptions = item["cve"]["description"]["description_data"]
en_description = next((desc["value"] for desc in descriptions if desc["lang"] == "en"), "無英文描述")
# 清理描述文本
en_description = clean_text(en_description)
# 提取發布時間
published_date = item["publishedDate"]
# 提取嚴重性評分 (支持 CVSS v3 和 v2)
cvss_version = ""
base_severity = "N/A"
cvss_score = "N/A"
# 先嘗試獲取 CVSS v3
if "baseMetricV3" in item["impact"]:
cvss_version = "v3.0"
cvss_data = item["impact"]["baseMetricV3"]["cvssV3"]
cvss_score = cvss_data.get("baseScore", "N/A")
base_severity = cvss_data.get("baseSeverity", "N/A")
# 回退到 CVSS v2
elif "baseMetricV2" in item["impact"]:
cvss_version = "v2.0"
cvss_data = item["impact"]["baseMetricV2"]["cvssV2"]
cvss_score = cvss_data.get("baseScore", "N/A")
base_severity = item["impact"]["baseMetricV2"].get("severity", "N/A")
# 提取供應商和產品信息 (處理缺失字段)
vendors = set()
products = set()
# 安全地訪問 'affects' 字段
if "affects" in item["cve"]:
affects = item["cve"]["affects"]
if "vendor" in affects and "vendor_data" in affects["vendor"]:
for vendor_data in affects["vendor"]["vendor_data"]:
if "vendor_name" in vendor_data:
vendor_name = clean_text(vendor_data["vendor_name"])
if vendor_name:
vendors.add(vendor_name)
# 提取產品信息
if "product" in vendor_data and "product_data" in vendor_data["product"]:
for product_data in vendor_data["product"]["product_data"]:
if "product_name" in product_data:
product_name = clean_text(product_data["product_name"])
if product_name:
products.add(product_name)
vendor_list = ", ".join(sorted(vendors)) if vendors else "N/A"
product_list = ", ".join(sorted(products)) if products else "N/A"
# 提取參考鏈接
references = []
if "references" in item["cve"] and "reference_data" in item["cve"]["references"]:
for ref in item["cve"]["references"]["reference_data"]:
url = clean_text(ref["url"])
if url:
references.append(url)
reference_list = "; ".join(references) if references else "N/A"
# 添加數據到列表
cve_list.append({
"CVE ID": cve_id,
"描述": en_description,
"發布時間": published_date,
"CVSS 版本": cvss_version,
"CVSS 分數": cvss_score,
"嚴重性": base_severity,
"供應商": vendor_list,
"產品": product_list,
"參考鏈接": reference_list
})
except Exception as e:
# 記錄錯誤但繼續處理其他條目
print(f"! 處理條目 {i+1} 時出錯: {str(e)}")
continue
print(f"成功處理 {len(cve_list)} 個 CVE 條目")
# 創建 DataFrame
df = pd.DataFrame(cve_list)
# 設置輸出文件名
csv_filename = f"CVE_{year}_Report.csv"
# 導出到 CSV
print("正在生成 CSV 文件...")
df.to_csv(csv_filename, index=False, encoding='utf-8')
print(f"? 轉換完成!CSV 文件已保存為: {csv_filename}")
# 返回CSV文件路徑
return os.path.abspath(csv_filename)
except Exception as e:
print(f"? 處理過程中發生嚴重錯誤: {str(e)}")
return False
def main():
# 設置目標年份
year = 2025
print("=" * 60)
print("CVE 數據導出工具 (CSV 格式)")
print("=" * 60)
print(f"目標年份: {year}")
print("=" * 60)
# 檢查并安裝依賴
check_and_install_requirements()
# 下載 ZIP 文件到內存
zip_stream = download_nvd_json(year)
if zip_stream:
# 處理內存中的 ZIP 文件并轉換為 CSV
csv_path = extract_and_convert_to_csv(zip_stream, year)
if csv_path:
print("\n" + "=" * 60)
print(f"導出成功!CSV 文件路徑: {csv_path}")
print("=" * 60)
print("CSV 文件包含以下列:")
print("- CVE ID: 漏洞的唯一標識符")
print("- 描述: 漏洞的英文描述")
print("- 發布時間: CVE 的發布日期")
print("- CVSS 版本: 使用的 CVSS 版本 (v3.0 或 v2.0)")
print("- CVSS 分數: 漏洞的 CVSS 基本分數")
print("- 嚴重性: 漏洞的嚴重等級")
print("- 供應商: 受影響的供應商列表")
print("- 產品: 受影響的產品列表")
print("- 參考鏈接: 相關參考鏈接")
print("=" * 60)
# 提供文件大小信息
file_size = os.path.getsize(csv_path) / (1024 * 1024) # 轉換為MB
print(f"文件大小: {file_size:.2f} MB")
# 提供后續操作建議
print("\n后續操作建議:")
print(f"1. 使用 Excel 或文本編輯器打開: {csv_path}")
print("2. 導入到 MySQL 數據庫: 使用 LOAD DATA INFILE 或 MySQL Workbench")
print("3. 使用 Python pandas 進行數據分析: pd.read_csv('" + csv_path + "')")
else:
print("\n? CSV 文件生成失敗")
else:
print("\n? 無法繼續處理,因為下載失敗")
if __name__ == "__main__":
main()
print("\n按 Enter 退出...")
input()
歡迎關注我的公眾號《IT小Chen》




