方法一:
""" pip install ws4py """ import json from ws4py.client.threadedclient import WebSocketClient class CG_Client(WebSocketClient): def opened(self): req = '{"event":"subscribe", "channel":"eth_usdt.deep"}' self.send(req) def closed(self, code, reason=None): print("Closed down:", code, reason) def received_message(self, resp): resp = json.loads(str(resp)) data = resp['data'] if type(data) is dict: ask = data['asks'][0] print('Ask:', ask) bid = data['bids'][0] print('Bid:', bid) if __name__ == '__main__': ws = None try: ws = CG_Client('wss://i.cg.net/wi/ws') ws.connect() ws.run_forever() except KeyboardInterrupt: ws.close()
方法二(比较通用实现websocket):
# coding=utf-8 import ssl url="localhost:8016/official-website-backend/websocket?" import wave,os p="2019-05-06_11-42-06_8611.wav" path=os.path.join(os.getcwd(),p) files=wave.open(path,'rb') data=files.readframes(1024) print("data type is %s"%type(data)) import json import websocket def on_message(ws, message): # 服务器有数据更新时,主动推送过来的数据 print(message) def on_error(ws, error): # 程序报错时,就会触发on_error事件 print(error) def on_close(ws): print("Connection closed ……") def on_open(ws): # 连接到服务器之后就会触发on_open事件,这里用于send数据 req=data print("发送文件type: %s"%type(data)) ws.send(req) if __name__ == "__main__": websocket.enableTrace(True) ws = websocket.WebSocketApp(url, on_message=on_message, on_error=on_error, on_close=on_close) ws.on_open = on_open ws.on_message=on_message ws.on_error=on_error ws.on_close=on_close ws.run_forever(ping_timeout=30)