Python实现粒子群算法

一、推荐粒子群理论教程

优化算法笔记(三)粒子群算法(1) - 简书

二、Python实现代码

# -*- coding: utf-8 -*-

import numpy as np

np.random.seed(20)

'''
    设x0:价格,x1:促销,x2:竞品价格 x为三维数据
    实际情况这里应该是,比如:  利润 = F销量(x0,x1,x2) * x0 - 成本,其中F销量为调用销量预测的结果
    两种方案:一是fitfun作为参数传入,二是该条销量预测样本数据作为参数传入
'''


def get_max_lirun(x):
    # 更新features中的x
    # 调用销量预测模型获得结果
    # 计算适应度值
    f_value = (-2 * x[0, 0]) + (4000 * x[0, 1]) + (4000 * x[0, 2])
    return f_value


class Particle(object):
    '''
    粒子类
    __x: 粒子位置
    __v: 粒子速度
    __bestX: 粒子历史最优位置
    __fitnessValue: 粒子历史最优适应度
    '''

    def __init__(self, min_x, max_x, max_v, dim, fitness_fun):
        self.__x = np.array([np.random.uniform(i, j) for i, j in zip(min_x, max_x)]).reshape(1, -1)
        self.__v = np.array([np.random.uniform(-i, i) for i in max_v]).reshape(1, -1)
        self.__bestX = np.zeros((1, dim))
        self.__fitnessValue = fitness_fun(self.__x)

    @property
    def x(self):
        return self.__x

    @x.setter
    def x(self, x):
        if x.shape == self.__x.shape:
            self.__x = x
        else:
            print('输入维度错误!')

    @property
    def v(self):
        return self.__v

    @v.setter
    def v(self, v):
        if v.shape == self.__v.shape:
            self.__v = v
        else:
            print('输入维度错误!')

    @property
    def bestX(self):
        return self.__bestX

    @bestX.setter
    def bestX(self, bestX):
        self.__bestX = bestX

    @property
    def fitnessValue(self):
        return self.__fitnessValue

    @fitnessValue.setter
    def fitnessValue(self, f_value):
        self.__fitnessValue = f_value


class PSO(object):
    '''
    粒子群算法
    W: 惯性系数
    C1: 自我学习因子
    C2: 全局学习因子
    dim: 维度数(位置维度、速度维度一致)
    p_num: 粒子个数
    max_x: 位置范围(这里设为[-max_x,max_x]),实际场景需要再设置min_x
    max_v: 速度范围,实际场景需要再设置min_v
    iter_num: 迭代次数
    bestf_val: 群体历史最优适应度
    best_val_list: 历史迭代最优适应度记录
    particles: 粒子群
    '''

    def __init__(self, W, C1, C2, dim, p_num, min_x, max_x, max_v, iter_num, fitness_fun, bestf_val=-np.inf):
        self.__W = W
        self.__C1 = C1
        self.__C2 = C2
        self.__dim = dim
        self.__min_x = np.array(min_x).reshape(1, -1)
        self.__max_x = np.array(max_x).reshape(1, -1)
        self.__max_v = np.array(max_v).reshape(1, -1)
        self.__iter_num = iter_num
        self.__G = np.zeros((1, dim))
        self.__bestf_val = bestf_val
        self.__bestf_val_list = []
        self.__fitness_fun = fitness_fun
        self.__particles = [Particle(min_x, max_x, max_v, dim, fitness_fun) for i in range(p_num)]

    @property
    def G(self):
        return self.__G

    @property
    def bestf_val_list(self):
        return self.__bestf_val_list

    @property
    def bestf_val(self):
        return self.__bestf_val

    def update_all(self):
        for i in range(self.__iter_num):
            print('迭代第{}次'.format(i))
            for p in self.__particles:
                self._update_v(p)  # 更新粒子速度
                self._update_x(p)  # 更新粒子位置
                self._update_p_bestX(p)  # 更新局部最优和全局最优位置
            self.__bestf_val_list.append(self.__bestf_val)  # 记录本次迭代的最优适应度
            self.__W = max(self.__W - 0.2, 0)  # W递减

        return self.__G, self.__bestf_val_list

    def _update_v(self, p):
        # print('\n\n', np.round(p.v, 3))
        update_v = self.__W * p.v + np.random.rand() * self.__C1 * (p.bestX - p.x) + np.random.rand() * self.__C2 * (
                    self.__G - p.x)
        # print(np.round(update_v, 3))
        # ***************************************如果有步长限制,则v应调整为步长的整数倍***********************************
        # 限制v的范围
        update_v[update_v > self.__max_v] = self.__max_v[update_v > self.__max_v]
        update_v[update_v < -self.__max_v] = -self.__max_v[update_v < -self.__max_v]
        p.v = update_v

    def _update_x(self, p):
        update_x = p.x + p.v
        # 限制x的范围
        update_x[update_x < self.__min_x] = self.__min_x[update_x < self.__min_x]
        update_x[update_x > self.__max_x] = self.__max_x[update_x > self.__max_x]
        p.x = update_x

    def _update_p_bestX(self, p):
        # 计算当前粒子的当前适应度
        f_value = self.__fitness_fun(p.x)

        # 更新粒子的历史最优位置和适应度
        if f_value > p.fitnessValue:
            p.fitnessValue = f_value
            p.bestX = p.x

        # 更新群体的历史最优位置和适应度
        if f_value > self.__bestf_val:
            self.__bestf_val = f_value
            self.__G = p.x


if __name__ == '__main__':
    m = PSO(W=1, C1=2, C2=2, dim=3, p_num=20, min_x=[3000, 0, 1], max_x=[5000, 0, 1],
            max_v=[400, 0.2, 0.2], iter_num=20, fitness_fun=get_max_lirun)
    best_x, scores = m.update_all()
    print(best_x)
    print(scores)