实战-货币报价机器人
100行代码实现一个报价机器人
开箱即用的一款免费报价机器人🤖️不足100行代码即可实现报价~废话不多说 先上效果图在上代码
# WS client example
import asyncio
from asyncio.proactor_events import _ProactorDuplexPipeTransport
import json
import os
import random
import time
import requests
import websockets
SERCIVE_TOKEN = ""
SERCIVE_HOST = "10.10.88.88:8888"
def GetCoinCode(code):
url = "https://dncapi.fxhapp.com/api/search/websearch?webp=1&page=1&exchange_page=1&pagesize=100&code={}&wallet_page=1&token=".format(
code)
re_data = requests.get(url)
re_json = re_data.json()
code = code.upper()
if re_json['code'] == 200:
for coin in re_json['coinlist']:
if coin['symbol'] == code:
return coin['coincode']
return None
def GetCoinInfo(coincode):
url = 'https://dncapi.feixiaohao.info/api/v1/coin/market/{}'.format(
coincode)
re_data = requests.get(url)
re_json = re_data.json()
if re_json['code'] == 200:
coindata = re_json['data']
return "{}\n现价{}$\n涨幅{}%\n24H高{}$\n24H低{}%\n换手{}%\n".format(coindata["name"], coindata['price'], coindata['changerate'], coindata['high'], coindata['low'], coindata["turn_over"])
return None
# 发送文字
def new_SendTextMsg(Uid, ToUserId, ToUserType, Content):
url = "http://{}/v1/WeWork/SendMsg?Uid={}&Token={}".format(
SERCIVE_HOST, Uid, SERCIVE_TOKEN
)
payload = {
"MsgType": 0,
"ToUserId": ToUserId,
"ToUserType": ToUserType,
"Content": Content,
}
re_data = requests.post(url, json=payload)
return re_data.json()
async def Wsdemo():
uri = "ws://{}/ws?Token={}".format(SERCIVE_HOST, SERCIVE_TOKEN)
try:
async with websockets.connect(uri) as websocket:
while True:
greeting = await websocket.recv()
EventJson = json.loads(greeting)
EventName = EventJson["CurrentPacket"]["EventName"]
EventData = EventJson["CurrentPacket"]["EventData"]
UserID = EventJson["CurrentUser"]["UserID"]
print(f"< {greeting} {EventName}")
if EventName == "ON_EVENT_TOKEN_EXPIRED": # Token过期关闭WebSocket
print(f"< {greeting} {EventName}")
return
if EventName == "ON_EVENT_NEW_MSG":
# print(f"< {greeting} {EventName}")
NewAddMsg = EventData
ToUserId = NewAddMsg["ToUserId"]
ToUserType = NewAddMsg["ToUserType"]
FromUserId = NewAddMsg["FromUserId"]
if NewAddMsg["MsgType"] == 0 or NewAddMsg["MsgType"] == 2:
if "ding" in NewAddMsg["Content"]:
new_SendTextMsg(
机器人Uid, ToUserId, ToUserType, "dong"
)
if "查" in NewAddMsg["Content"]:
code = NewAddMsg["Content"].replace("查", "")
code = GetCoinCode(code)
if code != None:
coin_info = GetCoinInfo(code)
if coin_info != None:
print(coin_info)
new_SendTextMsg(
机器人Uid, ToUserId, ToUserType, coin_info
)
continue
else:
continue
except Exception as e:
# 断线重连
t = random.randint(5, 8)
print(f"< 超时重连中... { t}", e)
await asyncio.sleep(t)
await Wsdemo()
asyncio.get_event_loop().run_until_complete(Wsdemo())
修改于 2022-04-12 09:37:40