前言 这个是RLHF系列中的策略梯度部分,在看了Hands-on-RL 和parl 两者实现后,感觉整体难度并不是很高,但是当自己从零实现时还是会莫名其妙多一些问题,相比深度学习来讲,还是有蛮多小细节是需要额外注意的。
注意点 1. log平滑 这里是指learn阶段中的获取最大期望阶段,如下代码所示:
1 2 output = self.model(obs_bs) output = torch.log(output.gather(-1 , action_bs.reshape(-1 , 1 )))
在最开始自己实现时,我没有加log进行平滑,发现模型没法收敛(CarPole-v0 reward最大得分为200),一直是8,9徘徊。后来我看了上述实现,发现这里多了个log,这里让我觉得很困惑,因为我觉得这一步是不必要的,原因有以下几个方面:
model.forward部分,已经用softmax做归一化了,已经避免了差异较大的情况。
从某种角度来讲,我甚至觉得model.forward中的softmax部分也不应该添加。因为本质来讲就是希望期望最大嘛。
但是呢,如果不加log这一步,模型就无法收敛。
2. 折扣因子 这里是指在每一次done,产生了一批state、reward、action之后,在进行计算loss时,下一步的reward还要考虑当前步reward的结果,即下一步的reward一定要小于当前步的reward。也就是calc_reward_to_go
函数这里。
这里同样也会觉得很困惑,因为如果希望期望最大,那就reward * prob使其概率最大即可。
如果看Hands-on-RL 他的实现,不会感到任何困惑,因为他是用for来做的。但是呢,parl 的实现在考虑了reward * prob使其概率最大
这一步之后,又添加了calc_reward_to_go
函数,从而本来是个离散的东西,强生生的给变成了一个连续状态的事情,关键呢,怎么看都不像是连续的,因为当前步在计算loss时也并没有跟上一步扯上直接的关系。
这一步还好,从理解角度来讲,我会更倾向Hands-on-RL 的实现,容易理解。
总结 关于第一点,我的感觉是步子不能迈太大,宁可慢慢收敛,也要比无法收敛更强,例如model部分我尝试改成如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class PolicyNet (torch.nn.Module): def __init__ (self, state_dim, hidden_dim, action_dim ): super (PolicyNet, self).__init__() self.state_dim = state_dim self.action_dim = action_dim self.fc1 = torch.nn.Linear(state_dim, hidden_dim) self.fc2 = torch.nn.Linear(hidden_dim, action_dim) def forward (self, x ): x = F.relu(self.fc1(x)) return self.fc2(x) @torch.no_grad() def sample (self, obs ): result = self(torch.FloatTensor(obs)) result = F.softmax(result, dim=-1 ) return torch.multinomial(result, 1 ).item()
即forward部分不用softmax,剩下代码保持不变,也同样处于无法收敛状态,似乎来看,softmax+log才是这个算法成功的关键。
这里让我想到一个事情,在深度学习Layer参数初始化的过程,比如:
我们会是这么写,基本不会关心a这个linear的参数是如何初始化的。是因为框架内部已经考虑了kaiming、xavier、uniform等各种初始化技术。如果不加这些参数初始化技术,模型基本也很难收敛。
所以如果应用的话,可以采用现成的实现,如果研究的话,其中一些细节可以慢慢调整。
源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 import randomimport gymimport torchimport torch.nn.functional as Fimport numpy as npimport matplotlib.pyplot as pltfrom tqdm import tqdmimport rl_utilsfrom parl.env import CompatWrapperclass PolicyNet (torch.nn.Module): def __init__ (self, state_dim, hidden_dim, action_dim ): super (PolicyNet, self).__init__() self.state_dim = state_dim self.action_dim = action_dim self.fc1 = torch.nn.Linear(state_dim, hidden_dim) self.fc2 = torch.nn.Linear(hidden_dim, action_dim) def forward (self, x ): x = F.relu(self.fc1(x)) return F.softmax(self.fc2(x), dim=-1 ) @torch.no_grad() def sample (self, obs ): result = self(torch.FloatTensor(obs)) return torch.multinomial(result, 1 ).item() def calc_reward_to_go (reward_list, gamma=1.0 ): for i in range (len (reward_list) - 2 , -1 , -1 ): reward_list[i] += gamma * reward_list[i + 1 ] return np.array(reward_list) class Agent (object ): def __init__ (self, state_dim, action_dim, hidden_dim ): self.model = PolicyNet(state_dim=state_dim, hidden_dim=hidden_dim, action_dim=action_dim) self.optim = torch.optim.Adam(self.model.parameters(), lr=1e-3 ) def train (self, env ): total_reward = 0 obs_bs, action_bs, next_obs_bs, reward_bs, done_bs = [], [], [], [], [] obs = env.reset() while True : action = self.model.sample(obs=obs) next_obs, reward, done, info = env.step(action=action) if done: break obs_bs.append(obs) action_bs.append(action) next_obs_bs.append(next_obs) reward_bs.append(reward) done_bs.append(done) total_reward += reward obs = next_obs if not obs_bs: return total_reward reward_bs = calc_reward_to_go(reward_bs) obs_bs = torch.FloatTensor(np.array(obs_bs)) action_bs = torch.LongTensor(np.array(action_bs)) reward_bs = torch.FloatTensor(np.array(reward_bs)) output = self.model(obs_bs) output = torch.log(output.gather(-1 , action_bs.reshape(-1 , 1 ))) loss = torch.mean(-1 * output.flatten() * reward_bs) self.optim.zero_grad() loss.backward() self.optim.step() return total_reward if __name__ == '__main__' : env = CompatWrapper(gym.make('CartPole-v0' )) agent = Agent(env.observation_space.shape[0 ], action_dim=env.action_space.n, hidden_dim=64 ) for epoch in range (10000 ): total_reward = agent.train(env=env) print (f'Epoch[{epoch} ]-->{total_reward} ' )