Source code for btgym.research.casual_conv.strategy

import numpy as np

from gym import spaces
from btgym import DictSpace
import backtrader.indicators as btind
from backtrader import Indicator

from btgym.strategy.utils import tanh, exp_scale

from btgym.research.gps.strategy import GuidedStrategy_0_0
from btgym.research.strategy_gen_4 import DevStrat_4_12


[docs]class CasualConvStrategy(DevStrat_4_12): # class CasualConvStrategy(GuidedStrategy_0_0): """ Provides stream of data for casual convolutional encoder """ # Time embedding period: time_dim = 128 # NOTE: changed this --> change Policy UNREAL for aux. pix control task upsampling params # Hyperparameters for estimating signal features: # features_parameters = [8, 32, 64] features_parameters = [8, 32, 128, 512] num_features = len(features_parameters) # Number of environment steps to skip before returning next response, # e.g. if set to 10 -- agent will interact with environment every 10th step; # every other step agent action is assumed to be 'hold': skip_frame = 10 # Number of timesteps reward estimation statistics are averaged over, should be: # skip_frame_period <= avg_period <= time_embedding_period: avg_period = 20 # Possible agent actions: portfolio_actions = ('hold', 'buy', 'sell', 'close') gamma = 0.99 # fi_gamma, should be MDP gamma decay reward_scale = 1 # reward multiplicator state_ext_scale = np.linspace(4e3, 1e3, num=num_features) params = dict( # Note: fake `Width` dimension to use 2d conv etc.: state_shape= { 'external': spaces.Box(low=-100, high=100, shape=(time_dim, 1, num_features), dtype=np.float32), 'internal': spaces.Box(low=-2, high=2, shape=(avg_period, 1, 5), dtype=np.float32), 'datetime': spaces.Box(low=0, high=1, shape=(1, 5), dtype=np.float32), # 'expert': spaces.Box(low=0, high=10, shape=(len(portfolio_actions),), dtype=np.float32), # TODO: change inheritance! 'metadata': DictSpace( { 'type': spaces.Box( shape=(), low=0, high=1, dtype=np.uint32 ), 'trial_num': spaces.Box( shape=(), low=0, high=10 ** 10, dtype=np.uint32 ), 'trial_type': spaces.Box( shape=(), low=0, high=1, dtype=np.uint32 ), 'sample_num': spaces.Box( shape=(), low=0, high=10 ** 10, dtype=np.uint32 ), 'first_row': spaces.Box( shape=(), low=0, high=10 ** 10, dtype=np.uint32 ), 'timestamp': spaces.Box( shape=(), low=0, high=np.finfo(np.float64).max, dtype=np.float64 ), } ) }, cash_name='default_cash', asset_names=['default_asset'], start_cash=None, commission=None, leverage=1.0, drawdown_call=5, target_call=19, portfolio_actions=portfolio_actions, skip_frame=skip_frame, state_ext_scale=state_ext_scale, # EURUSD state_int_scale=1.0, gamma=gamma, reward_scale=1.0, metadata={}, ) def set_datalines(self): self.data.features = [ btind.SimpleMovingAverage(self.datas[0], period=period) for period in self.features_parameters ] self.data.dim_sma = btind.SimpleMovingAverage( self.datas[0], period=(np.asarray(self.features_parameters).max() + self.time_dim) ) self.data.dim_sma.plotinfo.plot = False
[docs]class MaxPool(Indicator): """ Custom period `sliding candle` upper bound. """ lines = ('max',) params = (('period', 1),) plotinfo = dict( subplot=False, plotlinevalues=False, ) def __init__(self): self.addminperiod(self.params.period) def next(self): self.lines.max[0] = np.frombuffer(self.data.high.get(size=self.p.period)).max()
[docs]class MinPool(Indicator): """ Custom period `sliding candle` lower bound. """ lines = ('min',) params = (('period', 1),) plotinfo = dict( subplot=False, plotlinevalues=False, ) def __init__(self): self.addminperiod(self.params.period) def next(self): self.lines.min[0] = np.frombuffer(self.data.low.get(size=self.p.period)).min()
[docs]class CasualConvStrategy_0(CasualConvStrategy): """ Casual convolutional encoder + `sliding candle` price data features instead of SMA. """ # Time embedding period: # time_dim = 512 # NOTE: changed this --> change Policy UNREAL for aux. pix control task upsampling params time_dim = 128 # time_dim = 32 # Periods for estimating signal features, # note: here number of feature channels is doubled due to fact Hi/Low values computed for each period specified: # features_parameters = [8, 32, 128, 512] # features_parameters = [2, 8, 32, 64, 128] features_parameters = [8, 16, 32, 64, 128, 256] num_features = len(features_parameters) # Number of environment steps to skip before returning next response, # e.g. if set to 10 -- agent will interact with environment every 10th step; # every other step agent action is assumed to be 'hold': skip_frame = 10 # Number of timesteps reward estimation statistics are collected over, should be: # skip_frame_period <= avg_period <= time_embedding_period: avg_period = 20 # Possible agent actions: portfolio_actions = ('hold', 'buy', 'sell', 'close') gamma = 0.99 # fi_gamma, should be MDP gamma decay reward_scale = 1 # reward multiplicator state_ext_scale = np.linspace(2e3, 1e3, num=num_features) params = dict( # Note: fake `Width` dimension to stay in convention with 2d conv. dims: state_shape= { 'external': spaces.Box(low=-100, high=100, shape=(time_dim, 1, num_features * 2), dtype=np.float32), 'internal': spaces.Box(low=-2, high=2, shape=(avg_period, 1, 5), dtype=np.float32), 'datetime': spaces.Box(low=0, high=1, shape=(1, 5), dtype=np.float32), # 'expert': spaces.Box(low=0, high=10, shape=(len(portfolio_actions),), dtype=np.float32), # TODO: change inheritance! 'metadata': DictSpace( { 'type': spaces.Box( shape=(), low=0, high=1, dtype=np.uint32 ), 'trial_num': spaces.Box( shape=(), low=0, high=10 ** 10, dtype=np.uint32 ), 'trial_type': spaces.Box( shape=(), low=0, high=1, dtype=np.uint32 ), 'sample_num': spaces.Box( shape=(), low=0, high=10 ** 10, dtype=np.uint32 ), 'first_row': spaces.Box( shape=(), low=0, high=10 ** 10, dtype=np.uint32 ), 'timestamp': spaces.Box( shape=(), low=0, high=np.finfo(np.float64).max, dtype=np.float64 ), } ) }, cash_name='default_cash', asset_names=['default_asset'], start_cash=None, commission=None, leverage=1.0, drawdown_call=5, target_call=19, portfolio_actions=portfolio_actions, initial_action=None, initial_portfolio_action=None, skip_frame=skip_frame, state_ext_scale=state_ext_scale, # EURUSD state_int_scale=1.0, gamma=gamma, reward_scale=1.0, metadata={}, ) def set_datalines(self): features_low = [MinPool(self.data, period=period) for period in self.features_parameters] features_high = [MaxPool(self.data, period=period) for period in self.features_parameters] # If `scale` was scalar - make it vector: if len(np.asarray(self.p.state_ext_scale).shape) < 1: self.p.state_ext_scale = np.repeat(np.asarray(self.p.state_ext_scale), self.num_features) # Sort features by `period` for .get_external_state() to estimate # more or less sensible gradient; double-stretch scale vector accordingly: # TODO: maybe 2 separate conv. encoders for hi/low? self.data.features = [] for f1, f2 in zip(features_low, features_high): self.data.features += [f1, f2] self.p.state_ext_scale = np.repeat(self.p.state_ext_scale, 2) # print('p.state_ext_scale: ', self.p.state_ext_scale, self.p.state_ext_scale.shape) self.data.dim_sma = btind.SimpleMovingAverage( self.datas[0], period=(np.asarray(self.features_parameters).max() + self.time_dim) ) self.data.dim_sma.plotinfo.plot = False
import scipy.signal as signal from scipy.stats import zscore
[docs]class CasualConvStrategy_1(CasualConvStrategy_0): """ CWT. again. """ # Time embedding period: # time_dim = 512 # NOTE: changed this --> change Policy UNREAL for aux. pix control task upsampling params # NOTE_2: should be power of 2 if using casual conv. state encoder time_dim = 128 # time_dim = 32 # Periods for estimating signal features, # note: here number of feature channels is doubled due to fact Hi/Low values computed for each period specified: # features_parameters = [8, 32, 128, 512] # features_parameters = [2, 8, 32, 64, 128] # features_parameters = [8, 16, 32, 64, 128, 256] # # num_features = len(features_parameters) # Number of environment steps to skip before returning next response, # e.g. if set to 10 -- agent will interact with environment every 10th step; # every other step agent action is assumed to be 'hold': skip_frame = 10 # Number of timesteps reward estimation statistics are collected over, should be: # skip_frame_period <= avg_period <= time_embedding_period # NOTE_: should be power of 2 if using casual conv. state encoder: avg_period = 20 # Possible agent actions: portfolio_actions = ('hold', 'buy', 'sell', 'close') gamma = 0.99 # fi_gamma, should be MDP gamma decay reward_scale = 1 # reward multiplicator num_features = 16 cwt_signal_scale = 3e3 # first gradient scaling [scalar] cwt_lower_bound = 3.0 # CWT scales cwt_upper_bound = 90.0 state_ext_scale = np.linspace(1, 3, num=num_features) params = dict( # Note: fake `Width` dimension to stay in convention with 2d conv. dims: state_shape= { 'raw': spaces.Box(low=-100, high=100, shape=(time_dim, 4), dtype=np.float32), # 'external': spaces.Box(low=-100, high=100, shape=(time_dim, 1, num_features), dtype=np.float32), 'external': spaces.Box(low=-100, high=100, shape=(time_dim, num_features, 1), dtype=np.float32), # 'external_2': spaces.Box(low=-100, high=100, shape=(time_dim, 1, 4), dtype=np.float32), 'internal': spaces.Box(low=-2, high=2, shape=(avg_period, 1, 5), dtype=np.float32), 'datetime': spaces.Box(low=0, high=1, shape=(1, 5), dtype=np.float32), # 'expert': spaces.Box(low=0, high=10, shape=(len(portfolio_actions),), dtype=np.float32), # TODO: change inheritance! 'metadata': DictSpace( { 'type': spaces.Box( shape=(), low=0, high=1, dtype=np.uint32 ), 'trial_num': spaces.Box( shape=(), low=0, high=10 ** 10, dtype=np.uint32 ), 'trial_type': spaces.Box( shape=(), low=0, high=1, dtype=np.uint32 ), 'sample_num': spaces.Box( shape=(), low=0, high=10 ** 10, dtype=np.uint32 ), 'first_row': spaces.Box( shape=(), low=0, high=10 ** 10, dtype=np.uint32 ), 'timestamp': spaces.Box( shape=(), low=0, high=np.finfo(np.float64).max, dtype=np.float64 ), } ) }, cash_name='default_cash', asset_names=['default_asset'], start_cash=None, commission=None, leverage=1.0, drawdown_call=5, target_call=19, portfolio_actions=portfolio_actions, initial_action=None, initial_portfolio_action=None, skip_frame=skip_frame, state_ext_scale=state_ext_scale, # EURUSD state_int_scale=1.0, gamma=gamma, reward_scale=1.0, metadata={}, cwt_lower_bound=cwt_lower_bound, cwt_upper_bound=cwt_upper_bound, cwt_signal_scale=cwt_signal_scale, ) def __init__(self, **kwargs): super(CasualConvStrategy_1, self).__init__(**kwargs) # self.num_channels = self.p.state_shape['external'].shape[-1] self.num_channels = self.num_features # Define CWT scales: self.cwt_width = np.linspace(self.p.cwt_lower_bound, self.p.cwt_upper_bound, self.num_channels) def set_datalines(self): self.data.dim_sma = btind.SimpleMovingAverage( self.datas[0], period=(np.asarray(self.features_parameters).max() + self.time_dim) ) self.data.dim_sma.plotinfo.plot = False def get_external_state(self): # Use Hi-Low median as signal: x = ( np.frombuffer(self.data.high.get(size=self.time_dim)) + np.frombuffer(self.data.low.get(size=self.time_dim)) ) / 2 # Differences along time dimension: d_x = np.gradient(x, axis=0) * self.p.cwt_signal_scale # Compute continuous wavelet transform using Ricker wavelet: cwt_x = signal.cwt(d_x, signal.ricker, self.cwt_width).T norm_x = cwt_x # Note: differences taken once again along channels axis, # apply weighted scaling to normalize channels # norm_x = np.gradient(cwt_x, axis=-1) # norm_x = zscore(norm_x, axis=0) * self.p.state_ext_scale # norm_x *= self.p.state_ext_scale out_x = tanh(norm_x) # out_x = np.clip(norm_x, -10, 10) # return out_x[:, None, :] return out_x[..., None] def get_external_2_state(self): x = np.stack( [ np.frombuffer(self.data.high.get(size=self.time_dim)), np.frombuffer(self.data.open.get(size=self.time_dim)), np.frombuffer(self.data.low.get(size=self.time_dim)), np.frombuffer(self.data.close.get(size=self.time_dim)), ], axis=-1 ) # # Differences along features dimension: d_x = np.gradient(x, axis=-1) * self.p.cwt_signal_scale # Compute continuous wavelet transform using Ricker wavelet: # cwt_x = signal.cwt(d_x, signal.ricker, self.cwt_width).T norm_x = d_x # Note: differences taken once again along channels axis, # apply weighted scaling to normalize channels # norm_x = np.gradient(cwt_x, axis=-1) # norm_x = zscore(norm_x, axis=0) * self.p.state_ext_scale # norm_x *= self.p.state_ext_scale out_x = tanh(norm_x) # out_x = np.clip(norm_x, -10, 10) return out_x[:, None, :]
[docs]class CasualConvStrategyMulti(CasualConvStrategy_0): """ CWT + multiply data streams. Beta - data names are class hard-coded. TODO: pass data streams names as params """ # Time embedding period: # NOTE_2: should be power of 2 if using casual conv. state encoder time_dim = 128 # Periods for estimating signal features, # note: here number of feature channels is doubled due to fact Hi/Low values computed for each period specified: # features_parameters = [8, 32, 128, 512] # features_parameters = [2, 8, 32, 64, 128] # features_parameters = [8, 16, 32, 64, 128, 256] # # num_features = len(features_parameters) # Number of environment steps to skip before returning next response, # e.g. if set to 10 -- agent will interact with environment every 10th step; # every other step agent action is assumed to be 'hold': skip_frame = 10 # Number of timesteps reward estimation statistics are collected over, should be: # skip_frame_period <= avg_period <= time_embedding_period # NOTE_: should be power of 2 if using casual conv. state encoder: avg_period = 20 # Possible agent actions: portfolio_actions = ('hold', 'buy', 'sell', 'close') gamma = 0.99 # fi_gamma, should be MDP gamma decay reward_scale = 1 # reward multiplicator num_features = 16 # TODO: 8? (was: 16) cwt_signal_scale = 3e3 # first gradient scaling [scalar] cwt_lower_bound = 4.0 # CWT scales TODO: 8.? (was : 3.) cwt_upper_bound = 100.0 state_ext_scale = { 'USD': np.linspace(1, 2, num=num_features), 'GBP': np.linspace(1, 2, num=num_features), 'CHF': np.linspace(1, 2, num=num_features), 'JPY': np.linspace(5e-3, 1e-2, num=num_features), } order_size = { 'USD': 1000, 'GBP': 1000, 'CHF': 1000, 'JPY': 1000, } params = dict( # Note: fake `Width` dimension to stay in convention with 2d conv. dims: state_shape= { 'raw': spaces.Box(low=-1000, high=1000, shape=(time_dim, 4), dtype=np.float32), 'external': DictSpace( { 'USD': spaces.Box(low=-1000, high=1000, shape=(time_dim, 1, num_features), dtype=np.float32), 'GBP': spaces.Box(low=-1000, high=1000, shape=(time_dim, 1, num_features), dtype=np.float32), 'CHF': spaces.Box(low=-1000, high=1000, shape=(time_dim, 1, num_features), dtype=np.float32), 'JPY': spaces.Box(low=-1000, high=1000, shape=(time_dim, 1, num_features), dtype=np.float32), } ), 'internal': spaces.Box(low=-2, high=2, shape=(avg_period, 1, 5), dtype=np.float32), 'datetime': spaces.Box(low=0, high=1, shape=(1, 5), dtype=np.float32), # 'expert': DictSpace( # { # 'USD': spaces.Box(low=0, high=10, shape=(len(portfolio_actions),), dtype=np.float32), # 'GBP': spaces.Box(low=0, high=10, shape=(len(portfolio_actions),), dtype=np.float32), # 'CHF': spaces.Box(low=0, high=10, shape=(len(portfolio_actions),), dtype=np.float32), # 'JPY': spaces.Box(low=0, high=10, shape=(len(portfolio_actions),), dtype=np.float32), # } # ), 'metadata': DictSpace( { 'type': spaces.Box( shape=(), low=0, high=1, dtype=np.uint32 ), 'trial_num': spaces.Box( shape=(), low=0, high=10 ** 10, dtype=np.uint32 ), 'trial_type': spaces.Box( shape=(), low=0, high=1, dtype=np.uint32 ), 'sample_num': spaces.Box( shape=(), low=0, high=10 ** 10, dtype=np.uint32 ), 'first_row': spaces.Box( shape=(), low=0, high=10 ** 10, dtype=np.uint32 ), 'timestamp': spaces.Box( shape=(), low=0, high=np.finfo(np.float64).max, dtype=np.float64 ), } ) }, cash_name='EUR', asset_names={'USD', 'GBP', 'CHF', 'JPY'}, start_cash=None, commission=None, leverage=1.0, drawdown_call=5, target_call=19, portfolio_actions=portfolio_actions, initial_action=None, initial_portfolio_action=None, order_size=order_size, skip_frame=skip_frame, state_ext_scale=state_ext_scale, state_int_scale=1.0, gamma=gamma, # base_dataline='USD', reward_scale=1.0, metadata={}, cwt_lower_bound=cwt_lower_bound, cwt_upper_bound=cwt_upper_bound, cwt_signal_scale=cwt_signal_scale, ) def __init__(self, **kwargs): self.data_streams = {} super(CasualConvStrategyMulti, self).__init__(**kwargs) # self.num_channels = self.p.state_shape['external'].shape[-1] self.num_channels = self.num_features # Define CWT scales: self.cwt_width = np.linspace(self.p.cwt_lower_bound, self.p.cwt_upper_bound, self.num_channels) # print('p: ', dir(self.p))
[docs] def nextstart(self): """ Overrides base method augmenting it with estimating expert actions before actual episode starts. """ # This value shows how much episode records we need to spend # to estimate first environment observation: self.inner_embedding = self.data.close.buflen() self.log.info('Inner time embedding: {}'.format(self.inner_embedding)) # Now when we know exact maximum possible episode length - # can extract relevant episode data and make expert predictions: # data = self.datas[0].p.dataname.values[self.inner_embedding:, :] data = {d._name : d.p.dataname.as_matrix()[self.inner_embedding:, :] for d in self.datas}
# Note: need to form sort of environment 'custom candels' by taking min and max price values over every # skip_frame period; this is done inside Oracle class; # TODO: shift actions forward to eliminate one-point prediction lag? # expert_actions is a matrix representing discrete distribution over actions probabilities # of size [max_env_steps, action_space_size]: # self.expert_actions = { # key: self.expert.fit(episode_data=line, resampling_factor=self.p.skip_frame) # for key, line in data.items() # } # def get_expert_state(self): # # self.current_expert_action = self.expert_actions[self.env_iteration] # self.current_expert_action = { # key: line[self.env_iteration] for key, line in self.expert_actions.items() # } # # return self.current_expert_action def set_datalines(self): self.data_streams = { stream._name: stream for stream in self.datas } # self.data = self.data_streams[self.p.base_dataline] # TODO: ??!! self.data.dim_sma = btind.SimpleMovingAverage( self.data, period=(np.asarray(self.features_parameters).max() + self.time_dim) ) self.data.dim_sma.plotinfo.plot = False def get_external_state(self): return {key: self.get_single_external_state(key) for key in self.data_streams.keys()} def get_single_external_state(self, key): # Use Hi-Low median as signal: x = ( np.frombuffer(self.data_streams[key].high.get(size=self.time_dim)) + np.frombuffer(self.data_streams[key].low.get(size=self.time_dim)) ) / 2 # Differences along time dimension: d_x = np.gradient(x, axis=0) * self.p.cwt_signal_scale # Compute continuous wavelet transform using Ricker wavelet: cwt_x = signal.cwt(d_x, signal.ricker, self.cwt_width).T norm_x = cwt_x # Note: differences taken once again along channels axis, # apply weighted scaling to normalize channels # norm_x = np.gradient(cwt_x, axis=-1) # norm_x = zscore(norm_x, axis=0) * self.p.state_ext_scale norm_x *= self.p.state_ext_scale[key] out_x = tanh(norm_x) # out_x = np.clip(norm_x, -10, 10) # return out_x[:, None, :] return out_x[:, None, :]