###############################################################################
#
# Copyright (C) 2017 Andrew Muzikin
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
from logbook import Logger, StreamHandler, WARNING
import sys
import numpy as np
#from .plotter import BTgymPlotter
from .plotter import DrawCerebro
[docs]class BTgymRendering():
"""
Handles BTgym Environment rendering.
Note:
Call `initialize_pyplot()` method before first render() call!
"""
# Here we'll keep last rendered image for each rendering mode:
rgb_dict = dict()
render_modes = ['episode', 'human']
params = dict(
# Plotting controls, can be passed as kwargs:
render_state_as_image=True,
render_state_channel=0,
render_size_human=(6, 3.5),
render_size_state=(7, 3.5),
render_size_episode=(12,8),
render_rowsmajor_episode=1,
render_dpi=75,
render_plotstyle='seaborn',
render_cmap='PRGn',
render_xlabel='Relative timesteps',
render_ylabel='Value',
render_title='local step: {}, state observation min: {:.4f}, max: {:.4f}',
render_boxtext=dict(fontsize=12,
fontweight='bold',
color='w',
bbox={'facecolor': 'k', 'alpha': 0.3, 'pad': 3},
),
plt_backend='Agg', # Not used.
)
enabled = True
ready = False
def __init__(self, render_modes, **kwargs):
"""
Plotting controls, can be passed as kwargs.
Args:
render_state_as_image=True,
render_state_channel=0,
render_size_human=(6, 3.5),
render_size_state=(7, 3.5),
render_size_episode=(12,8),
render_dpi=75,
render_plotstyle='seaborn',
render_cmap='PRGn',
render_xlabel='Relative timesteps',
render_ylabel='Value',
render_title='local step: {}, state observation min: {:.4f}, max: {:.4f}',
render_boxtext=dict(fontsize=12,
fontweight='bold',
color='w',
bbox={'facecolor': 'k', 'alpha': 0.3, 'pad': 3},
)
"""
# Update parameters with relevant kwargs:
for key, value in kwargs.items():
if key in self.params.keys():
self.params[key] = value
# Unpack it as attributes:
for key, value in self.params.items():
setattr(self, key, value)
# Logging:
if 'log_level' not in dir(self):
self.log_level = WARNING
StreamHandler(sys.stdout).push_application()
self.log = Logger('BTgymRenderer', level=self.log_level)
#from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
#self.FigureCanvas = FigureCanvas
self.plt = None # Will set it inside server process when calling initialize_pyplot().
#self.plotter = BTgymPlotter() # Modified bt.Cerebro() plotter, to get episode renderings.
# Set empty plugs for each render mode:
self.render_modes = render_modes
for mode in self.render_modes:
self.rgb_dict[mode] = self.rgb_empty()
[docs] def initialize_pyplot(self):
"""
Call me before use!
[Supposed to be done inside already running server process]
"""
if not self.ready:
from multiprocessing import Pipe
self.out_pipe, self.in_pipe = Pipe()
if self.plt is None:
import matplotlib
matplotlib.use(self.plt_backend, force=True)
import matplotlib.pyplot as plt
self.plt = plt
self.ready = True
[docs] def to_string(self, dictionary, excluded=[]):
"""
Converts given dictionary to more-or-less good looking `text block` string.
"""
text = ''
for k, v in dictionary.items():
if k not in excluded:
if type(v) in [float]:
v = '{:.4f}'.format(v)
text += '{}: {}\n'.format(k, v)
return text[:-1]
[docs] def rgb_empty(self):
"""
Returns empty 'plug' image.
"""
return (np.random.rand(100, 200, 3) * 255).astype(dtype=np.uint8)
[docs] def parse_response(self, state, mode, reward, info, done,):
"""
Converts environment response to plotting attributes:
state, title, text.
"""
if len(state[mode].shape) <= 2:
state = np.asarray(state[mode])
elif len(state[mode].shape) == 3:
if state[mode].shape[1] == 1:
# Assume 2nd dim (H) is fake expansion for 1D input, so can render all channels:
state = np.asarray(state[mode][:, 0, :])
else:
# Assume it is HWC 2D input, only can render single channel:
state = np.asarray(state[mode][:, :, self.render_state_channel])
else:
raise NotImplementedError(
'2D rendering can be done for obs. state tensor with rank <= 3; ' +\
'got state shape: {}'.format(np.asarray(state[mode]).shape))
# Figure out how to deal with info output:
try:
assert type(info[-1]) == dict
info_dict = info[-1]
except AssertionError:
try:
assert type(info) == dict
info_dict = info
except AssertionError:
try:
info_dict = {'info': str(dict)}
except:
info_dict = {}
# Add records:
info_dict.update(reward=reward, is_done=done,)
# Try to get step information:
try:
current_step = info_dict['step']
except:
current_step = '--'
# Set box text, excluding redundant fields:
box_text = self.to_string(info_dict, excluded=['step'])
# Set title output:
title = self.render_title.format(current_step, state.min(), state.max())
return state, title, box_text
[docs] def render(self, mode_list, cerebro=None, step_to_render=None, send_img=True):
"""
Renders given mode if possible, else
just passes last already rendered image.
Returns rgb image as numpy array.
Logic:
- If `cerebro` arg is received:
render entire episode, using built-in backtrader plotting feature,
update stored `episode` image.
- If `step_to_render' arg is received:
- if mode = 'raw_state':
render current state observation in conventional 'price' format,
update stored `raw_state` image;
- if mode = something_else':
visualise observation as 'seen' by agent,
update stored 'agent' image.
Returns:
`mode` image.
Note:
It can actually return several modes in a single dict.
It prevented by Gym modes convention, but done internally at the end of the episode.
"""
if type(mode_list) == str:
mode_list = [mode_list]
if cerebro is not None:
self.rgb_dict['episode'] = self.draw_episode(cerebro)
# Try to render given episode:
#try:
# Get picture of entire episode:
#fig = cerebro.plot(plotter=self.plotter, # Modified plotter class, doesnt actually save anything.
# savefig=True,
# width=self.render_size_episode[0],
# height=self.render_size_episode[1],
# dpi=self.render_dpi,
# use=None, #self.plt_backend,
# iplot=False,
# figfilename='_tmp_btgym_render.png',
# )[0][0]
#fig.canvas.draw()
#rgb_array = np.fromstring(fig.canvas.tostring_rgb(), dtype=np.uint8, sep='')
#self.rgb_dict['episode'] = rgb_array.reshape(fig.canvas.get_width_height()[::-1] + (3,))
# Clean up:
#self.plt.gcf().clear()
#self.plt.close(fig)
#except:
# Just keep previous rendering
# pass
if step_to_render is not None:
# Perform step rendering:
# Unpack:
raw_state, state, reward, done, info = step_to_render
for mode in mode_list:
if mode in self.render_modes and mode not in ['episode', 'human']:
# Render user-defined (former agent) mode state:
agent_state, title, box_text = self.parse_response(state, mode, reward, info, done)
if self.render_state_as_image:
self.rgb_dict[mode] = self.draw_image(agent_state,
figsize=self.render_size_state,
title='{} / {}'.format(mode, title),
box_text=box_text,
ylabel=self.render_ylabel,
xlabel=self.render_xlabel,
)
else:
self.rgb_dict[mode] = self.draw_plot(agent_state,
figsize=self.render_size_state,
title='{} / {}'.format(mode, title),
box_text=box_text,
ylabel=self.render_ylabel,
xlabel=self.render_xlabel,
)
if 'human' in mode:
# Render `human` state:
human_state, title, box_text = self.parse_response(raw_state, mode, reward, info, done)
self.rgb_dict['human'] = self.draw_plot(human_state,
figsize=self.render_size_human,
title=title,
box_text=box_text,
ylabel='Price',
xlabel=self.render_xlabel,
line_labels=['Open', 'High', 'Low', 'Close'],
)
if send_img:
return self.rgb_dict
else:
# this case is for internal use only;
# now `mode` supposed to contain several modes, let's return dictionary of arrays:
return_dict = dict()
for entry in mode_list:
if entry in self.rgb_dict.keys():
# ...and it is legal:
return_dict[entry] = self.rgb_dict[entry]
else:
return_dict[entry] = self.rgb_empty()
return return_dict
[docs] def draw_plot(self, data, figsize=(10,6), title='', box_text='', xlabel='X', ylabel='Y', line_labels=None):
"""
Visualises environment state as 2d line plot.
Retrurns image as rgb_array.
Args:
data: np.array of shape [num_values, num_lines]
figsize: figure size (in.)
title:
box_text:
xlabel:
ylabel:
line_labels: iterable holding line legends as str
Returns:
rgb image as np.array of size [with, height, 3]
"""
if line_labels is None:
# If got no labels - make it numbers:
if len(data.shape) > 1:
line_labels = ['line_{}'.format(i) for i in range(data.shape[-1])]
else:
line_labels = ['line_0']
data = data[:, None]
else:
assert len(line_labels) == data.shape[-1], \
'Expected `line_labels` kwarg consist of {} names, got: {}'. format(data.shape[-1], line_labels)
fig = self.plt.figure(figsize=figsize, dpi=self.render_dpi, )
#ax = fig.add_subplot(111)
self.plt.style.use(self.render_plotstyle)
self.plt.title(title)
# Plot x axis as reversed time-step embedding:
xticks = np.linspace(data.shape[0] - 1, 0, int(data.shape[0]), dtype=int)
self.plt.xticks(xticks.tolist(), (- xticks[::-1]).tolist(), visible=False)
# Set every 5th tick label visible:
for tick in self.plt.xticks()[1][::5]:
tick.set_visible(True)
self.plt.xlabel(xlabel)
self.plt.ylabel(ylabel)
self.plt.grid(True)
# Switch off antialiasing:
#self.plt.setp([ax.get_xticklines() + ax.get_yticklines() + ax.get_xgridlines() + ax.get_ygridlines()],antialiased=False)
#self.plt.rcParams['text.antialiased']=False
# Add Info box:
self.plt.text(0, data.min(), box_text, **self.render_boxtext)
for line, label in enumerate(line_labels):
self.plt.plot(data[:, line], label=label)
self.plt.legend()
self.plt.tight_layout()
fig.canvas.draw()
# Save it to a numpy array:
rgb_array = np.fromstring(fig.canvas.tostring_rgb(), dtype=np.uint8, sep='')
# Clean up:
self.plt.close(fig)
#self.plt.gcf().clear()
return rgb_array.reshape(fig.canvas.get_width_height()[::-1] + (3,))
[docs] def draw_image(self, data, figsize=(12,6), title='', box_text='', xlabel='X', ylabel='Y', line_labels=None):
"""
Visualises environment state as image.
Returns rgb_array.
"""
fig = self.plt.figure(figsize=figsize, dpi=self.render_dpi, )
#ax = fig.add_subplot(111)
self.plt.style.use(self.render_plotstyle)
self.plt.title(title)
# Plot x axis as reversed time-step embedding:
xticks = np.linspace(data.shape[0] - 1, 0, int(data.shape[0]), dtype=int)
self.plt.xticks(xticks.tolist(), (- xticks[::-1]).tolist(), visible=False)
# Set every 5th tick label visible:
for tick in self.plt.xticks()[1][::5]:
tick.set_visible(True)
#self.plt.yticks(visible=False)
self.plt.xlabel(xlabel)
self.plt.ylabel(ylabel)
self.plt.grid(False)
# Switch off antialiasing:
# self.plt.setp([ax.get_xticklines() + ax.get_yticklines() + ax.get_xgridlines() + ax.get_ygridlines()],antialiased=False)
# self.plt.rcParams['text.antialiased']=False
#self.log.warning('render_data_shape:{}'.format(data.shape))
# Add Info box:
self.plt.text(0, data.shape[1] - 1, box_text, **self.render_boxtext)
im = self.plt.imshow(data.T, aspect='auto', cmap=self.render_cmap)
self.plt.colorbar(im, use_gridspec=True)
self.plt.tight_layout()
fig.canvas.draw()
# Save it to a numpy array:
rgb_array = np.fromstring(fig.canvas.tostring_rgb(), dtype=np.uint8, sep='')
# Clean up:
self.plt.close(fig)
#self.plt.gcf().clear()
#ax.cla()
return rgb_array.reshape(fig.canvas.get_width_height()[::-1] + (3,))
[docs] def draw_episode(self, cerebro):
"""
Hacky way to render episode.
Due to backtrader/matplotlib memory leaks have to encapsulate it in separate process.
Strange but reliable. PID's are driving crazy.
Args:
cerebro instance
Returns:
rgb array.
"""
draw_process = DrawCerebro(cerebro=cerebro,
width=self.render_size_episode[0],
height=self.render_size_episode[1],
dpi=self.render_dpi,
result_pipe=self.in_pipe,
rowsmajor=self.render_rowsmajor_episode,
)
draw_process.start()
#print('Plotter PID: {}'.format(draw_process.pid))
try:
rgb_array = self.out_pipe.recv()
draw_process.terminate()
draw_process.join()
return rgb_array
except:
return self.rgb_empty()
[docs]class BTgymNullRendering():
"""
Empty renderer to use when resources are concern.
"""
enabled = False
def __init__(self, *args, **kwargs):
self.plug = (np.random.rand(100, 200, 3) * 255).astype(dtype=np.uint8)
self.params = {'rendering': 'disabled'}
self.render_modes = []
# self.log_level = WARNING
# StreamHandler(sys.stdout).push_application()
# self.log = Logger('BTgymRenderer', level=self.log_level)
def initialize_pyplot(self):
pass
def render(self, mode_list, **kwargs):
# self.log.debug('render() call to environment with disabled rendering. Returning dict of null-images.')
if type(mode_list) == str:
mode_list = [mode_list]
rgb_dict = {}
for mode in mode_list:
rgb_dict[mode] = self.plug
return rgb_dict
def draw_plot(self, *args, **kwargs):
# self.log.debug('draw_plot() call to environment with disabled rendering. Returning null-image.')
return self.plug
def draw_image(self, *args, **kwargs):
# self.log.debug('draw_image() call to environment with disabled rendering. Returning null-image.')
return self.plug
def draw_episode(self, *args, **kwargs):
# self.log.debug('draw_episode() call to environment with disabled rendering. Returning null-image.')
return self.plug