7 Star 107 Fork 46

千里码工作室 / SpeechToText

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
demo.py 2.05 KB
一键复制 编辑 原始数据 按行查看 历史
yzd 提交于 2021-08-02 16:42 . feat: websocket 实时转写
import asyncio
import websockets
websocket_users = set()
# https://zhuanlan.zhihu.com/p/103070055
# 检测客户端权限,用户名密码通过才能退出循环
async def check_user_permit(websocket):
print("new websocket_users:", websocket)
websocket_users.add(websocket)
print("websocket_users list:", websocket_users)
while False:
recv_str = await websocket.recv()
cred_dict = recv_str.split(":")
if cred_dict[0] == "admin" and cred_dict[1] == "123456":
response_str = "Congratulation, you have connect with server..."
await websocket.send(response_str)
print("Password is ok...")
return True
else:
response_str = "Sorry, please input the username or password..."
print("Password is wrong...")
await websocket.send(response_str)
# 接收客户端消息并处理,这里只是简单把客户端发来的返回回去
async def recv_user_msg(websocket):
while True:
recv_text = await websocket.recv()
print("recv_text:", websocket.pong, recv_text)
response_text = f"Server return: {recv_text}"
print("response_text:", response_text)
await websocket.send(response_text)
# 服务器端主逻辑
async def run(websocket, path):
while True:
try:
await check_user_permit(websocket)
await recv_user_msg(websocket)
except websockets.ConnectionClosed:
print("ConnectionClosed...", path) # 链接断开
print("websocket_users old:", websocket_users)
websocket_users.remove(websocket)
print("websocket_users new:", websocket_users)
break
except websockets.InvalidState:
print("InvalidState...") # 无效状态
break
except Exception as e:
print("Exception:", e)
if __name__ == '__main__':
print("127.0.0.1:8181 websocket...")
asyncio.get_event_loop().run_until_complete(websockets.serve(run, "127.0.0.1", 8181))
asyncio.get_event_loop().run_forever()
Java
1
https://gitee.com/yzd_org/speechToText.git
git@gitee.com:yzd_org/speechToText.git
yzd_org
speechToText
SpeechToText
master

搜索帮助

53164aa7 5694891 3bd8fe86 5694891