前言

下面记录下DQN算法以及一些细节,注意哦,本博客更多目的在于当下记录,并非完整严谨的哦,也或许有理解错误。

关于DQN,看了下网上的介绍以及从Q-Learning到DQN解决state和action无法枚举完的情况。另外也强烈推荐下面链接:

  1. 知乎网友实现DQN:可直接按照这个跑通体验下效果。
  2. PaddlePaddle/PARL:这个是paddle出的RLHF库,并提供了相应的examples帮助入门和深入,并且环境也帮忙解决好了,如果debug能力比较强的话,建议直接看这个哦。
  3. PyTorch DQN实现:这个是pytorch官方实现的DQN算法。

一些特别的点

1. 俺是value based的,所以不需要softmax

看下面这个DQN网络,你觉得有问题么?

1
2
3
4
5
6
7
8
9
10
11
12
13
class DQN(nn.Module):
def __init__(self, obs, action):
super().__init__()
self.fc1 = nn.Linear(obs, 64)
self.fc2 = nn.Linear(64, 64)
self.fc3 = nn.Linear(64, action)

def forward(self, obs):
x = torch.relu(self.fc1(obs))
x = torch.relu(self.fc2(x))
x = torch.softmax(self.fc3(x))
return x

啊啊啊,这里为什么要加softmax呢,是不是习惯了深度学习那套分类思想,再好好想想这里,是不是不应该加softmax。

2. eps是干嘛的,是否是必须的?

仔细观察下面代码,eps是在做sample时起作用的,也就是select_action这里,可以看到,在每个epoch结束时,更新新的eps值,从最开始的1,到最后的eps_end=0.1,它是一个比较平滑的曲线。

select_action可以观察到,action早期处于随机采样的状态,随着epoch的增加,action的决策更多过度到model决策。

那这里是否是必须的呢?

不,这里并不是必须的,因为完全可以在train之前加一个warmup步骤,让model一定程度上学会state到action这个变化。再到后面,就是正常训练流程,不需要sample这个过程了。

那这里更多起到什么作用?

我觉得有个点可以很好理解这里,即从完全小白到慢慢学习直至认知理解的过程。添加warmup,即先提前产生一批训练样本,而eps这里,即随着过程慢慢学习,不过相比warmup,可以更优采取权重随机采样方案,即torch.multinomial(torch.softmax(self(obs), dim=-1), 1).item(),它表现出来的特点是权重高的多次采样出现的频率也会更高,那么随着模型的优化,采样更优的可能性也会提升。

3. target_net是否是必须的?

不,这里也并不是必须的。观察整个过程,policy_net要比target_net新一个epoch,而且他俩实际上是在干同一个事情,那么可以将target_net指向policy_net,可以发现epoch的增加,reward的值也是正常提升的。

4. reply memory是干嘛的?

reply memory可以说记录了整个state和action等的过程,当然有一个maxlen来限制其大小,过早的数据就不要了。

不过需要指出的是,改动了上面这些点,虽然也可以收敛,但是可能会收敛变慢。

源码

这里借鉴了知乎网友实现DQN

pygame==2.1.0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119

import gym
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
import numpy as np
import random
from collections import deque
from tqdm import tqdm


class DQN(nn.Module):
def __init__(self, state_dim, action_dim):
super(DQN, self).__init__()
self.fc1 = nn.Linear(state_dim, 64)
self.fc2 = nn.Linear(64, 64)
self.fc3 = nn.Linear(64, action_dim)

def forward(self, x):
x = torch.relu(self.fc1(x))
x = torch.relu(self.fc2(x))
x = self.fc3(x)
return x


class Agent():
def __init__(self, state_dim, action_dim, memory_size=10000, batch_size=64, gamma=0.99, lr=1e-4):
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.state_dim = state_dim
self.action_dim = action_dim
self.memory = deque(
maxlen=memory_size) # deque是一个双端队列,可以在队首或队尾插入或删除元素。在DQN算法中,我们使用deque实现经验池来存储之前的经验,因为它可以在队尾插入新的经验,并在队首删除最老的经验,从而保持经验池的大小不变。
self.batch_size = batch_size
self.gamma = gamma
self.lr = lr
self.policy_net = DQN(state_dim, action_dim).to(self.device)
self.target_net = DQN(state_dim, action_dim).to(self.device)
self.optimizer = optim.Adam(self.policy_net.parameters(), lr=self.lr)
self.loss_fn = nn.MSELoss()
self.steps = 0
self.writer = SummaryWriter()

def select_action(self, state, eps):
if random.random() < eps:
return random.randint(0, self.action_dim - 1)
else:
state = torch.FloatTensor(state).to(self.device)
with torch.no_grad():
action = self.policy_net(state).argmax().item()
return action

def store_transition(self, state, action, reward, next_state, done):
self.memory.append((state, action, reward, next_state, done))

def train(self):
if len(self.memory) < self.batch_size:
return
transitions = random.sample(self.memory, self.batch_size)
batch = list(zip(*transitions))

state_batch = torch.FloatTensor(batch[0]).to(self.device)
action_batch = torch.LongTensor(batch[1]).to(self.device)
reward_batch = torch.FloatTensor(batch[2]).to(self.device)
next_state_batch = torch.FloatTensor(batch[3]).to(self.device)
done_batch = torch.FloatTensor(batch[4]).to(self.device)

q_values = self.policy_net(state_batch).gather(1, action_batch.unsqueeze(1)).squeeze(1)
next_q_values = self.target_net(next_state_batch).max(1)[0]
expected_q_values = reward_batch + self.gamma * next_q_values * (1 - done_batch)

loss = self.loss_fn(q_values, expected_q_values.detach())

self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()

self.steps += 1
self.writer.add_scalar("Loss", loss.item(), self.steps)

def update_target(self):
self.target_net.load_state_dict(self.policy_net.state_dict())


def train_epoch(env, eps):
state = env.reset()
total_reward = 0
while True:
action = agent.select_action(state, eps)

next_state, reward, done, _ = env.step(action)

agent.store_transition(state, action, reward, next_state, done)
state = next_state
agent.train()

# env.render()
total_reward += reward
if done:
break
return total_reward


def train_dqn(env, agent: Agent, eps_start=1, eps_end=0.1, eps_decay=0.995, max_episodes=1000, max_steps=1000):
eps = eps_start
for episode in tqdm(range(max_episodes)):
reward = train_epoch(env, eps)
agent.update_target()
eps = max(eps * eps_decay, eps_end)
print(f'{episode} --> {reward}')


if __name__ == "__main__":
env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = Agent(state_dim, action_dim)
train_dqn(env, agent)