[PSO文献2]Adaptive Particle Swarm Optimization

2020.06.24 更新错误的Schwefel,现在已经收敛到跟原作者一样好了


0. 前言:

本篇主要是纪录Adaptive Particle Swarm Optimization的阅读心得。

1. 动机:

最近脑子动到想用PSO来训练神经网路,所以蒐集了一些相关资料,预计将分享5篇文章

Particle swarm optimization (PSO). A tutorialAdaptive Particle Swarm OptimizationOrthogonal Learning Particle Swarm OptimizationComparison of particle swarm optimization and backpropagation as training algorithms for neural networksA hybrid particle swarm optimization–back-propagation algorithm for feedforward neural network training

2. 正文

原文的粒子定义如下,在一个D维度解空间中,粒子i的速度和位置定义如下:
http://img2.58codes.com/2024/20124766n3GSsfTwGR.png
http://img2.58codes.com/2024/20124766kaf55wZFWQ.png
粒子i在任意维度d的速度和位置更新公式:
http://img2.58codes.com/2024/20124766qLmTd5c5LK.png
http://img2.58codes.com/2024/20124766MjptRxiopA.png
原文提到PSO有两个版本,分别是global-version PSO(GPSO)和local-version PSO(LPSO)。在Advances of Computational Intelligence in Industrial Systems的第9页和Manufacturing Optimization through Intelligent Techniques的第41页有提到GPSO和LPSO的差别以及优缺点:

GPSO的C1和C2不为0,速度快但容易陷入区域最佳解LPSO的C2为0(不考虑其他粒子的最佳解),速度慢但不容易陷入区域最佳解

3. APSO的计算流程与算式

相较于传统PSO,APSO在计算过程中多了演化状态评估(Evolutionary State Estimation, ESE)和菁英学习策略(Elitist Learning Strategy, ELS)两个步骤。

ESE可以根据粒子在每次迭代的分布情形f来调整加速度常数C、惯性权重W,进而加速粒子收敛对于gBest提供专属的学习策略,进而提升所有粒子的求解效率

3.1 整体流程

不啰嗦,直接贴图。如果删去流程图的第三步骤,那就是传统PSO
http://img2.58codes.com/2024/20124766A3124XJeJg.png

作者定义的加速度限制为解空间的20%,也就是
http://img2.58codes.com/2024/20124766MeyUxMnzfh.png

由于作者没有特别说明,因此初始加速度我令为
http://img2.58codes.com/2024/20124766vjc286Z4jG.png

3.2 演化状态评估ESE

根据每次迭代粒子的分布情形来调整w、c1、c2
http://img2.58codes.com/2024/20124766XoI9HW4OaY.png

3.2.1 计算粒子彼此间的距离

http://img2.58codes.com/2024/20124766x4RPu6SCNc.png
很明显这是在求L2範数(欧式距离),然后为了编程方便,我把式子简化如下
http://img2.58codes.com/2024/20124766XMuTAaXf06.png

3.2.2 计算演化因子(evolutionary factor, f)

http://img2.58codes.com/2024/20124766dg92HA4Zyd.png
http://img2.58codes.com/2024/20124766jZucD5EKJB.png
http://img2.58codes.com/2024/20124766NHA5ubMMn6.png

dg是当前适应值最好的粒子的di如果分母为0则直接定义为收敛状态S3

3.2.3 模糊推论

作者对粒子群分布状况作了4个类别:探勘(exploration, S1)、开发(exploitation, S2)、收敛(Convergence, S3)、跳出(Jumping-out, S4)对f作模糊推论,得知当前粒子分布是属于哪一类别
http://img2.58codes.com/2024/20124766JAfP0sa4p5.png
http://img2.58codes.com/2024/20124766mIzfwaoVlY.png

原文没有完整说明规则库内容,我只能透过内文的範例进行推论,结果如下
http://img2.58codes.com/2024/20124766MGyuiXT5eU.png

必须遵守S1=>S2=>S3=>S4=>S1=>...这个顺序为了求解稳定,能不改变状态就不改变即使要改变状态,每次也只向右1步或往左1步初始状态我是令S1

3.2.4 惯性权重

http://img2.58codes.com/2024/20124766q08HyC5cd4.png
原文的初始惯性权重为0.9

3.2.5 加速度常数

加速度常数的範围需落在,初始值为2.0
http://img2.58codes.com/2024/20124766y6M09x5gml.png
http://img2.58codes.com/2024/201247662v9AYeE99A.png
如果c1+c2大于指定範围,则用下式调整
http://img2.58codes.com/2024/20124766SOoyyd9aXk.png

根据不同的状态,以加速度率(acceleration rate)调整加速度常数

加速度率不会随着迭代而改变
http://img2.58codes.com/2024/201247661ofgj7UvlX.png
http://img2.58codes.com/2024/201247660x0tCe4Kah.png
http://img2.58codes.com/2024/20124766gelknoXVSO.png

原文有明确定义所谓的轻微是指0.5倍的加速度率,因此可以得到下面结论
http://img2.58codes.com/2024/20124766xEWjehHqE7.png

3.3 菁英学习策略ELS

想像一下,某个粒子要更新自己的加速度,但是他发现自己就是pBest,同时他又是gBest,所以加速度更新公式的三个子项中就有2项是0。由于这个粒子在该次代没有任何可以学习的对象,因此他本次移动是毫无根据的,这会间接导致粒子子求解速度被拖慢,因此制定一个扰动策略(类似基因算法的突变)来尝试得到更好的gBest。当然,不是每次扰动都会得到更好的gBest,但我们可以取代最差的粒子,使得整体表现变得更均匀
http://img2.58codes.com/2024/20124766TdXKzO2ihy.png

只有在状态为S3时才执行ELS!!!1)令P=gBest,并且随机取得某一个维度位置d2)改变位置d的数值。其中平均值u=0
http://img2.58codes.com/2024/20124766k4DQ6mnyD6.png
http://img2.58codes.com/2024/20124766SMVT4GrIby.png
http://img2.58codes.com/2024/20124766Lh8V2f8XmV.png3)确保位置d仍在解空间内4)计算P的适应值v5-1)如果适应值v比先前的gBest还要好,则P取代gBest5-2)如果适应值v没有先前的gBest好,则P取代当前适应值最差的粒子

我自己对5-2作了修改,如下

新5-2)如果适应值v没有先前的gBest好,但确定胜过当前适应值最差的粒子才取代,同时如果胜过最差的pBest则也进行替换原因是在没有确认谁强谁弱就直接取代,怎么想都觉得有点奇怪

4. 实现

4.1 APSO代码

import numpy as npimport matplotlib.pyplot as pltnp.random.seed(42)class APSO():    def __init__(self, fit_func, num_dim, num_particle=20, max_iter=500,                 x_max=1, x_min=-1, w_max=0.9, w_min=0.4, c1=2.0, c2=2.0, k=0.2):        self.fit_func = fit_func                self.num_dim = num_dim        self.num_particle = num_particle        self.max_iter = max_iter                self.x_max = x_max        self.x_min = x_min        self.w = w_max                self.w_max = w_max        self.w_min = w_min        self.c1 = c1        self.c2 = c2        self.k = k        self._iter = 1        self.bestX_idx = -1        self.worstX_idx = -1        # case1. 仅初始化        self.delta = np.random.uniform(high=0.1, low=0.05, size=1)        self.state = 1        self.rulebase = [[1, 1, 4, 1],                          [2, 2, 2, 1],                          [2, 3, 3, 3],                          [4, 3, 4, 4]]        self.gBest_curve = np.zeros(self.max_iter)         self.X = np.random.uniform(size=[self.num_particle, self.num_dim])*(self.x_max-self.x_min) + self.x_min        self.V = np.random.uniform(size=[self.num_particle, self.num_dim])*(self.x_max-self.x_min) + self.x_min        self.v_max = self.k*(self.x_max-self.x_min)                self.pBest_X = self.X.copy()        self.pBest_score = self.fit_func(self.X).copy()        self.X_score = self.pBest_score.copy()        self.gBest_X = self.pBest_X[self.pBest_score.argmin()].copy()        self.gBest_score = self.pBest_score.min().copy()        self.gBest_curve[0] = self.gBest_score.copy()        self.bestX_idx = self.pBest_score.argmin()        self.worstX_idx = self.pBest_score.argmax()    def ESE(self):        # step 1.        d_set = np.zeros(self.num_particle)                              for i in range(self.num_particle):            d_set[i] = np.sum(np.linalg.norm((self.X[i, :].reshape(1, self.num_dim)-self.X), ord=2, axis=1))\                /(self.num_particle-1)                # step 2.                dmax = np.max(d_set)        dmin = np.min(d_set)        dg = d_set[self.bestX_idx].copy()                # 我自己额外加的,防止崩溃        if dmax==dmin==dg==0:            f = 0        else:            f = (dg-dmin)/(dmax-dmin)        if not(0<=f<=1):            print('f='+str(np.round(f, 3))+' 超出範围[0, 1]!!!')                        if f>1:                f = 1            elif f<0:                f = 0                     # step 3.        singleton = np.zeros(4)        if 0<=f<=0.4:            singleton[0] = 0        elif 0.4<f<=0.6:            singleton[0] = 5*f - 2        elif 0.6<f<=0.7:            singleton[0] = 1        elif 0.7<f<=0.8:            singleton[0] = -10*f + 8        elif 0.8<f<=1:            singleton[0] = 0        if 0<=f<=0.2:            singleton[1] = 0        elif 0.2<f<=0.3:            singleton[1] = 10*f - 2        elif 0.3<f<=0.4:            singleton[1] = 1        elif 0.4<f<=0.6:            singleton[1] = -5*f + 3        elif 0.6<f<=1:            singleton[1] = 0        if 0<=f<=0.1:            singleton[2] = 1        elif 0.1<f<=0.3:            singleton[2] = -5*f + 1.5        elif 0.3<f<=1:            singleton[2] = 0                    if 0<=f<=0.7:            singleton[3] = 0        elif 0.7<f<=0.9:            singleton[3] = 5*f - 3.5        elif 0.9<f<=1:            singleton[3] = 1                if np.max(singleton)==np.min(singleton)==0:            print('因为f超出範围[0, 1],所以模糊推论异常!!!')                t = np.argmax(singleton)        t_1 = self.state - 1        self.state = self.rulebase[int(t)][int(t_1)]            # (10)        self.w = 1/(1+1.5*np.exp(-2.6*f))        if not(self.w_min<=self.w<=self.w_max):            print('w='+str(np.round(self.w, 3))+' 超出範围[0.4, 0.9]!!!')            if self.w<self.w_min:                self.w = self.w_min            elif self.w>self.w_max:                self.w = self.w_max                           # (11)        if self.state==1:            self.c1 = self.c1 + self.delta            self.c2 = self.c2 - self.delta        if self.state==2:            self.c1 = self.c1 + 0.5*self.delta            self.c2 = self.c2 - 0.5*self.delta        if self.state==3:            self.c1 = self.c1 + 0.5*self.delta            self.c2 = self.c2 + 0.5*self.delta            self.ELS()        if self.state==4:            self.c1 = self.c1 - self.delta            self.c2 = self.c2 + self.delta        if not(1.5<=self.c1<=2.5):            # print('c1='+str(np.round(self.c1, 3))+' 超出範围[1.5, 2.5]!!!')             if self.c1<1.5:                self.c1 = 1.5            elif self.c1>2.5:                self.c1 = 2.5        if not(1.5<=self.c2<=2.5):            # print('c2='+str(np.round(self.c2, 3))+' 超出範围[1.5, 2.5]!!!')              if self.c2<1.5:                self.c2 = 1.5            elif self.c2>2.5:                self.c2 = 2.5                    # (12)        if not(3<=self.c1+self.c2<=4):            # print('c1='+str(np.round(self.c1, 3))+' + c2='+str(np.round(self.c2, 3))+' 超出範围[3, 4]!!!')            self.c1, self.c2 = 4.0*self.c1/(self.c1+self.c2), 4.0*self.c2/(self.c1+self.c2)                def ELS(self):        rho = 1 - ((1-0.1)*self._iter/self.max_iter)        d = np.random.randint(low=0, high=self.num_dim, size=1)        P = self.gBest_X.copy()        P[d] = P[d] + (self.x_max[d]-self.x_min[d])*np.random.normal(loc=0.0, scale=rho**2, size=1)                if P[d]>self.x_max[d]: P[d] = self.x_max[d]        if P[d]<self.x_min[d]: P[d] = self.x_min[d]                score = self.fit_func(P.reshape(1, -1))                if score<self.gBest_score:            self.gBest_X = P.copy()            self.gBest_score = score.copy()        else:             # # case1. 原文版本            # self.X[self.worstX_idx, :] = P.copy()                        # case2. 我的版本            if score<self.X_score[self.worstX_idx]:                self.X[self.worstX_idx, :] = P.copy()            if score<np.max(self.pBest_score):                idx = np.argmax(self.pBest_score)                self.pBest_score[idx] = score.copy()                self.pBest_X[idx, :] = P.copy()      def opt(self):        while(self._iter<self.max_iter):               # case2. 每代更新            # self.delta = np.random.uniform(high=0.1, low=0.05, size=1)            self.bestX_idx = self.X_score.argmin()            self.worstX_idx = self.X_score.argmax()                        self.ESE()            R1 = np.random.uniform(size=(self.num_particle, self.num_dim))            R2 = np.random.uniform(size=(self.num_particle, self.num_dim))            for i in range(self.num_particle):                                              self.V[i, :] = self.w * self.V[i, :] \                        + self.c1 * (self.pBest_X[i, :] - self.X[i, :]) * R1[i, :] \                        + self.c2 * (self.gBest_X - self.X[i, :]) * R2[i, :]                               self.V[i, self.v_max < self.V[i, :]] = self.v_max[self.v_max < self.V[i, :]]                self.V[i, -self.v_max > self.V[i, :]] = -self.v_max[-self.v_max > self.V[i, :]]                                self.X[i, :] = self.X[i, :] + self.V[i, :]                self.X[i, self.x_max < self.X[i, :]] = self.x_max[self.x_max < self.X[i, :]]                self.X[i, self.x_min > self.X[i, :]] = self.x_min[self.x_min > self.X[i, :]]                                score = self.fit_func(self.X[i, :])                self.X_score[i] = score.copy()                if score<self.pBest_score[i]:                    self.pBest_X[i, :] = self.X[i, :].copy()                    self.pBest_score[i] = score.copy()                    if score<self.gBest_score:                                                self.gBest_X = self.X[i, :].copy()                        self.gBest_score = score.copy()                                                   self.gBest_curve[i] = self.gBest_score.copy()                     self._iter = self._iter + 1                def plot_curve(self):        plt.figure()        plt.title('loss curve ['+str(round(self.gBest_curve[-1], 3))+']')        plt.plot(self.gBest_curve, label='loss')        plt.grid()        plt.legend()        plt.show()          

4.2 测试代码

from APSO import APSOimport numpy as npimport timeimport pandas as pdnp.random.seed(42)def Sphere(x):    if x.ndim==1:        x = x.reshape(1, -1)    return np.sum(x**2, axis=1)def Schwefel_P222(x):    if x.ndim==1:        x = x.reshape(1, -1)    return np.sum(np.abs(x), axis=1) + np.prod(np.abs(x), axis=1)def Quadric(x):    if x.ndim==1:        x = x.reshape(1, -1)        outer = 0    for i in range(x.shape[1]):        inner = np.sum(x[:, :i+1], axis=1)**2        outer = outer + inner        return outerdef Rosenbrock(x):    if x.ndim==1:        x = x.reshape(1, -1)         left = x[:, :-1].copy()    right = x[:, 1:].copy()        return np.sum(100*(right - left**2)**2 + (left-1)**2, axis=1)def Step(x):    if x.ndim==1:        x = x.reshape(1, -1)    return np.sum(np.round((x+0.5), 0)**2, axis=1)def Quadric_Noise(x):    if x.ndim==1:        x = x.reshape(1, -1)            matrix = np.arange(x.shape[1])+1         return np.sum((x**4)*matrix, axis=1)+np.random.rand(x.shape[0])def Schwefel(x):    if x.ndim==1:        x = x.reshape(1, -1)                 return -1*np.sum(x*np.sin(np.abs(x)**.5), axis=1)def Rastrigin(x):    if x.ndim==1:        x = x.reshape(1, -1)         return np.sum(x**2 - 10*np.cos(2*np.pi*x) + 10, axis=1)def Noncontinuous_Rastrigin(x):    if x.ndim==1:        x = x.reshape(1, -1)        outlier = np.abs(x)>=0.5    x[outlier] = np.round(2*x[outlier])/2        return np.sum(x**2 - 10*np.cos(2*np.pi*x) + 10, axis=1)def Ackley(x):    if x.ndim==1:        x = x.reshape(1, -1)        left = 20*np.exp(-0.2*(np.sum(x**2, axis=1)/x.shape[1])**.5)    right = np.exp(np.sum(np.cos(2*np.pi*x), axis=1)/x.shape[1])        return -left - right + 20 + np.edef Griewank(x):    if x.ndim==1:        x = x.reshape(1, -1)    left = np.sum(x**2, axis=1)/4000    right = np.prod( np.cos(x/((np.arange(x.shape[1])+1)**.5)), axis=1)    return left - right + 1d = 30g = 1000p = 20times = 30table = np.zeros((2, 11))for i in range(times):    x_max = 100*np.ones(d)    x_min = -100*np.ones(d)    optimizer = APSO(fit_func=Sphere, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min)    start = time.time()    optimizer.opt()    end = time.time()    table[0, 0] += optimizer.gBest_score    table[1, 0] += end - start        x_max = 10*np.ones(d)    x_min = -10*np.ones(d)    optimizer = APSO(fit_func=Schwefel_P222, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min)    start = time.time()    optimizer.opt()    end = time.time()    table[0, 1] += optimizer.gBest_score    table[1, 1] += end - start        x_max = 100*np.ones(d)    x_min = -100*np.ones(d)    optimizer = APSO(fit_func=Quadric, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min)    start = time.time()    optimizer.opt()    end = time.time()    table[0, 2] += optimizer.gBest_score    table[1, 2] += end - start        x_max = 10*np.ones(d)    x_min = -10*np.ones(d)    optimizer = APSO(fit_func=Rosenbrock, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min)    start = time.time()    optimizer.opt()    end = time.time()    table[0, 3] += optimizer.gBest_score    table[1, 3] += end - start        x_max = 100*np.ones(d)    x_min = -100*np.ones(d)    optimizer = APSO(fit_func=Step, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min)    start = time.time()    optimizer.opt()    end = time.time()    table[0, 4] += optimizer.gBest_score    table[1, 4] += end - start        x_max = 1.28*np.ones(d)    x_min = -1.28*np.ones(d)    optimizer = APSO(fit_func=Quadric_Noise, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min)    start = time.time()    optimizer.opt()    end = time.time()    table[0, 5] += optimizer.gBest_score    table[1, 5] += end - start        x_max = 500*np.ones(d)    x_min = -500*np.ones(d)    optimizer = APSO(fit_func=Schwefel, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min)    start = time.time()    optimizer.opt()    end = time.time()    table[0, 6] += optimizer.gBest_score    table[1, 6] += end - start        x_max = 5.12*np.ones(d)    x_min = -5.12*np.ones(d)    optimizer = APSO(fit_func=Rastrigin, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min)    start = time.time()    optimizer.opt()    end = time.time()    table[0, 7] += optimizer.gBest_score    table[1, 7] += end - start        x_max = 5.12*np.ones(d)    x_min = -5.12*np.ones(d)    optimizer = APSO(fit_func=Noncontinuous_Rastrigin, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min)    start = time.time()    optimizer.opt()    end = time.time()    table[0, 8] += optimizer.gBest_score    table[1, 8] += end - start        x_max = 32*np.ones(d)    x_min = -32*np.ones(d)    optimizer = APSO(fit_func=Ackley, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min)    start = time.time()    optimizer.opt()    end = time.time()    table[0, 9] += optimizer.gBest_score    table[1, 9] += end - start        x_max = 600*np.ones(d)    x_min = -600*np.ones(d)    optimizer = APSO(fit_func=Griewank, num_dim=d, num_particle=p, max_iter=g, x_max=x_max, x_min=x_min)    start = time.time()    optimizer.opt()    end = time.time()    table[0, 10] += optimizer.gBest_score    table[1, 10] += end - start    table = table / timestable = pd.DataFrame(table)table.columns=['Sphere', 'Schwefel_P222', 'Quadric', 'Rosenbrock', 'Step', 'Quadric_Noise', 'Schwefel',                 'Rastrigin', 'Noncontinuous_Rastrigin', 'Ackley', 'Griewank']table.index = ['mean score', 'mean time']

4.3 讨论

这边主要是探讨原文的ELS和我的ELS间对计算结果的差异,以及delta是要在每代都更新一次或者在初始化阶段完成定义后就不再更动,因此会有四个组合的比较。

因为不想等太久,所以

共同环境条件为:解空间维度d=30; 最大迭代g=1000; 粒子数p=20; 执行次数times=30; 每跑完一条旧更新gBest这个条件一定跑不到理想
http://img2.58codes.com/2024/201247666hHzRHOcmZ.png
http://img2.58codes.com/2024/20124766AuiEsAMbUK.png
可以看出我自己定义的ELS效果比原文好,而delta对于最终结果感觉没有什么关係

然后是採我的ELS+delta部每代更新的方式,将最大迭代g改成10000后其他条件不变的结果
http://img2.58codes.com/2024/20124766dPOjTeJlWS.png

5. 结论

魔改后的效果比原作者好,虽然我知道这样改有点奇怪就是了哈哈改成10000代后跑得有够慢,建议用cupy或者GPU版的pytorch改写后放在GPU上面跑会比较好

关于作者: 网站小编

码农网专注IT技术教程资源分享平台,学习资源下载网站,58码农网包含计算机技术、网站程序源码下载、编程技术论坛、互联网资源下载等产品服务,提供原创、优质、完整内容的专业码农交流分享平台。

热门文章