基于开源的easyFL,本文介绍几种常见的模拟联邦学习时划分数据集的方法并做实验观察联邦学习中数据非独立同分布对全局模型的影响。实验用的数据集MNIST数据集,模拟10个客户端进行50轮训练。

划分数据集

  • 划分的数据集与原数据集数据独立同分布;
  • 划分的数据集服从$|{D_i(Y)}|=K$, 即划分后的每个数据集都只有$K$个标签种类的数据;
  • 划分的数据集服从狄利克雷分布,$D_i$~$Dirichlet(\alpha P)$,其中$\alpha$越大,划分的数据集与原数据集分布越接近;
  • 使用对数正态分布来确定每个客户端的数据数量,Log-Normal Distribution。

划分数据集的可视化

划分数据集的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
num_clients = 10
iid_partition = {'benchmark':{'name':'flgo.benchmark.mnist_classification'},
'partitioner':{'name':'IIDPartitioner', 'para':{'num_clients':num_clients}}} # iid
diversity_partition01 = {'benchmark':{'name':'flgo.benchmark.mnist_classification'},
'partitioner':{'name': 'DiversityPartitioner','para':{'num_clients':num_clients, 'diversity':0.1}}}
diversity_partition05 = {'benchmark':{'name':'flgo.benchmark.mnist_classification'},
'partitioner':{'name': 'DiversityPartitioner','para':{'num_clients':num_clients, 'diversity':0.5}}}
diversity_partition09 = {'benchmark':{'name':'flgo.benchmark.mnist_classification'},
'partitioner':{'name': 'DiversityPartitioner','para':{'num_clients':num_clients, 'diversity':0.9}}}
dirichlet_partition01 = {'benchmark':{'name':'flgo.benchmark.mnist_classification'},
'partitioner':{'name': 'DirichletPartitioner','para':{'num_clients':num_clients, 'alpha':0.1}}}
dirichlet_partition10 = {'benchmark':{'name':'flgo.benchmark.mnist_classification'},
'partitioner':{'name': 'DirichletPartitioner','para':{'num_clients':num_clients, 'alpha':1.0}}}
dirichlet_partition50 = {'benchmark':{'name':'flgo.benchmark.mnist_classification'},
'partitioner':{'name': 'DirichletPartitioner','para':{'num_clients':num_clients, 'alpha':5.0}}}
task_dict = {
'mnist_iid_partition':iid_partition,
'mnist_diversity_partition01':diversity_partition01,
'mnist_diversity_partition05':diversity_partition05,
'mnist_diversity_partition09':diversity_partition09,
'mnist_dirichlet_partition01':dirichlet_partition01,
'mnist_dirichlet_partition10':dirichlet_partition10,
'mnist_dirichlet_partition50':dirichlet_partition50
}
for task in task_dict:
if not os.path.exists(task):
flgo.gen_task(task_dict[task], task)

可视化结果划分结果,独立同分布如图1所示。


图1 划分后的数据集与原数据集独立同分布
划分的数据集服从狄利克雷分布的划分结果如图2所示。

图2 从左到右分别为当$\alpha$为0.1,1.0,5.0时,狄利克雷分布划分的数据集
划分的数据集服从$|\{D_i(Y)\}|=K$时划分结果如图3所示。

图3 从左到右$K$分别等于1,5,9划分的数据集划分结果

实验

在划分的数据集上训练模型

模型选用CNN,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from torch import nn
import torch.nn.functional as F
from flgo.utils.fmodule import FModule

class Model(FModule):
def __init__(self):
super().__init__()
self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=5, padding=2)
self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=5, padding=2)
self.fc1 = nn.Linear(3136, 512)
self.fc2 = nn.Linear(512, 10)

def forward(self, x):
x = self.get_embedding(x)
x = self.fc2(x)
return x

def get_embedding(self, x):
x = x.view((x.shape[0],28,28))
x = x.unsqueeze(1)
x = F.max_pool2d(F.relu(self.conv1(x)), 2)
x = F.max_pool2d(F.relu(self.conv2(x)), 2)
x = x.view(-1, x.shape[1]*x.shape[2]*x.shape[3])
x = F.relu(self.fc1(x))
return x

def init_local_module(object):
pass

def init_global_module(object):
if 'Server' in object.__class__.__name__:
object.model = Model().to(object.device)

训练代码如下。

1
2
3
4
5
6
7
8
import flgo.algorithm.fedavg as fedavg
import flgo.experiment.analyzer
import os
options = {'gpu': {0, 1}, 'log_file':True,'batch_size':256, 'num_rounds':100, 'num_epochs':5,
'learning_rate':0.04, 'proportion':1.0, 'sample':'full', 'aggregate':'weighted_com'}
runners = [flgo.init(task, fedavg, options) for task in task_dict]
for runner in runners:
runner.run()

其中训练的参数选项如图4所示。


图4 训练参数选项

绘制训练结果

绘制训练结果的代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import flgo.experiment.analyzer as al
import matplotlib.pyplot as plt

div_recs = al.Selector({'task':[t for t in task_dict if 'mnist_div' in t or 'mnist_iid' in t], 'header':['fedavg']}) #其中header参数筛选绘制的结果 如这里只绘制fedavg的训练结果
plt.subplot(221)
for task in div_recs.tasks:
rec_list = div_recs.records[task]
for rec in rec_list:
plt.plot(rec.data['communication_round'], rec.data['test_accuracy'], label=task.split('/')[-1])
plt.title('testing accuracy on mnist - diversity')
plt.ylabel('test_accuracy')
plt.xlabel('communication round')
plt.legend()
plt.subplot(222)
for task in div_recs.tasks:
rec_list = div_recs.records[task]
for rec in rec_list:
plt.plot(rec.data['communication_round'], rec.data['test_loss'], label=task.split('/')[-1])
plt.title('testing loss on mnist - diversity')
plt.ylabel('test_loss')
plt.xlabel('communication round')
plt.legend()

plt.subplot(223)
dir_recs = al.Selector({'task':[task for task in task_dict if 'iid' in task or 'dir' in task], 'header':['fedavg']})
for task in dir_recs.tasks:
rec_list = dir_recs.records[task]
for rec in rec_list:
plt.plot(rec.data['communication_round'], rec.data['test_accuracy'], label=task.split('/')[-1])
plt.title('testing accuracy on mnist - Dirichlet')
plt.ylabel('test_accuracy')
plt.xlabel('communication round')
plt.legend()

plt.subplot(224)
dir_recs = al.Selector({'task':[task for task in task_dict if 'iid' in task or 'dir' in task], 'header':['fedavg']})
for task in dir_recs.tasks:
rec_list = dir_recs.records[task]
for rec in rec_list:
plt.plot(rec.data['communication_round'], rec.data['test_loss'], label=task.split('/')[-1])
plt.title('testing loss on mnist - Dirichlet')
plt.ylabel('test_loss')
plt.xlabel('communication round')
plt.legend()
plt.show()

实验结果如图5所示,可以看出当客户端的数据非独立同分布的程度很大时,模型收敛性受到了很大的影响。


图5 实验结果