WWIPad4.1
    WWIPad4.1
    • 搭建指南
    • 监听消息
    • 免费计划
    • WebSocket事件
    • 实战-货币报价机器人
    • 实战-语音报时机器人
    • 实战-快速对接ChatGPT
    • 登录接口
      • 唤醒登录
        GET
      • 获取二维码
        GET
    • 发送消息
      • 发送文字
        POST
      • 发送图片
        POST
      • 发送语音
        POST
      • A T某人
        POST
      • 万能转发
        POST
    • 上传接口
      • 上传图片
        POST
      • 上传语音
        POST
    • 下载接口
      • 下载文件
        POST
    • 资料接口
      • 获取群成员资料
        POST
      • 获取个 人资料
        POST
    • 其他接口
      • 短信登陆
        POST
      • 提交数字验证
        POST
      • 获取链接授权
        POST
      • 登录企业后台
        POST
      • 扫码登陆PC
        POST
      • 获取数字验证
        POST
      • 获取企业列表
        POST
      • 登录企业
        POST

    监听消息

    通过WebSokcet监听消息
    监听地址
    ws://10.10.88.88:8888/ws?Token=xxxxx
    例:ws://10.10.88.88:8888/ws?Token=849909cfd662cf00b53b845d405f741f612758ec
    以下用Python演示链接WebSocket

    给机器人发送ding 机器人回复dong

     # WS client example
    import asyncio
    import json
    import random
    import requests
    import websockets
    
    SERCIVE_TOKEN = "849909cfd662cf00b53b845d405f741f612758ec"
    SERCIVE_HOST = "127.0.0.1:8888"
    BOT_UID = 0
    # 上传语音
    def new_CdnUpload(Uid, ToUserId, MusicUrl):
        url = "http://{}/v1/WeWork/CdnUpload?Uid={}&Token={}".format(
            SERCIVE_HOST, Uid, SERCIVE_TOKEN
        )
        payload = {"FileType": 5, "ToUserId": ToUserId, "FilePath": MusicUrl}
        re_data = requests.post(url, json=payload)
        return re_data.json()
    
    
    # 发送语音
    def new_SendMsg(Uid, ToUserId, ToUserType, CdnInfo):
        url = "http://{}/v1/WeWork/SendMsg?Uid={}&Token={}".format(
            SERCIVE_HOST, Uid, SERCIVE_TOKEN
        )
        payload = {
            "MsgType": 16,
            "ToUserId": ToUserId,
            "ToUserType": ToUserType,
            "CdnInfo": CdnInfo,
        }
        re_data = requests.post(url, json=payload)
        return re_data.json()
    
    
    # 发送文字
    def new_SendTextMsg(Uid, ToUserId, ToUserType, Content):
        url = "http://{}/v1/WeWork/SendMsg?Uid={}&Token={}".format(
            SERCIVE_HOST, Uid, SERCIVE_TOKEN
        )
        payload = {
            "MsgType": 0,
            "ToUserId": ToUserId,
            "ToUserType": ToUserType,
            "Content": Content,
        }
        re_data = requests.post(url, json=payload)
        return re_data.json()
    
    
    # 获取群成员
    def new_GetGroupUsers(Uid, Gid):
        url = "http://{}/v1/WeWork/Group/MemberList?Uid={}&Token={}".format(
            SERCIVE_HOST, Uid, SERCIVE_TOKEN
        )
        payload = {"Gid": Gid}
        re_data = requests.post(url, json=payload)
        return re_data.json()
    
    
    async def Wsdemo():
        uri = "ws://{}/ws?Token={}".format(SERCIVE_HOST, SERCIVE_TOKEN)
        try:
            async with websockets.connect(uri) as websocket:
                while True:
                    greeting = await websocket.recv()
                    EventJson = json.loads(greeting)
                    EventName = EventJson["CurrentPacket"]["EventName"]
                    EventData = EventJson["CurrentPacket"]["EventData"]
                    print(f"< {greeting} {EventName}")
                    if EventName == "ON_EVENT_TOKEN_EXPIRED":  # Token过期关闭WebSocket
                        print(f"< {greeting} {EventName}")
                        return
                    if EventName == "ON_EVENT_NEW_MSG":
                        # print(f"< {greeting} {EventName}")
                        NewAddMsg = EventData
                        ToUserId = NewAddMsg["ToUserId"]
                        ToUserType = NewAddMsg["ToUserType"]
                        if NewAddMsg["MsgType"] == 0 or NewAddMsg["MsgType"] == 2:
                            if "ding" in NewAddMsg["Content"]:
                                new_SendTextMsg(
                                    机器人Uid, ToUserId, ToUserType, "dong"
                                )
    
        except:
            #断线重连
            t = random.randint(5, 8)
            print(f"< 超时重连中... { t}")
            await asyncio.sleep(t)
            await Wsdemo()
    
    
    asyncio.get_event_loop().run_until_complete(Wsdemo())
    
    
    `
    修改于 2022-04-12 07:51:09
    上一页
    搭建指南
    下一页
    免费计划
    Built with