[强化学习实战]DQN算法实战-小车上山(MountainCar-v0)
2021/4/25 12:27:12
本文主要是介绍[强化学习实战]DQN算法实战-小车上山(MountainCar-v0),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
转:
[强化学习实战]DQN算法实战-小车上山(MountainCar-v0)
DQN算法实战-小车上山
- 案例分析
- 实验环境
- 用线性近似求解最优策略
- 用深度Q学习求解最优策略
- 参考
代码链接
案例分析
如图1所示,一个小车在一段范围内行驶。在任一时刻,在水平方向看,小车位置的范围是[-1.2,0.6],速度的范围是[-0.07,0.07]。在每个时刻,智能体可以对小车施加3种动作中的一种:向左施力、不施力、向右施力。智能体施力和小车的水平位置会共同决定小车下一时刻的速度。当某时刻小车的水平位置大于0.5时,控制目标成功达成,回合结束。控制的目标是让小车以尽可能少的步骤达到目标。一般认为,如果智能体在连续100个回合中的平均步数≤110,就认为问题解决了。
在绝大多数情况下,智能体简单向右施力并不足以让小车成功越过目标。假设智能体并不知道环境确定小车位置和速度的数学表达式。事实上,小车的位置和速度是有数学表达式的。记第t时刻(t=0,1,2,…)小车的位置为
X
t
(
X
t
∈
[
−
1.2
,
0.6
]
)
X_t(X_t∈[-1.2,0.6])
Xt(Xt∈[−1.2,0.6]),速度为
V
t
(
V
t
∈
[
−
0.07
,
0.07
]
)
V_t(V_t∈[-0.07,0.07])
Vt(Vt∈[−0.07,0.07]),智能体施力为
A
t
∈
0
,
1
,
2
A_t∈{0,1,2}
At∈0,1,2,初始状态
X
0
∈
[
−
0.6
,
−
0.4
)
,
V
0
=
0
X_0∈[-0.6,-0.4),V_0=0
X0∈[−0.6,−0.4),V0=0。从t时刻到
t
+
1
t+1
t+1时刻的更新式为
其中限制函数clip()限制了位置和速度的范围:
实验环境
Gym库内置的环境’MountainCar-v0’已经实现了小车上山环境。在这个环境中,每一步的奖励都是-1,回合的回报的值就是总步数的负数。导入这个环境,并查看其状态空间和动作空间,以及位置和速度的参数。
import numpy as np np.random.seed(0) import pandas as pd import matplotlib.pyplot as plt import gym import tensorflow.compat.v2 as tf tf.random.set_seed(0) from tensorflow import keras env = gym.make('MountainCar-v0') env.seed(0) print('观测空间 = {}'.format(env.observation_space)) print('动作空间 = {}'.format(env.action_space)) print('位置范围 = {}'.format((env.unwrapped.min_position, env.unwrapped.max_position))) print('速度范围 = {}'.format((-env.unwrapped.max_speed, env.unwrapped.max_speed))) print('目标位置 = {}'.format(env.unwrapped.goal_position))
使用这个环境。在代码清单2中的策略总是试图向右对小车施力。程序运行结果表明,仅仅简单地向右施力,是不可能让小车达到目标的。为了避免程序无穷尽地运行下去,这里限制了回合最大的步数为200。
positions, velocities = [], [] observation = env.reset() while True: positions.append(observation[0]) velocities.append(observation[1]) next_observation, reward, done, _ = env.step(2) if done: break observation = next_observation if next_observation[0] > 0.5: print('成功到达') else: print('失败退出') # 绘制位置和速度图像 fig, ax = plt.subplots() ax.plot(positions, label='position') ax.plot(velocities, label='velocity') ax.legend() plt.show()
用线性近似求解最优策略
本节我们将用形如
q
(
s
,
a
)
=
[
x
(
s
,
a
)
]
T
w
q(s,a)=[x(s,a)]^Tw
q(s,a)=[x(s,a)]Tw的线性组合来近似动作价值函数,求解最优策略。
在这个问题中,位置和速度都是连续的变量。要从连续空间中导出数目有限的特征,最简单的方法是采用独热编码(one-hot coding)。如图a所示:在二维的“位置–速度”空间中,我们可将其划分为许多小格。位置轴范围总长是
l
位
置
l_位置
l位置,每个小格的宽度是
δ
位
置
δ_位置
δ位置,那么位置轴有
b
位
置
=
[
l
位
置
÷
δ
速
度
]
b_{位置}=[l_{位置} ÷delta_{速度}]
b位置=[l位置÷δ速度]个小格;同理,速度范围总长l速度,每个小格长度
δ
δ
δ速度,
b
速
度
=
[
l
速
度
÷
δ
速
度
]
b_{速度} = [l_{速度}÷delta_{速度}]
b速度=[l速度÷δ速度]个小格。这样,整个空间有
b
位
置
b
速
度
b_{位置}b_{速度}
b位置b速度个小格。每个小格对应一个特征:当位置速度对位于某个小格时,那个小格对应的特征为1,其他小格对应的特征均为0。这样,独热编码就从连续的空间中提取出了
b
位
置
b
速
度
b_{位置}b_{速度}
b位置b速度个特征。采用独热编码后得到的价值函数,对于同一网格内的所有位置速度对,其价值函数的估计都是相同的。所以这只是一种近似。如果要让近似更准确,就要让每个小格的长度
δ
位
置
和
δ
速
度
δ_{位置}和δ_{速度}
δ位置和δ速度更小。但是,这样会增大特征的数目
b
位
置
b
速
度
b_{位置}b_{速度}
b位置b速度。
砖瓦编码(tile coding)可以在精度相同的情况下减少特征数目。如图b所示,砖瓦编码引入了多层大网格。本节用的m层砖瓦编码,每层的大网格都是原来独热编码小格的m位宽、m位长。在相邻两层之间,在两个维度上都偏移一个独热编码的小格。对于任意的位置速度对,它在每一层都会落在某个大网格里。这样,我们可以让每层中大网格对应的特征为1,其他特征为0。综合考虑所有层,总共大致有
b
位
置
b
速
度
/
m
b_{位置}b_{速度}/m
b位置b速度/m个特征,特征数大大减小。
TileCoder类实现了砖瓦编码。构造TileCoder类需要两个参数:参数layers表示要用几层砖瓦编码;参数features表示砖瓦编码应该得到多少特征,即x(s,a)的维度,它也是w的维度。构造TileCoder类对象后,就可以调用这个对象找到每个数据激活了哪些特征。调用的参数floats输入[0,1]间的浮点数的tuple,参数ints输入int元素的tuple(不参与砖瓦编码);返回int型列表,表示激活的参数指标。
class TileCoder: def __init__(self, layers, features): self.layers = layers self.features = features self.codebook = {} def get_feature(self, codeword): if codeword in self.codebook: return self.codebook[codeword] count = len(self.codebook) if count >= self.features: # 冲突处理 return hash(codeword) % self.features self.codebook[codeword] = count return count def __call__(self, floats=(), ints=()): dim = len(floats) scaled_floats = tuple(f * self.layers * self.layers for f in floats) features = [] for layer in range(self.layers): codeword = (layer,) + tuple(int((f + (1 + dim * i) * layer) / self.layers) for i, f in enumerate(scaled_floats)) + ints feature = self.get_feature(codeword) features.append(feature) return features
在小车上山任务中,如果我们对观测空间选取8层的砖瓦编码,那么观测空间第0层有8×8=64个砖瓦,剩下8-1=7层有(8+1)×(8+1)=81个砖瓦,一共有64+7×81=631个砖瓦。再考虑到动作有3种可能的取值,那么总共有631×3=1893个特征。接下来,我们运用砖瓦编码来实现函数近似的智能体。以下是函数近似SARSA算法的智能体类SARSAAgent和函数近似SARSA(λ)的智能体类SARSALambdaAgent。
class SARSAAgent: def __init__(self, env, layers=8, features=1893, gamma=1., learning_rate=0.03, epsilon=0.001): self.action_n = env.action_space.n # 动作数 self.obs_low = env.observation_space.low self.obs_scale = env.observation_space.high - env.observation_space.low # 观测空间范围 self.encoder = TileCoder(layers, features) # 砖瓦编码器 self.w = np.zeros(features) # 权重 self.gamma = gamma # 折扣 self.learning_rate = learning_rate # 学习率 self.epsilon = epsilon # 探索 def encode(self, observation, action): # 编码 states = tuple((observation - self.obs_low) / self.obs_scale) actions = (action,) return self.encoder(states, actions) def get_q(self, observation, action): # 动作价值 features = self.encode(observation, action) return self.w[features].sum() def decide(self, observation): # 判决 if np.random.rand() < self.epsilon: return np.random.randint(self.action_n) else: qs = [self.get_q(observation, action) for action in range(self.action_n)] return np.argmax(qs) def learn(self, observation, action, reward, next_observation, done, next_action): # 学习 u = reward + (1. - done) * self.gamma * self.get_q(next_observation, next_action) td_error = u - self.get_q(observation, action) features = self.encode(observation, action) self.w[features] += (self.learning_rate * td_error)
class SARSALambdaAgent(SARSAAgent): def __init__(self, env, layers=8, features=1893, gamma=1., learning_rate=0.03, epsilon=0.001, lambd=0.9): super().__init__(env=env, layers=layers, features=features, gamma=gamma, learning_rate=learning_rate, epsilon=epsilon) self.lambd = lambd self.z = np.zeros(features) # 初始化资格迹 def learn(self, observation, action, reward, next_observation, done, next_action): u = reward if not done: u += (self.gamma * self.get_q(next_observation, next_action)) self.z *= (self.gamma * self.lambd) features = self.encode(observation, action) self.z[features] = 1. # 替换迹 td_error = u - self.get_q(observation, action) self.w += (self.learning_rate * td_error * self.z) if done: self.z = np.zeros_like(self.z) # 为下一回合初始化资格迹
运用环境对象env和构造好的智能体对象agent,我们就可以用函数play_sarsa()训练智能体。对于训练了300个回合的SARSAAgent,平均回合奖励可以达到-121左右;对于训练了150个回合的SARSALambdaAgent,平均回合奖励可以达到-107左右。在这个实现中,SARSA(λ)算法比SARSA算法更为高效。事实上,SARSA(λ)算法是针对小车上山这个任务最有效的方法之一。
用深度Q学习求解最优策略
首先我们来看经验回放。代码清单中的类DQNReplayer实现了经验回放。构造这个类的参数中有个int型的参数capacity,表示存储空间最多可以存储几条经验。当要存储的经验数超过capacity时,会用最新的经验覆盖最早存入的经验。
class DQNReplayer: def __init__(self, capacity): self.memory = pd.DataFrame(index=range(capacity), columns=['observation', 'action', 'reward', 'next_observation', 'done']) self.i = 0 self.count = 0 self.capacity = capacity def store(self, *args): self.memory.loc[self.i] = args self.i = (self.i + 1) % self.capacity self.count = min(self.count + 1, self.capacity) def sample(self, size): indices = np.random.choice(self.count, size=size) return (np.stack(self.memory.loc[indices, field]) for field in self.memory.columns)
接下来我们来看函数近似部分。函数近似采用了矢量形式的近似函数 q ( s ; w ) , s ∈ ( S ) q(s;w),s∈(mathcal{S}) q(s;w),s∈(S),近似函数的形式为全连接神经网络。以下分别实现了带目标网络的深度Q学习智能体和双重Q学习智能体。它们和play_qlearning()函数结合,就实现了带目标网络的深度Q学习算法和双重Q学习算法。
class DQNAgent: def __init__(self, env, net_kwargs={}, gamma=0.99, epsilon=0.001, replayer_capacity=10000, batch_size=64): observation_dim = env.observation_space.shape[0] self.action_n = env.action_space.n self.gamma = gamma self.epsilon = epsilon self.batch_size = batch_size self.replayer = DQNReplayer(replayer_capacity) # 经验回放 self.evaluate_net = self.build_network(input_size=observation_dim, output_size=self.action_n, **net_kwargs) # 评估网络 self.target_net = self.build_network(input_size=observation_dim, output_size=self.action_n, **net_kwargs) # 目标网络 self.target_net.set_weights(self.evaluate_net.get_weights()) def build_network(self, input_size, hidden_sizes, output_size, activation=tf.nn.relu, output_activation=None, learning_rate=0.01): # 构建网络 model = keras.Sequential() for layer, hidden_size in enumerate(hidden_sizes): kwargs = dict(input_shape=(input_size,)) if not layer else {} model.add(keras.layers.Dense(units=hidden_size, activation=activation, **kwargs)) model.add(keras.layers.Dense(units=output_size, activation=output_activation)) # 输出层 optimizer = tf.optimizers.Adam(lr=learning_rate) model.compile(loss='mse', optimizer=optimizer) return model def learn(self, observation, action, reward, next_observation, done): self.replayer.store(observation, action, reward, next_observation, done) # 存储经验 observations, actions, rewards, next_observations, dones = self.replayer.sample(self.batch_size) # 经验回放 next_qs = self.target_net.predict(next_observations) next_max_qs = next_qs.max(axis=-1) us = rewards + self.gamma * (1. - dones) * next_max_qs targets = self.evaluate_net.predict(observations) targets[np.arange(us.shape[0]), actions] = us self.evaluate_net.fit(observations, targets, verbose=0) if done: # 更新目标网络 self.target_net.set_weights(self.evaluate_net.get_weights()) def decide(self, observation): # epsilon贪心策略 if np.random.rand() < self.epsilon: return np.random.randint(self.action_n) qs = self.evaluate_net.predict(observation[np.newaxis]) return np.argmax(qs)
def play_qlearning(env, agent, train=False, render=False): episode_reward = 0 observation = env.reset() while True: if render: env.render() action = agent.decide(observation) next_observation, reward, done, _ = env.step(action) episode_reward += reward if train: agent.learn(observation, action, reward, next_observation, done) if done: break observation = next_observation return episode_reward
class DoubleDQNAgent(DQNAgent): def learn(self, observation, action, reward, next_observation, done): self.replayer.store(observation, action, reward, next_observation, done) # 存储经验 observations, actions, rewards, next_observations, dones = self.replayer.sample(self.batch_size) # 经验回放 next_eval_qs = self.evaluate_net.predict(next_observations) next_actions = next_eval_qs.argmax(axis=-1) next_qs = self.target_net.predict(next_observations) next_max_qs = next_qs[np.arange(next_qs.shape[0]), next_actions] us = rewards + self.gamma * next_max_qs * (1. - dones) targets = self.evaluate_net.predict(observations) targets[np.arange(us.shape[0]), actions] = us self.evaluate_net.fit(observations, targets, verbose=0) if done: self.target_net.set_weights(self.evaluate_net.get_weights())
代码使用TensorFlow来实现,并同时兼容TensorFlow 1.X的最新稳定版本和TensorFlow 2.X的最新稳定版本。对于基于TensorFlow的程序,即使已经设置了随机数的种子,也不能保证完全复现。所以,运行结果有差异是正常现象。
参考
原理的介绍可以参考我之前的文章
函数近似方法与原理
线性近似与函数近似的收敛性
DQN算法原理
转:
[强化学习实战]DQN算法实战-小车上山(MountainCar-v0)
这篇关于[强化学习实战]DQN算法实战-小车上山(MountainCar-v0)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-23DevExpress 怎么实现右键菜单(Context Menu)显示中文?-icode9专业技术文章分享
- 2024-12-22怎么通过控制台去看我的页面渲染的内容在哪个文件中呢-icode9专业技术文章分享
- 2024-12-22el-tabs 组件只被引用了一次,但有时会渲染两次是什么原因?-icode9专业技术文章分享
- 2024-12-22wordpress有哪些好的安全插件?-icode9专业技术文章分享
- 2024-12-22wordpress如何查看系统有哪些cron任务?-icode9专业技术文章分享
- 2024-12-21Svg Sprite Icon教程:轻松入门与应用指南
- 2024-12-20Excel数据导出实战:新手必学的简单教程
- 2024-12-20RBAC的权限实战:新手入门教程
- 2024-12-20Svg Sprite Icon实战:从入门到上手的全面指南
- 2024-12-20LCD1602显示模块详解