Stream API 深度解析
Stream API 是 Java 8 最重要的特性之一,彻底改变了集合处理的方式。理解其惰性求值和并行流原理,能写出更优雅高效的代码。
Stream 的本质
Stream 不是数据结构,而是对数据源的计算描述:
java
// Stream 的三个阶段
List<String> names = List.of("Alice", "Bob", "Charlie", "David", "Eve");
List<String> result = names.stream() // 1. 创建 Stream(数据源)
.filter(s -> s.length() > 3) // 2. 中间操作(惰性,不立即执行)
.map(String::toUpperCase) // 2. 中间操作
.sorted() // 2. 中间操作
.collect(Collectors.toList()); // 3. 终止操作(触发执行)
// 没有终止操作,中间操作永远不会执行!
Stream<String> stream = names.stream().filter(s -> {
System.out.println("过滤: " + s); // 不会打印
return s.length() > 3;
});
// 此时什么都没发生
创建 Stream
java
// 从集合
List<Integer> list = List.of(1, 2, 3, 4, 5);
Stream<Integer> s1 = list.stream();
Stream<Integer> s2 = list.parallelStream(); // 并行流
// 从数组
int[] arr = {1, 2, 3, 4, 5};
IntStream s3 = Arrays.stream(arr);
Stream<Integer> s4 = Stream.of(1, 2, 3, 4, 5);
// 无限流
Stream<Integer> s5 = Stream.iterate(0, n -> n + 2); // 0, 2, 4, 6, ...
Stream<Double> s6 = Stream.generate(Math::random); // 随机数流
// 范围流(基本类型)
IntStream range = IntStream.range(1, 10); // [1, 10)
IntStream rangeClosed = IntStream.rangeClosed(1, 10); // [1, 10]
// 文件流
try (Stream<String> lines = Files.lines(Path.of("data.txt"))) {
lines.forEach(System.out::println);
}
中间操作
java
List<Employee> employees = getEmployees();
// filter — 过滤
employees.stream()
.filter(e -> e.getSalary() > 10000)
.filter(e -> e.getDept().equals("Engineering"));
// map — 转换
employees.stream()
.map(Employee::getName) // 提取字段
.map(String::toUpperCase); // 转换值(String → String,类型不变)
// flatMap — 扁平化
List<List<Integer>> nested = List.of(List.of(1, 2), List.of(3, 4));
nested.stream()
.flatMap(Collection::stream) // [[1,2],[3,4]] → [1,2,3,4]
.forEach(System.out::println);
// distinct — 去重(依赖 equals/hashCode)
Stream.of(1, 2, 2, 3, 3, 3).distinct(); // [1, 2, 3]
// sorted — 排序
employees.stream()
.sorted(Comparator.comparing(Employee::getSalary).reversed()
.thenComparing(Employee::getName));
// peek — 调试用,不改变流
employees.stream()
.filter(e -> e.getSalary() > 10000)
.peek(e -> System.out.println("过滤后: " + e.getName()))
.map(Employee::getName)
.collect(Collectors.toList());
// limit / skip — 截取
Stream.iterate(1, n -> n + 1)
.skip(10) // 跳过前 10 个
.limit(5) // 取 5 个
.forEach(System.out::println); // 11, 12, 13, 14, 15
// takeWhile / dropWhile(Java 9+)
Stream.of(1, 2, 3, 4, 5, 1, 2)
.takeWhile(n -> n < 4) // [1, 2, 3](遇到不满足条件立即停止)
.forEach(System.out::println);
终止操作
java
// collect — 收集到集合
List<String> nameList = employees.stream()
.map(Employee::getName)
.collect(Collectors.toList());
Set<String> nameSet = employees.stream()
.map(Employee::getDept)
.collect(Collectors.toSet());
// 分组
Map<String, List<Employee>> byDept = employees.stream()
.collect(Collectors.groupingBy(Employee::getDept));
// 分组 + 统计
Map<String, Long> countByDept = employees.stream()
.collect(Collectors.groupingBy(Employee::getDept, Collectors.counting()));
Map<String, Double> avgSalaryByDept = employees.stream()
.collect(Collectors.groupingBy(
Employee::getDept,
Collectors.averagingDouble(Employee::getSalary)
));
// 分区(按 boolean 分两组)
Map<Boolean, List<Employee>> partitioned = employees.stream()
.collect(Collectors.partitioningBy(e -> e.getSalary() > 15000));
// joining — 字符串拼接
String names = employees.stream()
.map(Employee::getName)
.collect(Collectors.joining(", ", "[", "]"));
// [Alice, Bob, Charlie]
// toMap
Map<Long, Employee> empMap = employees.stream()
.collect(Collectors.toMap(
Employee::getId,
e -> e,
(e1, e2) -> e1 // 重复 key 时的合并策略
));
// reduce — 归约
Optional<Integer> sum = Stream.of(1, 2, 3, 4, 5)
.reduce(Integer::sum); // Optional[15]
int sumWithIdentity = Stream.of(1, 2, 3, 4, 5)
.reduce(0, Integer::sum); // 15
// forEach / forEachOrdered
employees.stream().forEach(System.out::println);
// count / min / max / findFirst / findAny / anyMatch / allMatch / noneMatch
long count = employees.stream().filter(e -> e.getSalary() > 10000).count();
Optional<Employee> richest = employees.stream()
.max(Comparator.comparing(Employee::getSalary));
boolean anyRich = employees.stream()
.anyMatch(e -> e.getSalary() > 50000);
并行流
java
// 并行流使用 ForkJoinPool.commonPool()
List<Integer> numbers = IntStream.rangeClosed(1, 1_000_000)
.boxed()
.collect(Collectors.toList());
// 串行
long sum1 = numbers.stream()
.mapToLong(Integer::longValue)
.sum();
// 并行(多核 CPU 下更快)
long sum2 = numbers.parallelStream()
.mapToLong(Integer::longValue)
.sum();
// 注意:并行流不适合所有场景
// ✅ 适合:数据量大、计算密集、无状态操作
// ❌ 不适合:数据量小、有状态操作、需要顺序、I/O 操作
// 并行流的陷阱:线程安全
List<Integer> result = new ArrayList<>(); // 非线程安全!
numbers.parallelStream()
.filter(n -> n % 2 == 0)
.forEach(result::add); // 可能丢失数据!
// 正确做法:使用 collect
List<Integer> safeResult = numbers.parallelStream()
.filter(n -> n % 2 == 0)
.collect(Collectors.toList()); // 线程安全
自定义 Collector
java
// 实现一个统计 Collector
public class StatisticsCollector implements Collector<Double,
DoubleSummaryStatistics, DoubleSummaryStatistics> {
@Override
public Supplier<DoubleSummaryStatistics> supplier() {
return DoubleSummaryStatistics::new;
}
@Override
public BiConsumer<DoubleSummaryStatistics, Double> accumulator() {
return DoubleSummaryStatistics::accept;
}
@Override
public BinaryOperator<DoubleSummaryStatistics> combiner() {
return (a, b) -> { a.combine(b); return a; };
}
@Override
public Function<DoubleSummaryStatistics, DoubleSummaryStatistics> finisher() {
return Function.identity();
}
@Override
public Set<Characteristics> characteristics() {
return Set.of(Characteristics.UNORDERED, Characteristics.IDENTITY_FINISH);
}
}
// 使用
DoubleSummaryStatistics stats = employees.stream()
.map(Employee::getSalary)
.collect(new StatisticsCollector());
System.out.println("平均薪资: " + stats.getAverage());
System.out.println("最高薪资: " + stats.getMax());
实战:复杂数据处理
java
// 场景:统计各部门薪资 Top3 员工
Map<String, List<String>> top3ByDept = employees.stream()
.collect(Collectors.groupingBy(
Employee::getDept,
Collectors.collectingAndThen(
Collectors.toList(),
list -> list.stream()
.sorted(Comparator.comparing(Employee::getSalary).reversed())
.limit(3)
.map(Employee::getName)
.collect(Collectors.toList())
)
));
// 场景:多级分组
Map<String, Map<String, List<Employee>>> grouped = employees.stream()
.collect(Collectors.groupingBy(
Employee::getDept,
Collectors.groupingBy(e -> e.getSalary() > 15000 ? "高薪" : "普通")
));
// 场景:流式处理大文件(内存友好)
try (Stream<String> lines = Files.lines(Path.of("large.csv"))) {
Map<String, Long> wordCount = lines
.flatMap(line -> Arrays.stream(line.split(",")))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.groupingBy(
Function.identity(),
Collectors.counting()
));
}
性能建议
- 数据量 < 1000:串行流更快(并行流有线程切换开销)
- 数据量 > 10000 且 CPU 密集:并行流有优势
- 避免在并行流中使用有状态的 lambda
- Collectors.toUnmodifiableList() 比 toList() 更安全(Java 10+)
- Java 16+ 直接用 .toList() 替代 collect(Collectors.toList())(注意:.toList() 返回的是不可修改的列表)