PulseScanner — как я создал сканер аномальных активов на Python

PulseScanner — как я создал сканер аномальных активов на Python

На крипторынке всё решает скорость реакции. Кто первым увидел памп — тот и заработал. Но как отследить 100+ токенов вручную? Именно поэтому я создал PulseScanner — умный скрипт на Python, который сканирует рынок каждые 5 минут и показывает ТОП-10 аномальных активов.

В этой статье покажу, как он работает, что внутри и как ты можешь использовать его для собственной торговли.


💡 Что делает PulseScanner?

Скрипт отслеживает активность топ-100 USDT-пар на Binance и выводит список «горячих» активов на основе трёх метрик:

  • Δ (дельта) — процентное изменение цены за 24 часа

  • VOL — средний объём торгов за сутки

  • σ (сигма) — волатильность (стандартное отклонение цен)

Если актив резко вырос или упал, имеет высокую волатильность или взрывной объём, скрипт его отметит и выведет в консоль с понятными маркерами:

Маркер Значение
📈 / 📉 Рост или падение цены
🔥 Сильное движение или волатильность (аномалия)

🧠 Почему это удобно?

PulseScanner помогает:

  • 🔍 Мгновенно находить анормальные движения

  • 🧪 Анализировать рынок без ручного пролистывания графиков

  • ⏱ Экономить время и получать сигналы быстрее остальных

  • 💸 Работать на пампах, новостях и повышенной волатильности

  • 👨‍💻 Использовать как основу для Telegram-бота, Dashboard или AI-анализа


🧩 Объяснение кода по шагам

Теперь разберём, как устроен скрипт внутри.

📦 Импорты и настройки

Подключаем библиотеки:

import asyncio, logging, statistics
from datetime import datetime, timezone
import ccxt.async_support as ccxt

Настраиваем параметры:

SLEEP_SECONDS = 300  # проверка каждые 5 мин
LIMIT_SYMBOLS = 100  # анализируем топ-100 пар
CHANGE_THRESHOLD = 5  # минимум ±5% для Δ
VOLATILITY_THRESHOLD = 3  # минимум 3% σ

🔌 Подключение к Binance и выбор пар

async def create_exchange():
    exchange = ccxt.binance({"enableRateLimit": True})
    await exchange.load_markets()
    return exchange
def select_symbols(exchange, limit=100):
    symbols = [s for s in exchange.markets if "/USDT" in s and not s.endswith(("UP/USDT", "DOWN/USDT"))]
    symbols.sort(key=lambda s: exchange.markets[s].get("quoteVolume", 0), reverse=True)
    return symbols[:limit]

📈 Получение и расчёт метрик

async def fetch_metrics(exchange, symbol):
    ohlcv = await exchange.fetch_ohlcv(symbol, timeframe="1h", limit=24)
    closes = [c[4] for c in ohlcv]
    volumes = [c[5] for c in ohlcv]

    pct_change = (closes[-1] - closes[0]) / closes[0] * 100
    avg_volume = sum(volumes) / 24
    volatility = statistics.stdev(closes) / statistics.mean(closes) * 100
    score = abs(pct_change) * (avg_volume / 1_000_000) * (volatility + 1)

    return symbol, pct_change, avg_volume, volatility, score

Здесь считается:

  • Δ — изменение цены

  • VOL — средний объём

  • σ — волатильность

  • score — обобщённый показатель "аномальности"


🎨 Форматирование строки

def format_line(rank, sym, chg, volu, vola):
    trend = "📈" if chg > 0 else "📉"
    heat = "🔥" if abs(chg) >= 10 or vola >= 5 else ""
    return f"{rank:02d}. {sym:<12} | {trend} Δ {chg:+7.2f}% | VOL {volu:>10,.0f} | σ {vola:4.2f}% {heat}"

🔁 Основной цикл сканирования

async def scan_loop():
    exchange = await create_exchange()
    symbols = select_symbols(exchange)

    while True:
        results = await asyncio.gather(*[fetch_metrics(exchange, s) for s in symbols])
        metrics = [r for r in results if r]
        anomalies = [r for r in metrics if abs(r[1]) >= CHANGE_THRESHOLD or r[2] >= 1_000_000 or r[3] >= VOLATILITY_THRESHOLD]
        anomalies.sort(key=lambda x: x[-1], reverse=True)

        print(f"\n🔍 Аномальные активы ({datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')})")
        for i, (sym, chg, vol, volat, _) in enumerate(anomalies[:10], 1):
            print(format_line(i, sym, chg, vol, volat))
        print("-" * 60)

        await asyncio.sleep(SLEEP_SECONDS)

🚀 Запуск

if __name__ == "__main__":
    try:
        asyncio.run(scan_loop())
    except KeyboardInterrupt:
        print("\nЗавершено пользователем")

🔚 Вывод

Скрипт PulseScanner — это готовый инструмент для дейтрейдера или криптоаналитика, который экономит часы ручного анализа. Он показывает, где сейчас идёт движение, а не просто где был рост. И всё это — на Python, открыто, адаптируемо и бесплатно.

Можешь добавить Telegram-бота, экспорт в CSV или графики — основа уже готова. Это идеальный старт для своего торгового инструмента.

Полный код:
"""
PulseScanner — «Пульс Рынка»
Отслеживает топ-100 USDT-пар Binance и выводит аномалии:
 • Δ  — изменение цены за 24 ч, %
 • VOL — средний объём за 24 ч, USDT
 • σ  — внутридневная волатильность (стандартное отклонение), %
Маркеры: 📈/📉 — направление, 🔥 — высокая аномальность.
"""

import asyncio
import logging
import statistics
from datetime import datetime, timezone

import ccxt.async_support as ccxt

# ─────────────────────────── НАСТРОЙКИ ────────────────────────────
SLEEP_SECONDS = 300        # интервал обновления, сек
LIMIT_SYMBOLS = 100        # топ-N пар по объёму
TOP_N_PRINT = 10           # сколько аномалий выводить
CHANGE_THRESHOLD = 5       # минимальный Δ для попадания в выборку, %
VOLUME_THRESHOLD = 1_000_000     # минимальный средний объём, USDT
VOLATILITY_THRESHOLD = 3   # минимальная σ для попадания в выборку, %
# ──────────────────────────────────────────────────────────────────

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)-8s | %(message)s",
    datefmt="%H:%M:%S",
)


async def create_exchange():
    ex = ccxt.binance({"enableRateLimit": True, "timeout": 20_000})
    await ex.load_markets()
    return ex


def select_symbols(exchange, limit=LIMIT_SYMBOLS):
    markets = exchange.markets
    symbols = [
        s for s in markets
        if s.endswith("/USDT")
        and not s.endswith(("UP/USDT", "DOWN/USDT"))
        and markets[s]["active"]
    ]
    symbols.sort(
        key=lambda s: markets[s].get("quoteVolume", 0),
        reverse=True,
    )
    return symbols[:limit]


async def fetch_metrics(exchange, symbol):
    """Возвращает (symbol, Δ, avg_vol, σ, anomaly_score) или None."""
    try:
        ohlcv = await exchange.fetch_ohlcv(symbol, timeframe="1h", limit=24)
        if len(ohlcv) < 24:
            return None

        closes = [c[4] for c in ohlcv]
        volumes = [c[5] for c in ohlcv]

        pct_change = (closes[-1] - closes[0]) / closes[0] * 100
        avg_volume = sum(volumes) / 24
        volatility = statistics.stdev(closes) / statistics.mean(closes) * 100

        score = abs(pct_change) * (avg_volume / 1_000_000) * (volatility + 1)
        return symbol, pct_change, avg_volume, volatility, score

    except Exception as exc:          # noqa: BLE001
        logging.debug("❗ %s — %s", symbol, exc)
        return None


def format_line(rank, sym, chg, volu, vola):
    trend = "📈" if chg > 0 else "📉" if chg < 0 else "➡️"
    heat = "🔥" if abs(chg) >= 10 or vola >= 5 else ""
    return (
        f"{rank:02d}. {sym:<12} | {trend} Δ {chg:+7.2f}% "
        f"| VOL {volu:>10,.0f} | σ {vola:4.2f}% {heat}"
    )


async def scan_loop():
    exchange = await create_exchange()
    symbols = select_symbols(exchange)
    logging.info("Отслеживаем %d пар", len(symbols))

    while True:
        tasks = [fetch_metrics(exchange, s) for s in symbols]
        raw = await asyncio.gather(*tasks)
        metrics = [r for r in raw if r]

        anomalies = [
            r for r in metrics
            if (
                abs(r[1]) >= CHANGE_THRESHOLD
                or r[2] >= VOLUME_THRESHOLD
                or r[3] >= VOLATILITY_THRESHOLD
            )
        ]
        anomalies.sort(key=lambda x: x[-1], reverse=True)

        timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
        print(f"\n🔍 Аномальные активы ({timestamp})")
        for i, (sym, chg, vol, volat, _score) in enumerate(anomalies[:TOP_N_PRINT], 1):
            print(format_line(i, sym, chg, vol, volat))
        print("-" * 60)

        await asyncio.sleep(SLEEP_SECONDS)


if __name__ == "__main__":
    try:
        asyncio.run(scan_loop())
    except KeyboardInterrupt:
        print("\nЗавершено пользователем")

 

 

Комментарии

Пока нет комментариев. Будьте первым!

Оставить комментарий

Чтобы оставить комментарий, пожалуйста, войдите или зарегистрируйтесь.