# 策略实例
在下面部分我们列举了一些常用的算法范例,您也可以通过右上角的 clone 按钮很方便的把范例算法复制到自己的算法列表中进行调试、更改甚至模拟、真实交易。
# 第一个策略-买入&持有
万事开头难,这是一个最简单的策略:在回测开始的第一天买入资金量的 100%的平安银行并且一直持有。可以通过右上角的 clone 按钮很方便的把范例算法复制到自己的算法列表中, 您可以更改一下股票代码以及回测时间、金钱等就可以很快的知道某个时期您测试某个股票的表现了,各种风险数值也可以很快地计算出来。
# 可以自己import我们平台支持的第三方python模块,比如pandas、numpy等。
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递。
def init(context):
context.s1 = "000001.XSHE"
# 是否已发送了order
context.fired = False
# 你选择的证券的数据更新将会触发此段逻辑,例如日或分钟历史数据切片或者是实时数据切片更新
def handle_bar(context, bar_dict):
# 开始编写你的主要的算法逻辑
# bar_dict[order_book_id] 可以拿到某个证券的bar信息
# context.portfolio 可以拿到现在的投资组合状态信息
# 使用order_shares(id_or_ins, amount)方法进行落单
# TODO: 开始编写你的算法吧!
if not context.fired:
# order_percent并且传入1代表买入该股票并且使其占有投资组合的100%
order_percent(context.s1, 1)
context.fired = True
# Golden Cross 算法示例
以下是一个我们使用 TALib 在我们的平台上编写的 golden cross 算法的示例,使用了 simple moving average 方法:
# 可以自己import我们平台支持的第三方python模块,比如pandas、numpy等。
import talib
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递。
def init(context):
context.s1 = "000001.XSHE"
# 设置这个策略当中会用到的参数,在策略中可以随时调用,这个策略使用长短均线,我们在这里设定长线和短线的区间,在调试寻找最佳区间的时候只需要在这里进行数值改动
context.SHORTPERIOD = 20
context.LONGPERIOD = 120
# 你选择的证券的数据更新将会触发此段逻辑,例如日或分钟历史数据切片或者是实时数据切片更新
def handle_bar(context, bar_dict):
# 开始编写你的主要的算法逻辑
# bar_dict[order_book_id] 可以拿到某个证券的bar信息
# context.portfolio 可以拿到现在的投资组合状态信息
# 使用order_shares(id_or_ins, amount)方法进行落单
# TODO: 开始编写你的算法吧!
# 因为策略需要用到均线,所以需要读取历史数据
prices = history_bars(context.s1,context.LONGPERIOD+1, '1d', 'close')
# 使用talib计算长短两根均线,均线以array的格式表达
short_avg = talib.SMA(prices, context.SHORTPERIOD)
long_avg = talib.SMA(prices, context.LONGPERIOD)
plot("short avg", short_avg[-1])
plot("long avg", long_avg[-1])
# 计算现在portfolio中股票的仓位
cur_position = context.portfolio.positions[context.s1].quantity
# 计算现在portfolio中的现金可以购买多少股票
shares = context.portfolio.cash/bar_dict[context.s1].close
# 如果短均线从上往下跌破长均线,也就是在目前的bar短线平均值低于长线平均值,而上一个bar的短线平均值高于长线平均值
if short_avg[-1] - long_avg[-1] < 0 and short_avg[-2] - long_avg[-2] > 0 and cur_position > 0:
# 进行清仓
order_target_value(context.s1, 0)
# 如果短均线从下往上突破长均线,为入场信号
if short_avg[-1] - long_avg[-1] > 0 and short_avg[-2] - long_avg[-2] < 0:
# 满仓入股
order_shares(context.s1, shares)
# 单股票 MACD 算法示例
以下是一个我们使用 TALib 在我们的平台上编写的单股票 MACD 算法示例,使用了 TALib 的 MACD 方法:
# 可以自己import我们平台支持的第三方python模块,比如pandas、numpy等。
import talib
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递。
def init(context):
context.s1 = "000001.XSHE"
# 使用MACD需要设置长短均线和macd平均线的参数
context.SHORTPERIOD = 12
context.LONGPERIOD = 26
context.SMOOTHPERIOD = 9
context.OBSERVATION = 100
# 你选择的证券的数据更新将会触发此段逻辑,例如日或分钟历史数据切片或者是实时数据切片更新
def handle_bar(context, bar_dict):
# 开始编写你的主要的算法逻辑
# bar_dict[order_book_id] 可以拿到某个证券的bar信息
# context.portfolio 可以拿到现在的投资组合状态信息
# 使用order_shares(id_or_ins, amount)方法进行落单
# TODO: 开始编写你的算法吧!
# 读取历史数据,使用sma方式计算均线准确度和数据长度无关,但是在使用ema方式计算均线时建议将历史数据窗口适当放大,结果会更加准确
prices = history_bars(context.s1,context.OBSERVATION,'1d','close')
# 用Talib计算MACD取值,得到三个时间序列数组,分别为macd, signal 和 hist
macd, signal, hist = talib.MACD(prices, context.SHORTPERIOD,
context.LONGPERIOD, context.SMOOTHPERIOD)
plot("macd", macd[-1])
plot("macd signal", signal[-1])
# macd 是长短均线的差值,signal是macd的均线,使用macd策略有几种不同的方法,我们这里采用macd线突破signal线的判断方法
# 如果macd从上往下跌破macd_signal
if macd[-1] - signal[-1] < 0 and macd[-2] - signal[-2] > 0:
# 计算现在portfolio中股票的仓位
curPosition = context.portfolio.positions[context.s1].quantity
#进行清仓
if curPosition > 0:
order_target_value(context.s1, 0)
# 如果短均线从下往上突破长均线,为入场信号
if macd[-1] - signal[-1] > 0 and macd[-2] - signal[-2] < 0:
# 满仓入股
order_target_percent(context.s1, 1)
# 多股票 RSI 算法示例
以下是一个我们使用 TALib 在我们的平台上编写的多股票 RSI 算法示例,使用了 TALib 的 RSI 方法:
# 可以自己import我们平台支持的第三方python模块,比如pandas、numpy等。
import talib
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递。
def init(context):
# 选择我们感兴趣的股票
context.s1 = "000001.XSHE"
context.s2 = "601988.XSHG"
context.s3 = "000068.XSHE"
context.stocks = [context.s1, context.s2, context.s3]
context.TIME_PERIOD = 14
context.HIGH_RSI = 85
context.LOW_RSI = 30
context.ORDER_PERCENT = 0.3
# 你选择的证券的数据更新将会触发此段逻辑,例如日或分钟历史数据切片或者是实时数据切片更新
def handle_bar(context, bar_dict):
# 开始编写你的主要的算法逻辑
# bar_dict[order_book_id] 可以拿到某个证券的bar信息
# context.portfolio 可以拿到现在的投资组合状态信息
# 使用order_shares(id_or_ins, amount)方法进行落单
# TODO: 开始编写你的算法吧!
# 对我们选中的股票集合进行loop,运算每一只股票的RSI数值
for stock in context.stocks:
# 读取历史数据
prices = history_bars(stock,context.TIME_PERIOD+1, '1d', 'close')
# 用Talib计算RSI值
rsi_data = talib.RSI(prices, timeperiod=context.TIME_PERIOD)[-1]
cur_position = context.portfolio.positions[stock].quantity
# 用剩余现金的30%来购买新的股票
target_available_cash = context.portfolio.cash * context.ORDER_PERCENT
# 当RSI大于设置的上限阀值,清仓该股票
if rsi_data > context.HIGH_RSI and cur_position > 0:
order_target_value(stock, 0)
# 当RSI小于设置的下限阀值,用剩余cash的一定比例补仓该股
if rsi_data < context.LOW_RSI:
logger.info("target available cash caled: " + str(target_available_cash))
# 如果剩余的现金不够一手 - 100shares,那么会被ricequant 的order management system reject掉
order_value(stock, target_available_cash)
# 财务数据策略
在回测开始前,通过查询回测开始当天的财务数据,获得市盈率大于 55 且小于 60,营业总收入前 10 的股票,然后将所有资金平摊到这 10 个股票 Buy & Hold 的策略。您可以 clone 之后自行修改,通过查询回测开始当天的其它财务数据指标来筛选股票。但是注意需要调整资金量到¥ 300,000 以上才会有比较好的落单效果(否则资金量不足以满足买入如此多股票组成的一篮子投资组合)
# 可以自己import我们平台支持的第三方python模块,比如pandas、numpy等。
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递。
def init(context):
# 查询revenue前十名的公司的股票并且他们的pe_ratio在55和60之间。打fundamentals的时候会有auto-complete方便写查询代码。
stocks = all_instruments('CS').order_book_id
fundamental_df=get_factor(stocks,['pe_ratio','revenue'])
fundamental_df = fundamental_df[(fundamental_df['pe_ratio'] > 55) & (fundamental_df['pe_ratio'] < 60)]
fundamental_df = fundamental_df.sort_values(by='revenue', ascending=False).head(10).reset_index(1,drop=True)
# 将查询结果dataframe的fundamental_df存放在context里面以备后面只需:
context.fundamental_df = fundamental_df
# 实时打印日志看下查询结果,会有我们精心处理的数据表格显示:
logger.info(context.fundamental_df)
update_universe(context.fundamental_df.index.values)
# 对于每一个股票按照平均现金买入:
context.stocks = context.fundamental_df.index.values
stocks_number = len(context.stocks)
context.average_percent = 0.99 / stocks_number
logger.info("Calculated average percent for each stock is: %f" % context.average_percent)
context.fired = False
# 你选择的证券的数据更新将会触发此段逻辑,例如日或分钟历史数据切片或者是实时数据切片更新
def handle_bar(context, bar_dict):
# 开始编写你的主要的算法逻辑
# bar_dict[order_book_id] 可以拿到某个证券的bar信息
# context.portfolio 可以拿到现在的投资组合状态信息
# 使用order_shares(id_or_ins, amount)方法进行落单
# TODO: 开始编写你的算法吧!
# 对于选择出来的股票按照平均比例买入:
if not context.fired:
for stock in context.stocks:
order_target_percent(stock, context.average_percent)
logger.info("Bought: " + str(context.average_percent) + " % for stock: " + str(stock))
context.fired = True
# 根据收益和市盈率定期调仓策略
这是一个利用财务数据定期选股并且调仓的策略:选取市盈率在 53 到 67 之间,盈利前 10 的股票,然后按照等比例分配资金买入一个 10 个股票的投资组合:
# 可以自己import我们平台支持的第三方python模块,比如pandas、numpy等。
def query_fundamental(context, bar_dict):
# 查询revenue前十名的公司的股票并且他们的pe_ratio在53和67之间。
stocks = all_instruments('CS').order_book_id
fundamental_df = get_factor(stocks, ['pe_ratio', 'revenue'])
fundamental_df = fundamental_df[(fundamental_df['pe_ratio'] > 53) & (fundamental_df['pe_ratio'] < 67)]
fundamental_df = fundamental_df.sort_values(by='revenue', ascending=False).head(10).reset_index(1, drop=True)
# 将查询结果dataframe的fundamental_df存放在context里面以备后面只需:
context.fundamental_df = fundamental_df
# 实时打印日志看下查询结果,会有我们精心处理的数据表格显示:
logger.info(context.fundamental_df)
# 对于每一个股票按照平均现金买入:
context.stocks = context.fundamental_df.index.values
update_universe(context.stocks)
stocksNumber = len(context.stocks)
context.average_percent = 0.99 / stocksNumber
logger.info("Calculated average percent for each stock is: %f" % context.average_percent)
# 先查一下选出来的股票是否在已有的portfolio里面:
# 这样做并不是最好的,只是代码比较简单
# 先清仓然后再买入这一个月新的符合条件的股票
logger.info("Clearing all the current positions.")
for holding_stock in context.portfolio.positions.keys():
if context.portfolio.positions[holding_stock].quantity != 0:
order_target_percent(holding_stock, 0)
logger.info("Building new positions for portfolio.")
for stock in context.stocks:
order_target_percent(stock, context.average_percent)
logger.info("Buying: " + str(context.average_percent) + " % for stock: " + str(stock))
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递。
def init(context):
scheduler.run_monthly(query_fundamental, monthday=1)
# 你选择的证券的数据更新将会触发此段逻辑,例如日或分钟历史数据切片或者是实时数据切片更新
def handle_bar(context, bar_dict):
# 开始编写你的主要的算法逻辑
# bar_dict[order_book_id] 可以拿到某个证券的bar信息
# context.portfolio 可以拿到现在的投资组合状态信息
# 使用order_shares(id_or_ins, amount)方法进行落单
# TODO: 开始编写你的算法吧!
pass
# 海龟交易系统
海龟交易系统也是非常经典的一种策略,我们也放出了范例代码如下。
#导入必须的模块
import numpy as np
import talib
import math
def get_extreme(array_high_price_result, array_low_price_result):
#抛开最新价格的价格序列
np_array_high_price_result = np.array(array_high_price_result[:-1])
np_array_low_price_result = np.array(array_low_price_result[:-1])
#序列最大值
max_result = np_array_high_price_result.max()
#最小值
min_result = np_array_low_price_result.min()
#返回一个两个元素的list
return [max_result, min_result]
#拿到真实震幅与头寸
def get_atr_and_unit( atr_array_result, atr_length_result, total_value_result):
#atr为真实平均振幅的最新值
atr = atr_array_result[ atr_length_result-1]
#头寸
unit = math.floor(total_value_result * .01 / atr)
return [atr, unit]
#得到止损价格
def get_stop_price(first_open_price_result, units_hold_result, atr_result):
stop_price = first_open_price_result - 2 * atr_result \
+ (units_hold_result - 1) * 0.5 * atr_result
return stop_price
def init(context):
context.trade_day_num = 0
context.unit = 0
context.atr = 0
context.trading_signal = 'start'
context.pre_trading_signal = ''
context.units_hold_max = 4
context.units_hold = 0
context.quantity = 0
context.max_add = 0
context.first_open_price = 0
context.s = '000300.XSHG'
context.open_observe_time = 55
context.close_observe_time = 20
context.atr_time = 20
def handle_bar(context, bar_dict):
#当前合约的价值
total_value = context.portfolio.total_value
#context.open_observe_time+1个bar的每日最高价
high_price = history_bars(context.s,context.open_observe_time+1, '1d', 'high')
low_price_for_atr = history_bars(context.s,context.open_observe_time+1, '1d', 'low')
low_price_for_extreme = history_bars(context.s,context.close_observe_time+1, '1d', 'low')
close_price = history_bars(context.s,context.open_observe_time+2, '1d', 'close')
close_price_for_atr = close_price[:-1]
#talib.ATR平均真实振幅
atr_array = talib.ATR(high_price, low_price_for_atr, close_price_for_atr, timeperiod=context.atr_time)
#得到max_result
maxx = get_extreme(high_price, low_price_for_extreme)[0]
#得到min_result
minn = get_extreme(high_price, low_price_for_extreme)[1]
#前两日的平均真实振幅
atr = atr_array[-2]
if context.trading_signal != 'start':
if context.units_hold != 0:
context.max_add += 0.5 * get_atr_and_unit(atr_array, atr_array.size, total_value)[0]
else:
context.max_add = bar_dict[context.s].last
#当前context.s持仓股数
cur_position = context.portfolio.positions[context.s].quantity
#当前仓位的现金
available_cash = context.portfolio.cash
#当前仓位的市场价值
market_value = context.portfolio.market_value
#当前仓位大于0并且当前价格小于止损价,产生信号"stop"
if (cur_position > 0 and
bar_dict[context.s].last < get_stop_price(context.first_open_price, context.units_hold, atr)):
context.trading_signal = 'stop'
else:
#仓位大于0,并且当前价格小于观察时期的low的最小值
if cur_position > 0 and bar_dict[context.s].last < minn:
context.trading_signal = 'exit'
else:
#仓位大于0但小于策略设计仓位,并且当前价格大于观察时期的high的最大值,产生追加持仓的信号
if (bar_dict[context.s].last > context.max_add and context.units_hold != 0 and
context.units_hold < context.units_hold_max and
available_cash > bar_dict[context.s].last*context.unit):
context.trading_signal = 'entry_add'
else:
#价格大于maxx并且持仓为0,产生信号"entry"
if bar_dict[context.s].last > maxx and context.units_hold == 0:
context.max_add = bar_dict[context.s].last
context.trading_signal = 'entry'
atr = get_atr_and_unit(atr_array, atr_array.size, total_value)[0]
if context.trade_day_num % 5 == 0:
#每5个bar计算一次context.unit寸头
context.unit = get_atr_and_unit(atr_array, atr_array.size, total_value)[1]
context.trade_day_num += 1
context.quantity = context.unit
#不同信号下的操作
if (context.trading_signal != context.pre_trading_signal or
(context.units_hold < context.units_hold_max and context.units_hold > 1) or
context.trading_signal == 'stop'):
if context.trading_signal == 'entry':
context.quantity = context.unit
if available_cash > bar_dict[context.s].last*context.quantity:
order_shares(context.s, context.quantity)
context.first_open_price = bar_dict[context.s].last
context.units_hold = 1
if context.trading_signal == 'entry_add':
context.quantity = context.unit
order_shares(context.s, context.quantity)
context.units_hold += 1
if context.trading_signal == 'stop':
if context.units_hold > 0:
order_shares(context.s, -context.quantity)
context.units_hold -= 1
if context.trading_signal == 'exit':
if cur_position > 0:
order_shares(context.s, -cur_position)
context.units_hold = 0
context.pre_trading_signal = context.trading_signal
# 股指期货 MACD 日回测
以下是一个使用 TALib 进行股指期货主力合约日级别回测 MACD 算法示例:
# 可以自己import我们平台支持的第三方python模块,比如pandas、numpy等
import talib
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递
def init(context):
# context内引入全局变量s1,存储目标合约信息
context.s1 = 'IF1606'
# 使用MACD需要设置长短均线和macd平均线的参数
context.SHORTPERIOD = 12
context.LONGPERIOD = 26
context.SMOOTHPERIOD = 9
context.OBSERVATION = 50
#初始化时订阅合约行情。订阅之后的合约行情会在handle_bar中进行更新
subscribe(context.s1)
# 你选择的期货数据更新将会触发此段逻辑,例如日线或分钟线更新
def handle_bar(context, bar_dict):
# 开始编写你的主要的算法逻辑
# 获取历史收盘价序列,history_bars函数直接返回ndarray,方便之后的有关指标计算
prices = history_bars(context.s1, context.OBSERVATION, '1d', 'close')
# 用Talib计算MACD取值,得到三个时间序列数组,分别为macd,signal 和 hist
macd, signal, hist = talib.MACD(prices, context.SHORTPERIOD,
context.LONGPERIOD, context.SMOOTHPERIOD)
# macd 是长短均线的差值,signal是macd的均线,如果短均线从下往上突破长均线,为入场信号,进行买入开仓操作
if macd[-1] - signal[-1] > 0 and macd[-2] - signal[-2] < 0:
sell_qty = context.portfolio.positions[context.s1].sell_quantity
# 先判断当前卖方仓位,如果有,则进行平仓操作
if sell_qty > 0:
buy_close(context.s1, 1)
# 买入开仓
buy_open(context.s1, 1)
if macd[-1] - signal[-1] < 0 and macd[-2] - signal[-2] > 0:
buy_qty = context.portfolio.positions[context.s1].buy_quantity
# 先判断当前买方仓位,如果有,则进行平仓操作
if buy_qty > 0:
sell_close(context.s1, 1)
# 卖出开仓
sell_open(context.s1, 1)
# 商品期货跨品种配对交易
该策略为分钟级别回测。运用了简单的移动平均以及布林带(Bollinger Bands)作为交易信号产生源。有关对冲比率(HedgeRatio)的确定,您可以在我们的研究平台上面通过import statsmodels.api as sm
引入statsmodels中的 OLS 方法进行线性回归估计。具体估计窗口,您可以根据自己策略需要自行选择。
策略中的移动窗口选择为 60 分钟,即在每天开盘 60 分钟内不做任何交易,积累数据计算移动平均值。当然,这一移动窗口也可以根据自身需要进行灵活选择。下面例子中使用了黄金与白银两种商品期货进行配对交易。简单起见,例子中期货的价格并未做对数差处理。
# 可以自己import我们平台支持的第三方python模块,比如pandas、numpy等。
import numpy as np
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递。
def init(context):
context.s1 = 'AG1612'
context.s2 = 'AU1612'
# 设置全局计数器
context.counter = 0
# 设置滚动窗口
context.window = 60
# 设置对冲手数,通过研究历史数据进行价格序列回归得到该值
context.ratio = 15
context.up_cross_up_limit = False
context.down_cross_down_limit = False
# 设置入场临界值
context.entry_score = 2
# 初始化时订阅合约行情。订阅之后的合约行情会在handle_bar中进行更新
subscribe([context.s1, context.s2])
# before_trading此函数会在每天交易开始前被调用,当天只会被调用一次
def before_trading(context):
# 样例商品期货在回测区间内有夜盘交易,所以在每日开盘前将计数器清零
context.counter = 0
# 你选择的期货数据更新将会触发此段逻辑,例如日线或分钟线更新
def handle_bar(context, bar_dict):
# 获取当前一对合约的仓位情况。如尚未有仓位,则对应持仓量都为0
position_a = context.portfolio.positions[context.s1]
position_b = context.portfolio.positions[context.s2]
context.counter += 1
# 当累积满一定数量的bar数据时候,进行交易逻辑的判断
if context.counter > context.window:
# 获取当天历史分钟线价格队列
price_array_a = history_bars(context.s1, context.window, '1m', 'close')
price_array_b = history_bars(context.s2, context.window, '1m', 'close')
# 计算价差序列、其标准差、均值、上限、下限
spread_array = price_array_a - context.ratio * price_array_b
std = np.std(spread_array)
mean = np.mean(spread_array)
up_limit = mean + context.entry_score * std
down_limit = mean - context.entry_score * std
# 获取当前bar对应合约的收盘价格并计算价差
price_a = bar_dict[context.s1].close
price_b = bar_dict[context.s2].close
spread = price_a - context.ratio * price_b
# 如果价差低于预先计算得到的下限,则为建仓信号,'买入'价差合约
if spread <= down_limit and not context.down_cross_down_limit:
# 可以通过logger打印日志
logger.info('spread: {}, mean: {}, down_limit: {}'.format(spread, mean, down_limit))
logger.info('创建买入价差中...')
# 获取当前剩余的应建仓的数量
qty_a = 1 - position_a.buy_quantity
qty_b = context.ratio - position_b.sell_quantity
# 由于存在成交不超过下一bar成交量25%的限制,所以可能要通过多次发单成交才能够成功建仓
if qty_a > 0:
buy_open(context.s1, qty_a)
if qty_b > 0:
sell_open(context.s2, qty_b)
if qty_a == 0 and qty_b == 0:
# 已成功建立价差的'多仓'
context.down_cross_down_limit = True
logger.info('买入价差仓位创建成功!')
# 如果价差向上回归移动平均线,则为平仓信号
if spread >= mean and context.down_cross_down_limit:
logger.info('spread: {}, mean: {}, down_limit: {}'.format(spread, mean, down_limit))
logger.info('对买入价差仓位进行平仓操作中...')
# 由于存在成交不超过下一bar成交量25%的限制,所以可能要通过多次发单成交才能够成功建仓
qty_a = position_a.buy_quantity
qty_b = position_b.sell_quantity
if qty_a > 0:
sell_close(context.s1, qty_a)
if qty_b > 0:
buy_close(context.s2, qty_b)
if qty_a == 0 and qty_b == 0:
context.down_cross_down_limit = False
logger.info('买入价差仓位平仓成功!')
# 如果价差高于预先计算得到的上限,则为建仓信号,'卖出'价差合约
if spread >= up_limit and not context.up_cross_up_limit:
logger.info('spread: {}, mean: {}, up_limit: {}'.format(spread, mean, up_limit))
logger.info('创建卖出价差中...')
qty_a = 1 - position_a.sell_quantity
qty_b = context.ratio - position_b.buy_quantity
if qty_a > 0:
sell_open(context.s1, qty_a)
if qty_b > 0:
buy_open(context.s2, qty_b)
if qty_a == 0 and qty_b == 0:
context.up_cross_up_limit = True
logger.info('卖出价差仓位创建成功')
# 如果价差向下回归移动平均线,则为平仓信号
if spread < mean and context.up_cross_up_limit:
logger.info('spread: {}, mean: {}, up_limit: {}'.format(spread, mean, up_limit))
logger.info('对卖出价差仓位进行平仓操作中...')
qty_a = position_a.sell_quantity
qty_b = position_b.buy_quantity
if qty_a > 0:
buy_close(context.s1, qty_a)
if qty_b > 0:
sell_close(context.s2, qty_b)
if qty_a == 0 and qty_b == 0:
context.up_cross_up_limit = False
logger.info('卖出价差仓位平仓成功!')
# Alpha 策略----多因子对冲
多因子模型是应用最广泛的一种选股模型,基本原理是采用一系列的因子作为选股标准,满足这些因子的股票则被买入,不满足的则卖出。各种多因子模型核心的区别第一是在因子的选取上,第二是在如何用多因子综合得到一个最终的判断。股票端我们按照多因子的模型买入,期货端则一手空单对冲风险。
# 可以自己import我们平台支持的第三方python模块,比如pandas、numpy等。
import numpy as np
import math
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递。
def init(context):
#选取板块
context.stks=[]
context.stks.append(sector("consumer discretionary"))
context.stks.append(sector("consumer staples"))
context.stks.append(sector("health care"))
context.stks.append(sector("telecommunication services"))
context.stks.append(sector("utilities"))
context.stks.append(sector("materials"))
context.flag=True
# 确定运行频率
scheduler.run_daily(rebalance)
# 手动添加银行板块
context.stocks = ['000001.XSHE','002142.XSHE','600000.XSHG','600015.XSHG','600016.XSHG','600036.XSHG','601009.XSHG','601166.XSHG','601169.XSHG','601288.XSHG','601328.XSHG','601398.XSHG','601818.XSHG','601939.XSHG','601988.XSHG','601998.XSHG']
# 你选择的证券的数据更新将会触发此段逻辑,例如日或分钟历史数据切片或者是实时数据切片更新
def get_stocks(context, bar_dict):
stocks=set([])
for i in range(0,6):
# 在这个循环里,首先获取每个板块的财务数据
fundamental_df=get_factor(context.stks[i],['adjusted_return_on_equity_weighted_average','pe_ratio','pb_ratio']).reset_index(1,drop=True)
fundamental_df=fundamental_df.loc[context.stks[i]]
fundamental_df=fundamental_df[fundamental_df.pe_ratio<999]
# 使用pandas对财务数据进行排名并打分
fundamental_df=fundamental_df.sort_values('pb_ratio')
fundamental_df['pb_score']=list(range(1,len(fundamental_df)+1))
fundamental_df=fundamental_df.sort_values('pe_ratio')
fundamental_df['pe_score']=list(range(1,len(fundamental_df)+1))
scores=[]
for stock in fundamental_df.T.columns.values:
scores.append(fundamental_df.loc[stock,'pe_score']+fundamental_df.loc[stock,'pb_score'])
fundamental_df['scores']=list(scores)
fundamental_df=fundamental_df.sort_values('scores')
#取得分最低的三个股票
fundamental_df=fundamental_df.head(3)
#logger.info(df)
stocks = stocks | set(fundamental_df.index.values)
#logger.info(i)
# 银行板块单独按照市净率取市净率最低的两个
fundamental_df=get_factor(context.stocks,['adjusted_return_on_equity_weighted_average','pb_ratio']).reset_index(1,drop=True)
fundamental_df = fundamental_df.loc[context.stocks]
fundamental_df=fundamental_df.sort_values('pb_ratio').head(2)
# 买入的股票,进行调仓操作
stocks =stocks | set(fundamental_df.index.values)
return stocks
def rebalance(context, bar_dict):
stocks = get_stocks(context, bar_dict)
holdings = set(get_holdings(context))
to_buy = stocks - holdings
to_sell = holdings - stocks
to_buy2= stocks - holdings
for stock in to_sell:
if bar_dict[stock].suspended == False:
order_target_percent(stock , 0)
if len(to_buy) == 0:
return
to_buy = get_trading_stocks(to_buy, context, bar_dict)
cash = context.portfolio.cash
total_value=context.portfolio.total_value
if len(to_buy) >0:
average_value = total_value *0.025
if average_value > total_value/len(to_buy):
average_value = total_value/len(to_buy)
for stock in to_buy:
if (bar_dict[stock].suspended == False)and(context.portfolio.cash>average_value):
order_target_value(stock, average_value)
if context.flag==True :
sell_open('IF88', 1)
context.flag=False
# 得到交易的股票
def get_trading_stocks(to_buy, context, bar_dict):
trading_stocks = []
for stock in to_buy:
if bar_dict[stock].suspended == False:
trading_stocks.append(stock)
return trading_stocks
# 持仓的股票
def get_holdings(context):
positions = context.portfolio.stock_account.positions
holdings = []
for position in positions:
if positions[position].quantity > 0:
holdings.append(position)
return holdings
def handle_bar(context, bar_dict):
# TODO: 开始编写你的算法吧!
pass