Python实现粒子群算法
一、推荐粒子群理论教程
二、Python实现代码
# -*- coding: utf-8 -*-
import numpy as np
np.random.seed(20)
'''
设x0:价格,x1:促销,x2:竞品价格 x为三维数据
实际情况这里应该是,比如: 利润 = F销量(x0,x1,x2) * x0 - 成本,其中F销量为调用销量预测的结果
两种方案:一是fitfun作为参数传入,二是该条销量预测样本数据作为参数传入
'''
def get_max_lirun(x):
# 更新features中的x
# 调用销量预测模型获得结果
# 计算适应度值
f_value = (-2 * x[0, 0]) + (4000 * x[0, 1]) + (4000 * x[0, 2])
return f_value
class Particle(object):
'''
粒子类
__x: 粒子位置
__v: 粒子速度
__bestX: 粒子历史最优位置
__fitnessValue: 粒子历史最优适应度
'''
def __init__(self, min_x, max_x, max_v, dim, fitness_fun):
self.__x = np.array([np.random.uniform(i, j) for i, j in zip(min_x, max_x)]).reshape(1, -1)
self.__v = np.array([np.random.uniform(-i, i) for i in max_v]).reshape(1, -1)
self.__bestX = np.zeros((1, dim))
self.__fitnessValue = fitness_fun(self.__x)
@property
def x(self):
return self.__x
@x.setter
def x(self, x):
if x.shape == self.__x.shape:
self.__x = x
else:
print('输入维度错误!')
@property
def v(self):
return self.__v
@v.setter
def v(self, v):
if v.shape == self.__v.shape:
self.__v = v
else:
print('输入维度错误!')
@property
def bestX(self):
return self.__bestX
@bestX.setter
def bestX(self, bestX):
self.__bestX = bestX
@property
def fitnessValue(self):
return self.__fitnessValue
@fitnessValue.setter
def fitnessValue(self, f_value):
self.__fitnessValue = f_value
class PSO(object):
'''
粒子群算法
W: 惯性系数
C1: 自我学习因子
C2: 全局学习因子
dim: 维度数(位置维度、速度维度一致)
p_num: 粒子个数
max_x: 位置范围(这里设为[-max_x,max_x]),实际场景需要再设置min_x
max_v: 速度范围,实际场景需要再设置min_v
iter_num: 迭代次数
bestf_val: 群体历史最优适应度
best_val_list: 历史迭代最优适应度记录
particles: 粒子群
'''
def __init__(self, W, C1, C2, dim, p_num, min_x, max_x, max_v, iter_num, fitness_fun, bestf_val=-np.inf):
self.__W = W
self.__C1 = C1
self.__C2 = C2
self.__dim = dim
self.__min_x = np.array(min_x).reshape(1, -1)
self.__max_x = np.array(max_x).reshape(1, -1)
self.__max_v = np.array(max_v).reshape(1, -1)
self.__iter_num = iter_num
self.__G = np.zeros((1, dim))
self.__bestf_val = bestf_val
self.__bestf_val_list = []
self.__fitness_fun = fitness_fun
self.__particles = [Particle(min_x, max_x, max_v, dim, fitness_fun) for i in range(p_num)]
@property
def G(self):
return self.__G
@property
def bestf_val_list(self):
return self.__bestf_val_list
@property
def bestf_val(self):
return self.__bestf_val
def update_all(self):
for i in range(self.__iter_num):
print('迭代第{}次'.format(i))
for p in self.__particles:
self._update_v(p) # 更新粒子速度
self._update_x(p) # 更新粒子位置
self._update_p_bestX(p) # 更新局部最优和全局最优位置
self.__bestf_val_list.append(self.__bestf_val) # 记录本次迭代的最优适应度
self.__W = max(self.__W - 0.2, 0) # W递减
return self.__G, self.__bestf_val_list
def _update_v(self, p):
# print('\n\n', np.round(p.v, 3))
update_v = self.__W * p.v + np.random.rand() * self.__C1 * (p.bestX - p.x) + np.random.rand() * self.__C2 * (
self.__G - p.x)
# print(np.round(update_v, 3))
# ***************************************如果有步长限制,则v应调整为步长的整数倍***********************************
# 限制v的范围
update_v[update_v > self.__max_v] = self.__max_v[update_v > self.__max_v]
update_v[update_v < -self.__max_v] = -self.__max_v[update_v < -self.__max_v]
p.v = update_v
def _update_x(self, p):
update_x = p.x + p.v
# 限制x的范围
update_x[update_x < self.__min_x] = self.__min_x[update_x < self.__min_x]
update_x[update_x > self.__max_x] = self.__max_x[update_x > self.__max_x]
p.x = update_x
def _update_p_bestX(self, p):
# 计算当前粒子的当前适应度
f_value = self.__fitness_fun(p.x)
# 更新粒子的历史最优位置和适应度
if f_value > p.fitnessValue:
p.fitnessValue = f_value
p.bestX = p.x
# 更新群体的历史最优位置和适应度
if f_value > self.__bestf_val:
self.__bestf_val = f_value
self.__G = p.x
if __name__ == '__main__':
m = PSO(W=1, C1=2, C2=2, dim=3, p_num=20, min_x=[3000, 0, 1], max_x=[5000, 0, 1],
max_v=[400, 0.2, 0.2], iter_num=20, fitness_fun=get_max_lirun)
best_x, scores = m.update_all()
print(best_x)
print(scores)