Source code for btgym.algorithms.runner.synchro

import numpy as np
from logbook import Logger, StreamHandler, WARNING
import sys
import time

from btgym.algorithms.rollout import Rollout
from btgym.algorithms.memory import _DummyMemory
from btgym.algorithms.math_utils import softmax
from btgym.algorithms.utils import is_subdict


class BaseSynchroRunner():
    """
    Experience provider class. Interacts with the environment and outputs data
    in the form of rollouts augmented with relevant summaries and metadata.
    This runner is `synchronous` in the sense that data collection is done in-process
    and every rollout is collected by an explicit call to the respective `get_data()` method
    [this is unlike the `async` thread-runner version found earlier in this package which,
    once started, runs on its own and cannot be moderated].
    Makes precise control of the policy being executed possible.
    Does not support 'atari' mode.
    """

    def __init__(
            self,
            env,
            task,
            rollout_length,
            episode_summary_freq,
            env_render_freq,
            ep_summary,
            test=False,
            policy=None,
            data_sample_config=None,
            memory_config=None,
            test_conditions=None,
            test_deterministic=True,
            slowdown_steps=0,
            global_step_op=None,
            aux_render_modes=None,
            _implemented_aux_render_modes=None,
            name='synchro',
            log_level=WARNING,
            **kwargs
    ):
        """
        Args:
            env:                            BTgym environment instance
            task:                           int, runner task id
            rollout_length:                 int
            episode_summary_freq:           int
            env_render_freq:                int
            test:                           legacy, not used
            ep_summary:                     legacy, not used
            policy:                         policy instance to execute
            data_sample_config:             dict, data sampling configuration dictionary
            memory_config:                  dict, replay memory configuration
            test_conditions:                dict or None, dictionary of single experience conditions
                                            to check in order to mark it as a test one
            test_deterministic:             bool, if True - act deterministically for test episodes
            slowdown_steps:                 int, number of global iterations to wait per collected step
                                            (chief runner only, see `sleep()`)
            global_step_op:                 global step tensor, used together with `slowdown_steps`
            aux_render_modes:               iterable of str, additional summaries to compute
            _implemented_aux_render_modes:  iterable of str, implemented additional summaries
            name:                           str, name scope
            log_level:                      int, logbook.level
        """
        self.env = env
        self.task = task
        self.name = name
        self.rollout_length = rollout_length
        self.episode_summary_freq = episode_summary_freq
        self.env_render_freq = env_render_freq

        self.memory_config = memory_config
        self.policy = policy
        self.data_sample_config = data_sample_config

        self.log_level = log_level
        StreamHandler(sys.stdout).push_application()
        self.log = Logger('{}_Runner_{}'.format(self.name, self.task), level=self.log_level)

        # Aux rendering setup:
        if _implemented_aux_render_modes is None:
            self.implemented_aux_render_modes = []
        else:
            self.implemented_aux_render_modes = _implemented_aux_render_modes

        self.aux_render_modes = []
        if aux_render_modes is not None:
            for mode in aux_render_modes:
                if mode in self.implemented_aux_render_modes:
                    self.aux_render_modes.append(mode)
                else:
                    msg = 'Render mode `{}` is not implemented.'.format(mode)
                    self.log.error(msg)
                    raise NotImplementedError(msg)

        self.log.debug('self.render modes: {}'.format(self.aux_render_modes))

        self.sess = None
        self.summary_writer = None
        self.global_step_op = global_step_op

        if self.task == 0 and slowdown_steps > 0 and self.global_step_op is not None:
            self.log.notice('is slowed down by {} global_iterations/step'.format(slowdown_steps))
            self.slowdown_steps = slowdown_steps
        else:
            self.slowdown_steps = 0

        if test_conditions is None:
            # Default test conditions are: experience comes from a test episode, from the target domain:
            self.test_conditions = {
                'state': {
                    'metadata': {
                        'type': 1,
                        'trial_type': 1
                    }
                }
            }
        else:
            self.test_conditions = test_conditions

        # Actions for tests:
        self.test_deterministic = test_deterministic

        # Make replay memory:
        if self.memory_config is not None:
            self.memory = self.memory_config['class_ref'](**self.memory_config['kwargs'])
        else:
            self.memory = _DummyMemory()

        self.length = 0
        self.local_episode = 0
        self.reward_sum = 0

        self.terminal_end = True

        # Summary averages accumulators:
        self.total_r = []
        self.cpu_time = []
        self.final_value = []
        self.total_steps = []
        self.total_steps_atari = []

        self.info = [None]
        self.pre_experience = None
        self.state = None
        self.context = None
        self.last_action = None
        self.last_reward = None

        # Episode accumulators:
        self.ep_accum = None

        self.log.debug('__init__() done.')

    def sleep(self):
        if self.slowdown_steps > 0:
            start_global_step = self.sess.run(self.global_step_op)
            while start_global_step + self.slowdown_steps > self.sess.run(self.global_step_op):
                time.sleep(0.05)
    def start_runner(self, sess, summary_writer, **kwargs):
        """
        Legacy wrapper.
        """
        self.start(sess, summary_writer, **kwargs)
    def start(self, sess, summary_writer, init_context=None, data_sample_config=None):
        """
        Executes the initial sequence; fills initial replay memory, if any.
        """
        assert self.policy is not None, 'Initial policy not specified'
        self.sess = sess
        self.summary_writer = summary_writer

        # Note: env.renderer initialisation is not needed here, it went to VerboseSynchroRunner.start():
        # self.env.renderer.initialize_pyplot()

        self.pre_experience, self.state, self.context, self.last_action, self.last_reward = self.get_init_experience(
            policy=self.policy,
            init_context=init_context,
            data_sample_config=data_sample_config
        )

        if self.memory_config is not None:
            while not self.memory.is_full():
                # Collect some rollouts to fill memory:
                _ = self.get_data()
            self.log.notice('Memory filled')

        self.log.notice('started collecting data.')
    def get_init_experience(self, policy, policy_sync_op=None, init_context=None, data_sample_config=None):
        """
        Starts a new environment episode.

        Args:
            policy:                 policy to execute
            policy_sync_op:         operation copying local behavioural policy params from the global one
            init_context:           initial policy context for the new episode
            data_sample_config:     configuration dictionary of type `btgym.datafeed.base.EnvResetConfig`

        Returns:
            incomplete initial experience of the episode as a dictionary (misses bootstrapped R value),
            next state,
            next policy RNN context,
            action and reward
        """
        self.length = 0
        self.reward_sum = 0
        # Increment global and local episode counters:
        self.sess.run(self.policy.inc_episode)
        self.local_episode += 1

        # self.log.warning('get_init_exp() data_sample_config: {}'.format(data_sample_config))

        if data_sample_config is None:
            data_sample_config = policy.get_sample_config()

        # Pass sample config to environment (.get_sample_config() is actually an aac framework method):
        init_state = self.env.reset(**data_sample_config)

        # Infer train/test episode type from the initial state:
        self.is_test = is_subdict(self.test_conditions, {'state': init_state})

        # self.log.warning('init_experience.is_test: {}'.format(self.is_test))

        # Master worker always resets context at the episode beginning:
        # TODO: !
        # if not self.data_sample_config['mode']:
        init_context = None

        # self.log.warning('init_context_passed: {}'.format(init_context))
        # self.log.warning('state_metadata: {}'.format(state['metadata']))

        init_action = self.env.action_space.encode(self.env.get_initial_action())
        init_reward = np.asarray(0.0)

        # Update policy:
        if policy_sync_op is not None:
            self.sess.run(policy_sync_op)

        init_context = policy.get_initial_features(state=init_state, context=init_context)
        action, logits, value, next_context = policy.act(
            init_state,
            init_context,
            init_action[None, ...],
            init_reward[None, ...],
            self.is_test and self.test_deterministic,  # deterministic actions for test episode
        )

        next_state, reward, terminal, self.info = self.env.step(action['environment'])

        experience = {
            'position': {'episode': self.local_episode, 'step': self.length},
            'state': init_state,
            'action': action['one_hot'],
            'reward': reward,
            'value': value,
            'terminal': terminal,
            'context': init_context,
            'last_action': init_action,
            'last_reward': init_reward,
            'r': None,  # to be updated
            'info': self.info[-1],
        }
        # Execute user-defined callbacks to policy, if any:
        for key, callback in policy.callback.items():
            experience[key] = callback(**locals())

        # Reset per-episode counters and accumulators:
        self.ep_accum = {
            'logits': [logits],
            'value': [value],
            'context': [init_context]
        }
        self.terminal_end = terminal
        # self.log.warning('init_experience_context: {}'.format(context))

        # Take a nap:
        # self.sleep()

        return experience, next_state, next_context, action['encoded'], reward
    def get_experience(self, policy, state, context, action, reward, policy_sync_op=None):
        """
        Gets a single experience (possibly terminal).

        Returns:
            incomplete experience as a dictionary (misses bootstrapped R value),
            next state,
            next policy RNN context,
            action and reward
        """
        # Update policy if operation has been provided:
        if policy_sync_op is not None:
            self.sess.run(policy_sync_op)
            # self.log.debug('Policy sync. ok!')

        # Continue adding experiences to rollout:
        next_action, logits, value, next_context = policy.act(
            state,
            context,
            action[None, ...],
            reward[None, ...],
            self.is_test and self.test_deterministic,  # deterministic actions for test episode
        )

        self.ep_accum['logits'].append(logits)
        self.ep_accum['value'].append(value)
        self.ep_accum['context'].append(next_context)

        # self.log.notice('context: {}'.format(context))
        next_state, next_reward, terminal, self.info = self.env.step(next_action['environment'])

        # Partially compose experience:
        experience = {
            'position': {'episode': self.local_episode, 'step': self.length},
            'state': state,
            'action': next_action['one_hot'],
            'reward': next_reward,
            'value': value,
            'terminal': terminal,
            'context': context,
            'last_action': action,
            'last_reward': reward,
            'r': None,
            'info': self.info[-1],
        }
        for key, callback in policy.callback.items():
            experience[key] = callback(**locals())

        # Housekeeping:
        self.length += 1

        # Take a nap:
        # self.sleep()

        return experience, next_state, next_context, next_action['encoded'], next_reward
    def get_train_stat(self, is_test=False):
        """
        Updates and computes average statistics for train episodes.

        Args:
            is_test:    bool, current episode type

        Returns:
            dict of stats
        """
        ep_stat = {}
        if not is_test:
            self.total_r += [self.reward_sum]
            episode_stat = self.env.get_stat()  # get episode statistic
            last_i = self.info[-1]  # pull most recent info
            self.cpu_time += [episode_stat['runtime'].total_seconds()]
            self.final_value += [last_i['broker_value']]
            self.total_steps += [episode_stat['length']]

            if self.local_episode % self.episode_summary_freq == 0:
                ep_stat = dict(
                    total_r=np.average(self.total_r),
                    cpu_time=np.average(self.cpu_time),
                    final_value=np.average(self.final_value),
                    steps=np.average(self.total_steps)
                )
                self.total_r = []
                self.cpu_time = []
                self.final_value = []
                self.total_steps = []
                self.total_steps_atari = []

        return ep_stat
    def get_test_stat(self, is_test=False):
        """
        Updates and computes statistics for a single test episode.

        Args:
            is_test:    bool, current episode type

        Returns:
            dict of stats
        """
        ep_stat = {}
        if is_test:
            episode_stat = self.env.get_stat()  # get episode statistic
            last_i = self.info[-1]  # pull most recent info
            ep_stat = dict(
                total_r=self.reward_sum,
                final_value=last_i['broker_value'],
                steps=episode_stat['length']
            )
        return ep_stat
    def get_ep_render(self, is_test=False):
        """
        Collects environment renderings. Relies on environment renderer class methods,
        so it is only valid when environment rendering is enabled
        (typically it is true for the master runner).

        Returns:
            dictionary of images as rgb arrays
        """
        # Only render chief worker and test (slave) environment:
        # if self.task < 1 and (
        #     is_test or (
        #         self.local_episode % self.env_render_freq == 0 and not self.data_sample_config['mode']
        #     )
        # ):
        if self.task < 1 and self.local_episode % self.env_render_freq == 0:
            # Render environment (chief worker only):
            render_stat = {
                mode: self.env.render(mode)[None, :] for mode in self.env.render_modes
            }
        else:
            render_stat = None

        return render_stat
    def get_data(
            self,
            policy=None,
            policy_sync_op=None,
            init_context=None,
            data_sample_config=None,
            rollout_length=None,
            force_new_episode=False
    ):
        """
        Collects a single trajectory rollout and a set of summaries using the specified policy.
        Updates episode statistics and replay memory.

        Args:
            policy:                 policy to execute
            policy_sync_op:         operation copying local behavioural policy params from the global one
            init_context:           if specified, overrides the initial episode context provided by self.context
                                    (valid only if a new episode is started within this rollout)
            data_sample_config:     environment configuration parameters for the next episode to sample:
                                    configuration dictionary of type `btgym.datafeed.base.EnvResetConfig`
            rollout_length:         length of rollout to collect; if specified, overrides the self.rollout_length attr
            force_new_episode:      bool, if True - resets the environment

        Returns:
            data dictionary
        """
        if policy is None:
            policy = self.policy

        if init_context is None:
            init_context = self.context

        if rollout_length is None:
            rollout_length = self.rollout_length

        rollout = Rollout()
        train_ep_summary = None
        test_ep_summary = None
        render_ep_summary = None

        if self.terminal_end or force_new_episode:
            # Start new episode:
            self.pre_experience, self.state, self.context, self.last_action, self.last_reward = self.get_init_experience(
                policy=policy,
                policy_sync_op=policy_sync_op,
                init_context=init_context,
                data_sample_config=data_sample_config
            )
            # self.log.warning(
            #     'started new episode with:\ndata_sample_config: {}\nforce_new_episode: {}'.
            #     format(data_sample_config, force_new_episode)
            # )
            # self.log.warning('pre_experience_metadata: {}'.format(self.pre_experience['state']['metadata']))

        # NOTE: self.terminal_end is set to its actual value via the get_init_experience() method

        # Collect single rollout:
        while rollout.size < rollout_length - 1 and not self.terminal_end:
            if self.pre_experience['terminal']:
                # Episode has just finished: complete and push the last experience
                # and update all episode summaries:
                self.pre_experience['r'] = np.asarray([0.0])
                experience = None
                self.state = None
                self.context = None
                self.last_action = None
                self.last_reward = None
                self.terminal_end = True

                train_ep_summary = self.get_train_stat(self.is_test)
                test_ep_summary = self.get_test_stat(self.is_test)
                render_ep_summary = self.get_ep_render(self.is_test)

            else:
                experience, self.state, self.context, self.last_action, self.last_reward = self.get_experience(
                    policy=policy,
                    policy_sync_op=policy_sync_op,
                    state=self.state,
                    context=self.context,
                    action=self.last_action,
                    reward=self.last_reward
                )
                # Complete previous experience by bootstrapping V from the next one:
                self.pre_experience['r'] = experience['value']

            # Push:
            rollout.add(self.pre_experience)

            # self.is_test is updated by self.get_init_experience();
            # only training rollouts are added to replay memory:
            if not self.is_test:
                self.memory.add(self.pre_experience)

            self.reward_sum += self.pre_experience['reward']

            # Move one step forward:
            self.pre_experience = experience

        # Done collecting rollout, either got termination of episode or not:
        if not self.terminal_end:
            # Bootstrap:
            self.pre_experience['r'] = np.asarray(
                [
                    policy.get_value(
                        self.pre_experience['state'],
                        self.pre_experience['context'],
                        self.pre_experience['last_action'][None, ...],
                        self.pre_experience['last_reward'][None, ...],
                    )
                ]
            )
            rollout.add(self.pre_experience)
            if not self.is_test:
                self.memory.add(self.pre_experience)

        # self.log.warning('rollout.terminal: {}'.format(self.terminal_end))
        # self.log.warning('rollout.size: {}'.format(rollout.size))
        # self.log.warning('rollout.is_test: {}'.format(self.is_test))

        data = dict(
            on_policy=rollout,
            terminal=self.terminal_end,
            off_policy=self.memory.sample_uniform(sequence_size=rollout_length),
            off_policy_rp=self.memory.sample_priority(exact_size=True),
            ep_summary=train_ep_summary,
            test_ep_summary=test_ep_summary,
            render_summary=render_ep_summary,
            is_test=self.is_test,
        )

        return data
    def get_batch(
            self,
            size,
            policy=None,
            policy_sync_op=None,
            require_terminal=True,
            same_trial=True,
            init_context=None,
            data_sample_config=None
    ):
        """
        Returns a batch as a list of 'size' or more rollouts collected under the specified policy.
        Rollouts can be collected from several episodes consecutively; there may be more rollouts
        than the requested 'size' if it is necessary to collect at least one terminal rollout.

        Args:
            size:                   int, number of rollouts to collect
            policy:                 policy to use
            policy_sync_op:         operation copying local behavioural policy params from the global one
            require_terminal:       bool, if True - require at least one terminal rollout to be present
            same_trial:             bool, if True - all episodes are sampled from the same trial
            init_context:           if specified, overrides the initial episode context provided by self.context
            data_sample_config:     environment configuration parameters for all episodes in the batch:
                                    configuration dictionary of type `btgym.datafeed.base.EnvResetConfig`

        Returns:
            dict containing:
                'data' key holding a list of data dictionaries;
                'terminal_context' key holding a list of terminal output contexts.
                If require_terminal is True, this list is guaranteed to hold at least one element.
        """
        batch = []
        terminal_context = []

        if require_terminal:
            got_terminal = False
        else:
            got_terminal = True

        if same_trial:
            assert isinstance(data_sample_config, dict), \
                'get_batch(same_trial=True) expected `data_sample_config` dict., got: {}'.format(data_sample_config)

        # Collect first rollout:
        batch = [
            self.get_data(
                policy=policy,
                policy_sync_op=policy_sync_op,
                init_context=init_context,
                data_sample_config=data_sample_config,
                force_new_episode=True
            )
        ]
        if same_trial:
            # Sample new episodes from the same trial only:
            data_sample_config['trial_config']['get_new'] = False

        collected_size = 1

        if batch[0]['terminal']:
            terminal_context.append(self.context)
            got_terminal = True

        # Collect others:
        while not (collected_size >= size and got_terminal):
            rollout_data = self.get_data(
                policy=policy,
                policy_sync_op=policy_sync_op,
                init_context=init_context,
                data_sample_config=data_sample_config
            )
            batch.append(rollout_data)

            if rollout_data['terminal']:
                terminal_context.append(self.context)
                got_terminal = True

            collected_size += 1

        data = dict(
            data=batch,
            terminal_context=terminal_context,
        )

        return data
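
A minimal usage sketch of the runner interface above (not part of the module source). It assumes a configured BTgym environment, a policy instance exposing the act()/get_value()/get_sample_config() interface referenced above, and an active TensorFlow session; make_env(), make_policy(), sess and summary_writer are hypothetical placeholders provided by the surrounding training setup.

# Hypothetical usage sketch, assuming `make_env`, `make_policy`, `sess` and
# `summary_writer` come from the surrounding training code:
policy = make_policy()
runner = BaseSynchroRunner(
    env=make_env(),          # BTgym environment instance
    task=0,                  # chief worker id
    rollout_length=20,
    episode_summary_freq=10,
    env_render_freq=100,
    ep_summary=None,         # legacy, not used
    policy=policy,
    memory_config=None,      # falls back to _DummyMemory
)
runner.start(sess, summary_writer)  # runs the initial episode step, fills replay memory if configured
data = runner.get_data()            # one rollout plus summaries and replay samples
batch = runner.get_batch(           # several rollouts, at least one terminal
    size=4,
    data_sample_config=policy.get_sample_config()  # required when same_trial=True (the default)
)
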
class VerboseSynchroRunner(BaseSynchroRunner):
    """
    Extends the `BaseSynchroRunner` class with additional visualisation summaries
    at some expense of running speed.
    """

    def __init__(
            self,
            name='verbose_synchro',
            aux_render_modes=('action_prob', 'value_fn', 'lstm_1_h', 'lstm_2_h'),
            **kwargs
    ):
        super(VerboseSynchroRunner, self).__init__(
            name=name,
            aux_render_modes=aux_render_modes,
            _implemented_aux_render_modes=('action_prob', 'value_fn', 'lstm_1_h', 'lstm_2_h'),
            **kwargs
        )
        self.norm_image = lambda x: np.round((x - x.min()) / np.ptp(x) * 255)
    def get_ep_render(self, is_test=False):
        """
        Collects episode, environment and policy visualisations. Relies on environment renderer class methods,
        so it is only valid when environment rendering is enabled
        (typically it is true for the master runner).

        Returns:
            dictionary of images as rgb arrays
        """
        # Only render chief worker and test (slave) environment:
        # if self.task < 1 and (
        #     is_test or (
        #         self.local_episode % self.env_render_freq == 0 and not self.data_sample_config['mode']
        #     )
        # ):
        if self.task < 1 and self.local_episode % self.env_render_freq == 0:
            # Render environment (chief worker only):
            render_stat = {
                mode: self.env.render(mode)[None, :] for mode in self.env.render_modes
            }

            # Update renderings with aux:
            # ep_a_logits = self.ep_accum['logits']
            # ep_value = self.ep_accum['value']
            # self.log.notice('ep_logits shape: {}'.format(np.asarray(ep_a_logits).shape))
            # self.log.notice('ep_value shape: {}'.format(np.asarray(ep_value).shape))

            # Unpack LSTM states:
            rnn_1, rnn_2 = zip(*self.ep_accum['context'])
            rnn_1 = [state[0] for state in rnn_1]
            rnn_2 = [state[0] for state in rnn_2]
            c1, h1 = zip(*rnn_1)
            c2, h2 = zip(*rnn_2)

            # Render everything implemented:
            implemented_aux_images = {
                'action_prob': self.env.renderer.draw_plot(
                    # data=softmax(np.asarray(ep_a_logits)[:, 0, :] - np.asarray(ep_a_logits).max()),
                    data=softmax(np.asarray(self.ep_accum['logits'])),  # [:, 0, :]),
                    title='Episode actions probabilities',
                    figsize=(12, 4),
                    box_text='',
                    xlabel='Backward env. steps',
                    ylabel='R+',
                    line_labels=['Hold', 'Buy', 'Sell', 'Close']
                )[None, ...],
                'value_fn': self.env.renderer.draw_plot(
                    data=np.asarray(self.ep_accum['value']),
                    title='Episode Value function',
                    figsize=(12, 4),
                    xlabel='Backward env. steps',
                    ylabel='R',
                    line_labels=['Value']
                )[None, ...],
                # 'lstm_1_c': self.norm_image(np.asarray(c1).T[None, :, 0, :, None]),
                'lstm_1_h': self.norm_image(np.asarray(h1).T[None, :, 0, :, None]),
                # 'lstm_2_c': self.norm_image(np.asarray(c2).T[None, :, 0, :, None]),
                'lstm_2_h': self.norm_image(np.asarray(h2).T[None, :, 0, :, None])
            }

            # Pick what has been set:
            aux_images = {summary: implemented_aux_images[summary] for summary in self.aux_render_modes}
            render_stat.update(aux_images)

        else:
            render_stat = None

        return render_stat
    def start(self, sess, summary_writer, init_context=None, data_sample_config=None):
        """
        Executes the initial sequence; fills initial replay memory, if any.
        Extra: initialises the environment renderer to get aux. images.
        """
        assert self.policy is not None, 'Initial policy not specified'
        self.sess = sess
        self.summary_writer = summary_writer

        # Hacky, but we need env.renderer methods ready:
        self.env.renderer.initialize_pyplot()

        self.pre_experience, self.state, self.context, self.last_action, self.last_reward = self.get_init_experience(
            policy=self.policy,
            init_context=init_context,
            data_sample_config=data_sample_config
        )

        if self.memory_config is not None:
            while not self.memory.is_full():
                # Collect some rollouts to fill memory:
                _ = self.get_data()
            self.log.notice('Memory filled')

        self.log.notice('started collecting data.')
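
For reference, a small self-contained sketch of how the default test_conditions dictionary above gets matched against an incoming initial state via is_subdict() in get_init_experience(). The state layout below is a simplified assumption, only the nested 'metadata' keys mirror the defaults, and the printed result assumes is_subdict() implements the "first dict is contained in the second" check implied by its name and its usage in this module.

import numpy as np
from btgym.algorithms.utils import is_subdict

test_conditions = {'state': {'metadata': {'type': 1, 'trial_type': 1}}}

# Simplified stand-in for an initial observation; a real BTgym state carries
# more keys, but only the nested 'metadata' entries matter for the check:
init_state = {
    'raw': np.zeros((10, 4)),
    'metadata': {'type': 1, 'trial_type': 1, 'sample_num': 0},
}

is_test = is_subdict(test_conditions, {'state': init_state})
print(is_test)  # expected: True -> the episode would be treated as a test one
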