Debot_Monitor

发布于 2025-05-07  83 次阅读


debot_monitor_no_tg.py

import requests
import json
import time
import uuid
import os
from datetime import datetime

# 日志配置
import logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("debot_monitor.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# DeBot.ai API配置
BASE_URL = 'https://debot.ai/api'
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
    'Accept': 'application/json',
    'Content-Type': 'application/json',
    'Cookie': '_ga=GA1.1.1584224071.1746597635; beegosessionID=0092383fa45843004210a85130b73678; MEIQIA_TRACK_ID=2wktwYUiQ4vnQXLWPOznVJUZMpp; MEIQIA_VISIT_ID=2wktwZZz1fv94ZOWqVc24Gs9apl; _bl_uid=9pmRbaq7dtbjgg4k9zmRzeCsXmUg; cf_clearance=FtWjYWxvlGYu5FZmED4mXyAP9vFpH_eTq_OWeGTWTjE-1746607347-1.2.1.1-v3UUxEUQQW_6qO.foQKNYUiH7hqsr8AXvyyHL.F48J06gNJ9uDGLZgAkLt8genDV9ANvANxYHHJYImOCJdTMGDVs8QTzvtW823G3pp_krZI_3K.1Fw1iy1QPSXxHJGgBOf_pslTr2e46ksP4bwSB5O1YxdtzTgJNicCwpjKF8rH5INTZdlhVYj2ENaRb0VFqTol75dIBb28i3dUYsmZivo267FYc3qPOyqmjHyZdi_KXJqYnNxrjuDGbeAJyDlnO5R0M8rBRDVvD1JBO4O9pz1oZLa5nuoLwTEbSn3rb7QbOC4jEZ3lL7VhYoBErihJBVtXn5XZ17Oi6tCkVjzR1JgUJq3LD0d.JV4O5hPtIpoE; _ga_7QSW3N3LVZ=GS2.1.s1746607347$o3$g0$t1746607347$j0$l0$h0; version=66; frill-sdk={%22data%22:{%22sessionCount%22:2%2C%22identifiedAt%22:%222025-05-07T06:00:37.927Z%22%2C%22lastIdentifiedAt%22:%222025-05-07T08:42:28.943Z%22}}'
}

# 创建会话对象
session = requests.Session()
session.headers.update(HEADERS)

# 存储已知信号的ID
known_signal_ids = set()
# 存储所有发现的信号,用于测试
all_signals = []

# 生成请求ID的函数
def generate_request_id():
    return str(uuid.uuid4())

# 获取频道列表数据
def get_channel_list(chain='solana', page_size=24, next_page=''):
    url = f"{BASE_URL}/official/signal/channel/list"
    params = {
        'request_id': generate_request_id(),
        'page_size': page_size,
        'next': next_page,
        'chain': chain
    }
    try:
        logger.info(f"请求API: {url} 参数: {params}")
        response = session.get(url, params=params)
        if response.status_code == 200:
            logger.info(f"API请求成功,状态码: {response.status_code}")
            return response.json()
        else:
            logger.error(f"获取频道列表失败: {response.status_code}")
            return None
    except Exception as e:
        logger.error(f"请求API出错: {e}")
        return None

# 模拟信号转发(仅打印日志)
def simulate_signal_forward(token_address, token_symbol, strategy_name):
    """
    模拟将信号的合约地址转发到Telegram
    """
    # 构建消息
    message = f"/address {token_address}"
    comment = f"⚡️ 新信号 | {token_symbol} | {strategy_name} ⚡️"
    
    # 打印日志
    logger.info("=" * 50)
    logger.info("检测到新信号!")
    logger.info(f"合约地址: {token_address}")
    logger.info(f"代币符号: {token_symbol}")
    logger.info(f"策略名称: {strategy_name}")
    logger.info(f"模拟发送消息: {message}")
    logger.info(f"模拟发送注释: {comment}")
    logger.info("=" * 50)
    
    # 将信号添加到列表中
    all_signals.append({
        'token_address': token_address,
        'token_symbol': token_symbol,
        'strategy_name': strategy_name,
        'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    })
    
    return True

# 保存和加载已知信号ID
def save_known_signals():
    try:
        with open('known_signal_ids.json', 'w') as f:
            json.dump(list(known_signal_ids), f)
        logger.info(f"已保存 {len(known_signal_ids)} 个已知信号ID")
    except Exception as e:
        logger.error(f"保存已知信号ID失败: {e}")

def load_known_signals():
    global known_signal_ids
    try:
        if os.path.exists('known_signal_ids.json'):
            with open('known_signal_ids.json', 'r') as f:
                known_signal_ids = set(json.load(f))
            logger.info(f"已加载 {len(known_signal_ids)} 个已知信号ID")
        else:
            logger.info("未找到已知信号ID文件,将创建新文件")
    except Exception as e:
        logger.error(f"加载已知信号ID失败: {e}")
        known_signal_ids = set()

# 保存所有发现的信号到JSON文件,用于分析
def save_all_signals():
    try:
        with open('all_signals.json', 'w') as f:
            json.dump(all_signals, f, indent=2)
        logger.info(f"已保存 {len(all_signals)} 个信号到all_signals.json")
    except Exception as e:
        logger.error(f"保存信号列表失败: {e}")

# 监控新信号
def monitor_new_signals(test_mode=True, max_iterations=None):
    logger.info("开始监控DeBot.ai新信号...")
    
    # 加载已知信号ID
    if not test_mode:
        load_known_signals()
    else:
        logger.info("测试模式:不加载已知信号ID,所有信号都将被视为新信号")
        
    iteration = 0
    
    while True:
        try:
            # 如果设置了最大迭代次数,检查是否达到
            if max_iterations is not None and iteration >= max_iterations:
                logger.info(f"已达到最大迭代次数 {max_iterations},退出监控")
                break
                
            iteration += 1
            logger.info(f"开始第 {iteration} 次检查")
            
            # 获取频道列表数据
            data = get_channel_list()
            if not data:
                logger.warning("获取数据失败,等待重试...")
                time.sleep(10)
                continue
            
            # 检查是否有新信号
            if 'data' in data and 'results' in data['data']:
                new_signals_found = False
                current_signals = []
                
                for result in data['data']['results']:
                    if 'meta' in result and 'id' in result['meta']:
                        signal_id = result['meta']['id']
                        
                        # 记录当前所有信号
                        current_signals.append(signal_id)
                        
                        # 如果这是一个新的信号ID或者处于测试模式
                        if test_mode or signal_id not in known_signal_ids:
                            new_signals_found = True
                            
                            # 提取信号信息
                            strategy_name = result['meta'].get('strategy_info', {}).get('group_name', 'Unknown')
                            create_time = result['meta'].get('create_time', 0)
                            
                            # 提取代币信息
                            if 'monitor_data' in result and 'record_data' in result['monitor_data']:
                                record_data = result['monitor_data']['record_data']
                                token_address = record_data.get('token', '')
                                token_symbol = record_data.get('token_symbol', '')
                                
                                if token_address and token_symbol:
                                    # 模拟转发
                                    simulate_signal_forward(token_address, token_symbol, strategy_name)
                                    
                                    # 记录到日志
                                    time_str = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M:%S')
                                    logger.info(f"发现{'新' if not test_mode else ''}信号: {signal_id} - {token_symbol} - {strategy_name} - {time_str}")
                            
                            # 添加到已知信号集合
                            known_signal_ids.add(signal_id)
                
                # 记录当前检测到的信号数量
                logger.info(f"当前检测到 {len(current_signals)} 个信号,其中新信号 {len(current_signals) - (0 if test_mode else len(known_signal_ids) - len(new_signals_found))} 个")
                
                # 如果发现了新信号,保存已知信号列表
                if new_signals_found and not test_mode:
                    save_known_signals()
            
            # 保存所有发现的信号
            save_all_signals()
            
            # 等待一段时间再次检查
            wait_time = 5
            logger.info(f"等待 {wait_time} 秒后进行下一次检查...")
            time.sleep(wait_time)
            
        except Exception as e:
            logger.error(f"监控新信号出错: {e}", exc_info=True)
            time.sleep(30)  # 出错后等待30秒再试

# 主函数
if __name__ == "__main__":
    try:
        # 测试模式:设置为True会将所有信号视为新信号,方便测试
        # max_iterations:设置最大迭代次数,用于测试,设置为None则无限循环
        monitor_new_signals(test_mode=True, max_iterations=3)
    except KeyboardInterrupt:
        logger.info("程序被手动停止")
        save_all_signals()  # 保存所有信号

debot_monitor.py

import requests
import json
import time
import uuid
import os
import signal
from datetime import datetime, timedelta
from telethon import TelegramClient, sync
from telethon.tl.functions.messages import GetDialogsRequest
from telethon.tl.types import InputPeerEmpty

# Telegram用户API配置
API_ID = 'your_api_id'  # 从 https://my.telegram.org 获取
API_HASH = 'your_api_hash'  # 从 https://my.telegram.org 获取
PHONE = 'your_phone_number'  # 您的电话号码,格式如 +12345678901
BOT_A_USERNAME = 'bot_a_username'  # 接收合约地址的机器人用户名,不带@
BOT_B_USERNAME = 'bot_b_username'  # 接收信号信息的机器人用户名,不带@

# 信号ID文件路径
SIGNAL_FILE = 'known_signal_ids.json'

# 代币年龄限制(小时)
TOKEN_AGE_LIMIT_HOURS = 24

# DeBot.ai API配置
BASE_URL = 'https://debot.ai/api'
HEADERS = {
    'User-Agent': '',
    'Accept': 'application/json',
    'Content-Type': 'application/json',
    'Cookie': ''
}

# 创建会话对象
session = requests.Session()
session.headers.update(HEADERS)

# 存储已知信号的ID
known_signal_ids = set()

# 初始化Telegram客户端
client = TelegramClient('session_name', API_ID, API_HASH)

# 生成请求ID的函数
def generate_request_id():
    return str(uuid.uuid4())

# 获取频道列表数据
def get_channel_list(chain='solana', page_size=24, next_page=''):
    url = f"{BASE_URL}/official/signal/channel/list"
    params = {
        'request_id': generate_request_id(),
        'page_size': page_size,
        'next': next_page,
        'chain': chain
    }
    response = session.get(url, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"获取频道列表失败: {response.status_code}")
        return None

# 检查代币是否是最近24小时内创建的
def is_token_new(creation_timestamp):
    if not creation_timestamp:
        return False
    
    # 获取当前时间戳
    current_time = time.time()
    
    # 计算时间差(小时)
    hours_diff = (current_time - creation_timestamp) / 3600
    
    # 如果创建时间在过去24小时内,返回True
    return hours_diff <= TOKEN_AGE_LIMIT_HOURS

# 发送合约地址到机器人A,信号信息到机器人B
async def send_signal_to_bots(token_address, token_symbol, strategy_name, creation_time=None):
    """
    将合约地址发送给机器人A,将信号信息发送给机器人B
    """
    try:
        # 发送合约地址到机器人A
        ca_message = f"/address {token_address}"
        await client.send_message(BOT_A_USERNAME, ca_message)
        print(f"已将合约地址 {token_address} 发送给机器人A")
        
        # 发送信号信息到机器人B
        info_message = f"⚡️ 新信号检测 ⚡️\n\n"
        info_message += f"代币: {token_symbol}\n"
        info_message += f"策略: {strategy_name}\n"
        info_message += f"合约地址: {token_address}"
        
        # 如果有创建时间,添加到消息中
        if creation_time:
            creation_time_str = datetime.fromtimestamp(creation_time).strftime('%Y-%m-%d %H:%M:%S')
            info_message += f"\n创建时间: {creation_time_str}"
            
            # 计算代币年龄
            current_time = time.time()
            hours_old = (current_time - creation_time) / 3600
            info_message += f"\n代币年龄: {hours_old:.1f}小时"
        
        await client.send_message(BOT_B_USERNAME, info_message)
        print(f"已将信号信息发送给机器人B: {token_symbol} - {strategy_name}")
        
        return True
    except Exception as e:
        print(f"发送信息失败: {e}")
        return False

# 保存信号ID(仅用于程序运行期间,退出时会删除)
def save_known_signals():
    try:
        with open(SIGNAL_FILE, 'w') as f:
            json.dump(list(known_signal_ids), f)
        print(f"已保存 {len(known_signal_ids)} 个已知信号ID")
    except Exception as e:
        print(f"保存已知信号ID失败: {e}")

# 删除信号ID文件
def delete_signal_file():
    try:
        if os.path.exists(SIGNAL_FILE):
            os.remove(SIGNAL_FILE)
            print(f"已删除信号ID文件: {SIGNAL_FILE}")
    except Exception as e:
        print(f"删除信号ID文件失败: {e}")

# 初始化函数:获取当前所有信号但不发送通知
async def initialize_signal_database():
    print("开始初始化信号数据库...")
    
    global known_signal_ids
    
    # 清空已知信号集合
    known_signal_ids = set()
    
    # 获取频道列表数据
    data = get_channel_list()
    if not data:
        print("获取数据失败,初始化失败")
        return False
    
    # 提取所有当前信号的ID
    if 'data' in data and 'results' in data['data']:
        for result in data['data']['results']:
            if 'meta' in result and 'id' in result['meta']:
                signal_id = result['meta']['id']
                known_signal_ids.add(signal_id)
    
    print(f"初始化完成,已记录 {len(known_signal_ids)} 个现有信号")
    
    # 发送初始化完成通知
    try:
        await client.send_message(BOT_B_USERNAME, f"🔄 DeBot.ai信号监控已初始化完成,现在开始监控新信号(仅关注{TOKEN_AGE_LIMIT_HOURS}小时内创建的代币)")
        print("已发送初始化完成通知")
    except Exception as e:
        print(f"发送初始化通知失败: {e}")
    
    return True

# 监控新信号并发送给机器人
async def monitor_new_signals():
    print(f"开始监控DeBot.ai新信号(仅关注{TOKEN_AGE_LIMIT_HOURS}小时内创建的代币)...")
    
    # 初始化信号数据库
    success = await initialize_signal_database()
    if not success:
        print("初始化失败,退出监控")
        return

    # 发送监控开始通知
    try:
        await client.send_message(BOT_B_USERNAME, "🤖 DeBot.ai新信号监控已启动")
        print("已发送监控启动通知")
    except Exception as e:
        print(f"发送启动通知失败: {e}")

    while True:
        try:
            # 获取频道列表数据
            data = get_channel_list()
            if not data:
                print("获取数据失败,等待重试...")
                time.sleep(10)
                continue

            # 检查是否有新信号
            if 'data' in data and 'results' in data['data']:
                new_signals_found = False
                
                # 提取代币信息
                token_info = {}
                if 'data' in data and 'meta' in data['data'] and 'tokens' in data['data']['meta']:
                    for token_address, info in data['data']['meta']['tokens'].items():
                        token_info[token_address] = info

                for result in data['data']['results']:
                    if 'meta' in result and 'id' in result['meta']:
                        signal_id = result['meta']['id']

                        # 如果这是一个新的信号ID
                        if signal_id not in known_signal_ids:
                            new_signals_found = True

                            # 提取信号信息
                            strategy_name = result['meta'].get('strategy_info', {}).get('group_name', 'Unknown')
                            create_time = result['meta'].get('create_time', 0)

                            # 提取代币信息
                            if 'monitor_data' in result and 'record_data' in result['monitor_data']:
                                record_data = result['monitor_data']['record_data']
                                token_address = record_data.get('token', '')
                                token_symbol = record_data.get('token_symbol', '')

                                if token_address and token_symbol:
                                    # 获取代币创建时间
                                    token_creation_time = None
                                    if token_address in token_info:
                                        token_creation_time = token_info[token_address].get('creation_timestamp', 0)
                                    
                                    # 检查代币是否是最近24小时内创建的
                                    if token_creation_time and is_token_new(token_creation_time):
                                        # 发送信息给两个机器人
                                        await send_signal_to_bots(token_address, token_symbol, strategy_name, token_creation_time)
                                        
                                        # 记录到日志
                                        signal_time_str = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M:%S')
                                        token_time_str = datetime.fromtimestamp(token_creation_time).strftime('%Y-%m-%d %H:%M:%S')
                                        hours_old = (time.time() - token_creation_time) / 3600
                                        
                                        print(f"发现新信号: {signal_id} - {token_symbol} - {strategy_name}")
                                        print(f"  信号时间: {signal_time_str}")
                                        print(f"  代币创建: {token_time_str} ({hours_old:.1f}小时前)")
                                    else:
                                        # 如果代币不是最近24小时内创建的,记录但不发送
                                        if token_creation_time:
                                            hours_old = (time.time() - token_creation_time) / 3600
                                            print(f"忽略旧代币信号: {token_symbol} - 创建于{hours_old:.1f}小时前")
                                        else:
                                            print(f"忽略未知创建时间的代币信号: {token_symbol}")

                            # 添加到已知信号集合
                            known_signal_ids.add(signal_id)

                # 如果发现了新信号,保存已知信号列表(仅用于程序运行期间)
                if new_signals_found:
                    save_known_signals()

            # 等待一段时间再次检查
            time.sleep(5)

        except Exception as e:
            print(f"监控新信号出错: {e}")
            time.sleep(30)  # 出错后等待30秒再试

# 设置退出处理函数
def exit_handler(signum, frame):
    print("程序正在退出...")
    delete_signal_file()  # 删除信号ID文件
    print("清理完成,退出程序")
    exit(0)

# 主函数
async def main():
    # 注册信号处理器,确保在程序退出时删除文件
    signal.signal(signal.SIGINT, exit_handler)  # Ctrl+C
    signal.signal(signal.SIGTERM, exit_handler)  # 终止信号
    
    # 启动时删除旧的信号ID文件(如果存在)
    delete_signal_file()
    
    # 连接到Telegram
    print("正在连接到Telegram...")
    await client.start()
    
    if not await client.is_user_authorized():
        # 如果未登录,则请求登录
        await client.send_code_request(PHONE)
        print(f"请输入发送到 {PHONE} 的验证码:")
        code = input()
        await client.sign_in(PHONE, code)
    
    print("已成功登录Telegram")
    
    # 开始监控
    await monitor_new_signals()

# 运行主函数
if __name__ == "__main__":
    try:
        import asyncio
        asyncio.run(main())
    except KeyboardInterrupt:
        print("程序被手动停止")
        delete_signal_file()  # 确保在手动停止时也删除文件
人生の意味は平凡ですか、それとも素晴らしいですか?
最后更新于 2025-05-08