← Назад к документации
monitor
Исходный код Rust - Trading AI
📄 Rust
📦 Модуль
🔧 Исходный код
use anyhow::{bail, Context, Result};
use chrono::Utc;
use log::{info, warn};
use redis::AsyncCommands;
use serde::Deserialize;
use serde_json;
use serde_json::json;
use std::sync::Arc;
use tokio::time::{sleep, Duration};

use crate::redis::RedisCore;
use crate::types::Tick;

/// =========================
/// Public API
/// =========================

/// SCAN → выбрать пару → запустить PARSE-луп → вернуть выбранный контракт
pub async fn run_monitor(
    redis: Arc<RedisCore>,
    _session: &str,
    gate_base: &str,
    scan_top: usize,
    parse_interval_ms: u64,
    tick_ttl_sec: u64,
    min_volatility_pct: f64,
    max_volatility_pct: f64,
) -> Result<String> {
    info!(
        "🔍 MONITOR: mode=SCAN - ищу сверхволатильные ФЬЮЧЕРСЫ ({}% - {}%)... base={}",
        min_volatility_pct, max_volatility_pct, gate_base
    );

    // 1) получаем все фьючерсы
    info!("MONITOR: mode=SCAN - fetching tickers… base={}", gate_base);
    let all = fetch_all_tickers(gate_base).await?;
    info!("Tickers loaded: {}", all.len());

    // 2) фильтруем только памповые (по модулю изменения за сутки)
    let filtered = filter_pump_pairs(&all, min_volatility_pct, max_volatility_pct);
    info!("SCAN prefilter: {} → {}", all.len(), filtered.len());

    let candidates = if filtered.is_empty() {
        warn!(
            "⚠️  Не найдено памповых пар в диапазоне {}% - {}%",
            min_volatility_pct, max_volatility_pct
        );

        // fallback: ослабленные критерии
        let fallback = filter_pump_pairs(&all, 7.0, 2000.0);
        if fallback.is_empty() {
            bail!("❌ Нет подходящих пар даже при ослабленных критериях");
        }
        info!("🔄 Используем расширенные критерии 7-2000%");
        fallback
    } else {
        filtered
    };

    let selected_pair = pick_best_pair(candidates, scan_top)?;

    // 3) запускаем фоновый PARSE-луп, который будет пихать тики в Redis
    let symbol = selected_pair.clone();
    let gate_base_owned = gate_base.to_string();
    let redis_clone = redis.clone();

    tokio::spawn(async move {
        if let Err(e) =
            parse_loop(redis_clone, symbol.clone(), gate_base_owned, parse_interval_ms, tick_ttl_sec)
                .await
        {
            warn!("PARSE loop error for {}: {e:?}", symbol);
        }
    });

    Ok(selected_pair)
}

/// Выбирает лучшую пару из отфильтрованных
fn pick_best_pair(pairs: Vec<(String, PumpPair)>, scan_top: usize) -> Result<String> {
    if pairs.is_empty() {
        bail!("Нет пар для выбора");
    }

    let mut ranked = pairs;
    ranked.sort_by(|a, b| {
        b.1.score
            .partial_cmp(&a.1.score)
            .unwrap_or(std::cmp::Ordering::Equal)
    });

    // УБРАЛИ ОГРАНИЧЕНИЕ на scan_top - теперь используем ВСЕ найденные пары
    // if ranked.len() > scan_top {
    //     ranked.truncate(scan_top);
    // }

    let (best_sym, best) = ranked.remove(0);

    info!(
        "=== OPTIMAL PAIR ===\n            Selected: {} (score={:.6}, mid={:.6}, spread_bps={:.6}, vol24q={}, chg%={:.2})",
        best_sym,
        best.score,
        best.mid,
        best.spread_bps,
        best.vol24h as i64,
        best.chg_pct
    );

    if best.chg_pct >= 100.0 {
        info!("🔥 ОБНАРУЖЕН СУПЕР-ПАМП: {}% роста за 24ч!", best.chg_pct);
    } else if best.chg_pct >= 50.0 {
        info!("⚡ СИЛЬНЫЙ РОСТ: {}% за 24ч", best.chg_pct);
    }

    info!("SCAN to PARSE switch: pair={}, switched=true", best_sym);
    info!("Пара выбрана: {}", best_sym);

    Ok(best_sym)
}

/// =========================
/// Data models
/// =========================

/// Структура под `/api/v4/futures/usdt/tickers`
#[derive(Clone, Debug, Deserialize)]
struct RawFuturesContract {
    #[serde(default)]
    last: String,
    #[serde(default)]
    lowest_ask: String,
    #[serde(default)]
    highest_bid: String,
    #[serde(default)]
    volume_24h_quote: String,
    #[serde(default)]
    change_percentage: String,
    contract: String,
}

impl RawFuturesContract {
    fn parse_f64(raw: &str) -> f64 {
        raw.trim().parse::<f64>().unwrap_or(0.0)
    }

    fn get_last(&self) -> f64 {
        Self::parse_f64(&self.last)
    }
    fn get_ask(&self) -> f64 {
        Self::parse_f64(&self.lowest_ask)
    }
    fn get_bid(&self) -> f64 {
        Self::parse_f64(&self.highest_bid)
    }
    fn get_volume(&self) -> f64 {
        Self::parse_f64(&self.volume_24h_quote)
    }
    fn get_change_pct(&self) -> f64 {
        Self::parse_f64(&self.change_percentage)
    }
}

#[derive(Clone, Debug)]
struct PumpPair {
    score: f64,
    mid: f64,
    spread_bps: f64,
    vol24h: f64,
    chg_pct: f64,
}

/// Внутренний «богатый» тик для расчётов
#[derive(Clone, Debug)]
struct OneTick {
    symbol: String,
    mid: f64,
    chg_pct: f64,
    ts: i64,
    bid: f64,
    ask: f64,
    bid_qty: f64,
    ask_qty: f64,
}

/// Детали контракта `/api/v4/futures/usdt/contracts/{contract}`
#[derive(Clone, Debug, Deserialize)]
struct ContractDetails {
    #[serde(default)]
    leverage_min: String,
    #[serde(default)]
    leverage_max: String,
    #[serde(default)]
    order_size_min: i64,
    #[serde(default)]
    order_size_max: i64,
    #[serde(default)]
    quanto_multiplier: String,
    #[serde(default)]
    maintenance_rate: String,
    #[serde(default)]
    maker_fee_rate: String,
    #[serde(default)]
    taker_fee_rate: String,
    #[serde(default)]
    funding_rate: String,
    #[serde(default)]
    funding_rate_indicative: String,
    #[serde(default)]
    order_price_round: String,
    #[serde(default)]
    order_price_deviate: String,
}

/// Ордербук `/api/v4/futures/usdt/order_book`
#[derive(Clone, Debug, Deserialize)]
struct RawOrderBook {
    asks: Vec<[String; 2]>,
    bids: Vec<[String; 2]>,
}

/// Конвертация OneTick → types::Tick (то, что жрёт Feed)
fn to_public_tick(t: &OneTick) -> Tick {
    Tick {
        symbol: t.symbol.clone(),
        mid: t.mid,
        chg_pct: t.chg_pct,
        ts: t.ts,
        bid: t.bid,
        ask: t.ask,
        bid_qty: t.bid_qty,
        ask_qty: t.ask_qty,
    }
}

/// =========================
/// HTTP
/// =========================

/// Все тикеры USDT-фьючерсов
async fn fetch_all_tickers(gate_base: &str) -> Result<Vec<RawFuturesContract>> {
    let url = format!(
        "{}/api/v4/futures/usdt/tickers",
        gate_base.trim_end_matches('/')
    );

    let resp = reqwest::get(&url)
        .await
        .context("HTTP запрос к Gate.io API")?
        .error_for_status()
        .context("Ошибка HTTP статуса от Gate.io")?;

    let v = resp
        .json::<Vec<RawFuturesContract>>()
        .await
        .context("Ошибка парсинга JSON от Gate.io")?;

    Ok(v)
}

/// Один тикер по контракту
async fn fetch_one_ticker(gate_base: &str, symbol: &str) -> Result<OneTick> {
    let url = format!(
        "{}/api/v4/futures/usdt/tickers?contract={}",
        gate_base.trim_end_matches('/'),
        symbol
    );

    let resp = reqwest::get(&url)
        .await
        .context("HTTP запрос одного тикера к Gate.io API")?
        .error_for_status()
        .context("Ошибка HTTP статуса при запросе одного тикера")?;

    let mut v = resp
        .json::<Vec<RawFuturesContract>>()
        .await
        .context("Ошибка парсинга JSON одного тикера от Gate.io")?;

    let r = v
        .pop()
        .ok_or_else(|| anyhow::anyhow!("empty ticker for {}", symbol))?;

    let ask = r.get_ask();
    let bid = r.get_bid();
    let last = r.get_last();

    let mid = if ask > 0.0 && bid > 0.0 {
        (ask + bid) / 2.0
    } else {
        last
    };

    let vol24h = r.get_volume();
    let chg_pct = r.get_change_pct();

    let spread_bps = if mid > 0.0 {
        ((ask - bid) / mid) * 10_000.0
    } else {
        0.0
    };

    let base_volume = (vol24h / 86400.0).max(1000.0);
    let spread_impact = spread_bps / 100.0;

    let bid_qty = base_volume * (1.0 + spread_impact * 0.1);
    let ask_qty = base_volume * (1.0 - spread_impact * 0.1);

    Ok(OneTick {
        symbol: r.contract,
        mid,
        chg_pct,
        ts: Utc::now().timestamp(),
        bid,
        ask,
        bid_qty,
        ask_qty,
    })
}

/// Детали контракта
async fn fetch_contract_details(gate_base: &str, symbol: &str) -> Result<ContractDetails> {
    let url = format!(
        "{}/api/v4/futures/usdt/contracts/{}",
        gate_base.trim_end_matches('/'),
        symbol
    );

    let resp = reqwest::get(&url)
        .await
        .context("HTTP запрос деталей контракта к Gate.io API")?
        .error_for_status()
        .context("Ошибка HTTP статуса при запросе деталей контракта")?;

    let v = resp
        .json::<ContractDetails>()
        .await
        .context("Ошибка парсинга JSON деталей контракта от Gate.io")?;

    Ok(v)
}

/// Ордербук для расчёта ликвидности и проскальзывания
async fn fetch_order_book(
    gate_base: &str,
    symbol: &str,
    depth: usize,
) -> Result<RawOrderBook> {
    let url = format!(
        "{}/api/v4/futures/usdt/order_book?contract={}&limit={}",
        gate_base.trim_end_matches('/'),
        symbol,
        depth
    );

    let resp = reqwest::get(&url)
        .await
        .context("HTTP запрос ордербука к Gate.io API")?
        .error_for_status()
        .context("Ошибка HTTP статуса при запросе ордербука")?;

    let v = resp
        .json::<RawOrderBook>()
        .await
        .context("Ошибка парсинга JSON ордербука от Gate.io")?;

    Ok(v)
}

/// =========================
/// Фильтрация памповых пар
/// =========================

fn filter_pump_pairs(
    all: &[RawFuturesContract],
    min_pct: f64,
    max_pct: f64,
) -> Vec<(String, PumpPair)> {
    let filtered_contracts: Vec<&RawFuturesContract> = all
        .iter()
        .filter(|t| !t.contract.is_empty())
        .filter(|t| {
            !t.last.trim().is_empty()
                || (!t.lowest_ask.trim().is_empty() && !t.highest_bid.trim().is_empty())
        })
        .collect();

    info!(
        "📊 Префильтр: {} → {}",
        all.len(),
        filtered_contracts.len()
    );

    let mut pump_pairs = Vec::new();

    for contract in filtered_contracts.iter() {
        let chg_pct = contract.get_change_pct();
        let abs_chg_pct = chg_pct.abs();

        if abs_chg_pct >= min_pct && abs_chg_pct <= max_pct {
            if let Some((symbol, pair)) = create_pump_pair(contract) {
                pump_pairs.push((symbol, pair));
            }
        }
    }

    pump_pairs.sort_by(|a, b| {
        b.1.score
            .partial_cmp(&a.1.score)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    pump_pairs
}

fn create_pump_pair(contract: &RawFuturesContract) -> Option<(String, PumpPair)> {
    let ask = contract.get_ask();
    let bid = contract.get_bid();
    let last = contract.get_last();

    let mid = if ask > 0.0 && bid > 0.0 {
        (ask + bid) / 2.0
    } else {
        last
    };

    if mid <= 0.0 {
        return None;
    }

    // 🎯 ФИЛЬТР ПО ЦЕНЕ: только пары с ценой ≤ 0.01 USDT
    if mid > 0.01 {
        return None;
    }

    let spread_bps = if ask > 0.0 && bid > 0.0 {
        ((ask - bid) / mid) * 10_000.0
    } else {
        0.0
    };

    let vol24h = contract.get_volume();
    let chg_pct = contract.get_change_pct();
    let abs_chg_pct = chg_pct.abs();

    let vol_score = if vol24h > 0.0 { vol24h.ln() } else { 0.0 };

    let score = (abs_chg_pct * 5.0) + vol_score - (spread_bps * 0.5);

    Some((
        contract.contract.clone(),
        PumpPair {
            score,
            mid,
            spread_bps,
            vol24h,
            chg_pct: abs_chg_pct,
        },
    ))
}

/// =========================
/// PARSE loop
/// =========================

async fn parse_loop(
    redis: Arc<RedisCore>,
    symbol: String,
    gate_base: String,
    interval_ms: u64,
    _tick_ttl_sec: u64,
) -> Result<()> {
    info!(
        "PARSE loop start for {} (interval={}ms)",
        symbol, interval_ms
    );

    // Детали контракта — тянем один раз перед лупом
    let contract_details = match fetch_contract_details(&gate_base, &symbol).await {
        Ok(cd) => cd,
        Err(e) => {
            warn!(
                "Не удалось получить детали контракта для {}: {e:?}. Продолжаем только с тикером",
                symbol
            );
            // Заглушка, чтобы не падать
            ContractDetails {
                leverage_min: "10".to_string(),
                leverage_max: "50".to_string(),
                order_size_min: 1,
                order_size_max: 1_000_000,
                quanto_multiplier: "1".to_string(),
                maintenance_rate: "0.01".to_string(),
                maker_fee_rate: "0.0".to_string(),
                taker_fee_rate: "0.0".to_string(),
                funding_rate: "0.0".to_string(),
                funding_rate_indicative: "0.0".to_string(),
                order_price_round: "0.000001".to_string(),
                order_price_deviate: "0.2".to_string(),
            }
        }
    };

    let mut depth_cached: Option<RawOrderBook> = None;

    loop {
        match fetch_one_ticker(&gate_base, &symbol).await {
            Ok(mut one) => {
                // Ордербук и ликвидность
                let order_book = match fetch_order_book(&gate_base, &symbol, 50).await {
                    Ok(ob) => {
                        depth_cached = Some(ob.clone());
                        ob
                    }
                    Err(e) => {
                        warn!(
                            "fetch_order_book({}) error: {e:?}. Используем кеш или заглушки",
                            symbol
                        );
                        depth_cached.clone().unwrap_or(RawOrderBook {
                            asks: Vec::new(),
                            bids: Vec::new(),
                        })
                    }
                };

                // Пересчёт bid/ask/qty на основе стакана, если он есть
                let (bid, ask, bid_depth, ask_depth) =
                    compute_depth_and_best(&order_book, one.mid);
                if bid > 0.0 && ask > 0.0 {
                    one.bid = bid;
                    one.ask = ask;
                    one.mid = (bid + ask) / 2.0;
                }
                if bid_depth > 0.0 {
                    one.bid_qty = bid_depth;
                }
                if ask_depth > 0.0 {
                    one.ask_qty = ask_depth;
                }

                let tick = to_public_tick(&one);
                let key_list = format!("hb:ticks:{}", tick.symbol);
                let key_last = format!("hb:last:{}", tick.symbol);
                let key_meta = format!("hb:meta:{}", tick.symbol);

                let value = serde_json::to_string(&tick)?;

                let mut conn = crate::redis::get_redis_conn(&redis).await?;

                // список тиков (хвост)
                let _: i64 = conn.lpush(&key_list, &value).await.unwrap_or(0);
                let _: () = conn.ltrim(&key_list, 0, 999).await.unwrap_or(());

                // ПОСЛЕДНИЙ тик — то, что читает Feed::get_tick_from_redis (GET hb:last:...)
                let _: () = conn.set(&key_last, &value).await.unwrap_or(());

                // META-информация для LIVE:
                // макс./мин. плечо, мин./макс. контрактов, мультипликатор, маржинка, комиссии
                let max_leverage = contract_details
                    .leverage_max
                    .trim()
                    .parse::<f64>()
                    .unwrap_or(0.0);
                let min_leverage = contract_details
                    .leverage_min
                    .trim()
                    .parse::<f64>()
                    .unwrap_or(0.0);
                let contract_multiplier = contract_details
                    .quanto_multiplier
                    .trim()
                    .parse::<f64>()
                    .unwrap_or(1.0);
                let maintenance_rate = contract_details
                    .maintenance_rate
                    .trim()
                    .parse::<f64>()
                    .unwrap_or(0.0);
                let maker_fee = contract_details
                    .maker_fee_rate
                    .trim()
                    .parse::<f64>()
                    .unwrap_or(0.0);
                let taker_fee = contract_details
                    .taker_fee_rate
                    .trim()
                    .parse::<f64>()
                    .unwrap_or(0.0);
                let funding_rate = contract_details
                    .funding_rate
                    .trim()
                    .parse::<f64>()
                    .unwrap_or(0.0);
                let funding_rate_indicative = contract_details
                    .funding_rate_indicative
                    .trim()
                    .parse::<f64>()
                    .unwrap_or(0.0);

                // Простейшая оценка проскальзывания на 1000$ по стакану
                let (slip_buy_1000, slip_sell_1000) =
                    compute_slippage_1000(&order_book, one.mid, contract_multiplier);

                let meta = json!({
                    "symbol": tick.symbol,
                    "ts": tick.ts,
                    "price": tick.mid,
                    "chg_pct_24h": tick.chg_pct,
                    "bid": tick.bid,
                    "ask": tick.ask,
                    "bid_qty_depth": tick.bid_qty,
                    "ask_qty_depth": tick.ask_qty,

                    "leverage_min": min_leverage,
                    "leverage_max": max_leverage,
                    "min_contracts": contract_details.order_size_min,
                    "max_contracts": contract_details.order_size_max,
                    "contract_multiplier": contract_multiplier,
                    "maintenance_rate": maintenance_rate,
                    "maker_fee_rate": maker_fee,
                    "taker_fee_rate": taker_fee,
                    "funding_rate": funding_rate,
                    "funding_rate_indicative": funding_rate_indicative,
                    "order_price_round": contract_details.order_price_round,
                    "order_price_deviate": contract_details.order_price_deviate,

                    "slippage_buy_1000_pct": slip_buy_1000,
                    "slippage_sell_1000_pct": slip_sell_1000,
                });

                let meta_str = serde_json::to_string(&meta)?;
                let _: () = conn.set(&key_meta, &meta_str).await.unwrap_or(());

                info!(
                    "PARSE tick → {} mid={:.8} chg%={:.2} depth(bid/ask)={:.2}/{:.2} slip(1000)={:.4}/{:.4}%",
                    tick.symbol,
                    tick.mid,
                    tick.chg_pct,
                    tick.bid_qty,
                    tick.ask_qty,
                    slip_buy_1000,
                    slip_sell_1000
                );
            }
            Err(e) => {
                warn!("fetch_one_ticker({symbol}) error: {e:?}");
            }
        }

        sleep(Duration::from_millis(interval_ms)).await;
    }
}

/// =========================
/// Depth & Slippage helpers
/// =========================

fn compute_depth_and_best(ob: &RawOrderBook, mid_hint: f64) -> (f64, f64, f64, f64) {
    if ob.bids.is_empty() || ob.asks.is_empty() {
        return (0.0, 0.0, 0.0, 0.0);
    }

    let parse_level = |lvl: &[String; 2]| -> (f64, f64) {
        let p = lvl[0].trim().parse::<f64>().unwrap_or(0.0);
        let s = lvl[1].trim().parse::<f64>().unwrap_or(0.0);
        (p, s)
    };

    let (best_bid, _) = parse_level(&ob.bids[0]);
    let (best_ask, _) = parse_level(&ob.asks[0]);

    let mid = if best_bid > 0.0 && best_ask > 0.0 {
        (best_bid + best_ask) / 2.0
    } else {
        mid_hint
    };
    if mid <= 0.0 {
        return (best_bid, best_ask, 0.0, 0.0);
    }

    let mut bid_depth = 0.0;
    let mut ask_depth = 0.0;

    let max_dist = 0.005; // 0.5% вокруг mid

    for lvl in ob.bids.iter() {
        let (p, s) = parse_level(lvl);
        if p <= 0.0 || s <= 0.0 {
            continue;
        }
        let dist = (mid - p) / mid;
        if dist >= 0.0 && dist <= max_dist {
            bid_depth += s;
        }
    }

    for lvl in ob.asks.iter() {
        let (p, s) = parse_level(lvl);
        if p <= 0.0 || s <= 0.0 {
            continue;
        }
        let dist = (p - mid) / mid;
        if dist >= 0.0 && dist <= max_dist {
            ask_depth += s;
        }
    }

    (best_bid, best_ask, bid_depth, ask_depth)
}

fn compute_slippage_1000(
    ob: &RawOrderBook,
    mid: f64,
    contract_multiplier: f64,
) -> (f64, f64) {
    if mid <= 0.0 || ob.bids.is_empty() || ob.asks.is_empty() {
        return (0.0, 0.0);
    }

    let parse_level = |lvl: &[String; 2]| -> (f64, f64) {
        let p = lvl[0].trim().parse::<f64>().unwrap_or(0.0);
        let s = lvl[1].trim().parse::<f64>().unwrap_or(0.0);
        (p, s)
    };

    let target_quote = 1000.0_f64; // 1000 USDT

    // BUY: идём по ask'ам, пока не зальём target_quote
    let mut remain = target_quote;
    let mut max_price_buy = mid;
    for lvl in ob.asks.iter() {
        let (p, s) = parse_level(lvl);
        if p <= 0.0 || s <= 0.0 {
            continue;
        }
        let level_quote = p * s * contract_multiplier;
        if level_quote >= remain {
            max_price_buy = p;
            remain = 0.0;
            break;
        } else {
            max_price_buy = p;
            remain -= level_quote;
        }
    }
    let slip_buy_pct = if remain > 0.0 {
        0.0
    } else {
        ((max_price_buy / mid) - 1.0) * 100.0
    };

    // SELL: идём по bid'ам, пока не сольём target_quote
    let mut remain = target_quote;
    let mut min_price_sell = mid;
    for lvl in ob.bids.iter() {
        let (p, s) = parse_level(lvl);
        if p <= 0.0 || s <= 0.0 {
            continue;
        }
        let level_quote = p * s * contract_multiplier;
        if level_quote >= remain {
            min_price_sell = p;
            remain = 0.0;
            break;
        } else {
            min_price_sell = p;
            remain -= level_quote;
        }
    }
    let slip_sell_pct = if remain > 0.0 {
        0.0
    } else {
        ((mid / min_price_sell) - 1.0) * 100.0
    };

    (slip_buy_pct, slip_sell_pct)
}