use anyhow::{bail, Context, Result};
use chrono::Utc;
use log::{info, warn};
use redis::AsyncCommands;
use serde::Deserialize;
use serde_json;
use serde_json::json;
use std::sync::Arc;
use tokio::time::{sleep, Duration};
use crate::redis::RedisCore;
use crate::types::Tick;
/// =========================
/// Public API
/// =========================
/// SCAN → выбрать пару → запустить PARSE-луп → вернуть выбранный контракт
pub async fn run_monitor(
redis: Arc<RedisCore>,
_session: &str,
gate_base: &str,
scan_top: usize,
parse_interval_ms: u64,
tick_ttl_sec: u64,
min_volatility_pct: f64,
max_volatility_pct: f64,
) -> Result<String> {
info!(
"🔍 MONITOR: mode=SCAN - ищу сверхволатильные ФЬЮЧЕРСЫ ({}% - {}%)... base={}",
min_volatility_pct, max_volatility_pct, gate_base
);
// 1) получаем все фьючерсы
info!("MONITOR: mode=SCAN - fetching tickers… base={}", gate_base);
let all = fetch_all_tickers(gate_base).await?;
info!("Tickers loaded: {}", all.len());
// 2) фильтруем только памповые (по модулю изменения за сутки)
let filtered = filter_pump_pairs(&all, min_volatility_pct, max_volatility_pct);
info!("SCAN prefilter: {} → {}", all.len(), filtered.len());
let candidates = if filtered.is_empty() {
warn!(
"⚠️ Не найдено памповых пар в диапазоне {}% - {}%",
min_volatility_pct, max_volatility_pct
);
// fallback: ослабленные критерии
let fallback = filter_pump_pairs(&all, 7.0, 2000.0);
if fallback.is_empty() {
bail!("❌ Нет подходящих пар даже при ослабленных критериях");
}
info!("🔄 Используем расширенные критерии 7-2000%");
fallback
} else {
filtered
};
let selected_pair = pick_best_pair(candidates, scan_top)?;
// 3) запускаем фоновый PARSE-луп, который будет пихать тики в Redis
let symbol = selected_pair.clone();
let gate_base_owned = gate_base.to_string();
let redis_clone = redis.clone();
tokio::spawn(async move {
if let Err(e) =
parse_loop(redis_clone, symbol.clone(), gate_base_owned, parse_interval_ms, tick_ttl_sec)
.await
{
warn!("PARSE loop error for {}: {e:?}", symbol);
}
});
Ok(selected_pair)
}
/// Выбирает лучшую пару из отфильтрованных
fn pick_best_pair(pairs: Vec<(String, PumpPair)>, scan_top: usize) -> Result<String> {
if pairs.is_empty() {
bail!("Нет пар для выбора");
}
let mut ranked = pairs;
ranked.sort_by(|a, b| {
b.1.score
.partial_cmp(&a.1.score)
.unwrap_or(std::cmp::Ordering::Equal)
});
// УБРАЛИ ОГРАНИЧЕНИЕ на scan_top - теперь используем ВСЕ найденные пары
// if ranked.len() > scan_top {
// ranked.truncate(scan_top);
// }
let (best_sym, best) = ranked.remove(0);
info!(
"=== OPTIMAL PAIR ===\n Selected: {} (score={:.6}, mid={:.6}, spread_bps={:.6}, vol24q={}, chg%={:.2})",
best_sym,
best.score,
best.mid,
best.spread_bps,
best.vol24h as i64,
best.chg_pct
);
if best.chg_pct >= 100.0 {
info!("🔥 ОБНАРУЖЕН СУПЕР-ПАМП: {}% роста за 24ч!", best.chg_pct);
} else if best.chg_pct >= 50.0 {
info!("⚡ СИЛЬНЫЙ РОСТ: {}% за 24ч", best.chg_pct);
}
info!("SCAN to PARSE switch: pair={}, switched=true", best_sym);
info!("Пара выбрана: {}", best_sym);
Ok(best_sym)
}
/// =========================
/// Data models
/// =========================
/// Структура под `/api/v4/futures/usdt/tickers`
#[derive(Clone, Debug, Deserialize)]
struct RawFuturesContract {
#[serde(default)]
last: String,
#[serde(default)]
lowest_ask: String,
#[serde(default)]
highest_bid: String,
#[serde(default)]
volume_24h_quote: String,
#[serde(default)]
change_percentage: String,
contract: String,
}
impl RawFuturesContract {
fn parse_f64(raw: &str) -> f64 {
raw.trim().parse::<f64>().unwrap_or(0.0)
}
fn get_last(&self) -> f64 {
Self::parse_f64(&self.last)
}
fn get_ask(&self) -> f64 {
Self::parse_f64(&self.lowest_ask)
}
fn get_bid(&self) -> f64 {
Self::parse_f64(&self.highest_bid)
}
fn get_volume(&self) -> f64 {
Self::parse_f64(&self.volume_24h_quote)
}
fn get_change_pct(&self) -> f64 {
Self::parse_f64(&self.change_percentage)
}
}
#[derive(Clone, Debug)]
struct PumpPair {
score: f64,
mid: f64,
spread_bps: f64,
vol24h: f64,
chg_pct: f64,
}
/// Внутренний «богатый» тик для расчётов
#[derive(Clone, Debug)]
struct OneTick {
symbol: String,
mid: f64,
chg_pct: f64,
ts: i64,
bid: f64,
ask: f64,
bid_qty: f64,
ask_qty: f64,
}
/// Детали контракта `/api/v4/futures/usdt/contracts/{contract}`
#[derive(Clone, Debug, Deserialize)]
struct ContractDetails {
#[serde(default)]
leverage_min: String,
#[serde(default)]
leverage_max: String,
#[serde(default)]
order_size_min: i64,
#[serde(default)]
order_size_max: i64,
#[serde(default)]
quanto_multiplier: String,
#[serde(default)]
maintenance_rate: String,
#[serde(default)]
maker_fee_rate: String,
#[serde(default)]
taker_fee_rate: String,
#[serde(default)]
funding_rate: String,
#[serde(default)]
funding_rate_indicative: String,
#[serde(default)]
order_price_round: String,
#[serde(default)]
order_price_deviate: String,
}
/// Ордербук `/api/v4/futures/usdt/order_book`
#[derive(Clone, Debug, Deserialize)]
struct RawOrderBook {
asks: Vec<[String; 2]>,
bids: Vec<[String; 2]>,
}
/// Конвертация OneTick → types::Tick (то, что жрёт Feed)
fn to_public_tick(t: &OneTick) -> Tick {
Tick {
symbol: t.symbol.clone(),
mid: t.mid,
chg_pct: t.chg_pct,
ts: t.ts,
bid: t.bid,
ask: t.ask,
bid_qty: t.bid_qty,
ask_qty: t.ask_qty,
}
}
/// =========================
/// HTTP
/// =========================
/// Все тикеры USDT-фьючерсов
async fn fetch_all_tickers(gate_base: &str) -> Result<Vec<RawFuturesContract>> {
let url = format!(
"{}/api/v4/futures/usdt/tickers",
gate_base.trim_end_matches('/')
);
let resp = reqwest::get(&url)
.await
.context("HTTP запрос к Gate.io API")?
.error_for_status()
.context("Ошибка HTTP статуса от Gate.io")?;
let v = resp
.json::<Vec<RawFuturesContract>>()
.await
.context("Ошибка парсинга JSON от Gate.io")?;
Ok(v)
}
/// Один тикер по контракту
async fn fetch_one_ticker(gate_base: &str, symbol: &str) -> Result<OneTick> {
let url = format!(
"{}/api/v4/futures/usdt/tickers?contract={}",
gate_base.trim_end_matches('/'),
symbol
);
let resp = reqwest::get(&url)
.await
.context("HTTP запрос одного тикера к Gate.io API")?
.error_for_status()
.context("Ошибка HTTP статуса при запросе одного тикера")?;
let mut v = resp
.json::<Vec<RawFuturesContract>>()
.await
.context("Ошибка парсинга JSON одного тикера от Gate.io")?;
let r = v
.pop()
.ok_or_else(|| anyhow::anyhow!("empty ticker for {}", symbol))?;
let ask = r.get_ask();
let bid = r.get_bid();
let last = r.get_last();
let mid = if ask > 0.0 && bid > 0.0 {
(ask + bid) / 2.0
} else {
last
};
let vol24h = r.get_volume();
let chg_pct = r.get_change_pct();
let spread_bps = if mid > 0.0 {
((ask - bid) / mid) * 10_000.0
} else {
0.0
};
let base_volume = (vol24h / 86400.0).max(1000.0);
let spread_impact = spread_bps / 100.0;
let bid_qty = base_volume * (1.0 + spread_impact * 0.1);
let ask_qty = base_volume * (1.0 - spread_impact * 0.1);
Ok(OneTick {
symbol: r.contract,
mid,
chg_pct,
ts: Utc::now().timestamp(),
bid,
ask,
bid_qty,
ask_qty,
})
}
/// Детали контракта
async fn fetch_contract_details(gate_base: &str, symbol: &str) -> Result<ContractDetails> {
let url = format!(
"{}/api/v4/futures/usdt/contracts/{}",
gate_base.trim_end_matches('/'),
symbol
);
let resp = reqwest::get(&url)
.await
.context("HTTP запрос деталей контракта к Gate.io API")?
.error_for_status()
.context("Ошибка HTTP статуса при запросе деталей контракта")?;
let v = resp
.json::<ContractDetails>()
.await
.context("Ошибка парсинга JSON деталей контракта от Gate.io")?;
Ok(v)
}
/// Ордербук для расчёта ликвидности и проскальзывания
async fn fetch_order_book(
gate_base: &str,
symbol: &str,
depth: usize,
) -> Result<RawOrderBook> {
let url = format!(
"{}/api/v4/futures/usdt/order_book?contract={}&limit={}",
gate_base.trim_end_matches('/'),
symbol,
depth
);
let resp = reqwest::get(&url)
.await
.context("HTTP запрос ордербука к Gate.io API")?
.error_for_status()
.context("Ошибка HTTP статуса при запросе ордербука")?;
let v = resp
.json::<RawOrderBook>()
.await
.context("Ошибка парсинга JSON ордербука от Gate.io")?;
Ok(v)
}
/// =========================
/// Фильтрация памповых пар
/// =========================
fn filter_pump_pairs(
all: &[RawFuturesContract],
min_pct: f64,
max_pct: f64,
) -> Vec<(String, PumpPair)> {
let filtered_contracts: Vec<&RawFuturesContract> = all
.iter()
.filter(|t| !t.contract.is_empty())
.filter(|t| {
!t.last.trim().is_empty()
|| (!t.lowest_ask.trim().is_empty() && !t.highest_bid.trim().is_empty())
})
.collect();
info!(
"📊 Префильтр: {} → {}",
all.len(),
filtered_contracts.len()
);
let mut pump_pairs = Vec::new();
for contract in filtered_contracts.iter() {
let chg_pct = contract.get_change_pct();
let abs_chg_pct = chg_pct.abs();
if abs_chg_pct >= min_pct && abs_chg_pct <= max_pct {
if let Some((symbol, pair)) = create_pump_pair(contract) {
pump_pairs.push((symbol, pair));
}
}
}
pump_pairs.sort_by(|a, b| {
b.1.score
.partial_cmp(&a.1.score)
.unwrap_or(std::cmp::Ordering::Equal)
});
pump_pairs
}
fn create_pump_pair(contract: &RawFuturesContract) -> Option<(String, PumpPair)> {
let ask = contract.get_ask();
let bid = contract.get_bid();
let last = contract.get_last();
let mid = if ask > 0.0 && bid > 0.0 {
(ask + bid) / 2.0
} else {
last
};
if mid <= 0.0 {
return None;
}
// 🎯 ФИЛЬТР ПО ЦЕНЕ: только пары с ценой ≤ 0.01 USDT
if mid > 0.01 {
return None;
}
let spread_bps = if ask > 0.0 && bid > 0.0 {
((ask - bid) / mid) * 10_000.0
} else {
0.0
};
let vol24h = contract.get_volume();
let chg_pct = contract.get_change_pct();
let abs_chg_pct = chg_pct.abs();
let vol_score = if vol24h > 0.0 { vol24h.ln() } else { 0.0 };
let score = (abs_chg_pct * 5.0) + vol_score - (spread_bps * 0.5);
Some((
contract.contract.clone(),
PumpPair {
score,
mid,
spread_bps,
vol24h,
chg_pct: abs_chg_pct,
},
))
}
/// =========================
/// PARSE loop
/// =========================
async fn parse_loop(
redis: Arc<RedisCore>,
symbol: String,
gate_base: String,
interval_ms: u64,
_tick_ttl_sec: u64,
) -> Result<()> {
info!(
"PARSE loop start for {} (interval={}ms)",
symbol, interval_ms
);
// Детали контракта — тянем один раз перед лупом
let contract_details = match fetch_contract_details(&gate_base, &symbol).await {
Ok(cd) => cd,
Err(e) => {
warn!(
"Не удалось получить детали контракта для {}: {e:?}. Продолжаем только с тикером",
symbol
);
// Заглушка, чтобы не падать
ContractDetails {
leverage_min: "10".to_string(),
leverage_max: "50".to_string(),
order_size_min: 1,
order_size_max: 1_000_000,
quanto_multiplier: "1".to_string(),
maintenance_rate: "0.01".to_string(),
maker_fee_rate: "0.0".to_string(),
taker_fee_rate: "0.0".to_string(),
funding_rate: "0.0".to_string(),
funding_rate_indicative: "0.0".to_string(),
order_price_round: "0.000001".to_string(),
order_price_deviate: "0.2".to_string(),
}
}
};
let mut depth_cached: Option<RawOrderBook> = None;
loop {
match fetch_one_ticker(&gate_base, &symbol).await {
Ok(mut one) => {
// Ордербук и ликвидность
let order_book = match fetch_order_book(&gate_base, &symbol, 50).await {
Ok(ob) => {
depth_cached = Some(ob.clone());
ob
}
Err(e) => {
warn!(
"fetch_order_book({}) error: {e:?}. Используем кеш или заглушки",
symbol
);
depth_cached.clone().unwrap_or(RawOrderBook {
asks: Vec::new(),
bids: Vec::new(),
})
}
};
// Пересчёт bid/ask/qty на основе стакана, если он есть
let (bid, ask, bid_depth, ask_depth) =
compute_depth_and_best(&order_book, one.mid);
if bid > 0.0 && ask > 0.0 {
one.bid = bid;
one.ask = ask;
one.mid = (bid + ask) / 2.0;
}
if bid_depth > 0.0 {
one.bid_qty = bid_depth;
}
if ask_depth > 0.0 {
one.ask_qty = ask_depth;
}
let tick = to_public_tick(&one);
let key_list = format!("hb:ticks:{}", tick.symbol);
let key_last = format!("hb:last:{}", tick.symbol);
let key_meta = format!("hb:meta:{}", tick.symbol);
let value = serde_json::to_string(&tick)?;
let mut conn = crate::redis::get_redis_conn(&redis).await?;
// список тиков (хвост)
let _: i64 = conn.lpush(&key_list, &value).await.unwrap_or(0);
let _: () = conn.ltrim(&key_list, 0, 999).await.unwrap_or(());
// ПОСЛЕДНИЙ тик — то, что читает Feed::get_tick_from_redis (GET hb:last:...)
let _: () = conn.set(&key_last, &value).await.unwrap_or(());
// META-информация для LIVE:
// макс./мин. плечо, мин./макс. контрактов, мультипликатор, маржинка, комиссии
let max_leverage = contract_details
.leverage_max
.trim()
.parse::<f64>()
.unwrap_or(0.0);
let min_leverage = contract_details
.leverage_min
.trim()
.parse::<f64>()
.unwrap_or(0.0);
let contract_multiplier = contract_details
.quanto_multiplier
.trim()
.parse::<f64>()
.unwrap_or(1.0);
let maintenance_rate = contract_details
.maintenance_rate
.trim()
.parse::<f64>()
.unwrap_or(0.0);
let maker_fee = contract_details
.maker_fee_rate
.trim()
.parse::<f64>()
.unwrap_or(0.0);
let taker_fee = contract_details
.taker_fee_rate
.trim()
.parse::<f64>()
.unwrap_or(0.0);
let funding_rate = contract_details
.funding_rate
.trim()
.parse::<f64>()
.unwrap_or(0.0);
let funding_rate_indicative = contract_details
.funding_rate_indicative
.trim()
.parse::<f64>()
.unwrap_or(0.0);
// Простейшая оценка проскальзывания на 1000$ по стакану
let (slip_buy_1000, slip_sell_1000) =
compute_slippage_1000(&order_book, one.mid, contract_multiplier);
let meta = json!({
"symbol": tick.symbol,
"ts": tick.ts,
"price": tick.mid,
"chg_pct_24h": tick.chg_pct,
"bid": tick.bid,
"ask": tick.ask,
"bid_qty_depth": tick.bid_qty,
"ask_qty_depth": tick.ask_qty,
"leverage_min": min_leverage,
"leverage_max": max_leverage,
"min_contracts": contract_details.order_size_min,
"max_contracts": contract_details.order_size_max,
"contract_multiplier": contract_multiplier,
"maintenance_rate": maintenance_rate,
"maker_fee_rate": maker_fee,
"taker_fee_rate": taker_fee,
"funding_rate": funding_rate,
"funding_rate_indicative": funding_rate_indicative,
"order_price_round": contract_details.order_price_round,
"order_price_deviate": contract_details.order_price_deviate,
"slippage_buy_1000_pct": slip_buy_1000,
"slippage_sell_1000_pct": slip_sell_1000,
});
let meta_str = serde_json::to_string(&meta)?;
let _: () = conn.set(&key_meta, &meta_str).await.unwrap_or(());
info!(
"PARSE tick → {} mid={:.8} chg%={:.2} depth(bid/ask)={:.2}/{:.2} slip(1000)={:.4}/{:.4}%",
tick.symbol,
tick.mid,
tick.chg_pct,
tick.bid_qty,
tick.ask_qty,
slip_buy_1000,
slip_sell_1000
);
}
Err(e) => {
warn!("fetch_one_ticker({symbol}) error: {e:?}");
}
}
sleep(Duration::from_millis(interval_ms)).await;
}
}
/// =========================
/// Depth & Slippage helpers
/// =========================
fn compute_depth_and_best(ob: &RawOrderBook, mid_hint: f64) -> (f64, f64, f64, f64) {
if ob.bids.is_empty() || ob.asks.is_empty() {
return (0.0, 0.0, 0.0, 0.0);
}
let parse_level = |lvl: &[String; 2]| -> (f64, f64) {
let p = lvl[0].trim().parse::<f64>().unwrap_or(0.0);
let s = lvl[1].trim().parse::<f64>().unwrap_or(0.0);
(p, s)
};
let (best_bid, _) = parse_level(&ob.bids[0]);
let (best_ask, _) = parse_level(&ob.asks[0]);
let mid = if best_bid > 0.0 && best_ask > 0.0 {
(best_bid + best_ask) / 2.0
} else {
mid_hint
};
if mid <= 0.0 {
return (best_bid, best_ask, 0.0, 0.0);
}
let mut bid_depth = 0.0;
let mut ask_depth = 0.0;
let max_dist = 0.005; // 0.5% вокруг mid
for lvl in ob.bids.iter() {
let (p, s) = parse_level(lvl);
if p <= 0.0 || s <= 0.0 {
continue;
}
let dist = (mid - p) / mid;
if dist >= 0.0 && dist <= max_dist {
bid_depth += s;
}
}
for lvl in ob.asks.iter() {
let (p, s) = parse_level(lvl);
if p <= 0.0 || s <= 0.0 {
continue;
}
let dist = (p - mid) / mid;
if dist >= 0.0 && dist <= max_dist {
ask_depth += s;
}
}
(best_bid, best_ask, bid_depth, ask_depth)
}
fn compute_slippage_1000(
ob: &RawOrderBook,
mid: f64,
contract_multiplier: f64,
) -> (f64, f64) {
if mid <= 0.0 || ob.bids.is_empty() || ob.asks.is_empty() {
return (0.0, 0.0);
}
let parse_level = |lvl: &[String; 2]| -> (f64, f64) {
let p = lvl[0].trim().parse::<f64>().unwrap_or(0.0);
let s = lvl[1].trim().parse::<f64>().unwrap_or(0.0);
(p, s)
};
let target_quote = 1000.0_f64; // 1000 USDT
// BUY: идём по ask'ам, пока не зальём target_quote
let mut remain = target_quote;
let mut max_price_buy = mid;
for lvl in ob.asks.iter() {
let (p, s) = parse_level(lvl);
if p <= 0.0 || s <= 0.0 {
continue;
}
let level_quote = p * s * contract_multiplier;
if level_quote >= remain {
max_price_buy = p;
remain = 0.0;
break;
} else {
max_price_buy = p;
remain -= level_quote;
}
}
let slip_buy_pct = if remain > 0.0 {
0.0
} else {
((max_price_buy / mid) - 1.0) * 100.0
};
// SELL: идём по bid'ам, пока не сольём target_quote
let mut remain = target_quote;
let mut min_price_sell = mid;
for lvl in ob.bids.iter() {
let (p, s) = parse_level(lvl);
if p <= 0.0 || s <= 0.0 {
continue;
}
let level_quote = p * s * contract_multiplier;
if level_quote >= remain {
min_price_sell = p;
remain = 0.0;
break;
} else {
min_price_sell = p;
remain -= level_quote;
}
}
let slip_sell_pct = if remain > 0.0 {
0.0
} else {
((mid / min_price_sell) - 1.0) * 100.0
};
(slip_buy_pct, slip_sell_pct)
}