Java 8 中的 Stream 是对集合(Collection)对象功能的增强,它专注于对集合进行各种便利、高效的聚合操作(aggregate operation),或者大批量数据操作 (bulk data operation)。
很多数据处理的场景不得不脱离 RDBMS,或者以底层返回的数据为基础进行更上层的数据统计。而以前 Java 的集合 API 中,仅仅有极少量的辅助型方法,更多的时候是程序员需要用 Iterator 来遍历集合,完成相关的聚合应用逻辑。这是一种远不够高效、笨拙的方法。
Stream API 借助于 Lambda 表达式,极大的提高编程效率。同时它提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势,使用 fork/join 并行方式来拆分任务和加速处理过程。
Stream 不是一种数据结构,并不保存数据,其更像一种高级的迭代器(Iterator),阮一峰老师的 图解 Monad 会更有助于理解 Stream。
创建 Stream
- 从 Stream 创建
Stream.of()IntStreamLongStreamDoubleStream
Stream.builder().add().build()- 遍历1~3:
Stream.iterate(1, i -> ++i).limit(3) - 连接两个Stream:
Stream.concat(Stream<? extends T> a, Stream<? extends T> b) Stream.empty()
- 从 Collection 和 数组 创建
collection.stream()collection.parallelStream()Arrays.stream(T array)
- 自己构建
java.util.Spliterator
- 其它
java.io.BufferedReader.lines()java.nio.file.Files.walk()Random.ints()BitSet.stream()Pattern.splitAsStream(java.lang.CharSequence)JarFile.stream()
操作类型
中间转换操作(Intermediate)
一个流可以后面跟随零个或多个 intermediate 操作。其目的主要是打开流,做出某种程度的数据映射/过滤,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),就是说,仅仅调用到这类方法,并没有真正开始流的遍历
- 无状态操作
map: 处理迭代过程中的每个元素,每个输入元素,都按照规则转换成为另外一个元素flatMap: 与Map的区别在于可以把多维集合压平成另一个Stream进行输出,可以理解为嵌套循环处理,如:在每个字母后面追加,并打印Stream.of("Hello", "World").flatMap(w -> Stream.of(w.split(""))).map(c -> c + ',').forEach(System.out::println);filter: 如果返回 true ,元素被留下进行后续操作distinct: 滤重peek: 与 map 的不同在于其没有返回值,可以处理每个元素,但是结果不会传递下去skip: 跳过前几个元素limit: 限制只处理几个元素parallel: 标记后续操作为并行处理sequential: 标记后续操作为串行处理unordered: 无序的流
- 有状态操作
sorted: 排序,该操作会把并行流前后的无状态操作进行分割,降低并行性
一个操作可能会影响流的有序,比如
map方法,它会用不同的值甚至类型替换流中的元素,所以输入元素的有序性已经变得没有意义了;但是对于filter方法来说,它只是丢弃掉一些值而已,输入元素的有序性还是保障的。对于串行流,流有序与否不会影响其性能,只是会影响确定性(determinism),无序流在多次执行的时候结果可能是不一样的。对于并行流,去掉有序这个约束可能会提供性能,比如
distinct、groupingBy这些聚合操作。–鸟窝 《Java Stream 详解》
Terminal 终止操作
一个流只能有一个 terminal 操作,当这个操作执行后,流就被使用“光”了,无法再被操作。所以这必定是流的最后一个操作。Terminal 操作的执行,才会真正开始流的遍历,并且会生成一个结果。
forEach: 遍历forEachOrdered: 并不是排序后输出,而是在parallel情况下保证按照集合按照原始顺序输出,forEach在parallel无法保证顺序toArray: 结果存入数组collect: 结果存入集合iterator: 返回迭代器min: 求最小值max: 求最大值count: 计数reduce(min/max/count/… 都是特殊的 reduce)- 计数:
Stream.of(1, 2, 3).reduce(0, (reslut, b) -> ++reslut); - 求最小:
Stream.of(1, 2, 3).reduce(Integer.MAX_VALUE, Math::min); - 求最大:
Stream.of(1, 2, 3).reduce(Integer.MIN_VALUE, Math::max); - 求和:
Stream.of(1, 2, 3).reduce(0, (reslut, b) -> reslut + b); - 求平均:
DoubleStream.of(1, 2, 3).reduce(0, (reslut, b) -> (reslut + b / 2)); - 取第一:
Stream.of(1, 2, 3).reduce((reslut, b) -> reslut).get(); - 取最后:
Stream.of(1, 2, 3).reduce((reslut, b) -> b).get();
- 计数:
short-circuiting 短路操作
当 Stream 是一个无限大的集合的时候,就需要一个短路操作来使Stream 返回一个有限的结果集。
anyMatch: 只要有一个元素符合条件就返回 trueallMatch: 所有元素读符合条件才返回truenoneMatch: 所有元素都不符合条件才返回truefindFirst: 获取第一个元素findAny: 串行的情况还是返回第一个,并行的情况就不确定了,可能返回任意一个limit: 限制只处理几个元素
收集器(Collectors)
辅助对象,便于演示1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public static class Person {
public Long id;
public String name;
public Integer age;
public Person(Long id, String name, Integer age) {
this.id = id;
this.name = name;
this.age = age;
}
public String toString() {
return "Person{" +
"id=" + id +
", name='" + name + '\'' +
", age=" + age +
'}';
}
}
Stream1
2
3
4
5
6Stream<Person> personStream = Stream.of(
new Person(1L, "张三", 18),
new Person(2L, "张六", 18),
new Person(3L, "李四", 23),
new Person(4L, "赵五", 26)
);
常规用法
Collectors.toList(): 结果存入ArrayListCollectors.toSet(): 结果存入HashSetpersonStream.map(p -> p.name).collect(Collectors.joining(",")): 姓名以,分割Collectors.counting(): 计数Collectors.averagingInt(p -> p.age): 求平均年龄averagingIntaveragingLongaveragingDouble
Collectors.summingInt(p -> p.age): 求和,等于mapToInt(p -> p.age).sum()summingIntsummingLongsummingDouble
Collectors.summarizingInt(i -> i.age): 统计(count,sum,min,average,max)summarizingIntsummarizingLongsummarizingDouble
Collectors.maxBy(Comparator.comparing(p -> p.age)): 年龄最大的人Collectors.minBy(Comparator.comparing(p -> p.age)): 年龄最小的人
toCollection
Collectors.toCollection(LinkedHashSet::new)Collectors.toCollection(TreeSet::new)- …
toMap / toConcurrentMap
- Key 是 age,Value 是 name,Key重复的情况下 Value用
,分割
1 | // {18=张三,张六, 23=李四, 26=赵五} |
- 获取ID与对象的映射关系
1 | Map<Long, Person> idPersonMapping = personStream.collect(Collectors.toMap( |
如果 Key 重复会报异常 java.lang.IllegalStateException: Duplicate key xxx
分组 (groupingBy / groupingByConcurrent)
1 | // Map<Integer, List<Person>> 按照性别分组 |
分区(partitioningBy)
满足条件的分为一组,不满足条件的分为另外一组1
2// Map<Boolean, List<Person>> 大于20分为一组,小于等于20的分为另外一组
personStream.collect(Collectors.partitioningBy(p -> p.age > 20))
其他
1 | Collector<Person, StringJoiner, String> personNameCollector = |