Skip to content

Stream API 深度解析

Stream API 是 Java 8 最重要的特性之一,彻底改变了集合处理的方式。理解其惰性求值和并行流原理,能写出更优雅高效的代码。

Stream 的本质

Stream 不是数据结构,而是对数据源的计算描述

java
// The three stages of a Stream pipeline: source, intermediate operations, terminal operation
List<String> names = List.of("Alice", "Bob", "Charlie", "David", "Eve");

List<String> result = names.stream()          // 1. Create the Stream (data source)
    .filter(s -> s.length() > 3)              // 2. Intermediate operation (lazy, not executed immediately)
    .map(String::toUpperCase)                 // 2. Intermediate operation
    .sorted()                                 // 2. Intermediate operation
    .collect(Collectors.toList());            // 3. Terminal operation (triggers execution)

// Without a terminal operation, intermediate operations never run!
Stream<String> stream = names.stream().filter(s -> {
    System.out.println("过滤: " + s);  // Never printed: no terminal operation is invoked on this stream
    return s.length() > 3;
});
// Nothing has happened at this point
// 此时什么都没发生

创建 Stream

java
// From a collection
List<Integer> list = List.of(1, 2, 3, 4, 5);
Stream<Integer> s1 = list.stream();
Stream<Integer> s2 = list.parallelStream();  // Parallel stream

// From an array (s3) or from explicitly listed values (s4)
int[] arr = {1, 2, 3, 4, 5};
IntStream s3 = Arrays.stream(arr);
Stream<Integer> s4 = Stream.of(1, 2, 3, 4, 5);

// Infinite streams
Stream<Integer> s5 = Stream.iterate(0, n -> n + 2);  // 0, 2, 4, 6, ...
Stream<Double> s6 = Stream.generate(Math::random);    // Stream of random numbers

// Range streams (primitive int)
IntStream range = IntStream.range(1, 10);       // [1, 10)
IntStream rangeClosed = IntStream.rangeClosed(1, 10);  // [1, 10]

// File stream: try-with-resources closes the stream once the block exits
try (Stream<String> lines = Files.lines(Path.of("data.txt"))) {
    lines.forEach(System.out::println);
}

中间操作

java
// Sample data shared by the intermediate-operation examples below
// (getEmployees() is not shown in this snippet)
List<Employee> employees = getEmployees();

// filter — keep only matching elements
// (no terminal operation follows, so this pipeline is only described, not executed)
employees.stream()
    .filter(e -> e.getSalary() > 10000)
    .filter(e -> e.getDept().equals("Engineering"));

// map — transform each element
employees.stream()
    .map(Employee::getName)           // Extract a field
    .map(String::toUpperCase);        // Transform the extracted value

// flatMap — flatten nested structures
List<List<Integer>> nested = List.of(List.of(1, 2), List.of(3, 4));
nested.stream()
    .flatMap(Collection::stream)      // [[1,2],[3,4]] → [1,2,3,4]
    .forEach(System.out::println);

// distinct — remove duplicates (relies on equals/hashCode)
Stream.of(1, 2, 2, 3, 3, 3).distinct();  // [1, 2, 3]

// sorted — order elements: salary descending, then name ascending as tie-breaker
employees.stream()
    .sorted(Comparator.comparing(Employee::getSalary).reversed()
        .thenComparing(Employee::getName));

// peek — for debugging; does not change the stream's elements
employees.stream()
    .filter(e -> e.getSalary() > 10000)
    .peek(e -> System.out.println("过滤后: " + e.getName()))
    .map(Employee::getName)
    .collect(Collectors.toList());

// limit / skip — take a slice of the stream
Stream.iterate(1, n -> n + 1)
    .skip(10)    // Skip the first 10 elements
    .limit(5)    // Take 5 elements (this also bounds the otherwise infinite stream)
    .forEach(System.out::println);  // 11, 12, 13, 14, 15

// takeWhile / dropWhile (Java 9+)
Stream.of(1, 2, 3, 4, 5, 1, 2)
    .takeWhile(n -> n < 4)   // [1, 2, 3] (stops at the first element that fails the predicate)
    .forEach(System.out::println);

终止操作

java
// collect — gather the elements into a collection
List<String> nameList = employees.stream()
    .map(Employee::getName)
    .collect(Collectors.toList());

// NOTE(review): despite its name, nameSet holds the distinct department values (getDept), not names
Set<String> nameSet = employees.stream()
    .map(Employee::getDept)
    .collect(Collectors.toSet());

// Grouping: department → employees in that department
Map<String, List<Employee>> byDept = employees.stream()
    .collect(Collectors.groupingBy(Employee::getDept));

// Grouping + aggregation: department → head count
Map<String, Long> countByDept = employees.stream()
    .collect(Collectors.groupingBy(Employee::getDept, Collectors.counting()));

// Grouping + aggregation: department → average salary
Map<String, Double> avgSalaryByDept = employees.stream()
    .collect(Collectors.groupingBy(
        Employee::getDept,
        Collectors.averagingDouble(Employee::getSalary)
    ));

// Partitioning (split into two groups by a boolean predicate)
Map<Boolean, List<Employee>> partitioned = employees.stream()
    .collect(Collectors.partitioningBy(e -> e.getSalary() > 15000));

// joining — string concatenation with delimiter ", ", prefix "[" and suffix "]"
String names = employees.stream()
    .map(Employee::getName)
    .collect(Collectors.joining(", ", "[", "]"));
// e.g. [Alice, Bob, Charlie]

// toMap — id → employee
Map<Long, Employee> empMap = employees.stream()
    .collect(Collectors.toMap(
        Employee::getId,
        e -> e,
        (e1, e2) -> e1  // Merge function for duplicate keys: keep the first value
    ));

// reduce — fold the elements into a single value
Optional<Integer> sum = Stream.of(1, 2, 3, 4, 5)
    .reduce(Integer::sum);  // Optional[15]

// With an identity value the result is a plain value instead of an Optional
int sumWithIdentity = Stream.of(1, 2, 3, 4, 5)
    .reduce(0, Integer::sum);  // 15

// forEach / forEachOrdered
employees.stream().forEach(System.out::println);

// count / min / max / findFirst / findAny / anyMatch / allMatch / noneMatch
long count = employees.stream().filter(e -> e.getSalary() > 10000).count();

// Highest-paid employee; Optional because max has no result for an empty stream
Optional<Employee> richest = employees.stream()
    .max(Comparator.comparing(Employee::getSalary));

// True if at least one employee earns more than 50000
boolean anyRich = employees.stream()
    .anyMatch(e -> e.getSalary() > 50000);

并行流

java
// Parallel streams use ForkJoinPool.commonPool()
List<Integer> numbers = IntStream.rangeClosed(1, 1_000_000)
    .boxed()
    .collect(Collectors.toList());

// Sequential
long sum1 = numbers.stream()
    .mapToLong(Integer::longValue)
    .sum();

// Parallel (can be faster on multi-core CPUs)
long sum2 = numbers.parallelStream()
    .mapToLong(Integer::longValue)
    .sum();

// Note: parallel streams do not suit every scenario
// ✅ Good fit: large data sets, CPU-intensive work, stateless operations
// ❌ Poor fit: small data sets, stateful operations, order-sensitive work, I/O operations

// Parallel-stream pitfall: thread safety (this example is intentionally wrong)
List<Integer> result = new ArrayList<>();  // Not thread-safe!
numbers.parallelStream()
    .filter(n -> n % 2 == 0)
    .forEach(result::add);  // May lose elements: multiple threads add to the same ArrayList!

// Correct approach: use collect
List<Integer> safeResult = numbers.parallelStream()
    .filter(n -> n % 2 == 0)
    .collect(Collectors.toList());  // Thread-safe

自定义 Collector

java
// 实现一个统计 Collector
public class StatisticsCollector implements Collector<Double,
        DoubleSummaryStatistics, DoubleSummaryStatistics> {

    @Override
    public Supplier<DoubleSummaryStatistics> supplier() {
        return DoubleSummaryStatistics::new;
    }

    @Override
    public BiConsumer<DoubleSummaryStatistics, Double> accumulator() {
        return DoubleSummaryStatistics::accept;
    }

    @Override
    public BinaryOperator<DoubleSummaryStatistics> combiner() {
        return (a, b) -> { a.combine(b); return a; };
    }

    @Override
    public Function<DoubleSummaryStatistics, DoubleSummaryStatistics> finisher() {
        return Function.identity();
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Set.of(Characteristics.UNORDERED, Characteristics.IDENTITY_FINISH);
    }
}

// Usage: collect the salaries of all employees into one DoubleSummaryStatistics
// (assumes Employee::getSalary yields a double/Double — TODO confirm against Employee)
DoubleSummaryStatistics stats = employees.stream()
    .map(Employee::getSalary)
    .collect(new StatisticsCollector());
System.out.println("平均薪资: " + stats.getAverage());
System.out.println("最高薪资: " + stats.getMax());

实战:复杂数据处理

java
// Scenario: names of the top 3 earners in each department
Map<String, List<String>> top3ByDept = employees.stream()
    .collect(Collectors.groupingBy(
        Employee::getDept,
        // First gather each department's employees into a list, then post-process that list
        Collectors.collectingAndThen(
            Collectors.toList(),
            list -> list.stream()
                .sorted(Comparator.comparing(Employee::getSalary).reversed())  // Highest salary first
                .limit(3)
                .map(Employee::getName)
                .collect(Collectors.toList())
        )
    ));

// Scenario: multi-level grouping (department → salary band → employees)
Map<String, Map<String, List<Employee>>> grouped = employees.stream()
    .collect(Collectors.groupingBy(
        Employee::getDept,
        Collectors.groupingBy(e -> e.getSalary() > 15000 ? "高薪" : "普通")
    ));

// Scenario: stream a large file instead of loading it whole (memory-friendly)
try (Stream<String> lines = Files.lines(Path.of("large.csv"))) {
    // Counts occurrences of each non-empty, trimmed comma-separated token.
    // wordCount is scoped to this try block.
    Map<String, Long> wordCount = lines
        .flatMap(line -> Arrays.stream(line.split(",")))
        .map(String::trim)
        .filter(s -> !s.isEmpty())
        .collect(Collectors.groupingBy(
            Function.identity(),
            Collectors.counting()
        ));
}

性能建议

  • 数据量 < 1000:串行流更快(并行流有线程切换开销)
  • 数据量 > 10000 且 CPU 密集:并行流有优势
  • 避免在并行流中使用有状态的 lambda
  • Collectors.toUnmodifiableList() 比 Collectors.toList() 更安全(Java 10+)
  • Java 16+ 可直接用 .toList() 替代 collect(Collectors.toList())(注意:.toList() 返回的是不可修改的列表)

系统学习 Java 生态,深入底层架构