当前位置: 首页> 技术文章> websocket python实现原理

websocket python实现原理

方法一:

""" pip install ws4py """
import json
from ws4py.client.threadedclient import WebSocketClient
  
class CG_Client(WebSocketClient):
  
    def opened(self):
        req = '{"event":"subscribe", "channel":"eth_usdt.deep"}'
        self.send(req)
  
    def closed(self, code, reason=None):
        print("Closed down:", code, reason)
  
    def received_message(self, resp):
        resp = json.loads(str(resp))
        data = resp['data']
        if type(data) is dict:
            ask = data['asks'][0]
            print('Ask:', ask)
            bid = data['bids'][0]
            print('Bid:', bid)
  
  
if __name__ == '__main__':
    ws = None
    try:
        ws = CG_Client('wss://i.cg.net/wi/ws')
        ws.connect()
        ws.run_forever()
    except KeyboardInterrupt:
        ws.close()

方法二(比较通用实现websocket):

# coding=utf-8
import ssl
url="localhost:8016/official-website-backend/websocket?"
import wave,os
p="2019-05-06_11-42-06_8611.wav"
path=os.path.join(os.getcwd(),p)
files=wave.open(path,'rb')
data=files.readframes(1024)
print("data type is %s"%type(data))
import json
import websocket

def on_message(ws, message):  # 服务器有数据更新时,主动推送过来的数据
    print(message)


def on_error(ws, error):  # 程序报错时,就会触发on_error事件
    print(error)


def on_close(ws):
    print("Connection closed ……")


def on_open(ws):  # 连接到服务器之后就会触发on_open事件,这里用于send数据
    req=data
    print("发送文件type: %s"%type(data))
    ws.send(req)


if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(url,
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close)
    ws.on_open = on_open
    ws.on_message=on_message
    ws.on_error=on_error
    ws.on_close=on_close
    ws.run_forever(ping_timeout=30)


上一篇: 使用python同时替换json多个指定key的value

下一篇: 软件测试之手工测试人员如何转测试开发?

QQ技术交流群

多测师官方学习交流
556733550

加入群聊