使用webSocket做指定用户推送以及群发推送消息
pom.xml 依赖
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
<version>1.1</version>
<scope>provided</scope>
</dependency>
前端js实现
$(function () {
if ($.websocket) {
var ishttps = 'https:' == document.location.protocol ? true : false;
var host = location.host;
if (ishttps) {
host = "wss://" + host + "/test-websocket-web/websocket/";
} else {
host = "ws://" + host + "/test-websocket-web/websocket/";
}
try { userObj = JSON.parse(sessionStorage.getItem('UserConfig')); } catch (e) { }
if (null == userObj){
return;
}
$.websocket.open({
host: host + userObj.username,
reconnect: true,
callback: function (result) {
winShowNotice({ title: "您有新消息!", body: result })
}
});
}
});
(function ($) {
$.extend({
websocket: {
_this: null,
_initialized: false,
init: function (options) {
if (!this.isSupported()) {
// console.error('Not support websocket');
return;
}
var op = $.extend({
callback: function () {
},
host: null,
reconnect: false
}, options);
if (!op.host) {
// console.error("初始化WebSocket失败,无效的请求地址");
return;
}
try {
this._this = new WebSocket(op.host);
} catch (error) {
return;
}
this._initialized = true;
//连接发生错误的回调方法
this._this.onerror = function () {
// console.log("与服务器连接失败...");
};
//连接成功建立的回调方法
this._this.onopen = function (event) {
// console.log("与服务器连接成功...");
};
//接收到消息的回调方法
this._this.onmessage = function (event) {
// dwz.notification.show({notification: event.data});
op.callback(event.data);
// console.log("接收到服务器端推送的消息:" + event.data);
};
//连接关闭的回调方法
this._this.onclose = function () {
$.websocket._initialized = false;
// console.log("已关闭当前链接");
if (op.reconnect) {
// 自动重连
setTimeout(function () {
$.websocket.open(op);
}, 5000);
}
}
},
open: function (options) {
var op = $.extend({
callback: function () {
},
host: null,
reconnect: false
}, options);
if (this._initialized) {
this.close();
}
this.init(options);
//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function () {
// console.log("窗口关闭了");
$.websocket.close();
}
},
isSupported: function () {
return 'WebSocket' in window;
},
send: function (message) {
if (!this._this) {
return;
}
this._this.send(message);
},
close: function () {
if (!this._this) {
return;
}
this._this.close();
}
}
});
})(jQuery);
后端服务实现
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
@ServerEndpoint("/websocket/{userCode}")
public class WebSocketServer {
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onlineCount = 0;
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
//user
private String currentUser;
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(@PathParam("userCode") String userCode, Session session) {
int cnt = 0;
for (WebSocketServer socketServer : webSocketSet){
if (socketServer.currentUser.equals(userCode)) {
cnt++;
}
}
if (cnt == 0) {
this.currentUser = userCode;
this.session = session;
webSocketSet.add(this); //加入set中
addOnlineCount(); //在线数加1
System.out.println("有新连接加入!当前在线人数为" + getOnlineCount() + ",userCode:" + userCode);
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this); //从set中删除
subOnlineCount(); //在线数减1
System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
System.out.println("来自客户端的消息:" + message);
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
System.out.println("发生错误");
}
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 发送给指定用户
*
* @param message
* @param userCode
* @throws IOException
*/
public static void sendMessageTo(String message, String userCode) throws IOException {
for (WebSocketServer item : webSocketSet) {
if (item.currentUser.equals(userCode)) {
item.session.getBasicRemote().sendText(message);
}
}
}
/**
* 群发自定义消息
*/
public static void sendInfo(String message) throws IOException {
for (WebSocketServer item : webSocketSet) {
try {
item.sendMessage(message);
} catch (IOException e) {
continue;
}
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
调用方式
sendToUser
SysAdmin user = SessionUtil.getCurrentUser(request);
try {
WebSocketServer.sendMessageTo(msg, user.getName());
} catch (IOException e) {
LogUtil.error(e.getStackTrace());
}
sendAll
try {
WebSocketServer.sendInfo(msg);
} catch (IOException e) {
LogUtil.error(e.getStackTrace());
}
遇到的问题
1.Error during WebSocket handshake: Unexpected response code: 404
解决方案:
这个问题是因为项目是有nginx做代理;
需要在HTTPS 域名位置中加上如下配置:
location /{
。。。
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
原因:
我们都知道HTTP协议是无状态的,而WebSocket协议是建立一个TCP长链接。并且WebSocket协议的握手和HTTP是兼容的。它使用HTTP的Upgrade协议头将Http链接升级到WebSocket链接,这个特性使WebSocket应用程序可以很容易的应用到现有的基础设施。例如,WebSocket应用可以使用标准的80和443 HTTP端口,因此可以通过现有的防火墙设施。下面两图是HTTP和WebSocket的通信协议对比:
WebSocket:
HTTP:
WebSocket应用程序会在客户端和服务器之间建立一个长链接,使得开发实施应用很容易。HTTP的Upgrade协议头机制用于将连接从HTTP链接升级到WebSocket链接,Upgrade机制使用了Upgrade协议头和Connection协议头,反向代理服务器在支持WebSocket协议方面面临一些挑战。挑战之一是WebSocket是一个逐段转发(hop-by-hop)协议,因此当代理服务器拦截到来自客户端的Upgrade请求时,代理服务器需要将自己的Upgrade请求发送给后端服务器,包括适合的请求头。而且,由于WebSocket链接是长链接,与传统的HTTP端链接截然不同,故反向代理服务器还需要允许这些链接处于打开(Open)状态,而不能因为其空闲就关闭了链接 ;
Nginx通过在客户端和后端服务器之间建立隧道来支持WebSocket通信。为了让Nginx可以将来自客户端的Upgrade请求发送到后端服务器,Upgrade和Connection的头信息必须被显示的设置。所以才需要上述在nginx中的配置;
2. 不要去掉本文开始处pom依赖中的<scope>provided</scope>
因为在开发时,我们一般是在maven环境中开发,此时需要引入javax.websocket-api-1.1.jar包支持websocket。但是在集成环境测试和线上环境是不需要这个单独引入这个依赖的;这是因为tomcat有自带的websocket包。如果不给maven的依赖加
<scope>provided</scope>
,则会和tomcat的websocket包冲突;
- maven中的scope的值有几种可能:
compile:
默认就是compile,什么都不配置也就是意味着compile。compile表示被依赖项目需要参与当前项目的编译,当然后续的测试,运行周期也参与其中,是一个比较强的依赖。打包的时候通常需要包含进去。默认的scope,在部署的时候将会打包到lib目录下,项目在编译,测试,运行阶段都需要
test
scope为test表示依赖项目仅仅参与测试相关的工作,在编译和运行环境下都不会被使用,更别说打包了。
runntime
仅仅适用于运行环境,在编译和测试环境下都不会被使用
provided
provided适合在编译和测试的环境,他和compile很接近,但是provide仅仅需要在编译和测试阶段,同样provide将不会被打包到lib目录下。
system
从参与度来说,也provided相同,不过被依赖项不会从maven仓库抓,而是从本地文件系统拿,一定需要配合systemPath属性使用。
3.项目启动失败:Error creating bean with name..........
org.springframework.web.context.ContextLoader:350 - Context initialization failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'serverEndpointExporter' defined in com.ycgame.sm.common.websocket.WebSocketConfig: Initialization of bean failed; nested exception is java.lang.ClassCastException: org.apache.tomcat.websocket.server.WsServerContainer cannot be cast to javax.websocket.server.ServerContainer
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:564)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483)
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:761)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543)
at org.springframework.web.context.ContextLoader.configureAndRefreshWebApplicationContext(ContextLoader.java:443)
at org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:325)
at org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:107)
at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4973)
at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5467)
at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150)
at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:901)
at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:877)
at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:632)
at org.apache.catalina.startup.HostConfig.deployWAR(HostConfig.java:1073)
at org.apache.catalina.startup.HostConfig$DeployWar.run(HostConfig.java:1857)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: org.apache.tomcat.websocket.server.WsServerContainer cannot be cast to javax.websocket.server.ServerContainer
at org.springframework.web.socket.server.standard.ServerEndpointExporter.initServletContext(ServerEndpointExporter.java:91)
at org.springframework.web.context.support.WebApplicationObjectSupport.initApplicationContext(WebApplicationObjectSupport.java:80)
at org.springframework.context.support.ApplicationObjectSupport.setApplicationContext(ApplicationObjectSupport.java:74)
at org.springframework.context.support.ApplicationContextAwareProcessor.invokeAwareInterfaces(ApplicationContextAwareProcessor.java:121)
at org.springframework.context.support.ApplicationContextAwareProcessor.postProcessBeforeInitialization(ApplicationContextAwareProcessor.java:97)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:409)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1620)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
... 24 more
解决方案:
如果出现这个问题,只要把前面创建的WebSocketConfig.java文件删掉就可以了
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
这个配置类是为了自动注册使用@ServerEndpoint注册声明的Websocket endpoint,开发阶段该配置会自动依赖springboot内嵌的tomcat容器;
但是打成war包的项目内嵌tomcat是不会在打包之后运行的,也就是说打包之后websocket没有依赖到tomcat容器。所以只需要把上面的配置类删掉;