Skip to content

WebSocket 实现简易聊天室

消息推送的常见方式

1.轮询 浏览器以指定的时间间隔向服务器发出HTTP请求,服务器实时遮回数据给浏览器
	
2.长轮询 浏览器发出ajax请求,服务器端接收到请求后,会阻塞请求直到有数据或者超时才返回
	
3.SSE(server-sent event)服务器发送事件
	SSE在服务器和客户端之间打开一个单向通道
	服务端响应的不再是一次性的数据包,而是text/event-stream类型的数据流信息
	服务器有数据变更时将数据流式传输到客户端
	
4.WebSocket Websocket是一种在基于TCP连接上进行全双工通信的协议

TIP

全双工: 全双工(Full Duplex):允许数据在两个方向上同时传输

半双工: 半双工(Half Duplex):允许数据在两个方向上传输,但是同一个时间段内只允许一个方向上传输。

客户端

客户端方法

代码概览

服务端

Tomcat的7.0.5 版本开始支持WebSocket,并且实现了Java WebSocket规范。

Java Websocket应用由一系列的Endpoint组成。Endpoint 是一个java对象,代表Websocket链接的一端,对于服务端,我们可以视为处理具体WebSocket消息的接口。

如何定义Endpoint?

我们可以通过两种方式定义Endpoint:
第一种是编程式, 即继承类javax.websocket.Endpoint并实现其方法
第二种是注解式, 即定义一个POJO, 并添加@ServerEndpoint相关注解

生命周期方法

Endpoint实例在Websocket握手时创建,并在客户端与服务端链接过程中有效,最后在链接关闭时结束。
在Endpoint接口中明确定义了与其生命周期相关的方法, 规范实现者确保生命周期的各个阶段调用实例的相关方法。
生命周期方法如下:

DANGER

⬆ 没有onError,应为onMessage @OnMessage

服务端如何接收客户端发送的数据?

1.编程式	通过添加 MessageHandler 消息处理器来接收消息
2.注解式	在定义Endpoint时,通过@OnMessage注解指定接收消息的方法

服务端如何推送数据给客户端?

发送消息则由 RemoteEndpoint 完成,其实例由Session 维护发送消息有2种方式发送消息
方式1:通过session.getBasicRemote 获取同步消息发送的实例,然后调用其 sendXxx()方法发送消息
方式2:通过session.getAsyncRemote 获取异步消息发送实例,然后调用其 sendXxx()方法发送消息

代码概览

SpringBoot整合Websocket

引入依赖

pom.xml
xml
<!--websocket-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

配置类

编写配置类,扫描添加有@ServerEndpoint注解的Bean
该配置类会自动扫描带有@ServerEndpoint注解的
WebSocketConfig
java
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
编写配置类,用于获取HttpSession对象
GetHttpSessionConfig
java
public class GetHttpSessionConfig extends ServerEndpointConfig.Configurator {

    @Override
    public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
        // 获取HttpSession对象
        HttpSession httpSession = (HttpSession) request.getHttpSession();
        // 将httpSession对象保存起来
        sec.getUserProperties().put(HttpSession.class.getName(), httpSession);
    }
}

定义Bean,在@serverEndpoint中引入配置器

java
@ServerEndpoint(value = "/chat", configurator = GetHttpSessionConfig.class)
@Component
public class ChatEndPoint {

    private static final Map<String, Session> onlineUsers = new ConcurrentHashMap<>();

    private HttpSession httpSession;

    /**
     * 建立websocket连接后,被调用
     *
     * @param session
     */
    @OnOpen
    public void onOpen(Session session, EndpointConfig config) {
        // 1.将session进行保存
        this.httpSession = (HttpSession) config.getUserProperties().get(HttpSession.class.getName());
        String userName = (String) this.httpSession.getAttribute("userName");
        onlineUsers.put(userName, session);
        // 2. 广播消息,需要将登录的所有用户推送给所有用户
        String message = MessageUtil.getMessage(true, null, getAllUserName());
        broadcastAllUsers(message);
    }

    /**
     * 浏览器发送消息到服务端,该方法被调用
     *
     * @param message
     */
    @OnMessage
    public void onMessage(String message) {
        // 将消息推送给指定用户
        Message msgObj = JSON.parseObject(message, Message.class);
        // 获取消息接收方
        String toName = msgObj.getToName();
        String msg = msgObj.getMessage();
        // 接收方 session对象
        Session session = onlineUsers.get(toName);
        String sendMsg = MessageUtil.getMessage(false, (String) this.httpSession.getAttribute("userName"), msg);
        try {
            session.getBasicRemote().sendText(sendMsg);
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    /**
     * 断开websocket连接时被调用
     *
     * @param session
     */
    @OnClose
    public void onClose(Session session) {
        // 1.从onlineUsers中剔除当前用户的session对象
        onlineUsers.remove((String) this.httpSession.getAttribute("userName"));
        // 2.通知其他所有用户,当前用户下线了
        String message = MessageUtil.getMessage(true, null, getAllUserName());
        broadcastAllUsers(message);
    }

    private Set getAllUserName() {
        Set<String> strings = onlineUsers.keySet();
        return strings;
    }

    private void broadcastAllUsers(String message) {
        onlineUsers.forEach((k, v) -> {
            // 获取所有用户的session对象
            Session session = v;
            // 发送消息
            try {
                session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        });
    }


}

注意

在onOpen中的EndpointConfig 与 GetHttpSessionConfig类中modifyHandshake方法的ServerEndpointConfig这俩其实是一个对象

保存HttpSession对象时

用到的代码片段

java
public class MessageUtil {

    public static String getMessage(boolean isSystemMessage, String fromName, Object message) {
        ResultMessage resultMessage = new ResultMessage();
        resultMessage.setMessage(message);
        resultMessage.setSystem(isSystemMessage);
        if (!ObjectUtils.isEmpty(fromName)) {
            resultMessage.setFromName(fromName);
        }
        return JSON.toJSONString(resultMessage);
    }
}
java
@AllArgsConstructor
@NoArgsConstructor
@Data
public class Message {
    private String toName;
    private String message;
}
java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ResultMessage {
    private boolean system;
    private Object message;
    private String fromName;
}

消息格式


SSE(Server-Sent Events)

什么是 SSE

服务器发送事件(Server-Sent Events,简称SSE)
SSE,就是浏览器向服务器发送一个HTTP请求,然后服务器不断单向地向浏览器推送“信息”(message)。
这种信息在格式上很简单,就是“信息”加上前缀“data:”,然后以“\n\n”结尾。

SSE 应用场景

服务器向浏览器“发送”数据,比如,每当收到新的电子邮件,服务器就向浏览器发送一个“通知”

alt text

SSE 与 Websocket

SSE 与 WebSocket 有相似功能,都是用来建立浏览器与服务器之间的通信渠道。两者的区别在于:

  • WebSocket 是全双工通道,可以双向通信,功能更强;SSE 是单向通道,只能服务器向浏览器端发送。
  • SSE 是一个轻量级协议,相对简单; WebSocket 是一种较重的协议,相对复杂。

实现方式 1

前端

后端

MessageNoticeController
java
@RestController
@RequestMapping("/messageObtain")
@Slf4j
public class MessageNoticeController {

    @PostMapping("/getStreamData")
    public String getStreamData(HttpServletResponse response) {
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("utf-8");
        String str = "";
        while (true) {
            str = "data:" + new Date() + "\n\n";
            PrintWriter writer = null;
            try {
                Thread.sleep(1000);
                writer = response.getWriter();
            } catch (IOException | InterruptedException e) {
                throw new RuntimeException(e);
            }
            writer.write(str);
//            log.info(str);
            writer.flush();
        }
    }
}

TIP

响应是主要的

response.setContentType("text/event-stream");

实现方式 2

前端

vue
<template>
  <div>
    <h1>SSE Messages</h1>
    <ul>
      <li v-for="message in messages" :key="message.id">{{ message.data }}</li>
    </ul>
  </div>
</template>

<script setup lang="ts">
import { onMounted, ref } from 'vue';

interface SseMessage {
  id: string;
  data: string;
}

const messages = ref<SseMessage[]>([]);

onMounted(() => {
  const eventSource = new EventSource('http://localhost:8080/api/sse/events');

  eventSource.onmessage = (event) => {
    const parsedData: SseMessage = JSON.parse(event.data);
    messages.value = [...messages.value, parsedData];
  };

  eventSource.onerror = () => {
    console.error('EventSource failed.');
    eventSource.close();
  };
});
</script>

<style scoped>
/* Add some styles here */
</style>

后端

java
package com.example.demo.controller;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@RestController
@RequestMapping("/api/sse")
public class SseController {

    private final ExecutorService executorService = Executors.newCachedThreadPool();

    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter handleSse() {
        // 设置超时时间为1小时
        SseEmitter emitter = new SseEmitter(3600000L);

        // 处理客户端断开连接回调
        emitter.onCompletion(() -> {
            System.out.println("Connection completed");
        });

        // 设置超时回调
        emitter.onTimeout(() -> {
            System.out.println("Connection timed out");
            emitter.complete();
        });
         // 设置错误回调
        emitter.onError((ex) -> {
            System.err.println("SSE 连接出错: " + ex.getMessage());
            emitter.completeWithError(ex);
        });

        // 模拟每两秒发送一次消息到客户端,总共发送十次
        executorService.execute(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    SseEmitter.SseEventBuilder event = SseEmitter.event()
                            .data("SSE message " + i)
                            .id("" + i)
                            .name("sseEvent");

                    emitter.send(event);
                    Thread.sleep(2000);
                }
                emitter.complete();
            } catch (IOException | InterruptedException e) {
                emitter.completeWithError(e);
            }
        });

        return emitter;
    }
}

实现方式 3

java
package com.example.ssedemo.controller;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@RestController
@RequestMapping("/sse")
public class SseController {

    private final ExecutorService executor = Executors.newCachedThreadPool();

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter streamEvents() {
        // 创建一个 SseEmitter 实例,设置超时时间为 0(永不超时)
        SseEmitter emitter = new SseEmitter(0L);

        // 异步发送事件
        executor.execute(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    // 模拟每秒发送一次数据
                    Thread.sleep(1000);
                    emitter.send(SseEmitter.event()
                            .id(String.valueOf(i))
                            .name("message")
                            .data("Event " + i));
                }
                // 完成事件流
                emitter.complete();
            } catch (IOException | InterruptedException e) {
                emitter.completeWithError(e);
            }
        });

        return emitter;
    }
}
vue
<template>
  <div>
    <h1>Server-Sent Events (SSE) Demo</h1>
    <button @click="startSSE">Start SSE</button>
    <ul>
      <li v-for="(event, index) in events" :key="index">{{ event }}</li>
    </ul>
  </div>
</template>

<script setup lang="ts">
import { ref } from 'vue';

// 定义响应式变量
const events = ref<string[]>([]);
let eventSource: EventSource | null = null;

// 启动 SSE 连接
const startSSE = () => {
  if (eventSource) {
    eventSource.close(); // 如果已经有连接,先关闭
  }

  // 创建 EventSource 连接到后端 SSE 端点
  eventSource = new EventSource('http://localhost:8080/sse/stream');

  // 监听消息事件
  eventSource.addEventListener('message', (event: MessageEvent) => {
    console.log('Received event:', event.data);
    events.value.push(event.data); // 将接收到的消息添加到列表中
  });

  // 监听错误事件
  eventSource.onerror = (error: Event) => {
    console.error('EventSource failed:', error);
    eventSource?.close(); // 关闭连接
  };
};
</script>

<style scoped>
ul {
  list-style-type: none;
  padding: 0;
}
li {
  margin: 5px 0;
}
</style>