Java 从版本8开始支持“Stream API”,即函数式编程,可以用简单的代码表达出比较复杂的遍历操作。本文介绍这些 Stream API 的基本概念,用法,以及一些参考资料。我之前写 Python 比较多,所以一些地方可能用 Python 的视角来解释。
简单用法
1 2 3 4 5 6 7 8 9 10 11 12 |
public class streamApi { public static void main(String[] args) { List<String> myArray = Arrays.asList("a1", "a2", "b1", "b2", "c1", "c2"); myArray.stream() .filter(s -> s.startsWith("b")) .map(String::toUpperCase) .sorted() .forEach(System.out::print); } } |
这些函数和 Python 的 filter
map
sort
很像了,所以很容看懂。就是先过滤出以 "b"
开头的字符串,然后用 map
转换成大写的方式,排序之后,输出。
Stream API 中有一个概念,将这些 API 分成了两种:
- 中间结果(intermediate):像
filter
map
sorted
的结果都是中间结果,可以继续使用 Stream API 连续调用; - 最终结果(terminal):类似
forEach
,这种 API 将会终止 Stream 的调用,它们的结果要么是void
,要么是一个非 Stream 结果。
连续调用 Stream 的方式叫做 operation pipeline。所有的 Stream API 可以参考这个 javadoc。
大多数的 Stream 操作都接收一个 lambda 表达式作为参数,Lambda 表达式描述了 Stream 操作的一个具体行为,通常都是 stateless 和 non-interfering 的。
Non-interfering 意味着它不会修改原始的数据,比如上面的例子,没有操作去动 myList
,迭代结束之后,myList
还是保持着原来的样子。
Stateless 意味着操作都是确定的,没有依赖外面的变量(导致可能在执行期间改变)。
不同类型的 Stream 操作
Stream 可以从不同的数据类型创建,尤其是集合(Collections)。
List 和 Set 支持 stream()
方法和 parallelStream()
方法,parallenStream()
可以在多线程中执行。
在 List 上调用 stream()
可以创建一个 Stream 对象。
1 2 3 4 |
Arrays.asList("a1", "a2", "a3") .stream() .findFirst() .ifPresent(System.out::println); // a1 |
但是我们不必专门为了创建 Stream 对象而创建一个集合:
1 2 3 |
Stream.of("a1", "a2", "a3") .findFirst() .ifPresent(System.out::println); // a1 |
Stream.of()
可以从一些对象引用中自动创建一个 Stream。
除了从对象创建 Stream,Java8 还提供了方法从基本类型创建 Stream,比如int
long
double
,这些方法分别是 IntStream
LongStream
DoubleStream
.
IntStream 可以用来替代 for 循环:
1 2 3 4 5 6 |
IntStream.range(1, 4) .forEach(System.out::println); // 1 // 2 // 3 |
基本类型的 Stream 和普通的 Stream 对象基本一样,几点区别如下:
- 基本类型使用特殊的 lambda 表达式,比如
IntFunction
之于Function
,IntPredicate
之于Predicate
; - 基本类型支持一些特殊的“最终结果API”,比如
sum()
average()
;
1 2 3 4 |
Arrays.stream(new int[] {1, 2, 3}) .map(n -> 2 * n + 1) .average() .ifPresent(System.out::println); // 5.0 |
有时候,我们想把普通的 Stream 转换成原始类型的 Stream,比如我们想用 max()
,这时可以使用转换的方法 mapToInt()
mapToLong()
mapToDouble()
:
1 2 3 4 5 |
Stream.of("a1", "a2", "a3") .map(s -> s.substring(1)) .mapToInt(Integer::parseInt) .max() .ifPresent(System.out::println); // 3 |
原始类型可以通过 mapToObject
方法,将原始类型的 Stream 转换成普通的 Stream 对象。
1 2 3 4 5 6 7 |
IntStream.range(1, 4) .mapToObj(i -> "a" + i) .forEach(System.out::println); // a1 // a2 // a3 |
下面这个例子结合了普通的 Stream 和原始类型的 Stream:
1 2 3 4 5 6 7 8 |
Stream.of(1.0, 2.0, 3.0) .mapToInt(Double::intValue) .mapToObj(i -> "a" + i) .forEach(System.out::println); // a1 // a2 // a3 |
执行顺序
前面介绍了 Stream 的基本概念,下面开始深入原理。
产生中间结果的 Stream 一个比较重要的特性是,它是惰性的。下面这个例子,我们只有中间结果,没有最终结果的 Stream,最终 println
不会被执行。
1 2 3 4 5 |
Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return true; }); |
因为 Stream 操作是惰性的,只有用到的时候才会真正执行。
如果我们在后面加上一个终止类型的 Stream 操作,println
就会执行了。
1 2 3 4 5 6 |
Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return true; }) .forEach(s -> System.out.println("forEach: " + s)); |
这段代码的输出如下:
1 2 3 4 5 6 7 8 9 10 |
filter: d2 forEach: d2 filter: a2 forEach: a2 filter: b1 forEach: b1 filter: b3 forEach: b3 filter: c forEach: c |
注意从输出的顺序也可以看到“惰性执行”的特征:并不是所有的 filter
都打印出来,再打印出来 forEeach
。而是一个元素执行到底,再去执行下一个元素。
这样可以减少执行的次数。参考下面这个例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Stream.of("d2", "a2", "b1", "b3", "c") .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .anyMatch(s -> { System.out.println("anyMatch: " + s); return s.startsWith("A"); }); // map: d2 // anyMatch: D2 // map: a2 // anyMatch: A2 |
anyMatch
会在找到第一个符合条件的元素就返回。这样我们并不需要对有的元素执行 map
,在第一个 anyMatch
返回 true
之后,执行就结束了。所以前面的中间状态 Stream 操作,会执行尽可能少的次数。
执行的顺序很重要(Stream 的优化)
下面这个例子,我们用了两个生成中间结果的 Stream 操作 map
filter
,和一个最终结果的操作 forEach
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
Stream.of("d2", "a2", "b1", "b3", "c") .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .filter(s -> { System.out.println("filter: " + s); return s.startsWith("A"); }) .forEach(s -> System.out.println("forEach: " + s)); // map: d2 // filter: D2 // map: a2 // filter: A2 // forEach: A2 // map: b1 // filter: B1 // map: b3 // filter: B3 // map: c // filter: C |
map
filter
各执行了5次,forEach
执行了1次。
如果我们在这里稍微改变一下顺序,将 filter
提前执行,可以将 map
的执行次数减少到1次。(有点像 SQL 优化)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); // filter: d2 // filter: a2 // map: a2 // forEach: A2 // filter: b1 // filter: b3 // filter: c |
现在 map
只执行一次了,在操作很大的集合的时候非常有用。
下面我们引入一下 sorted
这个操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Stream.of("d2", "a2", "b1", "b3", "c") .sorted((s1, s2) -> { System.out.printf("sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); |
排序是一个特殊的中间操作,是一个 stateful 的操作。因为需要原地排序。
输出如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
sort: a2; d2 sort: b1; a2 sort: b1; d2 sort: b1; a2 sort: b3; b1 sort: b3; d2 sort: c; b3 sort: c; d2 filter: a2 map: a2 forEach: A2 filter: b1 filter: b3 filter: c filter: d2 |
排序操作会在整个集合上执行。所以和之前的“垂直”执行不同,排序操作是水平执行的。注意排序影响的只是后面的 Stream 操作,对于原来的集合,顺序依然是不变的。参考这段代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public class streamApi { public static void main(String[] args) { List<String> foo = Arrays.asList("d2", "a2", "b1", "b3", "c"); foo.stream().sorted( (s1, s2) -> { return s1.compareTo(s2); } ).forEach( System.out::println ); System.out.println("---"); foo.stream().forEach(System.out::println); } } |
输出如下(注意原来的 foo
并没有变化):
1 2 3 4 5 6 7 8 9 10 11 |
a2 b1 b3 c d2 --- d2 a2 b1 b3 c |
Sort 也有惰性执行的特性,如果我们改变一下上面那个例子的执行顺序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .sorted((s1, s2) -> { System.out.printf("sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); // filter: d2 // filter: a2 // filter: b1 // filter: b3 // filter: c // map: a2 // forEach: A2 |
可以发现 sorted
不会执行,因为 filter
只产生了一个元素。
Stream 的重用
Java8 的 Stream 是不支持重用的。一旦调用了终止类型的 Stream 操作,Stream 会被 close。
1 2 3 4 5 6 |
Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); stream.anyMatch(s -> true); // ok stream.noneMatch(s -> true); // exception |
在同一个 Stream 上,先调用 noneMatch
再调用 anyMatch
会看到以下异常:
1 2 3 4 5 |
java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229) at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459) at com.winterbe.java8.Streams5.test7(Streams5.java:38) at com.winterbe.java8.Streams5.main(Streams5.java:28) |
所以,我们必须为每一个终止类型的 Stream 操作创建一个新的 Stream。可以用 Stream Supplier 来实现。
1 2 3 4 5 6 |
Supplier<Stream<String>> streamSupplier = () -> Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); streamSupplier.get().anyMatch(s -> true); // ok streamSupplier.get().noneMatch(s -> true); // ok |
每次调用 get()
都会得到一个新的 Stream。
高级操作
Stream 支持的操作很多(不像Python的函数式编程只支持4个)。我们已经见过了最常用的 filter
和 map
。其他的操作读者可以自行阅读 Stream 文档。这里,我们再试一下几个复杂的操作:collect
flatMap
reduce
.
下面的例子都会使用一个 Person
的 List 来演示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
class Person { String name; int age; Person(String name, int age) { this.name = name; this.age = age; } @Override public String toString() { return name; } } List<Person> persons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12)); |
Collect
Collect 是很有用的一个终止类型的 Stream 操作,可以将 Stream 转换成集合结果,比如 List
Set
Map
。Collect 接收一个 Collector
作为参数,Collector
需要支持4种操作:
- supplier
- accumulator
- combiner
- finisher
听起来实现很复杂,但是好处是 Java8 已经内置了常用的 Collector
,所以大多数情况下我们不需要自己实现。
下面看一个常用的操作:
1 2 3 4 5 6 7 |
List<Person> filtered = persons .stream() .filter(p -> p.name.startsWith("P")) .collect(Collectors.toList()); System.out.println(filtered); // [Peter, Pamela] |
可以看到这个 Stream 操作最后构建了一个 List,如果需要 Set 的话只需要将 toList()
换成 toSet()
。
接下来这个例子,将对象按照属性存放到 Map
中。
1 2 3 4 5 6 7 8 9 10 |
Map<Integer, List<Person>> personsByAge = persons .stream() .collect(Collectors.groupingBy(p -> p.age)); personsByAge .forEach((age, p) -> System.out.format("age %s: %s\n", age, p)); // age 18: [Max] // age 23: [Peter, Pamela] // age 12: [David] |
Collectors 非常实用,还可以对 Stream 进行聚合,比如计算所有 Person 的平均年龄:
1 2 3 4 5 |
Double averageAge = persons .stream() .collect(Collectors.averagingInt(p -> p.age)); System.out.println(averageAge); // 19.0 |
如果需要更全面的统计数据,可以试一下 summarizing
Collector,这个内置的 Collector 提供了 count, sum, min, max 等有用的数据。
1 2 3 4 5 6 7 |
IntSummaryStatistics ageSummary = persons .stream() .collect(Collectors.summarizingInt(p -> p.age)); System.out.println(ageSummary); // IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23} |
下面这个例子,将所有的对象 join 成一个 String:
1 2 3 4 5 6 7 8 |
String phrase = persons .stream() .filter(p -> p.age >= 18) .map(p -> p.name) .collect(Collectors.joining(" and ", "In Germany ", " are of legal age.")); System.out.println(phrase); // In Germany Max and Peter and Pamela are of legal age. |
Join Collector 的参数是一个分隔符,一个可选的前缀和后缀。
将 Stream 元素转换成 map 的时候,需要特别注意:key 必须是唯一的,否则会抛出 IllegalStateException
。但是我们可以传入一个 merge function,来指定重复的元素映射的方式:
1 2 3 4 5 6 7 8 9 |
Map<Integer, String> map = persons .stream() .collect(Collectors.toMap( p -> p.age, p -> p.name, (name1, name2) -> name1 + ";" + name2)); System.out.println(map); // {18=Max, 23=Peter;Pamela, 12=David} |
最后,来尝试一下实现自己的 Collector。前面已经提到过,实现一个 Collector,我们需要提供4个东西:supplier,accumulator,combiner,finisher.
下面这个 Collector 将所有的 Person 对象转换成一个字符串,名字全部大写,中间用 |
分割。
1 2 3 4 5 6 7 8 9 10 11 12 |
Collector<Person, StringJoiner, String> personNameCollector = Collector.of( () -> new StringJoiner(" | "), // supplier (j, p) -> j.add(p.name.toUpperCase()), // accumulator (j1, j2) -> j1.merge(j2), // combiner StringJoiner::toString); // finisher String names = persons .stream() .collect(personNameCollector); System.out.println(names); // MAX | PETER | PAMELA | DAVID |
Java 的 String 是不可修改的,所以这里需要一个 helper class StringJoiner
,来构建最终的 String。
- 首先 supplier 构建了一个 StringJoiner,以
|
作为分隔符; - 然后 accumulator 将每个 Person 的 name 转换成大写;
- combiner 将2个 StringJoiners 合并成1个;
- 最后 finisher 从 StringJoiner 构建最终的 String。
FlatMap
前面我们演示了如何用 map
将一种类型的对象转换成另一种类型。但是 map
也有一些限制:一个对象只能转换成一个对象,如果需要将一个对象转换成多个就不行了。所以还有一个 flatMap
。
FlatMap 可以将 Stream 中的每一个对象转换成0个,1个或多个。无论产生多少对象,最终都会放到同一个 Stream 中,供后面的操作消费。
下面演示 flatMap
的功能,我们需要一个有继承关系的类型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
class Foo { String name; List<Bar> bars = new ArrayList<>(); Foo(String name) { this.name = name; } } class Bar { String name; Bar(String name) { this.name = name; } } |
下面,我们使用 Stream 来初始化多个几个对象:
1 2 3 4 5 6 7 8 9 10 11 12 |
List<Foo> foos = new ArrayList<>(); // create foos IntStream .range(1, 4) .forEach(i -> foos.add(new Foo("Foo" + i))); // create bars foos.forEach(f -> IntStream .range(1, 4) .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name)))); |
现在我们生成了一个 List,包含3个 foo,每个 foo 中包含3个 bar.
FlatMap 接收一个方法,返回一个 Stream,可以包含任意个 Objects. 所以我们可以用这个方法得到 foo 中的每一个 bar:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
foos.stream() .flatMap(f -> f.bars.stream()) .forEach(b -> System.out.println(b.name)); // Bar1 <- Foo1 // Bar2 <- Foo1 // Bar3 <- Foo1 // Bar1 <- Foo2 // Bar2 <- Foo2 // Bar3 <- Foo2 // Bar1 <- Foo3 // Bar2 <- Foo3 // Bar3 <- Foo3 |
上面这个代码将 foo 的 Stream 转换成了包含 9 个 bar 的 Stream。
上面所有的代码也可以简化到一个 Stream 操作中:
1 2 3 4 5 6 7 |
IntStream.range(1, 4) .mapToObj(i -> new Foo("Foo" + i)) .peek(f -> IntStream.range(1, 4) .mapToObj(i -> new Bar("Bar" + i + " <- " f.name)) .forEach(f.bars::add)) .flatMap(f -> f.bars.stream()) .forEach(b -> System.out.println(b.name)); |
在 flatMap
中也可以用 Optional
对象,Optional 是 Java8 引入的,可以检查 null 的一种机制。结合 Optional 和 flatMap
我们可以相对优雅地处理 null
,考虑下面这种数据结构:
1 2 3 4 5 6 7 8 9 10 11 |
class Outer { Nested nested; } class Nested { Inner inner; } class Inner { String foo; } |
为了正确地得到 Inner
中的 foo
String,我们要这么写:
1 2 3 4 |
Outer outer = new Outer(); if (outer != null && outer.nested != null && outer.nested.inner != null) { System.out.println(outer.nested.inner.foo); } |
用 flatMap
的话,我们可以这么写:
1 2 3 4 5 |
Optional.of(new Outer()) .flatMap(o -> Optional.ofNullable(o.nested)) .flatMap(n -> Optional.ofNullable(n.inner)) .flatMap(i -> Optional.ofNullable(i.foo)) .ifPresent(System.out::println); |
每一个 flatMap
都用 Optional
封装,如果不是空,就返回里面的对象,如果是空的话就返回一个 null
.
Reduce
Reduce 操作可以将所有的元素编程一个结果。Java8 支持3种不同的 reduce
方法。
第一种可以将 Stream 中的元素聚合成一个。比如下面的代码,可以找到 Stream 中年龄最大的 Person.
1 2 3 4 |
persons .stream() .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2) .ifPresent(System.out::println); // Pamela |
reduce
方法接收一个二元函数(一个只有两个参数的函数)作为参数,返回一个对象。(所以叫做 reduce)
第二种 reduce
接收一个初始对象,和一个二元函数。通常可以用于聚合操作(比如累加)。
1 2 3 4 5 6 7 8 9 10 11 |
Person result = persons .stream() .reduce(new Person("", 0), (p1, p2) -> { p1.age += p2.age; p1.name += p2.name; return p1; }); System.out.format("name=%s; age=%s", result.name, result.age); // name=MaxPeterPamelaDavid; age=76 |
第三种 reduce
方法接收3个参数:一个初始化对象,一个二元函数,和一个 combiner 函数。
初始化值并不一定是 Stream 中的对象,所以我们可以直接用一个整数。
1 2 3 4 5 |
Integer ageSum = persons .stream() .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2); System.out.println(ageSum); // 76 |
结果依然是 76,那么原理是什么呢?我们可以打印出来执行过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
Integer ageSum = persons .stream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; }); // accumulator: sum=0; person=Max // accumulator: sum=18; person=Peter // accumulator: sum=41; person=Pamela // accumulator: sum=64; person=David |
可以看到 accumulator 做了所有的工作,将所有的年龄和初始化的 int 值 0 相加。但是 combiner 没有执行?
我们将 Stream 换成 parallelStream 再来看一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
Integer ageSum = persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; }); // accumulator: sum=0; person=Pamela // accumulator: sum=0; person=David // accumulator: sum=0; person=Max // accumulator: sum=0; person=Peter // combiner: sum1=18; sum2=23 // combiner: sum1=23; sum2=12 // combiner: sum1=41; sum2=35 |
这次 combiner 执行了。并发执行的 Stream 有不同的行为。Accumulator 是并发执行的,所以需要一个 combiner 将所有的并发得到的结果再聚合起来。
下面来看一下 Parallel Stream。
Parallel Stream
因为 Stream 中每一个元素都是单独执行的,可想而知,如果并行计算每一个元素的话,可以提升性能。Parallel Stream 就是适用这种场景的。Parallel Stream 使用公共的 ForkJoinPool
来并行计算。底层的真正的线程数据取决于 CPU 的核数,默认是3.
1 2 |
ForkJoinPool commonPool = ForkJoinPool.commonPool(); System.out.println(commonPool.getParallelism()); // 3 |
这个值可以通过 JVM 参数修改:
1 |
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5 |
Collections 可以通过 parallelStream()
来创建一个并行执行的 Stream,可以在普通的 Stream 上执行 parallel()
来转换成并行执行的 Stream。
下面这个例子,将并行执行的每一步的线程执行者打印出来:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName())); |
输出如下,展示了每一步都是由哪一个线程来执行的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
filter: b1 [main] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: c2 [ForkJoinPool.commonPool-worker-3] map: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: A2 [ForkJoinPool.commonPool-worker-1] map: b1 [main] forEach: B1 [main] filter: a1 [ForkJoinPool.commonPool-worker-3] map: a1 [ForkJoinPool.commonPool-worker-3] forEach: A1 [ForkJoinPool.commonPool-worker-3] forEach: C1 [ForkJoinPool.commonPool-worker-2] |
从上面的结果页可以看出,所有的 ForkJoinPool
中的线程都参与了计算。
如果在上面的例子中加入一个 sort
操作,结果就有些不同了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .sorted((s1, s2) -> { System.out.format("sort: %s <> %s [%s]\n", s1, s2, Thread.currentThread().getName()); return s1.compareTo(s2); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName())); |
结果如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
filter: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: b1 [main] map: b1 [main] filter: a1 [ForkJoinPool.commonPool-worker-2] map: a1 [ForkJoinPool.commonPool-worker-2] map: c2 [ForkJoinPool.commonPool-worker-3] sort: A2 <> A1 [main] sort: B1 <> A2 [main] sort: C2 <> B1 [main] sort: C1 <> C2 [main] sort: C1 <> B1 [main] sort: C1 <> C2 [main] forEach: A1 [ForkJoinPool.commonPool-worker-1] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: B1 [main] forEach: A2 [ForkJoinPool.commonPool-worker-2] forEach: C1 [ForkJoinPool.commonPool-worker-1] |
看起来 sort
好像是顺序执行的。实际上,sort
使用的是 Java8 的 Arrays.parallelSort()
方法,文档里提到,这里的排序是否真正的并行执行取决于数组的长度,如果长的话就会用并行排序,否则就用单线程排序:
If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.
回到之前的 reduce
方法,我们知道 combiner 只会在并行的时候执行,现在来看一下这个方法到底是做什么的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
List<Person> persons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12)); persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s [%s]\n", sum, p, Thread.currentThread().getName()); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s [%s]\n", sum1, sum2, Thread.currentThread().getName()); return sum1 + sum2; }); |
可以看到 accumulator 和 combiner 都使用了多线程来运行:
1 2 3 4 5 6 7 |
accumulator: sum=0; person=Pamela; [main] accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3] accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2] accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1] combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1] combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2] combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2] |
综上,在数据量很大的时候,并行执行的 Stream 可以带来很大的性能提升。但是注意像 reduce
和 collect
这样的操作,需要特殊的 combiner。(因为前一操作产生的类型不同,需要做聚合,所以无法和迁移操作的函数一样,需要另外提供)。
另外要注意的是,Parallel Stream 底层使用的通用的 ForkJoinPool
,所以需要注意不要在并行的 Stream 中出现很慢或阻塞的操作,这样会影响其他并行任务。
以上就是基本的 Stream API 介绍了,强烈建议阅读 Java8 的官方文档。
参考资料:
- package summary
- Document
- Toturial
- Java 8 Stream Tutorial
- Collection Pipeline by Martin Fowler
- Stream.js Javascript 版的 Java Stream API