Commit 6664a2ce authored by 耿迪迪's avatar 耿迪迪

websocket接口 gengdidi

parent ea47958b
package com.zehong.web.controller.webSocket;
import com.zehong.system.service.WebSocketServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/websocket")
public class WebSocketController {
@Autowired
private WebSocketServer webSocketServer;
@GetMapping("/send")
public void send(){
webSocketServer.batchSendMessage("dafdfafdas=======");
}
}
......@@ -119,6 +119,12 @@
<artifactId>javax.servlet-api</artifactId>
</dependency>
<!-- websocket 支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.zehong.common.config;
import javax.servlet.http.HttpSession;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;
import javax.websocket.server.ServerEndpointConfig.Configurator;
/**
* <websocket获取HttpSession>
* <功能详细描述>
* @author wzh
* @version 2018-07-10 01:02
* @see [相关类/方法] (可选)
**/
public class GetHttpSessionConfigurator extends Configurator{
@Override
public void modifyHandshake(ServerEndpointConfig sec,HandshakeRequest request, HandshakeResponse response) {
HttpSession httpSession=(HttpSession) request.getHttpSession();
sec.getUserProperties().put(HttpSession.class.getName(),httpSession);
}
}
package com.zehong.common.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
......@@ -97,7 +97,7 @@ public class SecurityConfig extends WebSecurityConfigurerAdapter
// 过滤请求
.authorizeRequests()
// 对于登录login 验证码captchaImage 允许匿名访问
.antMatchers("/login", "/captchaImage").anonymous()
.antMatchers("/login", "/captchaImage","/websocket/**","/websocketServer").anonymous()
.antMatchers(
HttpMethod.GET,
"/*.html",
......
package com.zehong.system.domain;
import javax.websocket.Session;
import java.util.concurrent.atomic.AtomicInteger;
/**
* <websocket信息对象>
* <用于存储secket连接信息>
* @author wzh
* @version 2018-07-08 18:49
* @see [相关类/方法] (可选)
**/
public class WebSocketBean {
/**
* 连接session对象
*/
private Session session;
/**
* 连接错误次数
*/
private AtomicInteger erroerLinkCount = new AtomicInteger(0);
public int getErroerLinkCount() {
// 线程安全,以原子方式将当前值加1,注意:这里返回的是自增前的值
return erroerLinkCount.getAndIncrement();
}
public void cleanErrorNum()
{
// 清空计数
erroerLinkCount = new AtomicInteger(0);
}
public Session getSession() {
return session;
}
public void setSession(Session session) {
this.session = session;
}
}
\ No newline at end of file
package com.zehong.system.service;
import javax.websocket.EndpointConfig;
import javax.websocket.Session;
/**
* <基于javax websocket通讯>
* <功能详细描述>
* @author wzh
* @version 2018-07-08 17:11
* @see [相关类/方法] (可选)
**/
public interface WebSocketServer {
/**
* 连接建立成功调用的方法
* @param session session 对象
*/
public void onOpen(Session session,EndpointConfig config);
/**
* 断开连接方法
*/
public void onClose(Session session);
/**
* 收到客户端消息后调用的方法
* @param session session 对象
* @param message 返回客户端的消息
*/
public void onMessage(Session session, String message);
/**
* 发生异常时触发的方法
* @param session session 对象
* @param throwable 抛出的异常
*/
public void onError(Session session,Throwable throwable);
/**
* 向单个客户端发送消息
* @param session session 对象
* @param message 发送给客户端的消息
*/
public void sendMessage(Session session, String message);
/**
* 向所有在线用户群发消息
* @param message 发送给客户端的消息
*/
public void batchSendMessage(String message);
}
\ No newline at end of file
package com.zehong.system.service.impl;
import com.zehong.common.config.GetHttpSessionConfigurator;
import com.zehong.system.domain.WebSocketBean;
import com.zehong.system.service.WebSocketServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.servlet.http.HttpSession;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* <基于javax websocket通讯>
* <各个方法的参数都是可以根据项目的实际情况改的>
* @author wzh
* @version 2018-07-08 17:11
* @see [相关类/方法] (可选)
**/
@ServerEndpoint(value = "/websocketServer")
@Component("webSocketService")
public class WebSocketServiceImpl implements WebSocketServer {
private Logger log = LoggerFactory.getLogger(WebSocketServiceImpl.class);
/**
* 错误最大重试次数
*/
private static final int MAX_ERROR_NUM = 10;
/**
* 用来存放每个客户端对应的webSocket对象。
*/
private static Map<String,WebSocketBean> webSocketInfo;
static
{
// concurrent包的线程安全map
webSocketInfo = new ConcurrentHashMap<>();
}
@OnOpen
@Override
public void onOpen(Session session,EndpointConfig config) {
// 如果是session没有激活的情况,就是没有请求获取或session,这里可能会取出空,需要实际业务处理
/* HttpSession httpSession= (HttpSession) config.getUserProperties().get(HttpSession.class.getName());
if(httpSession != null)
{
log.info("获取到httpsession" + httpSession.getId());
}else {
log.error("未获取到httpsession");
}*/
// 连接成功当前对象放入websocket对象集合
WebSocketBean bean = new WebSocketBean();
bean.setSession(session);
webSocketInfo.put(session.getId(),bean);
log.info("客户端连接服务器session id :"+session.getId()+",当前连接数:" + webSocketInfo.size());
}
@OnClose
@Override
public void onClose(Session session) {
// 客户端断开连接移除websocket对象
webSocketInfo.remove(session.getId());
log.info("客户端断开连接,当前连接数:" + webSocketInfo.size());
}
@OnMessage
@Override
public void onMessage(Session session, String message) {
log.info("客户端 session id: "+session.getId()+",消息:" + message);
// 此方法为客户端给服务器发送消息后进行的处理,可以根据业务自己处理,这里返回页面
sendMessage(session, "服务端返回" + message);
}
@OnError
@Override
public void onError(Session session, Throwable throwable) {
log.error("发生错误"+ throwable.getMessage(),throwable);
}
@Override
public void sendMessage(Session session, String message) {
try
{
// 发送消息
session.getBasicRemote().sendText(message);
// 清空错误计数
webSocketInfo.get(session.getId()).cleanErrorNum();
}
catch (Exception e)
{
log.error("发送消息失败"+ e.getMessage(),e);
int errorNum = webSocketInfo.get(session.getId()).getErroerLinkCount();
// 小于最大重试次数重发
if(errorNum <= MAX_ERROR_NUM)
{
sendMessage(session, message);
}
else{
log.error("发送消息失败超过最大次数");
// 清空错误计数
webSocketInfo.get(session.getId()).cleanErrorNum();
}
}
}
@Override
public void batchSendMessage(String message) {
Set<Map.Entry<String, WebSocketBean>> set = webSocketInfo.entrySet();
for (Map.Entry<String, WebSocketBean> map : set)
{
sendMessage(map.getValue().getSession(),message);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment