WebSocketServiceImpl.java 4.04 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
package com.zehong.system.service.impl;

import com.zehong.system.domain.WebSocketBean;
import com.zehong.system.service.WebSocketServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.web.servlet.server.Session;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * <基于javax websocket通讯>
 * <各个方法的参数都是可以根据项目的实际情况改的>
 * @author wzh
 * @version 2018-07-08 17:11
 * @see [相关类/方法] (可选)
 **/
@ServerEndpoint(value = "/websocketServer")
@Component("webSocketService")
public class WebSocketServiceImpl implements WebSocketServer {

    private Logger log = LoggerFactory.getLogger(WebSocketServiceImpl.class);

    /**
     * 错误最大重试次数
     */
    private static final int MAX_ERROR_NUM = 10;

    /**
     * 用来存放每个客户端对应的webSocket对象。
     */
    private static Map<String,WebSocketBean> webSocketInfo;

    static
    {
        // concurrent包的线程安全map
        webSocketInfo = new ConcurrentHashMap<>();
    }

    @OnOpen
    @Override
    public void onOpen(javax.websocket.Session session, EndpointConfig config) {

        // 如果是session没有激活的情况,就是没有请求获取或session,这里可能会取出空,需要实际业务处理
       /* HttpSession httpSession= (HttpSession) config.getUserProperties().get(HttpSession.class.getName());
        if(httpSession != null)
        {
            log.info("获取到httpsession" + httpSession.getId());
        }else {
            log.error("未获取到httpsession");
        }*/

        // 连接成功当前对象放入websocket对象集合
        WebSocketBean bean = new WebSocketBean();
        bean.setSession(session);
        webSocketInfo.put(session.getId(),bean);

        log.info("客户端连接服务器session id :"+session.getId()+",当前连接数:" + webSocketInfo.size());
    }

    @OnClose
    @Override
    public void onClose(javax.websocket.Session session) {

        // 客户端断开连接移除websocket对象
        webSocketInfo.remove(session.getId());
        log.info("客户端断开连接,当前连接数:" + webSocketInfo.size());

    }

    @OnMessage
    @Override
    public void onMessage(javax.websocket.Session session, String message) {

        log.info("客户端 session id: "+session.getId()+",消息:" + message);

        // 此方法为客户端给服务器发送消息后进行的处理,可以根据业务自己处理,这里返回页面
        sendMessage(session, "服务端返回" + message);

    }

    @OnError
    @Override
    public void onError(javax.websocket.Session session, Throwable throwable) {

        log.error("发生错误"+ throwable.getMessage(),throwable);
    }

    @Override
    public void sendMessage(javax.websocket.Session session, String message) {

        try
        {
            // 发送消息
            session.getBasicRemote().sendText(message);

            // 清空错误计数
            webSocketInfo.get(session.getId()).cleanErrorNum();
        }
        catch (Exception e)
        {
            log.error("发送消息失败"+ e.getMessage(),e);
            int errorNum = webSocketInfo.get(session.getId()).getErroerLinkCount();

            // 小于最大重试次数重发
            if(errorNum <= MAX_ERROR_NUM)
            {
                sendMessage(session, message);
            }
            else{
                log.error("发送消息失败超过最大次数");
                // 清空错误计数
                webSocketInfo.get(session.getId()).cleanErrorNum();
            }
        }
    }

    @Override
    public void batchSendMessage(String message) {
        Set<Map.Entry<String, WebSocketBean>> set = webSocketInfo.entrySet();
        for (Map.Entry<String, WebSocketBean> map : set)
        {
            sendMessage(map.getValue().getSession(),message);
        }
    }
}