在本文中,我们将向您展示如何使用服务器发送的事件开发响应式Web应用程序。
Spring Boot 2.1.2.RELEASE Spring WebFlux 5.1.4.RELEASE Thymeleaf 3.0.11.RELEASE JUnit 5.3.2 Maven 3在Spring中,返回JSON和标头 MediaType.TEXT_EVENT_STREAM_VALUE
@RestControllerpublic class CommentController { @GetMapping(path = "/comment/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Fluxfeed() { //... }}
在Javascript中,用于EventSource向上述端点发送请求。
function loadComments () { this.source = null; this.start = function () { this.source = new EventSource("/comment/stream"); this.source.addEventListener("message", function (event) { var comment = JSON.parse(event.data); //... update somewhere }); this.source.onerror = function () { this.close(); }; }; this.stop = function() { this.source.close(); }}comment = new loadComments();window.onload = function() { comment.start();};window.onbeforeunload = function() { comment.stop();}
1.项目目录
2. Maven的pom.xml
4.0.0 com.imddy.springboot webflux-thymeleaf-sse 0.0.1-SNAPSHOT jar webflux-thymeleaf-sse http://maven.apache.org org.springframework.boot spring-boot-starter-parent 2.1.2.RELEASE UTF-8 1.8 5.3.2 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-webflux org.springframework.boot spring-boot-starter-thymeleaf org.springframework.boot spring-boot-starter-test test junit junit org.junit.jupiter junit-jupiter-engine ${junit-jupiter.version} test org.springframework.boot spring-boot-devtools true org.springframework.boot spring-boot-maven-plugin org.apache.maven.plugins maven-surefire-plugin 2.22.0
3. Spring Boot + Spring WebFlux
3.1 Spring基于WebFlux注释的控制器。启用数据流。写produces = MediaType.TEXT_EVENT_STREAM_VALUE
CommentController.java
package com.imddy.springboot.reactive.controller;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.http.MediaType;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import com.imddy.springboot.reactive.model.Comment;import com.imddy.springboot.reactive.repository.CommentRepository;import reactor.core.publisher.Flux;@RestControllerpublic class CommentController { @Autowired private CommentRepository commentRepository; @GetMapping(path = "/comment/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Fluxfeed() { return this.commentRepository.findAll(); }}
MainController.java
package com.imddy.springboot.reactive.controller;import org.springframework.stereotype.Controller;import org.springframework.ui.Model;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@Controllerpublic class MainController { @GetMapping(path = {"/","/index"}) public String index(final Model model) { return "index"; }}
3.2在repository,返回一个Flux对象。
CommentRepository.java 这个是个接口
package com.imddy.springboot.reactive.repository;import com.imddy.springboot.reactive.model.Comment;import reactor.core.publisher.Flux;public interface CommentRepository { FluxfindAll(); }
ReactiveCommentRepository.java 这个是实现类
package com.imddy.springboot.reactive.repository;import java.time.Duration;import java.util.Arrays;import java.util.List;import org.springframework.stereotype.Repository;import com.imddy.springboot.reactive.model.Comment;import com.imddy.springboot.reactive.utils.CommentGenerator;import reactor.core.publisher.Flux;@Repositorypublic class ReactiveCommentRepository implements CommentRepository{ @Override public FluxfindAll() { //simulate data streaming every 1 second. return Flux.interval(Duration.ofSeconds(1)) .onBackpressureDrop() .map(this::generateComment) .flatMapIterable(x -> x); } private List generateComment(long interval) { Comment obj = new Comment( CommentGenerator.randomAuthor(), CommentGenerator.randomMessage(), CommentGenerator.getCurrentTimeStamp()); return Arrays.asList(obj); }}
3.3一个用于生成随机注释的utils类。
CommentGenerator.java
package com.imddy.springboot.reactive.utils;import java.time.LocalDateTime;import java.time.format.DateTimeFormatter;import java.util.Arrays;import java.util.List;import java.util.Random;public class CommentGenerator { private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss"); private static final Random RANDOM = new Random(System.currentTimeMillis()); private static final ListCOMMENT_AUTHOR = Arrays.asList( "Mkyong", "Oliver", "Jack", "Harry", "Jacob", "Isla", "Emily", "Poppy", "Ava", "Isabella"); private static final List COMMENT_MESSAGE = Arrays.asList( "I Love this!", "Me too!", "Wow", "True!", "Hello everyone here?", "Good!"); public static String randomAuthor() { return COMMENT_AUTHOR.get(RANDOM.nextInt(COMMENT_AUTHOR.size())); } public static String randomMessage() { return COMMENT_MESSAGE.get(RANDOM.nextInt(COMMENT_MESSAGE.size())); } public static String getCurrentTimeStamp() { return dtf.format(LocalDateTime.now()); }}
3.4评论模型。
Comment.java
package com.imddy.springboot.reactive.model;public class Comment { private String author; private String message; private String timestamp; public Comment() { } public Comment(String author, String message, String timestamp) { this.author = author; this.message = message; this.timestamp = timestamp; } public String getAuthor() { return author; } public void setAuthor(String author) { this.author = author; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } public String getTimestamp() { return timestamp; } public void setTimestamp(String timestamp) { this.timestamp = timestamp; }}
3.5启动Spring Boot。
CommentWebApplication.java 这个是Spring Boot Application启动程序。
package com.imddy.springboot.reactive;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class CommentWebApplication { public static void main(String[] args) throws Exception { SpringApplication.run(CommentWebApplication.class, args); }}
application.properties 这个项目这个里面没有也可以。
logging.level.org.springframework.web=INFO#thymelea模板配置spring.thymeleaf.prefix=classpath:/templates/spring.thymeleaf.suffix=.htmlspring.thymeleaf.mode=HTML5spring.thymeleaf.encoding=UTF-8#热部署文件,页面不产生缓存,及时更新spring.thymeleaf.cache=false
4. Thymeleaf
模板中没有特殊的反应标签,只使用正常循环。放在 templates目录下
index.html
Spring WebFlux + Server Sent Events
Author Message Date [[${comment.author}]] [[${comment.message}]] [[${comment.timestamp}]]
5. JavaScript EventSource
关键是使用Javascript EventSource类发送请求并监听message事件,并将流数据反应更新到表中。 放在 /static/js/ 下
main.js
function loadComments() { this.source = null; this.start = function() { var commentTable = document.getElementById("comments"); this.source = new EventSource("/comment/stream"); this.source.addEventListener("message", function(event) { // These events are JSON, so parsing and DOM fiddling are needed var comment = JSON.parse(event.data); var row = commentTable.getElementsByTagName("tbody")[0] .insertRow(0); var cell0 = row.insertCell(0); var cell1 = row.insertCell(1); var cell2 = row.insertCell(2); cell0.className = "author-style"; cell0.innerHTML = comment.author; cell1.className = "text"; cell1.innerHTML = comment.message; cell2.className = "date"; cell2.innerHTML = comment.timestamp; }); this.source.onerror = function() { this.close(); }; }; this.stop = function() { this.source.close(); }}comment = new loadComments();/* * Register callbacks for starting and stopping the SSE controller. */window.onload = function() { comment.start();};window.onbeforeunload = function() { comment.stop();}
页面使用了bootstrap的样式,引入对应放在 /static/css/ 下
bootstrap.min.css 文件太长,我没有引入,使用的是 Bootstrap v4.2.1 (https://getbootstrap.com/)
/*! * Bootstrap v4.2.1 (https://getbootstrap.com/) * Copyright 2011-2018 The Bootstrap Authors * Copyright 2011-2018 Twitter, Inc. * Licensed under MIT (https://github.com/twbs/bootstrap/blob/master/LICENSE) */:root{--blue:#007bff;--indigo:#6610f2;--purple:#6f42c1;--pink:#e83e8c;--red:#dc3545;--/*# sourceMappingURL=bootstrap.min.css.map */
main.css
#title{ margin:40px 0;}
6.单元测试
WebTestClient单元测试流式响应
TestCommentWebApplication.java 放在test测试目录下,这里其实不需要这个文件,它就是只是个测试。
package com.imddy.springboot.reactive;import java.util.List;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.http.MediaType;import org.springframework.test.web.reactive.server.WebTestClient;import com.imddy.springboot.reactive.model.Comment;import static org.junit.jupiter.api.Assertions.assertEquals;//junit 4//@RunWith(SpringRunner.class)//@ExtendWith(SpringExtension.class)@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)//@AutoConfigureWebTestClient(timeout = "10000")//10 secondspublic class TestCommentWebApplication { @Autowired private WebTestClient webClient; @Test public void testCommentStream() { Listcomments = webClient .get().uri("/comment/stream") .accept(MediaType.valueOf(MediaType.TEXT_EVENT_STREAM_VALUE)) .exchange() .expectStatus().isOk() //.expectHeader().contentType(MediaType.APPLICATION_STREAM_JSON) // caused timeout .returnResult(Comment.class) .getResponseBody() .take(3) .collectList() .block(); comments.forEach(x -> System.out.println(x)); assertEquals(3, comments.size()); }}
7.Demo运行测试
运行
结果
http://localhost:8080/
http://localhost:8080/comment/stream
/ 访问的是 thymeleaf映射的index.html 页面。
/comment/stream 为数据流。
参考:http://www.spring4all.com/article/6852 和 https://www.mkyong.com/spring-boot/spring-boot-webflux-server-sent-events-example/