java /
正文
java原生的线程池 怎么使用给个例子
2025-12-07 22:18
1 浏览
评论(0)
字体大小:
package com.example.demo.controller;
import com.example.demo.service.OrderAsyncService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@RestController
@RequestMapping("/api/order")
public class OrderController {
@Autowired
private OrderAsyncService orderAsyncService;
@Autowired
@Qualifier("nativeThreadPool")
private ThreadPoolExecutor nativeThreadPool;
@PostMapping("/process/{orderId}")
public String processOrder(@PathVariable String orderId) {
orderAsyncService.processOrderAsync(orderId);
return "订单处理任务已提交";
}
@GetMapping("/inventory/batch")
public List<String> checkBatchInventory(@RequestParam List<String> productIds) throws Exception {
List<Future<String>> futures = new ArrayList<>();
for (String productId : productIds) {
futures.add(orderAsyncService.checkInventoryAsync(productId));
}
List<String> results = new ArrayList<>();
for (Future<String> future : futures) {
results.add(future.get()); // 这里等待,但多个检查是并行执行的
}
return results;
}
// 使用原生线程池的示例1:直接提交任务
@PostMapping("/native/process/{orderId}")
public String processOrderWithNativePool(@PathVariable String orderId) {
nativeThreadPool.submit(() -> {
System.out.println("使用原生线程池处理订单: " + orderId + ", 线程: " + Thread.currentThread().getName());
try {
Thread.sleep(2000); // 模拟处理时间
System.out.println("订单 " + orderId + " 处理完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
return "订单已提交到原生线程池处理";
}
// 使用原生线程池的示例2:带返回值的任务
@GetMapping("/native/inventory/{productId}")
public CompletableFuture<String> checkInventoryWithNativePool(@PathVariable String productId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("使用原生线程池检查库存: " + productId + ", 线程: " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟检查时间
return "商品 " + productId + " 库存充足 (原生线程池)";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "检查失败";
}
}, nativeThreadPool);
}
// 使用原生线程池的示例3:批量处理
@PostMapping("/native/batch")
public String processBatchWithNativePool(@RequestBody List<String> orderIds) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (String orderId : orderIds) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("批量处理订单: " + orderId + ", 线程: " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟处理时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, nativeThreadPool);
futures.add(future);
}
// 等待所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenRun(() -> System.out.println("所有批量任务处理完成"));
return "批量订单已提交到原生线程池,共" + orderIds.size() + "个订单";
}
// 查看线程池状态
@GetMapping("/native/status")
public String getThreadPoolStatus() {
return String.format(
"线程池状态: 核心线程数=%d, 活动线程数=%d, 最大线程数=%d, 队列大小=%d, 已完成任务数=%d",
nativeThreadPool.getCorePoolSize(),
nativeThreadPool.getActiveCount(),
nativeThreadPool.getMaximumPoolSize(),
nativeThreadPool.getQueue().size(),
nativeThreadPool.getCompletedTaskCount()
);
}
}
package com.example.demo.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class NativeAsyncService {
@Autowired
@Qualifier("nativeThreadPool")
private ThreadPoolExecutor nativeThreadPool;
/**
* 使用原生线程池处理订单
*/
public CompletableFuture<String> processOrderNative(String orderId) {
return CompletableFuture.supplyAsync(() -> {
log.info("使用原生线程池处理订单: {}, 线程: {}", orderId, Thread.currentThread().getName());
try {
// 模拟订单处理逻辑
TimeUnit.SECONDS.sleep(2);
log.info("订单 {} 处理完成", orderId);
return "订单 " + orderId + " 处理成功";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("订单处理被中断: {}", orderId, e);
return "订单 " + orderId + " 处理失败";
}
}, nativeThreadPool);
}
/**
* 使用原生线程池检查库存
*/
public CompletableFuture<String> checkInventoryNative(String productId) {
return CompletableFuture.supplyAsync(() -> {
log.info("使用原生线程池检查商品 {} 库存, 线程: {}", productId, Thread.currentThread().getName());
try {
// 模拟库存检查逻辑
TimeUnit.SECONDS.sleep(1);
// 模拟库存结果
int stock = (int) (Math.random() * 100);
String result = stock > 10 ? "充足" : "不足";
return String.format("商品 %s 库存: %d (%s)", productId, stock, result);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("库存检查被中断", e);
return "库存检查失败";
}
}, nativeThreadPool);
}
/**
* 批量处理订单 - 使用原生线程池
*/
public CompletableFuture<List<String>> batchProcessOrdersNative(List<String> orderIds) {
// 为每个订单创建异步任务
List<CompletableFuture<String>> futures = orderIds.stream()
.map(orderId -> CompletableFuture.supplyAsync(() -> {
log.info("批量处理订单: {}, 线程: {}", orderId, Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
return "订单 " + orderId + " 处理完成";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "订单 " + orderId + " 处理失败";
}
}, nativeThreadPool))
.toList();
// 等待所有任务完成并收集结果
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.toList());
}
/**
* 带超时控制的异步任务
*/
public CompletableFuture<String> processOrderWithTimeout(String orderId, long timeout, TimeUnit unit) {
return CompletableFuture.supplyAsync(() -> {
log.info("处理订单 {} (带超时控制), 线程: {}", orderId, Thread.currentThread().getName());
try {
// 模拟长时间处理
TimeUnit.SECONDS.sleep(5);
return "订单 " + orderId + " 处理成功";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "订单 " + orderId + " 处理失败";
}
}, nativeThreadPool)
.orTimeout(timeout, unit) // 设置超时
.exceptionally(ex -> "订单 " + orderId + " 处理超时: " + ex.getMessage());
}
/**
* 获取线程池状态信息
*/
public String getThreadPoolStatus() {
return String.format(
"原生线程池状态:\n" +
"核心线程数: %d\n" +
"活动线程数: %d\n" +
"最大线程数: %d\n" +
"队列大小: %d\n" +
"已完成任务: %d\n" +
"总任务数: %d",
nativeThreadPool.getCorePoolSize(),
nativeThreadPool.getActiveCount(),
nativeThreadPool.getMaximumPoolSize(),
nativeThreadPool.getQueue().size(),
nativeThreadPool.getCompletedTaskCount(),
nativeThreadPool.getTaskCount()
);
}
/**
* 优雅关闭线程池的示例方法
*/
public void gracefulShutdown() {
log.info("开始优雅关闭线程池...");
nativeThreadPool.shutdown(); // 不再接受新任务
try {
// 等待现有任务完成,最多等待30秒
if (!nativeThreadPool.awaitTermination(30, TimeUnit.SECONDS)) {
log.warn("线程池未在指定时间内关闭,尝试强制关闭");
nativeThreadPool.shutdownNow(); // 强制关闭
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
nativeThreadPool.shutdownNow();
}
log.info("线程池已关闭");
}
}
package com.example.demo.controller;
import com.example.demo.service.NativeAsyncService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@Slf4j
@RestController
@RequestMapping("/api/native")
public class NativeAsyncController {
@Autowired
private NativeAsyncService nativeAsyncService;
@PostMapping("/order/{orderId}")
public CompletableFuture<String> processOrder(@PathVariable String orderId) {
log.info("接收到订单处理请求: {}", orderId);
return nativeAsyncService.processOrderNative(orderId);
}
@GetMapping("/inventory/{productId}")
public CompletableFuture<String> checkInventory(@PathVariable String productId) {
log.info("接收到库存检查请求: {}", productId);
return nativeAsyncService.checkInventoryNative(productId);
}
@PostMapping("/batch/orders")
public CompletableFuture<List<String>> batchProcessOrders(@RequestBody List<String> orderIds) {
log.info("接收到批量订单处理请求,订单数量: {}", orderIds.size());
return nativeAsyncService.batchProcessOrdersNative(orderIds);
}
@PostMapping("/order/{orderId}/timeout")
public CompletableFuture<String> processOrderWithTimeout(@PathVariable String orderId,
@RequestParam(defaultValue = "3") long timeout) {
log.info("接收到带超时的订单处理请求: {}, 超时时间: {}秒", orderId, timeout);
return nativeAsyncService.processOrderWithTimeout(orderId, timeout, TimeUnit.SECONDS);
}
@GetMapping("/pool/status")
public String getThreadPoolStatus() {
return nativeAsyncService.getThreadPoolStatus();
}
@PostMapping("/pool/shutdown")
public String shutdownThreadPool() {
new Thread(() -> nativeAsyncService.gracefulShutdown()).start();
return "线程池关闭指令已发送";
}
}
本文发布于程序达人 ,转载请注明出处,谢谢合作
有 0 人认为有用
0 评论
共同学习,写下你的评论
相关热点文章推荐
Spring Boot文档翻译【转】
20661
2024-01-13 23:29
Spring Boot报java.lang.IllegalArgumentException:Property 'sqlSessionFactory' or 'sqlSessionTemplate'
16473
2024-01-13 23:29
SpringBoot 2.0 报错: Failed to configure a DataSource: 'url' attribute is not specified and no embe...
UploadiFive Documentation (api 说明文档)
9899
2024-01-13 23:29
svn: 目录中的条目从本地编码转换到 UTF8 失败 解决办法
5336
2024-01-13 23:29
解决Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile办法
4393
2024-01-13 23:29
程序达人 - chengxudaren.com
一个帮助开发者成长的社区
相关文章