【シストレ開発#21】バックアップ/リストア、週末手仕舞い処理追加!

 ※ 本記事は広告・プロモーションを含みます。
シストレ開発

新機能を追加したことで使い勝手がさらに良くなりました! たっきん(Twitter)です!

自作の自動売買システム(システムトレード)ですが、特に大きな不具合もなく順調に稼働しています。

本格的に稼働をはじめてから大体3か月くらい経った感じですね。

トレードのほうは問題なく実行できているのですが、それとは別に使い勝手の悪い面もいくつか見えてきました。

<使い勝手の悪い面>

  1. トレード実行中にプログラムを一旦停止し、再開した後はトレード実行中状態を復元できない。
  2. 週末にクローズされなかったポジションは次週持ち越しになってしまう。

1.については前々から不便に思っていました。

サーバーでシストレ運用中にサーバーメンテやソフトウェア更新などでプログラムを中断/再開するシーンは結構あると思いますが、今のソフトではプログラムを停止→再開するときに停止前の状態を再開後に復元することができずにいました。

なので今の運用としてはトレード中にプログラムを停止するときは手動でポジションを決済した後にプログラムを停止していますが、毎回手動で決済するのも煩わしいし、そもそも手動で決済してたらシステムトレードにならないため、このタイミングで改善することにしました。

2.についても不便に感じていたところになります。

人によってはポジションを翌週に持ち越したくないため、週末に全ポジションを手仕舞いしたい人もいると思いますが、今のソフトでは週末手仕舞いができないため、決済できなかったポジションは強制的に次週に持ち越しになってしまいます。

なので、この使い勝手が良くない点もこのタイミングで改善していこうと思います。

バックアップ/リストア処理の追加

プログラム停止→再開した後のトレード状態復元のやり方は至ってシンプルに実現可能です。

プログラム停止前にトレード状態のバックアップデータをストレージに保存して、再開後にバックアップデータを読み込んでリストアするだけです。

リストア処理もさほど難しくありません。

設計思想が状態遷移で設計されているため、リストア後は「新規約定待ち(EntryWaiting)」「決済約定待ち(ExitWaiting)」へ強制遷移してからメイン処理を再開するだけです。

上記の内容をソースコードに落とし込むと下記のようになります。
(修正箇所のみを抜粋しています。)

import pandas as pd
from pathlib import Path
from typing import Optional


def save_df_csv(filepath: str,
                df: pd.DataFrame,
                index: bool = True,
                date_format: Optional[str] = None
                ) -> None:
    p = Path(filepath)
    p.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(filepath, index=index, date_format=date_format)


def load_df_csv(filepath: str) -> pd.DataFrame:
    p = Path(filepath)
    if not p.exists():
        raise FileNotFoundError("Could not found [" + str(filepath) + "]")

    return pd.read_csv(filepath)
class ColOrderSchedulerBackup(Enum):
    """
    Order scheduler backup dataframe column name.
    """
    REGISTER_ID = "register_id"

    @classmethod
    def to_list(cls):
        return [m.value for m in cls]


class ColOrderTicketsBackup(Enum):
    """
    Order tickets backup dataframe column name.
    """
    REGISTER_ID = "register_id"
    ORDER_ID = "order_id"
    TRADE_ID = "trade_id"
    INST_ID = "inst_id"
    ORDER_TYPE = "order_type"
    UNITS = "units"
    ENTRY_PRICE = "entry_price"
    ENTRY_EXP_TIME = "entry_exp_time"
    TAKE_PROFIT_PRICE = "take_profit_price"
    STOP_LOSS_PRICE = "stop_loss_price"
    EXIT_EXP_TIME = "exit_exp_time"
    API_INST_ID = "api_inst_id"
    API_ORDER_TYPE = "api_order_type"

    @classmethod
    def to_list(cls):
        return [m.value for m in cls]


class OrderTicket():

    ・・・
    def generate_buckup_record(self) -> list:
        bk_rec_list = [
            self._register_id,
            self._order_id,
            self._trade_id,
            self._inst_id,
            self._order_type,
            self._units,
            self._entry_price,
            self._entry_exp_time,
            self._take_profit_price,
            self._stop_loss_price,
            self._exit_exp_time,
            self._api_inst_id,
            self._api_order_type,
        ]
        return bk_rec_list

    def restore(self, rec: pd.Series):
        Col = ColOrderTicketsBackup

        entry_exp_time = rec[Col.ENTRY_EXP_TIME.value]
        if entry_exp_time is not None:
            entry_exp_time = dt.datetime.strptime(entry_exp_time,
                                                  FMT_YMDHMS)
        exit_exp_time = rec[Col.EXIT_EXP_TIME.value]
        if exit_exp_time is not None:
            exit_exp_time = dt.datetime.strptime(exit_exp_time,
                                                 FMT_YMDHMS)
        self._register_id = rec[Col.REGISTER_ID.value]
        self._order_id = rec[Col.ORDER_ID.value]
        self._trade_id = rec[Col.TRADE_ID.value]
        self._inst_id = rec[Col.INST_ID.value]
        self._order_type = rec[Col.ORDER_TYPE.value]
        self._units = rec[Col.UNITS.value]
        self._entry_price = rec[Col.ENTRY_PRICE.value]
        self._entry_exp_time = entry_exp_time
        self._take_profit_price = rec[Col.TAKE_PROFIT_PRICE.value]
        self._stop_loss_price = rec[Col.STOP_LOSS_PRICE.value]
        self._exit_exp_time = exit_exp_time
        self._api_inst_id = rec[Col.API_INST_ID.value]
        self._api_order_type = rec[Col.API_ORDER_TYPE.value]

        self.logger.info("<<<<<<<<<< Restore:register_id[{}] >>>>>>>>>>"
                         .format(self._register_id))
        self.logger.debug("  - inst_id:[{}]".format(self._inst_id))
        self.logger.debug("  - order_type:[{}]".format(self._order_type))
        self.logger.debug("  - units:[{}]".format(self._units))
        self.logger.debug("  - entry_price:[{}]".format(self._entry_price))
        self.logger.debug("  - entry_exp_time:[{}]".format(self._entry_exp_time))
        self.logger.debug("  - take_profit_price:[{}]".format(self._take_profit_price))
        self.logger.debug("  - stop_loss_price:[{}]".format(self._stop_loss_price))
        self.logger.debug("  - exit_exp_time:[{}]".format(self._exit_exp_time))
        self.logger.debug("  - api_inst_id:[{}]".format(self._api_inst_id))
        self.logger.debug("  - api_order_type:[{}]".format(self._api_order_type))

        if self._trade_id is None:
            self.to_EntryChecking()
        else:
            self.to_ExitChecking()


class OrderScheduler(Node):

    def __init__(self) -> None:

       ・・・
        # --------------- Define constant value ---------------
        buckup_dir = os.path.expanduser("~") + BUCKUP_DIR
        filename_os = "bak_order_scheduler.csv"
        filename_ot = "bak_order_tickets.csv"
        self._BUCKUP_FULLPATH_OS = buckup_dir + filename_os
        self._BUCKUP_FULLPATH_OT = buckup_dir + filename_ot

        ・・・
        # --------------- Restore ---------------
        # ----- Order scheduler -----
        try:
            df = bk.load_df_csv(self._BUCKUP_FULLPATH_OS)
        except FileNotFoundError:
            pass
        else:
            Col = ColOrderSchedulerBackup
            sr = df.iloc[0]
            self.logger.info("========== Restore order scheduler ==========\n{}"
                             .format(sr))
            OrderTicket.set_register_id(sr[Col.REGISTER_ID.value])

    ・・・
    def finalize(self):
        # ---------- write csv ----------
        # ----- Backup order scheduler -----
        register_id = OrderTicket.get_register_id()
        bk_tbl = [register_id]
        df = pd.DataFrame(bk_tbl, columns=ColOrderSchedulerBackup.to_list())
        bk.save_df_csv(self._BUCKUP_FULLPATH_OS, df, index=False, date_format=FMT_YMDHMS)

        # ----- Backup order tickets -----
        bk_tbl = []
        for ticket in self._tickets:
            rec = ticket.generate_buckup_record()
            bk_tbl.append(rec)
        df = pd.DataFrame(bk_tbl, columns=ColOrderTicketsBackup.to_list())
        bk.save_df_csv(self._BUCKUP_FULLPATH_OT, df, index=False, date_format=FMT_YMDHMS)


def main(args=None):

    rclpy.init(args=args)
    os = OrderScheduler()

    try:
        while rclpy.ok():
            rclpy.spin_once(os, timeout_sec=1.0)
            os.do_cyclic_event()
    except KeyboardInterrupt:
        pass

    os.finalize()
    os.destroy_node()
    rclpy.shutdown()

週末手仕舞い処理の追加

週末手仕舞い処理も簡単に実装可能です。

週末なので日本時間の土曜の規定時間を過ぎたら保有しているポジションを全決済するコードを追加するだけです。

また、全決済後に新規約定することがないように新規約定禁止処理も追加する必要があります。

一応、本機能は有効/無効を切り替えられるように、ROSパラメータ化しておきました。

Pythonのコードで記述すると下記のようになります。
(修正箇所のみを抜粋しています。)

class OrderTicket():

    ・・・
    def enable_weekend_close(self) -> None:
        self._enable_weekend_close = True
        self._enable_trade_close = True
        self.logger.debug("----- << Enable weekend close >> -----")

    def _on_do_EntryWaiting(self) -> None:
        ・・・
        elif self._enable_weekend_close:
            self._trans_from_EntryWaiting_to_EntryCanceling()
        ・・・

class OrderScheduler(Node):

    def __init__(self) -> None:

        # --------------- Declare ROS parameter ---------------
        self._rosprm = _RosParams()
        self.declare_parameter(self._rosprm.ENABLE_WEEKEND_ORDER_STOP.name)
        self.declare_parameter(self._rosprm.WEEKEND_ORDER_STOP_TIME.name)
        self.declare_parameter(self._rosprm.ENABLE_WEEKEND_ALL_CLOSE.name)
        self.declare_parameter(self._rosprm.WEEKEND_ALL_CLOSE_TIME.name)

        para = self.get_parameter(self._rosprm.ENABLE_WEEKEND_ORDER_STOP.name)
        self._rosprm.ENABLE_WEEKEND_ORDER_STOP.value = para.value
        para = self.get_parameter(self._rosprm.WEEKEND_ORDER_STOP_TIME.name)
        self._rosprm.WEEKEND_ORDER_STOP_TIME.value = para.value
        datetime_ = dt.datetime.strptime(para.value, FMT_TIME_HMS)
        self._rosprm.WEEKEND_ORDER_STOP_TIME.time = datetime_.time()
        para = self.get_parameter(self._rosprm.ENABLE_WEEKEND_ALL_CLOSE.name)
        self._rosprm.ENABLE_WEEKEND_ALL_CLOSE.value = para.value
        para = self.get_parameter(self._rosprm.WEEKEND_ALL_CLOSE_TIME.name)
        self._rosprm.WEEKEND_ALL_CLOSE_TIME.value = para.value
        datetime_ = dt.datetime.strptime(para.value, FMT_TIME_HMS)
        self._rosprm.WEEKEND_ALL_CLOSE_TIME.time = datetime_.time()

        self.logger.debug("[Param]")
        self.logger.debug("  - enable_weekend_order_stop:[{}]"
                          .format(self._rosprm.ENABLE_WEEKEND_ORDER_STOP.value))
        self.logger.debug("  - weekend_order_stop_time:[{}]"
                          .format(self._rosprm.WEEKEND_ORDER_STOP_TIME.time))
        self.logger.debug("  - enable_weekend_all_close:[{}]"
                          .format(self._rosprm.ENABLE_WEEKEND_ALL_CLOSE.value))
        self.logger.debug("  - weekend_all_close_time:[{}]"
                          .format(self._rosprm.WEEKEND_ALL_CLOSE_TIME.time))

    def do_cyclic_event(self) -> None:
        ・・・
        # Check weekend close proccess
        if self._rosprm.ENABLE_WEEKEND_ALL_CLOSE.value:
            now = dt.datetime.now()
            if ((now.weekday() == WeekDay.SAT.value)
                    and (now.time() > self._rosprm.WEEKEND_ALL_CLOSE_TIME.time)):
                for ticket in self._tickets:
                    ticket.enable_weekend_close()

    def _on_order_register(self,
                           req: SrvTypeRequest,
                           rsp: SrvTypeResponse
                           ) -> SrvTypeResponse:
        ・・・
        if (self._rosprm.ENABLE_WEEKEND_ORDER_STOP.value
            and (dbg_tm_start.weekday() == WeekDay.SAT.value)
                and (dbg_tm_start.time() > self._rosprm.WEEKEND_ORDER_STOP_TIME.time)):
            self.logger.warn("{:!^50}".format(" Reject order create "))
            self.logger.warn("  - Weekend order stop time has passed ")
            self.logger.debug("{:=^50}".format(" Service[order_register]:End "))
            return rsp

        ・・・

さいごに

今回はアプリケーションの追加ではなく、バックアップ/リストア、週末手仕舞い処理を追加することで使い勝手が向上しました。

これで次からプログラムを一時中断しても、中断前の状態へリストアして再開することができるようになりました。

週末手仕舞いも自動でできるようになったため、翌週へのポジションを持ち越すことがなくなります。

さて、これで不便に感じていたところも無事に改善しましたので、またアプリケーションの追加をしていこうと思います!

※今回修正したソースコードはGutHubで公開しています。

スポンサーリンク

コメント

タイトルとURLをコピーしました