java / 正文

java原生的线程池 怎么使用给个例子​

2025-12-07 22:18 1 浏览
评论(0
字体大小:
package com.example.demo.controller;

import com.example.demo.service.OrderAsyncService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.*;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@RestController
@RequestMapping("/api/order")
public class OrderController {

    @Autowired
    private OrderAsyncService orderAsyncService;
    
    @Autowired
    @Qualifier("nativeThreadPool")
    private ThreadPoolExecutor nativeThreadPool;

    @PostMapping("/process/{orderId}")
    public String processOrder(@PathVariable String orderId) {
        orderAsyncService.processOrderAsync(orderId);
        return "订单处理任务已提交";
    }

    @GetMapping("/inventory/batch")
    public List<String> checkBatchInventory(@RequestParam List<String> productIds) throws Exception {
        List<Future<String>> futures = new ArrayList<>();
        for (String productId : productIds) {
            futures.add(orderAsyncService.checkInventoryAsync(productId));
        }

        List<String> results = new ArrayList<>();
        for (Future<String> future : futures) {
            results.add(future.get());  // 这里等待,但多个检查是并行执行的
        }
        return results;
    }
    
    // 使用原生线程池的示例1:直接提交任务
    @PostMapping("/native/process/{orderId}")
    public String processOrderWithNativePool(@PathVariable String orderId) {
        nativeThreadPool.submit(() -> {
            System.out.println("使用原生线程池处理订单: " + orderId + ", 线程: " + Thread.currentThread().getName());
            try {
                Thread.sleep(2000); // 模拟处理时间
                System.out.println("订单 " + orderId + " 处理完成");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return "订单已提交到原生线程池处理";
    }
    
    // 使用原生线程池的示例2:带返回值的任务
    @GetMapping("/native/inventory/{productId}")
    public CompletableFuture<String> checkInventoryWithNativePool(@PathVariable String productId) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("使用原生线程池检查库存: " + productId + ", 线程: " + Thread.currentThread().getName());
            try {
                Thread.sleep(1000); // 模拟检查时间
                return "商品 " + productId + " 库存充足 (原生线程池)";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "检查失败";
            }
        }, nativeThreadPool);
    }
    
    // 使用原生线程池的示例3:批量处理
    @PostMapping("/native/batch")
    public String processBatchWithNativePool(@RequestBody List<String> orderIds) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        
        for (String orderId : orderIds) {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                System.out.println("批量处理订单: " + orderId + ", 线程: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // 模拟处理时间
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, nativeThreadPool);
            futures.add(future);
        }
        
        // 等待所有任务完成
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenRun(() -> System.out.println("所有批量任务处理完成"));
            
        return "批量订单已提交到原生线程池,共" + orderIds.size() + "个订单";
    }
    
    // 查看线程池状态
    @GetMapping("/native/status")
    public String getThreadPoolStatus() {
        return String.format(
            "线程池状态: 核心线程数=%d, 活动线程数=%d, 最大线程数=%d, 队列大小=%d, 已完成任务数=%d",
            nativeThreadPool.getCorePoolSize(),
            nativeThreadPool.getActiveCount(),
            nativeThreadPool.getMaximumPoolSize(),
            nativeThreadPool.getQueue().size(),
            nativeThreadPool.getCompletedTaskCount()
        );
    }
}
package com.example.demo.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Slf4j
@Service
public class NativeAsyncService {

    @Autowired
    @Qualifier("nativeThreadPool")
    private ThreadPoolExecutor nativeThreadPool;

    /**
     * 使用原生线程池处理订单
     */
    public CompletableFuture<String> processOrderNative(String orderId) {
        return CompletableFuture.supplyAsync(() -> {
            log.info("使用原生线程池处理订单: {}, 线程: {}", orderId, Thread.currentThread().getName());
            try {
                // 模拟订单处理逻辑
                TimeUnit.SECONDS.sleep(2);
                log.info("订单 {} 处理完成", orderId);
                return "订单 " + orderId + " 处理成功";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("订单处理被中断: {}", orderId, e);
                return "订单 " + orderId + " 处理失败";
            }
        }, nativeThreadPool);
    }

    /**
     * 使用原生线程池检查库存
     */
    public CompletableFuture<String> checkInventoryNative(String productId) {
        return CompletableFuture.supplyAsync(() -> {
            log.info("使用原生线程池检查商品 {} 库存, 线程: {}", productId, Thread.currentThread().getName());
            try {
                // 模拟库存检查逻辑
                TimeUnit.SECONDS.sleep(1);
                // 模拟库存结果
                int stock = (int) (Math.random() * 100);
                String result = stock > 10 ? "充足" : "不足";
                return String.format("商品 %s 库存: %d (%s)", productId, stock, result);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("库存检查被中断", e);
                return "库存检查失败";
            }
        }, nativeThreadPool);
    }

    /**
     * 批量处理订单 - 使用原生线程池
     */
    public CompletableFuture<List<String>> batchProcessOrdersNative(List<String> orderIds) {
        // 为每个订单创建异步任务
        List<CompletableFuture<String>> futures = orderIds.stream()
            .map(orderId -> CompletableFuture.supplyAsync(() -> {
                log.info("批量处理订单: {}, 线程: {}", orderId, Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(1);
                    return "订单 " + orderId + " 处理完成";
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return "订单 " + orderId + " 处理失败";
                }
            }, nativeThreadPool))
            .toList();

        // 等待所有任务完成并收集结果
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .toList());
    }

    /**
     * 带超时控制的异步任务
     */
    public CompletableFuture<String> processOrderWithTimeout(String orderId, long timeout, TimeUnit unit) {
        return CompletableFuture.supplyAsync(() -> {
            log.info("处理订单 {} (带超时控制), 线程: {}", orderId, Thread.currentThread().getName());
            try {
                // 模拟长时间处理
                TimeUnit.SECONDS.sleep(5);
                return "订单 " + orderId + " 处理成功";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "订单 " + orderId + " 处理失败";
            }
        }, nativeThreadPool)
        .orTimeout(timeout, unit) // 设置超时
        .exceptionally(ex -> "订单 " + orderId + " 处理超时: " + ex.getMessage());
    }

    /**
     * 获取线程池状态信息
     */
    public String getThreadPoolStatus() {
        return String.format(
            "原生线程池状态:\n" +
            "核心线程数: %d\n" +
            "活动线程数: %d\n" +
            "最大线程数: %d\n" +
            "队列大小: %d\n" +
            "已完成任务: %d\n" +
            "总任务数: %d",
            nativeThreadPool.getCorePoolSize(),
            nativeThreadPool.getActiveCount(),
            nativeThreadPool.getMaximumPoolSize(),
            nativeThreadPool.getQueue().size(),
            nativeThreadPool.getCompletedTaskCount(),
            nativeThreadPool.getTaskCount()
        );
    }

    /**
     * 优雅关闭线程池的示例方法
     */
    public void gracefulShutdown() {
        log.info("开始优雅关闭线程池...");
        nativeThreadPool.shutdown(); // 不再接受新任务
        
        try {
            // 等待现有任务完成,最多等待30秒
            if (!nativeThreadPool.awaitTermination(30, TimeUnit.SECONDS)) {
                log.warn("线程池未在指定时间内关闭,尝试强制关闭");
                nativeThreadPool.shutdownNow(); // 强制关闭
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            nativeThreadPool.shutdownNow();
        }
        log.info("线程池已关闭");
    }
}
package com.example.demo.controller;

import com.example.demo.service.NativeAsyncService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

@Slf4j
@RestController
@RequestMapping("/api/native")
public class NativeAsyncController {

    @Autowired
    private NativeAsyncService nativeAsyncService;

    @PostMapping("/order/{orderId}")
    public CompletableFuture<String> processOrder(@PathVariable String orderId) {
        log.info("接收到订单处理请求: {}", orderId);
        return nativeAsyncService.processOrderNative(orderId);
    }

    @GetMapping("/inventory/{productId}")
    public CompletableFuture<String> checkInventory(@PathVariable String productId) {
        log.info("接收到库存检查请求: {}", productId);
        return nativeAsyncService.checkInventoryNative(productId);
    }

    @PostMapping("/batch/orders")
    public CompletableFuture<List<String>> batchProcessOrders(@RequestBody List<String> orderIds) {
        log.info("接收到批量订单处理请求,订单数量: {}", orderIds.size());
        return nativeAsyncService.batchProcessOrdersNative(orderIds);
    }

    @PostMapping("/order/{orderId}/timeout")
    public CompletableFuture<String> processOrderWithTimeout(@PathVariable String orderId, 
                                                           @RequestParam(defaultValue = "3") long timeout) {
        log.info("接收到带超时的订单处理请求: {}, 超时时间: {}秒", orderId, timeout);
        return nativeAsyncService.processOrderWithTimeout(orderId, timeout, TimeUnit.SECONDS);
    }

    @GetMapping("/pool/status")
    public String getThreadPoolStatus() {
        return nativeAsyncService.getThreadPoolStatus();
    }

    @PostMapping("/pool/shutdown")
    public String shutdownThreadPool() {
        new Thread(() -> nativeAsyncService.gracefulShutdown()).start();
        return "线程池关闭指令已发送";
    }
}

 

本文发布于程序达人 ,转载请注明出处,谢谢合作

0 人认为有用
0 评论

相关热点文章推荐

程序达人 - chengxudaren.com

一个帮助开发者成长的社区

相关文章