Adaptive Momentum Fade - стратегия ловли откатов против импульса на Python

Adaptive Momentum Fade - стратегия ловли откатов против импульса на Python

Импульсные движения на рынке криптовалют — не редкость. Цена взлетает или падает за считанные минуты, оставляя трейдера перед выбором: заходить в продолжение или ловить откат. В этой статье мы рассмотрим стратегию Adaptive Momentum Fade, которая помогает ловить обратные движения после сильного импульса, используя Python и библиотеку TA-Lib.

Мы не просто разберем логику, но и представим полностью готовый код стратегии, включая:

  • Индикаторы (ROC, ATR, EMA),

  • Условия входа в позицию (fade short/long),

  • Мини-бэктест,

  • Метрики эффективности (PnL, WinRate, Expectancy).

📌 Суть стратегии: Fade против импульса

Fade (угасание) — это методика входа против краткосрочного импульса, когда есть признаки его истощения. Стратегия работает по следующей логике:

🔍 Шаги:

  1. Ищем аномально сильный импульс, используя индикатор Rate of Change (ROC).

  2. Выявляем признаки истощения:

    • Было мощное движение (z-score ROC > 2.5),

    • Предыдущая свеча — в сторону импульса, текущая — против.

  3. Проверяем тренд на старшем таймфрейме (EMA 200 на 1h):

    • SHORT-сигнал — только в нисходящем тренде.

    • LONG-сигнал — только в восходящем тренде.

  4. Входим в противоположную сторону на следующей свече:

    • Ставим тейк и стоп в зависимости от ATR.

    • Результат сделки рассчитывается как PnL, R-множитель.


🧠 Почему это работает?

  • Импульсные движения часто иссякают, особенно когда на них "прыгают" розничные трейдеры.

  • Откаты — естественная часть любой рыночной динамики.

  • Стопы и тейки по ATR — адаптивны к волатильности и защищают от случайных шумов.

  • Использование EMA-фильтра тренда помогает избежать входов "в стену".


🧑‍💻 Ключевые компоненты кода

ROC + z-score: измеряем силу импульса

roc = ta.momentum.ROCIndicator(df["close"], window=5).roc()
mu = roc.rolling(50).mean()
sigma = roc.rolling(50).std()
zroc = (roc - mu) / sigma

Z-score > 2.5 — это сильное отклонение от нормы.

Тренд-фильтр (EMA 200)

trend_df["ema_trend"] = ta.trend.EMAIndicator(trend_df["close"], window=200).ema_indicator()

Если текущая цена ниже EMA 200 — считаем тренд нисходящим (ищем fade SHORT).

Условия сигнала

short_signal = (
    (zroc > 2.5) &
    (df["close"].shift(1) > df["open"].shift(1)) &
    (df["close"] < df["open"]) &
    (df["close"] < df["ema_trend"])
)

Текущая свеча должна "подтвердить истощение" — быть противоположной предыдущей.

Бэктест: расчёт сделки

entry = df.iloc[i+1]["open"]
stop = entry + ATR
tp = entry - 0.5 * ATR

Выход по тейку или стопу рассчитывается для каждой сделки. Также учитываются комиссии.

📈 Результаты стратегии

После запуска скрипта, вы получите статистику:

SHORT-стратегия ➜ 23 сделок | WinRate 60.9 % | Exp 0.0017 | Σ PNL 0.041 | Sharpe 1.87
  • WinRate — процент прибыльных сделок.

  • Expectancy — средняя прибыль на сделку.

  • Sharpe Ratio — соотношение прибыли к риску.

🛡️ Рекомендации по использованию

  • Используйте реальные данные с Binance через CCXT.

  • Подключите логирование, Telegram-оповещения или GUI-интерфейс.

  • Проверьте на других парах и таймфреймах.

  • Возможна адаптация под backtesting.py, vectorbt или zipline.


🧰 Что ещё можно улучшить?

  • Добавить объём или дельту как фильтр.

  • Использовать LightGBM или CatBoost для предсказания вероятности успеха сделки.

  • Реализовать рейтинговую систему паттернов (какой fade даёт лучший результат).

  • Интеграция с реальным трейдинг-ботом на Binance Futures.

✅ Заключение

"Adaptive Momentum Fade" — простая, но мощная стратегия, которая не требует сложных вычислений или нейросетей. Она хорошо подходит для любителей Python и трейдеров, желающих систематизировать подход к торговле.

Сделайте первый шаг: скачайте код, запустите бэктест, адаптируйте под себя и переходите к реальному рынку — но только после тщательного тестирования!

Полный код скрипта:
"""
Adaptive Momentum Fade v2.0  
(пример учебного кода — не финансовая рекомендация)

Главные отличия от «наброска»:

* Параметры (Config) сгруппированы в dataclass → легко менять/оптимизировать.
* З-скор вместо «среднее ± σ» для ROC-импульса.
* Тренд-фильтр по EMA старшего ТФ.
* Стоп/тейк в долях ATR (+ расчёт R-профиля сделки).
* Мини-бэктест с метриками PnL, WinRate, Expectancy.
* Логирование, обработка rate-limit, возможность асинхронной загрузки.
"""

import asyncio
import logging
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from typing import List

import ccxt.async_support as ccxt  # асинхронная версия
import numpy as np
import pandas as pd
import ta

# ──────────────────────────── ПАРАМЕТРЫ ────────────────────────────── #
@dataclass
class Config:
    symbol: str = "BTC/USDT"
    tf: str = "5m"                # рабочий ТФ
    tf_trend: str = "1h"          # старший ТФ для тренда
    lookback_days: int = 7

    roc_win: int = 5              # окно ROC
    z_threshold: float = 2.5      # «сильный импульс»
    ema_trend_win: int = 200

    atr_win: int = 14
    atr_stop_mult: float = 1.0
    atr_tp_mult: float = 0.5

    fee_pct: float = 0.0004       # спот-комиссия Binance 0.04 %
    max_slippage_pct: float = 0.0005

    exchange_rate_limit_ms: int = 1100  # ~ 900 req/min — запас

CFG = Config()

# ──────────────────────── НАСТРОЙКА ЛОГОВ ─────────────────────────── #
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)-8s | %(message)s",
    handlers=[
        logging.FileHandler("adaptive_fade.log", "w", "utf-8"),
        logging.StreamHandler()
    ],
)
log = logging.getLogger(__name__)
log.info("Старт Adaptive Momentum Fade v2.0  — параметры: %s", asdict(CFG))

# ─────────────────── ФУНКЦИИ ЗАГРУЗКИ OHLCV ───────────────────────── #
async def fetch_ohlcv(exchange, symbol: str, tf: str,
                     since_ms: int, limit: int = 1000) -> List[List]:
    """
    Качаем свечи пачками по 1000 c учётом rateLimit.
    CCXT автоматически дёргает правильные эндпоинты.
    """
    all_ohlcv = []
    while True:
        batch = await exchange.fetch_ohlcv(symbol, tf, since=since_ms, limit=limit)
        if not batch:
            break
        all_ohlcv.extend(batch)
        since_ms = batch[-1][0] + 1  # +1 мс, чтобы не дублировать последнюю
        # бинанс выдаёт макс 1000 свечей
        if len(batch) < limit:
            break
        await asyncio.sleep(CFG.exchange_rate_limit_ms / 1000)
    return all_ohlcv


async def load_data() -> dict[str, pd.DataFrame]:
    """
    Грузим рабочий и трендовый ТФ параллельно.
    Возвращает dict: {'main': df_5m, 'trend': df_1h}
    """
    async with ccxt.binance() as ex:
        since = int((datetime.utcnow() - timedelta(days=CFG.lookback_days)).timestamp() * 1000)

        tasks = [
            fetch_ohlcv(ex, CFG.symbol, CFG.tf, since),
            fetch_ohlcv(ex, CFG.symbol, CFG.tf_trend, since),
        ]
        dfs = await asyncio.gather(*tasks)

    def to_df(raw):
        df = pd.DataFrame(raw, columns=["ts", "open", "high", "low", "close", "vol"])
        df["ts"] = pd.to_datetime(df["ts"], unit="ms")
        return df.set_index("ts")

    return {"main": to_df(dfs[0]), "trend": to_df(dfs[1])}

# ────────────────────── ИНДИКАТОРЫ & СИГНАЛЫ ──────────────────────── #
def enrich_indicators(dfs: dict[str, pd.DataFrame]) -> pd.DataFrame:
    df = dfs["main"].copy()

    # ROC + Z-score
    roc = ta.momentum.ROCIndicator(df["close"], window=CFG.roc_win).roc()
    mu = roc.rolling(50).mean()
    sigma = roc.rolling(50).std(ddof=0)
    df["zroc"] = (roc - mu) / sigma

    # ATR
    df["atr"] = ta.volatility.AverageTrueRange(
        df["high"], df["low"], df["close"], window=CFG.atr_win
    ).average_true_range()

    # Тренд EMA (на старшем ТФ → ресемплим)
    trend_df = dfs["trend"].copy()
    trend_df["ema_trend"] = ta.trend.EMAIndicator(
        trend_df["close"], window=CFG.ema_trend_win
    ).ema_indicator()
    # forward-fill до 5м
    df["ema_trend"] = trend_df["ema_trend"].reindex(df.index, method="ffill")

    # Флаг «нисходящий тренд»
    df["downtrend"] = df["close"] < df["ema_trend"]

    # Candlestick exhaustion: предыдущая зелёная, текущая красная
    prev_green = df["close"].shift(1) > df["open"].shift(1)
    curr_red = df["close"] < df["open"]

    # SHORT-сигнал против восходящего импульса в нисходящем тренде
    df["short_signal"] = (
        (df["zroc"] > CFG.z_threshold) &
        prev_green & curr_red &
        df["downtrend"]
    )

    # LONG-сигнал аналогично (по желанию)
    prev_red = df["close"].shift(1) < df["open"].shift(1)
    curr_green = df["close"] > df["open"]
    df["long_signal"] = (
        (df["zroc"] < -CFG.z_threshold) &
        prev_red & curr_green &
        (~df["downtrend"])
    )
    return df


# ────────────────────────── БЭКТЕСТ - CORE ───────────────────────── #
def run_backtest(df: pd.DataFrame, side: str = "short") -> pd.DataFrame:
    """
    Эмуляция сделок «по рынку» следующей свечой.
    side: 'short' или 'long'
    Возвращает DataFrame с результатами каждой сделки.
    """
    trades = []
    signal_col = f"{side}_signal"
    sign = -1 if side == "short" else 1  # направление PnL

    idxs = np.where(df[signal_col].values)[0]
    for idx in idxs:
        entry_idx = idx + 1  # входим открытием следующей свечи
        if entry_idx >= len(df):
            break
        entry_row = df.iloc[entry_idx]
        entry_price = entry_row["open"]
        atr = entry_row["atr"]

        sl = entry_price + sign * CFG.atr_stop_mult * atr  # для short sign=-1 → +
        tp = entry_price - sign * CFG.atr_tp_mult * atr

        exit_price = None
        exit_idx = None
        outcome = None
        for j in range(entry_idx, len(df)):
            hi, lo = df.iloc[j][["high", "low"]]
            hit_tp = lo <= tp if side == "short" else hi >= tp
            hit_sl = hi >= sl if side == "short" else lo <= sl
            if hit_tp:
                exit_price = tp
                exit_idx = j
                outcome = "TP"
                break
            if hit_sl:
                exit_price = sl
                exit_idx = j
                outcome = "SL"
                break

        if exit_price is None:
            continue  # сделка «в рынке» — в этом демо пропустим

        pnl = (exit_price - entry_price) * sign - entry_price * CFG.fee_pct * 2
        r_multiple = pnl / (CFG.atr_stop_mult * atr)

        trades.append({
            "entry_time": df.index[entry_idx],
            "exit_time": df.index[exit_idx],
            "side": side,
            "entry": entry_price,
            "exit": exit_price,
            "outcome": outcome,
            "pnl": pnl,
            "r": r_multiple,
        })

    return pd.DataFrame(trades)


def summary_stats(trades: pd.DataFrame) -> dict:
    if trades.empty:
        return {}
    wins = trades[trades["pnl"] > 0]
    stats = {
        "trades": len(trades),
        "win_rate": len(wins) / len(trades),
        "avg_r": trades["r"].mean(),
        "expectancy": trades["pnl"].mean(),
        "cum_pnl": trades["pnl"].sum(),
        "sharpe": trades["pnl"].mean() / trades["pnl"].std(ddof=0) * np.sqrt(288)  # ~ кол-во 5m баров в сутки
    }
    return stats


# ───────────────────────────── MAIN ──────────────────────────────── #
async def main():
    dfs = await load_data()
    df = enrich_indicators(dfs)

    log.info("Всего свечей: %d", len(df))
    shorts = run_backtest(df, "short")
    longs = run_backtest(df, "long")

    for side, trades in [("SHORT", shorts), ("LONG", longs)]:
        stats = summary_stats(trades)
        if not stats:
            log.warning("%s — нет сделок", side)
            continue
        log.info("%s-стратегия ➜ %d сделок | WinRate %.1f %% | Exp %.4f | Σ PNL %.3f | Sharpe %.2f",
                 side, stats["trades"], stats["win_rate"] * 100,
                 stats["expectancy"], stats["cum_pnl"], stats["sharpe"])
        # при желании — trades.to_csv(...)

if __name__ == "__main__":
    asyncio.run(main())
Комментарии

Пока нет комментариев. Будьте первым!

Оставить комментарий

Чтобы оставить комментарий, пожалуйста, войдите или зарегистрируйтесь.