注册 登录

Python示例

#!python3
# -*- coding:utf-8 -*-
import time
import websocket
import zlib

# 发送订阅
def on_open(ws):
    ws.send("all=lv2_600519,lv1_000001")

# 接收推送
def on_message(ws, message, type, flag):
    # 命令返回文本消息
    if type == websocket.ABNF.OPCODE_TEXT:
        print(time.strftime('%H:%M:%S', time.localtime(time.time())), "Text响应:", message)
    # 行情推送压缩二进制消息,在此解压缩
    if type == websocket.ABNF.OPCODE_BINARY:
        rb = zlib.decompress(message, -zlib.MAX_WBITS)
        print(time.strftime('%H:%M:%S', time.localtime(time.time())), "Binary响应:", rb.decode("utf-8"))

def on_error(ws, error):
    print(error)

def on_close(ws, code, msg):
    print(time.strftime('%H:%M:%S', time.localtime(time.time())), "连接已断开")

wsUrl = "ws://<服务器地址>/?token="
ws = websocket.WebSocketApp(wsUrl,
    on_open=on_open,
    on_data=on_message,
    on_error=on_error,
    on_close=on_close)
ws.run_forever()

Java示例

package com.client;
import java.net.URISyntaxException;

public class Main {
    public static void main(String[] args) throws URISyntaxException {
        String wsUrl = "ws://<服务器地址>/?token= ";
        Client fd = new Client(wsUrl);
        fd.connect();
    }
}

package com.client;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.Inflater;
import java.util.zip.DataFormatException;

public class Client extends WebSocketClient {

    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");

    public Client(String url) throws URISyntaxException {
        super(new URI(url));
    }

    @Override
    public void onOpen(ServerHandshake shake) {
        //发送订阅命令
        this.send("add=lv1_600519,lv2_600519");
    }

    /**
     * 命令返回文本消息
     */
    @Override
    public void onMessage(String paramString) {
        System.out.println(sdf.format(new Date()) + " Text响应:" + paramString);
    }

    @Override
    public void onClose(int paramInt, String paramString, boolean paramBoolean) {
        System.out.println("连接关闭");
    }

    @Override
    public void onError(Exception e) {
        System.out.println("连接异常" + e);
    }

    /**
     * 行情接收处理
     */
    @Override
    public void onMessage(ByteBuffer bytes) {
        super.onMessage(bytes);
        String s="";
        try {
            //二进制解压缩
            byte[] dec=decompress(bytes.array());
            s = new String(dec, "UTF-8");
        }catch (IOException e){
            System.err.println("Binary解析IO异常:"+e.getMessage());
            return;
        }
        catch (DataFormatException e){
            System.err.println("Binary解析格式异常:"+e.getMessage());
            return;
        }
        System.out.println(sdf.format(new Date()) + " Binary响应:" + s);
    }

    /**
     * 解压缩方法
     */
    public static byte[] decompress(byte[] compressedData) throws DataFormatException {
        Inflater inflater = new Inflater(true);
        inflater.setInput(compressedData);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(compressedData.length);
        byte[] buffer = new byte[1024];
        while (!inflater.finished()) {
            int count = inflater.inflate(buffer);
            outputStream.write(buffer, 0, count);
        }
        inflater.end();
        return outputStream.toByteArray();
    }
}

Go示例

package main
import (
    "bytes"
    "compress/flate"
    "github.com/gorilla/websocket"
    "log"
    "time"
)

func main() {
    //连接地址
    wsUrl := "ws://<服务器地址>/?token="
    conn, _, err := websocket.DefaultDialer.Dial(wsUrl, nil)
    if err != nil {
        log.Fatalln("连接错误:", err)
    }
    //接收协程
    go func() {
        receive(conn)
    }()

    //发送订阅
    cmd := "all=lv2_600519,lv1_000001"
    err = conn.WriteMessage(websocket.TextMessage, []byte(cmd))
    if err != nil {
        log.Fatalln("发送指令错误:", err)
    }
    log.Println("发送指令成功,等待接收")
    for {
        time.Sleep(time.Second)
    }
}

func receive(conn *websocket.Conn) {
    for {
        //阻塞接收
        messageType, rb, err := conn.ReadMessage()
        if err != nil {
            log.Fatalln("接收错误:", err)
            return
        }
        //文本消息
        if messageType == websocket.TextMessage {
            log.Println("Text响应:", string(rb))
        }
        //二进制消息
        if messageType == websocket.BinaryMessage {
            unZipByte := DeCompress(rb)
            log.Println("Binary推送:", string(unZipByte))
        }
    }
}

//解压缩方法
func DeCompress(b []byte) []byte {
    var buffer bytes.Buffer
    buffer.Write([]byte(b))
    reader := flate.NewReader(&buffer)
    var result bytes.Buffer
    result.ReadFrom(reader)
    reader.Close()
    return result.Bytes()
}

C/C++示例

#include <websocketpp/config/asio_no_tls_client.hpp>
#include <websocketpp/client.hpp>
#include <string>
#include <iostream>
#include <memory>
#include <assert.h>
#include <cstring>
#include "zlib.h"
#define CHUNK 16384
using websocketpp::lib::placeholders::_1;
using websocketpp::lib::placeholders::_2;
using websocketpp::lib::bind;
typedef websocketpp::client<websocketpp::config::asio_client> client;
typedef websocketpp::config::asio_client::message_type::ptr message_ptr;
int DecompressString(const char *in_str, size_t in_len, std::string &out_str);

/**
 * 接收处理
 */
void on_message(client *c, websocketpp::connection_hdl hdl, message_ptr msg) {
    //文本消息
    if (msg->get_opcode()==websocketpp::frame::opcode::text){
        std::cout <<"Text响应:"<<msg->get_payload().c_str()<< std::endl;
    }
    //二进制消息
    if (msg->get_opcode()==websocketpp::frame::opcode::binary){
        std::string tmp = "";
        std::string &out_decompress = tmp;
        DecompressString( msg->get_payload().c_str(), msg->get_payload().size(), out_decompress);
        std::cout <<"Binary响应:"<<out_decompress<< std::endl;
    }
}

/**
 * 连接处理
 */
void on_open(client *c, websocketpp::connection_hdl hdl) {
    //发送订阅指令
    c->send(hdl, "add=lv1_600519,lv2_600519", websocketpp::frame::opcode::text);
    std::cout << "连接成功" << std::endl;
}

int main(int argc, char *argv[]) {
    //服务地址。 注意:C++版本的地址 问号前需加斜杠
    std::string wsUrl = "ws://<服务器地址>/?token=<jvQuant token>";

    client c;
    //连接相关
    try {
        //debug日志开关
//        c.set_access_channels(websocketpp::log::alevel::all);
        c.clear_access_channels(websocketpp::log::alevel::all);
        c.init_asio();

        // 注册处理函数
        c.set_message_handler(bind(&on_message, &c, ::_1, ::_2));
        c.set_open_handler(bind(&on_open, &c, _1));

        websocketpp::lib::error_code ec;
        client::connection_ptr con = c.get_connection(wsUrl, ec);
        if (ec) {
            std::cout << "连接失败: " << ec.message() << std::endl;
            return 0;
        }
        c.connect(con);
        c.run();
    } catch (websocketpp::exception const &e) {
        std::cout << e.what() << std::endl;
    }
}

/**
 *解压缩方法
 */
int DecompressString(const char *in_str, size_t in_len, std::string &out_str) {
    if (!in_str)
        return Z_DATA_ERROR;
    int ret;
    unsigned have;
    z_stream strm;
    unsigned char out[CHUNK];
    strm.zalloc = Z_NULL;
    strm.zfree = Z_NULL;
    strm.opaque = Z_NULL;
    strm.avail_in = 0;
    strm.next_in = Z_NULL;
    ret = inflateInit2(&strm, -MAX_WBITS);
    if (ret != Z_OK)
        return ret;
    std::shared_ptr<z_stream> sp_strm(&strm, [](z_stream *strm) {
        (void) inflateEnd(strm);
    });
    const char *end = in_str + in_len;
    size_t pos_index = 0;
    size_t distance = 0;
    int flush = 0;
    do {
        distance = end - in_str;
        strm.avail_in = (distance >= CHUNK) ? CHUNK : distance;
        strm.next_in = (Bytef *) in_str;
        in_str += strm.avail_in;
        flush = (in_str == end) ? Z_FINISH : Z_NO_FLUSH;
        do {
            strm.avail_out = CHUNK;
            strm.next_out = out;
            ret = inflate(&strm, Z_NO_FLUSH);
            if (ret == Z_STREAM_ERROR)
                break;
            switch (ret) {
                case Z_NEED_DICT:
                    ret = Z_DATA_ERROR;
                case Z_DATA_ERROR:
                case Z_MEM_ERROR:
                    return ret;
            }
            have = CHUNK - strm.avail_out;
            out_str.append((const char *) out, have);
        } while (strm.avail_out == 0);
    } while (flush != Z_FINISH);
    return ret == Z_STREAM_END ? Z_OK : Z_DATA_ERROR;
}

Php示例

<?php

//需安装swoole扩展
use Swoole\Coroutine\Http\Client;
use function Swoole\Coroutine\run;

run(function () {
    //服务器地址
    $host = '<服务器地址>';
    //服务器端口
    $port = <服务器端口>;

    //连接
    $conn = new Client($host, $port);
    $conn->upgrade("/?token=<jvQuant token>");

    //发送订阅
    $conn->push("add=lv2_600519,lv1_000001");

    //开启接收协程
    go("receive", $conn);
});

function receive($client)
{
    while (true) {
        $data = $client->recv();
        $time = date("H:i:s");
        //解压缩
        @$zipStr = gzinflate($data->data);
        if ($zipStr) {
            echo "{$time} Binary推送:{$zipStr}\n";
        } else {
            echo "{$time} Text响应:{$data->data}\n";
        }
    }
}