高并发风控策略模型代码设计思想

Scroll Down

在Java多线程编程中,处理风控策略模型这种复杂依赖关系时,需要合理使用线程池、锁机制(如ReentrantLockReadWriteLock)以及并发工具类(如ConcurrentHashMapCountDownLatch)来优化并发性能和避免资源浪费。以下是一个结合风控策略模型的多线程实现方案:


一、常用多线程类及组件

  1. 线程池 (ExecutorService)

    • 使用ThreadPoolExecutorExecutors工厂方法创建,避免频繁创建线程。
    • 适合场景:异步执行多个风控规则集合的计算。
  2. 锁机制

    • ReentrantLock:显式锁,支持公平/非公平策略,用于保护共享资源(如缓存数据)。
    • ReadWriteLock:读写分离锁,适合读多写少的场景(如指标数据的缓存读取)。
  3. 并发集合

    • ConcurrentHashMap:线程安全的哈希表,用于缓存已计算的指标结果(避免重复计算)。
    • FutureCompletableFuture:异步获取任务执行结果,支持依赖关系(如规则依赖指标)。
  4. 同步工具

    • CountDownLatch/CyclicBarrier:协调多个线程的同步(如等待所有规则计算完成)。
    • Semaphore:控制同时访问特定资源的线程数(如限制数据库连接池并发数)。
  5. 数据库连接池

    • 常用库:HikariCP、Druid等,通过DataSource获取连接,避免频繁创建数据库连接。

二、风控策略模型设计(简化版)

假设风控模型包含以下类(继承关系略):

// 风控规则集合
class RiskRuleSet {
    List<RiskRule> rules;
}

// 风控规则
class RiskRule {
    List<Indicator> indicators; // 依赖的指标
}

// 指标(可能依赖其他指标或数据源)
class Indicator {
    List<DataSource> dataSources; // 数据源(如数据库、API)
    List<Indicator> dependencies; // 依赖的其他指标
}

// 数据源(包含接口)
class DataSource {
    List<ServiceInterface> interfaces;
}

// 数据接口(如数据库查询、HTTP调用)
class ServiceInterface {
    String execute(); // 执行获取数据
}

三、多线程并发处理方案

目标:并行计算多个RiskRuleSet,避免重复计算相同指标/数据源。

1. 整体流程

  • 使用线程池异步执行每个RiskRuleSet的计算。
  • 为每个指标的计算任务使用CompletableFuture,通过依赖组合实现异步链式调用。
  • 使用ConcurrentHashMap缓存已计算的指标结果(Key为指标ID,Value为Future或计算结果)。

2. 避免重复计算与资源浪费

  • 指标级别缓存
    多个规则可能依赖同一指标,使用ConcurrentHashMap.computeIfAbsent()保证每个指标只计算一次。
  • 数据源级别控制
    相同数据源可能被多个指标依赖,通过连接池限制并发,并用ReadWriteLock保护缓存(读缓存不加锁,写缓存加锁)。
  • 接口调用优化
    相同接口的多次调用合并为一次(如用Future缓存正在执行的请求,其他线程等待结果)。

3. 代码示例(关键部分)

public class RiskEngine {
    // 线程池(根据业务需求调整参数)
    private ExecutorService executor = Executors.newFixedThreadPool(20);
    // 指标缓存(Key: indicatorId, Value: 计算结果Future)
    private ConcurrentHashMap<String, CompletableFuture<Object>> indicatorCache = new ConcurrentHashMap<>();
    // 数据源缓存(Key: dataSourceId, Value: 数据结果)
    private ConcurrentHashMap<String, Object> dataSourceCache = new ConcurrentHashMap<>();
    // 读写锁(保护数据源缓存,读多写少)
    private ReadWriteLock lock = new ReentrantReadWriteLock();

    // 计算风控规则集合
    public CompletableFuture<Boolean> evaluateRuleSet(RiskRuleSet ruleSet) {
        List<CompletableFuture<Boolean>> ruleFutures = ruleSet.getRules().stream()
                .map(rule -> evaluateRule(rule).exceptionally(ex -> false)) // 规则失败返回false
                .collect(Collectors.toList());
        // 所有规则通过才算整体通过
        return CompletableFuture.allOf(ruleFutures.toArray(new CompletableFuture[0]))
                .thenApply(v -> ruleFutures.stream().allMatch(f -> f.join()));
    }

    // 计算单个规则(依赖多个指标)
    private CompletableFuture<Boolean> evaluateRule(RiskRule rule) {
        List<CompletableFuture<Object>> indicatorFutures = rule.getIndicators().stream()
                .map(this::evaluateIndicator)
                .collect(Collectors.toList());
        // 规则逻辑:所有指标结果计算后,进行判断(这里简化为均非空则通过)
        return CompletableFuture.allOf(indicatorFutures.toArray(new CompletableFuture[0]))
                .thenApply(v -> indicatorFutures.stream().allMatch(f -> f.join() != null));
    }

    // 计算指标(支持依赖其他指标或数据源)
    private CompletableFuture<Object> evaluateIndicator(Indicator indicator) {
        String indicatorId = indicator.getId();
        // 先查缓存,若存在则直接返回(避免重复计算)
        return indicatorCache.computeIfAbsent(indicatorId, id -> {
            // 没有缓存,异步计算指标
            CompletableFuture<Object> future = new CompletableFuture<>();
            executor.submit(() -> {
                try {
                    // 1. 先计算依赖的指标(递归)
                    List<CompletableFuture<Object>> dependencyFutures = indicator.getDependencies().stream()
                            .map(this::evaluateIndicator)
                            .collect(Collectors.toList());
                    // 等待所有依赖指标计算完成
                    CompletableFuture.allOf(dependencyFutures.toArray(new CompletableFuture[0])).join();

                    // 2. 获取数据源数据(可能多个)
                    List<Object> dataValues = new ArrayList<>();
                    for (DataSource dataSource : indicator.getDataSources()) {
                        Object data = getDataSourceData(dataSource);
                        dataValues.add(data);
                    }

                    // 3. 执行指标计算逻辑(根据业务实现)
                    Object result = calculateIndicator(dataValues, dependencyFutures.stream().map(f -> f.join()).collect(Collectors.toList()));
                    future.complete(result);
                } catch (Exception ex) {
                    future.completeExceptionally(ex);
                }
            });
            return future;
        });
    }

    // 获取数据源数据(带缓存和并发控制)
    private Object getDataSourceData(DataSource dataSource) {
        String dataSourceId = dataSource.getId();
        // 先尝试读缓存
        lock.readLock().lock();
        try {
            if (dataSourceCache.containsKey(dataSourceId)) {
                return dataSourceCache.get(dataSourceId);
            }
        } finally {
            lock.readLock().unlock();
        }

        // 缓存不存在,加写锁查询
        lock.writeLock().lock();
        try {
            // 双重检查避免重复查询
            if (dataSourceCache.containsKey(dataSourceId)) {
                return dataSourceCache.get(dataSourceId);
            }
            // 执行数据源接口调用(合并相同接口的请求)
            Object data = fetchDataFromDataSource(dataSource);
            dataSourceCache.put(dataSourceId, data);
            return data;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // 实际调用数据源接口(可限制并发数)
    private Object fetchDataFromDataSource(DataSource dataSource) {
        // 这里可加入连接池控制(如HikariCP)、接口限流等
        // 模拟调用第一个接口
        ServiceInterface service = dataSource.getInterfaces().get(0);
        return service.execute();
    }
}

四、注意事项

  1. 死锁避免
    指标依赖可能存在循环(如A依赖B,B依赖A),需在模型设计时避免,或引入超时/中断机制。
  2. 线程池配置
    根据业务量调整线程池大小(如IO密集型任务可增加线程数)。
  3. 缓存失效
    若数据源数据更新,需设计缓存失效策略(如定时刷新、手动清除)。
  4. 异常处理
    使用CompletableFuture.exceptionally()处理单个任务失败,避免整体失败。

五、总结

通过线程池执行任务、CompletableFuture管理依赖、ConcurrentHashMap缓存中间结果、读写锁保护共享资源,可高效实现风控策略的并发计算,避免重复访问数据源和指标,提升性能。实际场景中还需根据业务特点调整锁粒度、缓存策略和线程池参数。