Spring WebFlux响应式编程
约 1351 字大约 5 分钟
springwebfluxreactive
2025-04-04
概述
Spring WebFlux 是 Spring 5 引入的响应式 Web 框架,基于 Project Reactor 实现,采用非阻塞 I/O 模型。与传统的 Spring MVC(Servlet 阻塞模型)不同,WebFlux 适用于高并发、I/O 密集型场景,以更少的线程处理更多的请求。
Spring MVC vs WebFlux
| 特性 | Spring MVC | Spring WebFlux |
|---|---|---|
| 编程模型 | 阻塞/同步 | 非阻塞/异步 |
| 线程模型 | Thread-per-request | Event Loop |
| Servlet 依赖 | 依赖 Servlet API | 不依赖 Servlet |
| 运行容器 | Tomcat、Jetty | Netty、Undertow |
| 适用场景 | CRUD 业务系统 | 高并发网关、流处理 |
| 数据库支持 | JDBC、JPA | R2DBC |
Reactor 核心类型
Mono —— 0或1个元素
// 创建 Mono
Mono<String> mono1 = Mono.just("Hello");
Mono<String> mono2 = Mono.empty();
Mono<String> mono3 = Mono.error(new RuntimeException("error"));
// 延迟创建
Mono<User> mono4 = Mono.fromSupplier(() -> userRepository.findById(1L));
Mono<User> mono5 = Mono.defer(() -> Mono.just(expensiveOperation()));
// 转换操作
Mono<UserDto> result = Mono.just(user)
.map(u -> new UserDto(u.getId(), u.getName()))
.flatMap(dto -> enrichWithDetails(dto))
.filter(dto -> dto.isActive())
.switchIfEmpty(Mono.just(UserDto.defaultUser()))
.doOnNext(dto -> log.info("Processing: {}", dto))
.doOnError(e -> log.error("Error: {}", e.getMessage()));Flux —— 0到N个元素
// 创建 Flux
Flux<Integer> flux1 = Flux.just(1, 2, 3, 4, 5);
Flux<Integer> flux2 = Flux.range(1, 100);
Flux<Long> flux3 = Flux.interval(Duration.ofSeconds(1));
Flux<String> flux4 = Flux.fromIterable(List.of("a", "b", "c"));
// 常用操作
Flux<UserDto> users = Flux.fromIterable(userIds)
.flatMap(id -> userService.findById(id)) // 并发执行
.filter(user -> user.isActive())
.map(user -> UserDto.from(user))
.sort(Comparator.comparing(UserDto::getName))
.take(10) // 取前10个
.distinct(); // 去重
// 缓冲与窗口
Flux.range(1, 100)
.buffer(10) // 每10个元素一组: Flux<List<Integer>>
.flatMap(batch -> processBatch(batch));
Flux.range(1, 100)
.window(10) // 每10个元素一个子Flux
.flatMap(window -> window.collectList().flatMap(this::processBatch));WebFlux 注解式编程
@RestController
@RequestMapping("/api/users")
public class UserController {
private final UserService userService;
public UserController(UserService userService) {
this.userService = userService;
}
@GetMapping("/{id}")
public Mono<ResponseEntity<User>> getUser(@PathVariable Long id) {
return userService.findById(id)
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.notFound().build());
}
@GetMapping
public Flux<User> listUsers(@RequestParam(defaultValue = "ACTIVE") String status) {
return userService.findByStatus(status);
}
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<User> createUser(@RequestBody @Valid Mono<UserRequest> request) {
return request.flatMap(userService::create);
}
@PutMapping("/{id}")
public Mono<ResponseEntity<User>> updateUser(
@PathVariable Long id,
@RequestBody Mono<UserRequest> request) {
return request
.flatMap(req -> userService.update(id, req))
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.notFound().build());
}
@DeleteMapping("/{id}")
@ResponseStatus(HttpStatus.NO_CONTENT)
public Mono<Void> deleteUser(@PathVariable Long id) {
return userService.deleteById(id);
}
// Server-Sent Events(服务端推送)
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamUsers() {
return userService.streamActiveUsers()
.delayElements(Duration.ofSeconds(1));
}
}函数式端点(Functional Endpoints)
@Configuration
public class RouterConfig {
@Bean
public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
return RouterFunctions.route()
.path("/api/users", builder -> builder
.GET("/{id}", handler::getUser)
.GET("", handler::listUsers)
.POST("", handler::createUser)
.PUT("/{id}", handler::updateUser)
.DELETE("/{id}", handler::deleteUser))
.build();
}
}
@Component
public class UserHandler {
private final UserService userService;
public UserHandler(UserService userService) {
this.userService = userService;
}
public Mono<ServerResponse> getUser(ServerRequest request) {
Long id = Long.valueOf(request.pathVariable("id"));
return userService.findById(id)
.flatMap(user -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> listUsers(ServerRequest request) {
String status = request.queryParam("status").orElse("ACTIVE");
Flux<User> users = userService.findByStatus(status);
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(users, User.class);
}
public Mono<ServerResponse> createUser(ServerRequest request) {
return request.bodyToMono(UserRequest.class)
.flatMap(userService::create)
.flatMap(user -> ServerResponse.created(
URI.create("/api/users/" + user.getId()))
.bodyValue(user));
}
public Mono<ServerResponse> updateUser(ServerRequest request) {
Long id = Long.valueOf(request.pathVariable("id"));
return request.bodyToMono(UserRequest.class)
.flatMap(req -> userService.update(id, req))
.flatMap(user -> ServerResponse.ok().bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> deleteUser(ServerRequest request) {
Long id = Long.valueOf(request.pathVariable("id"));
return userService.deleteById(id)
.then(ServerResponse.noContent().build());
}
}WebClient —— 响应式 HTTP 客户端
@Service
public class ExternalApiService {
private final WebClient webClient;
public ExternalApiService(WebClient.Builder builder) {
this.webClient = builder
.baseUrl("https://api.example.com")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.filter(ExchangeFilterFunction.ofResponseProcessor(response -> {
if (response.statusCode().isError()) {
return response.bodyToMono(String.class)
.flatMap(body -> Mono.error(
new ApiException(response.statusCode(), body)));
}
return Mono.just(response);
}))
.build();
}
public Mono<Product> getProduct(String productId) {
return webClient.get()
.uri("/products/{id}", productId)
.retrieve()
.bodyToMono(Product.class)
.timeout(Duration.ofSeconds(5))
.retryWhen(Retry.backoff(3, Duration.ofMillis(500))
.filter(ex -> ex instanceof WebClientResponseException.ServiceUnavailable));
}
public Flux<Product> searchProducts(String keyword) {
return webClient.get()
.uri(uriBuilder -> uriBuilder
.path("/products/search")
.queryParam("q", keyword)
.build())
.retrieve()
.bodyToFlux(Product.class);
}
public Mono<Product> createProduct(ProductRequest request) {
return webClient.post()
.uri("/products")
.bodyValue(request)
.retrieve()
.bodyToMono(Product.class);
}
}R2DBC —— 响应式数据库访问
// R2DBC Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
Flux<User> findByStatus(String status);
@Query("SELECT * FROM users WHERE username LIKE :keyword")
Flux<User> searchByUsername(@Param("keyword") String keyword);
Mono<Long> countByStatus(String status);
}
// Service 层
@Service
public class UserService {
private final UserRepository userRepository;
public UserService(UserRepository userRepository) {
this.userRepository = userRepository;
}
public Mono<User> findById(Long id) {
return userRepository.findById(id);
}
@Transactional
public Mono<User> create(UserRequest request) {
User user = new User();
user.setUsername(request.getUsername());
user.setEmail(request.getEmail());
user.setStatus("ACTIVE");
return userRepository.save(user);
}
}# R2DBC 配置
spring:
r2dbc:
url: r2dbc:mysql://localhost:3306/mydb
username: root
password: ${DB_PASSWORD}
pool:
initial-size: 10
max-size: 50
max-idle-time: 30m背压(Backpressure)
// 背压策略
Flux.range(1, 1000000)
.onBackpressureBuffer(1024) // 缓冲区
.onBackpressureDrop(item -> // 丢弃
log.warn("Dropped: {}", item))
.onBackpressureLatest() // 只保留最新
.subscribe(item -> {
// 慢速消费
Thread.sleep(100);
process(item);
});
// 限流
Flux.range(1, 100)
.limitRate(10) // 每次向上游请求10个元素
.subscribe(this::process);错误处理
Mono<User> result = userService.findById(id)
.switchIfEmpty(Mono.error(new UserNotFoundException(id)))
.onErrorResume(TimeoutException.class,
ex -> Mono.just(User.cachedVersion(id)))
.onErrorMap(DataAccessException.class,
ex -> new ServiceException("Database error", ex))
.doOnError(ex -> log.error("Failed to get user: {}", id, ex))
.retry(3);总结
Spring WebFlux 基于 Reactor 的 Mono/Flux 提供了完整的响应式 Web 编程模型。注解式控制器保持了与 Spring MVC 相似的开发体验,函数式端点提供了更灵活的路由定义。WebClient 替代 RestTemplate 实现非阻塞 HTTP 调用,R2DBC 替代 JDBC 实现响应式数据库访问。WebFlux 适用于高并发 I/O 密集型场景,但需要整个调用链都是非阻塞的才能发挥优势。
贡献者
更新日志
2026/3/14 13:09
查看所有更新日志
9f6c2-feat: organize wiki content and refresh site setup于