Files
strategy-lab/to_explore/pyquantnews/44_ZiplinePipeline.ipynb
David Brazda e3da60c647 daily update
2024-10-21 20:57:56 +02:00

12 KiB

No description has been provided for this image

This code implements a trading algorithm using the Zipline library in Python. It defines a custom momentum factor and a pipeline to rank stocks based on their momentum. The algorithm schedules rebalancing every week and executes trades based on the ranked stocks. The performance of the algorithm is then simulated over a specified time period. This is useful for backtesting trading strategies and analyzing their performance.

In [ ]:
import pandas as pd
In [ ]:
from zipline import run_algorithm
from zipline.pipeline import Pipeline, CustomFactor
from zipline.pipeline.data import USEquityPricing
from zipline.api import (
    attach_pipeline,
    calendars,
    pipeline_output,
    date_rules,
    time_rules,
    set_commission,
    set_slippage,
    record,
    order_target_percent,
    get_open_orders,
    schedule_function
)
In [ ]:
import re
In [ ]:
import warnings
warnings.filterwarnings("ignore")

Define the number of stocks to long and short

In [ ]:
N_LONGS = N_SHORTS = 10

Define a custom factor for momentum calculation

In [ ]:
class Momentum(CustomFactor):
    """Calculate momentum as the ratio of the last to first price.

    This class calculates momentum by dividing the last closing price
    by the first closing price over a window.

    Attributes
    ----------
    inputs : list
        Data inputs for the factor, here the closing prices.
    window_length : int
        Length of the window to compute momentum.
    """

    inputs = [USEquityPricing.close]

    def compute(self, today, assets, out, close):
        """Compute the momentum factor.

        Parameters
        ----------
        today : datetime
            Current date.
        assets : iterable
            List of asset identifiers.
        out : ndarray
            Output array to store computed momentum values.
        close : ndarray
            Array of closing prices over the window length.
        """
        out[:] = close[-1] / close[0]

Construct a pipeline to filter and rank stocks based on momentum

In [ ]:
def make_pipeline():
    """Create a pipeline for ranking stocks based on momentum.

    This function defines a pipeline that filters stocks with positive
    momentum and ranks them.

    Returns
    -------
    Pipeline
        A Zipline Pipeline object.
    """

    twenty_day_momentum = Momentum(window_length=20)
    thirty_day_momentum = Momentum(window_length=30)

    positive_momentum = (
        (twenty_day_momentum > 1) & 
        (thirty_day_momentum > 1)
    )

    return Pipeline(
        columns={
            'longs': thirty_day_momentum.top(N_LONGS),
            'shorts': thirty_day_momentum.top(N_SHORTS),
            'ranking': twenty_day_momentum.rank(ascending=False)
        },
        screen=positive_momentum
    )

Function called before the trading day starts to fetch pipeline output

In [ ]:
def before_trading_start(context, data):
    """Fetch pipeline output before trading starts.

    This function is called before the trading day begins to fetch
    the output of the pipeline.

    Parameters
    ----------
    context : object
        An object to store global variables.
    data : object
        An object to fetch data.
    """
    context.factor_data = pipeline_output("factor_pipeline")

Initialize the algorithm by attaching the pipeline and scheduling rebalancing

In [ ]:
def initialize(context):
    """Initialize the trading algorithm.

    This function sets up the pipeline, schedules the rebalancing
    function, and initializes other settings.

    Parameters
    ----------
    context : object
        An object to store global variables.
    """
    attach_pipeline(make_pipeline(), "factor_pipeline")
    schedule_function(
        rebalance,
        date_rules.week_start(),
        time_rules.market_open(),
        calendar=calendars.US_EQUITIES,
    )

Function to rebalance the portfolio based on pipeline output

In [ ]:
def rebalance(context, data):
    """Rebalance the portfolio based on pipeline output.

    This function rebalances the portfolio by executing trades
    according to the ranked stocks from the pipeline.

    Parameters
    ----------
    context : object
        An object to store global variables.
    data : object
        An object to fetch data.
    """
    factor_data = context.factor_data
    record(factor_data=factor_data.ranking)

    assets = factor_data.index
    record(prices=data.current(assets, 'price'))

    longs = assets[factor_data.longs]
    shorts = assets[factor_data.shorts]
    divest = set(context.portfolio.positions.keys()) - set(longs.union(shorts))

    exec_trades(data, assets=divest, target_percent=0)
    exec_trades(data, assets=longs, target_percent=1 / N_LONGS)
    exec_trades(data, assets=shorts, target_percent=-1 / N_SHORTS)

Function to execute trades for given assets and target percentages

In [ ]:
def exec_trades(data, assets, target_percent):
    """Execute trades for given assets and target percentages.

    This function loops through each asset and executes trades
    if the asset is tradeable and has no open orders.

    Parameters
    ----------
    data : object
        An object to fetch data.
    assets : iterable
        List of asset identifiers.
    target_percent : float
        Target portfolio weight for each asset.
    """
    for asset in assets:
        if data.can_trade(asset) and not get_open_orders(asset):
            order_target_percent(asset, target_percent)

Define the start and end dates for the backtest simulation

In [ ]:
start = pd.Timestamp('2016')
end = pd.Timestamp('2018')

Run the algorithm with the defined parameters and fetch performance

In [ ]:
perf = run_algorithm(
    start=start,
    end=end,
    initialize=initialize,
    before_trading_start=before_trading_start,
    capital_base=100_000,
    bundle="quandl",
)

Output the final portfolio value after the simulation

In [ ]:
perf.portfolio_value

Construct a DataFrame with symbols as columns and dates as rows

In [ ]:
prices = pd.concat([df.to_frame(d) for d, df in perf.prices.dropna().items()], axis=1).T

Convert column names to strings

In [ ]:
prices.columns = [col.symbol for col in prices.columns]

Normalize Timestamp to midnight, preserving TZ information

In [ ]:
prices.index = prices.index.normalize()

PyQuant News is where finance practitioners level up with Python for quant finance, algorithmic trading, and market data analysis. Looking to get started? Check out the fastest growing, top-selling course to get started with Python for quant finance. For educational purposes. Not investment advise. Use at your own risk.