```
import datetime
import math
import random
import ast

import gymnasium as gym
from gymnasium import spaces
from gymnasium.utils import seeding
import numpy as np
import pandas as pd  # used in get_economic_vector (pd.Series check)
import torch
import torch.nn as nn
from stable_baselines3 import PPO
from stable_baselines3.common.policies import ActorCriticPolicy
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor, MlpExtractor
from stable_baselines3.common.vec_env import DummyVecEnv

from meta.env_fx_trading.util.log_render import render_to_file
from meta.env_fx_trading.util.plot_chart import TradingChart
from meta.env_fx_trading.util.read_config import EnvConfig


class tgym(gym.Env):
    """forex/future/option trading gym environment

    1. Three actions (0 Buy, 1 Sell, 2 Nothing)
    2. Multiple trading pairs (EURUSD, GBPUSD...) under the same time frame
    3. Timeframe from 1 minute to daily, as long as candlestick bars are used
       (Open, High, Low, Close)
    4. Use StopLoss and ProfitTaken to realize rewards; each pair can configure
       its own SL and PT in the config file
    5. Configure an overnight cash penalty, plus each pair's transaction fee and
       overnight position-holding penalty
    6. Split the dataset into daily, weekly or monthly chunks with fixed time
       steps ending at len(df). The business logic forces all positions to close
       at the last Close price (game _compute_reward).
    7. The df must have column names: [(time_col), (asset_col), Open, Close,
       High, Low, day] (case sensitive)
    8. Additional indicators can be added during data processing (Finta provides
       78 TA indicators)
    9. The customized observation list is handled in the json config file
    10. ProfitTaken = fraction_action * max_profit_taken + SL
    11. SL is pre-fixed
    12. Limit orders can be configured. If limit_order == True, the action is
        preset to buy or sell at the Low or High of the bar, with a
        limit_order_expiration (n bars). It triggers if the price crosses;
        otherwise it is dropped.
    13. Render modes:
        human -- display each step's realized reward on the console
        file  -- create a transaction log
        graph -- create transactions in a graph (under development)
    14.
    15. Reward: we want to incentivize profit that is sustained over long
        periods of time. At each step, we set the reward to the account balance
        multiplied by some fraction of the number of time steps so far. The
        purpose is to delay rewarding the agent too quickly in the early stages
        and allow it to explore before optimizing a single strategy too deeply.
        It also rewards agents that maintain a higher balance for longer, rather
        than those that rapidly gain money using unsustainable strategies.
    16. Observation_space contains all of the input variables we want our agent
        to consider before making (or not making) a trade. We want our agent to
        "see" the forex data points (Open, High, Low, Close, time serial, TA) in
        the game window, plus a few other data points such as account balance,
        current positions, and current profit. The intuition is that for each
        time step the agent should consider the price action leading up to the
        current price, as well as the state of its own portfolio, in order to
        make an informed decision for the next action.
    17. Reward is in forex trading unit Points, configurable per trading pair
    18. To make the unrealized-profit reward reflect market conditions, we
        compute ATR for each asset and use it to scale the reward dynamically.
    """

    metadata = {"render.modes": ["graph", "human", "file", "none"]}

    def __init__(
        self,
        df,
        event_map,
        currency_map,
        env_config_file="./neo_finrl/env_fx_trading/config/gdbusd-test-1.json",
    ):
        assert df.ndim == 2
        super(tgym, self).__init__()
        self.cf = EnvConfig(env_config_file)
        self.observation_list = self.cf.env_parameters("observation_list")

        # Economic data mappings
        self.event_map = event_map
        self.currency_map = currency_map
        self.max_events = 8

        self.df = df.copy()
        if "events" not in self.df.columns:
            raise ValueError("DataFrame must contain an 'events' column")

        def parse_events(x):
            if isinstance(x, str):
                try:
                    parsed = ast.literal_eval(x)
                    return parsed if isinstance(parsed, list) else []
                except (ValueError, SyntaxError):
                    return []
            return x if isinstance(x, list) else []

        self.df["events"] = self.df["events"].apply(parse_events)

        if not isinstance(self.df["events"].iloc[0], list):
            raise ValueError("'events' must be a list")
        if self.df["events"].iloc[0] and not isinstance(self.df["events"].iloc[0][0], dict):
            raise ValueError("Elements in 'events' must be dictionaries")

        self.balance_initial = self.cf.env_parameters("balance")
        self.over_night_cash_penalty = self.cf.env_parameters("over_night_cash_penalty")
        self.asset_col = self.cf.env_parameters("asset_col")
        self.time_col = self.cf.env_parameters("time_col")
        self.random_start = self.cf.env_parameters("random_start")
        self.log_filename = (
            self.cf.env_parameters("log_filename")
            + datetime.datetime.now().strftime("%Y%m%d%H%M%S")
            + ".csv"
        )

        self.df["_time"] = self.df[self.time_col]
        self.df["_day"] = self.df["weekday"]
        self.assets = self.df[self.asset_col].unique()
        self.dt_datetime = self.df[self.time_col].sort_values().unique()
        self.df = self.df.set_index(self.time_col)
        self.visualization = False

        # Reset values
        self.equity_list = [0] * len(self.assets)
        self.balance = self.balance_initial
        self.total_equity = self.balance + sum(self.equity_list)
        self.ticket_id = 0
        self.transaction_live = []
        self.transaction_history = []
        self.transaction_limit_order = []
        self.current_draw_downs = [0.0] * len(self.assets)
        self.max_draw_downs = [0.0] * len(self.assets)
        self.max_draw_down_pct = sum(self.max_draw_downs) / self.balance * 100
        self.current_step = 0
        self.episode = 0  # starts at 0, incremented at the end of each episode
        self.current_holding = [0] * len(self.assets)
        self.tranaction_open_this_step = []
        self.tranaction_close_this_step = []
        self.current_day = 0
        self.done_information = ""
        self.log_header = True

        # Cache data
        self.cached_ohlc_data = [self.get_observation_vector(_dt) for _dt in self.dt_datetime]
        self.cached_economic_data = [self.get_economic_vector(_dt) for _dt in self.dt_datetime]
        self.cached_time_serial = (
            self.df[["_time", "_day"]].sort_values("_time").drop_duplicates().values.tolist()
        )

        self.reward_range = (-np.inf, np.inf)
        self.action_space = spaces.MultiDiscrete([3] * len(self.assets))  # 0: Buy, 1: Sell, 2: Nothing
        self.observation_space = spaces.Dict({
            "ohlc_data": spaces.Box(low=-np.inf, high=np.inf,
                                    shape=(len(self.assets) * len(self.observation_list),),
                                    dtype=np.float32),
            "event_ids": spaces.Box(low=0, high=len(self.event_map) - 1,
                                    shape=(self.max_events,), dtype=np.int32),
            "currency_ids": spaces.Box(low=0, high=len(self.currency_map) - 1,
                                       shape=(self.max_events,), dtype=np.int32),
            "economic_numeric": spaces.Box(low=-np.inf, high=np.inf,
                                           shape=(self.max_events * 6,), dtype=np.float32),
            "portfolio_data": spaces.Box(low=-np.inf, high=np.inf,
                                         shape=(3 + 2 * len(self.assets),), dtype=np.float32),
        })

        print(
            f"initial done:\n"
            f"observation_list: {self.observation_list}\n"
            f"assets: {self.assets}\n"
            f"time serial: {min(self.dt_datetime)} -> {max(self.dt_datetime)} length: {len(self.dt_datetime)}\n"
            f"events: {len(self.event_map)}, currencies: {len(self.currency_map)}"
        )
        self._seed()

    def _seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def _take_action(self, actions, done):
        rewards = [0] * len(self.assets)
        self.tranaction_open_this_step = []
        self.tranaction_close_this_step = []

        for i, _action in enumerate(actions):  # actions are now integers [0, 1, 2]
            self._o = self.get_observation(self.current_step, i, "Open")
            self._h = self.get_observation(self.current_step, i, "High")
            self._l = self.get_observation(self.current_step, i, "Low")
            self._c = self.get_observation(self.current_step, i, "Close")
            self._t = self.get_observation(self.current_step, i, "_time")
            self._day = self.get_observation(self.current_step, i, "_day")

            rewards[i] = self._calculate_reward(i, done, _action)  # pass the action for the exploration bonus
            print(f"Asset {self.assets[i]}: Action={_action}, Reward={rewards[i]}, Holding={self.current_holding[i]}")

            if self.cf.symbol(self.assets[i], "limit_order"):
                self._limit_order_process(i, _action, done)

            if (_action in (0, 1) and not done
                    and self.current_holding[i] < self.cf.symbol(self.assets[i], "max_current_holding")):
                # Use a fixed PT, since the action is discrete (no fractional component)
                _profit_taken = self.cf.symbol(self.assets[i], "profit_taken_max") + self.cf.symbol(self.assets[i], "stop_loss_max")
                self.ticket_id += 1
                if self.cf.symbol(self.assets[i], "limit_order"):
                    transaction = {
                        "Ticket": self.ticket_id,
                        "Symbol": self.assets[i],
                        "ActionTime": self._t,
                        "Type": _action,
                        "Lot": 1,
                        "ActionPrice": self._l if _action == 0 else self._h,
                        "SL": self.cf.symbol(self.assets[i], "stop_loss_max"),
                        "PT": _profit_taken,
                        "MaxDD": 0,
                        "Swap": 0.0,
                        "CloseTime": "",
                        "ClosePrice": 0.0,
                        "Point": 0,
                        "Reward": -self.cf.symbol(self.assets[i], "transaction_fee"),
                        "DateDuration": self._day,
                        "Status": 0,
                        "LimitStep": self.current_step,
                        "ActionStep": -1,
                        "CloseStep": -1,
                    }
                    self.transaction_limit_order.append(transaction)
                else:
                    transaction = {
                        "Ticket": self.ticket_id,
                        "Symbol": self.assets[i],
                        "ActionTime": self._t,
                        "Type": _action,
                        "Lot": 1,
                        "ActionPrice": self._c,
                        "SL": self.cf.symbol(self.assets[i], "stop_loss_max"),
                        "PT": _profit_taken,
                        "MaxDD": 0,
                        "Swap": 0.0,
                        "CloseTime": "",
                        "ClosePrice": 0.0,
                        "Point": 0,
                        "Reward": -self.cf.symbol(self.assets[i], "transaction_fee"),
                        "DateDuration": self._day,
                        "Status": 0,
                        "LimitStep": self.current_step,
                        "ActionStep": self.current_step,
                        "CloseStep": -1,
                    }
                    self.current_holding[i] += 1
                    self.tranaction_open_this_step.append(transaction)
                    self.balance -= self.cf.symbol(self.assets[i], "transaction_fee")
                    self.transaction_live.append(transaction)

        return sum(rewards)

    def _calculate_reward(self, i, done, action):
        _total_reward = 0
        _max_draw_down = 0

        # Exploration incentive: small reward for trading, small penalty for doing nothing
        if not any(tr["Symbol"] == self.assets[i] for tr in self.transaction_live) and not done:
            _total_reward += 0.01 if action in (0, 1) else -0.01

        for tr in self.transaction_live[:]:  # iterate over a copy to avoid modification issues
            if tr["Symbol"] == self.assets[i]:
                _point = self.cf.symbol(self.assets[i], "point")

                if self._day > tr["DateDuration"]:
                    tr["DateDuration"] = self._day
                    tr["Reward"] -= self.cf.symbol(self.assets[i], "over_night_penalty")

                if tr["Type"] == 0:  # Buy
                    _sl_price = tr["ActionPrice"] - tr["SL"] / _point
                    _pt_price = tr["ActionPrice"] + tr["PT"] / _point
                    if done:
                        p = (self._c - tr["ActionPrice"]) * _point
                        self._manage_tranaction(tr, p, self._c, status=2)
                        _total_reward += p
                    elif self._l <= _sl_price:
                        self._manage_tranaction(tr, -tr["SL"], _sl_price)
                        _total_reward += -tr["SL"]
                        self.current_holding[i] -= 1
                    elif self._h >= _pt_price:
                        self._manage_tranaction(tr, tr["PT"], _pt_price)
                        _total_reward += tr["PT"]
                        self.current_holding[i] -= 1
                    else:
                        unrealized = (self._c - tr["ActionPrice"]) * _point
                        _total_reward += unrealized * 0.1  # intermediate reward
                        self.current_draw_downs[i] = int((self._l - tr["ActionPrice"]) * _point)
                        _max_draw_down += self.current_draw_downs[i]
                        if self.current_draw_downs[i] < 0 and tr["MaxDD"] > self.current_draw_downs[i]:
                            tr["MaxDD"] = self.current_draw_downs[i]

                elif tr["Type"] == 1:  # Sell
                    _sl_price = tr["ActionPrice"] + tr["SL"] / _point
                    _pt_price = tr["ActionPrice"] - tr["PT"] / _point
                    if done:
                        p = (tr["ActionPrice"] - self._c) * _point
                        self._manage_tranaction(tr, p, self._c, status=2)
                        _total_reward += p
                    elif self._h >= _sl_price:
                        self._manage_tranaction(tr, -tr["SL"], _sl_price)
                        _total_reward += -tr["SL"]
                        self.current_holding[i] -= 1
                    elif self._l <= _pt_price:
                        self._manage_tranaction(tr, tr["PT"], _pt_price)
                        _total_reward += tr["PT"]
                        self.current_holding[i] -= 1
                    else:
                        unrealized = (tr["ActionPrice"] - self._c) * _point
                        _total_reward += unrealized * 0.1  # intermediate reward
                        self.current_draw_downs[i] = int((tr["ActionPrice"] - self._h) * _point)
                        _max_draw_down += self.current_draw_downs[i]
                        if self.current_draw_downs[i] < 0 and tr["MaxDD"] > self.current_draw_downs[i]:
                            tr["MaxDD"] = self.current_draw_downs[i]

                if _max_draw_down > self.max_draw_downs[i]:
                    self.max_draw_downs[i] = _max_draw_down

        return _total_reward

    def _limit_order_process(self, i, _action, done):
        for tr in self.transaction_limit_order[:]:
            if tr["Symbol"] == self.assets[i]:
                if tr["Type"] != _action or done:
                    self.transaction_limit_order.remove(tr)
                    tr["Status"] = 3
                    tr["CloseStep"] = self.current_step
                    self.transaction_history.append(tr)
                elif (tr["ActionPrice"] >= self._l and _action == 0) or (
                        tr["ActionPrice"] <= self._h and _action == 1):
                    tr["ActionStep"] = self.current_step
                    self.current_holding[i] += 1
                    self.balance -= self.cf.symbol(self.assets[i], "transaction_fee")
                    self.transaction_limit_order.remove(tr)
                    self.transaction_live.append(tr)
                    self.tranaction_open_this_step.append(tr)
                elif (tr["LimitStep"] + self.cf.symbol(self.assets[i], "limit_order_expiration")
                        > self.current_step):
                    tr["CloseStep"] = self.current_step
                    tr["Status"] = 4
                    self.transaction_limit_order.remove(tr)
                    self.transaction_history.append(tr)

    def _manage_tranaction(self, tr, _p, close_price, status=1):
        self.transaction_live.remove(tr)
        tr["ClosePrice"] = close_price
        tr["Point"] = int(_p)
        tr["Reward"] = int(tr["Reward"] + _p)
        tr["Status"] = status
        tr["CloseTime"] = self._t
        self.balance += int(tr["Reward"])
        self.total_equity -= int(abs(tr["Reward"]))
        self.tranaction_close_this_step.append(tr)
        self.transaction_history.append(tr)

    def step(self, actions):
        self.current_step += 1

        # Define termination and truncation conditions
        terminated = self.balance <= 0  # episode ends by bankruptcy (terminal state)
        truncated = self.current_step == len(self.dt_datetime) - 1  # episode ends at max steps (time limit)
        done = terminated or truncated  # combined into a single 'done' flag for VecEnv

        # For rendering or episode tracking, either condition can still be checked
        if done:
            self.done_information += f"Episode: {self.episode} Balance: {self.balance} Step: {self.current_step}\n"
            self.visualization = True
            self.episode += 1  # increment the episode counter

        base_reward = self._take_action(actions, done)

        # Sustained-profit reward (docstring point 15)
        sustained_reward = self.balance * (self.current_step / len(self.dt_datetime)) * 0.01

        total_reward = base_reward + sustained_reward

        if self._day > self.current_day:
            self.current_day = self._day
            self.balance -= self.over_night_cash_penalty

        if self.balance != 0:
            self.max_draw_down_pct = abs(sum(self.max_draw_downs) / self.balance * 100)

        obs = {
            "ohlc_data": np.array(self.cached_ohlc_data[self.current_step], dtype=np.float32),
            "event_ids": self.cached_economic_data[self.current_step]["event_ids"],
            "currency_ids": self.cached_economic_data[self.current_step]["currency_ids"],
            "economic_numeric": self.cached_economic_data[self.current_step]["numeric"],
            "portfolio_data": np.array(
                [self.balance, self.total_equity, self.max_draw_down_pct]
                + self.current_holding + self.current_draw_downs,
                dtype=np.float32,
            ),
        }
        print(f"Step {self.current_step}: Base Reward={base_reward}, Sustained Reward={sustained_reward}, Total={total_reward}, Balance={self.balance}")

        # Info dictionary remains unchanged
        info = {"Close": self.tranaction_close_this_step}

        return obs, total_reward, terminated, truncated, info

    def get_observation(self, _step, _iter=0, col=None):
        if col is None:
            return self.cached_ohlc_data[_step]
        if col == "_day":
            return self.cached_time_serial[_step][1]
        elif col == "_time":
            return self.cached_time_serial[_step][0]
        try:
            col_pos = self.observation_list.index(col)
        except ValueError:
            raise ValueError(f"Column '{col}' not found in observation_list")
        return self.cached_ohlc_data[_step][_iter * len(self.observation_list) + col_pos]

    def get_observation_vector(self, _dt, cols=None):
        cols = self.observation_list if cols is None else cols
        v = []
        for a in self.assets:
            subset = self.df.query(f'{self.asset_col} == "{a}" & {self.time_col} == "{_dt}"')
            assert not subset.empty
            v += subset.loc[_dt, cols].tolist()
        assert len(v) == len(self.assets) * len(cols)
        return v

    def get_economic_vector(self, _dt):
        subset = self.df.loc[_dt]
        events = subset["events"] if isinstance(subset, pd.Series) else subset["events"].iloc[0]
        event_ids = [self.event_map[e["event"]] for e in events[:self.max_events]] + [0] * (self.max_events - len(events))
        currency_ids = [self.currency_map.get(e["currency"], 0) for e in events[:self.max_events]] + [0] * (self.max_events - len(events))
        numeric_fields = ["actual_norm", "forecast_norm", "previous_norm", "surprise_norm", "event_freq", "impact_code"]
        numeric = [e[field] for e in events[:self.max_events] for field in numeric_fields] + [0] * (self.max_events * 6 - len(events) * 6)
        return {
            "event_ids": np.array(event_ids, dtype=np.int32),
            "currency_ids": np.array(currency_ids, dtype=np.int32),
            "numeric": np.array(numeric, dtype=np.float32),
        }

    def reset(self, seed=None, options=None):
        # Set the seed for reproducibility
        if seed is not None:
            self._seed(seed)

        if self.random_start:
            self.current_step = random.choice(range(int(len(self.dt_datetime) * 0.5)))
        else:
            self.current_step = 0

        self.equity_list = [0] * len(self.assets)
        self.balance = self.balance_initial
        self.total_equity = self.balance + sum(self.equity_list)
        self.ticket_id = 0
        self.transaction_live = []
        self.transaction_history = []
        self.transaction_limit_order = []
        self.current_draw_downs = [0.0] * len(self.assets)
        self.max_draw_downs = [0.0] * len(self.assets)
        self.max_draw_down_pct = sum(self.max_draw_downs) / self.balance * 100
        self.current_holding = [0] * len(self.assets)
        self.tranaction_open_this_step = []
        self.tranaction_close_this_step = []
        self.current_day = 0
        self.done_information = ""
        self.log_header = True
        self.visualization = False

        obs = {
            "ohlc_data": np.array(self.cached_ohlc_data[self.current_step], dtype=np.float32),
            "event_ids": self.cached_economic_data[self.current_step]["event_ids"],
            "currency_ids": self.cached_economic_data[self.current_step]["currency_ids"],
            "economic_numeric": self.cached_economic_data[self.current_step]["numeric"],
            "portfolio_data": np.array(
                [self.balance, self.total_equity, self.max_draw_down_pct]
                + self.current_holding + self.current_draw_downs,
                dtype=np.float32,
            ),
        }
        info = {}
        return obs, info

    def render(self, mode="human", title=None, **kwargs):
        if mode in ("human", "file"):
            printout = mode == "human"
            pm = {
                "log_header": self.log_header,
                "log_filename": self.log_filename,
                "printout": printout,
                "balance": self.balance,
                "balance_initial": self.balance_initial,
                "tranaction_close_this_step": self.tranaction_close_this_step,
                "done_information": self.done_information,
            }
            render_to_file(**pm)
            if self.log_header:
                self.log_header = False
        elif mode == "graph" and self.visualization:
            print("plotting...")
            p = TradingChart(self.df, self.transaction_history)
            p.plot()

    def close(self):
        pass

    def get_sb_env(self):
        e = DummyVecEnv([lambda: self])
        obs = e.reset()
        return e, obs


class CustomFeaturesExtractor(BaseFeaturesExtractor):
    def __init__(self, observation_space):
        n_assets = (observation_space.spaces["portfolio_data"].shape[0] - 3) // 2
        ohlc_dim = observation_space.spaces["ohlc_data"].shape[0]
        max_events = observation_space.spaces["event_ids"].shape[0]
        economic_numeric_dim = observation_space.spaces["economic_numeric"].shape[0]
        portfolio_dim = observation_space.spaces["portfolio_data"].shape[0]
        features_dim = ohlc_dim + 2 * max_events + economic_numeric_dim + portfolio_dim
        super().__init__(observation_space, features_dim=features_dim)
        # num_embeddings = number of unique ids in the data
        self.event_embedding = nn.Embedding(num_embeddings=129, embedding_dim=max_events)
        self.currency_embedding = nn.Embedding(num_embeddings=6, embedding_dim=max_events)
        print(f"CustomFeaturesExtractor: n_assets={n_assets}, features_dim={features_dim}")

    def forward(self, obs):
        ohlc_data = obs["ohlc_data"]
        event_ids = obs["event_ids"].to(torch.long)
        currency_ids = obs["currency_ids"].to(torch.long)
        economic_numeric = obs["economic_numeric"]
        portfolio_data = obs["portfolio_data"]
        event_emb = self.event_embedding(event_ids).mean(dim=1)
        currency_emb = self.currency_embedding(currency_ids).mean(dim=1)
        features = torch.cat([ohlc_data, event_emb, currency_emb, economic_numeric, portfolio_data], dim=1)
        return features


class CustomMultiInputPolicy(ActorCriticPolicy):
    def __init__(self, observation_space, action_space, lr_schedule, *args, **kwargs):
        super().__init__(
            observation_space,
            action_space,
            lr_schedule,
            features_extractor_class=CustomFeaturesExtractor,
            features_extractor_kwargs={},
            net_arch=dict(pi=[64, 64], vf=[64, 64]),
            *args,
            **kwargs,
        )
        features_dim = self.features_extractor.features_dim
        self.mlp_extractor = MlpExtractor(
            features_dim,
            net_arch=self.net_arch["pi"],
            activation_fn=nn.ReLU,
            device=self.device,
        )
        self.action_net = nn.Linear(64, action_space.shape[0] * 3)  # n_assets * 3 actions
        self.value_net = nn.Linear(64, 1)
        # Initialize the timestep counter (replaces step_counter)
        self.num_timesteps = 0

    def forward(self, obs, deterministic=False):
        # Increment the timestep on every forward pass
        self.num_timesteps += 1
        features = self.extract_features(obs)
        latent_pi, latent_vf = self.mlp_extractor(features)
        logits = self.action_net(latent_pi)  # [batch, n_assets * 3]
        values = self.value_net(latent_vf)
        # Fix: use Categorical for MultiDiscrete
        logits = logits.view(-1, self.action_space.shape[0], 3)  # [batch, n_assets, 3]
        distribution = torch.distributions.Categorical(logits=logits)
        actions = distribution.sample() if not deterministic else distribution.mode  # [batch, n_assets]
        log_prob = distribution.log_prob(actions).sum(dim=-1, keepdim=True)
        # Conditional debug print to avoid flooding the console
        if self.num_timesteps % 1000 == 0:
            print(f"Step {self.num_timesteps}, Training: {self.training}, Logits: {logits}")
            print(f"Step {self.num_timesteps}, Actions: {actions}")
        return actions, values, log_prob

    def extract_features(self, obs):
        return self.features_extractor(obs)

    def predict_values(self, obs):
        features = self.extract_features(obs)
        _, latent_vf = self.mlp_extractor(features)
        return self.value_net(latent_vf)
```

Is my evaluate() function wrong?

```
import optuna

# Evaluation function used during training
def evaluate(model, env_vec, n_episodes=10):
    total_rewards = []
    for _ in range(n_episodes):
        obs = env_vec.reset()
        done = np.array([False] * env_vec.num_envs)
        episode_rewards = np.zeros(env_vec.num_envs)
        step_count = 0
        max_steps = len(env_vec.envs[0].dt_datetime)

        while not np.all(done) and step_count < max_steps * 2:
            actions, _ = model.predict(obs, deterministic=True)
            step_result = env_vec.step(actions)
            if len(step_result) == 4:
                obs, rewards, done, info = step_result
            else:
                obs, rewards, terminated, truncated, info = step_result
                done = np.logical_or(terminated, truncated)
            episode_rewards += rewards
            step_count += 1

        total_rewards.extend(episode_rewards)
    return np.mean(total_rewards)


# Objective function for Optuna
def objective(trial):
    # Define the hyperparameter search space
    learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-3, log=True)
    n_steps = trial.suggest_int("n_steps", 1024, 8192, step=1024)
    total_timesteps = trial.suggest_int("total_timesteps", 500000, 2000000, step=100000)
    batch_size = trial.suggest_categorical("batch_size", [64, 128, 256, 512])
    gamma = trial.suggest_float("gamma", 0.9, 0.9999)
    gae_lambda = trial.suggest_float("gae_lambda", 0.8, 0.99)
    clip_range = trial.suggest_float("clip_range", 0.1, 0.3)
    n_epochs = trial.suggest_int("n_epochs", 5, 20)

    # Train a PPO model on the training set
    model = PPO(
        CustomMultiInputPolicy,
        train_env_vec,
        learning_rate=learning_rate,
        n_steps=n_steps,
        batch_size=batch_size,
        n_epochs=n_epochs,
        gamma=gamma,
        gae_lambda=gae_lambda,
        clip_range=clip_range,
        verbose=1,
        # 0: no output during training
        # 1: print basic training progress
        # 2: more detailed output (optimization steps, loss values such as policy/value loss, learning-rate updates)
    )

    eval_callback = EarlyStoppingCallback(
        eval_env=val_env_vec,
        eval_freq=10000,
        n_eval_episodes=5,
        patience=3,      # stop if there is no improvement after 3 evaluations
        min_delta=0.01,  # minimum improvement to count
        verbose=1,
    )

    # print(model.policy)  # should show the mlp_extractor with in_features=95
    model.learn(total_timesteps=total_timesteps, callback=eval_callback)

    val_avg_reward = evaluate(model, val_env_vec, 10)  # evaluation function
    print(f"Validation average reward: {val_avg_reward:.2f}")
    return val_avg_reward  # maximize reward


# Specify the SQLite database file
db_path = 'optuna_study.db'

# Run the optimization
study = optuna.create_study(
    study_name='OHLC_EconomicCalender_ppo_study',
    storage=f'sqlite:///{db_path}',
    direction="maximize",
    load_if_exists=True,
)
study.optimize(objective, n_trials=20)  # adjust the number of trials to your resources
```

Because step() returns a single total reward, evaluate() lumps everything together with total_rewards.extend(episode_rewards). The other problem is that even when no buy or sell action is taken, the model still receives the sustained profit reward, so it learns that it gets rewarded even if it never buys or sells.

This leads to:

```
Step 48138: Base Reward=-0.01, Sustained Reward=997.9378076899109, Total=997.9278076899109, Balance=99800
Asset XAUUSD: Action=2, Reward=-0.01, Holding=0
Step 48139: Base Reward=-0.01, Sustained Reward=997.9585384599407, Total=997.9485384599407, Balance=99800
Asset XAUUSD: Action=2, Reward=0, Holding=0
Step 48140: Base Reward=0, Sustained Reward=997.9792692299702, Total=997.9792692299702, Balance=99800
Final Test Average Reward: 24021378.62
Sharpe Ratio: -0.10
Max Drawdown: -0.20%
Win Rate: 0.00%
Profit Factor: inf
```

The final test average reward is high, but the profit is low because every action is 2. Is evaluate() correct? Should I edit the reward so the model learns a strategy that maximizes actual profit?
Let’s analyze your concerns step-by-step and address whether your evaluate() function is correct, as well as the reward design issue causing the model to favor inaction (Action=2, "Nothing"). I’ll provide insights and suggestions to improve both the evaluation function and the reward mechanism to align with your goal of maximizing actual trading profits.
1. Is the evaluate() Function Correct?
Your evaluate() function is mostly correct in terms of structure and intent, but there are a few potential issues and improvements to consider, especially given the behavior you’re observing. Let’s break it down:
Current Implementation:
- Purpose: The function evaluates the trained PPO model over n_episodes episodes, accumulating rewards per episode and returning the mean total reward.
- Logic:
- Resets the vectorized environment (env_vec).
- Runs episodes until either done (terminated or truncated) or a safety limit (max_steps * 2) is reached.
- Uses model.predict() with deterministic=True to select actions.
- Accumulates rewards with episode_rewards += rewards.
- Extends total_rewards with episode rewards and computes the mean.
Observations:
- Reward Accumulation:
- You’re summing rewards over all steps in an episode (episode_rewards += rewards) and then extending total_rewards with these sums. This is fine for a vectorized environment with one environment (env_vec.num_envs = 1), but if env_vec contains multiple environments, episode_rewards is an array, and total_rewards.extend(episode_rewards) correctly flattens it into a list of per-episode rewards.
- The mean (np.mean(total_rewards)) assumes all episodes are equally weighted, which is standard practice.
- Step Result Handling:
- You handle both the 4-tuple return format (obs, rewards, done, info) used by SB3's VecEnv API (and legacy Gym) and Gymnasium's newer 5-tuple (obs, rewards, terminated, truncated, info). This is good for compatibility; a quick check of which one your env_vec actually exposes is shown after this list.
- done = np.logical_or(terminated, truncated) correctly combines termination conditions for the vectorized environment.
- Termination Conditions:
- The loop exits when np.all(done) (all environments are done) or step_count < max_steps * 2. The * 2 multiplier is a safety buffer, but it could allow episodes to run longer than intended if done isn’t triggered correctly (e.g., due to a bug in step()).
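For reference, here is a quick way to confirm which step() signature your wrapped environment actually exposes. This is a sketch that assumes env_vec is a stable-baselines3 DummyVecEnv, as elsewhere in this thread; the 5-tuple branch in your loop only fires if env_vec is the raw gymnasium environment rather than a VecEnv.
```
# SB3's VecEnv API returns 4 values -- (obs, rewards, dones, infos) -- while the
# raw gymnasium env returns 5: (obs, reward, terminated, truncated, info).
obs = env_vec.reset()                                     # VecEnv reset returns obs only
actions = [env.action_space.sample() for env in env_vec.envs]
result = env_vec.step(actions)
print(len(result))                                        # -> 4 for a DummyVecEnv
```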
Potential Issues:
- Infinite Loops: If done is never True (e.g., balance > 0 and current_step doesn’t reach len(dt_datetime) - 1), the max_steps * 2 cap prevents infinite loops, but it might mask issues in the environment’s termination logic.
- Reward Interpretation: The function assumes total_reward from step() represents meaningful performance. However, as you’ve noticed, the sustained reward dominates, skewing the evaluation toward inaction.
Verdict:
The evaluate() function is technically correct for computing the mean cumulative reward over episodes, but it’s not effectively evaluating trading performance because the reward design incentivizes doing nothing (Action=2). The issue lies more in the reward mechanism than the evaluation logic itself.
2. Reward Design Problem: Sustained Profit Reward Dominates
Your observation is spot-on: the sustained profit reward (self.balance * (self.current_step / len(self.dt_datetime)) * 0.01) gives a positive reward even when no trades are made (Action=2). This encourages the agent to avoid trading to maximize cumulative reward, as seen in your logs:
Step 48138: Base Reward=-0.01, Sustained Reward=997.9378076899109, Total=997.9278076899109, Balance=99800
Asset XAUUSD: Action=2, Reward=-0.01, Holding=0
...
Final Test Average Reward: 24021378.62
Sharpe Ratio: -0.10, Max Drawdown: -0.20%, Win Rate: 0.00%
Why This Happens:
- Sustained Reward Formula: self.balance * (self.current_step / len(self.dt_datetime)) * 0.01
- As current_step increases, this term grows linearly, even if balance remains constant (e.g., 99800).
- With no trades, balance doesn’t decrease (no transaction fees or losses), so the sustained reward accumulates to a large positive value over time.
- Base Reward: For Action=2, the base reward is small (-0.01 or 0), dwarfed by the sustained reward.
- Result: The agent learns that doing nothing (Action=2) maximizes total reward by avoiding losses and letting the sustained reward accrue; the quick numeric check below shows how large this term gets.
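To see the scale of the mismatch, here is a quick back-of-the-envelope check. The dataset length below is only a guess inferred from the logged step numbers; your real len(self.dt_datetime) will differ.
```
# Rough reconstruction of the logged numbers (all values are illustrative assumptions).
balance = 99_800
n_bars = 48_141          # guessed dataset length, close to the logged step numbers
current_step = 48_138

sustained = balance * (current_step / n_bars) * 0.01
print(sustained)         # ~997.9 -- same magnitude as the logged Sustained Reward
# Near the end of an episode this term approaches balance * 0.01 ~= 998 per step,
# while the base reward for Action=2 is only +/-0.01, so inaction dominates.
```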
Consequences:
- High Average Reward, No Profit: The Final Test Average Reward: 24021378.62 reflects the sustained reward piling up, not actual trading profits.
- Metrics Misalignment: Sharpe Ratio, Max Drawdown, and Win Rate indicate poor trading performance because no trades occur.
Intended Goal (Docstring Point 15):
"Reward, we want to incentivize profit that is sustained over long periods of time... delay rewarding the agent too fast in the early stages and allow it to explore sufficiently before optimizing a single strategy too deeply. It will also reward agents that maintain a higher balance for longer..."
The current implementation achieves the "delay rewarding too fast" part but fails to tie the reward to active trading profits. Instead, it rewards inaction.
3. Fixing the Reward Mechanism
To align the reward with your goal of maximizing trading profits, we need to:
- Tie Sustained Reward to Trading Activity: Reward sustained profits only when the agent actively trades and maintains profitable positions.
- Penalize Inaction: Discourage Action=2 when trading opportunities exist.
- Scale Rewards with Market Conditions: Use ATR (as mentioned in docstring point 18) to make rewards reflect volatility and risk.
Suggested Reward Redesign:
Modify the step() method’s reward calculation:
def step(self, actions):
self.current_step += 1
terminated = self.balance <= 0
truncated = self.current_step == len(self.dt_datetime) - 1
done = terminated or truncated
if done:
self.done_information += f"Episode: {self.episode} Balance: {self.balance} Step: {self.current_step}\n"
self.visualization = True
self.episode += 1
# Calculate base trading reward
base_reward = self._take_action(actions, done)
# Calculate unrealized profit from open positions
unrealized_profit = 0
atr_scaling = 0 # For market condition scaling
for i, asset in enumerate(self.assets):
atr = self.get_observation(self.current_step, i, "ATR") # Assume ATR is in observation_list
atr_scaling += atr # Sum ATR across assets for normalization
for tr in self.transaction_live:
if tr["Symbol"] == asset:
if tr["Type"] == 0: # Buy
unrealized = (self._c - tr["ActionPrice"]) * self.cf.symbol(asset, "point")
else: # Sell
unrealized = (tr["ActionPrice"] - self._c) * self.cf.symbol(asset, "point")
unrealized_profit += unrealized
atr_scaling = atr_scaling / len(self.assets) if atr_scaling > 0 else 1 # Avoid division by 0
# Sustained reward: only applies to unrealized/realized profits, scaled by ATR
sustained_reward = (unrealized_profit + base_reward) * (self.current_step / len(self.dt_datetime)) * 0.01 / atr_scaling
# Penalty for inaction if no positions are held
if not self.transaction_live and all(a == 2 for a in actions):
sustained_reward -= 0.1 # Small penalty to encourage exploration
total_reward = base_reward + sustained_reward
if self._day > self.current_day:
self.current_day = self._day
self.balance -= self.over_night_cash_penalty
if self.balance != 0:
self.max_draw_down_pct = abs(sum(self.max_draw_downs) / self.balance * 100)
obs = {...} # Same as before
info = {"Close": self.tranaction_close_this_step}
return obs, total_reward, terminated, truncated, info
Changes Explained:
- Unrealized Profit in Sustained Reward:
- Compute unrealized_profit from open positions and add it to base_reward to form the basis of sustained_reward.
- This ensures the sustained reward reflects trading performance, not just balance preservation.
- ATR Scaling (Docstring Point 18):
- Divide by average ATR to normalize rewards based on market volatility. High volatility (large ATR) reduces the reward multiplier, encouraging risk-adjusted strategies.
- Inaction Penalty:
- Apply a small negative reward (-0.1) when no positions are open and all actions are 2. This pushes the agent to explore trading rather than sit idle.
- Balance Adjustment:
- Keep base_reward for immediate trade outcomes (e.g., SL/PT hits, fees), while sustained_reward incentivizes maintaining profitable positions over time.
Expected Outcome:
- The agent will prioritize opening and holding profitable trades to maximize unrealized_profit and base_reward, which amplify sustained_reward.
- Inaction (Action=2) will be less appealing due to the penalty and lack of profit contribution.
4. Improving the evaluate() Function
While the evaluate() function is structurally sound, you can enhance it to better reflect trading performance:
Modified evaluate():
def evaluate(model, env_vec, n_episodes=10):
total_rewards = []
total_profits = [] # Track actual trading profit
for _ in range(n_episodes):
obs = env_vec.reset()
done = np.array([False] * env_vec.num_envs)
episode_rewards = np.zeros(env_vec.num_envs)
episode_profit = 0 # Sum of realized profits/losses
step_count = 0
max_steps = len(env_vec.envs[0].dt_datetime)
while not np.all(done) and step_count < max_steps:
action, _ = model.predict(obs, deterministic=True)
obs, rewards, done, info = env_vec.step(action)  # SB3's DummyVecEnv returns 4 values: (obs, rewards, dones, infos)
episode_rewards += rewards
# Extract profit from closed transactions
for env_idx in range(env_vec.num_envs):
if info[env_idx]["Close"]:
for tr in info[env_idx]["Close"]:
episode_profit += tr["Reward"] # Add realized reward (profit/loss)
step_count += 1
total_rewards.extend(episode_rewards)
total_profits.append(episode_profit)
mean_reward = np.mean(total_rewards)
mean_profit = np.mean(total_profits)
print(f"Mean Reward: {mean_reward:.2f}, Mean Trading Profit: {mean_profit:.2f}")
return mean_profit # Return profit instead of reward for optimization
# Update objective to use profit
def objective(trial):
# ... (same as before)
model.learn(total_timesteps=total_timesteps, callback=eval_callback)
val_avg_profit = evaluate(model, val_env_vec, 10)
print(f"Validation Average Profit: {val_avg_profit:.2f}")
return val_avg_profit # Maximize profit, not raw reward
Changes:
- Track Trading Profit:
- Use info["Close"] to sum realized profits/losses from closed trades (tr["Reward"]).
- This separates actual trading performance from the sustained reward artifact.
- Return Mean Profit:
- Optimize for mean_profit instead of mean_reward in Optuna, aligning with your goal of maximizing trading gains.
- Tighter Step Limit:
- Changed max_steps * 2 to max_steps to match the environment’s intended length, assuming done triggers correctly.
5. Validation and Next Steps
Test the Changes:
- Run a small training session with the updated reward and evaluate() function.
- Check logs to ensure the agent takes Actions 0 and 1 more frequently and that total_reward correlates with trading profits.
Additional Metrics:
- Compute Sharpe Ratio, Max Drawdown, and Win Rate within evaluate() using transaction_history to validate trading performance.
Hyperparameter Tuning:
- With the new reward, re-run Optuna to find parameters that maximize val_avg_profit (preferably in a fresh study, as sketched below).
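Because the existing study's trials were scored on the reward-based metric, mixing them with profit-based trials would make the comparison meaningless. One option (a sketch; the new study_name is just an example) is to start a fresh study for the new objective:
```
import optuna

# Fresh study so old reward-based trials don't mix with the new profit metric.
study = optuna.create_study(
    study_name="OHLC_EconomicCalender_ppo_study_profit",  # hypothetical new name
    storage="sqlite:///optuna_study.db",
    direction="maximize",
    load_if_exists=True,
)
study.optimize(objective, n_trials=20)
```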
Conclusion
- Is evaluate() Correct?: Yes, but it was optimizing the wrong metric due to the reward design. The updated version focuses on trading profit.
- Reward Fix: Tie sustained reward to trading profits, scale with ATR, and penalize inaction to encourage a profit-maximizing strategy.
These changes should steer your model toward learning an effective trading strategy rather than exploiting the sustained reward loophole. Let me know if you’d like help implementing or debugging further!
Let’s address your questions one by one, providing detailed explanations and actionable solutions where needed.
1. Optuna Optimizes for mean_profit, Will the RL Model Optimize for mean_reward?
Explanation:
- Optuna’s Objective: In your updated objective() function, Optuna optimizes for val_avg_profit (returned by evaluate()), which is the mean trading profit extracted from closed transactions. This is an external evaluation metric used to tune hyperparameters.
- RL Model’s Objective: The reinforcement learning (RL) model (PPO in this case) optimizes for the cumulative reward it receives from the environment’s step() method, which is total_reward = base_reward + sustained_reward. The model doesn’t directly see or optimize for mean_profit—it learns based on the rewards defined in tgym.step().
Key Insight:
- If mean_profit (evaluation metric) and total_reward (RL training signal) are not aligned, the RL model might optimize a strategy that maximizes total_reward but doesn’t improve mean_profit. For example, if sustained_reward still dominates and rewards inaction, the model might avoid trading, even though Optuna is pushing for higher profits via hyperparameter tuning.
Resolution:
To ensure the RL model optimizes for trading profits (aligned with mean_profit):
- Modify the reward function in tgym.step() to closely reflect realized and unrealized trading profits, as we discussed previously.
- The sustained reward redesign I proposed ties sustained_reward to (unrealized_profit + base_reward), which should correlate better with mean_profit. This bridges the gap between what the RL model optimizes (total_reward) and what Optuna evaluates (mean_profit).
Verdict:
- Yes, the RL model optimizes for mean_reward (cumulative total_reward over an episode), not mean_profit directly. However, by redesigning sustained_reward to depend on trading profits, you align the two objectives, making the RL model indirectly optimize for profit-like behavior. The callback sketch below shows one way to monitor both metrics while training.
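If you want to check that alignment empirically, one option is a small callback that periodically runs the profit-based evaluate() during training and prints the result. This is only a sketch: RewardProfitLogger is a hypothetical name, and it reuses the evaluate() and val_env_vec defined earlier.
```
from stable_baselines3.common.callbacks import BaseCallback

class RewardProfitLogger(BaseCallback):
    """Periodically log mean trading profit so you can see whether the
    redesigned reward actually tracks it (sketch, not a tested implementation)."""

    def __init__(self, eval_env_vec, every_n_steps=10_000, verbose=1):
        super().__init__(verbose)
        self.eval_env_vec = eval_env_vec
        self.every_n_steps = every_n_steps

    def _on_step(self) -> bool:
        if self.n_calls % self.every_n_steps == 0:
            mean_profit = evaluate(self.model, self.eval_env_vec, n_episodes=3)
            if self.verbose:
                print(f"[check] timesteps={self.num_timesteps} mean_profit={mean_profit:.2f}")
        return True  # returning False would stop training

# Usage (hypothetical): model.learn(total_timesteps, callback=RewardProfitLogger(val_env_vec))
```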
2. Sustained Reward Calculation Without ATR—Can RSI Be Used? Normalized RSI Concerns
Current Code:
unrealized_profit = 0
atr_scaling = 0
for i, asset in enumerate(self.assets):
atr = self.get_observation(self.current_step, i, "ATR") # Assume ATR is in observation_list
atr_scaling += atr
for tr in self.transaction_live:
if tr["Symbol"] == asset:
if tr["Type"] == 0: # Buy
unrealized = (self._c - tr["ActionPrice"]) * self.cf.symbol(asset, "point")
else: # Sell
unrealized = (tr["ActionPrice"] - self._c) * self.cf.symbol(asset, "point")
unrealized_profit += unrealized
atr_scaling = atr_scaling / len(self.assets) if atr_scaling > 0 else 1
sustained_reward = (unrealized_profit + base_reward) * (self.current_step / len(self.dt_datetime)) * 0.01 / atr_scaling
- Purpose of ATR: ATR (Average True Range) measures volatility. Dividing by atr_scaling normalizes the reward based on market conditions—higher volatility reduces the reward multiplier, encouraging risk-adjusted strategies.
Can RSI Replace ATR?
- RSI (Relative Strength Index): RSI measures momentum and overbought/oversold conditions (typically 0–100, or normalized in your case). Unlike ATR, it doesn’t directly measure volatility or price range, so it’s not a perfect substitute for scaling rewards by market risk.
- Feasibility: Yes, you can use RSI instead of ATR, but the interpretation changes:
- High RSI (e.g., >70) indicates overbought conditions, potentially signaling a reversal.
- Low RSI (e.g., <30) indicates oversold conditions.
- Using RSI as a scaling factor could penalize rewards in overbought/oversold markets, encouraging trades when RSI is moderate (e.g., 40–60).
Your Normalized RSI:
rolling_mean = ohlc_df['RSI'].rolling(window=100, min_periods=1).mean()
rolling_std = ohlc_df['RSI'].rolling(window=100, min_periods=1).std()
ohlc_df['RSI'] = (ohlc_df['RSI'] - rolling_mean) / rolling_std
ohlc_df['RSI'] = ohlc_df['RSI'].fillna(0)
- Normalization: Your RSI is standardized (z-score) over a 100-period rolling window, meaning it’s centered around 0 with a standard deviation of 1. Values might range from, say, -3 to +3, depending on market conditions, instead of the raw 0–100 scale.
Can Normalized RSI Be Used?
- Yes, but with adjustments:
- Problem: Dividing by a normalized RSI (which can be negative or near zero) could lead to invalid or unstable rewards (e.g., division by zero or negative scaling).
- Solution: Transform the normalized RSI to a positive, meaningful scaling factor. For example:
- Use the absolute value: rsi_scaling = abs(rsi) + 1 (ensures positivity, baseline of 1).
- Or map it to a volatility-like metric: rsi_scaling = 1 + (rsi ** 2) (squares emphasize extremes, always positive).
Modified Code with RSI:
unrealized_profit = 0
rsi_scaling = 0
for i, asset in enumerate(self.assets):
rsi = self.get_observation(self.current_step, i, "RSI") # Assume RSI is in observation_list
rsi_scaling += abs(rsi) + 1 # Positive scaling, +1 avoids zero
for tr in self.transaction_live:
if tr["Symbol"] == asset:
if tr["Type"] == 0: # Buy
unrealized = (self._c - tr["ActionPrice"]) * self.cf.symbol(asset, "point")
else: # Sell
unrealized = (tr["ActionPrice"] - self._c) * self.cf.symbol(asset, "point")
unrealized_profit += unrealized
rsi_scaling = rsi_scaling / len(self.assets) if rsi_scaling > 0 else 1
sustained_reward = (unrealized_profit + base_reward) * (self.current_step / len(self.dt_datetime)) * 0.01 / rsi_scaling
- Effect: Higher absolute RSI (indicating strong momentum) increases rsi_scaling, reducing the sustained reward multiplier. This encourages trading in less extreme conditions, though it’s less directly tied to volatility than ATR.
Normalization Concern:
- Normalized RSI is fine as long as you handle edge cases (e.g., zero or negative values). Raw RSI (0–100) could also work if you scale it (e.g., rsi_scaling = rsi / 100), but your normalized version is already in observation_list, so it’s practical to use it with the adjustment above.
3. How Does evaluate() Know the Model Made a Buy/Sell Decision to Extract Profit?
Current Code:
while not np.all(done) and step_count < max_steps:
action, _ = model.predict(obs, deterministic=True)
obs, rewards, done, info = env_vec.step(action)  # SB3's DummyVecEnv returns 4 values: (obs, rewards, dones, infos)
episode_rewards += rewards
# Extract profit from closed transactions
for env_idx in range(env_vec.num_envs):
if info[env_idx]["Close"]:
for tr in info[env_idx]["Close"]:
episode_profit += tr["Reward"] # Add realized reward (profit/loss)
step_count += 1
How It Works:
- Model Decision: model.predict(obs, deterministic=True) outputs an action (e.g., [0, 2, 1] for three assets: Buy, Nothing, Sell). This reflects the model’s decision at each step.
- Environment Step: env_vec.step(action) executes the action in the environment and returns obs, rewards, terminated, truncated, info.
- In tgym.step(), the _take_action() method processes the action:
- If action == 0 (Buy) or 1 (Sell), it opens a new transaction (added to transaction_live).
- Existing transactions are checked for SL/PT hits or episode end (done=True), closing them and moving them to transaction_history and tranaction_close_this_step.
- The info dictionary contains "Close": self.tranaction_close_this_step, a list of transactions closed during this step.
- In tgym.step(), the _take_action() method processes the action:
- Profit Extraction:
- info[env_idx]["Close"] is a list of closed transactions for environment env_idx.
- Each transaction tr has a "Reward" field, which includes the realized profit/loss (e.g., PT, -SL, or final close value minus fees).
- episode_profit += tr["Reward"] sums these realized rewards, tracking actual trading profit.
How It Knows Buy/Sell Happened:
- The model doesn’t need to explicitly “tell” evaluate() about buy/sell decisions. The environment (tgym) handles this:
- Buy/Sell actions lead to open transactions.
- Closed transactions (due to SL, PT, or episode end) are recorded in info["Close"].
- evaluate() only looks at closed transactions, so it captures profit/loss after the model’s decisions play out.
Verification:
- If info[env_idx]["Close"] is empty, no transactions closed this step (e.g., no SL/PT hit, or all actions were 2).
- If the model only chooses Action=2, transaction_live remains empty, and info["Close"] will be empty unless forced closure occurs at episode end.
Potential Issue:
- If no transactions close (e.g., all positions stay open until done), episode_profit might underrepresent performance. You could add unrealized profit tracking if desired; a sketch follows.
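If you want that, here is a minimal sketch. It assumes a single-environment DummyVecEnv, so the underlying tgym instance is reachable as env_vec.envs[0]; unrealized_profit_snapshot is just an illustrative helper name.
```
def unrealized_profit_snapshot(env):
    """Mark-to-market PnL of positions still open (sketch; relies on tgym internals)."""
    total = 0.0
    for tr in env.transaction_live:
        i = list(env.assets).index(tr["Symbol"])
        close = env.get_observation(env.current_step, i, "Close")
        point = env.cf.symbol(tr["Symbol"], "point")
        if tr["Type"] == 0:   # Buy
            total += (close - tr["ActionPrice"]) * point
        else:                 # Sell
            total += (tr["ActionPrice"] - close) * point
    return total

# Inside evaluate(), after each episode's while-loop:
# episode_profit += unrealized_profit_snapshot(env_vec.envs[0])
```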
4. Implementing transaction_history
Current Usage:
- transaction_history is a list in tgym that stores all closed transactions (moved from transaction_live via _manage_tranaction()).
- It’s updated in:
- _manage_tranaction() (SL/PT hits or forced closure).
- _limit_order_process() (expired/canceled limit orders).
Enhancing transaction_history:
To make it more useful (e.g., for metrics like Sharpe Ratio), ensure it captures all relevant data and add a method to analyze it.
Modified _manage_tranaction:
def _manage_tranaction(self, tr, _p, close_price, status=1):
self.transaction_live.remove(tr)
tr["ClosePrice"] = close_price
tr["Point"] = int(_p)
tr["Reward"] = int(tr["Reward"] + _p) # Realized profit/loss
tr["Status"] = status # 1=SL/PT, 2=Forced close, 3=Canceled limit, 4=Expired limit
tr["CloseTime"] = self._t
tr["CloseStep"] = self.current_step
self.balance += int(tr["Reward"])
self.total_equity -= int(abs(tr["Reward"]))
self.tranaction_close_this_step.append(tr)
self.transaction_history.append(tr)
- Added: "CloseStep" to track when it closed.
Method to Analyze transaction_history:
Add this to tgym for post-episode analysis:
def analyze_transaction_history(self):
if not self.transaction_history:
return {"trades": 0, "win_rate": 0.0, "profit_factor": 0.0, "sharpe_ratio": 0.0}
trades = len(self.transaction_history)
rewards = [tr["Reward"] for tr in self.transaction_history]
wins = sum(1 for r in rewards if r > 0)
losses = sum(1 for r in rewards if r < 0)
gross_profit = sum(r for r in rewards if r > 0)
gross_loss = abs(sum(r for r in rewards if r < 0))
win_rate = wins / trades if trades > 0 else 0.0
profit_factor = gross_profit / gross_loss if gross_loss > 0 else float("inf")
# Sharpe Ratio (simplified, assumes risk-free rate = 0)
returns = np.array(rewards, dtype=np.float32)
sharpe_ratio = np.mean(returns) / np.std(returns) if np.std(returns) > 0 else 0.0
return {
"trades": trades,
"win_rate": win_rate,
"profit_factor": profit_factor,
"sharpe_ratio": sharpe_ratio,
"total_profit": sum(rewards)
}
Integration in evaluate():
def evaluate(model, env_vec, n_episodes=10):
total_rewards = []
total_profits = []
metrics = []
for _ in range(n_episodes):
obs = env_vec.reset()
done = np.array([False] * env_vec.num_envs)
episode_rewards = np.zeros(env_vec.num_envs)
episode_profit = 0
step_count = 0
max_steps = len(env_vec.envs[0].dt_datetime)
while not np.all(done) and step_count < max_steps:
action, _ = model.predict(obs, deterministic=True)
obs, rewards, done, info = env_vec.step(action)  # SB3's DummyVecEnv returns 4 values: (obs, rewards, dones, infos)
episode_rewards += rewards
for env_idx in range(env_vec.num_envs):
if info[env_idx]["Close"]:
for tr in info[env_idx]["Close"]:
episode_profit += tr["Reward"]
step_count += 1
total_rewards.extend(episode_rewards)
total_profits.append(episode_profit)
metrics.append(env_vec.envs[0].analyze_transaction_history()) # Assumes 1 env
mean_reward = np.mean(total_rewards)
mean_profit = np.mean(total_profits)
avg_metrics = {
k: np.mean([m[k] for m in metrics]) for k in metrics[0].keys()
}
print(f"Mean Reward: {mean_reward:.2f}, Mean Profit: {mean_profit:.2f}")
print(f"Metrics: {avg_metrics}")
return mean_profit
Output:
- transaction_history now tracks all closed trades with detailed metadata.
- analyze_transaction_history() provides trading metrics, which evaluate() averages across episodes.
Summary
- Optuna vs. RL: Optuna optimizes mean_profit, RL optimizes total_reward. Align them by tying sustained_reward to profits.
- RSI Instead of ATR: Yes, use abs(rsi) + 1 for scaling. Your normalized RSI works with this adjustment.
- Profit Extraction: evaluate() knows buy/sell outcomes via info["Close"], populated by tgym.step().
- Transaction History: Enhanced with more fields and an analysis method, integrated into evaluate().
Let me know if you need further clarification or code adjustments!
Let’s address your questions with detailed explanations and complete code solutions.
1. Training the Final Best Model After Optuna and Evaluating It
Process Overview:
- After Optuna Tuning: Optuna identifies the best hyperparameters by maximizing mean_profit. You then train a final model using these parameters on the full training dataset (or a combined train+validation set) to create the "best model."
- Evaluation: Yes, you should evaluate the final model on a separate test dataset to assess its generalization performance. Using the same evaluate() function is appropriate, as it provides consistent metrics (e.g., mean_profit, trading metrics) across tuning and final testing.
Steps:
- Retrieve the best hyperparameters from the Optuna study.
- Train the PPO model with these parameters on the training environment.
- Evaluate the trained model on a test environment using evaluate().
Full Code:
import optuna
from stable_baselines3 import PPO
# EarlyStoppingCallback is assumed to be the same custom early-stopping callback used during tuning (stable-baselines3 does not ship one)
import numpy as np
# Assuming train_env_vec, val_env_vec, and test_env_vec are predefined DummyVecEnv instances
# and CustomMultiInputPolicy is defined as in your original code
# Evaluation function (from previous response)
def evaluate(model, env_vec, n_episodes=10):
total_rewards = []
total_profits = []
metrics = []
for _ in range(n_episodes):
obs = env_vec.reset()
done = np.array([False] * env_vec.num_envs)
episode_rewards = np.zeros(env_vec.num_envs)
episode_profit = 0
step_count = 0
max_steps = len(env_vec.envs[0].dt_datetime)
while not np.all(done) and step_count < max_steps:
action, _ = model.predict(obs, deterministic=True)
obs, rewards, done, info = env_vec.step(action)  # SB3's DummyVecEnv returns 4 values: (obs, rewards, dones, infos)
episode_rewards += rewards
for env_idx in range(env_vec.num_envs):
if info[env_idx]["Close"]:
for tr in info[env_idx]["Close"]:
episode_profit += tr["Reward"]
step_count += 1
total_rewards.extend(episode_rewards)
total_profits.append(episode_profit)
metrics.append(env_vec.envs[0].analyze_transaction_history())
mean_reward = np.mean(total_rewards)
mean_profit = np.mean(total_profits)
avg_metrics = {k: np.mean([m[k] for m in metrics]) for k in metrics[0].keys()}
print(f"Mean Reward: {mean_reward:.2f}, Mean Profit: {mean_profit:.2f}")
print(f"Metrics: {avg_metrics}")
return mean_profit
# Load the Optuna study and get best parameters
db_path = 'optuna_study.db'
study = optuna.load_study(study_name='OHLC_EconomicCalender_ppo_study', storage=f'sqlite:///{db_path}')
best_params = study.best_params
print("Best Hyperparameters:", best_params)
# Train the final model with best parameters
final_model = PPO(
CustomMultiInputPolicy,
train_env_vec, # Use full training environment
learning_rate=best_params["learning_rate"],
n_steps=best_params["n_steps"],
batch_size=best_params["batch_size"],
n_epochs=best_params["n_epochs"],
gamma=best_params["gamma"],
gae_lambda=best_params["gae_lambda"],
clip_range=best_params["clip_range"],
verbose=1,
tensorboard_log="./ppo_fx_trading_tensorboard/" # Optional: for logging
)
# Early stopping callback (optional, adjust as needed)
eval_callback = EarlyStoppingCallback(
eval_env=val_env_vec,
eval_freq=10000,
n_eval_episodes=5,
patience=3,
min_delta=0.01,
verbose=1
)
# Train the final model
final_model.learn(total_timesteps=best_params["total_timesteps"], callback=eval_callback)
# Save the final model
final_model.save("final_ppo_fx_trading_model")
# Evaluate on test data
print("\nEvaluating Final Model on Test Data:")
test_avg_profit = evaluate(final_model, test_env_vec, n_episodes=10)
print(f"Test Average Profit: {test_avg_profit:.2f}")
# Optional: Load and re-evaluate to verify
loaded_model = PPO.load("final_ppo_fx_trading_model")
test_avg_profit_loaded = evaluate(loaded_model, test_env_vec, n_episodes=10)
print(f"Test Average Profit (Loaded Model): {test_avg_profit_loaded:.2f}")
Explanation:
- Best Parameters: Retrieved from the Optuna study using study.best_params.
- Training: The PPO model is instantiated with the best hyperparameters and trained on train_env_vec. You can include the validation set here if you want a larger training dataset for the final model.
- Evaluation: The same evaluate() function is used on test_env_vec to assess performance on unseen data. This ensures consistency with the tuning phase.
- Saving/Loading: The model is saved for future use and reloaded to verify integrity.
Notes:
- Ensure train_env_vec, val_env_vec, and test_env_vec are properly defined (e.g., split your dataset into train/validation/test beforehand).
- Adjust n_episodes in evaluate() based on your test set size and desired statistical robustness.
2. Using ATR for sustained_reward with pandas_ta
Goal:
Replace the placeholder ATR in sustained_reward with a real calculation using pandas_ta and decide whether normalization is needed.
Code to Calculate ATR:
import pandas as pd
import pandas_ta as ta
# Assuming self.df contains OHLC data with columns: Open, High, Low, Close, asset_col, time_col
def calculate_atr(self):
# Group by asset to calculate ATR for each trading pair
atr_dfs = []
for asset in self.assets:
asset_df = self.df[self.df[self.asset_col] == asset][["Open", "High", "Low", "Close"]].copy()
# Calculate ATR with default period=14
asset_df["ATR"] = ta.atr(
high=asset_df["High"],
low=asset_df["Low"],
close=asset_df["Close"],
length=14
)
asset_df["asset"] = asset
asset_df[self.time_col] = self.df[self.df[self.asset_col] == asset][self.time_col]
atr_dfs.append(asset_df[[self.time_col, "asset", "ATR"]])
# Combine ATR data into the main DataFrame
atr_df = pd.concat(atr_dfs).set_index(self.time_col)
self.df = self.df.merge(atr_df, left_index=True, right_index=True, how="left")
self.df["ATR"] = self.df["ATR"].fillna(method="ffill") # Forward fill NaNs
# Update observation_list to include ATR
if "ATR" not in self.observation_list:
self.observation_list.append("ATR")
# Recache observations with ATR
self.cached_ohlc_data = [self.get_observation_vector(_dt) for _dt in self.dt_datetime]
# Call this in __init__ after setting up self.df
def __init__(self, df, event_map, currency_map, env_config_file="./neo_finrl/env_fx_trading/config/gdbusd-test-1.json"):
# ... (existing init code)
self.calculate_atr()
# ... (rest of init code)
# Updated step() with ATR
def step(self, actions):
self.current_step += 1
terminated = self.balance <= 0
truncated = self.current_step == len(self.dt_datetime) - 1
done = terminated or truncated
if done:
self.done_information += f"Episode: {self.episode} Balance: {self.balance} Step: {self.current_step}\n"
self.visualization = True
self.episode += 1
base_reward = self._take_action(actions, done)
# Calculate unrealized profit and ATR scaling
unrealized_profit = 0
atr_scaling = 0
for i, asset in enumerate(self.assets):
atr = self.get_observation(self.current_step, i, "ATR")
atr_scaling += atr
for tr in self.transaction_live:
if tr["Symbol"] == asset:
if tr["Type"] == 0: # Buy
unrealized = (self._c - tr["ActionPrice"]) * self.cf.symbol(asset, "point")
else: # Sell
unrealized = (tr["ActionPrice"] - self._c) * self.cf.symbol(asset, "point")
unrealized_profit += unrealized
atr_scaling = atr_scaling / len(self.assets) if atr_scaling > 0 else 1
sustained_reward = (unrealized_profit + base_reward) * (self.current_step / len(self.dt_datetime)) * 0.01 / atr_scaling
if not self.transaction_live and all(a == 2 for a in actions):
sustained_reward -= 0.1
total_reward = base_reward + sustained_reward
if self._day > self.current_day:
self.current_day = self._day
self.balance -= self.over_night_cash_penalty
if self.balance != 0:
self.max_draw_down_pct = abs(sum(self.max_draw_downs) / self.balance * 100)
obs = {
"ohlc_data": np.array(self.cached_ohlc_data[self.current_step], dtype=np.float32),
"event_ids": self.cached_economic_data[self.current_step]["event_ids"],
"currency_ids": self.cached_economic_data[self.current_step]["currency_ids"],
"economic_numeric": self.cached_economic_data[self.current_step]["numeric"],
"portfolio_data": np.array(
[self.balance, self.total_equity, self.max_draw_down_pct] + self.current_holding + self.current_draw_downs,
dtype=np.float32
)
}
info = {"Close": self.tranaction_close_this_step}
return obs, total_reward, terminated, truncated, info
Explanation:
- ATR Calculation: pandas_ta.atr() computes ATR using High, Low, and Close prices with a default period of 14. It’s calculated per asset and merged into self.df.
- Integration: Added to __init__ to preprocess data, updating observation_list and cached_ohlc_data.
- Step Update: No change needed beyond ensuring "ATR" is accessible via get_observation().
Should You Normalize ATR?
- Raw ATR: ATR is in the same units as price (e.g., pips or points) and naturally reflects volatility. It’s typically not normalized because:
- It’s already a relative measure of price movement.
- Normalizing (e.g., z-score) could distort its meaning across assets with different price scales.
- Your Context: Since atr_scaling averages ATR across assets and prevents division by zero, raw ATR works well as is. Normalization isn’t necessary unless:
- You’re comparing ATR across vastly different assets (e.g., XAUUSD vs. EURUSD) and want scale invariance.
- You observe reward instability due to ATR magnitude differences.
- Recommendation: Don’t normalize ATR unless testing shows reward scaling issues. If needed, use a simple scaling (e.g., atr / max_atr_per_asset) instead of a z-score; a short sketch follows.
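If you do end up scaling, dividing by each asset's own maximum is a gentler option than a z-score because it keeps values positive and in (0, 1]. A minimal sketch (the ATR_scaled column name is just an example; it assumes the ATR column added above):
```
# Scale each asset's ATR by that asset's own maximum (assumes self.df has an "ATR" column).
max_atr_per_asset = self.df.groupby(self.asset_col)["ATR"].transform("max")
self.df["ATR_scaled"] = (self.df["ATR"] / max_atr_per_asset).clip(lower=1e-6)  # avoid zeros
```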
3. Appending analyze_transaction_history() to a Log File
Goal:
Create a log file early and append analyze_transaction_history() results during evaluate() for real-time monitoring.
Modified Code:
import os
import csv
from datetime import datetime
# In tgym class
def __init__(self, df, event_map, currency_map, env_config_file="./neo_finrl/env_fx_trading/config/gdbusd-test-1.json"):
# ... (existing init code)
self.log_file = f"transaction_analysis_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
# Initialize log file with header
with open(self.log_file, 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=["timestamp", "episode", "trades", "win_rate", "profit_factor", "sharpe_ratio", "total_profit"])
writer.writeheader()
def analyze_transaction_history(self):
if not self.transaction_history:
metrics = {"trades": 0, "win_rate": 0.0, "profit_factor": 0.0, "sharpe_ratio": 0.0, "total_profit": 0.0}
else:
trades = len(self.transaction_history)
rewards = [tr["Reward"] for tr in self.transaction_history]
wins = sum(1 for r in rewards if r > 0)
losses = sum(1 for r in rewards if r < 0)
gross_profit = sum(r for r in rewards if r > 0)
gross_loss = abs(sum(r for r in rewards if r < 0))
win_rate = wins / trades if trades > 0 else 0.0
profit_factor = gross_profit / gross_loss if gross_loss > 0 else float("inf")
returns = np.array(rewards, dtype=np.float32)
sharpe_ratio = np.mean(returns) / np.std(returns) if np.std(returns) > 0 else 0.0
total_profit = sum(rewards)
metrics = {
"trades": trades,
"win_rate": win_rate,
"profit_factor": profit_factor,
"sharpe_ratio": sharpe_ratio,
"total_profit": total_profit
}
# Append to log file
with open(self.log_file, 'a', newline='') as f:
writer = csv.DictWriter(f, fieldnames=["timestamp", "episode", "trades", "win_rate", "profit_factor", "sharpe_ratio", "total_profit"])
metrics["timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
metrics["episode"] = self.episode
writer.writerow(metrics)
return metrics
# Updated evaluate()
def evaluate(model, env_vec, n_episodes=10):
total_rewards = []
total_profits = []
metrics = []
for ep in range(n_episodes):
obs = env_vec.reset()
done = np.array([False] * env_vec.num_envs)
episode_rewards = np.zeros(env_vec.num_envs)
episode_profit = 0
step_count = 0
max_steps = len(env_vec.envs[0].dt_datetime)
while not np.all(done) and step_count < max_steps:
action, _ = model.predict(obs, deterministic=True)
obs, rewards, terminated, truncated, info = env_vec.step(action)
done = np.logical_or(terminated, truncated)
episode_rewards += rewards
for env_idx in range(env_vec.num_envs):
if info[env_idx]["Close"]:
for tr in info[env_idx]["Close"]:
episode_profit += tr["Reward"]
step_count += 1
total_rewards.extend(episode_rewards)
total_profits.append(episode_profit)
metrics.append(env_vec.envs[0].analyze_transaction_history()) # Logs to file here
mean_reward = np.mean(total_rewards)
mean_profit = np.mean(total_profits)
avg_metrics = {k: np.mean([m[k] for m in metrics]) for k in metrics[0].keys()}
print(f"Mean Reward: {mean_reward:.2f}, Mean Profit: {mean_profit:.2f}")
print(f"Average Metrics: {avg_metrics}")
return mean_profit
Explanation:
- Log File Creation: Initialized in __init__ with a timestamped filename and a CSV header.
- Appending Data: analyze_transaction_history() writes each episode’s metrics to the log file with a timestamp and episode number.
- Real-Time Monitoring: You can open transaction_analysis_log_YYYYMMDD_HHMMSS.csv in a text editor or CSV viewer (e.g., Excel) during evaluation. Each write happens inside a with block, so the file is closed and flushed after every episode (newline='' only prevents blank rows on Windows); for true real-time updates, use a tool that auto-refreshes, such as tail -f in a terminal. A small polling sketch follows the sample output below.
Sample Log Output:
timestamp,episode,trades,win_rate,profit_factor,sharpe_ratio,total_profit
2025-03-12 10:00:01,1,5,0.6,2.5,1.2,150
2025-03-12 10:00:05,2,3,0.33,1.0,0.5,-10
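If you prefer a lightweight monitor over opening the file manually, a small polling loop works; the filename below is illustrative and the 5-second interval is arbitrary:
import time
import pandas as pd

log_path = "transaction_analysis_log_20250312_100001.csv"   # substitute your actual timestamped filename
while True:                                                  # stop with Ctrl+C
    df_log = pd.read_csv(log_path)
    if not df_log.empty:
        print(df_log.tail(1).to_string(index=False))         # latest episode metrics
    time.sleep(5)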
Summary
- Final Model Training: Use best Optuna parameters, train on train_env_vec, and evaluate on test_env_vec with evaluate().
- ATR for Sustained Reward: Implemented with pandas_ta, no normalization needed unless scaling issues arise.
- Logging Transaction History: Appends to a CSV log file, viewable in real-time during evaluate().
Let me know if you need further refinements or debugging assistance!
Let’s dive into your questions with detailed explanations and provide actionable code solutions where needed. I’ll address each question step-by-step to ensure clarity and practicality.
1. Should I Use Train + Validation Dataset for the Final Model? What About Validation and Test Datasets?
Understanding the Context:
- During Optuna Tuning: You split your data into three sets:
- Train Dataset: Used to train the PPO model (train_env_vec) in each trial.
- Validation Dataset: Used to evaluate the model (val_env_vec) via evaluate() to guide Optuna’s hyperparameter optimization.
- Test Dataset: Reserved for final evaluation (test_env_vec) to assess generalization on unseen data.
- Post-Tuning (Final Model): After Optuna identifies the best hyperparameters, you train the "final best model." The question is how to use these datasets effectively.
Options for Final Model Training:
- Train on Train Dataset Only:
- Use train_env_vec alone for consistency with Optuna’s training phase.
- Evaluate on val_env_vec during training (e.g., via EarlyStoppingCallback) to monitor progress.
- Reserve test_env_vec for final evaluation and QuantStats reporting.
- Pros: Maintains the same training data as during tuning, avoids overfitting to validation data.
- Cons: Doesn’t leverage the validation data for training, potentially underutilizing available data.
- Train on Train + Validation Dataset:
- Combine train_env_vec and val_env_vec into a single environment for final training.
- Skip validation during training (or use a small holdout from the combined set).
- Evaluate on test_env_vec for final performance and QuantStats.
- Pros: Maximizes training data, potentially improving model robustness.
- Cons: Risks overfitting to the validation set, which was used to select hyperparameters, reducing the ability to detect overfitting during training.
Best Practice Recommendation:
- Use Train Dataset Only for Final Training:
- Reason: During Optuna tuning, the validation set (val_env_vec) was used to evaluate and select the best hyperparameters. If you include it in final training, you might overfit to the validation data, inflating performance estimates. Keeping the training data consistent with the tuning phase ensures the model learns from the same distribution it was optimized for.
- Validation Role: Use val_env_vec during training with EarlyStoppingCallback to prevent overfitting and stop when performance plateaus.
- Test Role: Use test_env_vec for final evaluation and QuantStats reporting to assess true generalization on unseen data.
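For completeness, the three environments are normally built from a chronological split of the price data rather than a random one, so the validation and test periods lie strictly after the training period. A minimal sketch, assuming a sortable time column and 70/15/15 proportions (both assumptions, not values from your config):
import pandas as pd

def split_by_time(df: pd.DataFrame, time_col: str, train_frac: float = 0.70, val_frac: float = 0.15):
    # Chronological split: train < val < test in time, no shuffling
    times = sorted(df[time_col].unique())
    train_end = times[int(len(times) * train_frac)]
    val_end = times[int(len(times) * (train_frac + val_frac))]
    train_df = df[df[time_col] < train_end]
    val_df = df[(df[time_col] >= train_end) & (df[time_col] < val_end)]
    test_df = df[df[time_col] >= val_end]
    return train_df, val_df, test_df
Each slice would then be wrapped in tgym and DummyVecEnv as before.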
Updated Code with QuantStats:
Here’s how to train the final model and evaluate it with QuantStats:
import optuna
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import EvalCallback
import numpy as np
import pandas as pd
import quantstats as qs
# Assuming train_env_vec, val_env_vec, test_env_vec are predefined
# and CustomMultiInputPolicy, tgym are defined
# Evaluation function (with returns for QuantStats)
def evaluate(model, env_vec, n_episodes=10):
total_rewards = []
total_profits = []
metrics = []
returns = [] # For QuantStats
for ep in range(n_episodes):
obs = env_vec.reset()
done = np.array([False] * env_vec.num_envs)
episode_rewards = np.zeros(env_vec.num_envs)
episode_profit = 0
step_count = 0
max_steps = len(env_vec.envs[0].dt_datetime)
episode_returns = []
while not np.all(done) and step_count < max_steps:
action, _ = model.predict(obs, deterministic=True)
obs, rewards, terminated, truncated, info = env_vec.step(action)
done = np.logical_or(terminated, truncated)
episode_rewards += rewards
for env_idx in range(env_vec.num_envs):
if info[env_idx]["Close"]:
for tr in info[env_idx]["Close"]:
episode_profit += tr["Reward"]
episode_returns.append(tr["Reward"]) # Track per-trade returns
step_count += 1
total_rewards.extend(episode_rewards)
total_profits.append(episode_profit)
metrics.append(env_vec.envs[0].analyze_transaction_history())
returns.extend(episode_returns if episode_returns else [episode_profit]) # Fallback to total profit
mean_reward = np.mean(total_rewards)
mean_profit = np.mean(total_profits)
avg_metrics = {k: np.mean([m[k] for m in metrics]) for k in metrics[0].keys()}
print(f"Mean Reward: {mean_reward:.2f}, Mean Profit: {mean_profit:.2f}")
print(f"Metrics: {avg_metrics}")
# QuantStats reporting (for test evaluation)
if "test" in env_vec.envs[0].log_filename: # Hack to detect test env
returns_series = pd.Series(returns, index=pd.date_range(start="2025-03-12", periods=len(returns), freq="D"))
qs.reports.html(returns_series, output="quantstats_report.html", title="FX Trading Performance")
return mean_profit
# Load best parameters from Optuna
db_path = 'optuna_study.db'
study = optuna.load_study(study_name='OHLC_EconomicCalender_ppo_study', storage=f'sqlite:///{db_path}')
best_params = study.best_params
print("Best Hyperparameters:", best_params)
# Train final model on training data only
final_model = PPO(
CustomMultiInputPolicy,
train_env_vec,
learning_rate=best_params["learning_rate"],
n_steps=best_params["n_steps"],
batch_size=best_params["batch_size"],
n_epochs=best_params["n_epochs"],
gamma=best_params["gamma"],
gae_lambda=best_params["gae_lambda"],
clip_range=best_params["clip_range"],
verbose=1,
tensorboard_log="./tensorboard_logs/"
)
# Early stopping on validation set
eval_callback = EarlyStoppingCallback(
eval_env=val_env_vec,
eval_freq=10000,
n_eval_episodes=5,
patience=3,
min_delta=0.01,
verbose=1
)
# Train the final model
final_model.learn(total_timesteps=best_params["total_timesteps"], callback=eval_callback)
final_model.save("ppo_xauusd_optimized")
# Evaluate on test data with QuantStats
print("\nEvaluating Final Model on Test Data:")
test_avg_profit = evaluate(final_model, test_env_vec, n_episodes=10)
print(f"Test Average Profit: {test_avg_profit:.2f}")
# Clean up
train_env_vec.close()
val_env_vec.close()
test_env_vec.close()
- Training: Uses train_env_vec only.
- Validation: EarlyStoppingCallback evaluates on val_env_vec.
- Test: evaluate() on test_env_vec generates a QuantStats HTML report (quantstats_report.html) with metrics like Sharpe Ratio, based on per-trade returns.
2. What Does EarlyStoppingCallback Do? Is It Working Correctly?
Purpose:
- EarlyStoppingCallback: Inherits from EvalCallback in Stable-Baselines3 to periodically evaluate the model on a separate environment (val_env_vec) and stop training if performance doesn’t improve.
- How It Works:
- Evaluates every eval_freq steps (e.g., 10,000) over n_eval_episodes (e.g., 5).
- Tracks the best mean reward (best_reward) and counts evaluations without improvement (no_improvement_count).
- Stops training if no_improvement_count reaches patience (e.g., 3) and the improvement is less than min_delta (e.g., 0.01).
Code Analysis:
class EarlyStoppingCallback(EvalCallback):
def __init__(self, eval_env, eval_freq, n_eval_episodes, patience, min_delta, verbose=0):
super().__init__(eval_env=eval_env, eval_freq=eval_freq, n_eval_episodes=n_eval_episodes, verbose=verbose, deterministic=True)
self.patience = patience
self.min_delta = min_delta
self.best_reward = -float('inf')
self.no_improvement_count = 0
def _on_step(self):
continue_training = super()._on_step()
if not continue_training:
return False
if self.last_mean_reward is not None:
current_reward = self.last_mean_reward
if current_reward > self.best_reward + self.min_delta:
self.best_reward = current_reward
self.no_improvement_count = 0
if self.verbose > 0:
print(f"New best reward: {self.best_reward:.2f}")
else:
self.no_improvement_count += 1
if self.verbose > 0:
print(f"No improvement for {self.no_improvement_count}/{self.patience} evaluations")
if self.no_improvement_count >= self.patience:
if self.verbose > 0:
print(f"Early stopping triggered after {self.patience} evaluations without improvement")
return False
return True
- Correctness: Yes, it works as intended:
- super()._on_step() triggers evaluation at eval_freq intervals, setting self.last_mean_reward.
- Compares current_reward to best_reward + min_delta to reset or increment no_improvement_count.
- Returns False to stop training when patience is exceeded.
- Potential Issue: If evaluate() returns mean_profit but the model optimizes total_reward, the callback might stop based on profit rather than reward. Ensure alignment (e.g., modify evaluate() to return mean_reward for consistency during training).
Fix for Alignment:
# In evaluate(), add return_mean_reward option
def evaluate(model, env_vec, n_episodes=10, return_mean_reward=False):
# ... (existing code)
return mean_reward if return_mean_reward else mean_profit
# Update EarlyStoppingCallback to use mean_reward
eval_callback = EarlyStoppingCallback(
eval_env=val_env_vec,
eval_freq=10000,
n_eval_episodes=5,
patience=3,
min_delta=0.01,
verbose=1,
callback_on_new_best=None,
eval_callback=lambda model, env: evaluate(model, env, n_eval_episodes=5, return_mean_reward=True)
)
- Note: This requires modifying EvalCallback to accept a custom evaluation function, or adjusting evaluate() to align with reward during training.
3. Fine-Tune Epochs or Fix Them? Can Callback Prevent Overfitting?
Options:
- Fine-Tune n_epochs with Optuna:
- Pros: Optimizes the number of epochs for your specific problem.
- Cons: Increases tuning complexity and computation time.
- Fix n_epochs (e.g., 50 or 100):
- Pros: Simpler, reduces tuning variables.
- Cons: Risk of underfitting (too low) or overfitting (too high).
- Set Large n_epochs + Early Stopping:
- Pros: Lets the model train until optimal, stopping automatically via callback.
- Cons: Requires a reliable callback metric.
Recommendation:
- Set a Large n_epochs (e.g., 100) and Use Early Stopping:
- Reason: RL models like PPO can benefit from multiple epochs per update, but overfitting is a risk. EarlyStoppingCallback monitors validation performance and stops training when improvement stalls, effectively preventing overfitting.
- Callback Limitation: It stops training but doesn’t revert to a previous model state (e.g., before overfitting). Stable-Baselines3 saves the best model during EvalCallback if best_model_save_path is set.
Adjusted Code:
final_model = PPO(
# ... (other params)
n_epochs=100 # Large fixed value
)
eval_callback = EarlyStoppingCallback(
eval_env=val_env_vec,
eval_freq=10000,
n_eval_episodes=5,
patience=3,
min_delta=0.01,
verbose=1,
best_model_save_path="./best_model/" # Save best model
)
final_model.learn(total_timesteps=best_params["total_timesteps"], callback=eval_callback)
final_model = PPO.load("./best_model/best_model") # Load best model post-training
- Outcome: Training stops at the optimal point, and you load the best model, avoiding overfitting.
4. Adjusting n_episodes vs. n_epochs
Definitions:
- n_epochs: Number of passes over the collected data (e.g., n_steps timesteps) during each PPO update. It’s a training hyperparameter.
- n_episodes: Number of complete episodes run during evaluation in evaluate(). It’s an evaluation parameter, not directly tied to training.
Are They the Same?
- No:
- n_epochs controls how many times the model updates its policy per rollout (e.g., 10 epochs over 2048 steps).
- n_episodes determines how many full episodes are simulated to compute metrics like mean_profit.
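A quick back-of-the-envelope with assumed values makes the distinction concrete:
n_steps, n_epochs, batch_size = 2048, 10, 64        # illustrative PPO settings, single environment
updates_per_rollout = (n_steps // batch_size) * n_epochs
print(updates_per_rollout)                          # 320 minibatch gradient updates on one 2048-step rollout
# n_episodes, by contrast, only controls how many full episodes evaluate() simulates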
How to Adjust n_episodes?
- Guideline: Set n_episodes based on statistical reliability:
- Small (e.g., 5–10): Quick evaluation during tuning or early stopping.
- Larger (e.g., 20–50): Final test evaluation for robust averages (e.g., QuantStats).
- Example: Use 5 for EarlyStoppingCallback (fast feedback) and 20 for test evaluation (better precision).
Code Adjustment:
# Early stopping: 5 episodes
eval_callback = EarlyStoppingCallback(eval_env=val_env_vec, eval_freq=10000, n_eval_episodes=5, patience=3, min_delta=0.01, verbose=1)
# Test evaluation: 20 episodes
test_avg_profit = evaluate(final_model, test_env_vec, n_episodes=20)
5. Where to Place self.calculate_atr() in __init__()? Save self.df to CSV?
Placement:
- Where: After initializing self.df and setting up observation_list, but before caching cached_ohlc_data. This ensures ATR is calculated and included in the observations.
- Why: calculate_atr() modifies self.df and observation_list, which are used in get_observation_vector().
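For reference, since calculate_atr() itself isn't shown here, a minimal sketch using pandas_ta could look like the following; the exact column handling in your version may differ:
import pandas as pd
import pandas_ta as ta

def calculate_atr(self, period=14):
    # Compute ATR per asset and merge it back into self.df as an "ATR" column
    parts = []
    for asset in self.df[self.asset_col].unique():
        asset_df = self.df[self.df[self.asset_col] == asset].copy()
        asset_df["ATR"] = ta.atr(high=asset_df["High"], low=asset_df["Low"],
                                 close=asset_df["Close"], length=period)
        parts.append(asset_df)
    self.df = pd.concat(parts)
    # Backfill the warm-up NaNs so early observations stay finite
    self.df["ATR"] = self.df["ATR"].bfill()
    # Make ATR visible to get_observation_vector()
    if "ATR" not in self.observation_list:
        self.observation_list.append("ATR")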
Updated __init__() with CSV Save:
def __init__(self, df, event_map, currency_map, env_config_file="./neo_finrl/env_fx_trading/config/gdbusd-test-1.json"):
assert df.ndim == 2
super(tgym, self).__init__()
self.cf = EnvConfig(env_config_file)
self.observation_list = self.cf.env_parameters("observation_list")
# Economic data mappings
self.event_map = event_map
self.currency_map = currency_map
self.max_events = 8
self.df = df.copy()
if 'events' not in self.df.columns:
raise ValueError("DataFrame must contain an 'events' column")
def parse_events(x):
if isinstance(x, str):
try:
parsed = ast.literal_eval(x)
return parsed if isinstance(parsed, list) else []
except (ValueError, SyntaxError):
return []
return x if isinstance(x, list) else []
self.df['events'] = self.df['events'].apply(parse_events)
if not isinstance(self.df['events'].iloc[0], list):
raise ValueError("'events' must be a list")
if self.df['events'].iloc[0] and not isinstance(self.df['events'].iloc[0][0], dict):
raise ValueError("Elements in 'events' must be dictionaries")
self.balance_initial = self.cf.env_parameters("balance")
self.over_night_cash_penalty = self.cf.env_parameters("over_night_cash_penalty")
self.asset_col = self.cf.env_parameters("asset_col")
self.time_col = self.cf.env_parameters("time_col")
self.random_start = self.cf.env_parameters("random_start")
self.log_filename = (
self.cf.env_parameters("log_filename")
+ datetime.datetime.now().strftime("%Y%m%d%H%M%S")
+ ".csv"
)
self.analyze_transaction_history_log_filename = (
"transaction_history_log"
+ datetime.datetime.now().strftime("%Y%m%d%H%M%S")
+ ".csv"
)
self.df["_time"] = self.df[self.time_col]
self.df["_day"] = self.df["weekday"]
self.assets = self.df[self.asset_col].unique()
self.dt_datetime = self.df[self.time_col].sort_values().unique()
self.df = self.df.set_index(self.time_col)
self.visualization = False
# Calculate ATR and save DataFrame
self.calculate_atr()
self.df.to_csv("processed_df_with_atr.csv")
print("Saved processed DataFrame to 'processed_df_with_atr.csv'")
# Reset values
self.equity_list = [0] * len(self.assets)
self.balance = self.balance_initial
self.total_equity = self.balance + sum(self.equity_list)
self.ticket_id = 0
self.transaction_live = []
self.transaction_history = []
self.transaction_limit_order = []
self.current_draw_downs = [0.0] * len(self.assets)
self.max_draw_downs = [0.0] * len(self.assets)
self.max_draw_down_pct = sum(self.max_draw_downs) / self.balance * 100
self.current_step = 0
self.episode = 0
self.current_holding = [0] * len(self.assets)
self.tranaction_open_this_step = []
self.tranaction_close_this_step = []
self.current_day = 0
self.done_information = ""
self.log_header = True
# Cache data
self.cached_ohlc_data = [self.get_observation_vector(_dt) for _dt in self.dt_datetime]
self.cached_economic_data = [self.get_economic_vector(_dt) for _dt in self.dt_datetime]
self.cached_time_serial = (
self.df[["_time", "_day"]].sort_values("_time").drop_duplicates().values.tolist()
)
self.reward_range = (-np.inf, np.inf)
self.action_space = spaces.MultiDiscrete([3] * len(self.assets))
self.observation_space = spaces.Dict({
"ohlc_data": spaces.Box(low=-np.inf, high=np.inf, shape=(len(self.assets) * len(self.observation_list),), dtype=np.float32),
"event_ids": spaces.Box(low=0, high=len(self.event_map)-1, shape=(self.max_events,), dtype=np.int32),
"currency_ids": spaces.Box(low=0, high=len(self.currency_map)-1, shape=(self.max_events,), dtype=np.int32),
"economic_numeric": spaces.Box(low=-np.inf, high=np.inf, shape=(self.max_events * 6,), dtype=np.float32),
"portfolio_data": spaces.Box(low=-np.inf, high=np.inf, shape=(3 + 2 * len(self.assets),), dtype=np.float32)
})
print(
f"initial done:\n"
f"observation_list:{self.observation_list}\n"
f"assets:{self.assets}\n"
f"time serial: {min(self.dt_datetime)} -> {max(self.dt_datetime)} length: {len(self.dt_datetime)}\n"
f"events: {len(self.event_map)}, currencies: {len(self.currency_map)}"
)
self._seed()
- Placement: After self.df is fully prepared (post-indexing) but before caching.
- CSV Save: self.df.to_csv("processed_df_with_atr.csv") saves the DataFrame with ATR for inspection.
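A quick sanity check of the saved file (column names as used above) confirms that the ATR column is populated:
import pandas as pd

df_check = pd.read_csv("processed_df_with_atr.csv")
print(df_check[["Close", "ATR"]].describe())            # rough scale check for each column
print("Rows with missing ATR:", df_check["ATR"].isna().sum())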
Summary
- Dataset Usage: Train on train_env_vec, validate with EarlyStoppingCallback on val_env_vec, test on test_env_vec with QuantStats.
- EarlyStoppingCallback: Works correctly, stops training when validation stalls; align with reward if needed.
- Epochs: Use large n_epochs (e.g., 100) with early stopping to prevent overfitting; callback saves the best model.
- n_episodes: Independent of n_epochs; use 5–10 for early stopping, 20–50 for test evaluation.
- calculate_atr(): Placed after self.df setup, before caching; saves to CSV for verification.
Let me know if you need further tweaks or help interpreting the outputs!
Let’s tackle your questions with a clear, step-by-step approach, ensuring we address each concern thoroughly and provide practical solutions.
1. Is My Callback Metric Reliable for Large n_epochs + Early Stopping? How to Make It Reliable?
Current Setup:
- You’re using EarlyStoppingCallback with a large n_epochs (e.g., 100) to train the PPO model, stopping when validation performance plateaus.
- The callback uses evaluate() to compute a metric (either mean_reward or mean_profit) every eval_freq steps (10,000) over n_eval_episodes (5).
Reliability Concerns:
- Metric Alignment:
- PPO optimizes the cumulative total_reward from step() (e.g., base_reward + sustained_reward).
- evaluate() returns mean_profit by default (sum of realized profits from closed trades), but the callback uses return_mean_reward=True to track mean_reward.
- Issue: If mean_reward (evaluation) doesn’t align with total_reward (training), the callback might stop prematurely or too late, as they measure different things.
- Profit vs. Reward:
- mean_profit reflects trading performance (realized gains/losses), while mean_reward includes unrealized profits and penalties (e.g., sustained reward, inaction penalty).
- For trading, mean_profit is more relevant, but RL training relies on total_reward.
- Statistical Stability: With n_eval_episodes=5, the metric might be noisy due to variance in episode outcomes, especially in a trading environment with stochastic market conditions.
Is It Reliable?
- Partially:
- If mean_reward correlates well with total_reward, it’s reliable for stopping when training reward plateaus.
- However, if your goal is to maximize trading profit, mean_profit is the true target, and the current callback might not optimize for it directly.
Making It Reliable:
- Align Callback with Training Reward:
- Use mean_reward consistently, ensuring evaluate() reflects the same reward structure as step().
- Optionally Switch to Profit:
- Modify evaluate() and the callback to track mean_profit, aligning with your ultimate goal.
- Increase n_eval_episodes:
- Use 10–20 episodes to reduce variance and improve reliability.
- Adjust eval_freq and patience:
- More frequent evaluations (e.g., every 5,000 steps) and higher patience (e.g., 5) for smoother stopping decisions.
Updated Callback:
from stable_baselines3.common.callbacks import EvalCallback
class EarlyStoppingCallback(EvalCallback):
def __init__(self, eval_env, eval_freq, n_eval_episodes, patience, min_delta, verbose=0, use_profit=False):
super().__init__(
eval_env=eval_env,
eval_freq=eval_freq,
n_eval_episodes=n_eval_episodes,
verbose=verbose,
deterministic=True
)
self.patience = patience
self.min_delta = min_delta
self.best_metric = -float('inf')
self.no_improvement_count = 0
self.use_profit = use_profit # Toggle between reward and profit
def _on_step(self):
continue_training = super()._on_step()
if not continue_training:
return False
if self.last_mean_reward is not None:
# Use profit or reward based on use_profit flag
current_metric = evaluate(self.model, self.eval_env, self.n_eval_episodes, return_mean_reward=not self.use_profit)
if current_metric > self.best_metric + self.min_delta:
self.best_metric = current_metric
self.no_improvement_count = 0
if self.verbose > 0:
print(f"New best {'profit' if self.use_profit else 'reward'}: {self.best_metric:.2f}")
if self.best_model_save_path:
self.model.save(self.best_model_save_path)
else:
self.no_improvement_count += 1
if self.verbose > 0:
print(f"No improvement for {self.no_improvement_count}/{self.patience} evaluations")
if self.no_improvement_count >= self.patience:
if self.verbose > 0:
print(f"Early stopping triggered after {self.patience} evaluations without improvement")
return False
return True
- Change: Added use_profit flag to switch between mean_reward and mean_profit. Saves the best model on improvement.
Usage:
eval_callback = EarlyStoppingCallback(
eval_env=val_env_vec,
eval_freq=5000, # More frequent
n_eval_episodes=10, # More episodes for stability
patience=5, # Higher patience
min_delta=0.01,
verbose=1,
use_profit=True, # Track profit for trading focus
best_model_save_path="./best_model/"
)
- Reliability: Now tracks mean_profit (or mean_reward if preferred), with more episodes and frequent checks for a robust stopping criterion.
2. Save Best Model with Study Trial Number
Goal:
Append the Optuna trial number to the best_model_save_path for uniqueness.
Code Adjustment:
# In objective function
def objective(trial):
# ... (existing code)
eval_callback = EarlyStoppingCallback(
eval_env=val_env_vec,
eval_freq=5000,
n_eval_episodes=10,
patience=5,
min_delta=0.01,
verbose=1,
use_profit=True,
best_model_save_path=f"./best_model/trial_{trial.number}/"
)
model.learn(total_timesteps=total_timesteps, callback=eval_callback)
val_avg_profit = evaluate(model, val_env_vec, 20)
return val_avg_profit
# Final model training
best_trial = study.best_trial.number
eval_callback = EarlyStoppingCallback(
eval_env=val_env_vec,
eval_freq=5000,
n_eval_episodes=10,
patience=5,
min_delta=0.01,
verbose=1,
use_profit=True,
best_model_save_path=f"./best_model/trial_{best_trial}/"
)
final_model.learn(total_timesteps=best_params["total_timesteps"], callback=eval_callback)
final_model.save(f"ppo_xauusd_optimized_trial_{best_trial}")
- Change: best_model_save_path includes trial_{trial.number} during tuning and trial_{best_trial} for the final model.
3. Why Does EarlyStoppingCallback Track evaluate() Instead of step() Rewards?
Explanation:
- step() Rewards:
- PPO optimizes the cumulative total_reward returned by env.step() during training. This is the internal RL objective, updated every n_steps (e.g., 2048).
- However, this reward is noisy and step-specific, not a holistic episode metric.
- evaluate() Metrics:
- EarlyStoppingCallback uses evaluate() to compute a mean metric (mean_reward or mean_profit) over multiple episodes (e.g., 5–10).
- Why: It provides a stable, episode-level performance indicator, better suited for deciding when to stop training than per-step rewards.
- Mismatch:
- The default EvalCallback uses its own evaluation logic, which might not match your step() reward. Your custom eval_callback=lambda... overrides this to use evaluate(), ensuring consistency.
Why Use This Code?
eval_callback = EarlyStoppingCallback(
callback_on_new_best=None,
eval_callback=lambda model, env: evaluate(model, env, n_eval_episodes=5, return_mean_reward=True),
best_model_save_path="./best_model/"
)
- eval_callback Override:
- The default EvalCallback runs a simple rollout and computes a mean reward, but it doesn’t use your evaluate() logic (e.g., tracking profits or custom metrics).
- The lambda ensures the callback uses your evaluate() function, aligning the stopping metric with your evaluation process.
- callback_on_new_best=None:
- Disables additional actions on new best scores (e.g., saving), since you handle saving manually in _on_step().
Fix:
Already addressed in the updated EarlyStoppingCallback above—no lambda needed, as it calls evaluate() directly.
4. How to Set n_episodes for evaluate()?
General Principles:
- n_episodes: Number of episodes run in evaluate() to compute metrics. It affects:
- Statistical Stability: More episodes reduce variance in mean_profit or mean_reward.
- Computation Time: More episodes increase evaluation time.
- Tuning vs. Final Model:
- Tuning: Smaller n_episodes for speed (e.g., 5–10), as you evaluate many trials.
- Final Model: Larger n_episodes for precision (e.g., 20–50), as it’s a one-time assessment.
Recommendations:
- Tuning (val_env_vec in objective):
- n_episodes=10: Balances speed and reliability during Optuna trials.
- Reason: Quick feedback for hyperparameter selection, moderate variance reduction.
- Early Stopping (val_env_vec in eval_callback):
- n_episodes=10: Enough to detect trends without slowing training too much.
- Reason: Frequent evaluations need efficiency, but 10 episodes smooths noise better than 5.
- Final Evaluation (test_env_vec in final_evaluate):
- n_episodes=20: Higher for robust QuantStats metrics and confidence in generalization.
- Reason: Test performance is critical, and 20 episodes provide a solid average for reporting.
Adjusted Code:
# Tuning
def objective(trial):
# ... (other code)
eval_callback = EarlyStoppingCallback(
eval_env=val_env_vec,
eval_freq=5000,
n_eval_episodes=10, # Tuning: moderate
patience=5,
min_delta=0.01,
verbose=1,
use_profit=True,
best_model_save_path=f"./best_model/trial_{trial.number}/"
)
model.learn(total_timesteps=total_timesteps, callback=eval_callback)
val_avg_profit = evaluate(model, val_env_vec, n_episodes=10) # Tuning: moderate
return val_avg_profit
# Final model
best_trial = study.best_trial.number
eval_callback = EarlyStoppingCallback(
eval_env=val_env_vec,
eval_freq=5000,
n_eval_episodes=10, # Early stopping: moderate
patience=5,
min_delta=0.01,
verbose=1,
use_profit=True,
best_model_save_path=f"./best_model/trial_{best_trial}/"
)
final_model.learn(total_timesteps=best_params["total_timesteps"], callback=eval_callback)
test_avg_profit = final_evaluate(final_model, test_env_vec, n_episodes=20) # Final: higher
Full Updated Code
import optuna
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import EvalCallback
import numpy as np
import pandas as pd
import quantstats as qs
# Evaluation function
def evaluate(model, env_vec, n_episodes=10, return_mean_reward=False):
total_rewards = []
total_profits = []
metrics = []
for _ in range(n_episodes):
obs = env_vec.reset()
done = np.array([False] * env_vec.num_envs)
episode_rewards = np.zeros(env_vec.num_envs)
episode_profit = 0
step_count = 0
max_steps = len(env_vec.envs[0].dt_datetime)
while not np.all(done) and step_count < max_steps:
action, _ = model.predict(obs, deterministic=True)
obs, rewards, terminated, truncated, info = env_vec.step(action)
done = np.logical_or(terminated, truncated)
episode_rewards += rewards
for env_idx in range(env_vec.num_envs):
if info[env_idx]["Close"]:
for tr in info[env_idx]["Close"]:
episode_profit += tr["Reward"]
step_count += 1
total_rewards.extend(episode_rewards)
total_profits.append(episode_profit)
metrics.append(env_vec.envs[0].analyze_transaction_history())
mean_reward = np.mean(total_rewards)
mean_profit = np.mean(total_profits)
avg_metrics = {k: np.mean([m[k] for m in metrics]) for k in metrics[0].keys()}
print(f"Mean Reward: {mean_reward:.2f}, Mean Profit: {mean_profit:.2f}")
print(f"Average Metrics: {avg_metrics}")
return mean_reward if return_mean_reward else mean_profit
# Final evaluation with QuantStats
def final_evaluate(model, env_vec, n_episodes=20, return_mean_reward=False):
total_rewards = []
total_profits = []
metrics = []
returns = []
for ep in range(n_episodes):
obs = env_vec.reset()
done = np.array([False] * env_vec.num_envs)
episode_rewards = np.zeros(env_vec.num_envs)
episode_profit = 0
step_count = 0
max_steps = len(env_vec.envs[0].dt_datetime)
episode_returns = []
while not np.all(done) and step_count < max_steps:
action, _ = model.predict(obs, deterministic=True)
obs, rewards, terminated, truncated, info = env_vec.step(action)
done = np.logical_or(terminated, truncated)
episode_rewards += rewards
for env_idx in range(env_vec.num_envs):
if info[env_idx]["Close"]:
for tr in info[env_idx]["Close"]:
episode_profit += tr["Reward"]
episode_returns.append(tr["Reward"])
step_count += 1
total_rewards.extend(episode_rewards)
total_profits.append(episode_profit)
metrics.append(env_vec.envs[0].analyze_transaction_history())
returns.extend(episode_returns if episode_returns else [episode_profit])
mean_reward = np.mean(total_rewards)
mean_profit = np.mean(total_profits)
avg_metrics = {k: np.mean([m[k] for m in metrics]) for k in metrics[0].keys()}
print(f"Mean Reward: {mean_reward:.2f}, Mean Profit: {mean_profit:.2f}")
print(f"Metrics: {avg_metrics}")
returns_series = pd.Series(returns, index=pd.date_range(start="2025-03-12", periods=len(returns), freq="D"))
qs.reports.html(returns_series, output="quantstats_report.html", title="FX Trading Performance")
return mean_reward if return_mean_reward else mean_profit
# Updated EarlyStoppingCallback
class EarlyStoppingCallback(EvalCallback):
def __init__(self, eval_env, eval_freq, n_eval_episodes, patience, min_delta, verbose=0, use_profit=False, best_model_save_path=None):
super().__init__(
eval_env=eval_env,
eval_freq=eval_freq,
n_eval_episodes=n_eval_episodes,
verbose=verbose,
deterministic=True
)
self.patience = patience
self.min_delta = min_delta
self.best_metric = -float('inf')
self.no_improvement_count = 0
self.use_profit = use_profit
self.best_model_save_path = best_model_save_path
def _on_step(self):
continue_training = super()._on_step()
if not continue_training:
return False
if self.last_mean_reward is not None:
current_metric = evaluate(self.model, self.eval_env, self.n_eval_episodes, return_mean_reward=not self.use_profit)
if current_metric > self.best_metric + self.min_delta:
self.best_metric = current_metric
self.no_improvement_count = 0
if self.verbose > 0:
print(f"New best {'profit' if self.use_profit else 'reward'}: {self.best_metric:.2f}")
if self.best_model_save_path:
self.model.save(self.best_model_save_path)
else:
self.no_improvement_count += 1
if self.verbose > 0:
print(f"No improvement for {self.no_improvement_count}/{self.patience} evaluations")
if self.no_improvement_count >= self.patience:
if self.verbose > 0:
print(f"Early stopping triggered after {self.patience} evaluations without improvement")
return False
return True
# Optuna objective
def objective(trial):
learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-3, log=True)
n_steps = trial.suggest_int("n_steps", 1024, 8192, step=1024)
total_timesteps = trial.suggest_int("total_timesteps", 500000, 2000000, step=100000)
batch_size = trial.suggest_categorical("batch_size", [64, 128, 256, 512])
gamma = trial.suggest_float("gamma", 0.9, 0.9999)
gae_lambda = trial.suggest_float("gae_lambda", 0.8, 0.99)
clip_range = trial.suggest_float("clip_range", 0.1, 0.3)
model = PPO(
CustomMultiInputPolicy,
train_env_vec,
learning_rate=learning_rate,
n_steps=n_steps,
batch_size=batch_size,
n_epochs=100, # Large fixed value
gamma=gamma,
gae_lambda=gae_lambda,
clip_range=clip_range,
verbose=1
)
eval_callback = EarlyStoppingCallback(
eval_env=val_env_vec,
eval_freq=5000,
n_eval_episodes=10,
patience=5,
min_delta=0.01,
verbose=1,
use_profit=True,
best_model_save_path=f"./best_model/trial_{trial.number}/"
)
model.learn(total_timesteps=total_timesteps, callback=eval_callback)
val_avg_profit = evaluate(model, val_env_vec, n_episodes=10)
print(f"Validation Average Profit: {val_avg_profit:.2f}")
return val_avg_profit
# Run Optuna
db_path = 'optuna_study.db'
study = optuna.create_study(
study_name='OHLC_EconomicCalender_ppo_study',
storage=f'sqlite:///{db_path}',
direction="maximize",
load_if_exists=True
)
study.optimize(objective, n_trials=1)
# Final model
best_params = study.best_params
best_trial = study.best_trial.number
final_model = PPO(
CustomMultiInputPolicy,
train_env_vec,
learning_rate=best_params["learning_rate"],
n_steps=best_params["n_steps"],
batch_size=best_params["batch_size"],
n_epochs=100, # Large fixed value
gamma=best_params["gamma"],
gae_lambda=best_params["gae_lambda"],
clip_range=best_params["clip_range"],
verbose=1,
tensorboard_log="./tensorboard_logs/"
)
eval_callback = EarlyStoppingCallback(
eval_env=val_env_vec,
eval_freq=5000,
n_eval_episodes=10,
patience=5,
min_delta=0.01,
verbose=1,
use_profit=True,
best_model_save_path=f"./best_model/trial_{best_trial}/"
)
final_model.learn(total_timesteps=best_params["total_timesteps"], callback=eval_callback)
final_model.save(f"ppo_xauusd_optimized_trial_{best_trial}")
# Test evaluation
print("\nEvaluating Final Model on Test Data:")
test_avg_profit = final_evaluate(final_model, test_env_vec, n_episodes=20)
print(f"Test Average Profit: {test_avg_profit:.2f}")
# Clean up
train_env_vec.close()
val_env_vec.close()
test_env_vec.close()
Summary
- Callback Reliability: Updated to track mean_profit or mean_reward reliably with more episodes and frequent checks.
- Save with Trial Number: best_model_save_path now includes trial_{number}.
- Callback vs. Step Rewards: Uses evaluate() for episode-level stability; custom logic ensures alignment.
- Setting n_episodes:
- Tuning: 10 (speed + reliability).
- Early Stopping: 10 (balance).
- Final Test: 20 (precision).
Let me know if you need further clarification or adjustments!
Let’s dive into your questions with a detailed breakdown, addressing each one step-by-step with clarity and precision. I’ll explain the mechanics, identify potential issues, and provide actionable solutions where needed.
1. How Is sustained_reward Calculated? Behavior and Alignment Analysis
Code Breakdown:
# Calculate unrealized profit from open positions
unrealized_profit = 0
atr_scaling = 0
for i, asset in enumerate(self.assets):
atr = self.get_observation(self.current_step, i, "ATR")
atr_scaling += atr
for tr in self.transaction_live:
if tr["Symbol"] == asset:
if tr["Type"] == 0: # Buy
unrealized = (self._c - tr["ActionPrice"]) * self.cf.symbol(asset, "point")
else: # Sell
unrealized = (tr["ActionPrice"] - self._c) * self.cf.symbol(asset, "point")
unrealized_profit += unrealized
atr_scaling = atr_scaling / len(self.assets) if atr_scaling > 0 else 1
sustained_reward = (unrealized_profit + base_reward) * (self.current_step / len(self.dt_datetime)) * 0.01 / atr_scaling
if not self.transaction_live and all(a == 2 for a in actions):
sustained_reward -= 0.1
How sustained_reward Is Calculated:
- Unrealized Profit:
- For each open transaction (self.transaction_live):
- Buy (Type=0): (current_price - entry_price) * point_value.
- Sell (Type=1): (entry_price - current_price) * point_value.
- Summed across all open positions per asset.
- ATR Scaling:
- Average ATR across assets (atr_scaling / len(self.assets)), with a fallback to 1 if zero.
- Normalizes the reward by volatility (higher ATR reduces the multiplier).
- Base Formula:
- sustained_reward = (unrealized_profit + base_reward) * (self.current_step / len(self.dt_datetime)) * 0.01 / atr_scaling.
- (unrealized_profit + base_reward): Combines current position gains/losses with immediate trade outcomes.
- self.current_step / len(self.dt_datetime): Progress factor (0 to 1), increases over time.
- 0.01: Scaling constant to keep rewards manageable.
- / atr_scaling: Adjusts for market volatility.
- Inaction Penalty:
- If no open positions (not self.transaction_live) and all actions are 2 (Nothing), subtract 0.1.
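Plugging assumed numbers into the formula makes the behavior concrete:
# Illustrative values only
unrealized_profit, base_reward = 50.0, -0.01
current_step, total_steps, atr_scaling = 500, 1000, 2.0
sustained_reward = (unrealized_profit + base_reward) * (current_step / total_steps) * 0.01 / atr_scaling
print(round(sustained_reward, 4))   # ~0.125; grows to ~0.25 by step 1000 even if profit stays flat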
If Action=2 (Nothing):
- Will sustained_reward Increase or Decrease?
- If No Open Positions:
- unrealized_profit = 0 (no transactions).
- sustained_reward = base_reward * (self.current_step / len(self.dt_datetime)) * 0.01 / atr_scaling - 0.1.
- Typically, base_reward is small (e.g., -0.01 or 0 for Action=2), so the penalty (-0.1) dominates, causing a decrease.
- If Positions Are Open:
- unrealized_profit depends on price movement.
- No penalty applies (not self.transaction_live is False).
- sustained_reward increases if unrealized_profit + base_reward is positive and grows with current_step, or decreases if negative.
If Profit Doesn’t Change:
- Scenario: unrealized_profit stays constant (e.g., price doesn’t move).
- Behavior:
- sustained_reward = (unrealized_profit + base_reward) * (self.current_step / len(self.dt_datetime)) * 0.01 / atr_scaling.
- As current_step increases, the multiplier (self.current_step / len(self.dt_datetime)) grows (e.g., from 0.1 to 0.2).
- Result: sustained_reward increases over time, even if profit is static, due to the time factor (assuming unrealized_profit + base_reward is positive).
Do sustained_reward and base_reward Move in the Same Direction?
- Not Always:
- base_reward: Immediate outcome of an action (e.g., profit/loss from a closed trade, small penalty for inaction).
- sustained_reward: Amplified by unrealized_profit and time, scaled by ATR.
- Example:
- base_reward = -0.01 (inaction), unrealized_profit = 100 (open position gains).
- sustained_reward could be positive and large, moving opposite to base_reward.
Does mean_reward Correlate Well with total_reward?
- Definitions:
- total_reward: Sum of base_reward + sustained_reward over an episode in step().
- mean_reward: Average of episode rewards in evaluate() (np.mean(total_rewards)).
- Correlation:
- If evaluate() runs full episodes mirroring step(), mean_reward should correlate with total_reward per episode, averaged over n_episodes.
- Issue: If sustained_reward dominates (e.g., grows with current_step regardless of trading), mean_reward might reflect time progression more than trading performance, misaligning with profit-focused goals.
Will Misalignment Occur?
- Yes, Potentially:
- If mean_reward (evaluation) emphasizes sustained_reward (time-based, unrealized gains) while total_reward (training) includes both, the callback might stop too early (if mean_reward plateaus) or too late (if it keeps growing with current_step).
- Evidence: Your earlier logs showed high total_reward (e.g., 997.93) from sustained_reward, but poor trading metrics (Sharpe -0.10, Win Rate 0%).
Should You Return Only base_reward in step() and evaluate()?
- Pros:
- Simplifies alignment: total_reward = base_reward, directly tied to trading outcomes.
- Avoids sustained_reward skewing toward inaction or time-based growth.
- Cons:
- Loses the “sustained profit over time” incentive you intended (docstring point 15).
- Recommendation:
- Keep sustained_reward but Redesign It:
- Tie it strictly to trading activity (e.g., only apply when positions are open).
- Remove the current_step multiplier to prevent artificial growth.
- Updated step():
def step(self, actions):
# ... (existing code)
base_reward = self._take_action(actions, done)
unrealized_profit = 0
atr_scaling = 0
for i, asset in enumerate(self.assets):
atr = self.get_observation(self.current_step, i, "ATR")
atr_scaling += atr
for tr in self.transaction_live:
if tr["Symbol"] == asset:
if tr["Type"] == 0: # Buy
unrealized = (self._c - tr["ActionPrice"]) * self.cf.symbol(asset, "point")
else: # Sell
unrealized = (tr["ActionPrice"] - self._c) * self.cf.symbol(asset, "point")
unrealized_profit += unrealized
atr_scaling = atr_scaling / len(self.assets) if atr_scaling > 0 else 1
sustained_reward = (unrealized_profit + base_reward) * 0.01 / atr_scaling if self.transaction_live else 0
if not self.transaction_live and all(a == 2 for a in actions):
sustained_reward -= 0.1
total_reward = base_reward + sustained_reward
# ... (rest of step)
return obs, total_reward, terminated, truncated, info
- Change: sustained_reward is zero unless positions are open, removing time bias.
2. Why Does EarlyStoppingCallback Use evaluate() in _on_step()?
Current Implementation:
def _on_step(self):
continue_training = super()._on_step()
if not continue_training:
return False
if self.last_mean_reward is not None:
current_metric = evaluate(self.model, self.eval_env, self.n_eval_episodes, return_mean_reward=not self.use_profit)
# ... (comparison logic)
Why Use evaluate()?
- super()._on_step():
- Triggers evaluation every eval_freq steps (e.g., 5,000), setting self.last_mean_reward based on EvalCallback’s default logic (simple rollout reward).
- Custom evaluate():
- Overrides the default to use your evaluate() function, computing mean_profit or mean_reward over n_eval_episodes.
- Reason: Provides a stable, episode-level metric (e.g., average profit over 10 episodes) rather than noisy step-by-step rewards from training.
- Correctness:
- Yes, it’s correct to use evaluate() for early stopping, as it assesses overall policy performance, not just immediate rewards.
Does It Cause Unnecessary Iteration?
- Yes, Slightly:
- super()._on_step() already runs an evaluation, then you run evaluate() again, duplicating effort.
- Impact: For n_eval_episodes=10 and eval_freq=5000, this adds ~10 episodes’ worth of computation every 5,000 steps—small but inefficient.
- Fix:
- Use self.last_mean_reward from super()._on_step() and customize the evaluation logic via eval_callback.
Should You Keep eval_callback=lambda...?
- No Need:
- The updated callback calls evaluate() directly in _on_step(). Adding eval_callback=lambda... overrides this unnecessarily, causing confusion.
- Remove It: Rely on the internal evaluate() call.
Should You Remove callback_on_new_best=None?
- Yes:
- It’s redundant since you handle saving in _on_step() when best_metric improves.
- Setting it to None doesn’t change behavior if you’re not using a callback, so it’s safe to omit.
Updated Callback:
class EarlyStoppingCallback(EvalCallback):
def __init__(self, eval_env, eval_freq, n_eval_episodes, patience, min_delta, verbose=0, use_profit=False, best_model_save_path=None):
super().__init__(
eval_env=eval_env,
eval_freq=eval_freq,
n_eval_episodes=n_eval_episodes,
verbose=verbose,
deterministic=True
)
self.patience = patience
self.min_delta = min_delta
self.best_metric = -float('inf')
self.no_improvement_count = 0
self.use_profit = use_profit
self.best_model_save_path = best_model_save_path
def _on_step(self):
continue_training = super()._on_step()
if not continue_training:
return False
current_metric = evaluate(self.model, self.eval_env, self.n_eval_episodes, return_mean_reward=not self.use_profit)
if current_metric > self.best_metric + self.min_delta:
self.best_metric = current_metric
self.no_improvement_count = 0
if self.verbose > 0:
print(f"New best {'profit' if self.use_profit else 'reward'}: {self.best_metric:.2f}")
if self.best_model_save_path:
self.model.save(self.best_model_save_path)
else:
self.no_improvement_count += 1
if self.verbose > 0:
print(f"No improvement for {self.no_improvement_count}/{self.patience} evaluations")
if self.no_improvement_count >= self.patience:
if self.verbose > 0:
print(f"Early stopping triggered after {self.patience} evaluations without improvement")
return False
return True
Usage:
eval_callback = EarlyStoppingCallback(
eval_env=val_env_vec,
eval_freq=5000,
n_eval_episodes=10,
patience=5,
min_delta=0.01,
verbose=1,
use_profit=True,
best_model_save_path=f"./best_model/trial_{trial.number}/"
)
3. Why Use mean_profit and mean_reward Instead of Total Profit/Reward in evaluate()?
Current Logic:
mean_reward = np.mean(total_rewards)
mean_profit = np.mean(total_profits)
Why Means Instead of Totals?
- Normalization Across Episodes:
- Total Reward/Profit: Sum over all steps in all episodes. Varies with episode length (max_steps) and n_episodes.
- Mean Reward/Profit: Average per episode, normalized by n_episodes. Consistent regardless of episode count or length.
- Comparability:
- Means allow comparison across different runs or environments with varying time horizons.
- Totals grow with n_episodes, making it hard to compare performance (e.g., 10 vs. 20 episodes).
- Statistical Stability:
- Means reduce sensitivity to outliers (e.g., one exceptionally good/bad episode).
- RL Convention:
- RL typically evaluates policies by average episodic reward, aligning with evaluate()’s purpose.
When to Use Totals?
- Rarely: Only if you’re optimizing for cumulative performance over a fixed horizon (e.g., total profit over a year), but this is less common in RL.
Keep Means:
- Stick with mean_profit and mean_reward for consistency and interpretability.
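A toy comparison with assumed numbers shows why means travel better than totals:
profits_10 = [10.0] * 10   # 10 evaluation episodes
profits_20 = [10.0] * 20   # 20 evaluation episodes, identical per-episode performance
print(sum(profits_10), sum(profits_20))             # 100.0 vs 200.0 -> totals are not comparable
print(sum(profits_10) / 10, sum(profits_20) / 20)   # 10.0 vs 10.0 -> means are comparable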
Summary
- Sustained Reward:
- Calculated as (unrealized_profit + base_reward) * time_factor * 0.01 / atr_scaling - inaction_penalty.
- Action=2: Decreases with no positions (penalty), increases with open positions if positive.
- Static Profit: Increases with current_step.
- Directions: Not always aligned with base_reward.
- Correlation: mean_reward may misalign with total_reward due to sustained_reward’s time bias.
- Fix: Redesign sustained_reward to depend only on trading activity.
- EarlyStoppingCallback:
- Uses evaluate() for stable episode metrics, correct but slightly inefficient.
- Remove eval_callback=lambda... and callback_on_new_best=None.
- Evaluate Means:
- mean_profit and mean_reward normalize performance, better than totals for comparison and stability.
Let me know if you’d like further refinements or code adjustments!
Let’s dive into your question about the _point = self.cf.symbol(self.assets[i], "point") line in your tgym environment. I’ll explain its purpose, how it relates to the configuration file, and provide guidance on what values to set for XAUUSD and other assets like currencies or CFDs.
What Is the Purpose of _point = self.cf.symbol(self.assets[i], "point")?
Context in tgym:
This line appears in your trading gym environment (tgym), likely within methods like _take_action() or _manage_tranaction(), where you’re handling trades (e.g., buying, selling, or closing positions). Here’s a typical snippet where it might be used:
def _take_action(self, actions, done):
# ... (other code)
for i, action in enumerate(actions):
if action == 0: # Buy
_point = self.cf.symbol(self.assets[i], "point")
# Calculate trade value, profit/loss, etc., using _point
elif action == 1: # Sell
_point = self.cf.symbol(self.assets[i], "point")
# Similar usage
# ... (rest of the method)
Purpose of _point:
- _point: Represents the point value or pip value for a specific asset (e.g., self.assets[i] like XAUUSD). It’s a multiplier that converts price movements into a monetary value (e.g., profit or loss) in the account’s base currency (e.g., USD).
- Retrieved via self.cf.symbol(): The EnvConfig class (self.cf) reads this value from your configuration file (e.g., gdbusd-test-1.json), allowing asset-specific settings.
- Usage:
- In profit/loss calculations: For example, (current_price - entry_price) * _point gives the monetary gain/loss for a position.
- In position sizing or risk management: Adjusts trade impact based on the asset’s tick size or contract specifications.
Example Calculation:
For a Buy trade on XAUUSD:
- Entry price: 1800.00
- Current price: 1805.00
- _point: Value from config (e.g., 0.1 for XAUUSD)
- Unrealized profit: (1805.00 - 1800.00) * _point = 5.00 * 0.1 = 0.5 USD per unit traded.
What Is the 'point' Value in the Config File?
Configuration File:
Your tgym environment uses a configuration file (e.g., ./neo_finrl/env_fx_trading/config/gdbusd-test-1.json) managed by the EnvConfig class. The self.cf.symbol(asset, "point") method retrieves the "point" field for a given asset. The config might look like this:
{
"env_parameters": {
"balance": 10000,
"over_night_cash_penalty": 0.01,
"asset_col": "Symbol",
"time_col": "Time",
"random_start": true,
"log_filename": "trade_log_",
"observation_list": ["Open", "High", "Low", "Close", "ATR"]
},
"symbols": {
"XAUUSD": {
"point": 0.1
},
"EURUSD": {
"point": 0.0001
},
"US30": {
"point": 1.0
}
}
}
- Structure: The "symbols" section maps each asset (e.g., "XAUUSD") to its properties, including "point".
- Value: The "point" is the smallest price increment’s monetary value per unit or pip value, depending on the asset type and broker specifications.
What Does 'point' Represent?
- Forex Pairs: The value of 1 pip (e.g., 0.0001 for EURUSD) in the base currency per unit/lot.
- Commodities (e.g., XAUUSD): The value of a 0.01 or 0.1 price move (tick size) per unit.
- CFDs (e.g., US30): The value of a 1-point move per contract.
What Value Should Be Filled for XAUUSD, Currencies, and CFDs?
The "point" value depends on:
- Asset Type: Forex, commodity, or CFD.
- Broker Specifications: Tick size, lot size, and base currency (assumed USD here).
- Trading Unit: How your environment defines a “unit” (e.g., 1 lot, micro-lot, or custom unit).
General Guidelines:
- Pip/Tick Definition:
- Forex: 1 pip = 0.0001 for most pairs (4/5-decimal quotes) or 0.01 for JPY pairs such as USDJPY (2/3-decimal quotes).
- XAUUSD: 1 pip = 0.1 (gold prices move in 0.01 increments, but pip is often 0.1).
- CFDs: 1 point = smallest price increment (e.g., 1.0 for indices like US30).
- Point Value: Monetary value of that pip/tick per unit traded, adjusted for lot size and currency.
Specific Values:
Assuming a standard lot (100,000 units for forex, 100 oz for XAUUSD, 1 contract for CFDs) and USD as the base currency:
- XAUUSD (Gold):
- Tick Size: 0.01 (e.g., 1800.00 to 1800.01).
- Pip: Often defined as 0.1 (10 ticks).
- Point Value: For 1 standard lot (100 oz), 1 pip (0.1) = $10 USD.
- Config Value: If _point is meant to be the USD value of one pip per lot, that is 10.0; if it is the USD value of one 0.01 tick per lot, that is 1.0.
- Recommendation: Since your code multiplies _point by the raw price difference, start with "point": 1.0 (a 5.00 move then yields 5.00 * 1.0 = 5 USD per unit) and rescale once you confirm the lot size your environment assumes.
- EURUSD (Forex Pair):
- Tick Size: 0.00001 (5-digit pricing).
- Pip: 0.0001 (4-digit equivalent).
- Point Value: For 1 standard lot (100,000 units), 1 pip = $10 USD.
- Config Value: "point": 0.0001 (if price diff is in pips) or "point": 10.0 (if converting to USD directly).
- Recommendation: "point": 10.0 (per pip, assuming multiplication by pip count).
- USDJPY (Forex Pair):
- Tick Size: 0.001 (3-digit pricing).
- Pip: 0.01.
- Point Value: For 1 standard lot, 1 pip = 1000 JPY ≈ $8–10 USD (depends on exchange rate).
- Config Value: "point": 0.01 (per pip) or adjust to USD (e.g., "point": 10.0).
- Recommendation: "point": 10.0 (USD equivalent).
- US30 (Dow Jones CFD):
- Tick Size: 1.0 (e.g., 35000 to 35001).
- Point Value: Varies by broker (e.g., $1 per point per contract).
- Config Value: "point": 1.0 (if 1 point = $1).
- Recommendation: "point": 1.0 (standard for indices).
How to Determine the Correct Value?
- Broker Specs: Check your broker’s contract specifications (e.g., pip value, tick size, lot size).
- Code Usage: Inspect how _point is used in your code:
- If (price_diff * _point) yields USD directly, _point is the pip/tick value in USD (e.g., 10 for forex, 1 for XAUUSD tick).
- If price_diff is already in pips, _point might be a smaller multiplier (e.g., 0.0001 for EURUSD).
- Unit Size: If your environment uses micro-lots (0.01 lots) or custom units, scale _point accordingly (e.g., divide by 100 for micro-lots).
Example Config:
"symbols": {
"XAUUSD": {
"point": 1.0 // 1 USD per 0.01 move, 100 oz lot
},
"EURUSD": {
"point": 10.0 // 10 USD per pip, 100,000 units
},
"USDJPY": {
"point": 10.0 // ~10 USD per pip, adjusted for JPY
},
"US30": {
"point": 1.0 // 1 USD per point, 1 contract
}
}
Verification:
- Test Case: For XAUUSD, if price moves from 1800.00 to 1805.00 (5.00):
- With "point": 1.0", profit = 5.00 * 1.0 = 5 USD (per 100 oz).
- Adjust if your lot size differs (e.g., 0.1 lot → "point": 0.1).
Summary
- Purpose: _point converts price movements into monetary value (USD) for profit/loss calculations, specific to each asset.
- Config Value:
- XAUUSD: "point": 1.0 (per 0.01 tick, 100 oz lot).
- EURUSD: "point": 10.0 (per pip, 100,000 units).
- USDJPY: "point": 10.0 (per pip, USD equivalent).
- US30: "point": 1.0 (per point, 1 contract).
- What to Fill: Depends on:
- Broker’s pip/tick definition.
- Lot size in your environment.
- Whether _point applies per tick or per pip.
Actionable Step:
- Check your broker’s specs for XAUUSD and other assets.
- Review _take_action() or related methods to confirm how _point is applied.
- Test with a small trade in your environment (e.g., XAUUSD buy, 1-unit price move) and adjust "point" until profit/loss matches expectations.
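As a quick offline check before touching the environment, you can tabulate what each candidate "point" value implies for a known price move (the numbers below are illustrative):
entry_price, current_price = 1800.00, 1805.00   # hypothetical XAUUSD move of 5.00
for point in (0.1, 1.0, 10.0):
    profit = (current_price - entry_price) * point
    print(f"point={point:>4}: 5.00 move -> {profit:.2f} USD per unit")
Pick the value whose output matches your broker's contract specification for the lot size your environment actually trades.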
Let me know if you need help verifying this in your code or config!
Let’s dive into your question about the exploration incentive in _calculate_reward() and its interaction with the sustained_reward in the step() method of your tgym environment. I’ll explain the mechanics, evaluate whether it should stay or be removed, and provide a recommendation tailored to your goals.
Current Implementation
Exploration Incentive in _calculate_reward():
def _calculate_reward(self, i, done, action):
_total_reward = 0
_max_draw_down = 0
# Exploration incentive: small reward for trading, penalty for doing nothing
if not any(tr["Symbol"] == self.assets[i] for tr in self.transaction_live) and not done:
_total_reward += 0.01 if action in (0, 1) else -0.01
# ... (rest of the reward calculation)
return _total_reward
- Purpose: Encourages the agent to take trading actions (Buy=0, Sell=1) rather than doing nothing (Action=2) when no positions are open for the asset.
- Logic:
- If no live transactions exist for asset i and the episode isn’t done:
- +0.01 for Buy or Sell.
- -0.01 for Nothing.
- Output: _total_reward becomes base_reward in step().
Sustained Reward in step():
def step(self, actions):
# ... (step logic)
base_reward = self._take_action(actions, done)
# Calculate unrealized profit
unrealized_profit = 0
atr_scaling = 0
for i, asset in enumerate(self.assets):
atr = self.get_observation(self.current_step, i, "ATR")
atr_scaling += atr
for tr in self.transaction_live:
if tr["Symbol"] == asset:
if tr["Type"] == 0: # Buy
unrealized = (self._c - tr["ActionPrice"]) * self.cf.symbol(asset, "point")
else: # Sell
unrealized = (tr["ActionPrice"] - self._c) * self.cf.symbol(asset, "point")
unrealized_profit += unrealized
atr_scaling = atr_scaling / len(self.assets) if atr_scaling > 0 else 1
sustained_reward = (unrealized_profit + base_reward) * 0.01 / atr_scaling if self.transaction_live else 0
# Penalty for inaction if no positions are held
if not self.transaction_live and all(a == 2 for a in actions):
sustained_reward -= 0.1
total_reward = base_reward + sustained_reward
# ... (rest of step)
return obs, total_reward, terminated, truncated, info
- Sustained Reward:
- Formula: (unrealized_profit + base_reward) * 0.01 / atr_scaling if positions are open, otherwise 0.
- Inaction Penalty: -0.1 if no positions are open and all actions are 2 (Nothing).
- Total Reward: base_reward + sustained_reward.
Analysis of Exploration Incentive
How It Works:
- In _calculate_reward():
- Adds +0.01 for trading actions (Buy/Sell) or -0.01 for inaction (Nothing) per asset when no positions are open.
- Contributes to base_reward via _take_action().
- In step():
- base_reward (including the exploration incentive) is fed into sustained_reward.
- An additional -0.1 penalty applies if no positions exist and all actions are Nothing.
Combined Effect:
- Scenario 1: No Positions, Action=2 (Nothing):
- _calculate_reward(): -0.01 per asset with no positions.
- base_reward: Sum of -0.01 for each such asset.
- sustained_reward: 0 - 0.1 = -0.1 (no open positions, so the scaled term is skipped and only the inaction penalty applies).
- Example (1 asset): base_reward = -0.01, sustained_reward = -0.1.
- Total: -0.01 - 0.1 = -0.11.
- Scenario 2: No Positions, Action=0/1 (Buy/Sell):
- _calculate_reward(): +0.01 per asset, plus transaction fee penalty (e.g., -0.05).
- base_reward: 0.01 - 0.05 = -0.04 (assuming fee > incentive).
- sustained_reward: Non-zero only after positions open (next step).
- Total: -0.04 initially.
- Scenario 3: Positions Open:
- Exploration incentive doesn’t apply (transaction_live exists).
- sustained_reward dominates based on unrealized_profit.
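For concreteness, here is a quick arithmetic sketch (plain Python, no environment needed) that reproduces Scenario 1 for a single asset:
# Scenario 1: one asset, no open positions, action = 2 (Nothing)
base_reward = -0.01        # exploration incentive penalizes inaction per asset
sustained_reward = 0.0     # no live transactions, so the scaled term is skipped
sustained_reward -= 0.1    # inaction penalty applied in step()
total_reward = base_reward + sustained_reward
print(round(total_reward, 4))  # -0.11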
Intended Goal (Docstring Point 15):
- “Reward profit sustained over long periods… delay rewarding too fast early on… encourage exploration before optimizing a single strategy.”
- The exploration incentive (+0.01/-0.01) aims to push the agent to try trading actions early, while the sustained_reward rewards holding profitable positions over time.
Issues:
- Duplication with Inaction Penalty:
- _calculate_reward() penalizes inaction with -0.01 per asset.
- step() adds a larger -0.1 penalty when all actions are 2 and no positions exist.
- Problem: Double penalty for inaction might over-discourage holding cash, conflicting with scenarios where waiting is optimal.
- Small Magnitude:
- +0.01/-0.01 is dwarfed by transaction fees (e.g., -0.05) or unrealized profits (e.g., 5.0 USD), making its exploration effect negligible.
- Interaction with Sustained Reward:
- base_reward (with incentive) is scaled in sustained_reward, but the time factor is removed (per your redesign).
- The incentive’s impact is further diluted by 0.01 / atr_scaling.
- Exploration vs. Exploitation:
- Early exploration is good, but constant +0.01 for trading might bias toward over-trading, not sustained profit.
Should You Remove or Keep It?
Reasons to Remove:
- Redundancy: The -0.1 penalty in step() already discourages inaction when no positions are held, making the -0.01 in _calculate_reward() unnecessary.
- Weak Effect: At +0.01/-0.01, it’s too small to meaningfully influence exploration compared to fees or profits.
- Misalignment: It rewards trading regardless of profitability, which conflicts with your goal of rewarding sustained profit, not just activity.
- Simplification: Removing it streamlines the reward logic, focusing on base_reward (realized profit/loss) and sustained_reward (unrealized gains).
Reasons to Keep:
- Fine-Grained Control: Per-asset incentive (+0.01/-0.01) allows exploration per trading pair, unlike the global -0.1 penalty.
- Early Exploration: Encourages trying trades early in the episode, aligning with your docstring’s intent to delay deep optimization.
- Customizability: You could increase its magnitude (e.g., +0.1/-0.1) to make it more impactful.
Recommendation: Remove It
- Why:
- The -0.1 penalty in step() is sufficient to encourage exploration when no positions are open.
- The current +0.01/-0.01 is too weak and redundant, adding complexity without significant benefit.
- Your redesigned sustained_reward (no time factor, only when positions are open) already balances exploration (via unrealized profit) and exploitation (via holding profitable trades).
- Alternative: If you want stronger exploration, adjust the -0.1 penalty in step() (e.g., to -0.2) or add a decaying exploration bonus in step() instead of per-asset in _calculate_reward().
Updated Code Without Exploration Incentive:
def _calculate_reward(self, i, done, action):
_total_reward = 0
_max_draw_down = 0
# Removed exploration incentive
for tr in self.transaction_live[:]:
if tr["Symbol"] == self.assets[i]:
_point = self.cf.symbol(self.assets[i], "point")
if self._day > tr["DateDuration"]:
tr["DateDuration"] = self._day
tr["Reward"] -= self.cf.symbol(self.assets[i], "over_night_penalty")
if tr["Type"] == 0: # Buy
_sl_price = tr["ActionPrice"] - tr["SL"] / _point
_pt_price = tr["ActionPrice"] + tr["PT"] / _point
if done:
p = (self._c - tr["ActionPrice"]) * _point
self._manage_tranaction(tr, p, self._c, status=2)
_total_reward += p
elif self._l <= _sl_price:
self._manage_tranaction(tr, -tr["SL"], _sl_price)
_total_reward += -tr["SL"]
self.current_holding[i] -= 1
elif self._h >= _pt_price:
self._manage_tranaction(tr, tr["PT"], _pt_price)
_total_reward += tr["PT"]
self.current_holding[i] -= 1
else:
unrealized = (self._c - tr["ActionPrice"]) * _point
_total_reward += unrealized * 0.1 # Intermediate reward
self.current_draw_downs[i] = int((self._l - tr["ActionPrice"]) * _point)
_max_draw_down += self.current_draw_downs[i]
if self.current_draw_downs[i] < 0 and tr["MaxDD"] > self.current_draw_downs[i]:
tr["MaxDD"] = self.current_draw_downs[i]
elif tr["Type"] == 1: # Sell
_sl_price = tr["ActionPrice"] + tr["SL"] / _point
_pt_price = tr["ActionPrice"] - tr["PT"] / _point
if done:
p = (tr["ActionPrice"] - self._c) * _point
self._manage_tranaction(tr, p, self._c, status=2)
_total_reward += p
elif self._h >= _sl_price:
self._manage_tranaction(tr, -tr["SL"], _sl_price)
_total_reward += -tr["SL"]
self.current_holding[i] -= 1
elif self._l <= _pt_price:
self._manage_tranaction(tr, tr["PT"], _pt_price)
_total_reward += tr["PT"]
self.current_holding[i] -= 1
else:
unrealized = (tr["ActionPrice"] - self._c) * _point
_total_reward += unrealized * 0.1 # Intermediate reward
self.current_draw_downs[i] = int((tr["ActionPrice"] - self._h) * _point)
_max_draw_down += self.current_draw_downs[i]
if self.current_draw_downs[i] < 0 and tr["MaxDD"] > self.current_draw_downs[i]:
tr["MaxDD"] = self.current_draw_downs[i]
if _max_draw_down > self.max_draw_downs[i]:
self.max_draw_downs[i] = _max_draw_down
return _total_reward
def step(self, actions):
self.current_step += 1
terminated = self.balance <= 0
truncated = self.current_step == len(self.dt_datetime) - 1
done = terminated or truncated
if done:
self.done_information += f"Episode: {self.episode} Balance: {self.balance} Step: {self.current_step}\n"
self.visualization = True
self.episode += 1
base_reward = self._take_action(actions, done)
unrealized_profit = 0
atr_scaling = 0
for i, asset in enumerate(self.assets):
atr = self.get_observation(self.current_step, i, "ATR")
atr_scaling += atr
for tr in self.transaction_live:
if tr["Symbol"] == asset:
if tr["Type"] == 0: # Buy
unrealized = (self._c - tr["ActionPrice"]) * self.cf.symbol(asset, "point")
else: # Sell
unrealized = (tr["ActionPrice"] - self._c) * self.cf.symbol(asset, "point")
unrealized_profit += unrealized
atr_scaling = atr_scaling / len(self.assets) if atr_scaling > 0 else 1
sustained_reward = (unrealized_profit + base_reward) * 0.01 / atr_scaling if self.transaction_live else 0
if not self.transaction_live and all(a == 2 for a in actions):
sustained_reward -= 0.1 # Retained penalty
total_reward = base_reward + sustained_reward
# ... (rest of step unchanged)
return obs, total_reward, terminated, truncated, info
Final Thoughts
- Remove the Exploration Incentive: It’s redundant and ineffective in its current form. The -0.1 penalty in step() handles exploration adequately.
- Monitor Behavior: After removal, check if the agent still explores enough early on. If it overly favors inaction, consider increasing the -0.1 penalty or adding a decaying bonus in step() (e.g., +0.1 * (1 - current_step / max_steps) when trading).
- Alignment with Goal: This keeps the focus on sustained profit (via sustained_reward) while avoiding unnecessary complexity.
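If you later want an explicit exploration signal, here is a minimal sketch of that decaying bonus as a standalone helper; max_steps and the 0.1 scale are illustrative, and you would add the returned value to total_reward in step() only on steps where a trade was opened:
def decaying_exploration_bonus(current_step: int, max_steps: int, traded: bool, scale: float = 0.1) -> float:
    """Bonus that starts at `scale` and decays linearly to zero by the end of the episode."""
    if not traded or max_steps <= 0:
        return 0.0
    return scale * max(0.0, 1.0 - current_step / max_steps)

print(decaying_exploration_bonus(100, 1000, traded=True))  # ~0.09 early in a 1,000-step episode
print(decaying_exploration_bonus(950, 1000, traded=True))  # ~0.005 near the end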
Let me know if you’d like to explore alternative exploration strategies or need help testing this change!
Let’s analyze whether you should remove the intermediate unrealized reward calculation (_total_reward += unrealized * 0.1) from the _calculate_reward() method in your tgym environment. I’ll break down its purpose, its interaction with the rest of your reward structure, and provide a recommendation based on your goals.
Current Implementation
Code Snippet in Question:
if tr["Type"] == 0: # Buy
# ... (SL/PT/done logic)
else:
unrealized = (self._c - tr["ActionPrice"]) * _point
_total_reward += unrealized * 0.1 # Intermediate reward
elif tr["Type"] == 1: # Sell
# ... (SL/PT/done logic)
else:
unrealized = (tr["ActionPrice"] - self._c) * _point
_total_reward += unrealized * 0.1 # Intermediate reward
- Context: This code runs when an open position (Buy or Sell) hasn’t hit Stop Loss (SL), Profit Take (PT), or the episode end (done).
- Purpose: Adds a fraction (10%) of the unrealized profit/loss to _total_reward (which becomes base_reward in step()).
- Calculation:
- Buy: unrealized = (current_close - entry_price) * point_value.
- Sell: unrealized = (entry_price - current_close) * point_value.
- _total_reward += unrealized * 0.1: Scales the unrealized gain/loss by 0.1.
Full Reward Structure:
- In _calculate_reward():
- Realized rewards from SL/PT/done + intermediate unrealized reward (unrealized * 0.1).
- Returns _total_reward as base_reward.
- In step():
- Computes unrealized_profit (100% of unrealized gains/losses) for all open positions.
- sustained_reward = (unrealized_profit + base_reward) * 0.01 / atr_scaling if positions are open, else 0.
- Adds an inaction penalty (-0.1) if no positions and all actions are Nothing.
- total_reward = base_reward + sustained_reward.
Example:
- Scenario: XAUUSD, Buy at 1800.00, current close = 1805.00, _point = 1.0, ATR = 1.0, 1 position open.
- _calculate_reward():
- unrealized = (1805 - 1800) * 1 = 5.
- _total_reward += 5 * 0.1 = 0.5.
- base_reward = 0.5.
- step():
- unrealized_profit = 5.
- sustained_reward = (5 + 0.5) * 0.01 / 1 = 0.055.
- total_reward = 0.5 + 0.055 = 0.555.
Should You Remove It?
Purpose of Intermediate Unrealized Reward:
- Intent: Provides immediate feedback to the agent about the profitability of open positions at each step, rather than waiting for SL/PT or episode end.
- Docstring Goal (Point 15): “Reward profit sustained over long periods… delay rewarding too fast early on.”
- This intermediate reward partially aligns with rewarding sustained profit but acts as an early signal, potentially conflicting with the “delay” aspect.
- Effect: Encourages the agent to hold positions that are currently profitable and close those that aren’t, shaping behavior before realization.
Pros of Keeping It:
- Shaping Behavior:
- Gives the agent a continuous signal about position performance, helping it learn to manage trades mid-episode.
- Without it, base_reward is 0 unless SL/PT/done triggers, which might make learning sparse and slow.
- Complements Sustained Reward:
- Adds a small, immediate component to base_reward, while sustained_reward handles longer-term unrealized gains.
- Encourages Active Management:
- Rewards holding winning positions and penalizes holding losers incrementally.
Cons of Keeping It:
- Duplication with sustained_reward:
- step() already includes unrealized_profit in sustained_reward, scaled differently (0.01 / atr_scaling vs. 0.1).
- Example: Unrealized profit of 5 contributes 0.5 to base_reward and 0.05 to sustained_reward (ATR=1), creating overlap.
- Issue: Double-counting unrealized gains might overemphasize holding positions, even unprofitable ones.
- Conflicts with Delayed Reward Goal:
- Per your docstring, you want to delay rewards to encourage exploration and avoid early optimization.
- This immediate 0.1 * unrealized provides feedback too soon, potentially biasing the agent toward short-term gains.
- Complexity:
- Adds an extra layer to the reward, making it harder to tune and interpret.
Interaction with sustained_reward:
- Current:
- base_reward (with unrealized * 0.1) feeds into sustained_reward, amplifying the unrealized effect.
- Total contribution of unrealized profit of 5: 0.5 + (5 + 0.5) * 0.01 = 0.555.
- Without Intermediate Reward:
- base_reward = 0 (unless SL/PT/done).
- sustained_reward = 5 * 0.01 / 1 = 0.05.
- total_reward = 0 + 0.05 = 0.05.
- Difference: Keeping it boosts the reward significantly (0.555 vs. 0.05), emphasizing unrealized gains more heavily.
Alignment with Goals:
- Sustained Profit: You want to reward holding profitable positions over time. sustained_reward already does this, scaled by ATR and position existence.
- Exploration: Immediate rewards might reduce exploration by locking the agent into optimizing current unrealized gains too early.
- Sparse Rewards: Removing it makes base_reward sparse (only SL/PT/done), but sustained_reward still provides continuous feedback.
Recommendation: Remove It
Why Remove:
- Redundancy:
- sustained_reward in step() already captures unrealized profit, scaled appropriately by ATR and limited to open positions. The 0.1 * unrealized in _calculate_reward() duplicates this effort.
- Better Alignment with Docstring:
- Removing it delays reward feedback until positions are realized (SL/PT/done) or sustained via sustained_reward, encouraging exploration and long-term strategies over short-term unrealized gains.
- Simplification:
- Streamlines the reward structure: base_reward for realized profits, sustained_reward for unrealized profits, and a penalty for inaction.
- Avoid Over-Rewarding:
- Prevents excessive reward inflation (e.g., 0.555 vs. 0.05), which could bias the agent toward holding positions regardless of long-term viability.
Updated _calculate_reward():
def _calculate_reward(self, i, done, action):
_total_reward = 0
_max_draw_down = 0
for tr in self.transaction_live[:]:
if tr["Symbol"] == self.assets[i]:
_point = self.cf.symbol(self.assets[i], "point")
if self._day > tr["DateDuration"]:
tr["DateDuration"] = self._day
tr["Reward"] -= self.cf.symbol(self.assets[i], "over_night_penalty")
if tr["Type"] == 0: # Buy
_sl_price = tr["ActionPrice"] - tr["SL"] / _point
_pt_price = tr["ActionPrice"] + tr["PT"] / _point
if done:
p = (self._c - tr["ActionPrice"]) * _point
self._manage_tranaction(tr, p, self._c, status=2)
_total_reward += p
elif self._l <= _sl_price:
self._manage_tranaction(tr, -tr["SL"], _sl_price)
_total_reward += -tr["SL"]
self.current_holding[i] -= 1
elif self._h >= _pt_price:
self._manage_tranaction(tr, tr["PT"], _pt_price)
_total_reward += tr["PT"]
self.current_holding[i] -= 1
else:
# Removed intermediate unrealized reward
self.current_draw_downs[i] = int((self._l - tr["ActionPrice"]) * _point)
_max_draw_down += self.current_draw_downs[i]
if self.current_draw_downs[i] < 0 and tr["MaxDD"] > self.current_draw_downs[i]:
tr["MaxDD"] = self.current_draw_downs[i]
elif tr["Type"] == 1: # Sell
_sl_price = tr["ActionPrice"] + tr["SL"] / _point
_pt_price = tr["ActionPrice"] - tr["PT"] / _point
if done:
p = (tr["ActionPrice"] - self._c) * _point
self._manage_tranaction(tr, p, self._c, status=2)
_total_reward += p
elif self._h >= _sl_price:
self._manage_tranaction(tr, -tr["SL"], _sl_price)
_total_reward += -tr["SL"]
self.current_holding[i] -= 1
elif self._l <= _pt_price:
self._manage_tranaction(tr, tr["PT"], _pt_price)
_total_reward += tr["PT"]
self.current_holding[i] -= 1
else:
# Removed intermediate unrealized reward
self.current_draw_downs[i] = int((tr["ActionPrice"] - self._h) * _point)
_max_draw_down += self.current_draw_downs[i]
if self.current_draw_downs[i] < 0 and tr["MaxDD"] > self.current_draw_downs[i]:
tr["MaxDD"] = self.current_draw_downs[i]
if _max_draw_down > self.max_draw_downs[i]:
self.max_draw_downs[i] = _max_draw_down
return _total_reward
Impact:
- Base Reward: Only includes realized profits/losses (SL, PT, or forced close).
- Sustained Reward: Solely responsible for rewarding unrealized profits, scaled by 0.01 / atr_scaling.
- Example (Same Scenario):
- base_reward = 0.
- sustained_reward = 5 * 0.01 / 1 = 0.05.
- total_reward = 0 + 0.05 = 0.05.
Potential Concerns and Mitigations
- Sparse Rewards:
- Concern: base_reward becomes zero unless a position closes, potentially slowing learning.
- Mitigation: sustained_reward still provides continuous feedback for open positions, preventing fully sparse rewards. If learning is too slow, increase the 0.01 scaling in sustained_reward (e.g., to 0.05).
- Exploration:
- Concern: Without intermediate rewards, the agent might not adjust open positions dynamically.
- Mitigation: The -0.1 inaction penalty in step() encourages opening positions, and sustained_reward incentivizes holding profitable ones.
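If neither mitigation proves sufficient, one fallback (picked up again in the Final Thoughts below) is to reintroduce a much smaller, time-decaying intermediate reward. A minimal standalone sketch, assuming max_steps = len(self.dt_datetime):
def decayed_unrealized_reward(unrealized: float, current_step: int, max_steps: int, scale: float = 0.01) -> float:
    """Optional shaping term: a small share of unrealized P/L that fades to zero late in the episode."""
    decay = max(0.0, 1.0 - current_step / max_steps) if max_steps > 0 else 0.0
    return unrealized * scale * decay

print(decayed_unrealized_reward(5.0, 100, 1000))  # ~0.045 for +5 points unrealized at step 100 of 1,000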
Final Thoughts
- Remove It: The intermediate unrealized reward is redundant with sustained_reward, misaligns with your delayed reward goal, and complicates tuning. Removing it simplifies the structure while preserving your intent.
- Monitor Performance: After removal, check if the agent learns effectively. If it struggles with sparse rewards, adjust sustained_reward’s scaling factor or reintroduce a smaller, time-decaying intermediate reward (e.g., unrealized * 0.01 * (1 - current_step / max_steps)).
Let me know if you’d like help testing this change or exploring alternatives!
Let’s address your two questions by analyzing your tgym environment code in the context of the concern about exploration and dynamic adjustment of open positions, including Stop Loss (SL) and Take Profit (PT).
1. Does Your Code Implement the Mitigation for the Exploration Concern?
Concern Recap:
- Concern: Without intermediate unrealized rewards (previously unrealized * 0.1 in _calculate_reward()), the agent might not adjust open positions dynamically, potentially leading to a passive strategy where it opens positions but doesn’t manage them effectively.
- Mitigation:
- The -0.1 inaction penalty in step() encourages opening positions.
- The sustained_reward incentivizes holding profitable positions over time.
Relevant Code:
- Inaction Penalty in step():
if not self.transaction_live and all(a == 2 for a in actions):
sustained_reward -= 0.1 # Small penalty to encourage exploration
- Sustained Reward in step():
sustained_reward = (unrealized_profit + base_reward) * 0.01 / atr_scaling if self.transaction_live else 0
- Base Reward in _calculate_reward():
- Now only includes realized profits/losses (SL, PT, or forced close at episode end), no intermediate unrealized rewards.
Analysis:
- Does the -0.1 Penalty Encourage Opening Positions?
- Yes: When no positions are open (not self.transaction_live) and the agent chooses all "Nothing" actions (all(a == 2)), it receives a -0.1 penalty. This discourages sitting idle and pushes the agent to open positions (Buy=0 or Sell=1).
- Evidence: The penalty is applied consistently in step(), affecting total_reward negatively unless the agent takes action to open trades.
- Does sustained_reward Incentivize Holding Profitable Positions?
- Yes: When positions are open (self.transaction_live), sustained_reward is calculated as (unrealized_profit + base_reward) * 0.01 / atr_scaling. This rewards the agent for unrealized profits, scaled by market volatility (ATR), encouraging it to hold positions that are currently profitable.
- Example: For a Buy position with unrealized_profit = 5, base_reward = 0, and atr_scaling = 1, sustained_reward = 5 * 0.01 = 0.05, providing positive feedback for holding.
- Mitigation Effectiveness:
- Opening Positions: The -0.1 penalty addresses the concern by ensuring the agent doesn’t remain inactive when no positions are open, promoting exploration of trading actions.
- Holding Profitable Positions: sustained_reward provides a continuous signal to maintain profitable trades, aligning with your goal of rewarding sustained profit over time (docstring point 15).
- Dynamic Adjustment: The mitigation assumes the agent will adjust positions based on these incentives, but this depends on its ability to close or modify existing trades (more on this in Question 2).
Conclusion for Question 1:
- Yes, Your Code Implements the Mitigation:
- The -0.1 penalty encourages opening positions when none exist, addressing the exploration aspect.
- sustained_reward incentivizes holding profitable positions, supporting dynamic management by rewarding unrealized gains.
- Caveat: While the mitigation is implemented, its effectiveness in ensuring dynamic adjustment (e.g., closing losing positions or optimizing open ones) depends on the action space and reward signals, which we’ll explore next.
2. Can Your Code Adjust Open Positions Dynamically? Can It Adjust SL/PT Dynamically?
Current Action Space and Mechanics:
- Action Space:
self.action_space = spaces.MultiDiscrete([3] * len(self.assets)) # 0: Buy, 1: Sell, 2: Nothing
- 3 actions per asset: Buy (0), Sell (1), Nothing (2).
- Position Management in _take_action():
def _take_action(self, actions, done):
rewards = [0] * len(self.assets)
self.tranaction_open_this_step = []
self.tranaction_close_this_step = []
for i, _action in enumerate(actions):
# ... (OHLC retrieval)
rewards[i] = self._calculate_reward(i, done, _action)
if self.cf.symbol(self.assets[i], "limit_order"):
self._limit_order_process(i, _action, done)
if (_action in (0, 1) and not done and
self.current_holding[i] < self.cf.symbol(self.assets[i], "max_current_holding")):
_profit_taken = self.cf.symbol(self.assets[i], "profit_taken_max") + self.cf.symbol(self.assets[i], "stop_loss_max")
# ... (transaction creation logic)
self.current_holding[i] += 1
self.transaction_live.append(transaction)
return sum(rewards)
- Reward Calculation in _calculate_reward():
def _calculate_reward(self, i, done, action):
_total_reward = 0
for tr in self.transaction_live[:]:
if tr["Symbol"] == self.assets[i]:
_point = self.cf.symbol(self.assets[i], "point")
if tr["Type"] == 0: # Buy
_sl_price = tr["ActionPrice"] - tr["SL"] / _point
_pt_price = tr["ActionPrice"] + tr["PT"] / _point
if done:
p = (self._c - tr["ActionPrice"]) * _point
self._manage_tranaction(tr, p, self._c, status=2)
_total_reward += p
elif self._l <= _sl_price:
self._manage_tranaction(tr, -tr["SL"], _sl_price)
_total_reward += -tr["SL"]
self.current_holding[i] -= 1
elif self._h >= _pt_price:
self._manage_tranaction(tr, tr["PT"], _pt_price)
_total_reward += tr["PT"]
self.current_holding[i] -= 1
elif tr["Type"] == 1: # Sell
# ... (similar SL/PT logic)
return _total_reward
Can It Adjust Open Positions Dynamically?
- What "Dynamic Adjustment" Means: The ability to actively manage existing positions (e.g., closing them early, opening opposite trades, or modifying parameters) based on current market conditions or portfolio state.
- Current Capabilities:
- Opening Positions:
- Buy (0) or Sell (1) actions open new positions if current_holding[i] < max_current_holding.
- Works dynamically per step based on the agent’s action.
- Closing Positions:
- Positions close automatically when:
- SL is hit (self._l <= _sl_price for Buy, self._h >= _sl_price for Sell).
- PT is hit (self._h >= _pt_price for Buy, self._l <= _pt_price for Sell).
- Episode ends (done).
- Limitation: No explicit action to close an existing position manually. The agent can’t choose to close a position before SL/PT unless it opens an opposite trade (e.g., Sell after Buy), but this isn’t explicitly supported or tracked as a close action.
- Opposite Trades:
- If a Buy position is open, a Sell action opens a separate Sell position rather than closing the Buy; once max_current_holding is reached, no further positions can be opened for that asset.
- No netting or hedging logic (e.g., closing a Buy with a Sell directly).
- Observation Feedback:
- portfolio_data includes current_holding and current_draw_downs, giving the agent visibility into open positions.
- sustained_reward reflects unrealized profits, incentivizing holding winners or closing losers indirectly.
- Answer:
- Partially Yes: The agent can adjust open positions dynamically by opening new ones, and SL/PT triggers close them automatically. However, it lacks direct control to manually close or modify existing positions mid-episode without relying on SL/PT.
Can It Dynamically Adjust Stop Loss (SL) and Take Profit (PT)?
- Current SL/PT Setup:
- Defined in _take_action() when opening a position:
_profit_taken = self.cf.symbol(self.assets[i], "profit_taken_max") + self.cf.symbol(self.assets[i], "stop_loss_max")
"SL": self.cf.symbol(self.assets[i], "stop_loss_max"),
"PT": _profit_taken,
- SL and PT are fixed values from the config file (e.g., stop_loss_max, profit_taken_max) set at trade initiation.
- Adjustment Logic:
- No mechanism exists to modify SL or PT after a position is opened. They remain static until the position closes.
- The agent can’t issue an action to update SL/PT based on new market conditions (e.g., trailing stop or tightening PT).
- Reward Influence:
- sustained_reward reflects unrealized profits but doesn’t adjust SL/PT.
- _calculate_reward() enforces fixed SL/PT thresholds without flexibility.
- Answer:
- No: Your code cannot dynamically adjust SL or PT once a position is opened. These values are pre-fixed per asset from the config and remain unchanged throughout the position’s life.
Enhancing Dynamic Adjustment:
- To Adjust Open Positions:
- Expand Action Space: Add actions like "Close Buy" (3) and "Close Sell" (4) to action_space (e.g., spaces.MultiDiscrete([5] * len(self.assets))).
- Modify _take_action(): Handle close actions by removing specific transactions from transaction_live and updating current_holding.
if _action == 3:  # Close Buy
    for tr in self.transaction_live[:]:
        if tr["Symbol"] == self.assets[i] and tr["Type"] == 0:
            p = (self._c - tr["ActionPrice"]) * _point
            self._manage_tranaction(tr, p, self._c, status=5)  # New status for manual close
            self.current_holding[i] -= 1
            rewards[i] += p
- To Adjust SL/PT Dynamically:
- Expand Action Space: Use continuous actions (e.g., spaces.Box) to specify SL/PT adjustments (e.g., [action_type, sl_adjust, pt_adjust] per asset).
- Modify Transaction: Update SL and PT in transaction_live based on actions.
if _action[0] == 2:  # Adjust SL/PT
    for tr in self.transaction_live:
        if tr["Symbol"] == self.assets[i]:
            tr["SL"] = max(0, tr["SL"] + _action[1])  # Adjust SL
            tr["PT"] = max(0, tr["PT"] + _action[2])  # Adjust PT
- Config Update: Allow dynamic SL/PT ranges in the config file.
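For that last point, here is a hedged sketch of what per-symbol adjustment bounds could look like; the keys sl_adjust_step, pt_adjust_step, sl_max, and pt_max are hypothetical and not part of your current config schema:
# Hypothetical per-symbol entries (not in the current config) bounding dynamic SL/PT changes.
symbol_cfg = {
    "XAUUSD": {"sl_adjust_step": 5, "pt_adjust_step": 10, "sl_max": 300, "pt_max": 600},
    "EURUSD": {"sl_adjust_step": 5, "pt_adjust_step": 10, "sl_max": 200, "pt_max": 400},
}

def adjust_sl_pt(tr: dict, cfg: dict, widen: bool = True) -> None:
    """Apply one bounded adjustment step to an open transaction's SL/PT fields."""
    sign = 1 if widen else -1
    tr["SL"] = min(cfg["sl_max"], max(0, tr["SL"] + sign * cfg["sl_adjust_step"]))
    tr["PT"] = min(cfg["pt_max"], max(tr["SL"], tr["PT"] + sign * cfg["pt_adjust_step"]))

trade = {"Symbol": "XAUUSD", "SL": 50, "PT": 120}
adjust_sl_pt(trade, symbol_cfg["XAUUSD"])
print(trade)  # {'Symbol': 'XAUUSD', 'SL': 55, 'PT': 130}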
Summary
- Mitigation for Exploration Concern:
- Yes: Your code implements the mitigation:
- -0.1 penalty encourages opening positions when none exist.
- sustained_reward incentivizes holding profitable ones.
- Effectiveness: Sufficient for opening and holding, but limited for closing or adjusting without additional actions.
- Dynamic Adjustment:
- Open Positions: Partially yes—can open new positions and rely on SL/PT to close, but no direct manual closing without expanding the action space.
- SL/PT: No—fixed at trade initiation, no dynamic adjustment possible with the current setup.
Recommendations:
- If Satisfied with Current Behavior: Keep as is if automatic SL/PT and sustained reward suffice for your strategy.
- For More Control:
- Add close actions to dynamically manage open positions.
- Introduce adjustable SL/PT via a richer action space if you want the agent to optimize these parameters.
Let me know if you’d like code snippets to implement these enhancements!
1. For the code below, regarding docstring point 10 (ProfitTaken = fraction_action * max_profit_taken + SL): can this dynamically adjust the profit taken?
```
import datetime
import math
import random

import gym
import numpy as np
from gym import spaces
from gym.utils import seeding
from stable_baselines3.common.vec_env import DummyVecEnv

from meta.env_fx_trading.util.log_render import render_to_file
from meta.env_fx_trading.util.plot_chart import TradingChart
from meta.env_fx_trading.util.read_config import EnvConfig


class tgym(gym.Env):
    """Forex/futures/options trading gym environment.

    (Docstring points 1-17 are the same as in the gymnasium version of this environment;
     the ones relevant here are:
     10. ProfitTaken = fraction_action * max_profit_taken + SL
     11. SL is pre-fixed)
    """

    metadata = {"render.modes": ["graph", "human", "file", "none"]}

    def __init__(
        self,
        df,
        env_config_file="./neo_finrl/env_fx_trading/config/gdbusd-test-1.json",
    ) -> None:
        assert df.ndim == 2
        super(tgym, self).__init__()
        self.cf = EnvConfig(env_config_file)
        self.observation_list = self.cf.env_parameters("observation_list")
        self.balance_initial = self.cf.env_parameters("balance")
        self.over_night_cash_penalty = self.cf.env_parameters("over_night_cash_penalty")
        self.asset_col = self.cf.env_parameters("asset_col")
        self.time_col = self.cf.env_parameters("time_col")
        self.random_start = self.cf.env_parameters("random_start")
        # ... (log filename, _time/_day columns, asset list, reset values and data caching,
        #      same as in the current environment)
        self.reward_range = (-np.inf, np.inf)
        self.action_space = spaces.Box(low=0, high=3, shape=(len(self.assets),))
        # first three = balance, current_holding, max_draw_down_pct
        _space = 3 + len(self.assets) + len(self.assets) * len(self.observation_list)
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(_space,))
        self._seed()

    def _seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def _take_action(self, actions, done):
        # action = math.floor(x)
        # profit_taken = math.ceil((x - math.floor(x)) * profit_taken_max - stop_loss_max)
        _action = 2
        _profit_taken = 0
        rewards = [0] * len(self.assets)
        self.tranaction_open_this_step = []
        self.tranaction_close_this_step = []
        # need to handle multiple assets
        for i, x in enumerate(actions):
            self._o = self.get_observation(self.current_step, i, "Open")
            self._h = self.get_observation(self.current_step, i, "High")
            self._l = self.get_observation(self.current_step, i, "Low")
            self._c = self.get_observation(self.current_step, i, "Close")
            self._t = self.get_observation(self.current_step, i, "_time")
            self._day = self.get_observation(self.current_step, i, "_day")
            _action = math.floor(x)
            rewards[i] = self._calculate_reward(i, done)
            if self.cf.symbol(self.assets[i], "limit_order"):
                self._limit_order_process(i, _action, done)
            if (
                _action in (0, 1)
                and not done
                and self.current_holding[i] < self.cf.symbol(self.assets[i], "max_current_holding")
            ):
                # generate PT based on the action fraction
                _profit_taken = math.ceil(
                    (x - _action) * self.cf.symbol(self.assets[i], "profit_taken_max")
                ) + self.cf.symbol(self.assets[i], "stop_loss_max")
                self.ticket_id += 1
                # ... (build the transaction dict with "SL": stop_loss_max and "PT": _profit_taken,
                #      append it to transaction_limit_order or transaction_live, charge transaction_fee)
        return sum(rewards)

    # ... (the remaining methods: _calculate_reward, _limit_order_process, _manage_tranaction,
    #      step, get_observation, get_observation_vector, reset, render, close, get_sb_env)
```
2. If this approach can dynamically adjust the profit taken, are there other ways to dynamically adjust PT and SL?
3. Is there a way to set my risk value, and then have the model calculate take profit and stop loss based on that risk value?
Let’s dive into your questions about the ProfitTaken calculation in your tgym environment, its ability to dynamically adjust Take Profit (PT) and Stop Loss (SL), and how you might incorporate a risk value to calculate these dynamically.
1. Does ProfitTaken = fraction_action * max_profit_taken + SL Dynamically Adjust Take Profit?
Relevant Code:
- Docstring Point 10:
10. ProfitTaken = fraction_action * max_profit_taken + SL.
- Action Space:
self.action_space = spaces.Box(low=0, high=3, shape=(len(self.assets),))
- Take Action Logic:
def _take_action(self, actions, done):
for i, x in enumerate(actions):
_action = math.floor(x)
if (_action in (0, 1) and not done and
self.current_holding[i] < self.cf.symbol(self.assets[i], "max_current_holding")):
_profit_taken = math.ceil(
(x - _action) * self.cf.symbol(self.assets[i], "profit_taken_max")
) + self.cf.symbol(self.assets[i], "stop_loss_max")
# ... (transaction creation with "PT": _profit_taken)
Analysis:
- Action Interpretation:
- actions is a continuous array from 0 to 3 for each asset (e.g., [2.7, 1.3, 0.4]).
- _action = math.floor(x): Extracts the integer part (0=Buy, 1=Sell, 2=Nothing).
- fraction_action = x - _action: Extracts the fractional part (e.g., for x = 2.7, _action = 2, fraction_action = 0.7).
- Profit Taken Calculation:
- Formula: _profit_taken = math.ceil((x - _action) * max_profit_taken) + stop_loss_max.
- max_profit_taken: Maximum PT value from the config file (e.g., self.cf.symbol(self.assets[i], "profit_taken_max")).
- stop_loss_max: Fixed SL value from the config (e.g., self.cf.symbol(self.assets[i], "stop_loss_max")).
- Example:
- x = 0.7, _action = 0 (Buy), fraction_action = 0.7.
- Config: max_profit_taken = 100, stop_loss_max = 50.
- _profit_taken = math.ceil(0.7 * 100) + 50 = math.ceil(70) + 50 = 70 + 50 = 120.
- Dynamic Adjustment:
- Yes, at Opening: The PT is dynamically set when a position is opened based on the fractional part of the action (x - _action). This allows the agent to specify a PT between stop_loss_max (when fraction = 0) and max_profit_taken + stop_loss_max (when fraction approaches 1).
- No, After Opening: Once set in the transaction dictionary (e.g., "PT": _profit_taken), PT remains static for that position’s lifetime. There’s no mechanism in _calculate_reward() or elsewhere to modify it mid-trade.
- SL Handling:
- SL is fixed at stop_loss_max from the config and isn’t influenced by the action’s fractional part (per docstring point 11: "SL is pre-fixed").
Conclusion for Question 1:
- Yes, It Dynamically Adjusts PT at Trade Initiation:
- The formula fraction_action * max_profit_taken + SL allows the agent to set a variable PT when opening a position, based on the continuous action value (0 to 3).
- Example: For x = 0.3 (Buy), PT might be 0.3 * 100 + 50 = 80; for x = 0.8, PT becomes 0.8 * 100 + 50 = 130.
- Limitation: This adjustment only happens when the trade is opened. PT cannot be changed dynamically after the position is live (e.g., no trailing stops or mid-trade adjustments).
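To see the decoding end-to-end, here is a small standalone sketch of how one continuous action value maps to (action type, fraction, PT) under that formula, using the example config values max_profit_taken = 100 and stop_loss_max = 50 (note that math.ceil can round up by one extra point for fractions that are not exact in binary floating point, e.g., 0.7):
import math

def decode_action(x: float, max_profit_taken: int = 100, stop_loss_max: int = 50):
    """Split a continuous action in [0, 3) into (action_type, fraction, profit_taken)."""
    action_type = math.floor(x)   # 0=Buy, 1=Sell, 2=Nothing
    fraction = x - action_type    # fractional part drives PT
    profit_taken = math.ceil(fraction * max_profit_taken) + stop_loss_max
    return action_type, fraction, profit_taken

print(decode_action(0.25))  # (0, 0.25, 75)  -> Buy with PT = 75
print(decode_action(1.75))  # (1, 0.75, 125) -> Sell with PT = 125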
2. If It Can Dynamically Adjust PT, Are There Other Ways to Adjust PT and SL Dynamically?
Current Limitation:
- PT is adjustable only at trade initiation via the fractional action.
- SL is fixed at stop_loss_max and never adjusts.
- No mid-trade adjustments are possible for either PT or SL with the current action space ([0, 3]) and logic.
Alternative Methods to Dynamically Adjust PT and SL:
- Expand Action Space to Include PT/SL Adjustments:
- Approach: Use a multi-dimensional action space where each asset’s action includes [action_type, pt_adjust, sl_adjust].
- Code Change:
self.action_space = spaces.Box(
    low=np.zeros((len(self.assets), 3), dtype=np.float32),
    high=np.tile(np.array([3.0, 1.0, 1.0], dtype=np.float32), (len(self.assets), 1)),
    dtype=np.float32,
)  # per asset: [action_type in [0, 3], pt_adjust in [0, 1], sl_adjust in [0, 1]]
- action_type: 0=Buy, 1=Sell, 2=Nothing, 3=Adjust (0 to 3).
- pt_adjust: Fractional adjustment to PT (0 to 1).
- sl_adjust: Fractional adjustment to SL (0 to 1).
- Implementation in _take_action():
for i, action in enumerate(actions):
    action_type = math.floor(action[0])
    pt_fraction = action[1]
    sl_fraction = action[2]
    if action_type in (0, 1):  # Buy/Sell
        _profit_taken = math.ceil(
            pt_fraction * self.cf.symbol(self.assets[i], "profit_taken_max")
        ) + self.cf.symbol(self.assets[i], "stop_loss_max")
        _stop_loss = math.ceil(sl_fraction * self.cf.symbol(self.assets[i], "stop_loss_max"))
        # Set "SL": _stop_loss, "PT": _profit_taken in the transaction dict
    elif action_type == 3:  # Adjust existing position
        for tr in self.transaction_live:
            if tr["Symbol"] == self.assets[i]:
                tr["PT"] = math.ceil(pt_fraction * self.cf.symbol(self.assets[i], "profit_taken_max")) + tr["SL"]
                tr["SL"] = math.ceil(sl_fraction * self.cf.symbol(self.assets[i], "stop_loss_max"))
- Pros: Allows continuous adjustment of PT and SL for open positions at each step.
- Cons: Increases complexity and requires retraining the model with a new policy.
- Trailing Stop for SL and PT:
- Approach: Automatically adjust SL and PT based on price movement (e.g., trailing stop).
- Implementation in _calculate_reward():
if tr["Type"] == 0:  # Buy
    trailing_sl = max(tr["SL"], (self._h - tr["ActionPrice"]) * _point * 0.5)  # Trail SL at 50% of max profit
    tr["SL"] = trailing_sl
    tr["PT"] = tr["SL"] + self.cf.symbol(self.assets[i], "profit_taken_max") * 0.8  # Adjust PT dynamically
elif tr["Type"] == 1:  # Sell
    trailing_sl = max(tr["SL"], (tr["ActionPrice"] - self._l) * _point * 0.5)
    tr["SL"] = trailing_sl
    tr["PT"] = tr["SL"] + self.cf.symbol(self.assets[i], "profit_taken_max") * 0.8
- Pros: No change to action space; adjusts SL/PT based on market conditions.
- Cons: Less agent control; fixed trailing logic might not suit all strategies.
- Separate Actions for Closing and Adjusting:
- Approach: Expand to discrete actions (e.g., 0=Buy, 1=Sell, 2=Nothing, 3=Close, 4=Adjust PT Up, 5=Adjust PT Down, 6=Adjust SL Up, 7=Adjust SL Down).
- Code Change:
self.action_space = spaces.MultiDiscrete([8] * len(self.assets))
- In _take_action():
if _action == 3:  # Close
    for tr in self.transaction_live[:]:
        if tr["Symbol"] == self.assets[i]:
            p = (self._c - tr["ActionPrice"]) * _point if tr["Type"] == 0 else (tr["ActionPrice"] - self._c) * _point
            self._manage_tranaction(tr, p, self._c, status=5)
            rewards[i] += p
elif _action in (4, 5, 6, 7):  # Adjust PT/SL
    for tr in self.transaction_live:
        if tr["Symbol"] == self.assets[i]:
            if _action == 4:
                tr["PT"] += 10  # Increase PT
            elif _action == 5:
                tr["PT"] = max(tr["SL"], tr["PT"] - 10)  # Decrease PT
            elif _action == 6:
                tr["SL"] += 5  # Increase SL
            elif _action == 7:
                tr["SL"] = max(0, tr["SL"] - 5)  # Decrease SL
- Pros: Explicit control over PT/SL adjustments.
- Cons: Larger action space increases learning complexity.
3. Can You Set a Risk Value for the Model to Calculate TP and SL?
Current Setup:
- SL and PT are derived from config values (stop_loss_max, profit_taken_max), with PT adjustable via fraction_action.
Adding Risk-Based TP and SL:
- Approach: Introduce a risk parameter (e.g., percentage of balance or volatility-based) to calculate SL and PT dynamically.
- Steps:
- Define Risk Value:
- Add to config: "risk_percent": 0.02 (e.g., 2% of balance per trade).
- Or use ATR: "risk_atr_multiplier": 2 (e.g., 2x ATR for SL).
- Calculate SL and PT:
- In _take_action():
risk_percent = self.cf.env_parameters("risk_percent", default=0.02)
atr = self.get_observation(self.current_step, i, "ATR")  # Assuming ATR is in observation_list
_point = self.cf.symbol(self.assets[i], "point")
# Risk-based SL (e.g., 2% of balance)
_stop_loss = (self.balance * risk_percent) / _point
# Or ATR-based SL
# _stop_loss = atr * self.cf.env_parameters("risk_atr_multiplier", default=2)
# PT as a multiple of SL (e.g., 2:1 reward-to-risk)
reward_risk_ratio = self.cf.env_parameters("reward_risk_ratio", default=2)
_profit_taken = _stop_loss * reward_risk_ratio
transaction = {
    "SL": _stop_loss,
    "PT": _profit_taken,
    # ... (other fields)
}
- Dynamic Adjustment:
- Combine with an expanded action space (e.g., method 1 above) to adjust SL/PT based on risk mid-trade:
if action_type == 3:  # Adjust
    for tr in self.transaction_live:
        if tr["Symbol"] == self.assets[i]:
            tr["SL"] = (self.balance * risk_percent * sl_fraction) / _point
            tr["PT"] = tr["SL"] * reward_risk_ratio * pt_fraction
Example:
- Balance = 10,000 USD, risk_percent = 0.02, _point = 1, reward_risk_ratio = 2.
- SL = 10,000 * 0.02 / 1 = 200 (points).
- PT = 200 * 2 = 400 (points).
- If action adjusts sl_fraction = 0.5, SL becomes 100, PT becomes 200.
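A standalone version of that balance-based calculation (risk_percent and reward_risk_ratio would be new, hypothetical config entries; the numbers match the example above):
def risk_based_sl_pt(balance: float, risk_percent: float, point: float, reward_risk_ratio: float = 2.0):
    """Return (SL, PT) in points so that hitting SL loses roughly risk_percent of the balance."""
    sl_points = (balance * risk_percent) / point
    pt_points = sl_points * reward_risk_ratio
    return sl_points, pt_points

sl, pt = risk_based_sl_pt(10_000, 0.02, point=1.0)
print(sl, pt)  # SL = 200 points, PT = 400 points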
Pros and Cons:
- Pros: Ties SL/PT to account risk or market volatility (ATR), making them adaptive.
- Cons: Requires tuning risk_percent or risk_atr_multiplier and may need observation updates (e.g., ATR).
Summary
- Dynamic PT Adjustment:
- Yes, your current code dynamically adjusts PT at trade initiation using fraction_action * max_profit_taken + SL. It’s not adjustable mid-trade.
- Other Ways to Adjust PT and SL:
- Expand action space for continuous PT/SL adjustments.
- Implement trailing stops based on price movement.
- Add discrete actions for manual PT/SL changes.
- Risk-Based TP and SL:
- Yes, you can set a risk value (e.g., % of balance or ATR multiplier) and calculate SL/PT dynamically. Combine with action adjustments for full flexibility.
Recommendation:
- If you want mid-trade PT/SL adjustments, adopt the multi-dimensional action space (method 1) and integrate risk-based calculations for a robust solution.
- Start with risk-based SL/PT at trade opening and test before adding mid-trade adjustments.
Let me know if you’d like detailed code for any of these options!
Edit my code so that the action returns a fraction_action, and use it to calculate the profit taken, so that PT is set dynamically when a position is opened.
```
import ast
import csv
import datetime
import math
import os
import random

import gymnasium as gym
import numpy as np
import pandas_ta as ta
import torch
import torch.nn as nn
from gymnasium import spaces
from gymnasium.utils import seeding
from stable_baselines3 import PPO
from stable_baselines3.common.policies import ActorCriticPolicy
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor, MlpExtractor
from stable_baselines3.common.vec_env import DummyVecEnv

from meta.env_fx_trading.util.log_render import render_to_file
from meta.env_fx_trading.util.plot_chart import TradingChart
from meta.env_fx_trading.util.read_config import EnvConfig


class tgym(gym.Env):
    """Forex/futures/options trading gym environment.

    (Docstring points 1-18 are the same as in the current environment; the ones relevant here are
     10. ProfitTaken = fraction_action * max_profit_taken + SL, 11. SL is pre-fixed, and
     18. ATR is computed per asset and used to scale the unrealized-profit reward.)
    """

    metadata = {"render.modes": ["graph", "human", "file", "none"]}

    def __init__(
        self,
        df,
        event_map,
        currency_map,
        env_config_file="./neo_finrl/env_fx_trading/config/gdbusd-test-1.json",
    ):
        assert df.ndim == 2
        super(tgym, self).__init__()
        self.cf = EnvConfig(env_config_file)
        self.observation_list = self.cf.env_parameters("observation_list")
        # economic data maps, 'events' column parsing/validation, balance/penalty/asset/time
        # parameters, log filename, _time/_day columns, asset list: same as the current environment
        # ...
        self.analyze_transaction_history_log_filename = (
            "transaction_history_log" + datetime.datetime.now().strftime("%Y%m%d%H%M%S") + ".csv"
        )
        self.visualization = False

        # Calculate ATR and save the DataFrame
        self.calculate_atr()
        self.df.to_csv("processed_df_with_atr.csv")
        print("Saved processed DataFrame to 'processed_df_with_atr.csv'")

        # reset values, cached ohlc/economic/time-serial data, reward_range: same as the current environment
        # ...
        self.action_space = spaces.MultiDiscrete([3] * len(self.assets))  # 0: Buy, 1: Sell, 2: Nothing
        self.observation_space = spaces.Dict({
            "ohlc_data": spaces.Box(low=-np.inf, high=np.inf, shape=(len(self.assets) * len(self.observation_list),), dtype=np.float32),
            "event_ids": spaces.Box(low=0, high=len(self.event_map) - 1, shape=(self.max_events,), dtype=np.int32),
            "currency_ids": spaces.Box(low=0, high=len(self.currency_map) - 1, shape=(self.max_events,), dtype=np.int32),
            "economic_numeric": spaces.Box(low=-np.inf, high=np.inf, shape=(self.max_events * 6,), dtype=np.float32),
            "portfolio_data": spaces.Box(low=-np.inf, high=np.inf, shape=(3 + 2 * len(self.assets),), dtype=np.float32),
        })
        self._seed()

    def _seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def calculate_atr(self):
        # Compute a 14-period ATR per asset with pandas_ta, merge it back into self.df,
        # forward-fill NaNs, append "ATR" to observation_list, and re-cache observations.
        ...

    def _take_action(self, actions, done):
        # Actions are currently integers in [0, 1, 2]; PT is the fixed
        # profit_taken_max + stop_loss_max from the config (no fractional component).
        ...

    def _calculate_reward(self, i, done, action):
        # Realized SL/PT/forced-close rewards and drawdown tracking, as quoted earlier.
        ...

    def _limit_order_process(self, i, _action, done):
        ...

    def _manage_tranaction(self, tr, _p, close_price, status=1):
        ...

    def analyze_transaction_history(self, log_file):
        # Compute trades, win_rate, profit_factor, sharpe_ratio and total_profit, append to the csv log.
        ...

    def step(self, actions):
        self.current_step += 1
        # Define termination and truncation conditions
        terminated = self.balance <= 0  # episode ends because of bankruptcy (terminal state)
        truncated = self.current_step == len(self.dt_datetime) - 1  # episode ends due to max steps (time limit)
        done = terminated or truncated  # combined into a single 'done' flag for VecEnv
        # for rendering or episode tracking you can still check either condition
        if done:
            self.done_information += f"Episode: {self.episode} Balance: {self.balance} Step: {self.current_step}\n"
            self.visualization = True
            self.episode += 1  # increment episode counter
        # base trading reward
        base_reward = self._take_action(actions, done)
        # unrealized profit of open positions
        unrealized_profit = 0
        atr_scaling = 0  # market-condition scaling
        for i, asset in enumerate(self.assets):
            atr = self.get_observation(self.current_step, i, "ATR")
            atr_scaling += atr  # sum of ATRs across assets, used for normalization
            for tr in self.transaction_live:
                if tr["Symbol"] == asset:
                    if tr["Type"] == 0:  # Buy
                        unrealized = (self._c - tr["ActionPrice"]) * self.cf.symbol(asset, "point")
                    else:  # Sell
                        unrealized = (tr["ActionPrice"] -
self._c) * self.cf.symbol(asset, “点”) unrealized_profit += 未实现 atr_scaling = atr_scaling / len(self.assets) if atr_scaling > 0 else 1 # 避免被 0 除以 # 持续奖励:仅适用于未实现/已实现的利润,由 ATR 缩放 # 调整 0.01 到 0.05 sustained_reward = (unrealized_profit + base_reward) * 0.01 / atr_scaling 如果self.transaction_live否则为 0 # 如果没有持仓,则对不作为的处罚 如果不是 self.transaction_live 和 all(a == 2 for a in actions): sustained_reward -= 0.1 # 鼓励探索的小额惩罚 total_reward = base_reward + sustained_reward 如果 self._day > self.current_day: self.current_day = self._day self.balance -= self.over_night_cash_penalty 如果 self.balance != 0: self.max_draw_down_pct = abs(sum(self.max_draw_downs) / self.balance * 100) OBS = { “ohlc_data”: np.array(self.cached_ohlc_data[self.current_step], dtype=np.float32), “event_ids”: self.cached_economic_data[self.current_step][“event_ids”], “currency_ids”: self.cached_economic_data[self.current_step][“currency_ids”], “economic_numeric”: self.cached_economic_data[self.current_step][“numeric”], “portfolio_data”:np.array( [self.balance, self.total_equity, self.max_draw_down_pct] + self.current_holding + self.current_draw_downs, 数据类型=np.float32 ) } print(f“步骤 {self.current_step}: 基础奖励={base_reward}, 持续奖励={sustained_reward}, 总计={total_reward}, 余额={self.balance}”) # 信息字典保持不变 info = {“关闭”: self.tranaction_close_this_step} 返回 obs、total_reward、terminated、truncated、info def get_observation(self, _step, _iter=0, col=None): 如果 col 为 None: 返回 self.cached_ohlc_data[_step] 如果 col == “_day”: 返回 self.cached_time_serial[_step][1] elif col == “_time”: 返回 self.cached_time_serial[_step][0] 尝试: col_pos = self.observation_list.index(col) except ValueError 之外: raise ValueError(f“在 observation_list 中找不到列 '{col}”) 返回 self.cached_ohlc_data[_step][_iter * len(self.observation_list) + col_pos] def get_observation_vector(self, _dt, cols=无): cols = self.observation_list 如果 cols 不是其他 cols v = [] for a in self.assets 中: 子集 = self.df.query(f'{self.asset_col} == “{a}” & {self.time_col} == “{_dt}”') assert not subset.empty v += subset.loc[_dt, cols].tolist() 断言 len(v) == len(self.assets) * len(cols) 返回 v def get_economic_vector(self, _dt): 子集 = self.df.loc[_dt] events = subset['events'] if isinstance(subset, pd.系列) else subset['events'].iloc[0] event_ids = [self.event_map[e['event']] for e in events[:self.max_events]] + [0] * (self.max_events - len(events)) currency_ids = [self.currency_map.get(e['currency'], 0) for e in events[:self.max_events]] + [0] * (self.max_events - len(events)) numeric_fields = ['actual_norm', 'forecast_norm', 'previous_norm', 'surprise_norm', 'event_freq', 'impact_code'] 数字 = 事件中 e 的 [e[字段] 对于numeric_fields中的字段 :self.max_events] + [0] * (self.max_events * 6 - len(事件) * 6) 返回 { “event_ids”: np.array(event_ids, dtype=np.int32), “currency_ids”: np.array(currency_ids, dtype=np.int32), “数字”: np.array(数字, dtype=np.float32) } def reset(self, seed=None, options=None): # 设置可重复性的种子 如果 seed 不是 None: self._seed(种子) 如果self.random_start: self.current_step = random.choice(range(int(len(self.dt_datetime) * 0.5))) 还: self.current_step = 0 self.equity_list = [0] * len(self.assets) self.balance = self.balance_initial self.total_equity = self.balance + sum(self.equity_list) self.ticket_id = 0 self.transaction_live = [] self.transaction_history = [] self.transaction_limit_order = [] self.current_draw_downs = [0.0] * len(self.assets) self.max_draw_downs = [0.0] * len(self.assets) self.max_draw_down_pct = sum(self.max_draw_downs) / self.balance * 100 self.current_holding = [0] * len(self.assets) self.tranaction_open_this_step = [] 
self.tranaction_close_this_step = [] self.current_day = 0 self.done_information = “” self.log_header = 真 self.visualization = False OBS = { “ohlc_data”: np.array(self.cached_ohlc_data[self.current_step], dtype=np.float32), “event_ids”: self.cached_economic_data[self.current_step][“event_ids”], “currency_ids”: self.cached_economic_data[self.current_step][“currency_ids”], “economic_numeric”: self.cached_economic_data[self.current_step][“numeric”], “portfolio_data”:np.array( [self.balance, self.total_equity, self.max_draw_down_pct] + self.current_holding + self.current_draw_downs, 数据类型=np.float32 ) } 信息 = {} 返回 OBS、INFO def render(self, mode=“human”, title=None, **kwargs): if mode in (“human”, “file”): 打印输出 = 模式 == “人类” 下午 = { “log_header”: self.log_header, “log_filename”: self.log_filename, “printout”: 打印输出, “balance”: self.balance, “balance_initial”:self.balance_initial、 “tranaction_close_this_step”:self.tranaction_close_this_step、 “done_information”:self.done_information、 } render_to_file(**pm) 如果 self.log_header: self.log_header = 假 elif 模式 == “graph” 和 self.visualization: print(“绘图...”) p = TradingChart(self.df, self.transaction_history) p.plot() def close(个体): 通过 def get_sb_env(个体经营): e = DummyVecEnv([lambda: self]) obs = e.reset() 返回 E、OBS class CustomFeaturesExtractor(BaseFeaturesExtractor): def __init__(self, observation_space): n_assets = (observation_space.spaces[“portfolio_data”].shape[0] - 3) // 2 ohlc_dim = observation_space.spaces[“ohlc_data”].shape[0] max_events = observation_space.spaces[“event_ids”].shape[0] economic_numeric_dim = observation_space.spaces[“economic_numeric”].shape[0] portfolio_dim = observation_space.spaces[“portfolio_data”].形状[0] features_dim = ohlc_dim + 2 * max_events + economic_numeric_dim + portfolio_dim super().__init__(observation_space, features_dim=features_dim) # num_embeddings=唯一数据的数量 self.event_embedding = nn.嵌入(num_embeddings=129, embedding_dim=max_events) self.currency_embedding = nn。嵌入(num_embeddings=6, embedding_dim=max_events) print(f“自定义功能提取器: n_assets={n_assets}, features_dim={features_dim}”) def forward(self, obs): ohlc_data = obs[“ohlc_data”] event_ids = obs[“event_ids”].to(torch.long) currency_ids = obs[“currency_ids”].to(torch.long) economic_numeric = obs[“economic_numeric”] portfolio_data = obs[“portfolio_data”] event_emb = self.event_embedding(event_ids).均值(dim=1) currency_emb = self.currency_embedding(currency_ids).mean(dim=1) 特征 = torch.cat([ohlc_data, event_emb, currency_emb, economic_numeric, portfolio_data], dim=1) 返回功能 类 CustomMultiInputPolicy(ActorCriticPolicy): def __init__(self, observation_space, action_space, lr_schedule, *args, **kwargs): super().__init__( observation_space, action_space, lr_schedule, features_extractor_class=CustomFeaturesExtractor, features_extractor_kwargs={}, net_arch=dict(pi=[64, 64], vf=[64, 64]), *args、 **kwargs ) features_dim = self.features_extractor.features_dim self.mlp_extractor = MlpExtractor( features_dim, net_arch=self.net_arch[“pi”], activation_fn=nn.ReLU , 设备=self.device ) self.action_net = nn.Linear(64, action_space.shape[0] * 3) # n_assets * 3 个动作 self.value_net = nn.线性(64, 1) # 初始化 timestep (替换 step_counter) self.num_timesteps = 0 def forward(self, obs, deterministic=False): # 在每次前向传递上增加时间步长 self.num_timesteps += 1 特征 = self.extract_features(OBS) latent_pi,latent_vf = self.mlp_extractor(特征) logits = self.action_net(latent_pi) # [批量, n_assets * 3] 值 = self.value_net(latent_vf) # 修复:对 MultiDiscrete 使用 Categorical logits = logits.view(-1, self.action_space.shape[0], 3) # [批量, n_assets, 3] 
分布 = torch.distributions.Categorical(logits=logits) actions = distribution.sample() 如果不是确定性的 else distribution.mode # [批量, n_assets] log_prob = distribution.log_prob(作).sum(dim=-1, keepdim=True) # 条件调试打印以避免控制台泛洪 如果 self.num_timesteps % 1000 == 0: print(f“步骤 {self.num_timesteps}, 训练: {self.training}, Logits: {logits}”) print(f“步骤 {self.num_timesteps},作: {actions}”) 返回作、值log_prob def extract_features(自身,OBS): 返回self.features_extractor (OBS) def predict_values(自身, OBS): 特征 = self.extract_features(OBS) _, latent_vf = self.mlp_extractor(特征) 返回 self.value_net(latent_vf) ```
To make your tgym environment dynamically set the Profit Taken (PT) at opening using a fraction_action derived from the agent's action, we need to modify the action space and processing logic. Currently, your action space is spaces.MultiDiscrete([3] * len(self.assets)), which outputs discrete integers (0=Buy, 1=Sell, 2=Nothing) per asset, with no fractional component. To incorporate fraction_action for PT calculation, we’ll switch to a continuous action space (e.g., spaces.Box) where the fractional part of the action determines the PT dynamically, as specified in your docstring point 10: ProfitTaken = fraction_action * max_profit_taken + SL.
Here’s the edited code with these changes:
Edited Code
import datetime
import math
import random
import ast
import gymnasium as gym
from gymnasium import spaces
from gymnasium.utils import seeding
import os
import csv
import numpy as np
import pandas as pd  # needed for pd.concat and pd.Series used below
import pandas_ta as ta
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor, MlpExtractor
import torch
import torch.nn as nn
from stable_baselines3 import PPO
from stable_baselines3.common.policies import ActorCriticPolicy
from meta.env_fx_trading.util.log_render import render_to_file
from meta.env_fx_trading.util.plot_chart import TradingChart
from meta.env_fx_trading.util.read_config import EnvConfig
class tgym(gym.Env):
"""forex/future/option trading gym environment
1. Three action space (0 Buy, 1 Sell, 2 Nothing)
2. Multiple trading pairs (EURUSD, GBPUSD...) under same time frame
3. Timeframe from 1 min to daily as long as use candlestick bar (Open, High, Low, Close)
4. Use StopLose, ProfitTaken to realize rewards. each pair can configure it own SL and PT in configure file
5. Configure over night cash penalty and each pair's transaction fee and overnight position holding penalty
6. Split dataset into daily, weekly or monthly..., with fixed time steps, at end of len(df). The business
logic will force to Close all positions at last Close price (game over).
7. Must have df column name: [(time_col),(asset_col), Open,Close,High,Low,day] (case sensitive)
8. Addition indicators can add during the data process. 78 available TA indicator from Finta
9. Customized observation list handled in json config file.
10. ProfitTaken = fraction_action * max_profit_taken + SL.
11. SL is pre-fixed
12. Limit order can be configure, if limit_order == True, the action will preset buy or sell at Low or High of the bar,
with a limit_order_expiration (n bars). It will be triggered if the price go cross. otherwise, it will be drop off
13. render mode:
human -- display each steps realized reward on console
file -- create a transaction log
graph -- create transaction in graph (under development)
14.
15. Reward, we want to incentivize profit that is sustained over long periods of time.
At each step, we will set the reward to the account balance multiplied by
some fraction of the number of time steps so far. The purpose of this is to delay
rewarding the agent too fast in the early stages and allow it to explore
sufficiently before optimizing a single strategy too deeply.
It will also reward agents that maintain a higher balance for longer,
rather than those who rapidly gain money using unsustainable strategies.
16. Observation_space contains all of the input variables we want our agent
to consider before making, or not making a trade. We want our agent to “see”
the forex data points (Open price, High, Low, Close, time serial, TA) in the game window,
as well a couple other data points like its account balance, current positions,
and current profit. The intuition here is that for each time step, we want our agent
to consider the price action leading up to the current price, as well as their
own portfolio’s status in order to make an informed decision for the next action.
17. reward is forex trading unit Point, it can be configure for each trading pair
18. To make the unrealized profit reward reflect market conditions, we’ll compute ATR for each asset and use it to scale the reward dynamically.
"""
metadata = {"render.modes": ["graph", "human", "file", "none"]}
def __init__(
self,
df,
event_map,
currency_map,
env_config_file="./neo_finrl/env_fx_trading/config/gdbusd-test-1.json",
):
assert df.ndim == 2
super(tgym, self).__init__()
self.cf = EnvConfig(env_config_file)
self.observation_list = self.cf.env_parameters("observation_list")
# Economic data mappings
self.event_map = event_map
self.currency_map = currency_map
self.max_events = 8
self.df = df.copy()
if 'events' not in self.df.columns:
raise ValueError("DataFrame must contain an 'events' column")
def parse_events(x):
if isinstance(x, str):
try:
parsed = ast.literal_eval(x)
return parsed if isinstance(parsed, list) else []
except (ValueError, SyntaxError):
return []
return x if isinstance(x, list) else []
self.df['events'] = self.df['events'].apply(parse_events)
if not isinstance(self.df['events'].iloc[0], list):
raise ValueError("'events' must be a list")
if self.df['events'].iloc[0] and not isinstance(self.df['events'].iloc[0][0], dict):
raise ValueError("Elements in 'events' must be dictionaries")
self.balance_initial = self.cf.env_parameters("balance")
self.over_night_cash_penalty = self.cf.env_parameters("over_night_cash_penalty")
self.asset_col = self.cf.env_parameters("asset_col")
self.time_col = self.cf.env_parameters("time_col")
self.random_start = self.cf.env_parameters("random_start")
self.log_filename = (
self.cf.env_parameters("log_filename")
+ datetime.datetime.now().strftime("%Y%m%d%H%M%S")
+ ".csv"
)
self.analyze_transaction_history_log_filename = ("transaction_history_log"
+ datetime.datetime.now().strftime("%Y%m%d%H%M%S")
+ ".csv")
self.df["_time"] = self.df[self.time_col]
self.df["_day"] = self.df["weekday"]
self.assets = self.df[self.asset_col].unique()
self.dt_datetime = self.df[self.time_col].sort_values().unique()
self.df = self.df.set_index(self.time_col)
self.visualization = False
# Calculate ATR and save DataFrame
self.calculate_atr()
self.df.to_csv("processed_df_with_atr.csv")
print("Saved processed DataFrame to 'processed_df_with_atr.csv'")
# Reset values
self.equity_list = [0] * len(self.assets)
self.balance = self.balance_initial
self.total_equity = self.balance + sum(self.equity_list)
self.ticket_id = 0
self.transaction_live = []
self.transaction_history = []
self.transaction_limit_order = []
self.current_draw_downs = [0.0] * len(self.assets)
self.max_draw_downs = [0.0] * len(self.assets)
self.max_draw_down_pct = sum(self.max_draw_downs) / self.balance * 100
self.current_step = 0
self.episode = 0 # Start from 0, increment on episode end
self.current_holding = [0] * len(self.assets)
self.tranaction_open_this_step = []
self.tranaction_close_this_step = []
self.current_day = 0
self.done_information = ""
self.log_header = True
# Cache data
self.cached_ohlc_data = [self.get_observation_vector(_dt) for _dt in self.dt_datetime]
self.cached_economic_data = [self.get_economic_vector(_dt) for _dt in self.dt_datetime]
self.cached_time_serial = (
self.df[["_time", "_day"]].sort_values("_time").drop_duplicates().values.tolist()
)
self.reward_range = (-np.inf, np.inf)
# Modified action space to continuous Box: 0 to 3 per asset
self.action_space = spaces.Box(low=0, high=3, shape=(len(self.assets),), dtype=np.float32)
self.observation_space = spaces.Dict({
"ohlc_data": spaces.Box(low=-np.inf, high=np.inf, shape=(len(self.assets) * len(self.observation_list),), dtype=np.float32),
"event_ids": spaces.Box(low=0, high=len(self.event_map)-1, shape=(self.max_events,), dtype=np.int32),
"currency_ids": spaces.Box(low=0, high=len(self.currency_map)-1, shape=(self.max_events,), dtype=np.int32),
"economic_numeric": spaces.Box(low=-np.inf, high=np.inf, shape=(self.max_events * 6,), dtype=np.float32),
"portfolio_data": spaces.Box(low=-np.inf, high=np.inf, shape=(3 + 2 * len(self.assets),), dtype=np.float32)
})
print(
f"initial done:\n"
f"observation_list:{self.observation_list}\n"
f"assets:{self.assets}\n"
f"time serial: {min(self.dt_datetime)} -> {max(self.dt_datetime)} length: {len(self.dt_datetime)}\n"
f"events: {len(self.event_map)}, currencies: {len(self.currency_map)}"
)
self._seed()
def _seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def calculate_atr(self):
atr_dfs = []
for asset in self.assets:
asset_df = self.df[self.df[self.asset_col] == asset][["Open", "High", "Low", "Close"]].copy()
asset_df["ATR"] = ta.atr(
high=asset_df["High"],
low=asset_df["Low"],
close=asset_df["Close"],
length=14
)
asset_df["asset"] = asset
            asset_df[self.time_col] = asset_df.index  # time_col is the index after set_index above, not a column
atr_dfs.append(asset_df[[self.time_col, "asset", "ATR"]])
atr_df = pd.concat(atr_dfs).set_index(self.time_col)
self.df = self.df.merge(atr_df, left_index=True, right_index=True, how="left")
self.df["ATR"] = self.df["ATR"].fillna(method="ffill")
if "ATR" not in self.observation_list:
self.observation_list.append("ATR")
self.cached_ohlc_data = [self.get_observation_vector(_dt) for _dt in self.dt_datetime]
def _take_action(self, actions, done):
rewards = [0] * len(self.assets)
self.tranaction_open_this_step = []
self.tranaction_close_this_step = []
for i, action in enumerate(actions): # Actions are now floats between 0 and 3
self._o = self.get_observation(self.current_step, i, "Open")
self._h = self.get_observation(self.current_step, i, "High")
self._l = self.get_observation(self.current_step, i, "Low")
self._c = self.get_observation(self.current_step, i, "Close")
self._t = self.get_observation(self.current_step, i, "_time")
self._day = self.get_observation(self.current_step, i, "_day")
# Extract integer action type and fractional part
_action = math.floor(action) # 0=Buy, 1=Sell, 2=Nothing
fraction_action = min(max(action - _action, 0), 1) # Clamp between 0 and 1
rewards[i] = self._calculate_reward(i, done, _action)
print(f"Asset {self.assets[i]}: Action={action}, Action Type={_action}, Fraction={fraction_action}, Reward={rewards[i]}, Holding={self.current_holding[i]}")
if self.cf.symbol(self.assets[i], "limit_order"):
self._limit_order_process(i, _action, done)
if (_action in (0, 1) and not done and
self.current_holding[i] < self.cf.symbol(self.assets[i], "max_current_holding")):
# Dynamically calculate PT using fraction_action
_profit_taken = math.ceil(
fraction_action * self.cf.symbol(self.assets[i], "profit_taken_max")
) + self.cf.symbol(self.assets[i], "stop_loss_max")
self.ticket_id += 1
if self.cf.symbol(self.assets[i], "limit_order"):
transaction = {
"Ticket": self.ticket_id,
"Symbol": self.assets[i],
"ActionTime": self._t,
"Type": _action,
"Lot": 1,
"ActionPrice": self._l if _action == 0 else self._h,
"SL": self.cf.symbol(self.assets[i], "stop_loss_max"),
"PT": _profit_taken,
"MaxDD": 0,
"Swap": 0.0,
"CloseTime": "",
"ClosePrice": 0.0,
"Point": 0,
"Reward": -self.cf.symbol(self.assets[i], "transaction_fee"),
"DateDuration": self._day,
"Status": 0,
"LimitStep": self.current_step,
"ActionStep": -1,
"CloseStep": -1,
}
self.transaction_limit_order.append(transaction)
else:
transaction = {
"Ticket": self.ticket_id,
"Symbol": self.assets[i],
"ActionTime": self._t,
"Type": _action,
"Lot": 1,
"ActionPrice": self._c,
"SL": self.cf.symbol(self.assets[i], "stop_loss_max"),
"PT": _profit_taken,
"MaxDD": 0,
"Swap": 0.0,
"CloseTime": "",
"ClosePrice": 0.0,
"Point": 0,
"Reward": -self.cf.symbol(self.assets[i], "transaction_fee"),
"DateDuration": self._day,
"Status": 0,
"LimitStep": self.current_step,
"ActionStep": self.current_step,
"CloseStep": -1,
}
self.current_holding[i] += 1
self.tranaction_open_this_step.append(transaction)
self.balance -= self.cf.symbol(self.assets[i], "transaction_fee")
self.transaction_live.append(transaction)
return sum(rewards)
def _calculate_reward(self, i, done, action):
_total_reward = 0
_max_draw_down = 0
for tr in self.transaction_live[:]:
if tr["Symbol"] == self.assets[i]:
_point = self.cf.symbol(self.assets[i], "point")
if self._day > tr["DateDuration"]:
tr["DateDuration"] = self._day
tr["Reward"] -= self.cf.symbol(self.assets[i], "over_night_penalty")
if tr["Type"] == 0: # Buy
_sl_price = tr["ActionPrice"] - tr["SL"] / _point
_pt_price = tr["ActionPrice"] + tr["PT"] / _point
if done:
p = (self._c - tr["ActionPrice"]) * _point
self._manage_tranaction(tr, p, self._c, status=2)
_total_reward += p
elif self._l <= _sl_price:
self._manage_tranaction(tr, -tr["SL"], _sl_price)
_total_reward += -tr["SL"]
self.current_holding[i] -= 1
elif self._h >= _pt_price:
self._manage_tranaction(tr, tr["PT"], _pt_price)
_total_reward += tr["PT"]
self.current_holding[i] -= 1
else:
self.current_draw_downs[i] = int((self._l - tr["ActionPrice"]) * _point)
_max_draw_down += self.current_draw_downs[i]
if self.current_draw_downs[i] < 0 and tr["MaxDD"] > self.current_draw_downs[i]:
tr["MaxDD"] = self.current_draw_downs[i]
elif tr["Type"] == 1: # Sell
_sl_price = tr["ActionPrice"] + tr["SL"] / _point
_pt_price = tr["ActionPrice"] - tr["PT"] / _point
if done:
p = (tr["ActionPrice"] - self._c) * _point
self._manage_tranaction(tr, p, self._c, status=2)
_total_reward += p
elif self._h >= _sl_price:
self._manage_tranaction(tr, -tr["SL"], _sl_price)
_total_reward += -tr["SL"]
self.current_holding[i] -= 1
elif self._l <= _pt_price:
self._manage_tranaction(tr, tr["PT"], _pt_price)
_total_reward += tr["PT"]
self.current_holding[i] -= 1
else:
self.current_draw_downs[i] = int((tr["ActionPrice"] - self._h) * _point)
_max_draw_down += self.current_draw_downs[i]
if self.current_draw_downs[i] < 0 and tr["MaxDD"] > self.current_draw_downs[i]:
tr["MaxDD"] = self.current_draw_downs[i]
if _max_draw_down > self.max_draw_downs[i]:
self.max_draw_downs[i] = _max_draw_down
return _total_reward
def _limit_order_process(self, i, _action, done):
for tr in self.transaction_limit_order[:]:
if tr["Symbol"] == self.assets[i]:
if tr["Type"] != _action or done:
self.transaction_limit_order.remove(tr)
tr["Status"] = 3
tr["CloseStep"] = self.current_step
self.transaction_history.append(tr)
elif (tr["ActionPrice"] >= self._l and _action == 0) or (
tr["ActionPrice"] <= self._h and _action == 1):
tr["ActionStep"] = self.current_step
self.current_holding[i] += 1
self.balance -= self.cf.symbol(self.assets[i], "transaction_fee")
self.transaction_limit_order.remove(tr)
self.transaction_live.append(tr)
self.tranaction_open_this_step.append(tr)
elif (tr["LimitStep"] + self.cf.symbol(self.assets[i], "limit_order_expiration")
> self.current_step):
tr["CloseStep"] = self.current_step
tr["Status"] = 4
self.transaction_limit_order.remove(tr)
self.transaction_history.append(tr)
def _manage_tranaction(self, tr, _p, close_price, status=1):
self.transaction_live.remove(tr)
tr["ClosePrice"] = close_price
tr["Point"] = int(_p)
tr["Reward"] = int(tr["Reward"] + _p)
tr["Status"] = status
tr["CloseTime"] = self._t
tr["CloseStep"] = self.current_step
self.balance += int(tr["Reward"])
self.total_equity -= int(abs(tr["Reward"]))
self.tranaction_close_this_step.append(tr)
self.transaction_history.append(tr)
def analyze_transaction_history(self, log_file):
if not self.transaction_history:
metrics = {"trades": 0, "win_rate": 0.0, "profit_factor": 0.0, "sharpe_ratio": 0.0, "total_profit": 0.0}
else:
trades = len(self.transaction_history)
rewards = [tr["Reward"] for tr in self.transaction_history]
wins = sum(1 for r in rewards if r > 0)
losses = sum(1 for r in rewards if r < 0)
gross_profit = sum(r for r in rewards if r > 0)
gross_loss = abs(sum(r for r in rewards if r < 0))
win_rate = wins / trades if trades > 0 else 0.0
profit_factor = gross_profit / gross_loss if gross_loss > 0 else float("inf")
returns = np.array(rewards, dtype=np.float32)
sharpe_ratio = np.mean(returns) / np.std(returns) if np.std(returns) > 0 else 0.0
total_profit = sum(rewards)
metrics = {
"trades": trades,
"win_rate": win_rate,
"profit_factor": profit_factor,
"sharpe_ratio": sharpe_ratio,
"total_profit": total_profit
}
with open(self.analyze_transaction_history_log_filename, 'a', newline='') as f:
writer = csv.DictWriter(f, fieldnames=["timestamp", "episode", "trades", "win_rate", "profit_factor", "sharpe_ratio", "total_profit"])
metrics["timestamp"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
metrics["episode"] = self.episode
writer.writerow(metrics)
return metrics
def step(self, actions):
self.current_step += 1
terminated = self.balance <= 0
truncated = self.current_step == len(self.dt_datetime) - 1
done = terminated or truncated
if done:
self.done_information += f"Episode: {self.episode} Balance: {self.balance} Step: {self.current_step}\n"
self.visualization = True
self.episode += 1
base_reward = self._take_action(actions, done)
unrealized_profit = 0
atr_scaling = 0
for i, asset in enumerate(self.assets):
atr = self.get_observation(self.current_step, i, "ATR")
atr_scaling += atr
for tr in self.transaction_live:
if tr["Symbol"] == asset:
if tr["Type"] == 0:
unrealized = (self._c - tr["ActionPrice"]) * self.cf.symbol(asset, "point")
else:
unrealized = (tr["ActionPrice"] - self._c) * self.cf.symbol(asset, "point")
unrealized_profit += unrealized
atr_scaling = atr_scaling / len(self.assets) if atr_scaling > 0 else 1
sustained_reward = (unrealized_profit + base_reward) * 0.01 / atr_scaling if self.transaction_live else 0
if not self.transaction_live and all(math.floor(a) == 2 for a in actions):
sustained_reward -= 0.1
total_reward = base_reward + sustained_reward
if self._day > self.current_day:
self.current_day = self._day
self.balance -= self.over_night_cash_penalty
if self.balance != 0:
self.max_draw_down_pct = abs(sum(self.max_draw_downs) / self.balance * 100)
obs = {
"ohlc_data": np.array(self.cached_ohlc_data[self.current_step], dtype=np.float32),
"event_ids": self.cached_economic_data[self.current_step]["event_ids"],
"currency_ids": self.cached_economic_data[self.current_step]["currency_ids"],
"economic_numeric": self.cached_economic_data[self.current_step]["numeric"],
"portfolio_data": np.array(
[self.balance, self.total_equity, self.max_draw_down_pct] + self.current_holding + self.current_draw_downs,
dtype=np.float32
)
}
print(f"Step {self.current_step}: Base Reward={base_reward}, Sustained Reward={sustained_reward}, Total={total_reward}, Balance={self.balance}")
info = {"Close": self.tranaction_close_this_step}
return obs, total_reward, terminated, truncated, info
def get_observation(self, _step, _iter=0, col=None):
if col is None:
return self.cached_ohlc_data[_step]
if col == "_day":
return self.cached_time_serial[_step][1]
elif col == "_time":
return self.cached_time_serial[_step][0]
try:
col_pos = self.observation_list.index(col)
except ValueError:
raise ValueError(f"Column '{col}' not found in observation_list")
return self.cached_ohlc_data[_step][_iter * len(self.observation_list) + col_pos]
def get_observation_vector(self, _dt, cols=None):
cols = self.observation_list if cols is None else cols
v = []
for a in self.assets:
subset = self.df.query(f'{self.asset_col} == "{a}" & {self.time_col} == "{_dt}"')
assert not subset.empty
v += subset.loc[_dt, cols].tolist()
assert len(v) == len(self.assets) * len(cols)
return v
def get_economic_vector(self, _dt):
subset = self.df.loc[_dt]
events = subset['events'] if isinstance(subset, pd.Series) else subset['events'].iloc[0]
event_ids = [self.event_map[e['event']] for e in events[:self.max_events]] + [0] * (self.max_events - len(events))
currency_ids = [self.currency_map.get(e['currency'], 0) for e in events[:self.max_events]] + [0] * (self.max_events - len(events))
numeric_fields = ['actual_norm', 'forecast_norm', 'previous_norm', 'surprise_norm', 'event_freq', 'impact_code']
numeric = [e[field] for e in events[:self.max_events] for field in numeric_fields] + [0] * (self.max_events * 6 - len(events) * 6)
return {
"event_ids": np.array(event_ids, dtype=np.int32),
"currency_ids": np.array(currency_ids, dtype=np.int32),
"numeric": np.array(numeric, dtype=np.float32)
}
def reset(self, seed=None, options=None):
if seed is not None:
self._seed(seed)
if self.random_start:
self.current_step = random.choice(range(int(len(self.dt_datetime) * 0.5)))
else:
self.current_step = 0
self.equity_list = [0] * len(self.assets)
self.balance = self.balance_initial
self.total_equity = self.balance + sum(self.equity_list)
self.ticket_id = 0
self.transaction_live = []
self.transaction_history = []
self.transaction_limit_order = []
self.current_draw_downs = [0.0] * len(self.assets)
self.max_draw_downs = [0.0] * len(self.assets)
self.max_draw_down_pct = sum(self.max_draw_downs) / self.balance * 100
self.current_holding = [0] * len(self.assets)
self.tranaction_open_this_step = []
self.tranaction_close_this_step = []
self.current_day = 0
self.done_information = ""
self.log_header = True
self.visualization = False
obs = {
"ohlc_data": np.array(self.cached_ohlc_data[self.current_step], dtype=np.float32),
"event_ids": self.cached_economic_data[self.current_step]["event_ids"],
"currency_ids": self.cached_economic_data[self.current_step]["currency_ids"],
"economic_numeric": self.cached_economic_data[self.current_step]["numeric"],
"portfolio_data": np.array(
[self.balance, self.total_equity, self.max_draw_down_pct] + self.current_holding + self.current_draw_downs,
dtype=np.float32
)
}
info = {}
return obs, info
def render(self, mode="human", title=None, **kwargs):
if mode in ("human", "file"):
printout = mode == "human"
pm = {
"log_header": self.log_header,
"log_filename": self.log_filename,
"printout": printout,
"balance": self.balance,
"balance_initial": self.balance_initial,
"tranaction_close_this_step": self.tranaction_close_this_step,
"done_information": self.done_information,
}
render_to_file(**pm)
if self.log_header:
self.log_header = False
elif mode == "graph" and self.visualization:
print("plotting...")
p = TradingChart(self.df, self.transaction_history)
p.plot()
def close(self):
pass
def get_sb_env(self):
e = DummyVecEnv([lambda: self])
obs = e.reset()
return e, obs
class CustomFeaturesExtractor(BaseFeaturesExtractor):
def __init__(self, observation_space):
n_assets = (observation_space.spaces["portfolio_data"].shape[0] - 3) // 2
ohlc_dim = observation_space.spaces["ohlc_data"].shape[0]
max_events = observation_space.spaces["event_ids"].shape[0]
economic_numeric_dim = observation_space.spaces["economic_numeric"].shape[0]
portfolio_dim = observation_space.spaces["portfolio_data"].shape[0]
features_dim = ohlc_dim + 2 * max_events + economic_numeric_dim + portfolio_dim
super().__init__(observation_space, features_dim=features_dim)
self.event_embedding = nn.Embedding(num_embeddings=129, embedding_dim=max_events)
self.currency_embedding = nn.Embedding(num_embeddings=6, embedding_dim=max_events)
print(f"CustomFeaturesExtractor: n_assets={n_assets}, features_dim={features_dim}")
def forward(self, obs):
ohlc_data = obs["ohlc_data"]
event_ids = obs["event_ids"].to(torch.long)
currency_ids = obs["currency_ids"].to(torch.long)
economic_numeric = obs["economic_numeric"]
portfolio_data = obs["portfolio_data"]
event_emb = self.event_embedding(event_ids).mean(dim=1)
currency_emb = self.currency_embedding(currency_ids).mean(dim=1)
features = torch.cat([ohlc_data, event_emb, currency_emb, economic_numeric, portfolio_data], dim=1)
return features
class CustomMultiInputPolicy(ActorCriticPolicy):
def __init__(self, observation_space, action_space, lr_schedule, *args, **kwargs):
super().__init__(
observation_space,
action_space,
lr_schedule,
features_extractor_class=CustomFeaturesExtractor,
features_extractor_kwargs={},
net_arch=dict(pi=[64, 64], vf=[64, 64]),
*args,
**kwargs
)
features_dim = self.features_extractor.features_dim
self.mlp_extractor = MlpExtractor(
features_dim,
net_arch=self.net_arch["pi"],
activation_fn=nn.ReLU,
device=self.device
)
# Modified to output continuous actions
self.action_net = nn.Linear(64, action_space.shape[0]) # n_assets continuous outputs
self.value_net = nn.Linear(64, 1)
self.num_timesteps = 0
def forward(self, obs, deterministic=False):
self.num_timesteps += 1
features = self.extract_features(obs)
latent_pi, latent_vf = self.mlp_extractor(features)
actions = self.action_net(latent_pi) # Continuous outputs [batch, n_assets]
values = self.value_net(latent_vf)
# Clamp actions to [0, 3] range
actions = torch.clamp(actions, min=0, max=3)
# For continuous actions, log_prob is not directly applicable; use a normal distribution if needed
distribution = torch.distributions.Normal(actions, 0.1) # Small std for exploration
log_prob = distribution.log_prob(actions).sum(dim=-1, keepdim=True) if not deterministic else None
if self.num_timesteps % 1000 == 0:
print(f"Step {self.num_timesteps}, Training: {self.training}, Actions: {actions}")
return actions, values, log_prob
def extract_features(self, obs):
return self.features_extractor(obs)
def predict_values(self, obs):
features = self.extract_features(obs)
_, latent_vf = self.mlp_extractor(features)
return self.value_net(latent_vf)
Key Changes
- Action Space:
- Changed from spaces.MultiDiscrete([3] * len(self.assets)) to spaces.Box(low=0, high=3, shape=(len(self.assets),), dtype=np.float32).
- This allows continuous actions between 0 and 3 per asset, where the integer part determines the action type (0=Buy, 1=Sell, 2=Nothing) and the fractional part is fraction_action.
- Policy Adjustment:
- Modified CustomMultiInputPolicy:
- action_net now outputs action_space.shape[0] (number of assets) continuous values instead of discrete logits.
- Actions are clamped to [0, 3] to match the Box space.
- Used a Normal distribution for continuous actions (simplified; you might refine this with a proper policy like SquashedDiagGaussian if using PPO with continuous actions).
- Action Processing in _take_action:
- Extracted _action = math.floor(action) (integer part) and fraction_action = min(max(action - _action, 0), 1) (fractional part, clamped to [0, 1]).
- Updated PT calculation: _profit_taken = math.ceil(fraction_action * self.cf.symbol(self.assets[i], "profit_taken_max")) + self.cf.symbol(self.assets[i], "stop_loss_max")
- This implements ProfitTaken = fraction_action * max_profit_taken + SL dynamically at trade opening.
- Inaction Penalty in step:
- Adjusted the condition to all(math.floor(a) == 2 for a in actions) to work with continuous actions.
How It Works
- Action Interpretation:
- Action range: [0, 3).
- 0.0 to <1.0: Buy (e.g., 0.3 → Buy with fraction_action = 0.3).
- 1.0 to <2.0: Sell (e.g., 1.7 → Sell with fraction_action = 0.7).
- 2.0 to <3.0: Nothing (fraction ignored).
- PT Calculation:
- Example: max_profit_taken = 100, stop_loss_max = 50.
- Action = 0.3 → _action = 0, fraction_action = 0.3, PT = 0.3 * 100 + 50 = 80.
- Action = 1.8 → _action = 1, fraction_action = 0.8, PT = 0.8 * 100 + 50 = 130.
- Action = 2.4 → _action = 2, PT unchanged (no trade opened).
- Dynamic at Opening: PT is set dynamically when the trade is opened based on fraction_action, fulfilling your requirement; the short sketch below mirrors this logic.
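A minimal standalone sketch of the decomposition used in _take_action (profit_taken_max and stop_loss_max below are the illustrative numbers from the example, standing in for the per-symbol config values):

```python
import math

profit_taken_max, stop_loss_max = 100, 50  # illustrative, stands in for self.cf.symbol(...)

for action in (0.3, 1.8, 2.4):
    _action = math.floor(action)                        # 0=Buy, 1=Sell, 2=Nothing
    fraction_action = min(max(action - _action, 0), 1)  # fractional part, clamped to [0, 1]
    if _action in (0, 1):
        pt = math.ceil(fraction_action * profit_taken_max) + stop_loss_max
        print(f"action={action}: type={_action}, fraction={fraction_action:.2f}, PT={pt}")
    else:
        print(f"action={action}: type={_action}, no trade opened")
```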
Notes
- Policy Compatibility: Since you’re using PPO, the switch to continuous actions requires adjusting the policy. The current Normal distribution is a placeholder; for PPO, consider using stable_baselines3.common.distributions.SquashedDiagGaussianDistribution with a tanh transformation to map outputs to [0, 3].
- Debugging: The print statement in _take_action now shows the full action, action type, and fraction for clarity.
- SL: Remains fixed at stop_loss_max per docstring point 11.
Let me know if you need further refinements, like a more sophisticated continuous policy or mid-trade PT adjustments!
How do I implement this: “stable_baselines3.common.distributions.SquashedDiagGaussianDistribution with a tanh transformation to map outputs to [0, 3]”?
To implement stable_baselines3.common.distributions.SquashedDiagGaussianDistribution with a tanh transformation to map the continuous action outputs of your CustomMultiInputPolicy to the range [0, 3] (matching your spaces.Box(low=0, high=3, shape=(len(self.assets),)) action space), we need to modify the CustomMultiInputPolicy class in your code. This involves replacing the simple Normal distribution with a squashed Gaussian distribution, which uses tanh to squash the unbounded outputs into a [-1, 1] range, then scaling and shifting them to [0, 3].
Here’s how to do it:
Steps to Implement
- Understand the Distribution:
  - SquashedDiagGaussianDistribution generates actions from a diagonal Gaussian (normal) distribution, applies tanh to squash outputs to [-1, 1], and allows scaling to a custom range (e.g., [0, 3]).
  - It’s suitable for PPO with continuous action spaces and handles exploration via the Gaussian noise.
- Modify the Policy:
  - Replace the action_net output and forward method to use this distribution.
  - Scale the squashed outputs from [-1, 1] to [0, 3] using a linear transformation: action = (tanh_output + 1) * (high - low) / 2 + low, where low = 0 and high = 3 (a quick numeric check of this mapping follows the list).
- Integrate with Stable-Baselines3:
  - Use stable_baselines3.common.distributions to define the distribution and compute actions, log probabilities, and entropy.
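As a quick numeric check of that linear mapping (pure arithmetic, independent of the policy code that follows):

```python
import math

low, high = 0.0, 3.0  # the Box bounds used by the environment

def squash_to_range(tanh_output: float) -> float:
    # action = (tanh_output + 1) * (high - low) / 2 + low
    return (tanh_output + 1) * (high - low) / 2 + low

for raw in (-3.0, 0.0, 3.0):
    t = math.tanh(raw)
    print(f"raw={raw:+.1f}  tanh={t:+.3f}  scaled={squash_to_range(t):.3f}")
# a tanh output of -1 maps to 0, 0 maps to 1.5, and +1 maps to 3
```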
Edited CustomMultiInputPolicy Code
Here’s the updated CustomMultiInputPolicy class with SquashedDiagGaussianDistribution:
from stable_baselines3.common.policies import ActorCriticPolicy
from stable_baselines3.common.torch_layers import MlpExtractor
from stable_baselines3.common.distributions import SquashedDiagGaussianDistribution
import torch
import torch.nn as nn
class CustomMultiInputPolicy(ActorCriticPolicy):
def __init__(self, observation_space, action_space, lr_schedule, *args, **kwargs):
# Extract action space bounds
self.action_space_low = torch.tensor(action_space.low, dtype=torch.float32)
self.action_space_high = torch.tensor(action_space.high, dtype=torch.float32)
action_dim = action_space.shape[0] # Number of assets
super().__init__(
observation_space,
action_space,
lr_schedule,
features_extractor_class=CustomFeaturesExtractor,
features_extractor_kwargs={},
net_arch=dict(pi=[64, 64], vf=[64, 64]),
*args,
**kwargs
)
features_dim = self.features_extractor.features_dim
self.mlp_extractor = MlpExtractor(
features_dim,
net_arch=self.net_arch["pi"],
activation_fn=nn.ReLU,
device=self.device
)
# Define the action network to output mean and log_std for the Gaussian
self.action_net = nn.Linear(64, action_dim * 2) # Outputs mean and log_std for each asset
self.value_net = nn.Linear(64, 1)
# Initialize the distribution
self.action_dist = SquashedDiagGaussianDistribution(action_dim)
self.num_timesteps = 0
def forward(self, obs, deterministic=False):
self.num_timesteps += 1
# Extract features
features = self.extract_features(obs)
latent_pi, latent_vf = self.mlp_extractor(features)
# Get mean and log_std from action_net
action_params = self.action_net(latent_pi) # [batch, n_assets * 2]
mean_actions, log_std = action_params.chunk(2, dim=-1) # Split into mean and log_std
log_std = torch.clamp(log_std, min=-20, max=2) # Stabilize log_std
        # Set the distribution parameters from the current network outputs
        self.action_dist.proba_distribution(mean_actions, log_std)
        # Sample actions (or take the distribution mode) in the squashed [-1, 1] space
        if deterministic:
            squashed_actions = self.action_dist.mode()
        else:
            squashed_actions = self.action_dist.sample()
        # Log probability of the [-1, 1] actions; SB3 already sums over the action dimensions
        log_prob = self.action_dist.log_prob(squashed_actions).unsqueeze(-1)
        # Map from [-1, 1] to [0, 3]
        actions = self._squash_to_range(squashed_actions, self.action_space_low, self.action_space_high)
# Value prediction
values = self.value_net(latent_vf)
if self.num_timesteps % 1000 == 0:
print(f"Step {self.num_timesteps}, Training: {self.training}, Actions: {actions}, Mean: {mean_actions.mean()}, Log Std: {log_std.mean()}")
return actions, values, log_prob
def _squash_to_range(self, actions, low, high):
"""Scale squashed actions from [-1, 1] to [low, high]."""
return (actions + 1) * (high - low) / 2 + low
def extract_features(self, obs):
return self.features_extractor(obs)
def predict_values(self, obs):
features = self.extract_features(obs)
_, latent_vf = self.mlp_extractor(features)
return self.value_net(latent_vf)
def evaluate_actions(self, obs, actions):
"""Evaluate actions for training (used by PPO)."""
features = self.extract_features(obs)
latent_pi, latent_vf = self.mlp_extractor(features)
action_params = self.action_net(latent_pi)
mean_actions, log_std = action_params.chunk(2, dim=-1)
log_std = torch.clamp(log_std, min=-20, max=2)
        self.action_dist.proba_distribution(mean_actions, log_std)
        # Actions need to be unsquashed back to [-1, 1] for the log_prob calculation
        unsquashed_actions = 2 * (actions - self.action_space_low) / (self.action_space_high - self.action_space_low) - 1
        log_prob = self.action_dist.log_prob(unsquashed_actions).unsqueeze(-1)  # already summed over action dims
        entropy = self.action_dist.entropy()  # None for the squashed Gaussian; PPO then falls back to -log_prob
values = self.value_net(latent_vf)
return values, log_prob, entropy
Key Changes Explained
- Action Space Handling:
  - Added action_space_low and action_space_high as tensors to store the bounds ([0, 3] per asset).
  - These are used to scale the tanh-squashed outputs.
- Action Network:
  - Changed action_net to output action_dim * 2 values (mean and log_std for each asset).
  - Split the output into mean_actions and log_std using chunk.
- SquashedDiagGaussianDistribution:
  - Initialized with action_dim (number of assets).
  - proba_distribution(mean_actions, log_std) sets the distribution parameters.
  - sample() or mode() generates actions in [-1, 1], which are then scaled.
- Scaling to [0, 3]:
  - Added the _squash_to_range method: action = (tanh_output + 1) * (high - low) / 2 + low, which for low = 0 and high = 3 is (tanh_output + 1) * 1.5.
  - Maps [-1, 1] to [0, 3].
- Log Probabilities and Entropy:
  - log_prob is computed on the [-1, 1] actions to match the distribution’s internal logic.
  - Added an evaluate_actions method for PPO’s policy gradient computation, unsquashing the input actions back to [-1, 1] (a short round-trip check follows this list).
- Debugging:
  - Updated the debug print to include mean and log_std statistics.
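The scaling in _squash_to_range and the unsquashing in evaluate_actions are exact inverses, which a small standalone check makes concrete (same [0, 3] bounds as the policy above):

```python
low, high = 0.0, 3.0

def squash_to_range(a: float) -> float:
    # [-1, 1] -> [low, high], as in _squash_to_range
    return (a + 1) * (high - low) / 2 + low

def unsquash(a: float) -> float:
    # [low, high] -> [-1, 1], as in evaluate_actions
    return 2 * (a - low) / (high - low) - 1

for a in (-1.0, -0.25, 0.0, 0.6, 1.0):
    assert abs(unsquash(squash_to_range(a)) - a) < 1e-12
print("round-trip OK")
```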
Integration with Your Environment
Your tgym environment’s action space is already set to spaces.Box(low=0, high=3, shape=(len(self.assets),)), so no changes are needed there. The policy now outputs continuous actions in [0, 3], which _take_action interprets as:
- _action = math.floor(action) for Buy (0), Sell (1), or Nothing (2).
- fraction_action = min(max(action - _action, 0), 1) for PT calculation.
Example:
- Policy outputs actions = [0.7, 1.4, 2.2]:
  - Asset 0: _action = 0, fraction_action = 0.7 → Buy, PT = 0.7 * max_profit_taken + SL.
  - Asset 1: _action = 1, fraction_action = 0.4 → Sell, PT = 0.4 * max_profit_taken + SL.
  - Asset 2: _action = 2, fraction_action = 0.2 → Nothing.
Training with PPO
To use this policy with PPO:
from stable_baselines3 import PPO
# Assuming env is your tgym instance wrapped in DummyVecEnv
env, _ = env.get_sb_env()
model = PPO(
CustomMultiInputPolicy,
env,
learning_rate=0.0003,
n_steps=2048,
batch_size=64,
n_epochs=10,
gamma=0.99,
verbose=1
)
model.learn(total_timesteps=10000)
Notes
- Stability: Clamping log_std between -20 and 2 prevents numerical issues; adjust these bounds if needed.
- Exploration: The Gaussian noise (controlled by log_std) ensures exploration, replacing the discrete Categorical distribution’s randomness.
- Performance: Test the policy to ensure the [0, 3] range and PT calculation align with your trading goals.
Let me know if you need help testing or further tweaking this implementation!
How do I use evaluate_actions()? Is it used when training PPO, or does the RL model run this code automatically?
```python
# [The user quoted back the CustomMultiInputPolicy class from the previous answer here, unchanged.]
```
Let’s clarify how evaluate_actions() works in your CustomMultiInputPolicy class, its role in training a PPO model with Stable-Baselines3, and whether you need to call it manually or if the reinforcement learning (RL) model (PPO) automatically uses it.
What Does evaluate_actions() Do?
The evaluate_actions() method is a critical part of the ActorCriticPolicy base class in Stable-Baselines3. It’s designed to evaluate a batch of observations and actions during training, returning:
- Values: The critic’s value estimates for the given observations (values).
- Log Probabilities: The log probability of the actions under the current policy (log_prob).
- Entropy: The entropy of the action distribution, which measures exploration (entropy).
These outputs are used by PPO to compute the policy loss, value loss, and entropy regularization during the training process. Specifically:
- Policy Loss: Uses log_prob to calculate the PPO clipped objective.
- Value Loss: Uses values to fit the value function to the observed returns.
- Entropy Bonus: Uses entropy to encourage exploration (a minimal sketch of how these three terms combine follows this list).
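To make the connection concrete, here is a minimal, illustrative sketch of how these three quantities are typically combined into the PPO loss. It is not the Stable-Baselines3 source; the coefficient names (clip_range, vf_coef, ent_coef) simply mirror the PPO hyperparameters of the same name.

```python
import torch
import torch.nn.functional as F

def ppo_loss(log_prob, old_log_prob, advantages, values, returns, entropy,
             clip_range=0.2, vf_coef=0.5, ent_coef=0.01):
    """Illustrative combination of the three PPO loss terms (not the SB3 source)."""
    ratio = torch.exp(log_prob - old_log_prob)               # pi_new(a|s) / pi_old(a|s)
    policy_loss = -torch.min(                                # clipped surrogate objective
        ratio * advantages,
        torch.clamp(ratio, 1 - clip_range, 1 + clip_range) * advantages,
    ).mean()
    value_loss = F.mse_loss(values.flatten(), returns)       # fit the critic to observed returns
    entropy_bonus = entropy.mean()                           # encourage exploration
    return policy_loss + vf_coef * value_loss - ent_coef * entropy_bonus
```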
Is evaluate_actions() Used Automatically by PPO?
Yes, the PPO algorithm in Stable-Baselines3 automatically calls evaluate_actions() during training as part of its optimization process. You don’t need to invoke it manually when using the standard model.learn() workflow. Here’s how it happens:
- Rollout Collection:
  - PPO collects experience by interacting with the environment using the forward() method to generate actions (actions), value predictions (values), and log probabilities (log_prob) for each step.
  - This data is stored in a rollout buffer (e.g., RolloutBuffer in Stable-Baselines3).
- Training Phase:
  - After collecting a batch of rollouts (controlled by n_steps), PPO samples minibatches from the buffer.
  - For each minibatch, it calls evaluate_actions(obs, actions) with the stored observations and actions to compute updated values, log_prob, and entropy under the current policy parameters (see the minibatch sketch after this list).
  - These values are used to compute the loss and update the policy and value networks via gradient descent.
- Why It’s Automatic:
  - The PPO class in Stable-Baselines3 handles this internally in its train() method. It relies on the policy’s evaluate_actions() implementation to evaluate the policy’s performance on past actions, which is necessary because the policy changes during training (unlike forward(), which is used for action sampling).
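As a concrete illustration, the snippet below replays one minibatch from the rollout buffer in the same way PPO.train() does internally. It is a debugging sketch, not the SB3 source: it assumes a PPO model whose rollout_buffer has already been filled by learn(), and the helper name inspect_one_minibatch is hypothetical.

```python
import torch

def inspect_one_minibatch(model, batch_size=64):
    """Replay one minibatch the way PPO.train() does (inspection sketch only)."""
    with torch.no_grad():
        for rollout_data in model.rollout_buffer.get(batch_size):
            values, log_prob, entropy = model.policy.evaluate_actions(
                rollout_data.observations, rollout_data.actions
            )
            # The probability ratio is what the clipped objective is built from
            ratio = torch.exp(log_prob.flatten() - rollout_data.old_log_prob)
            print(f"ratio mean: {ratio.mean():.3f}, "
                  f"value mean: {values.mean():.3f}, entropy: {entropy.mean():.3f}")
            break  # one minibatch is enough for a quick check
```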
When Would You Use evaluate_actions() Manually?
You typically don’t need to call evaluate_actions() directly unless you’re:
- Debugging: Checking the policy’s outputs (values, log probs, entropy) for specific observations and actions.
- Custom Training Loop: Implementing your own training logic outside of Stable-Baselines3’s default learn() method.
For example, to debug:
```python
obs = env.reset()[0]  # Get the initial observation (gymnasium reset returns (obs, info))
obs_tensor, _ = model.policy.obs_to_tensor(obs)  # Convert to batched torch tensors
actions = torch.tensor([[0.5, 1.2, 2.8]], dtype=torch.float32)  # Example actions, shape [1, n_assets]
values, log_prob, entropy = model.policy.evaluate_actions(obs_tensor, actions)
print(f"Values: {values}, Log Prob: {log_prob}, Entropy: {entropy}")
```
How evaluate_actions() Fits Into Your Code
In your CustomMultiInputPolicy, evaluate_actions() is correctly implemented to work with the SquashedDiagGaussianDistribution. Let’s break it down:
```python
def evaluate_actions(self, obs, actions):
    # Recompute the policy distribution for the stored observations
    features = self.extract_features(obs)
    latent_pi, latent_vf = self.mlp_extractor(features)
    action_params = self.action_net(latent_pi)
    mean_actions, log_std = action_params.chunk(2, dim=-1)
    log_std = torch.clamp(log_std, min=-20, max=2)
    self.action_dist.update(mean_actions, log_std)
    # Map the stored env actions ([0, 3]) back to the distribution's [-1, 1] space
    unsquashed_actions = 2 * (actions - self.action_space_low) / (self.action_space_high - self.action_space_low) - 1
    log_prob = self.action_dist.log_prob(unsquashed_actions).sum(dim=-1, keepdim=True)
    entropy = self.action_dist.entropy().sum(dim=-1, keepdim=True)
    values = self.value_net(latent_vf)
    return values, log_prob, entropy
```
- Input:
  - obs: Batch of observations from the rollout buffer.
  - actions: Batch of actions taken (in the [0, 3] range) from the rollout buffer.
- Process:
  - Computes the current policy’s mean and log_std (action_params).
  - Unsquashes the input actions from [0, 3] back to [-1, 1] to match the tanh-squashed distribution’s internal space (see the round-trip check after this list).
  - Evaluates log_prob and entropy in the unsquashed space.
  - Computes values from the value network.
- Output:
  - values, log_prob, and entropy are returned to PPO for loss calculation.
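Because the squash/unsquash pair is just a linear map between [-1, 1] and [low, high], it can be sanity-checked in isolation. The following sketch uses hypothetical bounds (three assets, each bounded by [0, 3]) rather than values read from your actual action space.

```python
import torch

low = torch.tensor([0.0, 0.0, 0.0])    # hypothetical per-asset lower bounds
high = torch.tensor([3.0, 3.0, 3.0])   # hypothetical per-asset upper bounds

def squash_to_range(squashed, low, high):
    """[-1, 1] -> [low, high], as in forward()."""
    return (squashed + 1) * (high - low) / 2 + low

def unsquash(actions, low, high):
    """[low, high] -> [-1, 1], as in evaluate_actions()."""
    return 2 * (actions - low) / (high - low) - 1

squashed = torch.tensor([-1.0, 0.0, 1.0])
env_actions = squash_to_range(squashed, low, high)            # tensor([0.0, 1.5, 3.0])
assert torch.allclose(unsquash(env_actions, low, high), squashed)
```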
Using It in PPO Training
Here’s how you’d train your PPO model with this policy (no manual evaluate_actions() call needed):
```python
from stable_baselines3 import PPO

# Assuming env is your tgym instance wrapped in DummyVecEnv
env, _ = env.get_sb_env()

model = PPO(
    CustomMultiInputPolicy,
    env,
    learning_rate=0.0003,
    n_steps=2048,      # Steps per rollout
    batch_size=64,
    n_epochs=10,
    gamma=0.99,
    verbose=1,
)

# Train the model; evaluate_actions() is called internally
model.learn(total_timesteps=10000)
```
During learn(), PPO:
- Uses forward() to collect rollouts.
- Calls evaluate_actions() during training to evaluate the stored actions under the updated policy.
- Updates the policy parameters based on the computed losses (a single collection step is sketched below).
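For intuition, here is what a single collection step looks like from the outside. This is an illustrative sketch of the pattern PPO follows inside collect_rollouts(), not the library source; model and env refer to the objects created above.

```python
import torch

obs = env.reset()  # VecEnv reset returns a batch of observations
obs_tensor, _ = model.policy.obs_to_tensor(obs)  # dict of numpy arrays -> dict of tensors

with torch.no_grad():
    # forward() is the rollout-time path: sample actions, predict values, record log-probs
    actions, values, log_prob = model.policy.forward(obs_tensor)

# The sampled actions are handed to the environment; the rest goes into the rollout buffer
new_obs, rewards, dones, infos = env.step(actions.cpu().numpy())
```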
Key Points
- Automatic Usage:
  - You don’t need to call evaluate_actions() manually when using PPO.learn(). It’s invoked by Stable-Baselines3’s training loop.
- Correctness:
  - Your implementation is correct for a continuous action space with SquashedDiagGaussianDistribution. The unsquashing step ensures log probabilities are computed in the right space.
- Debugging Tip:
  - If you suspect issues (e.g., NaN losses), add a print statement inside evaluate_actions():

```python
print(f"Evaluate Actions: Values={values.mean()}, Log Prob={log_prob.mean()}, Entropy={entropy.mean()}")
```
- Training Dynamics:
  - PPO updates the policy gradually, so evaluate_actions() reflects the current policy’s evaluation of past actions, enabling the clipped objective to prevent large policy shifts.
Conclusion
- No Manual Call Needed: The RL model (PPO) will automatically run evaluate_actions() during training as part of its optimization process.
- Use Case: Only call it manually for debugging or custom workflows.
If you’re seeing issues during training (e.g., unexpected behavior or errors), let me know, and I can help debug or refine the implementation further!