心率曲线数据分类实践
约 4155 字大约 14 分钟
2025-01-06
摘要
本课程设计分别研究了结合主成分分析(PCA)和线性判别分析(LDA)降维技术的K最近邻(KNN)算法在心率曲线数据分类中的应用。通过对2126个胎儿心率曲线(CTG)数据集的分析,以实现自动分类胎儿的心率状态。数据集提供了心率曲线的统计特征,并将其分为10种模式和3种诊断分类(正常、可疑、病态)。本研究首先是分别选用PCA和LDA,以减少数据维度并保留关键信息,再通过KNN算法对降维后的数据进行分类,最后采用多类别AUROC(MC-AUROC)作为性能评估指标。
问题描述
本项目旨在通过模式识别技术分析心率曲线(CTG)数据,自动分类胎儿的心率状。 数据集包括2126个胎儿的心率曲线观察测量数据。它提供了心率曲线的统计特征,如出现异常短期变化的时间百分比、直方图宽度等。其中的心率曲线模式被分为10种类型,包括安静的警觉状态、激活的警觉状态、加速/减速模式等。此外,它还提供了心率曲线的诊断分类,分为正常、可疑、病态三种。 本项目把统计特征作为样本特征,把正常、可疑、病态分别编码为1、2、3作为分类标签。
研究方法
在本项目中,我们的目标是通过模式识别技术分析心率曲线(CTG)数据,自动分类胎儿的心率状态。为了实现这一目标,我们选择了K最近邻(KNN)算法作为分类器,并考虑使用主成分分析(PCA)和线性判别分析(LDA)两种降维技术来处理数据集中的高维特征。
KNN分类
选择KNN算法的原因主要基于其简单性和有效性。KNN算法是一种非参数的分类方法,这意味着它不需要对数据的分布做出任何假设,这在处理复杂或未知分布的数据时非常有用。 KNN算法的核心思想是通过计算新样本与训练集中样本的距离来确定新样本的类别。具体来说,对于一个新的样本点 x ,KNN算法会找到训练集中距离 x 最近的 k 个样本,这些样本被称为最近邻。然后,算法根据这些最近邻样本的类别来决定新样本的类别。如果 k 个最近邻中某个类别出现的频率最高,则新样本被分类到该类别。

设 x 为新样本,xi 为训练集中的第 i 个样本,表示新样本与训练集中样本之间的距离,通常使用欧氏距离来计算:
d(x,i)=j=1∑n(xj−xij)2
其中, xj 是新样本的特征值,xij 是第 i 个训练样本的第 j 个特征值, n 是特征的数量。 在确定了 k 个最近邻之后,KNN算法会进行投票,选择出现次数最多的类别作为新样本的预测类别:
类别(x)=mode(类别(x1),类别(x2),...,类别(xk)))
mode 函数返回出现次数最多的类别。
此外,KNN算法不需要复杂的模型训练过程,可以直接使用训练数据进行预测,这使得它在处理小规模或中等规模数据集时特别有效。尽管KNN算法在处理大规模数据集时可能会遇到性能瓶颈,但对于本项目中的中等规模数据集,KNN算法是一个合适的选择。通过这种方式,我们可以快速地对新样本进行分类,同时保持较高的准确性。
PCA/LDA降维
在KNN分类前,预处理阶段,我们首先使用PCA进行降维。PCA是一种无监督学习方法,它通过正交变换将原始数据转换为一组线性不相关的新变量,这些新变量称为主成分,按照方差大小排序,以保留最重要的数据信息。选择PCA的原因在于其简单性和有效性,PCA能够减少数据的维度,同时尽可能保留原始数据的变异性,这对于提高模型的性能和减少计算资源的消耗是非常有益的。
设 X 为原始数据矩阵,其中每一行代表一个样本,每一列代表一个特征。PCA的目标是找到一个变换 W ,使得变换后的数据 Y = XW 的方差最大化,同时保持数据的线性不相关性。 协方差矩阵:
C=n1i=1∑n(xi−xˉ)(xi−xˉ)T
这个变换可以通过求解它的特征值和特征向量来实现:
CW=λiW
特征向量 W 构成了新坐标系的基,而特征值决定了各主成分的重要性。
此外,考虑到我们的项目中数据集包含标签信息,我们进一步引入了LDA进行降维作为并行方法,进行比较。LDA是一种监督学习的降维技术,它不仅寻找数据的主要成分,还考虑了数据的分类标签,以找到最佳的投影方向,使得不同类别的数据在该方向上尽可能分开,而同类数据尽可能接近。与PCA相比,LDA利用了类别信息,这可能使得降维后的特征对于分类任务更加有效。
LDA的目标是最大化类间散布矩阵 SB 与类内散布矩阵 SW 的比例,即最大化以下比例:
J(W)=∣∣WTSWW∣∣∣∣WTSBW∣∣
其中,W 是我们寻找的投影矩阵。通过求解这个优化问题,我们可以得到最佳的投影方向,使得不同类别尽可能分开,而同类尽可能接近。
MC-AUROC评估
在降维后,我们将应用KNN算法进行分类。为了评估KNN分类器的性能,我们选择了MC-AUROC作为评估指标。AUROC是一种衡量模型在所有可能的分类阈值下性能的指标,它通过绘制真正例率(True Positive Rate, TPR)和假正例率(False Positive Rate, FPR)来评估模型的分类能力。AUROC值越接近1,表示模型的分类能力越强。在本项目中,由于存在三个类别(正常、可疑、病态),一般的AUROC更适用于二分类问题,因此我们选择使用MC-AUROC,它通过对所有类别的AUROC值进行加权平均,以得到一个综合的性能指标,这对于多类别问题来说是一个更加公平和全面的评估方法。
对于二分类问题,AUROC的计算可以使用以下公式:
AUROC=∫01TPR(FPR)dFPR
其中 TPR(FPR) 是在给定的假正例率下的真正例率。这个积分可以通过梯形法则来近似计算,即:
AUROC=n1i=1∑n(TPR(FPRi)+TPR(FPRi+1))/2
其中,TPRi 和 FPRi 分别是第 i 个点的真正例率和假正例率,n 是ROC曲线上点的总数。 对于多类别问题,我们可以为每个类别计算一个AUROC值。设 AUCi 为类别 i 的AUROC值, n 为类别总数,则MC-AUROC可以表示为:

最后,我们将比较PCA和LDA降维后KNN分类器的MC-AUROC值以有效地评估不同降维技术对KNN分类器性能的影响。我们不仅要保证分类的准确性,还要确保模型在处理实际的心率曲线数据时具有较高的泛化能力和鲁棒性,以此来选择最适合本项目的方法。通过这种方法,我们可以确保所选的降维技术和分类器能够有效地处理复杂的心率曲线数据,为胎儿心率状态的自动分类提供可靠的支持。
实现方法
分析历程
首先我们研究了数据集之后确定使用降维后加KNN分类器的实现方式。 初步尝试时,我们使用了PCA作为降维方法,然后使用KNN分类器对降维后的数据进行分类。 对于K值的选择,由于样本数量太大,我们决定先采用25作为K值进行初步的尝试。 对于降维保留主成分的数量,我们使用0.95作为判定降维所保留的主成分数量的方差贡献率临界值。 对于数据集的划分,我们先尝试了训练集测试集比例为6:4的比例,然后简单统计了一下分类器在 测试集中的表现(简单统计了正确率), 发现效果并不好。因此我们开始尝试增减训练集的占比来测试。 在不同划分比例的简单测试下,我们发现有时在训练集占比很小的情况下,PCA+KNN分类器的表现反而较好, 据此,我们判断数据集中的数据可能存在某些问题,而之前的简单分类器处理并没有仔细考虑。 简单分析后,我们认为数据集中可能存在一些离群点,而且这些离群点的分布可能比较集中, 具体表现为某几个比较接近的分割比例训练的分类器表现差异巨大。由此,我们考虑先随机打乱全部的数据, 然后在打乱的数据基础上按照不同比例划分数据集,同时我们认为应该充分发挥训练集的样本标签信息, 因此我们加入了LDA作为对照的降维方法。 在多次随机打乱和尝试不同的划分比例后,我们发现无论是采用PCA还是LDA降维在K值为25的情况下, 简单统计的准确率都只能勉强到达0.89,和之前仅采用PCA无随机打乱时的表现没有明显提升。 考虑到医疗类数据样本自身的特性,和样本类别不均衡的问题,尤其考虑到医疗类数据样本中类别为异常的样本数量非常少, 我们决定采用一种类似K折交叉验证的方式确定超参数。
类K折交叉验证
我们把训练迭代超参数的过程分为打乱轮次迭代和参数尝试迭代两个不同层级的阶段。 一轮打乱迭代以一次对整个数据集的随机打乱开始,然后按照训练集、验证集、测试集6:2:2的比例 划分打乱后的数据,而且同时需要保证在不同类别内部,所含有的训练集、验证集、测试集样本比例也为6:2:2。 即从整体上看,整个数据集都按照比例划分;从具体的某个类别内部看,该类别也被按照比例划分。 这样就即通过随机打乱减轻了离群点的影响,同时保证不会因为随机打乱出现极端的数据集划分 (比如整个验证集或测试集中只要一种类别的数据样本!)。
我们把 K 值的范围设定为[5,25], 因为太小的 K 值没有意义,一般来说K值也不宜超过25。 在一次随机打乱,分割好数据集后,程序开始在设定范围内逐个尝试不同的 K 值。使用分割好的训练集 训练分类器,然后使用验证集进行多类AUC-ROC评估,并计算出 au_roc 的值。 即一次打乱、分割产生的数据集可以产生21个 au_roc。 然后把一次打乱分割产生的训练集、验证集、测试集和21个K值对应的au_roc保存下来,作为一次训练的存档。 接着进行n次上述过程,一共产生 n 个训练存档,每个存档中有不同的训练集、验证集、测试集和21个K值对应的au_roc。在每个存档中选择中位数 au_roc 对应的K值作为该存档产生的超参数,然后在 n 个存档产生的 n 个超参数中,选择产生 au_roc 最大的 K 作为整体的最优超参数。
实验结果
我们利用python进行数据的处理和存储,在python中运行一系列分类器实验,并将结果保存为CSV文件,再由MATLAB程序读取这些文件并生成图表。绘制图表的主要目的是可视化输出结果,便于我们选取KNN分类器的超参数K以及对比PCA和LDA降维方法的稳定性。 首先我们分别绘制了在使用PCA降维方法和使用LDA降维方法下,在九轮数据打乱中分类器的AU-ROC得分随KNN分类器的超参数K的增大的变化曲线。输出结果如图所示:
我们根据输出结果分析可知在九轮打乱中,随着K值增大,LDA方法的AU-ROC得分快速上升并趋于稳定,而PCA方法的AU-ROC得分具有一定随机性,在部分轮次中,得分规律没有明显的上升或者平稳趋势。这充分说明了使用LDA降维方法,在分类器选取不同超参数时,分类器分类效果比较稳定;而使用PCA降维方法,在分类器选取不同超参数时,分类器分类效果并不稳定,换言之,在使用PCA降维方法时,分类器对超参数的变化非常敏感。 然后我们分别绘制了PCA和LDA降维方法在九轮打乱中位于中位数得分的K值对应的AU-ROC曲线图和PR曲线图,并从中挑选出了两种分类器最优的PCA和LDA分类结果如下:
结果显示,采用LDA-KNN分类器最佳K值在16附近,AU-ROC值为0.9668;采用PCA-KNN分类器最佳K值在23附近,AU-ROC值为0.9634,但从多次运行结果来看,采用PCA降维方法的分类器输出结果并不稳定。
实验结论
对比可以发现,采用LDA降维的效果比采用PCA降维好。 采用LDA+KNN的方法最佳K值在16附近,AU_ROC为0.9668;采用PCA+KNN的方法最佳K值在23附近,AU_ROC为0.9634,但多次运行来看,PCA并不稳定。
参考文献
数据来源
- Reference: D Ayres de Campos et al. (2000) SisPorto 2.0 A Program for Automated Analysis of Cardiotocograms. J Matern Fetal Med 5:311-318
附录
数据集
程序实现
import argparse
import os
import re
from datetime import datetime
from prettytable import PrettyTable
from classifier import PCAKNNClassifier, LDAKNNClassifier, BaseClassifier
from data import DataProvider
from save import TrainSave, TrainSaveLoader
from figure import draw_best_train_pr_roc_plots, draw_train_pr_roc_plots, draw_train_k_auroc_plots
from concurrent.futures import ThreadPoolExecutor
# 一些常量
saves_path = r'./saves'
best_test_saves_path = r'./best_tests'
default_k_min = 5
default_k_max = 25
default_shuffle_turns = 9
running_modes = [
{'编号': 1, '模式': '训练', '描述': '批量运行调整超参数'},
{'编号': 2, '模式': '展示', '描述': '加载历史数据并测试'}
]
support_methods = [
{'编号': 1, '方法': 'LDA', '描述': 'LDA降维+KNN分类'},
{'编号': 2, '方法': 'PCA', '描述': 'PCA降维+KNN分类'}
]
def mode_train(interactive: bool):
def train(classifier: BaseClassifier, data_provider: DataProvider, k_min, k_max, shuffle=False):
train_data, verification_data, test_data = (
data_provider.provide_balanced_split_data(train_ratio=0.6,
verification_ratio=0.2,
shuffle=shuffle))
save = TrainSave(classifier.__class__.__name__, train_data, verification_data, test_data)
for k in range(k_min, k_max + 1):
classifier.train(train_data, k=k)
result, probability = classifier.predict(verification_data.samples)
save.add_evaluation_result(k, result, probability)
return save
data_provider = DataProvider(r'./原始数据/S3-CTG.xlsx')
data_provider.load_data()
classifiers: list[BaseClassifier] = []
table = PrettyTable()
table.field_names = ["编号", "方法", "描述"]
for item in support_methods:
table.add_row([item['编号'], item['方法'], item['描述']])
while interactive:
print(table)
match input(prompt('选择要使用的方法(输入all表示使用所有方法)', suffix=':')):
case '1':
classifiers.append(LDAKNNClassifier())
break
case '2':
classifiers.append(PCAKNNClassifier())
break
case 'all':
break
case _:
print('请输入正确的方法编号!')
continue
if not interactive or len(classifiers) == 0:
print(table)
print('已选择所有方法')
classifiers.append(PCAKNNClassifier())
classifiers.append(LDAKNNClassifier())
# 输入超参数 K 的搜索范围,默认最小值为5,默认最大值为25
k_min = default_k_min
k_max = default_k_max
while interactive:
input_str = input(prompt(f'输入 K 的最小值,默认为 {k_min}', suffix=':'))
match input_str:
case '':
break
case _:
try:
k_min = int(input_str)
if k_min < 5:
print('K 的最小值不能小于5!')
continue
break
except ValueError:
print('请输入正确的整数!')
while interactive:
input_str = input(prompt(f'输入 K 的最大值,默认为 {k_max}', suffix=':'))
match input_str:
case '':
break
case _:
try:
k_max = int(input_str)
if k_max < k_min:
print('K 的最大值不能小于K的最小值!')
continue
break
except ValueError:
print('请输入正确的整数!')
if not interactive:
# 打印K默认范围
print(f'K 的默认范围是 {k_min} ~ {k_max}')
shuffle_turn = default_shuffle_turns
while interactive:
input_str = input(prompt(f'输入打乱次数,默认为 {shuffle_turn}', suffix=':'))
match input_str:
case '':
break
case _:
try:
shuffle_turn = int(input_str)
if shuffle_turn < 0:
print('打乱次数不能小于0!')
continue
break
except ValueError:
print('请输入正确的整数!')
if not interactive:
print(f'打乱次数默认为 {shuffle_turn}')
train_saves = []
print('开始训练')
timelabel = datetime.now().strftime("[%Y-%m-%d_%H_%M_%S]")
def task(classifier,t,i):
print(f'开始训练 {classifier.__class__.__name__},第 {t} 次打乱')
train_save = train(classifier, data_provider, k_min, k_max, shuffle=True)
train_save.save(saves_path, f'{timelabel}{train_save.classifier}', t)
return train_save
def callback(future):
train_saves.append(future.result())
try:
with ThreadPoolExecutor(max_workers=8) as executor:
futures = []
i = 1
total_tasks = shuffle_turn * len(classifiers)
current_completed_tasks = -1
for t in range(1, shuffle_turn + 1):
for classifier in classifiers:
future = executor.submit(task, classifier, t,i)
future.add_done_callback(callback)
futures.append(future)
i += 1
import time
while current_completed_tasks != total_tasks:
l = len(train_saves)
# if l > current_completed_tasks:
# 停2s
time.sleep(4)
current_completed_tasks = l
print(f'进度:{current_completed_tasks}/{total_tasks}')
except KeyboardInterrupt:
executor.shutdown(cancel_futures=True)
print('训练完成')
# Table打印summaries
table = PrettyTable()
table.field_names = ['分类器', '训练集大小', '验证集大小', '测试集大小', 'K值范围', '最优K值', '最差K值',
'中位数K值', '最优au_roc', '最差au_roc', '中位数au_roc']
for train_save in train_saves:
summary = train_save.summary()
table.add_row([round(item, 4) if isinstance(item, float) else item for item in summary.values()])
print(table)
print('绘制并保存 K-AU-ROC 曲线')
draw_train_pr_roc_plots(is_interactive=False)
print('绘制并保存 PR 曲线和 AU-ROC 曲线')
draw_train_k_auroc_plots(is_interactive=interactive)
def mode_show(interactive: bool):
def list_saves():
"""
列出saves下的TrainSave文件名
:return: saves下的TrainSave文件名
"""
table = PrettyTable()
table.field_names = ['编号', '时间', '分类器', '最大打乱轮数']
# 定义正则表达式
pattern = re.compile(r'^\[(?P<time>[^\]]+)\](?P<classifier>[^.]+)\.summary(?:\.(?P<shuffle>\d+))?\.csv$')
# 使用字典存储每个时间-分类器组合的最大打乱轮数
max_shuffle_dict = {}
for file in os.listdir(saves_path):
match = pattern.match(file)
if match:
# 提取时间
time = match.group('time')
# 提取分类器
classifier = match.group('classifier')
# 提取打乱次数(如果有)
shuffle = int(match.group('shuffle')) if match.group('shuffle') else 0
# 更新最大打乱轮数
key = (time, classifier)
if key in max_shuffle_dict:
max_shuffle_dict[key] = max(max_shuffle_dict[key], shuffle)
else:
max_shuffle_dict[key] = shuffle
# 将字典中的信息添加到表格中
i = 1
for (time, classifier), max_shuffle in max_shuffle_dict.items():
table.add_row([i, time, classifier, max_shuffle if max_shuffle > 0 else '无'])
i += 1
print(table)
return max_shuffle_dict
def load_saves(saves_dict):
"""
加载 TrainSaves,为每个分类器选出最佳 k 值
选取原则:每个分类器得到n个随机打乱的数据集,相当于每个分类器运行n轮,
每轮运行都会尝试范围内的全部 k 值并计算 au_roc,
选出 au_roc 的中位数和对应 k 值作为本轮的结果,
接着在产生的 n 个 au_roc 中选出最大的 au_roc 对应的 k 值作为该分类器的最佳 k 值。
:param saves_dict:
:return: train_saves 和 best_saves
"""
loader = TrainSaveLoader()
train_saves: dict[str, dict[int, TrainSave]] = {}
best_saves: dict[str, tuple[TrainSave, int, float]] = {}
# 逐分类器加载TrainSave
for (time, classifier), max_shuffle in saves_dict.items():
if classifier not in train_saves.keys():
train_saves[classifier] = {}
# 每轮打乱中根据 au_roc 的中位数选出一个 k 值,
# 然后选出对应 au_roc 最大的 k 作为最佳 k 值
best_k = -1
best_au_roc = -1
best_save = None
for t in range(1, max_shuffle + 1):
# 加载每轮打乱的TrainSave
print(f'加载 {classifier} 第 {t} 轮打乱的训练数据')
train_save = loader.load(path=saves_path, name=f'[{time}]{classifier}', shuffle_turns=t)
summary = train_save.summary()
median_k = summary['中位数K值']
median_au_roc = summary['中位数au_roc']
# 一个简单的选择排序
if median_au_roc > best_au_roc:
best_k = median_k
best_au_roc = median_au_roc
# 更新最佳 TrainSave
best_save = train_save
# 保存全部 TrainSave
train_saves[classifier][t] = train_save
best_saves[classifier] = (best_save, best_k, best_au_roc)
# Table 打印每个分类器的最佳超参数 k 值,以及对应的 au_roc
table = PrettyTable()
table.field_names = ['分类器', '最佳K值', '对应au_roc']
for classifier, (train_save, k, au_roc) in best_saves.items():
table.add_row([classifier, k, au_roc])
print(table)
return train_saves, best_saves
def test(classifier: BaseClassifier, train_save: TrainSave, k: int):
print(f'开始使用 {classifier_name} 预测测试集')
# 使用记录的训练集重新训练模型
classifier.train(train_save.train_data, k)
# 使用记录的测试集进行预测
predict_result, predict_probability = classifier.predict(train_save.test_data.samples)
# 结果保存为新的 TrainSave
new_train_save = TrainSave(classifier_name, train_save.train_data, train_save.verification_data,
train_save.test_data)
new_train_save.add_evaluation_result(k, predict_result, predict_probability,
true_labels=train_save.test_data.labels)
return new_train_save
saves_dict = list_saves()
_, best_saves = load_saves(saves_dict)
# 根据最优 TrainSave,使用其记录的训练集重新训练模型,使用其记录的测试集进行预测
tests = []
for classifier_name, (train_save, k, au_roc) in best_saves.items():
match classifier_name:
case PCAKNNClassifier.__name__:
tests.append(test(PCAKNNClassifier(), train_save, k))
case LDAKNNClassifier.__name__:
tests.append(test(LDAKNNClassifier(), train_save, k))
case _:
raise ValueError(f'{classifier_name} 不支持')
# 保存测试结果
os.makedirs(best_test_saves_path, exist_ok=True)
time = datetime.now().strftime("[%Y-%m-%d_%H_%M_%S]")
for test in tests:
test.save(best_test_saves_path, f'{time}{test.classifier}', 0)
print('绘制并保存 PR 曲线和 AU-ROC 曲线')
draw_best_train_pr_roc_plots(is_interactive=interactive)
def prompt(str, prefix='', suffix=' >'):
if len(prefix) > 0:
prefix = f'({prefix}) '
return f'{prefix}{str}{suffix} '
def main(args):
# 选择运行模式,要验证输入是否合法
table = PrettyTable()
table.field_names = ["编号", "模式", "描述"]
# 添加行数据
for item in running_modes:
table.add_row([item['编号'], item['模式'], item['描述']])
os.makedirs(saves_path, exist_ok=True)
should_exiting = False
if args.isPresenting:
print("当前为演示模式,启用交互")
while args.isPresenting and (not should_exiting):
# 打印表格
print(table)
print("可输入 exit 结束运行")
mode = input(prompt('选择运行模式', suffix=':'))
match mode:
case '1':
mode_train(interactive=True)
# 回车返回上一级
input('回车返回上一级')
# break
case '2':
mode_show(interactive=True)
# 回车返回上一级
input('回车返回上一级')
case 'exit':
should_exiting = True
case _:
print('请输入正确的模式编号!')
if not args.isPresenting:
print('当前非演示模式,禁用交互,全部按默认参数运行')
mode_train(interactive=False)
mode_show(interactive=False)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--isPresenting', action='store_true', help='是否展示运行模式', default=False)
args = parser.parse_args()
main(args)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import random
import numpy as np
import pandas as pd
class DataContainer:
"""
存储样本数据和对应的标签。
"""
def __init__(self, samples: list, labels: list):
"""
初始化 DataContainer。
:param samples: 样本数据列表
:param labels: 标签列表
"""
from numpy.ma.core import shape
if len(samples) != len(labels):
raise ValueError('样本的数量和样本标签的数量必须一致!')
self.samples = samples # 样本数据
self.labels = labels # 标签
self._num_samples = len(samples)
self._num_features = len(samples[0])
self._num_labels = len(labels)
self._num_classes = len(np.unique(self.labels))
def __len__(self):
"""
返回样本数量。
:return: 样本数量
"""
return len(self.samples)
def __str__(self):
"""
返回 DataContainer 的字符串表示形式。
:return: 字符串表示形式
"""
s = str([{'sample': self.samples[i], 'label': self.labels[i]} for i in range(0, len(self.samples))])
return s
def num_samples(self):
return self._num_samples
def num_features(self):
return self._num_features
def num_labels(self):
return self._num_labels
def num_classes(self):
return self._num_classes
def classes(self):
return np.unique(self.labels)
class DataProvider:
"""
提供数据读取和分割功能。
"""
def __init__(self, store_path=r'./data'):
"""
初始化 DataProvider。
:param store_path: 数据存储路径,默认为 './data'
"""
self.store_path = store_path # 数据存储路径
self._data: dict[str, list] = {} # 存储读取的数据
def __str__(self) -> str:
"""
返回 DataProvider 的字符串表示形式。
:return: 字符串表示形式
"""
s = str(self._data)
return s
def load_data(self):
"""
读取数据。
"""
df = pd.read_excel(self.store_path, 'Data')
# 之前采用PCA时没有注意到索引范围有误,导致特征选择出错
self._data['samples'] = df.iloc[:, 5:27].values[1:] # 样本数据
self._data['labels'] = df[['NSP']].values[1:] # 标签
def provide_data(self) -> DataContainer:
"""
提供全部数据。
:return: 全部数据的 DataContainer 对象
"""
raw_data = self._data
length = len(raw_data['samples'])
samples = [sample for sample in raw_data['samples']]
import numpy as np
labels = [np.ravel(label) for label in raw_data['labels']]
return DataContainer(samples, labels)
def provide_split_data(self, ratio=0.6, shuffle=False) -> tuple[DataContainer, DataContainer]:
"""
提供按比例分割的训练数据和测试数据。
:param ratio: 训练集占总数据的比例,默认为 0.6
:param shuffle: 是否打乱数据,默认为 False
:return: 训练数据和测试数据的 DataContainer 对象
"""
raw_data = self._data
length = len(raw_data['samples'])
data: list[tuple] = [(raw_data['samples'][i], raw_data['labels'][i]) for i in range(0, length)]
if shuffle:
random.shuffle(data)
samples_shuffled = [item[0] for item in data]
labels_shuffled = [item[1] for item in data]
sp_point = int(len(samples_shuffled) * ratio)
training_samples = samples_shuffled[:sp_point]
training_labels = labels_shuffled[:sp_point]
testing_samples = samples_shuffled[sp_point:]
testing_labels = labels_shuffled[sp_point:]
training_data = DataContainer(training_samples, training_labels)
testing_data = DataContainer(testing_samples, testing_labels)
return training_data, testing_data
def provide_balanced_split_data(self, train_ratio=0.6, verification_ratio=0.5, shuffle=False) -> tuple[DataContainer, DataContainer, DataContainer]:
"""
提供按比例分割的训练数据和验证数据。
:param train_ratio: 训练集占总数据的比例,默认为 0.6
:param verification_ratio: 验证集占非训练数据的比例,默认为 0.5
:param shuffle: 是否打乱数据,默认为 False
:return: 训练数据和验证数据的 DataContainer 对象
"""
# 获取数据集
data = self.provide_data()
# 将数据集按标签分类存储
samples_dict = {int(label): [data.samples[i] for i in range(0, data.num_samples()) if data.labels[i] == label]
for label in data.labels}
# 初始化训练集、验证集和测试集的列表
train_pairs = []
verification_pairs = []
test_data_pairs = []
# 遍历每个标签对应的样本
for label in samples_dict.keys():
samples = samples_dict[label]
# 如果需要打乱数据,则执行打乱操作
if shuffle:
random.shuffle(samples)
# 计算训练集和验证集的分割点
tr_sp_point = int(len(samples) * train_ratio)
ve_sp_point = int(len(samples) * (train_ratio + verification_ratio))
# 根据分割点将数据添加到对应的集合中
train_pairs.extend([(sample, label) for sample in samples[:tr_sp_point]])
verification_pairs.extend([(sample, label) for sample in samples[tr_sp_point:ve_sp_point]])
test_data_pairs.extend([(sample, label) for sample in samples[ve_sp_point:]])
# 将数据对列表转换为 DataContainer 对象
train_data = DataContainer([item[0] for item in train_pairs], [item[1] for item in train_pairs])
verification_data = DataContainer([item[0] for item in verification_pairs],
[item[1] for item in verification_pairs])
test_data = DataContainer([item[0] for item in test_data_pairs], [item[1] for item in test_data_pairs])
# 返回分割后的数据集
return train_data, verification_data, test_data
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import math
from abc import ABC, abstractmethod
from enum import Enum
from math import sqrt
import numpy as np
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
from data import DataContainer
class Memory:
"""
存储处理后的数据及其处理器和维度信息。
"""
def __init__(self, processed_data, processor, dimens: int):
self.processed_data = processed_data # 处理后的数据
self.processor = processor # 数据处理对象(PCA 或 LDA)
self.dimens = dimens # 保留的主成分个数
class BaseClassifier(ABC):
@staticmethod
def simple_name():
pass
"""
抽象基类,定义分类器的基本接口。
"""
@abstractmethod
def train(self, data: DataContainer,k: int):
"""
训练分类器。
:param data: 训练数据容器
"""
pass
@abstractmethod
def predict(self, samples: list) -> tuple[list, list]:
"""
预测样本的类别。
:param samples: 待预测的样本列表
:return: 预测的类别列表和概率列表
"""
pass
def _store_memory(self, memory):
"""
存储训练后记忆。
:param memory: 训练后记忆
"""
self._trained_memory = memory
def _get_memory(self):
"""
获取练后记忆。
:return: 训练后的记忆
"""
if not hasattr(self, '_trained_memory'):
raise ValueError('还没训练呢!')
return self._trained_memory
class PCAKNNClassifier(BaseClassifier):
"""
PCA 降维后 KNN 分类器。
"""
def __init__(self, dimens=-1, print_log=False):
self.print_log = print_log
self.dimens = dimens
@staticmethod
def simple_name():
return 'PCA+KNN'
def train(self, data: DataContainer, k: int):
if 0 < self.dimens < data.num_features():
dimens = self.dimens
else:
pca_scaler = StandardScaler()
pca = PCA()
scaled_samples = pca_scaler.fit_transform(data.samples)
_ = pca.fit_transform(scaled_samples)
if self.print_log:
print('主成分方差贡献率:', ["{:.8f}".format(x) for x in pca.explained_variance_ratio_])
temp = np.cumsum(pca.explained_variance_ratio_)
dimens = 0
for i in range(0, len(temp)):
if temp[i] >= 0.9:
dimens = i + 1
break
if self.print_log:
print(f'选择保留主成分个数为:{dimens}')
pca_scaler = StandardScaler()
pca = PCA(n_components=dimens)
scaled_samples = pca_scaler.fit_transform(data.samples)
pca_processed_samples = pca.fit_transform(scaled_samples)
knn = KNeighborsClassifier(n_neighbors=k)
knn.fit(pca_processed_samples, data.labels)
self._store_memory((pca_processed_samples, pca, pca_scaler, knn))
def predict(self, samples: list):
_, pca, pca_scaler, knn = self._get_memory()
processed_samples = pca.transform(pca_scaler.transform(samples))
return knn.predict(processed_samples), knn.predict_proba(processed_samples)
class LDAKNNClassifier(BaseClassifier):
def __init__(self, dimens=-1, print_log=False):
self.print_log = print_log
self.dimens = dimens
@staticmethod
def simple_name():
return 'LDA+KNN'
def train(self, data: DataContainer, k: int):
if 0 < self.dimens < data.num_features():
dimens = self.dimens
else:
scaler = StandardScaler()
scaled_samples = scaler.fit_transform(data.samples)
lda = LinearDiscriminantAnalysis()
lda.fit_transform(scaled_samples, data.labels)
if self.print_log:
print('主成分方差贡献率:', ["{:.8f}".format(x) for x in lda.explained_variance_ratio_])
temp = np.cumsum(lda.explained_variance_ratio_)
dimens = 0
for i in range(0, len(temp)):
if temp[i] >= 0.9:
dimens = i + 1
break
if self.print_log:
print(f'选择保留主成分个数为:{dimens}')
scaler = StandardScaler()
lda = LinearDiscriminantAnalysis(n_components=dimens)
processed_samples = lda.fit_transform(scaler.fit_transform(data.samples), data.labels)
knn = KNeighborsClassifier(n_neighbors=k)
knn.fit(processed_samples, data.labels)
self._store_memory((processed_samples, lda, scaler, knn))
def predict(self, samples: list):
_, lda, scaler, knn = self._get_memory()
processed_samples = lda.transform(scaler.transform(samples))
return knn.predict(processed_samples), knn.predict_proba(processed_samples)
class Classifier:
"""
分类器类,支持 PCA 和 LDA 两种降维方法。
"""
class Method(Enum):
"""
枚举类,定义分类器支持的方法。
"""
PCA = 1 # 主成分分析
LDA = 2 # 线性判别分析
def __init__(self, print_log=False):
"""
初始化分类器。
:param print_log: 是否打印日志,默认为 False
"""
self._pca_processed_data = None # PCA 处理后的数据
self._n = None # KNN 的邻居数
self._trained_memory = None # 训练后的内存
self.print_log = print_log # 是否打印日志
def train(self, training_data: DataContainer, method: Method, **train_args) -> None:
"""
训练分类器。
:param training_data: 训练数据容器
:param method: 降维方法(PCA 或 LDA)
:param train_args: 其他训练参数
"""
match method:
case Classifier.Method.PCA:
self._store_memory(self._pca_train(training_data.samples, **train_args))
knn_data = DataContainer(self._get_memory().processed_data, training_data.labels)
self._knn_init(knn_data, **train_args)
self._check_selected_method(method)
case Classifier.Method.LDA:
self._store_memory(self._lda_train(training_data.samples, training_data.labels, **train_args))
knn_data = DataContainer(self._get_memory().processed_data, training_data.labels)
self._knn_init(knn_data, **train_args)
self._check_selected_method(method)
def predict(self, data: list) -> list:
"""
预测数据。
:param data: 待预测的数据
:return: 预测结果
"""
match self.method:
case Classifier.Method.PCA:
return self._knn_predict(self._pca_process(data))
case Classifier.Method.LDA:
return self._knn_predict(self._lda_process(data))
def _pca_train(self, data: list, **args) -> Memory:
"""
使用 PCA 训练数据。
:param data: 训练数据
:param args: 其他参数
:return: 训练后的内存对象
"""
if ('dimens' in args) and (not hasattr(self, 'dimens')):
dimens = args['dimens']
else:
pca_scaler = StandardScaler()
pca = PCA()
scaled_samples = pca_scaler.fit_transform(data)
_ = pca.fit_transform(scaled_samples)
if self.print_log:
print('主成分方差贡献率:', ["{:.8f}".format(x) for x in pca.explained_variance_ratio_])
temp = np.cumsum(pca.explained_variance_ratio_)
dimens = 0
for i in range(0, len(temp)):
if temp[i] >= 0.9:
dimens = i + 1
break
if self.print_log:
print(f'选择保留主成分个数为:{dimens}')
pca_scaler = StandardScaler()
pca = PCA(n_components=dimens)
scaled_samples = pca_scaler.fit_transform(data)
pca_processed_samples = pca.fit_transform(scaled_samples)
return Memory(pca_processed_samples, pca, dimens)
def _pca_process(self, data: list, **args) -> list:
"""
使用 PCA 处理数据。
:param data: 待处理的数据
:param args: 其他参数
:return: 处理后的数据
"""
pca = self._get_memory().processor
scaler = StandardScaler()
scaled_samples = scaler.fit_transform(data)
pca_processed_samples = pca.transform(scaled_samples)
return pca_processed_samples
def _lda_train(self, data: list, labels: list, **args) -> Memory:
"""
使用 LDA 训练数据。
:param data: 训练数据
:param labels: 训练标签
:param args: 其他参数
:return: 训练后的内存对象
"""
if ('dimens' in args) and (not hasattr(self, 'dimens')):
dimens = args['dimens']
else:
lda_scaler = StandardScaler()
lda = LinearDiscriminantAnalysis()
scaled_samples = lda_scaler.fit_transform(data)
_ = lda.fit_transform(scaled_samples, np.ravel(labels))
if self.print_log:
print('主成分方差贡献率:', ["{:.8f}".format(x) for x in lda.explained_variance_ratio_])
temp = np.cumsum(lda.explained_variance_ratio_)
dimens = 0
for i in range(0, len(temp)):
if temp[i] >= 0.9:
dimens = i + 1
break
if self.print_log:
print(f'选择保留主成分个数为:{dimens}')
lda_scaler = StandardScaler()
lda = LinearDiscriminantAnalysis(n_components=dimens)
scaled_samples = lda_scaler.fit_transform(data)
lda.fit(scaled_samples, np.ravel(labels))
lda_processed_samples = lda.transform(scaled_samples)
return Memory(lda_processed_samples, lda, dimens)
def _lda_process(self, data: list, **args) -> list:
"""
使用 LDA 处理数据。
:param data: 待处理的数据
:param args: 其他参数
:return: 处理后的数据
"""
lda = self._get_memory().processor
scaler = StandardScaler()
scaled_samples = scaler.fit_transform(data)
lda_processed_samples = lda.transform(scaled_samples)
return lda_processed_samples
def _check_selected_method(self, method: Method):
"""
检查当前选择的降维方法是否与已选择的方法一致。
:param method: 当前选择的降维方法
"""
if hasattr(self, 'method'):
if self.method != method:
raise ValueError(f'当前已选择{self.method.name}作为训练方法,无法切换!')
else:
self.method = method
def _knn_init(self, training_data: DataContainer, **train_args) -> None:
"""
初始化 KNN 分类器。
:param training_data: 训练数据容器
:param train_args: 其他训练参数
"""
self._n = int(min(35.0, sqrt(len(training_data.samples))))
self._knn = KNeighborsClassifier(n_neighbors=self._n)
self._knn.fit(training_data.samples, np.ravel(training_data.labels))
def _knn_predict(self, data) -> tuple[list,list]:
"""
使用 KNN 分类器进行预测。
:param data: 待预测的数据
:return: 预测结果
"""
return self._knn.predict(data),self._knn.predict_proba(data)
def _store_memory(self, memory: Memory):
"""
存储训练后的内存对象。
:param memory: 训练后的内存对象
"""
self._trained_memory = memory
def _get_memory(self) -> Memory:
"""
获取训练后的内存对象。
:return: 训练后的内存对象
"""
if not hasattr(self, '_trained_memory'):
raise ValueError('还没训练呢!')
return self._trained_memory
import matlab
from matlab import engine
eng_future = matlab.engine.start_matlab(background=True)
print('后台异步启动MatlabPyEngine')
cded = False
def get_eng():
global eng_future
global cded
eng = eng_future.result()
if not cded:
cded = True
eng.cd(r'./figure_process', nargout=0)
eng.warning('off', nargout=0)
return eng
def draw_train_pr_roc_plots(is_interactive=False):
# 使用matlab eng_futureine 调用 figure_process/PlotFigure.m
get_eng().PlotFigure('../saves/', './pr_roc', is_interactive, nargout=0)
def draw_train_k_auroc_plots(is_interactive=False):
get_eng().plotAUROCFiles('../saves/', './k_auroc', is_interactive, nargout=0)
def draw_best_train_pr_roc_plots(is_interactive=False):
get_eng().PlotFigure('../best_tests/', './pr_roc', is_interactive, nargout=0)
import pandas as pd
from sklearn.metrics import roc_auc_score
from data import DataContainer
class TrainSave:
def __init__(self, classifier: str, train_data: DataContainer, verification_data: DataContainer,
test_data: DataContainer, rebuilt_from_file=False):
self.classifier = classifier
self.train_data = train_data
self.verification_data = verification_data
self.test_data = test_data
self.evaluation_results = {}
self.au_roc: dict[int, float] = {}
self.rebuilt_from_file = rebuilt_from_file
self.dataset_dfs = {}
if not rebuilt_from_file:
col = [f'特征{i}' for i in range(self.train_data.num_features())]
col.extend(['标签'])
tr_d = [[*self.train_data.samples[i], self.train_data.labels[i]] for i in
range(self.train_data.num_samples())]
tr_df = pd.DataFrame(columns=col, data=tr_d)
ve_d = [[*self.verification_data.samples[i], self.verification_data.labels[i]] for i in
range(self.verification_data.num_samples())]
ve_df = pd.DataFrame(columns=col, data=ve_d)
te_d = [[*self.test_data.samples[i], self.test_data.labels[i]] for i in range(self.test_data.num_samples())]
te_df = pd.DataFrame(columns=col, data=te_d)
self.dataset_dfs = {'train': tr_df, 'verification': ve_df, 'test': te_df}
def add_evaluation_result(self, k:int ,predict_result, predict_probability,true_labels = None):
if self.rebuilt_from_file:
return
if true_labels is None:
true_labels = self.verification_data.labels
au_roc = roc_auc_score(y_true=true_labels, y_score=predict_probability, multi_class='ovo',
average='weighted', labels=self.verification_data.classes())
self.au_roc[k] = au_roc
self.evaluation_results[k] = [[true_labels[i], predict_result[i], *predict_probability[i]] for
i in range(len(true_labels))]
def save(self, path, name, shuffle_turns=None | int):
if self.rebuilt_from_file:
print('此存档从文件加载而来,不允许修改')
return
suffix = 'csv'
if shuffle_turns is not None:
suffix = f'{shuffle_turns}.csv'
# 用pandas保存为CSV
# 训练集,验证集,测试集保存为三个CSV
for key, df in self.dataset_dfs.items():
df.to_csv(f'{path}/{name}.{key}.{suffix}', index=False)
# 保存评估结果
data_to_save = []
for k in self.evaluation_results.keys():
for result in self.evaluation_results[k]:
data_to_save.append([k, *result])
col = ['K', '真实标签', '预测标签', *[f'预测为{i + 1}的概率' for i in range(self.train_data.num_classes())]]
pd.DataFrame(data_to_save, columns=col).to_csv(f'{path}/{name}.evaluation.{suffix}', index=False)
au_roc_data = []
for k in self.au_roc.keys():
au_roc_data.append([k, self.au_roc[k]])
col = ['K', '验证集AU-ROC']
pd.DataFrame(au_roc_data, columns=col).to_csv(f'{path}/{name}.au_roc.{suffix}', index=False)
# 保存summary()返回的字典到csv
pd.DataFrame([self.summary()]).to_csv(f'{path}/{name}.summary.{suffix}', index=False)
def summary(self) -> dict:
# 返回一个字典
return {
'分类器': self.classifier,
'训练集大小': self.train_data.num_samples(),
'验证集大小': self.verification_data.num_samples(),
'测试集大小': self.test_data.num_samples(),
'K值范围': f'{min(self.evaluation_results.keys())}~{max(self.evaluation_results.keys())}',
# au_roc的键
'最优K值': max(self.au_roc, key=self.au_roc.get),
'最差K值': min(self.au_roc, key=self.au_roc.get),
'中位数K值': sorted(self.au_roc, key=self.au_roc.get)[len(self.au_roc) // 2],
'最优au_roc': float(max(self.au_roc.values())),
'最差au_roc': float(min(self.au_roc.values())),
'中位数au_roc': float(sorted(self.au_roc.values())[len(self.au_roc) // 2])
}
class TrainSaveLoader:
@staticmethod
def load_data_container_from_csv(file_path):
df = pd.read_csv(file_path)
samples = df.iloc[:, :-1].values.tolist()
labels = df.iloc[:, -1].values.tolist()
return DataContainer(samples, labels)
@classmethod
def load(cls, path, name, shuffle_turns=None):
# 构建文件名后缀
suffix = 'csv'
if shuffle_turns is not None:
suffix = f'{shuffle_turns}.csv'
# 加载数据集
train_data = cls.load_data_container_from_csv(f'{path}/{name}.train.{suffix}')
verification_data = cls.load_data_container_from_csv(f'{path}/{name}.verification.{suffix}')
test_data = cls.load_data_container_from_csv(f'{path}/{name}.test.{suffix}')
# 加载Summary以获取分类器名称
summary_df = pd.read_csv(f'{path}/{name}.summary.{suffix}')
classifier = summary_df['分类器'].iloc[0]
# 初始化TrainSave对象
train_save = TrainSave(classifier, train_data, verification_data, test_data, rebuilt_from_file=True)
# 加载评估结果
evaluation_df = pd.read_csv(f'{path}/{name}.evaluation.{suffix}')
evaluation_results = {}
for _, row in evaluation_df.iterrows():
k = int(row['K'])
if k not in evaluation_results.keys():
evaluation_results[k] = ([],[],[])
true_labels = row['真实标签']
predicted_labels = row['预测标签']
predicted_probabilities = row[
[f'预测为{i + 1}的概率' for i in range(train_data.num_classes())]].values.tolist()
evaluation_results[k][0].append(true_labels)
evaluation_results[k][1].append(predicted_labels)
evaluation_results[k][2].append(predicted_probabilities)
for k, (true_labels,predicted_labels, predicted_probabilities) in evaluation_results.items():
# train_save.add_evaluation_result(k, predicted_labels, predicted_probabilities)
t = (true_labels,predicted_labels, predicted_probabilities)
train_save.evaluation_results[k] = [[true_labels[i], predicted_labels[i], *predicted_probabilities[i]] for
i in range(verification_data.num_samples())]
# 加载AU-ROC
au_roc_df = pd.read_csv(f'{path}/{name}.au_roc.{suffix}')
for _, row in au_roc_df.iterrows():
k = int(row['K'])
au_roc = row['验证集AU-ROC']
train_save.au_roc[k] = au_roc
return train_save
function PlotFigure(basepath,outputpath,isVisible)
% PlotFigure 从给定的基本路径读取评估文件并绘制ROC曲线。
% 输入参数:
% basepath - 包含summary和evaluation文件的基本目录路径(字符串或字符向量)。
% outputpath - 相对于basepath的保存输出图像的子目录路径(字符串或字符向量)。
% isVisible - 控制是否显示生成的图形窗口(逻辑值)。
%
% 输出: 无直接输出,但会生成SVG格式的图形文件到指定路径,并可以选择性地显示图形窗口。
%
% 注意事项:
% - 文件命名需遵循特定模式,以确保正则表达式能正确提取分类器类型和轮次信息。
% - summary和evaluation文件必须成对存在,且仅文件名有所区别。
% - 每个分类器类型的每一轮将创建一个新的图形窗口,其中包含ROC和PR曲线。
%
% 定义所有要读取的文件路径模式
filePattern = strcat(basepath,'*.evaluation.*.csv');
% 获取所有符合模式的文件名
evaluationFiles = dir(fullfile(filePattern));
% 初始化变量名以匹配数据列
variableNamesSummary = {'Classifier', 'TrainingSetSize', 'ValidationSetSize', 'TestSetSize', 'KValueRange', 'OptimalK', 'WorstK', 'MedianK', 'OptimalAUROC', 'WorstAUROC', 'MedianAUROC'};
variableNamesEvaluation = {'K', 'TrueLabel', 'PredictedLabel', 'PredProbClass1', 'PredProbClass2', 'PredProbClass3'};
% 初始化结构体数组来保存每一对summary和evaluation文件的信息
fileInfo = struct('Classifier', {}, 'SummaryFile', {}, 'EvaluationFile', {}, 'MedianK', {}, 'Round', {});
% 读取每个summary文件并提取中位数K值,同时存储文件对信息
for fileIdx = 1:length(evaluationFiles)
evaluationFile = evaluationFiles(fileIdx).name;
disp(evaluationFile)
% 使用正则表达式提取文件名中的轮次数
roundMatch = regexp(evaluationFile, 'evaluation\.(\d+)\.', 'tokens');
if isempty(roundMatch) || isempty(roundMatch{1})
warning('Could not extract round number from file name: %s', evaluationFile);
continue;
end
roundNumber = str2double(roundMatch{1}{1}); % 提取并转换为数字
% 构建summary文件路径 (假设evaluation和summary文件在同一目录下,且只有文件名不同)
summaryFile = strrep(evaluationFile, '.evaluation.', '.summary.');
% 确保summary文件存在
if exist(fullfile(basepath, summaryFile), 'file') ~= 2
warning('Summary file not found for evaluation file: %s', evaluationFile);
continue;
end
% 读取CSV文件中的数据
summaryData = readtable(fullfile(basepath, summaryFile));
% 设置summary文件的变量名称
summaryData.Properties.VariableNames = variableNamesSummary;
% 提取分类器类型和中位数K值
classifierType = extractBetween(evaluationFile, ']', '.evaluation.'); % 假设文件命名规则包含分类器类型
medianK = summaryData.MedianK(1); % 使用圆括号
% 存储文件对及其信息,包括新的Round字段
fileInfo(end+1) = struct('Classifier', string(classifierType), 'SummaryFile', summaryFile, 'EvaluationFile', evaluationFile, 'MedianK', medianK, 'Round', roundNumber);
end
% 检查是否收集到了任何有效文件信息
if isempty(fileInfo)
error('No valid evaluation-summary file pairs found.');
end
% 提取分类器类型和轮次到单独的元胞数组
classifiers = [fileInfo.Classifier]; % 确保是字符串数组
% 获取唯一的分类器类型和轮次组合
uniqueClassifiers = unique(classifiers);
for clfIdx = 1:length(uniqueClassifiers)
classifierType = uniqueClassifiers(clfIdx);
% 获取当前分类器类型下的所有轮次
clfRounds = [fileInfo(strcmp([fileInfo.Classifier], classifierType)).Round];
uniqueRounds = unique(clfRounds);
% 对于每个唯一的分类器类型和轮次组合,创建一个新的图形窗口
for roundIdx = 1:length(uniqueRounds)
roundNumber = uniqueRounds(roundIdx);
aspectRatio = 16 / 9; widthFig = 800; heightFig = widthFig / aspectRatio;
fig = figure('Name', sprintf('%s, Round = %.0f', classifierType, roundNumber), 'NumberTitle', 'off','Visible', lower(logical(isVisible)),'Position', [100, 100, widthFig, heightFig]);
% 绘制ROC曲线
hold on;
title(sprintf('ROC Curves for %s, Round = %.0f', classifierType, roundNumber));
xlabel('False Positive Rate');
ylabel('True Positive Rate');
% 初始化图例字符串
legendStrRoc = {};
% 遍历所有文件对,仅绘制与当前分类器类型和轮次匹配的结果
relevantFiles = fileInfo(strcmp([fileInfo.Classifier], classifierType) & [fileInfo.Round] == roundNumber);
for fileInfoIdx = 1:length(relevantFiles)
evaluationData = readtable(fullfile(basepath, relevantFiles(fileInfoIdx).EvaluationFile));
% 设置evaluation文件的变量名称
evaluationData.Properties.VariableNames = variableNamesEvaluation;
% 提取真实标签和预测分数
trueLabels = evaluationData.TrueLabel;
predictedScores = table2array(evaluationData(:, {'PredProbClass1', 'PredProbClass2', 'PredProbClass3'}));
% 获取所有唯一的类别
allUniqueClasses = unique(trueLabels);
numClasses = numel(allUniqueClasses);
% 将真实标签转换为独热编码
oneHotTrueLabels = false(height(evaluationData), numClasses);
for i = 1:height(evaluationData)
[~, classIdx] = ismember(trueLabels(i), allUniqueClasses);
if ~isempty(classIdx)
oneHotTrueLabels(i, classIdx) = true;
end
end
% 初始化加权平均ROC的数据
weightedFroc = [];
weightedTroc = [];
totalWeight = 0;
% 共同的FROC网格
commonFroc = linspace(0, 1, 100);
% 绘制每个类别的ROC曲线,并标注K值
for classIdx = 1:numClasses
if sum(oneHotTrueLabels(:,classIdx)) > 0 && sum(~oneHotTrueLabels(:,classIdx)) > 0
% 绘制ROC曲线
[fpr, tpr, ~, aucRoc] = perfcurve(double(oneHotTrueLabels(:,classIdx)), predictedScores(:,classIdx), 1);
plot(fpr, tpr, '-','LineWidth', 2);
% 更新ROC图例
legendStrRoc{end+1} = sprintf('Class %d, K=%.0f (AUC: %.2f)', allUniqueClasses(classIdx), relevantFiles(fileInfoIdx).MedianK, aucRoc);
% 计算加权平均ROC曲线
% 计算每个类别的权重
weight = sum(oneHotTrueLabels(:,classIdx)) / height(evaluationData);
% 确保fpr和tpr是唯一的
[fpr, uniqueIdx] = unique(fpr);
tpr = tpr(uniqueIdx);
% 插值到共同的FPR网格
interpolatedTpr = interp1(fpr, tpr, commonFroc, 'linear', 'extrap');
% 加权平均ROC曲线的数据
if isempty(weightedFroc)
weightedFroc = commonFroc * weight;
weightedTroc = interpolatedTpr * weight;
else
weightedFroc = weightedFroc + (commonFroc * weight);
weightedTroc = weightedTroc + (interpolatedTpr * weight);
end
totalWeight = totalWeight + weight; % 更新总权重
else
warning('Not enough positive and negative examples for Class %d with Classifier %s and Round = %.0f', allUniqueClasses(classIdx), classifierType, roundNumber);
end
end
% 计算和绘制加权平均ROC曲线
if totalWeight > 0
weightedFroc = weightedFroc / totalWeight; % 归一化
weightedTroc = weightedTroc / totalWeight; % 归一化
plot(weightedFroc, weightedTroc, '--', 'LineWidth', 3, 'Color', 'k');
legendStrRoc{end+1} = 'Weighted Average ROC';
end
end
% 更新图例
legend(legendStrRoc, 'Location', 'best');
hold off;
% 检查并创建保存目录
saveDir = strcat(basepath,outputpath);
if ~exist(saveDir, 'dir')
mkdir(saveDir);
end
% 在这里插入保存SVG文件的代码
saveFileName = sprintf('%s_Round%.0f.svg', classifierType, roundNumber);
saveFilePath = fullfile(saveDir, saveFileName); % 修改为你想要保存的位置
% 保存当前图形窗口为SVG文件
saveas(fig, saveFilePath);
disp(['Saved ', saveFilePath]);
end
end
function plotAUROCFiles(basepath, outputpath, isVisible)
% PLOTAUROCFILES 读取包含 'au_roc' 字样的CSV文件并绘制AUROC曲线。
%
% 输入:
% basepath - 文件所在的目录路径(字符串或字符向量)。
% 必须包含多个CSV文件,其命名应符合特定模式以便提取分类器类型和轮次信息。
% outputpath - 图形输出保存的相对子目录路径(字符串或字符向量),相对于basepath。
% 如果该路径不存在,则会自动创建。
% isVisible - 逻辑值(true 或 false),用于控制图形窗口的可见性。
%
% 输出: 无直接输出,但会生成SVG格式的图形文件到指定路径,并可以选择性地显示图形窗口。
%
% 注意事项:
% - 文件名需遵循 [timestamp]ClassifierType.au_roc.[number].csv 的模式,其中 ClassifierType 是分类器的标识符。
% - 本函数假定所有CSV文件都包含两列:'k' 和 'au_roc',分别代表K值和相应的AUROC得分。
% isVisible - 逻辑值(true 或 false),用于控制图形窗口的可见性
% 使用 dir 函数查找包含 'au_roc' 字样的文件
files = dir(fullfile(basepath, '*au_roc*.csv'));
% 检查是否有匹配的文件
if isempty(files)
error('No files with "au_roc" in the name found in the specified directory.');
end
% 初始化变量名
variableNamesAu_roc = {'k', 'au_roc'};
% 创建一个容器来保存每个分类器的图形句柄
classifierFigs = containers.Map('KeyType','char','ValueType','any');
% 遍历每个文件
for i = 1:length(files)
% 获取文件名
fileName = fullfile(basepath, files(i).name);
% 提取分类器类型和打乱次数
[classifierType, roundNumber] = extractClassifierInfo(files(i).name);
% 读取CSV文件
data = readtable(fileName);
data.Properties.VariableNames = variableNamesAu_roc;
% 提取k和au_roc列
k = data.k;
au_roc = data.au_roc;
aspectRatio = 16 / 4.5; widthFig = 800; heightFig = widthFig / aspectRatio;
% 如果该分类器还没有对应的图形,则创建新的图形窗口
if ~isKey(classifierFigs, classifierType)
fig = figure('Name', classifierType, 'NumberTitle', 'off', 'Visible', logical(isVisible),'Position', [100, 100, widthFig, heightFig]);
hold on;
classifierFigs(classifierType) = fig; % 将图形句柄存入Map中
else
fig = classifierFigs(classifierType); % 获取已存在的图形句柄
set(0, 'CurrentFigure', fig)
set(fig, 'Visible', logical(isVisible));
end
% 绘制曲线
h = plot(k, au_roc, '-o', 'LineWidth', 1, 'MarkerFaceColor', 'b','MarkerSize', 4);
set(h, 'DisplayName', sprintf('Round %.0f', roundNumber));
% 设置图形标题和轴标签
title(sprintf('AUROC vs. k for %s', classifierType));
xlabel('k');
ylabel('AUROC');
% 添加图例
legend show;
% 保存图像到指定路径
saveDir = fullfile(basepath, outputpath);
if ~exist(saveDir, 'dir')
mkdir(saveDir);
end
% 仅当所有轮次都添加完毕后保存图像
if i == length(files) || ~strcmp(classifierType, extractClassifierInfo(files(min(i+1, length(files))).name))
saveFileName = sprintf('%s.svg', classifierType);
saveFilePath = fullfile(saveDir, saveFileName); % 修改为你想要保存的位置
saveas(fig, saveFilePath);
disp(['Saved ', saveFilePath]);
end
end
function [classifierType, roundNumber] = extractClassifierInfo(filename)
% EXTRACTCLASSIFIERINFO 提取分类器类型和轮次数的辅助函数
%
% 输入:
% filename - 文件名(字符串或字符向量)
%
% 输出:
% classifierType - 分类器类型(字符串)
% roundNumber - 轮次数(整数)
%
% 假设文件名格式为 [timestamp]ClassifierType.au_roc.[number].csv
%
% 使用正则表达式匹配文件名
match = regexp(filename, '^\[.*?\](LDAKNNClassifier|PCAKNNClassifier)\.au_roc\.(\d+)\.csv$', 'tokens');
if ~isempty(match)
classifierType = match{1}{1}; % 获取分类器类型 (LDAKNN 或 PCAKNN)
roundNumber = str2double(match{1}{2}); % 获取轮次数
else
error(['Invalid filename format: ', filename]);
end
end
end
贡献者
- SunRt233