🔮 功能🔮
解决痛点:内网环境中反向链接不能调用WebApi的情况
例如 发送消息
结构如下
{
"ReqId": 123123, //标记🏷️
"BotUin":"123456789",
"CgiCmd": "MessageSvc.PbSendMsg",
"CgiRequest": {
"ToUin": 1234455667,
"ToType": 2,
"Content": "你好"
}
}
发送消息返回
{
"CgiBaseResponse": {
"ErrMsg": "",
"Ret": 0
},
"ReqId": 123123,//🏷️标记
"ResponseData": {
"MsgSeq": 11073,
"MsgTime": 1697271309
}
}
其他功能包的调用同理
信息
# WS client example
import asyncio
import json
import random
import requests
import websockets
# websocket client
SERCIVE_HOST = "127.0.0.1:8086"
async def Wsdemo():
uri = "ws://{}/ws".format(SERCIVE_HOST)
try:
async with websockets.connect(uri) as websocket:
while True:
greeting = await websocket.recv()
print(f"<{greeting}")
EventJson = json.loads(greeting)
if "CurrentPacket" in EventJson:
EventName = EventJson["CurrentPacket"]["EventName"]
EventData = EventJson["CurrentPacket"]["EventData"]
print(f"<{EventName}")
if EventData["MsgHead"]["FromType"] == 2 and EventData["MsgHead"]["MsgType"] == 82:
if "复读机" in EventData["MsgBody"]["Content"]:
text = EventData["MsgBody"]["Content"].replace(
"复读机", "")
api_json = {"ReqId": 123123, "BotUin": str(EventJson["CurrentQQ"]), "CgiCmd": "MessageSvc.PbSendMsg", "CgiRequest": {"ToUin": EventData["MsgHead"]["FromUin"], "ToType": EventData["MsgHead"]["FromType"],
"Content": text}}
#在原有的WeApi提交的Json中增加ReqId字段标记即可 随机整数
await websocket.send(json.dumps(api_json))
if "ReqId" in EventJson:
print(f"<发送消息返回{EventJson}")
except Exception as e:
# 断线重连
t = random.randint(5, 8)
print(f"< 超时重连中... { t}", e)
await asyncio.sleep(t)
await Wsdemo()
asyncio.get_event_loop().run_until_complete(Wsdemo())
# websocket server
# async def main_logic(websocket, path):
# while True:
# greeting = await websocket.recv()
# EventJson = json.loads(greeting)
# EventName = EventJson["CurrentPacket"]["EventName"]
# EventData = EventJson["CurrentPacket"]["EventData"]
# print(f"<{EventName} {greeting}")
# start_server = websockets.serve(main_logic, '127.0.0.1', 5678)
# asyncio.get_event_loop().run_until_complete(start_server)
# asyncio.get_event_loop().run_forever()