2020.06.24 更新错误的Schwefel,现在已经收敛到跟原作者一样好了
0. 前言:
本篇主要是纪录Adaptive Particle Swarm Optimization的阅读心得。
1. 动机:
最近脑子动到想用PSO来训练神经网路,所以蒐集了一些相关资料,预计将分享5篇文章
Particle swarm optimization (PSO). A tutorialAdaptive Particle Swarm OptimizationOrthogonal Learning Particle Swarm OptimizationComparison of particle swarm optimization and backpropagation as training algorithms for neural networksA hybrid particle swarm optimization–back-propagation algorithm for feedforward neural network training2. 正文
原文的粒子定义如下,在一个D维度解空间中,粒子i的速度和位置定义如下:
粒子i在任意维度d的速度和位置更新公式:
原文提到PSO有两个版本,分别是global-version PSO(GPSO)和local-version PSO(LPSO)。在Advances of Computational Intelligence in Industrial Systems的第9页和Manufacturing Optimization through Intelligent Techniques的第41页有提到GPSO和LPSO的差别以及优缺点:
3. APSO的计算流程与算式
相较于传统PSO,APSO在计算过程中多了演化状态评估(Evolutionary State Estimation, ESE)和菁英学习策略(Elitist Learning Strategy, ELS)两个步骤。
ESE可以根据粒子在每次迭代的分布情形f来调整加速度常数C、惯性权重W,进而加速粒子收敛对于gBest提供专属的学习策略,进而提升所有粒子的求解效率3.1 整体流程
不啰嗦,直接贴图。如果删去流程图的第三步骤,那就是传统PSO
作者定义的加速度限制为解空间的20%,也就是
由于作者没有特别说明,因此初始加速度我令为
3.2 演化状态评估ESE
根据每次迭代粒子的分布情形来调整w、c1、c2
3.2.1 计算粒子彼此间的距离
很明显这是在求L2範数(欧式距离),然后为了编程方便,我把式子简化如下
3.2.2 计算演化因子(evolutionary factor, f)
3.2.3 模糊推论
作者对粒子群分布状况作了4个类别:探勘(exploration, S1)、开发(exploitation, S2)、收敛(Convergence, S3)、跳出(Jumping-out, S4)对f作模糊推论,得知当前粒子分布是属于哪一类别

原文没有完整说明规则库内容,我只能透过内文的範例进行推论,结果如下
3.2.4 惯性权重
原文的初始惯性权重为0.9
3.2.5 加速度常数
加速度常数的範围需落在,初始值为2.0
如果c1+c2大于指定範围,则用下式调整
根据不同的状态,以加速度率(acceleration rate)调整加速度常数
加速度率不会随着迭代而改变


原文有明确定义所谓的轻微是指0.5倍的加速度率,因此可以得到下面结论
3.3 菁英学习策略ELS
想像一下,某个粒子要更新自己的加速度,但是他发现自己就是pBest,同时他又是gBest,所以加速度更新公式的三个子项中就有2项是0。由于这个粒子在该次代没有任何可以学习的对象,因此他本次移动是毫无根据的,这会间接导致粒子子求解速度被拖慢,因此制定一个扰动策略(类似基因算法的突变)来尝试得到更好的gBest。当然,不是每次扰动都会得到更好的gBest,但我们可以取代最差的粒子,使得整体表现变得更均匀



我自己对5-2作了修改,如下
新5-2)如果适应值v没有先前的gBest好,但确定胜过当前适应值最差的粒子才取代,同时如果胜过最差的pBest则也进行替换原因是在没有确认谁强谁弱就直接取代,怎么想都觉得有点奇怪4. 实现
4.1 APSO代码
import numpy as npimport matplotlib.pyplot as pltnp.random.seed(42)class APSO(): def __init__(self, fit_func, num_dim, num_particle=20, max_iter=500, x_max=1, x_min=-1, w_max=0.9, w_min=0.4, c1=2.0, c2=2.0, k=0.2): self.fit_func = fit_func self.num_dim = num_dim self.num_particle = num_particle self.max_iter = max_iter self.x_max = x_max self.x_min = x_min self.w = w_max self.w_max = w_max self.w_min = w_min self.c1 = c1 self.c2 = c2 self.k = k self._iter = 1 self.bestX_idx = -1 self.worstX_idx = -1 # case1. 仅初始化 self.delta = np.random.uniform(high=0.1, low=0.05, size=1) self.state = 1 self.rulebase = [[1, 1, 4, 1], [2, 2, 2, 1], [2, 3, 3, 3], [4, 3, 4, 4]] self.gBest_curve = np.zeros(self.max_iter) self.X = np.random.uniform(size=[self.num_particle, self.num_dim])*(self.x_max-self.x_min) + self.x_min self.V = np.random.uniform(size=[self.num_particle, self.num_dim])*(self.x_max-self.x_min) + self.x_min self.v_max = self.k*(self.x_max-self.x_min) self.pBest_X = self.X.copy() self.pBest_score = self.fit_func(self.X).copy() self.X_score = self.pBest_score.copy() self.gBest_X = self.pBest_X[self.pBest_score.argmin()].copy() self.gBest_score = self.pBest_score.min().copy() self.gBest_curve[0] = self.gBest_score.copy() self.bestX_idx = self.pBest_score.argmin() self.worstX_idx = self.pBest_score.argmax() def ESE(self): # step 1. d_set = np.zeros(self.num_particle) for i in range(self.num_particle): d_set[i] = np.sum(np.linalg.norm((self.X[i, :].reshape(1, self.num_dim)-self.X), ord=2, axis=1))\ /(self.num_particle-1) # step 2. dmax = np.max(d_set) dmin = np.min(d_set) dg = d_set[self.bestX_idx].copy() # 我自己额外加的,防止崩溃 if dmax==dmin==dg==0: f = 0 else: f = (dg-dmin)/(dmax-dmin) if not(0<=f<=1): print('f='+str(np.round(f, 3))+' 超出範围[0, 1]!!!') if f>1: f = 1 elif f<0: f = 0 # step 3. singleton = np.zeros(4) if 0<=f<=0.4: singleton[0] = 0 elif 0.4<f<=0.6: singleton[0] = 5*f - 2 elif 0.6<f<=0.7: singleton[0] = 1 elif 0.7<f<=0.8: singleton[0] = -10*f + 8 elif 0.8<f<=1: singleton[0] = 0 if 0<=f<=0.2: singleton[1] = 0 elif 0.2<f<=0.3: singleton[1] = 10*f - 2 elif 0.3<f<=0.4: singleton[1] = 1 elif 0.4<f<=0.6: singleton[1] = -5*f + 3 elif 0.6<f<=1: singleton[1] = 0 if 0<=f<=0.1: singleton[2] = 1 elif 0.1<f<=0.3: singleton[2] = -5*f + 1.5 elif 0.3<f<=1: singleton[2] = 0 if 0<=f<=0.7: singleton[3] = 0 elif 0.7<f<=0.9: singleton[3] = 5*f - 3.5 elif 0.9<f<=1: singleton[3] = 1 if np.max(singleton)==np.min(singleton)==0: print('因为f超出範围[0, 1],所以模糊推论异常!!!') t = np.argmax(singleton) t_1 = self.state - 1 self.state = self.rulebase[int(t)][int(t_1)] # (10) self.w = 1/(1+1.5*np.exp(-2.6*f)) if not(self.w_min<=self.w<=self.w_max): print('w='+str(np.round(self.w, 3))+' 超出範围[0.4, 0.9]!!!') if self.w<self.w_min: self.w = self.w_min elif self.w>self.w_max: self.w = self.w_max # (11) if self.state==1: self.c1 = self.c1 + self.delta self.c2 = self.c2 - self.delta if self.state==2: self.c1 = self.c1 + 0.5*self.delta self.c2 = self.c2 - 0.5*self.delta if self.state==3: self.c1 = self.c1 + 0.5*self.delta self.c2 = self.c2 + 0.5*self.delta self.ELS() if self.state==4: self.c1 = self.c1 - self.delta self.c2 = self.c2 + self.delta if not(1.5<=self.c1<=2.5): # print('c1='+str(np.round(self.c1, 3))+' 超出範围[1.5, 2.5]!!!') if self.c1<1.5: self.c1 = 1.5 elif self.c1>2.5: self.c1 = 2.5 if not(1.5<=self.c2<=2.5): # print('c2='+str(np.round(self.c2, 3))+' 超出範围[1.5, 2.5]!!!') if self.c2<1.5: self.c2 = 1.5 elif self.c2>2.5: self.c2 = 2.5 # (12) if not(3<=self.c1+self.c2<=4): # print('c1='+str(np.round(self.c1, 3))+' + c2='+str(np.round(self.c2, 3))+' 超出範围[3, 4]!!!') self.c1, self.c2 = 4.0*self.c1/(self.c1+self.c2), 4.0*self.c2/(self.c1+self.c2) def ELS(self): rho = 1 - ((1-0.1)*self._iter/self.max_iter) d = np.random.randint(low=0, high=self.num_dim, size=1) P = self.gBest_X.copy() P[d] = P[d] + (self.x_max[d]-self.x_min[d])*np.random.normal(loc=0.0, scale=rho**2, size=1) if P[d]>self.x_max[d]: P[d] = self.x_max[d] if P[d]<self.x_min[d]: P[d] = self.x_min[d] score = self.fit_func(P.reshape(1, -1)) if score<self.gBest_score: self.gBest_X = P.copy() self.gBest_score = score.copy() else: # # case1. 原文版本 # self.X[self.worstX_idx, :] = P.copy() # case2. 我的版本 if score<self.X_score[self.worstX_idx]: self.X[self.worstX_idx, :] = P.copy() if score<np.max(self.pBest_score): idx = np.argmax(self.pBest_score) self.pBest_score[idx] = score.copy() self.pBest_X[idx, :] = P.copy() def opt(self): while(self._iter<self.max_iter): # case2. 每代更新 # self.delta = np.random.uniform(high=0.1, low=0.05, size=1) self.bestX_idx = self.X_score.argmin() self.worstX_idx = self.X_score.argmax() self.ESE() R1 = np.random.uniform(size=(self.num_particle, self.num_dim)) R2 = np.random.uniform(size=(self.num_particle, self.num_dim)) for i in range(self.num_particle): self.V[i, :] = self.w * self.V[i, :] \ + self.c1 * (self.pBest_X[i, :] - self.X[i, :]) * R1[i, :] \ + self.c2 * (self.gBest_X - self.X[i, :]) * R2[i, :] self.V[i, self.v_max < self.V[i, :]] = self.v_max[self.v_max < self.V[i, :]] self.V[i, -self.v_max > self.V[i, :]] = -self.v_max[-self.v_max > self.V[i, :]] self.X[i, :] = self.X[i, :] + self.V[i, :] self.X[i, self.x_max < self.X[i, :]] = self.x_max[self.x_max < self.X[i, :]] self.X[i, self.x_min > self.X[i, :]] = self.x_min[self.x_min > self.X[i, :]] score = self.fit_func(self.X[i, :]) self.X_score[i] = score.copy() if score<self.pBest_score[i]: self.pBest_X[i, :] = self.X[i, :].copy() self.pBest_score[i] = score.copy() if score<self.gBest_score: self.gBest_X = self.X[i, :].copy() self.gBest_score = score.copy() self.gBest_curve[i] = self.gBest_score.copy() self._iter = self._iter + 1 def plot_curve(self): plt.figure() plt.title('loss curve ['+str(round(self.gBest_curve[-1], 3))+']') plt.plot(self.gBest_curve, label='loss') plt.grid() plt.legend() plt.show()
4.2 测试代码
from APSO import APSOimport numpy as npimport timeimport pandas as pdnp.random.seed(42)def Sphere(x): if x.ndim==1: x = x.reshape(1, -1) return np.sum(x**2, axis=1)def Schwefel_P222(x): if x.ndim==1: x = x.reshape(1, -1) return np.sum(np.abs(x), axis=1) + np.prod(np.abs(x), axis=1)def Quadric(x): if x.ndim==1: x = x.reshape(1, -1) outer = 0 for i in range(x.shape[1]): inner = np.sum(x[:, :i+1], axis=1)**2 outer = outer + inner return outerdef Rosenbrock(x): if x.ndim==1: x = x.reshape(1, -1) left = x[:, :-1].copy() right = x[:, 1:].copy() return np.sum(100*(right - left**2)**2 + (left-1)**2, axis=1)def Step(x): if x.ndim==1: x = x.reshape(1, -1) return np.sum(np.round((x+0.5), 0)**2, axis=1)def Quadric_Noise(x): if x.ndim==1: x = x.reshape(1, -1) matrix = np.arange(x.shape[1])+1 return np.sum((x**4)*matrix, axis=1)+np.random.rand(x.shape[0])def Schwefel(x): if x.ndim==1: x = x.reshape(1, -1) return -1*np.sum(x*np.sin(np.abs(x)**.5), axis=1)def Rastrigin(x): if x.ndim==1: x = x.reshape(1, -1) return np.sum(x**2 - 10*np.cos(2*np.pi*x) + 10, axis=1)def Noncontinuous_Rastrigin(x): if x.ndim==1: x = x.reshape(1, -1) outlier = np.abs(x)>=0.5 x[outlier] = np.round(2*x[outlier])/2 return np.sum(x**2 - 10*np.cos(2*np.pi*x) + 10, axis=1)def Ackley(x): if x.ndim==1: x = x.reshape(1, -1) left = 20*np.exp(-0.2*(np.sum(x**2, axis=1)/x.shape[1])**.5) right = np.exp(np.sum(np.cos(2*np.pi*x), axis=1)/x.shape[1]) return -left - right + 20 + np.edef Griewank(x): if x.ndim==1: x = x.reshape(1, -1) left = np.sum(x**2, axis=1)/4000 right = np.prod( np.cos(x/((np.arange(x.shape[1])+1)**.5)), axis=1) return left - right + 1d = 30g = 1000p = 20times = 30table = np.zeros((2, 11))for i in range(times): x_max = 100*np.ones(d) x_min = -100*np.ones(d) optimizer = APSO(fit_func=Sphere, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min) start = time.time() optimizer.opt() end = time.time() table[0, 0] += optimizer.gBest_score table[1, 0] += end - start x_max = 10*np.ones(d) x_min = -10*np.ones(d) optimizer = APSO(fit_func=Schwefel_P222, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min) start = time.time() optimizer.opt() end = time.time() table[0, 1] += optimizer.gBest_score table[1, 1] += end - start x_max = 100*np.ones(d) x_min = -100*np.ones(d) optimizer = APSO(fit_func=Quadric, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min) start = time.time() optimizer.opt() end = time.time() table[0, 2] += optimizer.gBest_score table[1, 2] += end - start x_max = 10*np.ones(d) x_min = -10*np.ones(d) optimizer = APSO(fit_func=Rosenbrock, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min) start = time.time() optimizer.opt() end = time.time() table[0, 3] += optimizer.gBest_score table[1, 3] += end - start x_max = 100*np.ones(d) x_min = -100*np.ones(d) optimizer = APSO(fit_func=Step, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min) start = time.time() optimizer.opt() end = time.time() table[0, 4] += optimizer.gBest_score table[1, 4] += end - start x_max = 1.28*np.ones(d) x_min = -1.28*np.ones(d) optimizer = APSO(fit_func=Quadric_Noise, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min) start = time.time() optimizer.opt() end = time.time() table[0, 5] += optimizer.gBest_score table[1, 5] += end - start x_max = 500*np.ones(d) x_min = -500*np.ones(d) optimizer = APSO(fit_func=Schwefel, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min) start = time.time() optimizer.opt() end = time.time() table[0, 6] += optimizer.gBest_score table[1, 6] += end - start x_max = 5.12*np.ones(d) x_min = -5.12*np.ones(d) optimizer = APSO(fit_func=Rastrigin, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min) start = time.time() optimizer.opt() end = time.time() table[0, 7] += optimizer.gBest_score table[1, 7] += end - start x_max = 5.12*np.ones(d) x_min = -5.12*np.ones(d) optimizer = APSO(fit_func=Noncontinuous_Rastrigin, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min) start = time.time() optimizer.opt() end = time.time() table[0, 8] += optimizer.gBest_score table[1, 8] += end - start x_max = 32*np.ones(d) x_min = -32*np.ones(d) optimizer = APSO(fit_func=Ackley, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min) start = time.time() optimizer.opt() end = time.time() table[0, 9] += optimizer.gBest_score table[1, 9] += end - start x_max = 600*np.ones(d) x_min = -600*np.ones(d) optimizer = APSO(fit_func=Griewank, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min) start = time.time() optimizer.opt() end = time.time() table[0, 10] += optimizer.gBest_score table[1, 10] += end - start table = table / timestable = pd.DataFrame(table)table.columns=['Sphere', 'Schwefel_P222', 'Quadric', 'Rosenbrock', 'Step', 'Quadric_Noise', 'Schwefel', 'Rastrigin', 'Noncontinuous_Rastrigin', 'Ackley', 'Griewank']table.index = ['mean score', 'mean time']
4.3 讨论
这边主要是探讨原文的ELS和我的ELS间对计算结果的差异,以及delta是要在每代都更新一次或者在初始化阶段完成定义后就不再更动,因此会有四个组合的比较。
因为不想等太久,所以
共同环境条件为:解空间维度d=30; 最大迭代g=1000; 粒子数p=20; 执行次数times=30; 每跑完一条旧更新gBest这个条件一定跑不到理想

可以看出我自己定义的ELS效果比原文好,而delta对于最终结果感觉没有什么关係
然后是採我的ELS+delta部每代更新的方式,将最大迭代g改成10000后其他条件不变的结果