Source code for btgym.algorithms.aac

###############################################################################
#
# Copyright (C) 2017 Andrew Muzikin
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################

from __future__ import print_function

import sys

import numpy as np
import tensorflow as tf
from logbook import Logger, StreamHandler

from btgym.algorithms.memory import Memory
from btgym.algorithms.rollout import make_data_getter
from btgym.algorithms.runner import BaseEnvRunnerFn, RunnerThread
from btgym.algorithms.math_utils import log_uniform
from btgym.algorithms.nn.losses import value_fn_loss_def, rp_loss_def, pc_loss_def, aac_loss_def, ppo_loss_def
from btgym.algorithms.utils import feed_dict_rnn_context, feed_dict_from_nested, batch_stack
from btgym.spaces import DictSpace as BaseObSpace
from btgym.spaces import ActionDictSpace as BaseAcSpace
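
Several BaseAAC hyperparameters below (model_beta, opt_learn_rate, rp_lambda, pc_lambda, vr_lambda, off_aac_lambda) accept either a scalar or a [high, low] pair that is sampled once through the log_uniform helper imported above. The sketch below only illustrates that kind of log-space sampling; log_uniform_sketch is a hypothetical stand-in, not the btgym implementation.

import numpy as np

def log_uniform_sketch(bounds, size=1):
    # Sample uniformly in log-space between the two bounds, then map back to linear scale:
    low, high = np.log(min(bounds)), np.log(max(bounds))
    return np.exp(np.random.uniform(low, high, size=size))

# e.g. log_uniform_sketch([1e-4, 1e-2], 5) -> five learn rates spread evenly on a log scale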


[docs]class BaseAAC(object): """ Base Asynchronous Advantage Actor Critic algorithm framework class with auxiliary control tasks and option to run several instances of environment for every worker in vectorized fashion, PAAC-like. Can be configured to run with different losses and policies. Auxiliary tasks implementation borrows heavily from Kosuke Miyoshi code, under Apache License 2.0: https://miyosuda.github.io/ https://github.com/miyosuda/unreal Original A3C code comes from OpenAI repository under MIT licence: https://github.com/openai/universe-starter-agent Papers: https://arxiv.org/abs/1602.01783 https://arxiv.org/abs/1611.05397 """ def __init__(self, env, task, policy_config, log_level, name='AAC', on_policy_loss=aac_loss_def, off_policy_loss=aac_loss_def, vr_loss=value_fn_loss_def, rp_loss=rp_loss_def, pc_loss=pc_loss_def, runner_config=None, runner_fn_ref=BaseEnvRunnerFn, cluster_spec=None, random_seed=None, model_gamma=0.99, # decay model_gae_lambda=1.00, # GAE lambda model_beta=0.01, # entropy regularizer opt_max_env_steps=10 ** 7, opt_decay_steps=None, opt_end_learn_rate=None, opt_learn_rate=1e-4, opt_decay=0.99, opt_momentum=0.0, opt_epsilon=1e-8, rollout_length=20, time_flat=False, episode_train_test_cycle=(1,0), episode_summary_freq=2, # every i`th environment episode env_render_freq=10, # every i`th environment episode model_summary_freq=100, # every i`th algorithm iteration test_mode=False, # gym_atari test mode replay_memory_size=2000, replay_batch_size=None, replay_rollout_length=None, use_off_policy_aac=False, use_reward_prediction=False, use_pixel_control=False, use_value_replay=False, rp_lambda=1.0, # aux tasks loss weights pc_lambda=1.0, vr_lambda=1.0, off_aac_lambda=1, gamma_pc=0.9, # pixel change gamma-decay - not used rp_reward_threshold=0.1, # r.prediction: abs.rewards values bigger than this are considered non-zero rp_sequence_size=3, # r.prediction sampling clip_epsilon=0.1, num_epochs=1, pi_prime_update_period=1, global_step_op=None, global_episode_op=None, inc_episode_op=None, _use_global_network=True, _use_target_policy=False, # target policy tracking behavioral one with delay _use_local_memory=False, # in-place memory aux_render_modes=None, **kwargs): """ Args: env: environment instance or list of instances task: int, parent worker id policy_config: policy estimator class and configuration dictionary log_level: int, logbook.level name: str, class-wide name-scope on_policy_loss: callable returning tensor holding on_policy training loss graph and summaries off_policy_loss: callable returning tensor holding off_policy training loss graph and summaries vr_loss: callable returning tensor holding value replay loss graph and summaries rp_loss: callable returning tensor holding reward prediction loss graph and summaries pc_loss: callable returning tensor holding pixel_control loss graph and summaries runner_config: runner class and configuration dictionary, runner_fn_ref: callable defining environment runner execution logic, valid only if no 'runner_config' arg is provided cluster_spec: dict, full training cluster spec (may be used by meta-trainer) random_seed: int or None model_gamma: scalar, gamma discount factor model_gae_lambda: scalar, GAE lambda model_beta: entropy regularization beta, scalar or [high_bound, low_bound] for log_uniform. opt_max_env_steps: int, total number of environment steps to run training on. opt_decay_steps: int, learn ratio decay steps, in number of environment steps. 
opt_end_learn_rate: scalar, final learn rate opt_learn_rate: start learn rate, scalar or [high_bound, low_bound] for log_uniform distr. opt_decay: scalar, optimizer decay, if apll. opt_momentum: scalar, optimizer momentum, if apll. opt_epsilon: scalar, optimizer epsilon rollout_length: int, on-policy rollout length time_flat: bool, flatten rnn time-steps in rollouts while training - see `Notes` below episode_train_test_cycle: tuple or list as (train_number, test_number), def=(1,0): enables infinite loop such as: run `train_number` of train data episodes, than `test_number` of test data episodes, repeat. Should be consistent with provided dataset parameters (test data should exist if `test_number > 0`) episode_summary_freq: int, write episode summary for every i'th episode env_render_freq: int, write environment rendering summary for every i'th train step model_summary_freq: int, write model summary for every i'th train step test_mode: bool, True: Atari, False: BTGym replay_memory_size: int, in number of experiences replay_batch_size: int, mini-batch size for off-policy training, def = 1 replay_rollout_length: int off-policy rollout length by def. equals on_policy_rollout_length use_off_policy_aac: bool, use full AAC off-policy loss instead of Value-replay use_reward_prediction: bool, use aux. off-policy reward prediction task use_pixel_control: bool, use aux. off-policy pixel control task use_value_replay: bool, use aux. off-policy value replay task (not used if use_off_policy_aac=True) rp_lambda: reward prediction loss weight, scalar or [high, low] for log_uniform distr. pc_lambda: pixel control loss weight, scalar or [high, low] for log_uniform distr. vr_lambda: value replay loss weight, scalar or [high, low] for log_uniform distr. off_aac_lambda: off-policy AAC loss weight, scalar or [high, low] for log_uniform distr. gamma_pc: NOT USED rp_reward_threshold: scalar, reward prediction classification threshold, above which reward is 'non-zero' rp_sequence_size: int, reward prediction sample size, in number of experiences clip_epsilon: scalar, PPO: surrogate L^clip epsilon num_epochs: int, num. of SGD runs for every train step, val. > 1 should be used with caution. pi_prime_update_period: int, PPO: pi to pi_old update period in number of train steps, def: 1 global_step_op: external tf.variable holding global step counter global_episode_op: external tf.variable holding global episode counter inc_episode_op: external tf.op incrementing global step counter _use_global_network: bool, either to use parameter server policy instance _use_target_policy: bool, PPO: use target policy (aka pi_old), delayed by `pi_prime_update_period` delay _use_local_memory: bool: use in-process replay memory instead of runner-based one aux_render_modes: additional visualisations to include in per-episode rendering summary Note: - On `time_flat` arg: There are two alternatives to run RNN part of policy estimator: a. Feed initial RNN state for every experience frame in rollout (those are stored anyway if we want random memory repaly sampling) and do single time-step RNN advance for all experiences in a batch; this is when time_flat=True; b. Reshape incoming batch after convolution part of network in time-wise fashion for every rollout in a batch i.e. batch_size=number_of_rollouts and rnn_timesteps=max_rollout_length. In this case we need to feed initial rnn_states for rollouts only. There is some little extra work to pad rollouts to max_time_size and feed true rollout lengths to rnn. 
Thus, when time_flat=False, we unroll RNN in specified number of time-steps for every rollout. Both options has pros and cons: Unrolling dynamic RNN is computationally more expensive but gives clearly faster convergence, [possibly] due to the fact that RNN states for 2nd, 3rd, ... frames of rollouts are computed using updated policy estimator, which is supposed to be closer to optimal one. When time_flattened, every time-step uses RNN states computed when rollout was collected (i.e. by behavioral policy estimator with older parameters). Nevertheless, time_flat: - allows use of static RNN; - one can safely shuffle training batch or mix on-policy and off-policy data in single mini-batch, ensuring iid property; - allowing second-order derivatives which is impossible in current tf dynamic RNN implementation as it uses tf.while_loop internally; - computationally cheaper; """ # Logging: self.log_level = log_level self.name = name self.task = task self.cluster_spec = cluster_spec StreamHandler(sys.stdout).push_application() self.log = Logger('{}_{}'.format(self.name, self.task), level=self.log_level) # Get direct traceback: try: self.random_seed = random_seed if self.random_seed is not None: np.random.seed(self.random_seed) tf.set_random_seed(self.random_seed) self.log.debug('rnd_seed:{}, log_u_sample_(0,1]x5: {}'. format(random_seed, log_uniform([1e-10,1], 5))) if kwargs != {}: self.log.warning('Unexpected kwargs found: {}, ignored.'.format(kwargs)) self.env_list = env try: assert isinstance(self.env_list, list) except AssertionError: self.env_list = [env] self.ref_env = self.env_list[0] # reference instance to get obs shapes etc. try: assert isinstance(self.ref_env.observation_space, BaseObSpace) except AssertionError: self.log.exception( 'expected environment observation space of type {}, got: {}'.\ format(BaseObSpace, type(self.ref_env.observation_space)) ) raise AssertionError try: assert isinstance(self.ref_env.action_space, BaseAcSpace) except AssertionError: self.log.exception( 'expected environment observation space of type {}, got: {}'.\ format(BaseAcSpace, type(self.ref_env.action_space)) ) raise AssertionError self.policy_class = policy_config['class_ref'] self.policy_kwargs = policy_config['kwargs'] # Losses: self.on_policy_loss = on_policy_loss self.off_policy_loss = off_policy_loss self.vr_loss = vr_loss self.rp_loss = rp_loss self.pc_loss = pc_loss if runner_config is None: # Runner will be async. ThreadRunner class with runner_fn logic: self.runner_config = { 'class_ref': RunnerThread, 'kwargs': { 'runner_fn_ref': runner_fn_ref, } } else: self.runner_config = runner_config # AAC specific: self.model_gamma = model_gamma # decay self.model_gae_lambda = model_gae_lambda # general advantage estimator lambda self.model_beta = log_uniform(model_beta, 1) # entropy reg. 
self.time_flat = time_flat # Optimizer self.opt_max_env_steps = opt_max_env_steps self.opt_learn_rate = log_uniform(opt_learn_rate, 1) if opt_end_learn_rate is None: self.opt_end_learn_rate = self.opt_learn_rate else: self.opt_end_learn_rate = opt_end_learn_rate if opt_decay_steps is None: self.opt_decay_steps = self.opt_max_env_steps else: self.opt_decay_steps = opt_decay_steps self.opt_decay = opt_decay self.opt_epsilon = opt_epsilon self.opt_momentum = opt_momentum self.rollout_length = rollout_length # Data sampling control: self.num_train_episodes = episode_train_test_cycle[0] self.num_test_episodes = episode_train_test_cycle[-1] try: assert self.num_train_episodes + self.num_test_episodes > 0 and \ self.num_train_episodes >= 0 and \ self.num_test_episodes >= 0 except AssertionError: self.log.exception( 'Train/test episode cycle values could not be both zeroes or negative, got: train={}, test={}'.\ format(self.num_train_episodes, self.num_test_episodes) ) raise AssertionError self.current_train_episode = 0 self.current_test_episode = 0 # Summaries : self.episode_summary_freq = episode_summary_freq self.env_render_freq = env_render_freq self.model_summary_freq = model_summary_freq # If True - use ATARI gym env.: self.test_mode = test_mode # UNREAL/AUX and Off-policy specific: self.off_aac_lambda = log_uniform(off_aac_lambda, 1) self.rp_lambda = log_uniform(rp_lambda, 1) self.pc_lambda = log_uniform(pc_lambda, 1) self.vr_lambda = log_uniform(vr_lambda, 1) self.gamma_pc = gamma_pc self.replay_memory_size = replay_memory_size if replay_rollout_length is not None: self.replay_rollout_length = replay_rollout_length else: self.replay_rollout_length = rollout_length # by default off-rollout equals on-policy one self.rp_sequence_size = rp_sequence_size self.rp_reward_threshold = rp_reward_threshold if replay_batch_size is not None: self.replay_batch_size = replay_batch_size else: self.replay_batch_size = len(self.env_list) # by default off-batch equals on-policy one # PPO related: self.clip_epsilon = clip_epsilon self.num_epochs = num_epochs self.pi_prime_update_period = pi_prime_update_period # On/off switchers for off-policy training and auxiliary tasks: self.use_off_policy_aac = use_off_policy_aac self.use_reward_prediction = use_reward_prediction self.use_pixel_control = use_pixel_control if use_off_policy_aac: self.use_value_replay = False # v-replay is redundant in this case else: self.use_value_replay = use_value_replay self.use_any_aux_tasks = use_value_replay or use_pixel_control or use_reward_prediction self.use_local_memory = _use_local_memory self.use_memory = (self.use_any_aux_tasks or self.use_off_policy_aac) and not self.use_local_memory self.use_target_policy = _use_target_policy self.use_global_network = _use_global_network self.log.notice('learn_rate: {:1.6f}, entropy_beta: {:1.6f}'.format(self.opt_learn_rate, self.model_beta)) if self.use_off_policy_aac: self.log.notice('off_aac_lambda: {:1.6f}'.format(self.off_aac_lambda,)) if self.use_any_aux_tasks: self.log.notice('vr_lambda: {:1.6f}, pc_lambda: {:1.6f}, rp_lambda: {:1.6f}'. format(self.vr_lambda, self.pc_lambda, self.rp_lambda)) if aux_render_modes is not None: self.aux_render_modes = list(aux_render_modes) else: self.aux_render_modes = [] #self.log.notice( # 'AAC_{}: max_steps: {}, decay_steps: {}, end_rate: {:1.6f},'. 
# format(self.task, self.opt_max_env_steps, self.opt_decay_steps, self.opt_end_learn_rate)) self.worker_device = "/job:worker/task:{}/cpu:0".format(task) # Update policy configuration self.policy_kwargs.update( { 'ob_space': self.ref_env.observation_space, 'ac_space': self.ref_env.action_space, 'rp_sequence_size': self.rp_sequence_size, 'aux_estimate': self.use_any_aux_tasks, 'static_rnn': self.time_flat, 'task': self.task, 'cluster_spec': self.cluster_spec } ) if global_step_op is not None: self.global_step = global_step_op if global_episode_op is not None: self.global_episode = global_episode_op if inc_episode_op is not None: self.inc_episode = inc_episode_op # Should be defined later: self.sync = None self.sync_pi = None self.sync_pi_prime = None self.grads = None self.summary_writer = None self.local_steps = 0 # Start building graphs: self.log.debug('started building graphs...') if self.use_global_network: # PS: with tf.device(tf.train.replica_device_setter(1, worker_device=self.worker_device)): self.network = pi_global = self._make_policy('global') if self.use_target_policy: self.network_prime = self._make_policy('global_prime') else: self.network_prime = self._make_dummy_policy() else: self.network = pi_global = self._make_dummy_policy() self.network_prime = self._make_dummy_policy() # Worker: with tf.device(self.worker_device): with tf.variable_scope(self.name): self.local_network = pi = self._make_policy('local') if self.use_target_policy: self.local_network_prime = pi_prime = self._make_policy('local_prime') else: self.local_network_prime = pi_prime = self._make_dummy_policy() self.worker_device_callback_0() # if need more networks etc. # Meant for Batch-norm layers: pi.update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS, scope='.*local.*') # Just in case: self.dummy_pi = self._make_dummy_policy() self.log.debug('local_network_upd_ops_collection:\n{}'.format(pi.update_ops)) self.log.debug('\nlocal_network_var_list_to_save:') for v in pi.var_list: self.log.debug('{}: {}'.format(v.name, v.get_shape())) # Learning rate annealing: self.learn_rate_decayed = tf.train.polynomial_decay( self.opt_learn_rate, self.global_step + 1, self.opt_decay_steps, self.opt_end_learn_rate, power=1, cycle=False, ) # Freeze training if train_phase is False: self.train_learn_rate = self.learn_rate_decayed * tf.cast(pi.train_phase, tf.float64) self.log.debug('learn rate ok') # Define loss and related summaries self.loss, self.loss_summaries = self._make_loss(pi=pi, pi_prime=pi_prime) if self.use_global_network: # Define train, sync ops: self.train_op = self._make_train_op(pi=pi, pi_prime=pi_prime, pi_global=pi_global) else: self.train_op = [] # Model stat. summary, episode summary: self.model_summary_op, self.ep_summary = self._combine_summaries( policy=pi, model_summaries=self.loss_summaries ) # Make thread-runner processes: self.runners = self._make_runners(policy=pi) # Make rollouts provider[s] for async runners: if self.runner_config['class_ref'] == RunnerThread: # Make rollouts provider[s] for async threaded runners: self.data_getter = [make_data_getter(runner.queue) for runner in self.runners] else: # Else assume runner is in-thread synchro type and supports .get data() method: self.data_getter = [runner.get_data for runner in self.runners] self.log.debug('trainer.__init__() ok') except: msg = 'Base class __init__() exception occurred.' 
+\ '\n\nPress `Ctrl-C` or jupyter:[Kernel]->[Interrupt] for clean exit.\n' self.log.exception(msg) raise RuntimeError(msg) def worker_device_callback_0(self): pass def _make_loss(self, **kwargs): return self._make_base_loss(name=self.name, verbose=True, **kwargs) def _make_base_loss(self, pi, pi_prime, name='base', verbose=True): """ Defines base AAC on- and off-policy loss, auxiliary VR, RP and PC losses, placeholders and summaries. Args: pi: policy network obj. pi_prime: optional policy network obj. name: str, name scope verbose: summary level Returns: tensor holding estimated loss graph list of related summaries """ with tf.name_scope(name): # On-policy AAC loss definition: pi.on_pi_act_target = tf.placeholder( tf.float32, [None, self.ref_env.action_space.one_hot_depth], name="on_policy_action_pl" ) pi.on_pi_adv_target = tf.placeholder(tf.float32, [None], name="on_policy_advantage_pl") pi.on_pi_r_target = tf.placeholder(tf.float32, [None], name="on_policy_return_pl") clip_epsilon = tf.cast(self.clip_epsilon * self.learn_rate_decayed / self.opt_learn_rate, tf.float32) on_pi_loss, on_pi_summaries = self.on_policy_loss( act_target=pi.on_pi_act_target, adv_target=pi.on_pi_adv_target, r_target=pi.on_pi_r_target, pi_logits=pi.on_logits, pi_vf=pi.on_vf, pi_prime_logits=pi_prime.on_logits, entropy_beta=self.model_beta, epsilon=clip_epsilon, name='on_policy', verbose=verbose ) # Start accumulating total loss: loss = on_pi_loss model_summaries = on_pi_summaries # Off-policy losses: pi.off_pi_act_target = tf.placeholder( tf.float32, [None, self.ref_env.action_space.one_hot_depth], name="off_policy_action_pl") pi.off_pi_adv_target = tf.placeholder(tf.float32, [None], name="off_policy_advantage_pl") pi.off_pi_r_target = tf.placeholder(tf.float32, [None], name="off_policy_return_pl") if self.use_off_policy_aac: # Off-policy AAC loss graph mirrors on-policy: off_pi_loss, off_pi_summaries = self.off_policy_loss( act_target=pi.off_pi_act_target, adv_target=pi.off_pi_adv_target, r_target=pi.off_pi_r_target, pi_logits=pi.off_logits, pi_vf=pi.off_vf, pi_prime_logits=pi_prime.off_logits, entropy_beta=self.model_beta, epsilon=clip_epsilon, name='off_policy', verbose=False ) loss = loss + self.off_aac_lambda * off_pi_loss model_summaries += off_pi_summaries if self.use_pixel_control: # Pixel control loss: pi.pc_action = tf.placeholder(tf.float32, [None, self.ref_env.action_space.tensor_shape[0]], name="pc_action") pi.pc_target = tf.placeholder(tf.float32, [None, None, None], name="pc_target") pc_loss, pc_summaries = self.pc_loss( actions=pi.pc_action, targets=pi.pc_target, pi_pc_q=pi.pc_q, name='off_policy', verbose=verbose ) loss = loss + self.pc_lambda * pc_loss # Add specific summary: model_summaries += pc_summaries if self.use_value_replay: # Value function replay loss: pi.vr_target = tf.placeholder(tf.float32, [None], name="vr_target") vr_loss, vr_summaries = self.vr_loss( r_target=pi.vr_target, pi_vf=pi.vr_value, name='off_policy', verbose=verbose ) loss = loss + self.vr_lambda * vr_loss model_summaries += vr_summaries if self.use_reward_prediction: # Reward prediction loss: pi.rp_target = tf.placeholder(tf.float32, [None, 3], name="rp_target") rp_loss, rp_summaries = self.rp_loss( rp_targets=pi.rp_target, pi_rp_logits=pi.rp_logits, name='off_policy', verbose=verbose ) loss = loss + self.rp_lambda * rp_loss model_summaries += rp_summaries return loss, model_summaries def _make_train_op(self, pi, pi_prime, pi_global): """ Defines training op graph and supplementary sync operations. 
Args: pi: policy network obj. pi_prime: optional policy network obj. pi_global: shared policy network obj. hosted by parameter server Returns: tensor holding training op graph; """ # Each worker gets a different set of adam optimizer parameters: self.optimizer = tf.train.AdamOptimizer(self.train_learn_rate, epsilon=1e-5) # self.optimizer = tf.train.RMSPropOptimizer( # learning_rate=train_learn_rate, # decay=self.opt_decay, # momentum=self.opt_momentum, # epsilon=self.opt_epsilon, # ) # Clipped gradients: self.grads, _ = tf.clip_by_global_norm( tf.gradients(self.loss, pi.var_list), 40.0 ) self.grads_global_norm = tf.global_norm(self.grads) # Copy weights from the parameter server to the local model self.sync = self.sync_pi = tf.group( *[v1.assign(v2) for v1, v2 in zip(pi.var_list, pi_global.var_list)] ) if self.use_target_policy: # Copy weights from new policy model to target one: self.sync_pi_prime = tf.group( *[v1.assign(v2) for v1, v2 in zip(pi_prime.var_list, pi.var_list)] ) grads_and_vars = list(zip(self.grads, pi_global.var_list)) # Set global_step increment equal to observation space batch size: obs_space_keys = list(pi.on_state_in.keys()) # Handles case when 'external' is nested or flat dict: assert 'external' in obs_space_keys, \ 'Expected observation space to contain `external` mode, got: {}'.format(obs_space_keys) if isinstance(pi.on_state_in['external'], dict): stream = pi.on_state_in['external'][list(pi.on_state_in['external'].keys())[0]] else: stream = pi.on_state_in['external'] self.inc_step = self.global_step.assign_add(tf.shape(stream)[0]) train_op = self.optimizer.apply_gradients(grads_and_vars) self.log.debug('train_op defined') return train_op def _combine_summaries(self, policy=None, model_summaries=None): """ Defines model-wide and episode-related summaries Returns: model_summary op episode_summary op """ if model_summaries is not None: if self.use_global_network: # Model-wide statistics: with tf.name_scope('model'): model_summaries += [ tf.summary.scalar("grad_global_norm", self.grads_global_norm), # TODO: add gradient variance summary #tf.summary.scalar("learn_rate", self.train_learn_rate), tf.summary.scalar("learn_rate", self.learn_rate_decayed), # cause actual rate is a jaggy due to test freezes tf.summary.scalar("total_loss", self.loss), # tf.summary.scalar('roll_reward', tf.reduce_mean(self.local_network.on_last_reward_in)), # tf.summary.scalar('roll_advantage', tf.reduce_mean(self.local_network.on_pi_adv_target)), ] if policy is not None: model_summaries += [ tf.summary.scalar("var_global_norm", tf.global_norm(policy.var_list))] else: model_summaries = [] # Model stat. summary: model_summary = tf.summary.merge(model_summaries, name='model_summary') # Episode-related summaries: ep_summary = dict( # Summary placeholders render_atari=tf.placeholder(tf.uint8, [None, None, None, 1]), total_r=tf.placeholder(tf.float32, ), cpu_time=tf.placeholder(tf.float32, ), final_value=tf.placeholder(tf.float32, ), steps=tf.placeholder(tf.int32, ), ) if self.test_mode: # For Atari: ep_summary['render_op'] = tf.summary.image("model/state", ep_summary['render_atari']) else: # BTGym rendering: ep_summary.update( { mode: tf.placeholder(tf.uint8, [None, None, None, None], name=mode + '_pl') for mode in self.env_list[0].render_modes + self.aux_render_modes } ) ep_summary['render_op'] = tf.summary.merge( [tf.summary.image(mode, ep_summary[mode]) for mode in self.env_list[0].render_modes + self.aux_render_modes] ) # Episode stat. 
summary: ep_summary['btgym_stat_op'] = tf.summary.merge( [ tf.summary.scalar('episode_train/total_reward', ep_summary['total_r']), tf.summary.scalar('episode_train/cpu_time_sec', ep_summary['cpu_time']), tf.summary.scalar('episode_train/final_value', ep_summary['final_value']), tf.summary.scalar('episode_train/env_steps', ep_summary['steps']) ], name='episode_train_btgym' ) # Test episode stat. summary: ep_summary['test_btgym_stat_op'] = tf.summary.merge( [ tf.summary.scalar('episode_test/total_reward', ep_summary['total_r']), tf.summary.scalar('episode_test/final_value', ep_summary['final_value']), tf.summary.scalar('episode_test/env_steps', ep_summary['steps']) ], name='episode_test_btgym' ) ep_summary['atari_stat_op'] = tf.summary.merge( [ tf.summary.scalar('episode/total_reward', ep_summary['total_r']), tf.summary.scalar('episode/steps', ep_summary['steps']) ], name='episode_atari' ) self.log.debug('model-wide and episode summaries ok.') return model_summary, ep_summary def _make_runners(self, policy): """ Defines thread-runners processes instances. Args: policy: policy for runner to execute Returns: list of runners """ # Replay memory_config: if self.use_memory: memory_config = dict( class_ref=Memory, kwargs=dict( history_size=self.replay_memory_size, max_sample_size=self.replay_rollout_length, priority_sample_size=self.rp_sequence_size, reward_threshold=self.rp_reward_threshold, use_priority_sampling=self.use_reward_prediction, task=self.task, log_level=self.log_level, ) ) else: memory_config = None # Make runners: # `rollout_length` represents the number of "local steps": the number of time steps # we run the policy before we get full rollout, run train step and update the parameters. runners = [] task = 0 # Runners will have [worker_task][env_count] id's for env in self.env_list: kwargs=dict( env=env, policy=policy, task=self.task + task, rollout_length=self.rollout_length, # ~20 episode_summary_freq=self.episode_summary_freq, env_render_freq=self.env_render_freq, test=self.test_mode, ep_summary=self.ep_summary, memory_config=memory_config, log_level=self.log_level, global_step_op=self.global_step, aux_render_modes=self.aux_render_modes ) kwargs.update(self.runner_config['kwargs']) # New runner instance: runners.append(self.runner_config['class_ref'](**kwargs)) task += 0.01 self.log.debug('runners ok.') return runners def _make_step_counters(self): """ Defines operations for global step and global episode; Returns: None, sets attrs. """ self.global_step = tf.get_variable( "global_step", [], tf.int32, initializer=tf.constant_initializer( 0, dtype=tf.int32 ), trainable=False ) tf.add_to_collection(tf.GraphKeys.GLOBAL_STEP, self.global_step) self.reset_global_step = self.global_step.assign(0) self.global_episode = tf.get_variable( "global_episode", [], tf.int32, initializer=tf.constant_initializer( 0, dtype=tf.int32 ), trainable=False ) # Increment episode count: self.inc_episode = self.global_episode.assign_add(1) def _make_policy(self, scope): """ Configures and instantiates policy network and ops. Note: `global` name_scope networks should be defined first. 
Args: scope: name scope Returns: policy instance """ with tf.variable_scope(scope): # Make policy instance: network = self.policy_class(**self.policy_kwargs) if 'global' not in scope: try: # For locals those should be already defined: assert hasattr(self, 'global_step') and \ hasattr(self, 'global_episode') and \ hasattr(self, 'inc_episode') # Add attrs to local: network.global_step = self.global_step network.global_episode = self.global_episode network.inc_episode= self.inc_episode # Override with aac method: network.get_sample_config = self.get_sample_config except AssertionError: self.log.exception( '`global` name_scope network[s] should be defined before any `local` one[s].'. format(self.task) ) raise RuntimeError else: # Set counters: self._make_step_counters() return network def _make_dummy_policy(self): class _Dummy(object): """ Policy plug when target network is not used. """ def __init__(self): self.on_state_in = None self.off_state_in = None self.on_lstm_state_pl_flatten = None self.off_lstm_state_pl_flatten = None self.on_a_r_in = None self.off_a_r_in = None self.on_logits = None self.off_logits = None self.on_vf = None self.off_vf = None self.on_batch_size = None self.on_time_length = None self.off_batch_size = None self.off_time_length = None return _Dummy()
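
The `sync` / `sync_pi` ops built in `_make_train_op()` above are plain groups of per-variable assigns that copy the parameter-server ('global') weights into the worker-local network before a train step. Below is a minimal standalone sketch of that pattern; the variable scopes are hypothetical stand-ins for `pi_global.var_list` and `pi.var_list`.

import tensorflow as tf

with tf.variable_scope('global'):
    w_global = tf.get_variable('w', shape=[2, 2], initializer=tf.ones_initializer())

with tf.variable_scope('local'):
    w_local = tf.get_variable('w', shape=[2, 2], initializer=tf.zeros_initializer())

global_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope='global')
local_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope='local')

# Same construction as self.sync / self.sync_pi above:
sync_op = tf.group(*[v_l.assign(v_g) for v_l, v_g in zip(local_vars, global_vars)])

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    sess.run(sync_op)           # local weights now equal global weights
    print(sess.run(w_local))    # -> all ones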
[docs]    def get_data(self, **kwargs):
        """
        Collect rollouts from every environment.

        Returns:
            dictionary of lists of data streams collected from every runner
        """
        data_streams = [get_it(**kwargs) for get_it in self.data_getter]

        return {key: [stream[key] for stream in data_streams] for key in data_streams[0].keys()}
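
For reference, a small self-contained illustration of the transposition get_data() performs: every runner returns a dict with identical keys, and the method turns the list of dicts into a dict of per-key lists. The field values below are made up for the example.

runner_outputs = [
    {'on_policy': 'rollout_0', 'ep_summary': None},
    {'on_policy': 'rollout_1', 'ep_summary': {'total_r': 1.0}},
]
merged = {key: [stream[key] for stream in runner_outputs] for key in runner_outputs[0].keys()}
assert merged['on_policy'] == ['rollout_0', 'rollout_1']
assert merged['ep_summary'] == [None, {'total_r': 1.0}]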
[docs]    def get_sample_config(self, _new_trial=True, **kwargs):
        """
        WARNING: _new_trial=True is a quick fix, TODO: fix it properly!

        Returns environment configuration parameters for the next episode to sample.
        By default it is a simple stateful iterator, works correctly with `BTgymDataset` data class,
        repeating the cycle:
            - sample `num_train_episodes` from train data,
            - sample `num_test_episodes` from test data.

        Convention: supposed to override the dummy method of the local policy instance,
        see inside ._make_policy() method.

        Returns:
            configuration dictionary of type `btgym.datafeed.base.EnvResetConfig`
        """
        # sess = tf.get_default_session()
        if self.current_train_episode < self.num_train_episodes:
            episode_type = 0  # train
            self.current_train_episode += 1
            self.log.debug(
                'c_1, c_train={}, c_test={}, type={}'.
                format(self.current_train_episode, self.current_test_episode, episode_type)
            )
        else:
            if self.current_test_episode < self.num_test_episodes:
                episode_type = 1  # test
                self.current_test_episode += 1
                self.log.debug(
                    'c_2, c_train={}, c_test={}, type={}'.
                    format(self.current_train_episode, self.current_test_episode, episode_type)
                )
            else:
                # Cycle end, reset and start new (rec. depth 1):
                self.current_train_episode = 0
                self.current_test_episode = 0
                self.log.debug(
                    'c_3, c_train={}, c_test={}'.
                    format(self.current_train_episode, self.current_test_episode)
                )
                return self.get_sample_config(_new_trial=True)

        # Compose btgym.datafeed.base.EnvResetConfig-consistent dict:
        sample_config = dict(
            episode_config=dict(
                get_new=True,
                sample_type=episode_type,
                b_alpha=1.0,
                b_beta=1.0
            ),
            trial_config=dict(
                get_new=_new_trial,
                sample_type=episode_type,
                b_alpha=1.0,
                b_beta=1.0
            )
        )
        return sample_config
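
To make the train/test cycling above concrete, here is a standalone sketch that replays the same iterator logic for episode_train_test_cycle=(2, 1); it only illustrates the expected sample_type sequence and does not call into the class.

num_train_episodes, num_test_episodes = 2, 1
current_train, current_test, types = 0, 0, []
for _ in range(6):
    if current_train < num_train_episodes:
        episode_type = 0          # train episode
        current_train += 1
    elif current_test < num_test_episodes:
        episode_type = 1          # test episode
        current_test += 1
    else:
        current_train = current_test = 0   # cycle end: reset and start over with train
        episode_type = 0
        current_train += 1
    types.append(episode_type)

assert types == [0, 0, 1, 0, 0, 1]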
[docs]    def start(self, sess, summary_writer, **kwargs):
        """
        Executes all initializing operations, starts environment runner[s].
        Supposed to be called by parent worker just before training loop starts.

        Args:
            sess:       tf session object.
            kwargs:     not used by default.
        """
        try:
            # Copy weights from global to local:
            sess.run(self.sync)

            # Start thread_runners:
            self._start_runners(sess, summary_writer, **kwargs)

        except Exception as e:
            msg = 'start() exception occurred' + \
                '\n\nPress `Ctrl-C` or jupyter:[Kernel]->[Interrupt] for clean exit.\n'
            self.log.exception(msg)
            raise e
def _start_runners(self, sess, summary_writer, **kwargs): """ Args: sess: summary_writer: Returns: """ for runner in self.runners: runner.start_runner(sess, summary_writer, **kwargs) # starting runner threads self.summary_writer = summary_writer def _get_rp_feeder(self, pi, batch): """ Returns feed dictionary for `reward prediction` loss estimation subgraph. Args: pi: policy to feed """ feeder = feed_dict_from_nested(pi.rp_state_in, batch['state']) feeder.update( { pi.rp_target: batch['rp_target'], pi.rp_batch_size: batch['batch_size'], } ) return feeder def _get_vr_feeder(self, pi, batch): """ Returns feed dictionary for `value replay` loss estimation subgraph. Args: pi: policy to feed """ if not self.use_off_policy_aac: # use single pass of network on same off-policy batch feeder = feed_dict_from_nested(pi.vr_state_in, batch['state']) feeder.update(feed_dict_rnn_context(pi.vr_lstm_state_pl_flatten, batch['context'])) feeder.update( { pi.vr_batch_size: batch['batch_size'], pi.vr_time_length: batch['time_steps'], pi.vr_last_a_in: batch['last_action'], pi.vr_last_reward_in: batch['last_reward'], pi.vr_target: batch['r'] } ) else: feeder = {pi.vr_target: batch['r']} # redundant actually :) return feeder def _get_pc_feeder(self, pi, batch): """ Returns feed dictionary for `pixel control` loss estimation subgraph. Args: pi: policy to feed """ if not self.use_off_policy_aac: # use single pass of network on same off-policy batch feeder = feed_dict_from_nested(pi.pc_state_in, batch['state']) feeder.update( feed_dict_rnn_context(pi.pc_lstm_state_pl_flatten, batch['context'])) feeder.update( { pi.pc_last_a_in: batch['last_action'], pi.pc_last_reward_in: batch['last_reward'], pi.pc_action: batch['action'], pi.pc_target: batch['pixel_change'] } ) else: feeder = {pi.pc_action: batch['action'], pi.pc_target: batch['pixel_change']} return feeder def _process_rollouts(self, rollouts): """ rollout.process wrapper: makes single batch from list of rollouts Args: rollouts: list of btgym.algorithms.Rollout class instances Returns: single batch data """ batch = batch_stack( [ r.process( gamma=self.model_gamma, gae_lambda=self.model_gae_lambda, size=self.rollout_length, time_flat=self.time_flat, ) for r in rollouts ] ) return batch def _get_main_feeder( self, sess, on_policy_batch=None, off_policy_batch=None, rp_batch=None, is_train=True, pi=None, pi_prime=None): """ Composes entire train step feed dictionary. Args: sess: tf session obj. 
pi: policy to feed pi_prime: optional policy to feed on_policy_batch: on-policy data batch off_policy_batch: off-policy (replay memory) data batch rp_batch: off-policy reward prediction data batch is_train (bool): is data provided are train or test Returns: feed_dict (dict): train step feed dictionary """ feed_dict = {} # Feeder for on-policy AAC loss estimation graph: if on_policy_batch is not None: feed_dict = feed_dict_from_nested(pi.on_state_in, on_policy_batch['state']) feed_dict.update( feed_dict_rnn_context(pi.on_lstm_state_pl_flatten, on_policy_batch['context']) ) feed_dict.update( { pi.on_last_a_in: on_policy_batch['last_action'], pi.on_last_reward_in: on_policy_batch['last_reward'], pi.on_batch_size: on_policy_batch['batch_size'], pi.on_time_length: on_policy_batch['time_steps'], pi.on_pi_act_target: on_policy_batch['action'], pi.on_pi_adv_target: on_policy_batch['advantage'], pi.on_pi_r_target: on_policy_batch['r'], pi.train_phase: is_train, # Zeroes learn rate, [+ batch_norm + dropout] } ) if self.use_target_policy and pi_prime is not None: feed_dict.update( feed_dict_from_nested(pi_prime.on_state_in, on_policy_batch['state']) ) feed_dict.update( feed_dict_rnn_context(pi_prime.on_lstm_state_pl_flatten, on_policy_batch['context']) ) feed_dict.update( { pi_prime.on_batch_size: on_policy_batch['batch_size'], pi_prime.on_time_length: on_policy_batch['time_steps'], pi_prime.on_last_a_in: on_policy_batch['last_action'], pi_prime.on_last_reward_in: on_policy_batch['last_reward'], # TODO: pi prime train phase? } ) if (self.use_any_aux_tasks or self.use_off_policy_aac) and off_policy_batch is not None: # Feeder for off-policy AAC loss estimation graph: off_policy_feed_dict = feed_dict_from_nested(pi.off_state_in, off_policy_batch['state']) off_policy_feed_dict.update( feed_dict_rnn_context(pi.off_lstm_state_pl_flatten, off_policy_batch['context'])) off_policy_feed_dict.update( { pi.off_last_a_in: off_policy_batch['last_action'], pi.off_last_reward_in: off_policy_batch['last_reward'], pi.off_batch_size: off_policy_batch['batch_size'], pi.off_time_length: off_policy_batch['time_steps'], pi.off_pi_act_target: off_policy_batch['action'], pi.off_pi_adv_target: off_policy_batch['advantage'], pi.off_pi_r_target: off_policy_batch['r'], } ) if self.use_target_policy and pi_prime is not None: off_policy_feed_dict.update( feed_dict_from_nested(pi_prime.off_state_in, off_policy_batch['state']) ) off_policy_feed_dict.update( { pi_prime.off_batch_size: off_policy_batch['batch_size'], pi_prime.off_time_length: off_policy_batch['time_steps'], pi_prime.off_last_a_in: off_policy_batch['last_action'], pi_prime.off_last_reward_in: off_policy_batch['last_reward'], } ) off_policy_feed_dict.update( feed_dict_rnn_context( pi_prime.off_lstm_state_pl_flatten, off_policy_batch['context'] ) ) feed_dict.update(off_policy_feed_dict) # Update with reward prediction subgraph: if self.use_reward_prediction and rp_batch is not None: # Rebalanced 50/50 sample for RP: feed_dict.update(self._get_rp_feeder(pi, rp_batch)) # Pixel control ... if self.use_pixel_control and off_policy_batch is not None: feed_dict.update(self._get_pc_feeder(pi, off_policy_batch)) # VR... if self.use_value_replay and off_policy_batch is not None: feed_dict.update(self._get_vr_feeder(pi, off_policy_batch)) return feed_dict
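
The feeders above lean on btgym.algorithms.utils.feed_dict_from_nested, which is assumed to walk a nested dict of placeholders and a matching nested dict of arrays in parallel and return a flat {placeholder: value} mapping. The recursive sketch below only illustrates that assumed behaviour; nested_feed is not the btgym helper.

def nested_feed(placeholders, values):
    # Recursively pair each leaf placeholder with the value found under the same nested key:
    feed = {}
    if isinstance(placeholders, dict):
        for key, sub_pl in placeholders.items():
            feed.update(nested_feed(sub_pl, values[key]))
    else:
        feed[placeholders] = values
    return feed

# e.g. nested_feed(pi.on_state_in, on_policy_batch['state']) -> {external_pl: array, internal_pl: array, ...}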
[docs]    def process_data(self, sess, data, is_train, pi, pi_prime=None):
        """
        Processes data, composes train step feed dictionary.

        Args:
            sess:               tf session obj.
            pi:                 policy to feed
            pi_prime:           optional policy to feed
            data (dict):        data dictionary
            is_train (bool):    whether the provided data is train or test

        Returns:
            feed_dict (dict): train step feed dictionary
        """
        # Process minibatch for on-policy train step:
        on_policy_batch = self._process_rollouts(data['on_policy'])

        if self.use_memory:
            # Process rollouts from replay memory:
            off_policy_batch = self._process_rollouts(data['off_policy'])

            if self.use_reward_prediction:
                # Rebalanced 50/50 sample for RP:
                rp_rollouts = data['off_policy_rp']
                rp_batch = batch_stack([rp.process_rp(self.rp_reward_threshold) for rp in rp_rollouts])

            else:
                rp_batch = None

        else:
            off_policy_batch = None
            rp_batch = None

        return self._get_main_feeder(sess, on_policy_batch, off_policy_batch, rp_batch, is_train, pi, pi_prime)
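
The advantage and return targets fed above come from rollout.process(gamma=model_gamma, gae_lambda=model_gae_lambda, ...). The numpy sketch below shows the standard Generalized Advantage Estimation recursion those parameters refer to; it is the textbook formula, not a verbatim copy of btgym.algorithms.rollout.

import numpy as np

def gae_advantages(rewards, values, bootstrap_value, gamma=0.99, gae_lambda=1.0):
    # delta_t = r_t + gamma * V(s_{t+1}) - V(s_t);  A_t = sum_l (gamma * lambda)^l * delta_{t+l}
    rewards = np.asarray(rewards, dtype=np.float64)
    values_ext = np.append(np.asarray(values, dtype=np.float64), bootstrap_value)
    deltas = rewards + gamma * values_ext[1:] - values_ext[:-1]
    advantages = np.zeros_like(deltas)
    running = 0.0
    for t in reversed(range(len(deltas))):
        running = deltas[t] + gamma * gae_lambda * running
        advantages[t] = running
    returns = advantages + values_ext[:-1]   # value-function targets
    return advantages, returns

# e.g. gae_advantages([0., 0., 1.], [0.5, 0.6, 0.7], bootstrap_value=0.0)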
[docs]    def process_summary(self, sess, data, model_data=None, step=None, episode=None):
        """
        Fetches and writes summary data from `data` and `model_data`.

        Args:
            sess:               tf session obj.
            data (dict):        thread_runner rollouts and metadata
            model_data (dict):  model summary data
            step:               int, global step or None
            episode:            int, global episode number or None
        """
        if step is None:
            step = sess.run(self.global_step)

        if episode is None:
            episode = sess.run(self.global_episode)

        # Every worker writes train episode summaries:
        ep_summary_feeder = {}

        # Look for train episode summaries from all env runners:
        for stat in data['ep_summary']:
            if stat is not None:
                for key in stat.keys():
                    if key in ep_summary_feeder.keys():
                        ep_summary_feeder[key] += [stat[key]]
                    else:
                        ep_summary_feeder[key] = [stat[key]]

        # Average values among thread_runners, if any, and write episode summary:
        if ep_summary_feeder != {}:
            ep_summary_feed_dict = {
                self.ep_summary[key]: np.average(values) for key, values in ep_summary_feeder.items()
            }

            if self.test_mode:
                # Atari:
                fetched_episode_stat = sess.run(self.ep_summary['atari_stat_op'], ep_summary_feed_dict)

            else:
                # BTGym:
                fetched_episode_stat = sess.run(self.ep_summary['btgym_stat_op'], ep_summary_feed_dict)

            self.summary_writer.add_summary(fetched_episode_stat, episode)
            self.summary_writer.flush()

        # Every worker writes test episode summaries:
        test_ep_summary_feeder = {}

        # Look for test episode summaries:
        for stat in data['test_ep_summary']:
            if stat is not None:
                for key in stat.keys():
                    if key in test_ep_summary_feeder.keys():
                        test_ep_summary_feeder[key] += [stat[key]]
                    else:
                        test_ep_summary_feeder[key] = [stat[key]]

        # Average values among thread_runners, if any, and write episode summary:
        if test_ep_summary_feeder != {}:
            test_ep_summary_feed_dict = {
                self.ep_summary[key]: np.average(values) for key, values in test_ep_summary_feeder.items()
            }
            fetched_test_episode_stat = sess.run(self.ep_summary['test_btgym_stat_op'], test_ep_summary_feed_dict)
            self.summary_writer.add_summary(fetched_test_episode_stat, episode)

        # Look for renderings (chief worker only, always 0-numbered environment in a list):
        if self.task == 0:
            if data['render_summary'][0] is not None:
                # self.log.warning('data[render_summary]: {}'.format(data['render_summary']))
                # self.log.warning('self.ep_summary: {}'.format(self.ep_summary))

                render_feed_dict = {
                    self.ep_summary[key]: pic for key, pic in data['render_summary'][0].items()
                }
                renderings = sess.run(self.ep_summary['render_op'], render_feed_dict)
                self.summary_writer.add_summary(renderings, episode)

        self.summary_writer.flush()

        # Write model summary, if any:
        if model_data is not None:
            self.summary_writer.add_summary(tf.Summary.FromString(model_data), step)
            self.summary_writer.flush()
[docs]    def process(self, sess, **kwargs):
        """
        Main train step method wrapper. Override if needed.

        Args:
            sess (tensorflow.Session):  tf session obj.
            kwargs:                     any
        """
        # return self._process(sess)
        self._process(sess)
    def _process(self, sess):
        """
        Grabs an on_policy_rollout [and off_policy rollout[s] from replay memory] that's been produced
        by the thread runner. If the data is identified as 'train data' - computes gradients, updates
        the parameters and writes summaries if any; the update is then sent to the parameter server.
        If the on_policy_rollout is identified as 'test data' - no policy update is performed
        (the learn rate is set to zero). Note that test data does not get stored in replay memory
        (thread runner area). Writes all available summaries.

        Args:
            sess (tensorflow.Session):  tf session obj.
        """
        # Quick wrap to get direct traceback from this trainer if something goes wrong:
        try:
            # Collect data from child thread runners:
            data = self.get_data()

            # Copy weights from local policy to local target policy:
            if self.use_target_policy and self.local_steps % self.pi_prime_update_period == 0:
                sess.run(self.sync_pi_prime)

            # Test or train: if at least one on-policy rollout from parallel runners is a test one -
            # set learn rate to zero for the entire minibatch:
            try:
                is_train = not np.asarray([env['state']['metadata']['type'] for env in data['on_policy']]).any()

            except KeyError:
                is_train = True

            self.log.debug(
                'Got rollout episode. type: {}, trial_type: {}, is_train: {}'.format(
                    np.asarray([env['state']['metadata']['type'] for env in data['on_policy']]).any(),
                    np.asarray([env['state']['metadata']['trial_type'] for env in data['on_policy']]).any(),
                    is_train
                )
            )

            if is_train:
                # If there are no test rollouts - do a train step:
                sess.run(self.sync_pi)  # only sync at train time

                feed_dict = self.process_data(sess, data, is_train, self.local_network, self.local_network_prime)

                # Say `No` to redundant summaries:
                write_model_summary = \
                    self.local_steps % self.model_summary_freq == 0

                # fetches = [self.train_op, self.local_network.debug]  # include policy debug shapes
                fetches = [self.train_op]

                if write_model_summary:
                    fetches_last = fetches + [self.model_summary_op, self.inc_step]
                else:
                    fetches_last = fetches + [self.inc_step]

                # Do a number of SGD train epochs;
                # when doing more than one epoch, we actually use only the last summary:
                for i in range(self.num_epochs - 1):
                    fetched = sess.run(fetches, feed_dict=feed_dict)

                fetched = sess.run(fetches_last, feed_dict=feed_dict)

                if write_model_summary:
                    model_summary = fetched[-2]
                else:
                    model_summary = None

                self.local_steps += 1  # only update on train steps

            else:
                model_summary = None

            # Write down summaries:
            self.process_summary(sess, data, model_summary)

            # Print debug info:
            # for k, v in fetched[1].items():
            #     print('{}: {}'.format(k, v))
            # print('\n')
            # for k, v in feed_dict.items():
            #     try:
            #         print(k, v.shape)
            #     except:
            #         print(k, type(v))

        except:
            msg = 'process() exception occurred' + \
                '\n\nPress `Ctrl-C` or jupyter:[Kernel]->[Interrupt] for clean exit.\n'
            self.log.exception(msg)
            raise RuntimeError(msg)
[docs]class Unreal(BaseAAC): """ Unreal: Asynchronous Advantage Actor Critic with auxiliary control tasks. Auxiliary tasks implementation borrows heavily from Kosuke Miyoshi code, under Apache License 2.0: https://miyosuda.github.io/ https://github.com/miyosuda/unreal Original A3C code comes from OpenAI repository under MIT licence: https://github.com/openai/universe-starter-agent Papers: https://arxiv.org/abs/1602.01783 https://arxiv.org/abs/1611.05397 """ def __init__(self, **kwargs): """ See BaseAAC class args for details: Args: env: environment instance or list of instances task: int, parent worker id policy_config: policy estimator class and configuration dictionary log_level: int, logbook.level on_policy_loss: callable returning tensor holding on_policy training loss graph and summaries off_policy_loss: callable returning tensor holding off_policy training loss graph and summaries vr_loss: callable returning tensor holding value replay loss graph and summaries rp_loss: callable returning tensor holding reward prediction loss graph and summaries pc_loss: callable returning tensor holding pixel_control loss graph and summaries random_seed: int or None model_gamma: scalar, gamma discount factor model_gae_lambda: scalar, GAE lambda model_beta: entropy regularization beta, scalar or [high_bound, low_bound] for log_uniform. opt_max_env_steps: int, total number of environment steps to run training on. opt_decay_steps: int, learn ratio decay steps, in number of environment steps. opt_end_learn_rate: scalar, final learn rate opt_learn_rate: start learn rate, scalar or [high_bound, low_bound] for log_uniform distr. opt_decay: scalar, optimizer decay, if apll. opt_momentum: scalar, optimizer momentum, if apll. opt_epsilon: scalar, optimizer epsilon rollout_length: int, on-policy rollout length time_flat: bool, flatten rnn time-steps in rollouts while training - see `Notes` below episode_train_test_cycle: tuple or list as (train_number, test_number), def=(1,0): enables infinite loop such as: run `train_number` of train data episodes, than `test_number` of test data episodes, repeat. Should be consistent with provided dataset parameters (test data should exist if `test_number > 0`) episode_summary_freq: int, write episode summary for every i'th episode env_render_freq: int, write environment rendering summary for every i'th train step model_summary_freq: int, write model summary for every i'th train step test_mode: bool, True: Atari, False: BTGym replay_memory_size: int, in number of experiences replay_batch_size: int, mini-batch size for off-policy training, def = 1 replay_rollout_length: int off-policy rollout length by def. equals on_policy_rollout_length use_off_policy_aac: bool, use full AAC off-policy loss instead of Value-replay use_reward_prediction: bool, use aux. off-policy reward prediction task use_pixel_control: bool, use aux. off-policy pixel control task use_value_replay: bool, use aux. off-policy value replay task (not used if use_off_policy_aac=True) rp_lambda: reward prediction loss weight, scalar or [high, low] for log_uniform distr. pc_lambda: pixel control loss weight, scalar or [high, low] for log_uniform distr. vr_lambda: value replay loss weight, scalar or [high, low] for log_uniform distr. off_aac_lambda: off-policy AAC loss weight, scalar or [high, low] for log_uniform distr. 
gamma_pc: NOT USED rp_reward_threshold: scalar, reward prediction classification threshold, above which reward is 'non-zero' rp_sequence_size: int, reward prediction sample size, in number of experiences clip_epsilon: scalar, PPO: surrogate L^clip epsilon num_epochs: int, num. of SGD runs for every train step, val. > 1 should be used with caution. pi_prime_update_period: int, PPO: pi to pi_old update period in number of train steps, def: 1 _use_target_policy: bool, PPO: use target policy (aka pi_old), delayed by `pi_prime_update_period` delay Note: - On `time_flat` arg: There are two alternatives to run RNN part of policy estimator: a. Feed initial RNN state for every experience frame in rollout (those are stored anyway if we want random memory repaly sampling) and do single time-step RNN advance for all experiences in a batch; this is when time_flat=True; b. Reshape incoming batch after convolution part of network in time-wise fashion for every rollout in a batch i.e. batch_size=number_of_rollouts and rnn_timesteps=max_rollout_length. In this case we need to feed initial rnn_states for rollouts only. There is some little extra work to pad rollouts to max_time_size and feed true rollout lengths to rnn. Thus, when time_flat=False, we unroll RNN in specified number of time-steps for every rollout. Both options has pros and cons: Unrolling dynamic RNN is computationally more expensive but gives clearly faster convergence, [possibly] due to the fact that RNN states for 2nd, 3rd, ... frames of rollouts are computed using updated policy estimator, which is supposed to be closer to optimal one. When time_flattened, every time-step uses RNN states computed when rollout was collected (i.e. by behavioral policy estimator with older parameters). Nevertheless, time_flatting can be interesting because one can safely shuffle training batch or mix on-policy and off-policy data in single mini-batch, ensuring iid property and allowing, say, proper batch normalisation (this has yet to be tested). """ try: super(Unreal, self).__init__(name='UNREAL', **kwargs) except: msg = 'Child class Unreal __init()__ exception occurred' + \ '\n\nPress `Ctrl-C` or jupyter:[Kernel]->[Interrupt] for clean exit.\n' self.log.exception(msg) raise RuntimeError(msg)
[docs]class A3C(BaseAAC):
    """
    Vanilla Asynchronous Advantage Actor Critic algorithm.

    Based on original code taken from OpenAI repository under MIT licence:
    https://github.com/openai/universe-starter-agent

    Paper: https://arxiv.org/abs/1602.01783
    """

    def __init__(self, **kwargs):
        """
        A3C args are a subset of BaseAAC arguments, see `BaseAAC` class for descriptions.

        Args:
            env:
            task:
            policy_config:
            log_level:
            random_seed:
            model_gamma:
            model_gae_lambda:
            model_beta:
            opt_max_env_steps:
            opt_decay_steps:
            opt_end_learn_rate:
            opt_learn_rate:
            opt_decay:
            opt_momentum:
            opt_epsilon:
            rollout_length:
            episode_summary_freq:
            env_render_freq:
            model_summary_freq:
            test_mode:
        """
        super(A3C, self).__init__(
            on_policy_loss=aac_loss_def,
            use_off_policy_aac=False,
            use_reward_prediction=False,
            use_pixel_control=False,
            use_value_replay=False,
            _use_target_policy=False,
            name='A3C',
            **kwargs
        )
[docs]class PPO(BaseAAC):
    """
    AAC with Proximal Policy Optimization surrogate L^Clip loss,
    optionally augmented with auxiliary control tasks.

    Paper: https://arxiv.org/pdf/1707.06347.pdf

    Based on PPO-SGD code from OpenAI `Baselines` repository under MIT licence:
    https://github.com/openai/baselines

    Async. framework code comes from OpenAI repository under MIT licence:
    https://github.com/openai/universe-starter-agent
    """

    def __init__(self, **kwargs):
        """
        PPO args are a subset of BaseAAC arguments, see `BaseAAC` class for descriptions.

        Args:
            env:
            task:
            policy_config:
            log_level:
            vr_loss:
            rp_loss:
            pc_loss:
            random_seed:
            model_gamma:
            model_gae_lambda:
            model_beta:
            opt_max_env_steps:
            opt_decay_steps:
            opt_end_learn_rate:
            opt_learn_rate:
            opt_decay:
            opt_momentum:
            opt_epsilon:
            rollout_length:
            episode_summary_freq:
            env_render_freq:
            model_summary_freq:
            test_mode:
            replay_memory_size:
            replay_rollout_length:
            use_off_policy_aac:
            use_reward_prediction:
            use_pixel_control:
            use_value_replay:
            rp_lambda:
            pc_lambda:
            vr_lambda:
            off_aac_lambda:
            rp_reward_threshold:
            rp_sequence_size:
            clip_epsilon:
            num_epochs:
            pi_prime_update_period:
        """
        super(PPO, self).__init__(
            on_policy_loss=ppo_loss_def,
            off_policy_loss=ppo_loss_def,
            _use_target_policy=True,
            name='PPO',
            **kwargs
        )
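
These trainer classes are usually not instantiated by hand but handed to btgym's distributed launcher through the same class_ref/kwargs dictionary convention BaseAAC itself uses for policy_config and runner_config. The sketch below is a hedged illustration of that wiring: the Launcher import path and MyPolicy placeholder are assumptions, not part of this module; only Unreal, A3C and PPO come from the code above.

from btgym.algorithms import Launcher   # assumed entry point - verify against your btgym version

class MyPolicy:                          # hypothetical placeholder for a real policy estimator class
    pass

# Same class_ref/kwargs convention BaseAAC uses for policy_config and runner_config:
policy_config = dict(
    class_ref=MyPolicy,
    kwargs=dict()
)
trainer_config = dict(
    class_ref=Unreal,                    # or A3C / PPO defined above
    kwargs=dict(
        opt_learn_rate=1e-4,
        rollout_length=20,
        use_value_replay=True,
        model_summary_freq=100,
    )
)
# A launcher/worker is then expected to combine env, cluster, policy and trainer configs,
# instantiate the trainer per worker and drive trainer.start() / trainer.process() in its training loop.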