debot_monitor_no_tg.py
import requests
import json
import time
import uuid
import os
from datetime import datetime
# 日志配置
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("debot_monitor.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# DeBot.ai API配置
BASE_URL = 'https://debot.ai/api'
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
'Accept': 'application/json',
'Content-Type': 'application/json',
'Cookie': '_ga=GA1.1.1584224071.1746597635; beegosessionID=0092383fa45843004210a85130b73678; MEIQIA_TRACK_ID=2wktwYUiQ4vnQXLWPOznVJUZMpp; MEIQIA_VISIT_ID=2wktwZZz1fv94ZOWqVc24Gs9apl; _bl_uid=9pmRbaq7dtbjgg4k9zmRzeCsXmUg; cf_clearance=FtWjYWxvlGYu5FZmED4mXyAP9vFpH_eTq_OWeGTWTjE-1746607347-1.2.1.1-v3UUxEUQQW_6qO.foQKNYUiH7hqsr8AXvyyHL.F48J06gNJ9uDGLZgAkLt8genDV9ANvANxYHHJYImOCJdTMGDVs8QTzvtW823G3pp_krZI_3K.1Fw1iy1QPSXxHJGgBOf_pslTr2e46ksP4bwSB5O1YxdtzTgJNicCwpjKF8rH5INTZdlhVYj2ENaRb0VFqTol75dIBb28i3dUYsmZivo267FYc3qPOyqmjHyZdi_KXJqYnNxrjuDGbeAJyDlnO5R0M8rBRDVvD1JBO4O9pz1oZLa5nuoLwTEbSn3rb7QbOC4jEZ3lL7VhYoBErihJBVtXn5XZ17Oi6tCkVjzR1JgUJq3LD0d.JV4O5hPtIpoE; _ga_7QSW3N3LVZ=GS2.1.s1746607347$o3$g0$t1746607347$j0$l0$h0; version=66; frill-sdk={%22data%22:{%22sessionCount%22:2%2C%22identifiedAt%22:%222025-05-07T06:00:37.927Z%22%2C%22lastIdentifiedAt%22:%222025-05-07T08:42:28.943Z%22}}'
}
# 创建会话对象
session = requests.Session()
session.headers.update(HEADERS)
# 存储已知信号的ID
known_signal_ids = set()
# 存储所有发现的信号,用于测试
all_signals = []
# 生成请求ID的函数
def generate_request_id():
return str(uuid.uuid4())
# 获取频道列表数据
def get_channel_list(chain='solana', page_size=24, next_page=''):
url = f"{BASE_URL}/official/signal/channel/list"
params = {
'request_id': generate_request_id(),
'page_size': page_size,
'next': next_page,
'chain': chain
}
try:
logger.info(f"请求API: {url} 参数: {params}")
response = session.get(url, params=params)
if response.status_code == 200:
logger.info(f"API请求成功,状态码: {response.status_code}")
return response.json()
else:
logger.error(f"获取频道列表失败: {response.status_code}")
return None
except Exception as e:
logger.error(f"请求API出错: {e}")
return None
# 模拟信号转发(仅打印日志)
def simulate_signal_forward(token_address, token_symbol, strategy_name):
"""
模拟将信号的合约地址转发到Telegram
"""
# 构建消息
message = f"/address {token_address}"
comment = f"⚡️ 新信号 | {token_symbol} | {strategy_name} ⚡️"
# 打印日志
logger.info("=" * 50)
logger.info("检测到新信号!")
logger.info(f"合约地址: {token_address}")
logger.info(f"代币符号: {token_symbol}")
logger.info(f"策略名称: {strategy_name}")
logger.info(f"模拟发送消息: {message}")
logger.info(f"模拟发送注释: {comment}")
logger.info("=" * 50)
# 将信号添加到列表中
all_signals.append({
'token_address': token_address,
'token_symbol': token_symbol,
'strategy_name': strategy_name,
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
return True
# 保存和加载已知信号ID
def save_known_signals():
try:
with open('known_signal_ids.json', 'w') as f:
json.dump(list(known_signal_ids), f)
logger.info(f"已保存 {len(known_signal_ids)} 个已知信号ID")
except Exception as e:
logger.error(f"保存已知信号ID失败: {e}")
def load_known_signals():
global known_signal_ids
try:
if os.path.exists('known_signal_ids.json'):
with open('known_signal_ids.json', 'r') as f:
known_signal_ids = set(json.load(f))
logger.info(f"已加载 {len(known_signal_ids)} 个已知信号ID")
else:
logger.info("未找到已知信号ID文件,将创建新文件")
except Exception as e:
logger.error(f"加载已知信号ID失败: {e}")
known_signal_ids = set()
# 保存所有发现的信号到JSON文件,用于分析
def save_all_signals():
try:
with open('all_signals.json', 'w') as f:
json.dump(all_signals, f, indent=2)
logger.info(f"已保存 {len(all_signals)} 个信号到all_signals.json")
except Exception as e:
logger.error(f"保存信号列表失败: {e}")
# 监控新信号
def monitor_new_signals(test_mode=True, max_iterations=None):
logger.info("开始监控DeBot.ai新信号...")
# 加载已知信号ID
if not test_mode:
load_known_signals()
else:
logger.info("测试模式:不加载已知信号ID,所有信号都将被视为新信号")
iteration = 0
while True:
try:
# 如果设置了最大迭代次数,检查是否达到
if max_iterations is not None and iteration >= max_iterations:
logger.info(f"已达到最大迭代次数 {max_iterations},退出监控")
break
iteration += 1
logger.info(f"开始第 {iteration} 次检查")
# 获取频道列表数据
data = get_channel_list()
if not data:
logger.warning("获取数据失败,等待重试...")
time.sleep(10)
continue
# 检查是否有新信号
if 'data' in data and 'results' in data['data']:
new_signals_found = False
current_signals = []
for result in data['data']['results']:
if 'meta' in result and 'id' in result['meta']:
signal_id = result['meta']['id']
# 记录当前所有信号
current_signals.append(signal_id)
# 如果这是一个新的信号ID或者处于测试模式
if test_mode or signal_id not in known_signal_ids:
new_signals_found = True
# 提取信号信息
strategy_name = result['meta'].get('strategy_info', {}).get('group_name', 'Unknown')
create_time = result['meta'].get('create_time', 0)
# 提取代币信息
if 'monitor_data' in result and 'record_data' in result['monitor_data']:
record_data = result['monitor_data']['record_data']
token_address = record_data.get('token', '')
token_symbol = record_data.get('token_symbol', '')
if token_address and token_symbol:
# 模拟转发
simulate_signal_forward(token_address, token_symbol, strategy_name)
# 记录到日志
time_str = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M:%S')
logger.info(f"发现{'新' if not test_mode else ''}信号: {signal_id} - {token_symbol} - {strategy_name} - {time_str}")
# 添加到已知信号集合
known_signal_ids.add(signal_id)
# 记录当前检测到的信号数量
logger.info(f"当前检测到 {len(current_signals)} 个信号,其中新信号 {len(current_signals) - (0 if test_mode else len(known_signal_ids) - len(new_signals_found))} 个")
# 如果发现了新信号,保存已知信号列表
if new_signals_found and not test_mode:
save_known_signals()
# 保存所有发现的信号
save_all_signals()
# 等待一段时间再次检查
wait_time = 5
logger.info(f"等待 {wait_time} 秒后进行下一次检查...")
time.sleep(wait_time)
except Exception as e:
logger.error(f"监控新信号出错: {e}", exc_info=True)
time.sleep(30) # 出错后等待30秒再试
# 主函数
if __name__ == "__main__":
try:
# 测试模式:设置为True会将所有信号视为新信号,方便测试
# max_iterations:设置最大迭代次数,用于测试,设置为None则无限循环
monitor_new_signals(test_mode=True, max_iterations=3)
except KeyboardInterrupt:
logger.info("程序被手动停止")
save_all_signals() # 保存所有信号
debot_monitor.py
import requests
import json
import time
import uuid
import os
import signal
from datetime import datetime, timedelta
from telethon import TelegramClient, sync
from telethon.tl.functions.messages import GetDialogsRequest
from telethon.tl.types import InputPeerEmpty
# Telegram用户API配置
API_ID = 'your_api_id' # 从 https://my.telegram.org 获取
API_HASH = 'your_api_hash' # 从 https://my.telegram.org 获取
PHONE = 'your_phone_number' # 您的电话号码,格式如 +12345678901
BOT_A_USERNAME = 'bot_a_username' # 接收合约地址的机器人用户名,不带@
BOT_B_USERNAME = 'bot_b_username' # 接收信号信息的机器人用户名,不带@
# 信号ID文件路径
SIGNAL_FILE = 'known_signal_ids.json'
# 代币年龄限制(小时)
TOKEN_AGE_LIMIT_HOURS = 24
# DeBot.ai API配置
BASE_URL = 'https://debot.ai/api'
HEADERS = {
'User-Agent': '',
'Accept': 'application/json',
'Content-Type': 'application/json',
'Cookie': ''
}
# 创建会话对象
session = requests.Session()
session.headers.update(HEADERS)
# 存储已知信号的ID
known_signal_ids = set()
# 初始化Telegram客户端
client = TelegramClient('session_name', API_ID, API_HASH)
# 生成请求ID的函数
def generate_request_id():
return str(uuid.uuid4())
# 获取频道列表数据
def get_channel_list(chain='solana', page_size=24, next_page=''):
url = f"{BASE_URL}/official/signal/channel/list"
params = {
'request_id': generate_request_id(),
'page_size': page_size,
'next': next_page,
'chain': chain
}
response = session.get(url, params=params)
if response.status_code == 200:
return response.json()
else:
print(f"获取频道列表失败: {response.status_code}")
return None
# 检查代币是否是最近24小时内创建的
def is_token_new(creation_timestamp):
if not creation_timestamp:
return False
# 获取当前时间戳
current_time = time.time()
# 计算时间差(小时)
hours_diff = (current_time - creation_timestamp) / 3600
# 如果创建时间在过去24小时内,返回True
return hours_diff <= TOKEN_AGE_LIMIT_HOURS
# 发送合约地址到机器人A,信号信息到机器人B
async def send_signal_to_bots(token_address, token_symbol, strategy_name, creation_time=None):
"""
将合约地址发送给机器人A,将信号信息发送给机器人B
"""
try:
# 发送合约地址到机器人A
ca_message = f"/address {token_address}"
await client.send_message(BOT_A_USERNAME, ca_message)
print(f"已将合约地址 {token_address} 发送给机器人A")
# 发送信号信息到机器人B
info_message = f"⚡️ 新信号检测 ⚡️\n\n"
info_message += f"代币: {token_symbol}\n"
info_message += f"策略: {strategy_name}\n"
info_message += f"合约地址: {token_address}"
# 如果有创建时间,添加到消息中
if creation_time:
creation_time_str = datetime.fromtimestamp(creation_time).strftime('%Y-%m-%d %H:%M:%S')
info_message += f"\n创建时间: {creation_time_str}"
# 计算代币年龄
current_time = time.time()
hours_old = (current_time - creation_time) / 3600
info_message += f"\n代币年龄: {hours_old:.1f}小时"
await client.send_message(BOT_B_USERNAME, info_message)
print(f"已将信号信息发送给机器人B: {token_symbol} - {strategy_name}")
return True
except Exception as e:
print(f"发送信息失败: {e}")
return False
# 保存信号ID(仅用于程序运行期间,退出时会删除)
def save_known_signals():
try:
with open(SIGNAL_FILE, 'w') as f:
json.dump(list(known_signal_ids), f)
print(f"已保存 {len(known_signal_ids)} 个已知信号ID")
except Exception as e:
print(f"保存已知信号ID失败: {e}")
# 删除信号ID文件
def delete_signal_file():
try:
if os.path.exists(SIGNAL_FILE):
os.remove(SIGNAL_FILE)
print(f"已删除信号ID文件: {SIGNAL_FILE}")
except Exception as e:
print(f"删除信号ID文件失败: {e}")
# 初始化函数:获取当前所有信号但不发送通知
async def initialize_signal_database():
print("开始初始化信号数据库...")
global known_signal_ids
# 清空已知信号集合
known_signal_ids = set()
# 获取频道列表数据
data = get_channel_list()
if not data:
print("获取数据失败,初始化失败")
return False
# 提取所有当前信号的ID
if 'data' in data and 'results' in data['data']:
for result in data['data']['results']:
if 'meta' in result and 'id' in result['meta']:
signal_id = result['meta']['id']
known_signal_ids.add(signal_id)
print(f"初始化完成,已记录 {len(known_signal_ids)} 个现有信号")
# 发送初始化完成通知
try:
await client.send_message(BOT_B_USERNAME, f"🔄 DeBot.ai信号监控已初始化完成,现在开始监控新信号(仅关注{TOKEN_AGE_LIMIT_HOURS}小时内创建的代币)")
print("已发送初始化完成通知")
except Exception as e:
print(f"发送初始化通知失败: {e}")
return True
# 监控新信号并发送给机器人
async def monitor_new_signals():
print(f"开始监控DeBot.ai新信号(仅关注{TOKEN_AGE_LIMIT_HOURS}小时内创建的代币)...")
# 初始化信号数据库
success = await initialize_signal_database()
if not success:
print("初始化失败,退出监控")
return
# 发送监控开始通知
try:
await client.send_message(BOT_B_USERNAME, "🤖 DeBot.ai新信号监控已启动")
print("已发送监控启动通知")
except Exception as e:
print(f"发送启动通知失败: {e}")
while True:
try:
# 获取频道列表数据
data = get_channel_list()
if not data:
print("获取数据失败,等待重试...")
time.sleep(10)
continue
# 检查是否有新信号
if 'data' in data and 'results' in data['data']:
new_signals_found = False
# 提取代币信息
token_info = {}
if 'data' in data and 'meta' in data['data'] and 'tokens' in data['data']['meta']:
for token_address, info in data['data']['meta']['tokens'].items():
token_info[token_address] = info
for result in data['data']['results']:
if 'meta' in result and 'id' in result['meta']:
signal_id = result['meta']['id']
# 如果这是一个新的信号ID
if signal_id not in known_signal_ids:
new_signals_found = True
# 提取信号信息
strategy_name = result['meta'].get('strategy_info', {}).get('group_name', 'Unknown')
create_time = result['meta'].get('create_time', 0)
# 提取代币信息
if 'monitor_data' in result and 'record_data' in result['monitor_data']:
record_data = result['monitor_data']['record_data']
token_address = record_data.get('token', '')
token_symbol = record_data.get('token_symbol', '')
if token_address and token_symbol:
# 获取代币创建时间
token_creation_time = None
if token_address in token_info:
token_creation_time = token_info[token_address].get('creation_timestamp', 0)
# 检查代币是否是最近24小时内创建的
if token_creation_time and is_token_new(token_creation_time):
# 发送信息给两个机器人
await send_signal_to_bots(token_address, token_symbol, strategy_name, token_creation_time)
# 记录到日志
signal_time_str = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M:%S')
token_time_str = datetime.fromtimestamp(token_creation_time).strftime('%Y-%m-%d %H:%M:%S')
hours_old = (time.time() - token_creation_time) / 3600
print(f"发现新信号: {signal_id} - {token_symbol} - {strategy_name}")
print(f" 信号时间: {signal_time_str}")
print(f" 代币创建: {token_time_str} ({hours_old:.1f}小时前)")
else:
# 如果代币不是最近24小时内创建的,记录但不发送
if token_creation_time:
hours_old = (time.time() - token_creation_time) / 3600
print(f"忽略旧代币信号: {token_symbol} - 创建于{hours_old:.1f}小时前")
else:
print(f"忽略未知创建时间的代币信号: {token_symbol}")
# 添加到已知信号集合
known_signal_ids.add(signal_id)
# 如果发现了新信号,保存已知信号列表(仅用于程序运行期间)
if new_signals_found:
save_known_signals()
# 等待一段时间再次检查
time.sleep(5)
except Exception as e:
print(f"监控新信号出错: {e}")
time.sleep(30) # 出错后等待30秒再试
# 设置退出处理函数
def exit_handler(signum, frame):
print("程序正在退出...")
delete_signal_file() # 删除信号ID文件
print("清理完成,退出程序")
exit(0)
# 主函数
async def main():
# 注册信号处理器,确保在程序退出时删除文件
signal.signal(signal.SIGINT, exit_handler) # Ctrl+C
signal.signal(signal.SIGTERM, exit_handler) # 终止信号
# 启动时删除旧的信号ID文件(如果存在)
delete_signal_file()
# 连接到Telegram
print("正在连接到Telegram...")
await client.start()
if not await client.is_user_authorized():
# 如果未登录,则请求登录
await client.send_code_request(PHONE)
print(f"请输入发送到 {PHONE} 的验证码:")
code = input()
await client.sign_in(PHONE, code)
print("已成功登录Telegram")
# 开始监控
await monitor_new_signals()
# 运行主函数
if __name__ == "__main__":
try:
import asyncio
asyncio.run(main())
except KeyboardInterrupt:
print("程序被手动停止")
delete_signal_file() # 确保在手动停止时也删除文件
Comments NOTHING