Source code for btgym.spaces

###############################################################################
#
# Copyright (C) 2017 Andrew Muzikin, muzikinae@gmail.com
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################

from gym import Space
from gym import spaces

from collections import OrderedDict
from itertools import product
from math import log2, ceil

from numpy import asarray, squeeze, zeros


class DictSpace(spaces.Dict):
    """
    Thin wrapper over gym.spaces.Dict which additionally exposes a `.shape` attribute.

    The space itself is a [nested] dictionary of simpler gym spaces; `.shape` is an
    ordered mapping from every key of `self.spaces` to the `.shape` of the sub-space
    stored under that key.
    """

    def __init__(self, spaces_dict):
        """
        Args:
            spaces_dict:    [nested] dictionary of core Gym spaces.
        """
        super().__init__(spaces_dict)
        # Sub-spaces are only available after the parent constructor has run,
        # so the shape is derived afterwards.
        self.shape = self._get_shape()

    def _get_shape(self):
        """
        Collects shapes of all sub-spaces.

        Returns:
            OrderedDict of form {key: sub_space.shape, ...}, following the order of `self.spaces`.
        """
        shapes = OrderedDict()
        for name, subspace in self.spaces.items():
            shapes[name] = subspace.shape
        return shapes
class ActionDictSpace(DictSpace):
    """
    Extension of OpenAI Gym DictSpace providing additional domain-specific functionality.
    Action space for btgym environments as shallow dictionary of discrete or continuous spaces.
    Defines several handy attributes and encoding conversion methods.

    Attributes set by the constructor:
        assets:                     sorted tuple of asset names (keys of the space);
        base_actions:               tuple of base discrete actions, or None for continuous space;
        base_actions_lookup_table:  dict {index: base_action}, or None for continuous space;
        base_space:                 gym space class used per asset (spaces.Discrete or spaces.Box);
        is_discrete:                bool, True if base actions were provided;
        tensor_shape:               (num_assets, num_base_actions) or (num_assets, 1) for continuous space;
        lookup_table:               dict {category: tuple of base action indices}, or None;
        cardinality:                number of entries in lookup table, or None for continuous space;
        encoded_depth:              length of the code returned by `encode()`;
        one_hot_depth:              length of the code returned by `one_hot_encode()`.
    """

    def __init__(self, assets, base_actions=None):
        """
        Args:
            assets:         iterable of assets names
            base_actions:   None or iterable of base asset discrete actions; if no actions provided -
                            continuous 1D base action space is set in [0,1] interval.
        """
        assert not isinstance(assets, str),\
            'ActionDictSpace: expected `assets` be iterable, got <{}> type <{}>'.format(assets, type(assets))

        self.assets = tuple(sorted(assets))

        if base_actions is not None:
            # Discrete base actions provided, will use binary encoding for encode/decode methods
            self.base_actions = tuple(base_actions)
            self.base_actions_lookup_table = dict(enumerate(self.base_actions))
            self.base_space = spaces.Discrete
            self.is_discrete = True
            self.tensor_shape = (len(self.assets), len(self.base_actions))
            self.lookup_table = self._make_lookup_table(
                base_actions=list(self.base_actions_lookup_table.keys()),
                num_assets=len(self.assets)
            )
            # Infer binary code length (depth):
            self.cardinality = len(self.lookup_table)
            # `_action_to_binary` always emits at least one bit (category 0 is formatted as '0'),
            # so depth is kept >= 1 to stay consistent with the actual code length
            # when the table holds a single entry (log2(1) == 0).
            self.encoded_depth = max(1, ceil(log2(self.cardinality)))
            self.one_hot_depth = self.cardinality
            spaces_dict = {key: spaces.Discrete(self.tensor_shape[-1]) for key in self.assets}
            self.encode_method = self._action_to_binary
            self.decode_method = self._binary_to_action
            self.one_hot_encode_method = self._to_one_hot
            self.one_hot_decode_method = None

        else:
            # Using continuous base actions,
            # encoding will be simply making 1D array out of shallow dictionary and back:
            self.base_actions = None
            self.base_actions_lookup_table = None
            self.base_space = spaces.Box
            self.is_discrete = False
            self.tensor_shape = (len(self.assets), 1)
            self.lookup_table = None
            self.cardinality = None  # ~inf.
            self.encoded_depth = self.tensor_shape[0]
            self.one_hot_depth = self.tensor_shape[0]
            spaces_dict = {
                key: spaces.Box(low=0, high=1, shape=(self.tensor_shape[-1],), dtype='float32')
                for key in self.assets
            }
            # For continuous space encoding is simple:
            self.encode_method = self._action_to_vec
            self.decode_method = self._vec_to_action
            self.one_hot_encode_method = self._action_to_vec
            self.one_hot_decode_method = None

        super(ActionDictSpace, self).__init__(spaces_dict)

    def get_initial_action(self):
        """
        Not implemented in this class; always raises NotImplementedError.

        Intended return value:
            'do nothing' action as OrderedDict (for discrete spaces)
            'put all in cash' action as OrderedDict (for continuous actions)

        Raises:
            NotImplementedError
        """
        raise NotImplementedError

    def encode(self, action):
        """
        Given action returns its encoding. Encoding method depends on type of base actions:
            - if base actions defined are discrete (gym.spaces.Discrete), binary encoding is used;
            - if base actions defined are continuous (gym.spaces.Box), encoding is translating
              shallow dictionary to vector of same values and back

        Args:
            action:     action from this space (shallow dictionary)

        Returns:
            1D array of floats in [0, 1]
        """
        return self.encode_method(action)

    def decode(self, code):
        """
        Given code returns action. Decoding method depends on type of base actions:
            - if base actions defined are discrete (gym.spaces.Discrete), binary encoding is used;
            - if base actions defined are continuous (gym.spaces.Box), encoding is translating
              shallow dictionary to vector of same values and back

        Args:
            code:       1D array of floats in [0, 1]

        Returns:
            action from this space (shallow dictionary)
        """
        return self.decode_method(code)

    def one_hot_encode(self, action):
        """
        Given action returns its encoding. Encoding method depends on type of base actions:
            - if base actions defined are discrete (gym.spaces.Discrete), one_hot encoding is used;
            - if base actions defined are continuous (gym.spaces.Box), encoding is translating
              shallow dictionary to vector of same values and back

        Args:
            action:     action from this space (shallow dictionary)

        Returns:
            1D array of floats in [0, 1]
        """
        return self.one_hot_encode_method(action)

    def one_hot_decode(self, code):
        """
        Not implemented; always raises NotImplementedError.
        """
        raise NotImplementedError

    def _to_one_hot(self, action):
        """
        Given action from discrete space returns its one-hot encoding.

        Args:
            action:     action from this space (shallow dictionary)

        Returns:
            array of zeros of length `one_hot_depth` with 1 at action category position,
            passed through numpy.squeeze
        """
        cat = self._vec_to_cat(self._action_to_vec(action))
        one_hot = zeros(self.one_hot_depth)
        one_hot[cat] = 1
        return squeeze(one_hot)

    def _vec_to_one_hot(self, vec):
        """
        Given action vector returns its one-hot encoding.

        Args:
            vec:    action vector

        Returns:
            `vec` itself if space is continuous (cardinality is None),
            else one-hot array of length `one_hot_depth`, passed through numpy.squeeze
        """
        if self.cardinality is None:
            return vec

        one_hot = zeros(self.one_hot_depth)
        one_hot[self._vec_to_cat(vec)] = 1
        return squeeze(one_hot)

    @staticmethod
    def _make_lookup_table(base_actions, num_assets):
        """
        Creates lookup table for set of environment actions for K assets and N base actions
        as a cartesian product of K sets of N elements each.

        Args:
            base_actions:   iterable of base asset actions
            num_assets:     int, number of assets

        Returns:
            lookup table as dictionary form {num_0: env_action_0, ...}
        """
        return dict(enumerate(product(list(base_actions), repeat=num_assets)))

    def _action_to_binary(self, action):
        """
        Given action returns its binary encoding.

        Args:
            action:     action from this space (shallow dictionary)

        Returns:
            1D numpy array of floats in [0, 1]
        """
        cat = self._vec_to_cat(self._action_to_vec(action))
        # Left-pad with zeros so that every code has the same length:
        bit_string = format(cat, 'b').zfill(self.encoded_depth)
        return asarray(list(bit_string), dtype='float')

    def _binary_to_action(self, binary_code):
        """
        Given binary action encoding, returns action.

        Args:
            binary_code:    1D array (or any array-like) of ints or floats in [0, 1]

        Returns:
            action from action space

        Raises:
            AssertionError: if code has more than one dimension
            ValueError:     if code is empty or decoded category is not in lookup table
        """
        # Coerce first, so plain lists and tuples are accepted as well as numpy arrays:
        bits = asarray(binary_code)
        assert len(bits.shape) <= 1, \
            'Only 1D code vectors are supported, got array of shape: {}'.format(bits.shape)
        # reshape(-1) makes a 0-d scalar code iterable as a single bit:
        bit_string = ''.join(str(bit) for bit in bits.astype(int).reshape(-1))
        cat = int(bit_string, 2)
        return self._vec_to_action(self._cat_to_vec(cat))

    def _action_to_vec(self, action):
        """
        Given action returns its vector encoding.

        Args:
            action:     action from this space (shallow dictionary)

        Returns:
            numpy array with one entry per asset, ordered as `self.assets`
        """
        assert self.contains(action), 'Action {} does not belongs to this space'.format(action)
        vector = asarray([action[key] for key in self.assets])
        if self.is_discrete:
            return vector
        # Continuous base actions are stored as arrays of shape (1,), drop that last dimension:
        return vector[..., 0]

    def _vec_to_action(self, vector):
        """
        Given vector encoding of an action returns action from this space.

        Args:
            vector:     iterable of scalars

        Returns:
            action as shallow dictionary of scalars
        """
        assert len(vector) == len(self.assets), \
            'Length of encoding and number of assets should match, got: {} / {}'.format(len(vector), len(self.assets))
        if self.cardinality is None:
            # Continuous space: every base action is wrapped into array of shape (1,)
            action = OrderedDict((asset, asarray([value])) for asset, value in zip(self.assets, vector))
        else:
            action = OrderedDict(zip(self.assets, vector))
        assert self.contains(action), 'Vector {} can not be converted to action of this space'.format(vector)
        return action

    def _vec_to_cat(self, action):
        """
        Given action vector returns its position (categorical encoding).
        Valid for dictionary of discrete base spaces only.

        Args:
            action:     environment action as tuple, list or array of base asset actions

        Returns:
            int, position in lookup table

        Raises:
            ValueError, if no matches found
        """
        assert self.lookup_table is not None, 'Lookup table not defined for base {}'.format(self.base_space)
        # Convert once instead of on every iteration of the table scan:
        target = list(action)
        for key, value in self.lookup_table.items():
            if list(value) == target:
                return key
        raise ValueError('Action vector {} is not in lookup table of this space.'.format(action))

    def _cat_to_vec(self, category):
        """
        Given integer as categorical encoding returns corresponding env. action vector.
        Valid for dictionary of discrete base spaces only.

        Args:
            category:   int, encoding

        Returns:
            environment action as numpy array of base asset actions

        Raises:
            ValueError, if no matches found
        """
        assert self.lookup_table is not None, 'Lookup table not defined for base {}'.format(self.base_space)
        try:
            return asarray(self.lookup_table[category])
        except KeyError:
            raise ValueError('Category {} does not match action space.'.format(category))
class __DictSpace(Space):
    """
    DEPRECATED
    Defines space as nested dictionary of simpler gym spaces.
    """

    def __init__(self, spaces_dict):
        """
        Args:
            spaces_dict:    [nested] dictionary of core Gym spaces.

        Raises:
            AssertionError: if any leaf of `spaces_dict` is not an instance of a gym space class.
        """
        # Validate every leaf before storing anything:
        self._nested_map(self._make_assert_gym_space(), spaces_dict)
        self.spaces = spaces_dict
        self.shape = self._nested_shape()

    @staticmethod
    def _gym_spaces():
        """
        Returns:
            tuple of classes exported by `gym.spaces` under capitalized names.
        """
        candidates = (getattr(spaces, name) for name in dir(spaces) if name[0].isupper())
        # Only real classes are valid as second argument of isinstance():
        return tuple(obj for obj in candidates if isinstance(obj, type))

    @staticmethod
    def _contains(space, sample):
        """Leaf function: checks that `sample` belongs to `space`."""
        return space.contains(sample)

    @staticmethod
    def _shape(space, *args):
        """Leaf function: returns shape of `space`; extra args are ignored."""
        return space.shape

    @staticmethod
    def _sample(space, *args):
        """Leaf function: returns sample from `space`; extra args are ignored."""
        return space.sample()

    def _make_assert_gym_space(self):
        """
        Returns:
            callable checking that its first argument is an instance of a gym space class
            and raising AssertionError otherwise.
        """
        gym_spaces = self._gym_spaces()

        def assert_gym_space(space, *args):
            # Explicit check instead of `assert`, so validation is not stripped under `python -O`:
            if not isinstance(space, gym_spaces):
                raise AssertionError('Space {} is not valid Gym space'.format(type(space)))

        return assert_gym_space

    @staticmethod
    def _all_leaves(struct):
        """
        Reduces [nested] dictionary of truth values to a single boolean.

        Args:
            struct:     [nested] dictionary of booleans, or a single boolean

        Returns:
            True if every leaf is truthy, False otherwise
        """
        if isinstance(struct, dict):
            return all(__DictSpace._all_leaves(value) for value in struct.values())
        return bool(struct)

    def _nested_contains(self, x):
        """
        Checks that `x` has the same structure as this space and that every leaf of `x`
        belongs to the corresponding leaf space.

        Args:
            x:  [nested] dictionary of samples

        Returns:
            bool; False is also returned if the check itself fails with any exception.
        """
        try:
            self._assert_structure(self.spaces, x)
            # `_nested_map` yields a dictionary of per-leaf results; a non-empty dictionary
            # is always truthy, so it must be reduced to a single boolean explicitly.
            return self._all_leaves(self._nested_map(self._contains, self.spaces, x))

        except Exception:
            # Best-effort membership test: anything that is malformed is simply not a member.
            return False

    def _nested_shape(self):
        """
        Returns:
            [nested] dictionary of shapes, same structure as `self.spaces`.
        """
        return self._nested_map(self._shape, self.spaces)

    def _nested_sample(self):
        """
        Returns:
            [nested] dictionary of samples, same structure as `self.spaces`.
        """
        return self._nested_map(self._sample, self.spaces)

    def _assert_structure(self, s1, s2):
        """
        Recursively checks that two arguments are dictionaries with identical sets of keys
        at every level of nesting. Non-dictionary pairs are treated as matching leaves.

        Args:
            s1:     [nested] dictionary or leaf
            s2:     [nested] dictionary or leaf

        Raises:
            AssertionError: if structures do not match.
        """
        if not (isinstance(s1, dict) or isinstance(s2, dict)):
            # Both are leaves, nothing to compare.
            return

        if not (isinstance(s1, dict) and isinstance(s2, dict)):
            raise AssertionError(
                'Args are not of the same structure. Got arg1: {}, arg2: {}'.format(type(s1), type(s2))
            )

        keys1 = set(s1)
        keys2 = set(s2)
        for key in keys1 | keys2:
            if key not in keys1:
                raise AssertionError('Key <{}> not present in arg1'.format(key))

            if key not in keys2:
                raise AssertionError('Key <{}> not present in arg2'.format(key))

            self._assert_structure(s1[key], s2[key])

    def _nested_map(self, func, struct, *arg):
        """
        Applies `func` to every leaf of [nested] dictionary `struct`.

        Args:
            func:       callable taking two positional arguments
            struct:     [nested] dictionary or leaf
            *arg:       optional second structure, indexed by the same keys as `struct`;
                        if omitted, `struct` itself is passed as second argument of `func`.

        Returns:
            [nested] dictionary of `func` results with the same keys as `struct`,
            or a single `func` result if `struct` is a leaf.

        Raises:
            TypeError: if `func` is not callable.
        """
        if not callable(func):
            raise TypeError('`func` arg. must be callable.')

        struct2 = arg[0] if arg else struct

        if isinstance(struct, dict):
            return {key: self._nested_map(func, struct[key], struct2[key]) for key in struct}

        return func(struct, struct2)

    def sample(self):
        """
        Uniformly randomly sample a random element of this space.

        Returns:
            dictionary of samples
        """
        return self._nested_sample()

    def contains(self, x):
        """
        Return boolean specifying if x is a valid member of this space
        """
        return self._nested_contains(x)

    def to_jsonable(self, sample_n):
        """
        Convert a batch of samples from this space to a JSONable data type.
        Not implemented for this space; always raises NotImplementedError.
        """
        raise NotImplementedError

    def from_jsonable(self, sample_n):
        """
        Convert a JSONable data type to a batch of samples from this space.
        Not implemented for this space; always raises NotImplementedError.
        """
        raise NotImplementedError