代码拉取完成,页面将自动刷新
同步操作将从 vn.py官方/vnpy_ctp 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
ctp_gateway 修改函数 支持了timestamp 不同数据类型的情况
def onRtnOrder(self, data: dict) -> None:
"""委托更新推送"""
if not self.contract_inited:
self.order_data.append(data)
return
symbol: str = data["InstrumentID"]
contract: ContractData = symbol_contract_map[symbol]
frontid: int = data["FrontID"]
sessionid: int = data["SessionID"]
order_ref: str = data["OrderRef"]
orderid: str = f"{frontid}_{sessionid}_{order_ref}"
timestamp: str = f"{data['InsertDate']} {data['InsertTime']}"
# print(timestamp)
# dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S")
# dt: datetime = CHINA_TZ.localize(dt)
if len(str(int(timestamp))) == 8:
print(timestamp[:4], timestamp[4:6], timestamp[6:8])
year = int(timestamp[:4])
month = int(timestamp[4:6])
day = int(timestamp[6:8])
now = date(year, month, day)
dt: datetime = now.strftime("%Y%m%d %H:%M:%S")
# dt: datetime = dt.replace(tzinfo=CHINA_TZ)
print(dt)
else:
dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S")
dt: datetime = dt.replace(tzinfo=CHINA_TZ)
tp: tuple = (data["OrderPriceType"], data["TimeCondition"], data["VolumeCondition"])
order: OrderData = OrderData(
symbol=symbol,
exchange=contract.exchange,
orderid=orderid,
type=ORDERTYPE_CTP2VT[tp],
direction=DIRECTION_CTP2VT[data["Direction"]],
offset=OFFSET_CTP2VT[data["CombOffsetFlag"]],
price=data["LimitPrice"],
volume=data["VolumeTotalOriginal"],
traded=data["VolumeTraded"],
status=STATUS_CTP2VT[data["OrderStatus"]],
datetime=dt,
gateway_name=self.gateway_name
)
self.gateway.on_order(order)
self.sysid_orderid_map[data["OrderSysID"]] = orderid
基于CTP期货版的6.6.7接口封装开发,接口中自带的是【穿透式实盘环境】的dll文件。
安装环境推荐基于3.3.0版本以上的【VeighNa Studio】。
直接使用pip命令:
pip install vnpy_ctp
或者下载源代码后,解压后在cmd中运行:
pip install .
使用源代码安装时需要进行C++编译,因此在执行上述命令之前请确保已经安装了【Visual Studio(Windows)】或者【GCC(Linux)】编译器。
以脚本方式启动(script/run.py):
from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp
from vnpy_ctp import CtpGateway
def main():
"""主入口函数"""
qapp = create_qapp()
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_gateway(CtpGateway)
main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized()
qapp.exec()
if __name__ == "__main__":
main()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。