Springboot使用EventStream实现SSE(服务端推送)

在启动类添加注解,以启用异步功能

@EnableAsync

编写Service

我的service如下

package com.luoyuer.eventstreamdemo.service;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.Future;

@Service
public class AsyncService {
    static String s = null;

    @Async
    public Future<String> getResult() {
        while (true) {
            if (s != null) {
                String re = s;
                s = null;
                return new AsyncResult<>(re);
            }
        }
    }

    public void setS(String s) {
        AsyncService.s = s;
    }
}
@Async注解标注该方法为异步(需完成第一步,否则无效)
Future表示一个可能还没有完成的异步任务的结果
AsyncResult表示异步任务的结果

编写controller

demo如下,可直接跑

package com.luoyuer.eventstreamdemo.controller;

import com.luoyuer.eventstreamdemo.service.AsyncService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@RestController
public class AsyncController {
    @Autowired
    AsyncService asyncService;

    @GetMapping("stream")
    public void streamTest(HttpServletResponse response) {
        response.setHeader("Content-Type", "text/event-stream;charset=UTF-8");
        response.setHeader("Cache-Control", "no-cache");
        try {
            String s = asyncService.getResult().get(30, TimeUnit.SECONDS);

            response.getWriter().write("id:1\n");
            response.getWriter().write("event:text\n");
            response.getWriter().write("data:" + s + "\n\n");
            response.getWriter().flush();
        } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
            try {
                response.getWriter().write("id:1\n");
                response.getWriter().write("event:text\n");
                response.getWriter().write("data:nodata\n\n");
                response.getWriter().flush();
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }

        }
    }

    @GetMapping("set")
    public String set() {
        asyncService.setS("test");
        return "success";
    }
}
实现event-stream需修改响应头Content-Type为text/event-stream
并将相应数据修改为如下
id:
event:
data:
其中data是必须的,并且每个关键字独占一行,并且如上结构可有多个,中间用一行空行分隔

Future实例的get方法,无参方法会一直阻塞直至获得结果。2参方法第一个参数为时长,第二个为TimeUnit类型的单位,超时会出现TimeoutException

js简单连接测试

var source=new EventSource("http://localhost:8080/stream");
source.onmessage=function(event){
    console.log(event);
}
如上方法会自动GET轮询 onmessage在收到消息时执行

开始执行后,访问/set接口,即可看到有test的输出,否则等待超时后返回nodata

注意!该方式不适用于广播!
上一篇
下一篇