原创

使用webSocket做指定用户推送以及群发推送消息

pom.xml 依赖

<dependency>
    <groupId>javax.websocket</groupId>
    <artifactId>javax.websocket-api</artifactId>
    <version>1.1</version>
    <scope>provided</scope>
</dependency>

前端js实现

$(function () {
    if ($.websocket) {
        var ishttps = 'https:' == document.location.protocol ? true : false;
        var host = location.host;
        if (ishttps) {
            host = "wss://" + host + "/test-websocket-web/websocket/";
        } else {
            host = "ws://" + host  + "/test-websocket-web/websocket/";
        }

        try { userObj = JSON.parse(sessionStorage.getItem('UserConfig')); } catch (e) { }
        if (null == userObj){
            return;
        }
        $.websocket.open({
            host: host + userObj.username,
            reconnect: true,
            callback: function (result) {
                winShowNotice({ title: "您有新消息!", body: result })
            }
        });
    }
});

(function ($) {
    $.extend({
        websocket: {
            _this: null,
            _initialized: false,
            init: function (options) {
                if (!this.isSupported()) {
                    // console.error('Not support websocket');
                    return;
                }
                var op = $.extend({
                    callback: function () {
                    },
                    host: null,
                    reconnect: false
                }, options);
                if (!op.host) {
                    // console.error("初始化WebSocket失败,无效的请求地址");
                    return;
                }
                try {
                    this._this = new WebSocket(op.host);
                } catch (error) {
                    return;
                }
                this._initialized = true;
                //连接发生错误的回调方法
                this._this.onerror = function () {
                    // console.log("与服务器连接失败...");
                };

                //连接成功建立的回调方法
                this._this.onopen = function (event) {
                    // console.log("与服务器连接成功...");
                };

                //接收到消息的回调方法
                this._this.onmessage = function (event) {
                    // dwz.notification.show({notification: event.data});
                    op.callback(event.data);
                    // console.log("接收到服务器端推送的消息:" + event.data);
                };

                //连接关闭的回调方法
                this._this.onclose = function () {
                    $.websocket._initialized = false;
                    // console.log("已关闭当前链接");
                    if (op.reconnect) {
                        // 自动重连
                        setTimeout(function () {
                            $.websocket.open(op);
                        }, 5000);
                    }
                }
            },
            open: function (options) {
                var op = $.extend({
                    callback: function () {
                    },
                    host: null,
                    reconnect: false
                }, options);

                if (this._initialized) {
                    this.close();
                }
                this.init(options);
                //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
                window.onbeforeunload = function () {
                    // console.log("窗口关闭了");
                    $.websocket.close();
                }
            },
            isSupported: function () {
                return 'WebSocket' in window;
            },
            send: function (message) {
                if (!this._this) {
                    return;
                }
                this._this.send(message);
            },
            close: function () {
                if (!this._this) {
                    return;
                }
                this._this.close();
            }
        }
    });
})(jQuery);

后端服务实现

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;


@ServerEndpoint("/websocket/{userCode}")
public class WebSocketServer {
    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    private static int onlineCount = 0;
    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
    private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
    //user
    private String currentUser;
    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(@PathParam("userCode") String userCode, Session session) {
        int cnt = 0;
        for (WebSocketServer socketServer : webSocketSet){
            if (socketServer.currentUser.equals(userCode)) {
                cnt++;
            }
        }

        if (cnt == 0) {
            this.currentUser = userCode;
            this.session = session;

            webSocketSet.add(this);     //加入set中
            addOnlineCount();           //在线数加1
            System.out.println("有新连接加入!当前在线人数为" + getOnlineCount() + ",userCode:" + userCode);
        }
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);  //从set中删除
        subOnlineCount();           //在线数减1
        System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("来自客户端的消息:" + message);
    }

    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        System.out.println("发生错误");
    }

    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    /**
     * 发送给指定用户
     *
     * @param message
     * @param userCode
     * @throws IOException
     */
    public static void sendMessageTo(String message, String userCode) throws IOException {
        for (WebSocketServer item : webSocketSet) {
            if (item.currentUser.equals(userCode)) {
                item.session.getBasicRemote().sendText(message);
            }
        }
    }

    /**
     * 群发自定义消息
     */
    public static void sendInfo(String message) throws IOException {
        for (WebSocketServer item : webSocketSet) {
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                continue;
            }
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}

调用方式

sendToUser

SysAdmin user = SessionUtil.getCurrentUser(request);
try {
     WebSocketServer.sendMessageTo(msg, user.getName());
    } catch (IOException e) {
     LogUtil.error(e.getStackTrace());
}

sendAll

try {
     WebSocketServer.sendInfo(msg);
    } catch (IOException e) {
     LogUtil.error(e.getStackTrace());
}

遇到的问题

1.Error during WebSocket handshake: Unexpected response code: 404

解决方案:

这个问题是因为项目是有nginx做代理;
需要在HTTPS 域名位置中加上如下配置:

location /{
    。。。
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
}

原因:

我们都知道HTTP协议是无状态的,而WebSocket协议是建立一个TCP长链接。并且WebSocket协议的握手和HTTP是兼容的。它使用HTTP的Upgrade协议头将Http链接升级到WebSocket链接,这个特性使WebSocket应用程序可以很容易的应用到现有的基础设施。例如,WebSocket应用可以使用标准的80和443 HTTP端口,因此可以通过现有的防火墙设施。下面两图是HTTP和WebSocket的通信协议对比:

WebSocket:
file

HTTP:
file

WebSocket应用程序会在客户端和服务器之间建立一个长链接,使得开发实施应用很容易。HTTP的Upgrade协议头机制用于将连接从HTTP链接升级到WebSocket链接,Upgrade机制使用了Upgrade协议头和Connection协议头,反向代理服务器在支持WebSocket协议方面面临一些挑战。挑战之一是WebSocket是一个逐段转发(hop-by-hop)协议,因此当代理服务器拦截到来自客户端的Upgrade请求时,代理服务器需要将自己的Upgrade请求发送给后端服务器,包括适合的请求头。而且,由于WebSocket链接是长链接,与传统的HTTP端链接截然不同,故反向代理服务器还需要允许这些链接处于打开(Open)状态,而不能因为其空闲就关闭了链接 ;

Nginx通过在客户端和后端服务器之间建立隧道来支持WebSocket通信。为了让Nginx可以将来自客户端的Upgrade请求发送到后端服务器,Upgrade和Connection的头信息必须被显示的设置。所以才需要上述在nginx中的配置;

2. 不要去掉本文开始处pom依赖中的<scope>provided</scope>

因为在开发时,我们一般是在maven环境中开发,此时需要引入javax.websocket-api-1.1.jar包支持websocket。但是在集成环境测试和线上环境是不需要这个单独引入这个依赖的;这是因为tomcat有自带的websocket包。如果不给maven的依赖加<scope>provided</scope>,则会和tomcat的websocket包冲突;

  • maven中的scope的值有几种可能:
  • compile:

    默认就是compile,什么都不配置也就是意味着compile。compile表示被依赖项目需要参与当前项目的编译,当然后续的测试,运行周期也参与其中,是一个比较强的依赖。打包的时候通常需要包含进去。默认的scope,在部署的时候将会打包到lib目录下,项目在编译,测试,运行阶段都需要

  • test

    scope为test表示依赖项目仅仅参与测试相关的工作,在编译和运行环境下都不会被使用,更别说打包了。

  • runntime

    仅仅适用于运行环境,在编译和测试环境下都不会被使用

  • provided

    provided适合在编译和测试的环境,他和compile很接近,但是provide仅仅需要在编译和测试阶段,同样provide将不会被打包到lib目录下。

  • system

    从参与度来说,也provided相同,不过被依赖项不会从maven仓库抓,而是从本地文件系统拿,一定需要配合systemPath属性使用。

3.项目启动失败:Error creating bean with name..........

org.springframework.web.context.ContextLoader:350 - Context initialization failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'serverEndpointExporter' defined in com.ycgame.sm.common.websocket.WebSocketConfig: Initialization of bean failed; nested exception is java.lang.ClassCastException: org.apache.tomcat.websocket.server.WsServerContainer cannot be cast to javax.websocket.server.ServerContainer
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:564)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483)
        at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306)
        at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302)
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:761)
        at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543)
        at org.springframework.web.context.ContextLoader.configureAndRefreshWebApplicationContext(ContextLoader.java:443)
        at org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:325)
        at org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:107)
        at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4973)
        at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5467)
        at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150)
        at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:901)
        at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:877)
        at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:632)
        at org.apache.catalina.startup.HostConfig.deployWAR(HostConfig.java:1073)
        at org.apache.catalina.startup.HostConfig$DeployWar.run(HostConfig.java:1857)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: org.apache.tomcat.websocket.server.WsServerContainer cannot be cast to javax.websocket.server.ServerContainer
        at org.springframework.web.socket.server.standard.ServerEndpointExporter.initServletContext(ServerEndpointExporter.java:91)
        at org.springframework.web.context.support.WebApplicationObjectSupport.initApplicationContext(WebApplicationObjectSupport.java:80)
        at org.springframework.context.support.ApplicationObjectSupport.setApplicationContext(ApplicationObjectSupport.java:74)
        at org.springframework.context.support.ApplicationContextAwareProcessor.invokeAwareInterfaces(ApplicationContextAwareProcessor.java:121)
        at org.springframework.context.support.ApplicationContextAwareProcessor.postProcessBeforeInitialization(ApplicationContextAwareProcessor.java:97)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:409)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1620)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
        ... 24 more

解决方案:

如果出现这个问题,只要把前面创建的WebSocketConfig.java文件删掉就可以了

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

这个配置类是为了自动注册使用@ServerEndpoint注册声明的Websocket endpoint,开发阶段该配置会自动依赖springboot内嵌的tomcat容器;

但是打成war包的项目内嵌tomcat是不会在打包之后运行的,也就是说打包之后websocket没有依赖到tomcat容器。所以只需要把上面的配置类删掉;

正文到此结束
本文目录