WWIPad4.1
    WWIPad4.1
    • 搭建指南
    • 监听消息
    • 免费计划
    • WebSocket事件
    • 实战-货币报价机器人
    • 实战-语音报时机器人
    • 实战-快速对接ChatGPT
    • 登录接口
      • 唤醒登录
        GET
      • 获取二维码
        GET
    • 发送消息
      • 发送文字
        POST
      • 发送图片
        POST
      • 发送语音
        POST
      • A T某人
        POST
      • 万能转发
        POST
    • 上传接口
      • 上传图片
        POST
      • 上传语音
        POST
    • 下载接口
      • 下载文件
        POST
    • 资料接口
      • 获取群成员资料
        POST
      • 获取个 人资料
        POST
    • 其他接口
      • 短信登陆
        POST
      • 提交数字验证
        POST
      • 获取链接授权
        POST
      • 登录企业后台
        POST
      • 扫码登陆PC
        POST
      • 获取数字验证
        POST
      • 获取企业列表
        POST
      • 登录企业
        POST

    实战-货币报价机器人

    100行代码实现一个报价机器人

    开箱即用的一款免费报价机器人🤖️不足100行代码即可实现报价~废话不多说 先上效果图在上代码

    # WS client example
    import asyncio
    from asyncio.proactor_events import _ProactorDuplexPipeTransport
    import json
    import os
    import random
    import time
    import requests
    import websockets
    
    SERCIVE_TOKEN = ""
    SERCIVE_HOST = "10.10.88.88:8888"
    
    
    def GetCoinCode(code):
        url = "https://dncapi.fxhapp.com/api/search/websearch?webp=1&page=1&exchange_page=1&pagesize=100&code={}&wallet_page=1&token=".format(
            code)
        re_data = requests.get(url)
        re_json = re_data.json()
        code = code.upper()
        if re_json['code'] == 200:
            for coin in re_json['coinlist']:
                if coin['symbol'] == code:
                    return coin['coincode']
        return None
    
    def GetCoinInfo(coincode):
        url = 'https://dncapi.feixiaohao.info/api/v1/coin/market/{}'.format(
            coincode)
        re_data = requests.get(url)
        re_json = re_data.json()
    
        if re_json['code'] == 200:
            coindata = re_json['data']
            return "{}\n现价{}$\n涨幅{}%\n24H高{}$\n24H低{}%\n换手{}%\n".format(coindata["name"], coindata['price'], coindata['changerate'], coindata['high'], coindata['low'], coindata["turn_over"])
        return None
    
    # 发送文字
    def new_SendTextMsg(Uid, ToUserId, ToUserType, Content):
        url = "http://{}/v1/WeWork/SendMsg?Uid={}&Token={}".format(
            SERCIVE_HOST, Uid, SERCIVE_TOKEN
        )
        payload = {
            "MsgType": 0,
            "ToUserId": ToUserId,
            "ToUserType": ToUserType,
            "Content": Content,
        }
        re_data = requests.post(url, json=payload)
        return re_data.json()
    
    
    async def Wsdemo():
        uri = "ws://{}/ws?Token={}".format(SERCIVE_HOST, SERCIVE_TOKEN)
        try:
            async with websockets.connect(uri) as websocket:
                while True:
                    greeting = await websocket.recv()
                    EventJson = json.loads(greeting)
                    EventName = EventJson["CurrentPacket"]["EventName"]
                    EventData = EventJson["CurrentPacket"]["EventData"]
                    UserID = EventJson["CurrentUser"]["UserID"]
                    print(f"< {greeting} {EventName}")
                    if EventName == "ON_EVENT_TOKEN_EXPIRED":  # Token过期关闭WebSocket
                        print(f"< {greeting} {EventName}")
                        return
                    if EventName == "ON_EVENT_NEW_MSG":
                        # print(f"< {greeting} {EventName}")
                        NewAddMsg = EventData
                        ToUserId = NewAddMsg["ToUserId"]
                        ToUserType = NewAddMsg["ToUserType"]
                        FromUserId = NewAddMsg["FromUserId"]
                        if NewAddMsg["MsgType"] == 0 or NewAddMsg["MsgType"] == 2:
                            if "ding" in NewAddMsg["Content"]:
                                new_SendTextMsg(
                                    机器人Uid, ToUserId, ToUserType, "dong"
                                )
    
                            if "查" in NewAddMsg["Content"]:
                                code = NewAddMsg["Content"].replace("查", "")
                                code = GetCoinCode(code)
                                if code != None:
                                    coin_info = GetCoinInfo(code)
                                    if coin_info != None:
                                        print(coin_info)
                                        new_SendTextMsg(
                                            机器人Uid, ToUserId, ToUserType, coin_info
                                        )
                                        continue
    
                            else: 
                                continue
    
        except Exception as e:
            # 断线重连
            t = random.randint(5, 8)
            print(f"< 超时重连中... { t}", e)
            await asyncio.sleep(t)
            await Wsdemo()
    
    
    asyncio.get_event_loop().run_until_complete(Wsdemo())
    
    
    修改于 2022-04-12 09:37:40
    上一页
    WebSocket事件
    下一页
    实战-语音报时机器人
    Built with