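Preface

Stream was first introduced in JDK 1.8, nearly 8 years ago at the time of writing (the JDK 1.8 GA release shipped in March 2014). Stream greatly simplifies some development scenarios, but it may also reduce code readability. Quite a few people do say that Stream hurts readability, although in the author's view readability actually improves once you are fluent with it. This article spends a very large amount of space analyzing in detail how Stream is implemented under the hood. The source code referenced is from JDK 11, so the source listings and examples in this article may not apply to other JDK versions.
A great deal of time and effort went into organizing and writing this article, and I hope it helps its readers.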
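How Stream stays backward compatible

Stream was introduced in JDK 1.8. If code written for JDK 1.7 or earlier must still run on JDK 1.8 or later, then introducing Stream must not modify the methods of interfaces that had already been published. Otherwise, old implementations of those interfaces would break on the new version, because classes compiled against the old interface would not implement the newly added abstract methods. Presumably this is why interface default methods, i.e. the default keyword, were introduced. Looking at the source, you can see that Collection and Iterable, the supertypes of ArrayList, each gained several default methods: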
```java
// JDK 11 excerpt, other members omitted
public interface Collection<E> extends Iterable<E> {

    @Override
    default Spliterator<E> spliterator() {
        return Spliterators.spliterator(this, 0);
    }

    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

    default Stream<E> parallelStream() {
        return StreamSupport.stream(spliterator(), true);
    }
}

public interface Iterable<T> {

    default void forEach(Consumer<? super T> action) {
        Objects.requireNonNull(action);
        for (T t : this) {
            action.accept(t);
        }
    }

    default Spliterator<T> spliterator() {
        return Spliterators.spliteratorUnknownSize(iterator(), 0);
    }
}
```
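Intuitively, these newly added methods should be the key methods behind Stream (later we will see that this is not just intuition but what the source code shows). A default method is called exactly like an instance method. Its body is written directly in the interface, and implementing classes may override it; in other words, it is an instance method that comes with an inherited default implementation. Whether this design is a breakthrough or a compromise, it achieves backward compatibility: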
```java
public interface Iterable<T> {
    Iterator<T> iterator();
}

// Written and compiled under JDK 1.7: it only implements iterator()
public class MyIterable implements Iterable<Long> {
    public Iterator<Long> iterator() {
        // ....
    }
}
```
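As shown above, MyIterable is defined under JDK 1.7. If this class runs on JDK 1.8, calling forEach() and spliterator() on its instances is equivalent to calling the default methods forEach() and spliterator() of JDK 1.8's Iterable. Of course, given the JDK version constraints, this only guarantees that the code compiles and its existing functionality keeps working; Stream features and the default keyword still cannot be used in JDK 1.7. All of this is meant to explain why code developed and compiled with JDK 7 can run in a JDK 8 environment.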
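A minimal sketch of this compatibility, assuming a hypothetical MyIterable that, like a JDK 1.7 class, implements only iterator(): on JDK 8+ it still inherits forEach() and spliterator() from the default methods of Iterable, and the inherited spliterator() is enough to build a Stream through StreamSupport.stream().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.StreamSupport;

public class DefaultMethodDemo {

    // Hypothetical "JDK 1.7 style" Iterable: it only implements the abstract iterator()
    static class MyIterable implements Iterable<Long> {
        private final List<Long> values = new ArrayList<>();

        MyIterable(long... items) {
            for (long item : items) {
                values.add(item);
            }
        }

        @Override
        public Iterator<Long> iterator() {
            return values.iterator();
        }
    }

    // Sums the elements through the inherited default forEach()
    static long sumViaForEach(MyIterable iterable) {
        long[] sum = {0L};
        iterable.forEach(v -> sum[0] += v);
        return sum[0];
    }

    // Counts the elements through a Stream built on the inherited default spliterator()
    static long countViaStream(MyIterable iterable) {
        Spliterator<Long> spliterator = iterable.spliterator();
        return StreamSupport.stream(spliterator, false).count();
    }

    public static void main(String[] args) {
        MyIterable iterable = new MyIterable(1L, 2L, 3L);
        System.out.println(sumViaForEach(iterable));   // 6
        System.out.println(countViaStream(iterable)); // 3
    }
}
```

The inherited spliterator() is built with Spliterators.spliteratorUnknownSize(), so it does not report SIZED; a collection class would normally override it with a sized one.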
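Spliterator: the splittable iterator

Spliterator is the cornerstone of the Stream implementation. Spliterator is short for "splitable iterator"; it traverses the elements of a given data source (such as an array, a collection or an IO Channel), and its design fully takes both sequential and parallel scenarios into account. The previous section mentioned that Collection has the default method spliterator(), which creates a Spliterator<E> instance, meaning every collection subclass is able to create a Spliterator. The design of Stream is quite similar to ChannelHandlerContext in Netty: it is essentially a linked list, and the Spliterator plays the role of the list's Head node (the Spliterator instance is the head of a stream instance; this will be expanded on when analyzing the concrete source code later).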
Spliterator interface methods

Next, look at the methods defined by the Spliterator interface:
```java
public interface Spliterator<T> {

    boolean tryAdvance(Consumer<? super T> action);

    default void forEachRemaining(Consumer<? super T> action) {
        do { } while (tryAdvance(action));
    }

    Spliterator<T> trySplit();

    long estimateSize();

    default long getExactSizeIfKnown() {
        return (characteristics() & SIZED) == 0 ? -1L : estimateSize();
    }

    int characteristics();

    default boolean hasCharacteristics(int characteristics) {
        return (characteristics() & characteristics) == characteristics;
    }

    default Comparator<? super T> getComparator() {
        throw new IllegalStateException();
    }
}
```
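To see how the four abstract methods fit together, here is a minimal sketch of a custom Spliterator over a half-open int range. RangeSpliterator is a hypothetical name used only for illustration; it splits logically by halving the index range and hands out the prefix, as an ORDERED Spliterator must.

```java
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;

// Hypothetical example: a Spliterator over the int range [from, to)
public class RangeSpliterator implements Spliterator<Integer> {
    private int from;      // next element to hand out
    private final int to;  // exclusive upper bound

    public RangeSpliterator(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    public boolean tryAdvance(Consumer<? super Integer> action) {
        if (from < to) {
            action.accept(from++);
            return true;
        }
        return false; // no elements left
    }

    @Override
    public Spliterator<Integer> trySplit() {
        int mid = (from + to) >>> 1;
        if (mid <= from) {
            return null; // too small to split
        }
        // hand out the prefix [from, mid) and keep the suffix [mid, to)
        Spliterator<Integer> prefix = new RangeSpliterator(from, mid);
        from = mid;
        return prefix;
    }

    @Override
    public long estimateSize() {
        return to - from; // exact, hence SIZED and SUBSIZED below
    }

    @Override
    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED | DISTINCT | NONNULL | IMMUTABLE;
    }

    public static void main(String[] args) {
        // sum 1..100 through a stream built on the custom spliterator
        int sum = StreamSupport.stream(new RangeSpliterator(1, 101), false)
                .mapToInt(Integer::intValue)
                .sum();
        System.out.println(sum); // 5050
    }
}
```

Passing true instead of false to StreamSupport.stream() makes the framework call trySplit() on it to distribute the work.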
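tryAdvance
Signature: boolean tryAdvance(Consumer<? super T> action)
Function: if the Spliterator has a remaining element, performs the given action on it and returns true; otherwise returns false. If the Spliterator reports the ORDERED characteristic, the next element is processed in encounter order (the order can be thought of as the index into ArrayList's backing array: elements added to an ArrayList are naturally ordered, with indexes increasing from zero).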
Example:

```java
public static void main(String[] args) throws Exception {
    List<Integer> list = new ArrayList<>();
    list.add(2);
    list.add(1);
    list.add(3);
    Spliterator<Integer> spliterator = list.stream().spliterator();
    final AtomicInteger round = new AtomicInteger(1);
    final AtomicInteger loop = new AtomicInteger(1);
    while (spliterator.tryAdvance(num -> System.out.printf("Action callback #%d, value: %d\n", round.getAndIncrement(), num))) {
        System.out.printf("Loop iteration #%d\n", loop.getAndIncrement());
    }
}
// Output:
// Action callback #1, value: 2
// Loop iteration #1
// Action callback #2, value: 1
// Loop iteration #2
// Action callback #3, value: 3
// Loop iteration #3
```
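forEachRemaining
Signature: default void forEachRemaining(Consumer<? super T> action)
Function: performs the given action on every remaining element of the Spliterator, sequentially in the current thread. If the Spliterator reports ORDERED, the remaining elements are processed in encounter order. This is a default method with a rather blunt body: an empty do-while loop that keeps calling tryAdvance() until it returns false.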
Example:

```java
public static void main(String[] args) {
    List<Integer> list = new ArrayList<>();
    list.add(2);
    list.add(1);
    list.add(3);
    Spliterator<Integer> spliterator = list.stream().spliterator();
    final AtomicInteger round = new AtomicInteger(1);
    spliterator.forEachRemaining(num -> System.out.printf("Action callback #%d, value: %d\n", round.getAndIncrement(), num));
}
// Output:
// Action callback #1, value: 2
// Action callback #2, value: 1
// Action callback #3, value: 3
```
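trySplit
Signature: Spliterator<T> trySplit()
Function: if this Spliterator can be partitioned, returns a new Spliterator covering elements that, upon return from this method, will no longer be covered by this Spliterator. (That is a direct rendering of the API comment. What it actually means: if the current Spliterator instance X can be split, trySplit() splits X and produces a new Spliterator instance Y, and the range of elements covered by X shrinks accordingly. For an ORDERED Spliterator such as ArrayList's, Y must cover a strict prefix of the elements, e.g. X = [a,b,c,d] => Y = [a,b], X = [c,d]. If X cannot be split, the method returns null.) The concrete splitting algorithm is determined by the implementing class.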
Example:

```java
public static void main(String[] args) throws Exception {
    List<Integer> list = new ArrayList<>();
    list.add(2);
    list.add(3);
    list.add(4);
    list.add(1);
    Spliterator<Integer> first = list.stream().spliterator();
    Spliterator<Integer> second = first.trySplit();
    first.forEachRemaining(num -> {
        System.out.printf("first spliterator item: %d\n", num);
    });
    second.forEachRemaining(num -> {
        System.out.printf("second spliterator item: %d\n", num);
    });
}
// Output:
// first spliterator item: 4
// first spliterator item: 1
// second spliterator item: 2
// second spliterator item: 3
```
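estimateSize
Signature: long estimateSize()
Function: returns an estimate of the number of elements that forEachRemaining() would traverse, or Long.MAX_VALUE if the number is infinite, unknown, or too expensive to compute.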
Example:

```java
public static void main(String[] args) throws Exception {
    List<Integer> list = new ArrayList<>();
    list.add(2);
    list.add(3);
    list.add(4);
    list.add(1);
    Spliterator<Integer> spliterator = list.stream().spliterator();
    System.out.println(spliterator.estimateSize());
}
// Output:
// 4
```
getExactSizeIfKnown
Signature: default long getExactSizeIfKnown()
Function: if this Spliterator reports the SIZED characteristic (characteristics are analyzed below), returns the result of estimateSize(); otherwise returns -1.
Example:

```java
public static void main(String[] args) throws Exception {
    List<Integer> list = new ArrayList<>();
    list.add(2);
    list.add(3);
    list.add(4);
    list.add(1);
    Spliterator<Integer> spliterator = list.stream().spliterator();
    System.out.println(spliterator.getExactSizeIfKnown());
}
// Output:
// 4
```
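For contrast, a sketch with a Spliterator that does not report SIZED. It uses Spliterators.spliteratorUnknownSize(), which wraps a bare Iterator and therefore cannot know the size up front; SizeEstimateDemo is only an illustrative class name.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;

public class SizeEstimateDemo {
    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(2, 3, 4, 1);

        // The list's own spliterator is SIZED, so the exact size is known
        System.out.println(list.spliterator().getExactSizeIfKnown()); // 4

        // A spliterator built from a bare Iterator knows nothing about the size
        Iterator<Integer> iterator = list.iterator();
        Spliterator<Integer> unknown = Spliterators.spliteratorUnknownSize(iterator, 0);

        System.out.println(unknown.hasCharacteristics(Spliterator.SIZED)); // false
        System.out.println(unknown.estimateSize());                        // Long.MAX_VALUE (9223372036854775807)
        System.out.println(unknown.getExactSizeIfKnown());                 // -1
    }
}
```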
characteristics
Signature: int characteristics()
Function: returns the set of characteristics of this Spliterator, encoded as bits in a 32-bit int (characteristics are analyzed below).
hasCharacteristics
Signature: default boolean hasCharacteristics(int characteristics)
Function: returns true if this Spliterator's characteristics contain all of the given characteristics.
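getComparator
Signature: default Comparator<? super T> getComparator()
Function: if this Spliterator's source is SORTED by a Comparator, returns that Comparator; if the source is SORTED in natural order (for example, the elements implement Comparable), returns null; otherwise, i.e. if the source is not SORTED, throws IllegalStateException.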
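The three outcomes of getComparator() can be observed with standard collections. This sketch assumes JDK 11 behavior: a TreeSet spliterator reports SORTED and returns the set's comparator (null for natural order), while ArrayList's spliterator is not SORTED and falls back to the default method. The describe() helper is hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Spliterator;
import java.util.TreeSet;

public class ComparatorDemo {

    // Hypothetical helper: reports which getComparator() case applies
    static String describe(Spliterator<?> spliterator) {
        if (!spliterator.hasCharacteristics(Spliterator.SORTED)) {
            try {
                spliterator.getComparator();
                return "no exception";
            } catch (IllegalStateException e) {
                return "not SORTED: IllegalStateException";
            }
        }
        Comparator<?> comparator = spliterator.getComparator();
        return comparator == null ? "SORTED in natural order: null" : "SORTED by a Comparator";
    }

    public static void main(String[] args) {
        TreeSet<Integer> natural = new TreeSet<>(List.of(3, 1, 2));
        TreeSet<Integer> reversed = new TreeSet<>(Comparator.reverseOrder());
        reversed.addAll(List.of(3, 1, 2));
        List<Integer> list = new ArrayList<>(List.of(3, 1, 2));

        System.out.println(describe(natural.spliterator()));  // SORTED in natural order: null
        System.out.println(describe(reversed.spliterator())); // SORTED by a Comparator
        System.out.println(describe(list.spliterator()));     // not SORTED: IllegalStateException
    }
}
```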
Spliterator self-splitting

Spliterator#trySplit() can split an existing Spliterator instance into two Spliterator instances. The author calls this Spliterator self-splitting.
[Figure: Spliterator self-splitting]
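The splitting can be implemented in two ways:
Physical splitting: for ArrayList, copy the underlying array and split the copy. With the example above, this amounts to X = [1,3,4,2] => X = [4,2], Y = [1,3]. Together with ArrayList's own backing array, the data would be stored twice, which is clearly not very sensible.
Logical splitting: for ArrayList, since the backing array is naturally ordered, the split can be done on the array indexes. With the example above, this amounts to X = index table [0,1,2,3] => X = index table [2,3], Y = index table [0,1]. The underlying array is shared and only the index range is split, which is simple to implement and comparatively sensible.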
Looking at the source of ArrayListSpliterator, we can analyze its splitting algorithm:
```java
// java.util.ArrayList (JDK 11), excerpt:
// elementData, modCount and size are fields of the enclosing ArrayList
public Spliterator<E> spliterator() {
    return new ArrayListSpliterator(0, -1, 0);
}

final class ArrayListSpliterator implements Spliterator<E> {
    private int index;            // current index, advanced by tryAdvance/trySplit
    private int fence;            // -1 until first use, then one past the last index
    private int expectedModCount; // initialized when fence is set

    ArrayListSpliterator(int origin, int fence, int expectedModCount) {
        this.index = origin;
        this.fence = fence;
        this.expectedModCount = expectedModCount;
    }

    // lazily initializes fence to size on first use
    private int getFence() {
        int hi;
        if ((hi = fence) < 0) {
            expectedModCount = modCount;
            hi = fence = size;
        }
        return hi;
    }

    // halves the range: the new instance takes [lo, mid), this one keeps [mid, hi)
    public ArrayListSpliterator trySplit() {
        int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
        return (lo >= mid) ? null :
            new ArrayListSpliterator(lo, index = mid, expectedModCount);
    }

    public boolean tryAdvance(Consumer<? super E> action) {
        if (action == null)
            throw new NullPointerException();
        int hi = getFence(), i = index;
        if (i < hi) {
            index = i + 1;
            @SuppressWarnings("unchecked") E e = (E) elementData[i];
            action.accept(e);
            if (modCount != expectedModCount)
                throw new ConcurrentModificationException();
            return true;
        }
        return false;
    }

    public void forEachRemaining(Consumer<? super E> action) {
        int i, hi, mc;
        Object[] a;
        if (action == null)
            throw new NullPointerException();
        if ((a = elementData) != null) {
            if ((hi = fence) < 0) {
                mc = modCount;
                hi = size;
            }
            else
                mc = expectedModCount;
            // note the assignment index = hi inside the condition
            if ((i = index) >= 0 && (index = hi) <= a.length) {
                for (; i < hi; ++i) {
                    @SuppressWarnings("unchecked") E e = (E) a[i];
                    action.accept(e);
                }
                if (modCount == mc)
                    return;
            }
        }
        throw new ConcurrentModificationException();
    }

    public long estimateSize() {
        return getFence() - index;
    }

    public int characteristics() {
        return Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SUBSIZED;
    }
}
```
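When reading this source, be careful: programmers of the older generation sometimes use rather hidden assignments, and two of them are worth spelling out.
The first is in trySplit(): while the new ArrayListSpliterator is being constructed, the argument expression index = mid also modifies the index of the current ArrayListSpliterator. As a result, the new instance covers [lo, mid) and the current instance shrinks to [mid, hi).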
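The second is in forEachRemaining(): while validating its state, the if condition assigns hi (the upper bound) to index (the lower bound). Therefore the traversal in forEachRemaining() of an ArrayListSpliterator instance can only happen once; later calls find no remaining elements. This can be verified as follows: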
```java
public static void main(String[] args) {
    List<Integer> list = new ArrayList<>();
    list.add(2);
    list.add(1);
    list.add(3);
    Spliterator<Integer> spliterator = list.stream().spliterator();
    final AtomicInteger round = new AtomicInteger(1);
    spliterator.forEachRemaining(num -> System.out.printf("[1st forEachRemaining] Action callback #%d, value: %d\n", round.getAndIncrement(), num));
    round.set(1);
    spliterator.forEachRemaining(num -> System.out.printf("[2nd forEachRemaining] Action callback #%d, value: %d\n", round.getAndIncrement(), num));
}
// Output (the second traversal prints nothing):
// [1st forEachRemaining] Action callback #1, value: 2
// [1st forEachRemaining] Action callback #2, value: 1
// [1st forEachRemaining] Action callback #3, value: 3
```
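From the ArrayListSpliterator implementation, the following points can be confirmed:
The forEachRemaining() method of a new ArrayListSpliterator instance effectively runs only once; later calls traverse nothing.
The traversal boundary of forEachRemaining() in an ArrayListSpliterator instance is [index, fence).
When an ArrayListSpliterator splits itself, the newly created ArrayListSpliterator handles the segment with the smaller indexes (like the left branch of a fork), while the original ArrayListSpliterator handles the segment with the larger indexes (like the right branch of a fork).
The remaining element count of a segment returned by ArrayListSpliterator's estimateSize() is exact.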
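The example above can be split further, recursively, until every segment holds a single element:
[Figure: the example split recursively]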
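The recursive splitting can also be traced with a small program. This is an illustrative sketch (SplitTrace is a hypothetical name) that keeps calling trySplit() until it returns null, visiting the returned prefix before the remaining suffix, so the leaves come out in index order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;

public class SplitTrace {

    // Recursively splits the spliterator and records the elements of each leaf segment
    static void split(Spliterator<Integer> spliterator, int depth, List<List<Integer>> leaves) {
        Spliterator<Integer> prefix = spliterator.trySplit();
        if (prefix == null) {
            // cannot be split further: drain this segment
            List<Integer> leaf = new ArrayList<>();
            spliterator.forEachRemaining(leaf::add);
            System.out.println("  ".repeat(depth) + "leaf " + leaf);
            leaves.add(leaf);
            return;
        }
        System.out.println("  ".repeat(depth) + "split: prefix size " + prefix.estimateSize()
                + ", remaining size " + spliterator.estimateSize());
        split(prefix, depth + 1, leaves);      // left branch: smaller indexes
        split(spliterator, depth + 1, leaves); // right branch: larger indexes
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>(List.of(2, 3, 4, 1));
        List<List<Integer>> leaves = new ArrayList<>();
        split(list.spliterator(), 0, leaves);
        System.out.println(leaves); // [[2], [3], [4], [1]]
    }
}
```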
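Spliterator self-splitting is the foundation of parallel streams. Parallel stream computation is essentially a fork-join process: the trySplit() implementation determines the granularity of the fork tasks; each fork task computes in a concurrency-safe way, which is guaranteed by thread confinement (confinement to the thread stack); and once every fork task has finished, a single thread performs the join to obtain the correct final result. The following example computes the sum of the integers 1 to 100: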
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.CountDownLatch;

public class ConcurrentSplitCalculateSum {

    private static class ForkTask extends Thread {

        private int result = 0;

        private final Spliterator<Integer> spliterator;
        private final CountDownLatch latch;

        public ForkTask(Spliterator<Integer> spliterator, CountDownLatch latch) {
            this.spliterator = spliterator;
            this.latch = latch;
        }

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            // the partial sum is only touched by this thread (thread confinement)
            spliterator.forEachRemaining(num -> result = result + num);
            long end = System.currentTimeMillis();
            System.out.printf("Thread [%s] finished, segment result: %d, took: %d ms\n",
                    Thread.currentThread().getName(), result, end - start);
            latch.countDown();
        }

        public int result() {
            return result;
        }
    }

    // join: a single thread combines the partial results
    private static int join(List<ForkTask> tasks) {
        int result = 0;
        for (ForkTask task : tasks) {
            result = result + task.result();
        }
        return result;
    }

    private static final int THREAD_NUM = 4;

    public static void main(String[] args) throws Exception {
        List<Integer> source = new ArrayList<>();
        for (int i = 1; i < 101; i++) {
            source.add(i);
        }
        Spliterator<Integer> root = source.stream().spliterator();
        List<Spliterator<Integer>> spliteratorList = new ArrayList<>();
        // fork: split 1..100 into four segments of 25 elements each
        Spliterator<Integer> x = root.trySplit();
        Spliterator<Integer> y = x.trySplit();
        Spliterator<Integer> z = root.trySplit();
        spliteratorList.add(root);
        spliteratorList.add(x);
        spliteratorList.add(y);
        spliteratorList.add(z);
        List<ForkTask> tasks = new ArrayList<>();
        CountDownLatch latch = new CountDownLatch(THREAD_NUM);
        for (int i = 0; i < THREAD_NUM; i++) {
            ForkTask task = new ForkTask(spliteratorList.get(i), latch);
            task.setName("fork-task-" + (i + 1));
            tasks.add(task);
        }
        tasks.forEach(Thread::start);
        latch.await();
        int result = join(tasks);
        System.out.println("Final result: " + result);
    }
}
// Output (thread completion order varies between runs):
// Thread [fork-task-4] finished, segment result: 1575, took: 0 ms
// Thread [fork-task-2] finished, segment result: 950, took: 1 ms
// Thread [fork-task-3] finished, segment result: 325, took: 1 ms
// Thread [fork-task-1] finished, segment result: 2200, took: 1 ms
// Final result: 5050
```
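Of course, real parallel streams ultimately run on ForkJoinPool rather than launching threads as crudely as this example does. The implementation of parallel streams is analyzed in detail later.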
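For comparison, a minimal sketch of the same sum on ForkJoinPool with a RecursiveTask that forks on trySplit(). It is not the JDK's parallel-stream code, which goes through its own internal task classes, and the THRESHOLD value is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSplitSum {

    // Hypothetical threshold: segments at or below this size are summed directly
    private static final long THRESHOLD = 10;

    static class SumTask extends RecursiveTask<Long> {
        private final Spliterator<Integer> spliterator;

        SumTask(Spliterator<Integer> spliterator) {
            this.spliterator = spliterator;
        }

        @Override
        protected Long compute() {
            Spliterator<Integer> prefix;
            if (spliterator.estimateSize() > THRESHOLD && (prefix = spliterator.trySplit()) != null) {
                SumTask left = new SumTask(prefix);              // fork: the split-off prefix
                left.fork();
                long right = new SumTask(spliterator).compute(); // keep working on the rest
                return left.join() + right;                      // join: combine both halves
            }
            // leaf: the running total lives on this thread's stack (thread confinement)
            long[] sum = {0L};
            spliterator.forEachRemaining(n -> sum[0] += n);
            return sum[0];
        }
    }

    static long sum(List<Integer> source) {
        return ForkJoinPool.commonPool().invoke(new SumTask(source.spliterator()));
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            source.add(i);
        }
        System.out.println(sum(source)); // 5050
    }
}
```

Here trySplit() alone decides the task granularity, and the pool's work stealing replaces the hand-made CountDownLatch.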
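Characteristics supported by Spliterator

The characteristics supported by a given Spliterator instance are determined by its characteristics() method. This method returns a 32-bit int that is effectively treated as a bit array: each characteristic is assigned its own bit, and hasCharacteristics(int characteristics) uses bitwise operations to check whether the given characteristic is present in characteristics(). Let's simplify characteristics to a byte to analyze the trick: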
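```text
Assume: byte characteristics()
  => at most 8 bits represent the set of characteristics; if each bit stands for
     exactly one characteristic, 8 characteristics can be represented in total
Characteristic X: 0000 0001
Characteristic Y: 0000 0010
... and so on
Assume: characteristics = X | Y = 0000 0001 | 0000 0010 = 0000 0011
Then:   characteristics & X = 0000 0011 & 0000 0001 = 0000 0001
Check whether characteristics contains X: (characteristics & X) == X
```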
The reasoning above is exactly the logic of the characteristic check in Spliterator:

```java
int characteristics();

default boolean hasCharacteristics(int characteristics) {
    return (characteristics() & characteristics) == characteristics;
}
```
Let's verify it:

```java
import java.util.Spliterator;

public class CharacteristicsCheck {

    public static void main(String[] args) {
        System.out.printf("Has ORDERED: %s\n", hasCharacteristics(Spliterator.ORDERED));
        System.out.printf("Has SIZED: %s\n", hasCharacteristics(Spliterator.SIZED));
        System.out.printf("Has DISTINCT: %s\n", hasCharacteristics(Spliterator.DISTINCT));
    }

    private static int characteristics() {
        return Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SORTED;
    }

    private static boolean hasCharacteristics(int characteristics) {
        return (characteristics() & characteristics) == characteristics;
    }
}
// Output:
// Has ORDERED: true
// Has SIZED: true
// Has DISTINCT: false
```
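Spliterator currently supports 8 characteristics:

| Characteristic | Hex value | Binary value | Meaning |
|---|---|---|---|
| DISTINCT | 0x00000001 | 0000 0000 0000 0001 | Distinct: for each pair of encountered elements (x, y), !x.equals(y). Set-based sources typically report it |
| ORDERED | 0x00000010 | 0000 0000 0001 0000 | Ordered (elements): an encounter order is defined; trySplit() splits off a strict prefix, tryAdvance() advances one element in prefix order, and forEachRemaining() processes elements in encounter order |
| SORTED | 0x00000004 | 0000 0000 0000 0100 | Sorted: elements follow the Comparator returned by getComparator() (or natural order if it returns null); a Spliterator that reports SORTED must also report ORDERED |
| SIZED | 0x00000040 | 0000 0000 0100 0000 | Sized (elements): before traversal or splitting, estimateSize() returns the exact number of elements |
| NONNULL | 0x00000100 | 0000 0001 0000 0000 | Non-null (elements): the source guarantees that encountered elements are never null; most commonly reported by concurrent collections, queues and maps |
| IMMUTABLE | 0x00000400 | 0000 0100 0000 0000 | Immutable (elements): the source cannot be structurally modified, i.e. elements cannot be added, replaced or removed during traversal (updating an element's properties is allowed) |
| CONCURRENT | 0x00001000 | 0001 0000 0000 0000 | Concurrent (source): the source can be safely modified concurrently, i.e. multiple threads may add, replace or remove elements without external synchronization |
| SUBSIZED | 0x00004000 | 0100 0000 0000 0000 | Sub-sized (child Spliterators): every Spliterator produced by trySplit() (the current Spliterator after splitting also counts as a child) is SIZED and SUBSIZED |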
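Looking closely, you will notice that all characteristics are stored in a 32-bit int using every other bit (only even bit indexes are used). The mapping from bit index to characteristic is: (0 => DISTINCT), (2 => SORTED), (4 => ORDERED), (6 => SIZED), (8 => NONNULL), (10 => IMMUTABLE), (12 => CONCURRENT), (14 => SUBSIZED).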
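The bit indexes can be checked directly with Integer.numberOfTrailingZeros(), which returns the index of the lowest set bit; each characteristic constant has exactly one bit set. CharacteristicBits is an illustrative class name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Spliterator;

public class CharacteristicBits {

    // Maps each Spliterator characteristic to the index of its single set bit
    static Map<String, Integer> bitIndexes() {
        Map<String, Integer> result = new LinkedHashMap<>();
        result.put("DISTINCT", Integer.numberOfTrailingZeros(Spliterator.DISTINCT));
        result.put("SORTED", Integer.numberOfTrailingZeros(Spliterator.SORTED));
        result.put("ORDERED", Integer.numberOfTrailingZeros(Spliterator.ORDERED));
        result.put("SIZED", Integer.numberOfTrailingZeros(Spliterator.SIZED));
        result.put("NONNULL", Integer.numberOfTrailingZeros(Spliterator.NONNULL));
        result.put("IMMUTABLE", Integer.numberOfTrailingZeros(Spliterator.IMMUTABLE));
        result.put("CONCURRENT", Integer.numberOfTrailingZeros(Spliterator.CONCURRENT));
        result.put("SUBSIZED", Integer.numberOfTrailingZeros(Spliterator.SUBSIZED));
        return result;
    }

    public static void main(String[] args) {
        // {DISTINCT=0, SORTED=2, ORDERED=4, SIZED=6, NONNULL=8, IMMUTABLE=10, CONCURRENT=12, SUBSIZED=14}
        System.out.println(bitIndexes());
    }
}
```

Leaving the odd bits unused is what later lets StreamOpFlag reuse the same positions as 2-bit fields.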
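Only the core definition of each characteristic is summarized here; some fine print and special cases are omitted for space, so refer to the API comments in the source for the details. These characteristics are eventually converted to StreamOpFlag values, which the operations in Stream then use. StreamOpFlag is more complex and is analyzed in detail below.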
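How streams are implemented: source analysis

Because the stream implementation is highly abstract engineering code, reading the source is somewhat difficult. The whole system involves a large number of interfaces, classes and enums:
[Figure: class diagram of the stream implementation]
The top-level class structure in the figure describes the inheritance relationships of the stream pipeline classes. IntStream, LongStream and DoubleStream are specializations for the primitive types int, long and double respectively, while pipelines built for any reference type are ReferencePipeline instances. The author therefore considers ReferencePipeline (the reference-type pipeline) the core data structure of streams, and the analysis below is based on the ReferencePipeline implementation.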
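StreamOpFlag source analysis

Note: this subsection is very brain-burning (perhaps partly because the author is not very fluent with bit manipulation), and most of the time spent on this article went into it.
StreamOpFlag is an enum that stores flags of streams and operations ("Flags corresponding to characteristics of streams and operations", hereafter Stream flags). The Stream framework uses these flags to control, customize and optimize computation. Stream flags can describe characteristics of several different entities associated with a stream: the stream source, intermediate operations (Op) and terminal operations (Terminal Op). Not every Stream flag is meaningful for every Stream entity; the current mapping between entities and flags is: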
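| Type (Stream Entity Type) | DISTINCT | SORTED | ORDERED | SIZED | SHORT_CIRCUIT |
|---|---|---|---|---|---|
| SPLITERATOR | 01 | 01 | 01 | 01 | 00 |
| STREAM | 01 | 01 | 01 | 01 | 00 |
| OP | 11 | 11 | 11 | 10 | 01 |
| TERMINAL_OP | 00 | 00 | 10 | 00 | 01 |
| UPSTREAM_TERMINAL_OP | 00 | 00 | 10 | 00 | 00 |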
Where:
01: set / inject
10: clear
11: preserve
00: initial value (the default fill value). This is a key point: 00 means the flag never applies to that entity type.
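The top-level comment of StreamOpFlag also contains a table:

| - | DISTINCT | SORTED | ORDERED | SIZED | SHORT_CIRCUIT |
|---|---|---|---|---|---|
| Stream source | Y | Y | Y | Y | N |
| Intermediate operation | PCI | PCI | PCI | PC | PI |
| Terminal operation | N | N | PC | N | PI |

Legend:
Y: allowed
N: invalid
P: preserves
C: clears
I: injects
PCI: may preserve, clear or inject
PC: may preserve or clear
PI: may preserve or inject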
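Both tables describe the same conclusion and can be read side by side, but the actual implementation follows the first table. One thing to note: preserved (P) means the flag passes through unchanged. In the first table, OP's DISTINCT, SORTED and ORDERED entries are all 11 (the preserve bits), which means an intermediate operation may set (inject), clear or preserve these flags, matching the PCI entries in the second table. Back to the source, first look at the core fields and constructor of StreamOpFlag: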
```java
// java.util.stream.StreamOpFlag (JDK 11), excerpt
enum StreamOpFlag {

    // Enum constants as declared in the JDK source: (position, mask table per entity type)
    DISTINCT(0,
             set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)),
    SORTED(1,
           set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)),
    ORDERED(2,
            set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)
                    .clear(Type.TERMINAL_OP).clear(Type.UPSTREAM_TERMINAL_OP)),
    SIZED(3,
          set(Type.SPLITERATOR).set(Type.STREAM).clear(Type.OP)),
    SHORT_CIRCUIT(12,
                  set(Type.OP).set(Type.TERMINAL_OP));

    enum Type {
        SPLITERATOR,
        STREAM,
        OP,
        TERMINAL_OP,
        UPSTREAM_TERMINAL_OP
    }

    private static final int SET_BITS = 0b01;      // set / inject
    private static final int CLEAR_BITS = 0b10;    // clear
    private static final int PRESERVE_BITS = 0b11; // preserve

    private static MaskBuilder set(Type t) {
        return new MaskBuilder(new EnumMap<>(Type.class)).set(t);
    }

    private static class MaskBuilder {
        final Map<Type, Integer> map;

        MaskBuilder(Map<Type, Integer> map) {
            this.map = map;
        }

        MaskBuilder mask(Type t, Integer i) {
            map.put(t, i);
            return this;
        }

        MaskBuilder set(Type t) {
            return mask(t, SET_BITS);
        }

        MaskBuilder clear(Type t) {
            return mask(t, CLEAR_BITS);
        }

        MaskBuilder setAndClear(Type t) {
            return mask(t, PRESERVE_BITS);
        }

        Map<Type, Integer> build() {
            // entity types that were not configured default to 0b00
            for (Type t : Type.values()) {
                map.putIfAbsent(t, 0b00);
            }
            return map;
        }
    }

    private final Map<Type, Integer> maskTable;
    private final int bitPosition;
    private final int set;
    private final int clear;
    private final int preserve;

    private StreamOpFlag(int position, MaskBuilder maskBuilder) {
        this.maskTable = maskBuilder.build();
        // each flag occupies 2 bits
        position *= 2;
        this.bitPosition = position;
        this.set = SET_BITS << position;
        this.clear = CLEAR_BITS << position;
        this.preserve = PRESERVE_BITS << position;
    }

    static final int IS_DISTINCT = DISTINCT.set;
    static final int NOT_DISTINCT = DISTINCT.clear;
    static final int IS_SORTED = SORTED.set;
    static final int NOT_SORTED = SORTED.clear;
    static final int IS_ORDERED = ORDERED.set;
    static final int NOT_ORDERED = ORDERED.clear;
    static final int IS_SIZED = SIZED.set;
    static final int NOT_SIZED = SIZED.clear;
    static final int IS_SHORT_CIRCUIT = SHORT_CIRCUIT.set;
}
```
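Because StreamOpFlag is an enum, each enum constant is an independent flag, and one flag applies to several Stream entity types. Each constant therefore describes one column of the entity/flag mapping table above (read it vertically):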
```text
// Read vertically: one enum constant = one column of the table
DISTINCT Flag:
  maskTable: {
    SPLITERATOR:          0000 0001,
    STREAM:               0000 0001,
    OP:                   0000 0011,
    TERMINAL_OP:          0000 0000,
    UPSTREAM_TERMINAL_OP: 0000 0000
  }
  position(input): 0
  bitPosition:     0
  set:      1 => 0000 0000 0000 0000 0000 0000 0000 0001
  clear:    2 => 0000 0000 0000 0000 0000 0000 0000 0010
  preserve: 3 => 0000 0000 0000 0000 0000 0000 0000 0011

SORTED Flag:
  maskTable: {
    SPLITERATOR:          0000 0001,
    STREAM:               0000 0001,
    OP:                   0000 0011,
    TERMINAL_OP:          0000 0000,
    UPSTREAM_TERMINAL_OP: 0000 0000
  }
  position(input): 1
  bitPosition:     2
  set:       4 => 0000 0000 0000 0000 0000 0000 0000 0100
  clear:     8 => 0000 0000 0000 0000 0000 0000 0000 1000
  preserve: 12 => 0000 0000 0000 0000 0000 0000 0000 1100

ORDERED Flag:
  maskTable: {
    SPLITERATOR:          0000 0001,
    STREAM:               0000 0001,
    OP:                   0000 0011,
    TERMINAL_OP:          0000 0010,
    UPSTREAM_TERMINAL_OP: 0000 0010
  }
  position(input): 2
  bitPosition:     4
  set:      16 => 0000 0000 0000 0000 0000 0000 0001 0000
  clear:    32 => 0000 0000 0000 0000 0000 0000 0010 0000
  preserve: 48 => 0000 0000 0000 0000 0000 0000 0011 0000

SIZED Flag:
  maskTable: {
    SPLITERATOR:          0000 0001,
    STREAM:               0000 0001,
    OP:                   0000 0010,
    TERMINAL_OP:          0000 0000,
    UPSTREAM_TERMINAL_OP: 0000 0000
  }
  position(input): 3
  bitPosition:     6
  set:       64 => 0000 0000 0000 0000 0000 0000 0100 0000
  clear:    128 => 0000 0000 0000 0000 0000 0000 1000 0000
  preserve: 192 => 0000 0000 0000 0000 0000 0000 1100 0000

SHORT_CIRCUIT Flag:
  maskTable: {
    SPLITERATOR:          0000 0000,
    STREAM:               0000 0000,
    OP:                   0000 0001,
    TERMINAL_OP:          0000 0001,
    UPSTREAM_TERMINAL_OP: 0000 0000
  }
  position(input): 12
  bitPosition:     24
  set:      16777216 => 0000 0001 0000 0000 0000 0000 0000 0000
  clear:    33554432 => 0000 0010 0000 0000 0000 0000 0000 0000
  preserve: 50331648 => 0000 0011 0000 0000 0000 0000 0000 0000
```
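Since StreamOpFlag is package-private, the set/clear/preserve values above can be reproduced with a stand-alone copy of just the encoding. MiniOpFlag is a hypothetical name; it keeps only the position-to-bits arithmetic of the constructor and drops the mask tables.

```java
// Hypothetical stand-alone copy of StreamOpFlag's 2-bit encoding:
// a flag at position p occupies bits 2p and 2p+1
public enum MiniOpFlag {
    DISTINCT(0), SORTED(1), ORDERED(2), SIZED(3), SHORT_CIRCUIT(12);

    final int bitPosition;
    final int set;      // 01 at the flag's field
    final int clear;    // 10 at the flag's field
    final int preserve; // 11 at the flag's field

    MiniOpFlag(int position) {
        this.bitPosition = position * 2;
        this.set = 0b01 << bitPosition;
        this.clear = 0b10 << bitPosition;
        this.preserve = 0b11 << bitPosition;
    }

    public static void main(String[] args) {
        for (MiniOpFlag flag : values()) {
            System.out.printf("%s bitPosition=%d set=%d clear=%d preserve=%d%n",
                    flag, flag.bitPosition, flag.set, flag.clear, flag.preserve);
        }
        // last line: SHORT_CIRCUIT bitPosition=24 set=16777216 clear=33554432 preserve=50331648
    }
}
```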
Next come bitwise AND (&) and bitwise OR (|). Suppose A = 0001, B = 0010 and C = 1000; then:
A|B = 0001 | 0010 = 0011 (bitwise OR: 1|0=1, 0|1=1, 0|0=0, 1|1=1)
A&B = 0001 & 0010 = 0000 (bitwise AND: 1&0=0, 0&1=0, 0&0=0, 1&1=1)
MASK = A | B | C = 0001 | 0010 | 1000 = 1011
The condition for A|B to contain A is: A == ((A|B) & A)
The condition for MASK to contain A is: A == (MASK & A)
Applying this to the StreamOpFlag constants:

```java
static int DISTINCT_SET = 0b0001;
static int SORTED_CLEAR = 0b1000;

public static void main(String[] args) throws Exception {
    int flags = DISTINCT_SET | SORTED_CLEAR;
    System.out.println(Integer.toBinaryString(flags));
    System.out.printf("DISTINCT is set: %s\n", DISTINCT_SET == (DISTINCT_SET & flags));
    System.out.printf("SORTED is cleared: %s\n", SORTED_CLEAR == (SORTED_CLEAR & flags));
}
// Output:
// 1001
// DISTINCT is set: true
// SORTED is cleared: true
```
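Because StreamOpFlag is package-private, it cannot be used directly. You can copy its source into your own package and verify its behavior there:

```java
// must live in the same package as the copied StreamOpFlag
public static void main(String[] args) {
    int flags = StreamOpFlag.DISTINCT.set() | StreamOpFlag.SORTED.clear();
    System.out.println(StreamOpFlag.DISTINCT.set() == (StreamOpFlag.DISTINCT.set() & flags));
    System.out.println(StreamOpFlag.SORTED.clear() == (StreamOpFlag.SORTED.clear() & flags));
}
// Output:
// true
// true
```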
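The following methods are built on these bitwise properties:

```java
enum StreamOpFlag {
    // excerpt: enum constants and fields omitted

    // the flag's "set" pattern at its bit position
    int set() {
        return set;
    }

    // the flag's "clear" pattern at its bit position
    int clear() {
        return clear;
    }

    // whether this flag applies to the STREAM entity type
    boolean isStreamFlag() {
        return maskTable.get(Type.STREAM) > 0;
    }

    // the flag's 2-bit field in flags is exactly 01 (set)
    boolean isKnown(int flags) {
        return (flags & preserve) == set;
    }

    // the flag's 2-bit field in flags is exactly 10 (cleared)
    boolean isCleared(int flags) {
        return (flags & preserve) == clear;
    }

    // the flag's 2-bit field in flags is 11 (preserved)
    boolean isPreserved(int flags) {
        return (flags & preserve) == preserve;
    }

    // whether the mask table allows entity type t to set this flag
    boolean canSet(Type t) {
        return (maskTable.get(t) & SET_BITS) > 0;
    }
}
```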
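There is a special trick here: the bitwise checks use (flags & preserve). The reason is that, for a given flag and a given Stream entity type, only one of set/inject, clear and preserve can be present; a single flags value cannot contain both StreamOpFlag.SORTED.set and StreamOpFlag.SORTED.clear, which would be semantically contradictory. Since set/inject, clear and preserve all occupy the same fixed 2-bit field in the bit map, and preserve is designed as 0b11 (both bits of the field set), masking with preserve extracts exactly this flag's 2-bit field. The checks can therefore be specialized as follows, which also makes them stricter:

```text
(flags & set) == set            =>  (flags & preserve) == set
(flags & clear) == clear        =>  (flags & preserve) == clear
(flags & preserve) == preserve  =>  (flags & preserve) == preserve
```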
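Why "stricter" matters: if a flag's field holds 11 (preserve), the naive check (flags & set) == set still succeeds because the set bit is one of the two bits, whereas the masked check compares the whole field. A small stand-alone sketch using SORTED's bit position (FlagCheckDemo and its constants are illustrative copies, not JDK code):

```java
public class FlagCheckDemo {
    // SORTED occupies bits 2-3, mirroring StreamOpFlag.SORTED
    static final int SORTED_SET = 0b01 << 2;      // 4
    static final int SORTED_CLEAR = 0b10 << 2;    // 8
    static final int SORTED_PRESERVE = 0b11 << 2; // 12

    // Naive check: only looks at the "set" bit
    static boolean naiveIsKnown(int flags) {
        return (flags & SORTED_SET) == SORTED_SET;
    }

    // StreamOpFlag-style check: extract the whole 2-bit field, then compare it
    static boolean isKnown(int flags) {
        return (flags & SORTED_PRESERVE) == SORTED_SET;
    }

    public static void main(String[] args) {
        int preserved = SORTED_PRESERVE; // field = 11
        System.out.println(naiveIsKnown(preserved)); // true: misreads "preserve" as "set"
        System.out.println(isKnown(preserved));      // false
        System.out.println(isKnown(SORTED_SET));     // true
        System.out.println(isKnown(SORTED_CLEAR));   // false
    }
}
```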
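To sum up, the idea is to use a single 32-bit int in which every 2 bits encode one of three states (set, clear or preserve, with 00 meaning none). A complete Flags value (set of flags) can therefore represent 16 flags (position = [0,15]; according to the API comments, positions [4,11] and [13,15] are not yet used or are reserved, forming gaps). Next, an example of how the masks are computed: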
```text
// Read horizontally. Shift operators bind tighter than & and |, e.g. << binds tighter than |
SPLITERATOR_CHARACTERISTICS_MASK:
mask(init) = 0
mask(DISTINCT, SPLITERATOR[DISTINCT]=01, bitPosition=0)
  = 0000 0000 | 0000 0001 << 0 = 0000 0000 | 0000 0001 = 0000 0001
mask(SORTED, SPLITERATOR[SORTED]=01, bitPosition=2)
  = 0000 0001 | 0000 0001 << 2 = 0000 0001 | 0000 0100 = 0000 0101
mask(ORDERED, SPLITERATOR[ORDERED]=01, bitPosition=4)
  = 0000 0101 | 0000 0001 << 4 = 0000 0101 | 0001 0000 = 0001 0101
mask(SIZED, SPLITERATOR[SIZED]=01, bitPosition=6)
  = 0001 0101 | 0000 0001 << 6 = 0001 0101 | 0100 0000 = 0101 0101
mask(SHORT_CIRCUIT, SPLITERATOR[SHORT_CIRCUIT]=00, bitPosition=24)
  = 0101 0101 | 0000 0000 << 24 = 0101 0101 | 0000 0000 = 0101 0101
mask(final) = 0000 0000 0000 0000 0000 0000 0101 0101 (binary), 85 (decimal)

STREAM_MASK:
mask(init) = 0
mask(DISTINCT, STREAM[DISTINCT]=01, bitPosition=0)
  = 0000 0000 | 0000 0001 << 0 = 0000 0000 | 0000 0001 = 0000 0001
mask(SORTED, STREAM[SORTED]=01, bitPosition=2)
  = 0000 0001 | 0000 0001 << 2 = 0000 0001 | 0000 0100 = 0000 0101
mask(ORDERED, STREAM[ORDERED]=01, bitPosition=4)
  = 0000 0101 | 0000 0001 << 4 = 0000 0101 | 0001 0000 = 0001 0101
mask(SIZED, STREAM[SIZED]=01, bitPosition=6)
  = 0001 0101 | 0000 0001 << 6 = 0001 0101 | 0100 0000 = 0101 0101
mask(SHORT_CIRCUIT, STREAM[SHORT_CIRCUIT]=00, bitPosition=24)
  = 0101 0101 | 0000 0000 << 24 = 0101 0101 | 0000 0000 = 0101 0101
mask(final) = 0000 0000 0000 0000 0000 0000 0101 0101 (binary), 85 (decimal)

OP_MASK:
mask(init) = 0
mask(DISTINCT, OP[DISTINCT]=11, bitPosition=0)
  = 0000 0000 | 0000 0011 << 0 = 0000 0000 | 0000 0011 = 0000 0011
mask(SORTED, OP[SORTED]=11, bitPosition=2)
  = 0000 0011 | 0000 0011 << 2 = 0000 0011 | 0000 1100 = 0000 1111
mask(ORDERED, OP[ORDERED]=11, bitPosition=4)
  = 0000 1111 | 0000 0011 << 4 = 0000 1111 | 0011 0000 = 0011 1111
mask(SIZED, OP[SIZED]=10, bitPosition=6)
  = 0011 1111 | 0000 0010 << 6 = 0011 1111 | 1000 0000 = 1011 1111
mask(SHORT_CIRCUIT, OP[SHORT_CIRCUIT]=01, bitPosition=24)
  = 0000 0000 0000 0000 0000 0000 1011 1111 | 0000 0001 << 24
  = 0000 0000 0000 0000 0000 0000 1011 1111 | 0000 0001 0000 0000 0000 0000 0000 0000
  = 0000 0001 0000 0000 0000 0000 1011 1111
mask(final) = 0000 0001 0000 0000 0000 0000 1011 1111 (binary), 16777407 (decimal)

TERMINAL_OP_MASK:
mask(init) = 0
mask(DISTINCT, TERMINAL_OP[DISTINCT]=00, bitPosition=0)
  = 0000 0000 | 0000 0000 << 0 = 0000 0000 | 0000 0000 = 0000 0000
mask(SORTED, TERMINAL_OP[SORTED]=00, bitPosition=2)
  = 0000 0000 | 0000 0000 << 2 = 0000 0000 | 0000 0000 = 0000 0000
mask(ORDERED, TERMINAL_OP[ORDERED]=10, bitPosition=4)
  = 0000 0000 | 0000 0010 << 4 = 0000 0000 | 0010 0000 = 0010 0000
mask(SIZED, TERMINAL_OP[SIZED]=00, bitPosition=6)
  = 0010 0000 | 0000 0000 << 6 = 0010 0000 | 0000 0000 = 0010 0000
mask(SHORT_CIRCUIT, TERMINAL_OP[SHORT_CIRCUIT]=01, bitPosition=24)
  = 0000 0000 0000 0000 0000 0000 0010 0000 | 0000 0001 << 24
  = 0000 0000 0000 0000 0000 0000 0010 0000 | 0000 0001 0000 0000 0000 0000 0000 0000
  = 0000 0001 0000 0000 0000 0000 0010 0000
mask(final) = 0000 0001 0000 0000 0000 0000 0010 0000 (binary), 16777248 (decimal)

UPSTREAM_TERMINAL_OP_MASK:
mask(init) = 0
mask(DISTINCT, UPSTREAM_TERMINAL_OP[DISTINCT]=00, bitPosition=0)
  = 0000 0000 | 0000 0000 << 0 = 0000 0000 | 0000 0000 = 0000 0000
mask(SORTED, UPSTREAM_TERMINAL_OP[SORTED]=00, bitPosition=2)
  = 0000 0000 | 0000 0000 << 2 = 0000 0000 | 0000 0000 = 0000 0000
mask(ORDERED, UPSTREAM_TERMINAL_OP[ORDERED]=10, bitPosition=4)
  = 0000 0000 | 0000 0010 << 4 = 0000 0000 | 0010 0000 = 0010 0000
mask(SIZED, UPSTREAM_TERMINAL_OP[SIZED]=00, bitPosition=6)
  = 0010 0000 | 0000 0000 << 6 = 0010 0000 | 0000 0000 = 0010 0000
mask(SHORT_CIRCUIT, UPSTREAM_TERMINAL_OP[SHORT_CIRCUIT]=00, bitPosition=24)
  = 0010 0000 | 0000 0000 << 24 = 0010 0000 | 0000 0000 = 0010 0000
mask(final) = 0000 0000 0000 0000 0000 0000 0010 0000 (binary), 32 (decimal)
```
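The five masks can be recomputed with the same loop as createMask(). This is a stand-alone sketch: MaskDemo hard-codes the 2-bit values from the first table and the bit positions from the per-flag breakdown, because the real StreamOpFlag is package-private.

```java
import java.util.Map;

public class MaskDemo {
    // bitPosition of each flag (position * 2), in StreamOpFlag declaration order:
    // DISTINCT, SORTED, ORDERED, SIZED, SHORT_CIRCUIT
    static final int[] BIT_POSITIONS = {0, 2, 4, 6, 24};

    // 2-bit values per entity type, copied from the entity/flag table
    static final Map<String, int[]> MASK_TABLE = Map.of(
            "SPLITERATOR", new int[]{0b01, 0b01, 0b01, 0b01, 0b00},
            "STREAM", new int[]{0b01, 0b01, 0b01, 0b01, 0b00},
            "OP", new int[]{0b11, 0b11, 0b11, 0b10, 0b01},
            "TERMINAL_OP", new int[]{0b00, 0b00, 0b10, 0b00, 0b01},
            "UPSTREAM_TERMINAL_OP", new int[]{0b00, 0b00, 0b10, 0b00, 0b00});

    // Same idea as StreamOpFlag.createMask(): OR each 2-bit value into its position
    static int createMask(String type) {
        int[] values = MASK_TABLE.get(type);
        int mask = 0;
        for (int i = 0; i < values.length; i++) {
            mask |= values[i] << BIT_POSITIONS[i];
        }
        return mask;
    }

    public static void main(String[] args) {
        for (String type : new String[]{"SPLITERATOR", "STREAM", "OP", "TERMINAL_OP", "UPSTREAM_TERMINAL_OP"}) {
            System.out.printf("%s: %d%n", type, createMask(type));
        }
        // SPLITERATOR: 85, STREAM: 85, OP: 16777407, TERMINAL_OP: 16777248, UPSTREAM_TERMINAL_OP: 32
    }
}
```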
The related methods and fields:

```java
enum StreamOpFlag {
    // excerpt: enum constants and other members omitted

    // precomputed bit maps, one per Stream entity type
    static final int SPLITERATOR_CHARACTERISTICS_MASK = createMask(Type.SPLITERATOR);
    static final int STREAM_MASK = createMask(Type.STREAM);
    static final int OP_MASK = createMask(Type.OP);
    static final int TERMINAL_OP_MASK = createMask(Type.TERMINAL_OP);
    static final int UPSTREAM_TERMINAL_OP_MASK = createMask(Type.UPSTREAM_TERMINAL_OP);

    // ORs each flag's 2-bit value for entity type t into that flag's bit position
    private static int createMask(Type t) {
        int mask = 0;
        for (StreamOpFlag flag : StreamOpFlag.values()) {
            mask |= flag.maskTable.get(t) << flag.bitPosition;
        }
        return mask;
    }

    // the preserve bits of every flag
    private static final int FLAG_MASK = createFlagMask();

    private static int createFlagMask() {
        int mask = 0;
        for (StreamOpFlag flag : StreamOpFlag.values()) {
            mask |= flag.preserve;
        }
        return mask;
    }

    // the "set" bit of every stream flag
    private static final int FLAG_MASK_IS = STREAM_MASK;
    // the "clear" bit of every stream flag
    private static final int FLAG_MASK_NOT = STREAM_MASK << 1;
    // initial value: every stream flag field is 11 (preserve)
    static final int INITIAL_OPS_VALUE = FLAG_MASK_IS | FLAG_MASK_NOT;
}
```
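SPLITERATOR_CHARACTERISTICS_MASK and the other four masks (see the mask computation example above) are precomputed bit maps of all StreamOpFlag flags for the corresponding Stream entity type, i.e. the "horizontal" reading of the entity-type/flag mapping table shown earlier.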
The analysis so far is fairly detailed and the process is quite involved, but the more complex use of the masks comes in the methods that follow. The masks exist to support combining flags and converting them (from a Spliterator's characteristics to flags), as in the following methods:

```java
enum StreamOpFlag {
    // excerpt: enum constants and other members omitted

    // For every 2-bit field that flags sets (01) or clears (10), produce 00 in the mask;
    // every other field becomes 11, so prevFlags & mask keeps only the untouched fields
    private static int getMask(int flags) {
        return (flags == 0)
               ? FLAG_MASK
               : ~(flags | ((FLAG_MASK_IS & flags) << 1) | ((FLAG_MASK_NOT & flags) >> 1));
    }

    static int combineOpFlags(int newStreamOrOpFlags, int prevCombOpFlags) {
        return (prevCombOpFlags & StreamOpFlag.getMask(newStreamOrOpFlags)) | newStreamOrOpFlags;
    }

    // keeps only the fields that are exactly 01 (set)
    static int toStreamFlags(int combOpFlags) {
        return ((~combOpFlags) >> 1) & FLAG_MASK_IS & combOpFlags;
    }

    static int toCharacteristics(int streamFlags) {
        return streamFlags & SPLITERATOR_CHARACTERISTICS_MASK;
    }

    static int fromCharacteristics(Spliterator<?> spliterator) {
        int characteristics = spliterator.characteristics();
        if ((characteristics & Spliterator.SORTED) != 0 && spliterator.getComparator() != null) {
            // SORTED by a custom Comparator is not propagated, only natural order is
            return characteristics & SPLITERATOR_CHARACTERISTICS_MASK & ~Spliterator.SORTED;
        } else {
            return characteristics & SPLITERATOR_CHARACTERISTICS_MASK;
        }
    }

    static int fromCharacteristics(int characteristics) {
        return characteristics & SPLITERATOR_CHARACTERISTICS_MASK;
    }
}
```
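The bit operations here are complicated, so only some simple computed results and the functionality are shown:
combineOpFlags(): merges the new flags with the previous flags. Because the Stream data structure is a pipeline, each successor node must combine the flags of its predecessor. For example, if the predecessor's flags are ORDERED.set and the node newly appended to the pipeline (the successor) has the new flags SIZED.set, the successor should merge in the predecessor's flags, which can roughly be pictured as SIZED.set | ORDERED.set. More precisely, every 2-bit field that the new flags set or clear overrides the previous value, while fields the new flags leave at 00 are inherited unchanged. For the head node, its initial flags are combined with INITIAL_OPS_VALUE. An example:

```java
// runs inside the copied StreamOpFlag, since getMask() is private
int left = ORDERED.set | DISTINCT.set;
int right = SIZED.clear | SORTED.clear;
System.out.println("left:" + Integer.toBinaryString(left));
System.out.println("right:" + Integer.toBinaryString(right));
System.out.println("right mask:" + Integer.toBinaryString(getMask(right)));
System.out.println("combine:" + Integer.toBinaryString(combineOpFlags(right, left)));
// Output:
// left:10001
// right:10001000
// right mask:11111111111111111111111100110011
// combine:10011001
```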
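The same formulas can be run outside the JDK to follow flags through a tiny pipeline. This is a stand-alone sketch: CombineFlagsDemo copies getMask(), combineOpFlags() and toStreamFlags() verbatim and hard-codes the mask values computed above. The second stage clears SIZED and injects DISTINCT, roughly the way a distinct() operation would; that choice is only an illustration.

```java
public class CombineFlagsDemo {
    // Values mirroring StreamOpFlag: DISTINCT(0), SORTED(1), ORDERED(2), SIZED(3), SHORT_CIRCUIT(12)
    static final int STREAM_MASK = 0b0101_0101;                // 85
    static final int FLAG_MASK_IS = STREAM_MASK;
    static final int FLAG_MASK_NOT = STREAM_MASK << 1;         // 170
    static final int FLAG_MASK = 0b1111_1111 | (0b11 << 24);  // OR of every flag's preserve bits
    static final int INITIAL_OPS_VALUE = FLAG_MASK_IS | FLAG_MASK_NOT;

    static final int DISTINCT_SET = 1, ORDERED_SET = 16, SIZED_SET = 64, SIZED_CLEAR = 128;

    // Same formulas as the JDK methods quoted above
    static int getMask(int flags) {
        return (flags == 0)
                ? FLAG_MASK
                : ~(flags | ((FLAG_MASK_IS & flags) << 1) | ((FLAG_MASK_NOT & flags) >> 1));
    }

    static int combineOpFlags(int newStreamOrOpFlags, int prevCombOpFlags) {
        return (prevCombOpFlags & getMask(newStreamOrOpFlags)) | newStreamOrOpFlags;
    }

    static int toStreamFlags(int combOpFlags) {
        return ((~combOpFlags) >> 1) & FLAG_MASK_IS & combOpFlags;
    }

    public static void main(String[] args) {
        // head stage: a source reporting ORDERED and SIZED, on top of INITIAL_OPS_VALUE
        int source = combineOpFlags(ORDERED_SET | SIZED_SET, INITIAL_OPS_VALUE);
        // next stage: clears SIZED and injects DISTINCT
        int next = combineOpFlags(SIZED_CLEAR | DISTINCT_SET, source);

        System.out.println(toStreamFlags(source)); // 80 = ORDERED.set | SIZED.set
        System.out.println(toStreamFlags(next));   // 17 = DISTINCT.set | ORDERED.set
    }
}
```

ORDERED survives the second stage because that stage leaves its field at 00, so the value from the head stage is inherited.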
Converting characteristics: a Spliterator's characteristics can be converted to flags with a simple bitwise AND, because Spliterator's characteristics were designed to match StreamOpFlag. Precisely, their bit-map layouts line up, so ANDing with SPLITERATOR_CHARACTERISTICS_MASK is all that is needed, as shown below:

```text
// Only the low 8 bits are shown for simplicity
SPLITERATOR_CHARACTERISTICS_MASK: 0101 0101
Spliterator.ORDERED:              0001 0000
StreamOpFlag.ORDERED.set:         0001 0000
```
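Applied to a real Spliterator, the AND keeps the characteristics that have a matching stream flag and drops the rest. This sketch copies the mask value 85 (0b0101_0101) computed above, because StreamOpFlag itself is package-private. In JDK 11, ArrayList's spliterator reports ORDERED | SIZED | SUBSIZED.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;

public class CharacteristicsToFlags {
    // Value of StreamOpFlag.SPLITERATOR_CHARACTERISTICS_MASK computed above
    static final int SPLITERATOR_CHARACTERISTICS_MASK = 0b0101_0101;

    // Same AND as StreamOpFlag.fromCharacteristics(int)
    static int fromCharacteristics(int characteristics) {
        return characteristics & SPLITERATOR_CHARACTERISTICS_MASK;
    }

    public static void main(String[] args) {
        Spliterator<Integer> spliterator = new ArrayList<>(List.of(1, 2, 3)).spliterator();
        int characteristics = spliterator.characteristics(); // ORDERED | SIZED | SUBSIZED
        System.out.println(Integer.toBinaryString(characteristics));                      // 100000001010000
        System.out.println(Integer.toBinaryString(fromCharacteristics(characteristics))); // 1010000
    }
}
```

The result 0b1010000 equals ORDERED.set | SIZED.set; SUBSIZED (bit 14) has no stream flag and is dropped.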
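This completes the analysis of the StreamOpFlag implementation. For space reasons, the mask-related methods are not expanded further. Next comes an analysis of the "pipeline" structure in Stream. Out of habit, the words "flag" and "characteristic" are used interchangeably below.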
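ReferencePipeline source analysis

Since a Stream behaves like a flow, it needs a chained data structure so that elements can "flow" from the Source down through every link of the chain. The usual data structure for this is a doubly linked list (a singly linked list is less suitable because the chain must also be walked backwards). Well-known implementations include AQS and ChannelHandlerContext in Netty. For example, Netty's ChannelPipeline is designed as follows:
[Figure: Netty ChannelPipeline design]
For this doubly linked list structure, the corresponding class in Stream is AbstractPipeline, and the core implementations are in ReferencePipeline and its inner classes.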
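A miniature of the doubly linked stage structure, as a hedged sketch: the field names sourceStage, previousStage, nextStage and depth mirror fields of AbstractPipeline, but MiniPipeline and its then() method are hypothetical and omit everything else (flags, spliterator, evaluation).

```java
// Hypothetical miniature of a pipeline's stage links (not JDK code)
public class MiniPipeline {
    final String name;
    final MiniPipeline sourceStage;   // head of the list: the stage holding the data source
    final MiniPipeline previousStage; // upstream stage, null for the head
    MiniPipeline nextStage;           // downstream stage, linked when an operation is appended
    final int depth;                  // 0 for the head, +1 for each appended stage

    // head (source) stage
    MiniPipeline(String name) {
        this.name = name;
        this.sourceStage = this;
        this.previousStage = null;
        this.depth = 0;
    }

    // appended stage: links itself after the given upstream stage
    private MiniPipeline(MiniPipeline upstream, String name) {
        this.name = name;
        this.previousStage = upstream;
        this.sourceStage = upstream.sourceStage;
        this.depth = upstream.depth + 1;
        upstream.nextStage = this;
    }

    MiniPipeline then(String opName) {
        return new MiniPipeline(this, opName);
    }

    public static void main(String[] args) {
        MiniPipeline tail = new MiniPipeline("source").then("filter").then("map");
        // walk backwards from the last stage to the head
        for (MiniPipeline p = tail; p != null; p = p.previousStage) {
            System.out.println(p.name + " depth=" + p.depth); // map depth=2, filter depth=1, source depth=0
        }
        // walk forwards from the head to the last stage
        for (MiniPipeline p = tail.sourceStage; p != null; p = p.nextStage) {
            System.out.println(p.name); // source, filter, map
        }
    }
}
```

Walking backwards is what building the Sink chain needs, and walking forwards from the head reaches each operation in declaration order.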
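Main interfaces

First, a brief look at the methods defined by the core supertypes of AbstractPipeline and its subclasses. The main supertypes are Stream (implemented by ReferencePipeline), BaseStream and PipelineHelper: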
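Stream: represents a sequence of elements supporting sequential and parallel aggregate operations. This top-level interface defines the intermediate operations, the terminal operations and some static factory methods (there are too many to list them all here). With respect to intermediate operations, it is essentially a builder-style interface that forms a chain of any number of intermediate operations followed by a single terminal operation, for example:

```java
public interface Stream<T> extends BaseStream<T, Stream<T>> {

    Stream<T> filter(Predicate<? super T> predicate);

    <R> Stream<R> map(Function<? super T, ? extends R> mapper);

    void forEach(Consumer<? super T> action);

    // ... other methods omitted
}

// usage: intermediate operations are chained, then exactly one terminal operation
Stream.of(1, 2, 3)
      .filter(x -> x > 1)
      .map(x -> x * 2)
      .forEach(System.out::println);
```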
BaseStream: the base interface of Stream. It defines the stream's iterators, equivalent variants of the stream (the parallel variant, the sequential variant and the unordered variant), the parallel/sequential check, and close-related methods.

```java
public interface BaseStream<T, S extends BaseStream<T, S>> extends AutoCloseable {

    Iterator<T> iterator();

    Spliterator<T> spliterator();

    boolean isParallel();

    S sequential();

    S parallel();

    S unordered();

    S onClose(Runnable closeHandler);

    @Override
    void close();
}
```
PipelineHelper: a helper class for executing stream pipelines, which captures the information about a pipeline (output shape, intermediate operations, stream flags, parallelism and so on) in one place.

```java
abstract class PipelineHelper<P_OUT> {

    abstract StreamShape getSourceShape();

    abstract int getStreamAndOpFlags();

    abstract <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator);

    abstract <P_IN, S extends Sink<P_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator);

    abstract <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);

    abstract <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);

    abstract <P_IN> Sink<P_IN> wrapSink(Sink<P_OUT> sink);

    abstract <P_IN> Spliterator<P_OUT> wrapSpliterator(Spliterator<P_IN> spliterator);

    abstract Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator);

    abstract <P_IN> Node<P_OUT> evaluate(Spliterator<P_IN> spliterator, boolean flatten, IntFunction<P_OUT[]> generator);
}
```
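An important note: in this article, sequential streams are also called synchronously executed streams, and "parallel streams" are called concurrently executed streams, because the term "parallel stream" is ambiguous; in the author's view it really means concurrent execution rather than parallel execution.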
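Sink and the reference chain

Several methods of PipelineHelper involve the Sink interface, which the previous section did not analyze; this subsection covers it in detail. Although a Stream is built as a doubly linked list, when the terminal operation is finally applied, all operations are converted into a chain of references (ChainedReference). This resembles the multi-layer wrapper programming pattern mentioned before; a simplified model looks like this: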
```java
import java.util.concurrent.atomic.AtomicInteger;

public class WrapperApp {

    interface Wrapper {
        void doAction();
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(0);
        Wrapper first = () -> System.out.printf("wrapper [depth => %d] invoke\n", counter.incrementAndGet());
        // the outer wrapper calls the inner wrapper first, then does its own work
        Wrapper second = () -> {
            first.doAction();
            System.out.printf("wrapper [depth => %d] invoke\n", counter.incrementAndGet());
        };
        second.doAction();
    }
}
// Output:
// wrapper [depth => 1] invoke
// wrapper [depth => 2] invoke
```
The example above is a little abrupt. The point is that different Sink implementations can be fused together without knowing about each other; here is a fuller example:

```java
// Sink.java
import java.util.function.Consumer;

public interface Sink<T> extends Consumer<T> {

    default void begin(long size) {
    }

    default void end() {
    }

    abstract class ChainedReference<T, OUT> implements Sink<T> {

        protected final Sink<OUT> downstream;

        public ChainedReference(Sink<OUT> downstream) {
            this.downstream = downstream;
        }
    }
}
```

```java
// ReferenceChain.java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

@SuppressWarnings({"unchecked", "rawtypes"})
public class ReferenceChain<OUT, R> {

    // one builder per operation; each builder wraps the Sink currently held in sinkReference
    private final List<Supplier<Sink<?>>> sinkBuilders = new ArrayList<>();

    private final AtomicReference<Sink> sinkReference = new AtomicReference<>();

    public ReferenceChain<OUT, R> filter(Predicate<OUT> predicate) {
        sinkBuilders.add(() -> {
            Sink<OUT> prevSink = (Sink<OUT>) sinkReference.get();
            Sink.ChainedReference<OUT, OUT> currentSink = new Sink.ChainedReference<>(prevSink) {
                @Override
                public void accept(OUT out) {
                    // only matching elements are passed downstream
                    if (predicate.test(out)) {
                        downstream.accept(out);
                    }
                }
            };
            sinkReference.set(currentSink);
            return currentSink;
        });
        return this;
    }

    public ReferenceChain<OUT, R> map(Function<OUT, R> function) {
        sinkBuilders.add(() -> {
            Sink<R> prevSink = (Sink<R>) sinkReference.get();
            Sink.ChainedReference<OUT, R> currentSink = new Sink.ChainedReference<>(prevSink) {
                @Override
                public void accept(OUT in) {
                    // the mapped value is passed downstream
                    downstream.accept(function.apply(in));
                }
            };
            sinkReference.set(currentSink);
            return currentSink;
        });
        return this;
    }

    public void forEachPrint(Collection<OUT> collection) {
        forEachPrint(collection, false);
    }

    public void forEachPrint(Collection<OUT> collection, boolean reverse) {
        Spliterator<OUT> spliterator = collection.spliterator();
        // the terminal Sink prints every element that reaches it
        Sink<OUT> sink = System.out::println;
        sinkReference.set(sink);
        Sink<OUT> stage = sink;
        if (reverse) {
            for (int i = 0; i <= sinkBuilders.size() - 1; i++) {
                Supplier<Sink<?>> supplier = sinkBuilders.get(i);
                stage = (Sink<OUT>) supplier.get();
            }
        } else {
            // default: build from the last operation back to the first
            for (int i = sinkBuilders.size() - 1; i >= 0; i--) {
                Supplier<Sink<?>> supplier = sinkBuilders.get(i);
                stage = (Sink<OUT>) supplier.get();
            }
        }
        Sink<OUT> finalStage = stage;
        spliterator.forEachRemaining(finalStage);
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>();
        list.add(1);
        list.add(2);
        list.add(3);
        list.add(12);
        ReferenceChain<Integer, Integer> chain = new ReferenceChain<>();
        chain.filter(item -> item > 10)
             .map(item -> item * 2)
             .forEachPrint(list);
    }
}
// Output:
// 24
```
The execution flow is as follows:
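The core points of this multi-layer wrapper programming pattern are:

1. Most operations can be turned into an implementation of java.util.function.Consumer, that is, an implementation of the accept(T t) method that processes each incoming element.
2. A Sink that processes elements earlier always takes the Sink that processes them later as its constructor argument, and inside its own processing method it decides whether to call back the processing method of the Sink it was given. In other words, the reference chain has to be built from back to front. For the real implementation of this approach, see AbstractPipeline#wrapSink(). For example: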
Sink mapSink = new Sink(inputSink) {
    private Function mapper;
    public void accept(E ele) {
        inputSink.accept(mapper.apply(ele));
    }
};
Sink filterSink = new Sink(mapSink) {
    private Predicate predicate;
    public void accept(E ele) {
        if (predicate.test(ele)) {
            mapSink.accept(ele);
        }
    }
};
3. It follows from the previous point that, in general, the terminal operation is applied to the first Sink of the reference chain.
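The three points can be tried out with nothing more than java.util.function.Consumer. The sketch below is a minimal illustration, assuming plain Consumer instances stand in for Sink (the real java.util.stream.Sink also has begin(), end() and cancellationRequested()); the class name HandWiredChain is hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical example: Consumer plays the role of Sink.
class HandWiredChain {
    static List<Integer> run(List<Integer> source) {
        List<Integer> out = new ArrayList<>();
        // Built back to front: the terminal consumer exists first...
        Consumer<Integer> terminal = out::add;
        // ...then map wraps the terminal consumer...
        Consumer<Integer> mapSink = x -> terminal.accept(x * 2);
        // ...and filter wraps map, so filter is the first link of the chain.
        Consumer<Integer> filterSink = x -> {
            if (x > 10) {
                mapSink.accept(x);
            }
        };
        // The traversal is applied to the FIRST sink only.
        source.spliterator().forEachRemaining(filterSink);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3, 12))); // [24]
    }
}
```

Only 12 survives the filter, and the map turns it into 24 before it reaches the terminal consumer, matching the output of the ReferenceChain example.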
The Sink pseudocode in point 2 is not something the author made up. Compare it with the source of java.util.stream.Sink:
interface Sink<T> extends Consumer<T> {
    default void begin(long size) {}
    default void end() {}
    default boolean cancellationRequested() {
        return false;
    }
    default void accept(int value) {
        throw new IllegalStateException("called wrong accept method");
    }
    default void accept(long value) {
        throw new IllegalStateException("called wrong accept method");
    }
    default void accept(double value) {
        throw new IllegalStateException("called wrong accept method");
    }
    abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
        protected final Sink<? super E_OUT> downstream;
        public ChainedReference(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }
        @Override
        public void begin(long size) {
            downstream.begin(size);
        }
        @Override
        public void end() {
            downstream.end();
        }
        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }
}
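If you have used RxJava or Project Reactor, a Sink resembles a Subscriber: several Subscribers form a ChainedReference (a Sink chain, which can be thought of as one composite Subscriber), while the Terminal Op resembles a Publisher. Data is processed only once the Subscriber subscribes to the Publisher, which is the Reactive programming pattern at work.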
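The laziness described above can be observed with the real API by recording events in a list. In this sketch Stream.of, peek, map and collect are standard java.util.stream calls; the class LazinessDemo is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazinessDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Intermediate operations only create pipeline stages; nothing runs yet.
        Stream<Integer> pipeline = Stream.of(1, 2, 3)
                .peek(x -> log.add("saw " + x))
                .map(x -> x * 10);
        log.add("pipeline built");
        // The terminal operation wraps the sinks and starts pulling from the source.
        List<Integer> result = pipeline.collect(Collectors.toList());
        log.add("result " + result);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

"pipeline built" is logged before any "saw ..." entry, because no element flows until collect() is called.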
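Implementation of AbstractPipeline and ReferencePipeline
AbstractPipeline and ReferencePipeline are both abstract classes. AbstractPipeline builds the Pipeline data structure and declares some Shape-related abstract methods for ReferencePipeline to implement, while ReferencePipeline is the base type of the Pipeline in Stream. In the source, both the head node and the operation nodes of a Stream's chained (pipelined) structure are subclasses of ReferencePipeline. First, the member variables and constructors of AbstractPipeline: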
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
    @SuppressWarnings("rawtypes")
    private final AbstractPipeline sourceStage;
    @SuppressWarnings("rawtypes")
    private final AbstractPipeline previousStage;
    protected final int sourceOrOpFlags;
    @SuppressWarnings("rawtypes")
    private AbstractPipeline nextStage;
    private int depth;
    private int combinedFlags;
    private Spliterator<?> sourceSpliterator;
    private Supplier<? extends Spliterator<?>> sourceSupplier;
    private boolean linkedOrConsumed;
    private boolean sourceAnyStateful;
    private Runnable sourceCloseAction;
    private boolean parallel;

    AbstractPipeline(Supplier<? extends Spliterator<?>> source, int sourceFlags, boolean parallel) {
        this.previousStage = null;
        this.sourceSupplier = source;
        this.sourceStage = this;
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
        this.depth = 0;
        this.parallel = parallel;
    }

    AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
        this.previousStage = null;
        this.sourceSpliterator = source;
        this.sourceStage = this;
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
        this.depth = 0;
        this.parallel = parallel;
    }

    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
        if (previousStage.linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        previousStage.linkedOrConsumed = true;
        previousStage.nextStage = this;
        this.previousStage = previousStage;
        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        this.sourceStage = previousStage.sourceStage;
        if (opIsStateful())
            sourceStage.sourceAnyStateful = true;
        this.depth = previousStage.depth + 1;
    }
}
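From this, the data structure of the stream pipeline becomes clear: it is a doubly linked list in which each stage links to its neighbours through previousStage and nextStage, every stage points back to the head through sourceStage, and depth is the distance from the head (0 for the head itself).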
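The linking part of these constructors can be reduced to a toy model. The sketch below assumes a hypothetical MiniStage class that keeps only the linking fields (flags, spliterators and close actions are left out); the exception message is copied from the JDK's MSG_STREAM_LINKED text:

```java
// Hypothetical toy model of the linking logic in AbstractPipeline's constructors.
class MiniStage {
    final String name;
    final MiniStage sourceStage;   // always the head
    final MiniStage previousStage; // null for the head
    MiniStage nextStage;           // set when a successor links to this stage
    final int depth;               // 0 for the head
    boolean linkedOrConsumed;

    // Like the two "source" constructors: the head points to itself.
    MiniStage(String name) {
        this.name = name;
        this.previousStage = null;
        this.sourceStage = this;
        this.depth = 0;
    }

    // Like AbstractPipeline(previousStage, opFlags): a stage may be linked only once.
    MiniStage(MiniStage previousStage, String name) {
        if (previousStage.linkedOrConsumed)
            throw new IllegalStateException("stream has already been operated upon or closed");
        previousStage.linkedOrConsumed = true;
        previousStage.nextStage = this;
        this.name = name;
        this.previousStage = previousStage;
        this.sourceStage = previousStage.sourceStage;
        this.depth = previousStage.depth + 1;
    }

    public static void main(String[] args) {
        MiniStage head = new MiniStage("head");
        MiniStage map = new MiniStage(new MiniStage(head, "filter"), "map");
        System.out.println(map.name + " depth=" + map.depth + " source=" + map.sourceStage.name);
        try {
            new MiniStage(head, "second branch"); // head is already linked
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because linking marks the predecessor as used, a stream pipeline can only grow as a single chain and never branch.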
The Terminal Op does not take part in building the chained pipeline structure. Next, the terminal evaluation methods in AbstractPipeline:
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

    @SuppressWarnings("unchecked")
    final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
        if (isParallel() && previousStage != null && opIsStateful()) {
            depth = 0;
            return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
        } else {
            return evaluate(sourceSpliterator(0), true, generator);
        }
    }

    final Spliterator<E_OUT> sourceStageSpliterator() {
        if (this != sourceStage)
            throw new IllegalStateException();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
        if (sourceStage.sourceSpliterator != null) {
            @SuppressWarnings("unchecked")
            Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
            sourceStage.sourceSpliterator = null;
            return s;
        } else if (sourceStage.sourceSupplier != null) {
            @SuppressWarnings("unchecked")
            Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
            sourceStage.sourceSupplier = null;
            return s;
        } else {
            throw new IllegalStateException(MSG_CONSUMED);
        }
    }
}
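Because evaluate() sets linkedOrConsumed before running the terminal op, and the linking constructor does the same for intermediate ops, a stream instance can be used only once. A small check with the real API follows; ReuseDemo is a hypothetical name, and the message text it inspects (MSG_STREAM_LINKED) is a JDK implementation detail rather than part of the specified API:

```java
import java.util.stream.Stream;

class ReuseDemo {
    // A second terminal op on the same stage hits the linkedOrConsumed check in evaluate().
    static String secondTerminalOp() {
        Stream<Integer> s = Stream.of(1, 2, 3);
        s.count();
        try {
            s.count();
            return "no exception";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    // Linking a second intermediate op to the same stage hits the check in the constructor.
    static boolean branchingThrows() {
        Stream<Integer> s = Stream.of(1, 2, 3);
        s.map(x -> x + 1);
        try {
            s.filter(x -> x > 1);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondTerminalOp());
        System.out.println(branchingThrows());
    }
}
```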
The BaseStream methods implemented in AbstractPipeline:
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
    @Override
    @SuppressWarnings("unchecked")
    public final S sequential() {
        sourceStage.parallel = false;
        return (S) this;
    }

    @Override
    @SuppressWarnings("unchecked")
    public final S parallel() {
        sourceStage.parallel = true;
        return (S) this;
    }

    @Override
    public void close() {
        linkedOrConsumed = true;
        sourceSupplier = null;
        sourceSpliterator = null;
        if (sourceStage.sourceCloseAction != null) {
            Runnable closeAction = sourceStage.sourceCloseAction;
            sourceStage.sourceCloseAction = null;
            closeAction.run();
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    public S onClose(Runnable closeHandler) {
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        Objects.requireNonNull(closeHandler);
        Runnable existingHandler = sourceStage.sourceCloseAction;
        sourceStage.sourceCloseAction = (existingHandler == null)
                ? closeHandler
                : Streams.composeWithExceptions(existingHandler, closeHandler);
        return (S) this;
    }

    @Override
    @SuppressWarnings("unchecked")
    public Spliterator<E_OUT> spliterator() {
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
        if (this == sourceStage) {
            if (sourceStage.sourceSpliterator != null) {
                @SuppressWarnings("unchecked")
                Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSpliterator;
                sourceStage.sourceSpliterator = null;
                return s;
            } else if (sourceStage.sourceSupplier != null) {
                @SuppressWarnings("unchecked")
                Supplier<Spliterator<E_OUT>> s = (Supplier<Spliterator<E_OUT>>) sourceStage.sourceSupplier;
                sourceStage.sourceSupplier = null;
                return lazySpliterator(s);
            } else {
                throw new IllegalStateException(MSG_CONSUMED);
            }
        } else {
            return wrap(this, () -> sourceSpliterator(0), isParallel());
        }
    }

    @Override
    public final boolean isParallel() {
        return sourceStage.parallel;
    }

    final int getStreamFlags() {
        return StreamOpFlag.toStreamFlags(combinedFlags);
    }

    @SuppressWarnings("unchecked")
    private Spliterator<?> sourceSpliterator(int terminalFlags) {
        Spliterator<?> spliterator = null;
        if (sourceStage.sourceSpliterator != null) {
            spliterator = sourceStage.sourceSpliterator;
            sourceStage.sourceSpliterator = null;
        } else if (sourceStage.sourceSupplier != null) {
            spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
            sourceStage.sourceSupplier = null;
        } else {
            throw new IllegalStateException(MSG_CONSUMED);
        }
        if (isParallel() && sourceStage.sourceAnyStateful) {
            int depth = 1;
            for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
                 u != e;
                 u = p, p = p.nextStage) {
                int thisOpFlags = p.sourceOrOpFlags;
                if (p.opIsStateful()) {
                    depth = 0;
                    if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
                        thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
                    }
                    spliterator = p.opEvaluateParallelLazy(u, spliterator);
                    thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
                            ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
                            : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
                }
                p.depth = depth++;
                p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
            }
        }
        if (terminalFlags != 0) {
            combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
        }
        return spliterator;
    }
}
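Since sequential() and parallel() only write the shared sourceStage.parallel flag, the last mode call wins for the whole pipeline. Since onClose() chains handlers with Streams.composeWithExceptions(existingHandler, closeHandler), handlers run in registration order when close() is called. A sketch with the real API; BaseStreamDemo is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class BaseStreamDemo {
    // parallel() and sequential() both write sourceStage.parallel, so the last call wins.
    static boolean finalModeIsParallel() {
        try (Stream<Integer> s = Stream.of(1, 2, 3).parallel().map(x -> x + 1).sequential()) {
            return s.isParallel();
        }
    }

    // onClose() composes handlers (existing one first); close() runs them.
    static List<String> closeOrder() {
        List<String> log = new ArrayList<>();
        try (Stream<Integer> s = Stream.of(1, 2, 3)
                .onClose(() -> log.add("first"))
                .onClose(() -> log.add("second"))) {
            log.add("count=" + s.count());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(finalModeIsParallel()); // false
        System.out.println(closeOrder());          // [count=3, first, second]
    }
}
```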
The PipelineHelper methods implemented in AbstractPipeline:
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
    @Override
    final StreamShape getSourceShape() {
        @SuppressWarnings("rawtypes")
        AbstractPipeline p = AbstractPipeline.this;
        while (p.depth > 0) {
            p = p.previousStage;
        }
        return p.getOutputShape();
    }

    @Override
    final <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator) {
        return StreamOpFlag.SIZED.isKnown(getStreamAndOpFlags()) ? spliterator.getExactSizeIfKnown() : -1;
    }

    @Override
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }

    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);
        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        } else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        @SuppressWarnings({"rawtypes","unchecked"})
        AbstractPipeline p = AbstractPipeline.this;
        while (p.depth > 0) {
            p = p.previousStage;
        }
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        boolean cancelled = p.forEachWithCancel(spliterator, wrappedSink);
        wrappedSink.end();
        return cancelled;
    }

    @Override
    final int getStreamAndOpFlags() {
        return combinedFlags;
    }

    final boolean isOrdered() {
        return StreamOpFlag.ORDERED.isKnown(combinedFlags);
    }

    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);
        for (@SuppressWarnings("rawtypes") AbstractPipeline p = AbstractPipeline.this; p.depth > 0; p = p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }

    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN> sourceSpliterator) {
        if (depth == 0) {
            return (Spliterator<E_OUT>) sourceSpliterator;
        } else {
            return wrap(this, () -> sourceSpliterator, isParallel());
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator, boolean flatten, IntFunction<E_OUT[]> generator) {
        if (isParallel()) {
            return evaluateToNode(this, spliterator, flatten, generator);
        } else {
            Node.Builder<E_OUT> nb = makeNodeBuilder(exactOutputSizeIfKnown(spliterator), generator);
            return wrapAndCopyInto(nb, spliterator).build();
        }
    }
}
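copyInto() switches to copyIntoWithCancel() only when the combined flags contain SHORT_CIRCUIT. The difference shows up when you count how many source elements reach a peek() before a short-circuiting findFirst() and before a non-short-circuiting collect(). A sketch with the real API; ShortCircuitDemo is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ShortCircuitDemo {
    // findFirst() is short-circuiting: elements are pulled one at a time with tryAdvance()
    // and the loop stops once the terminal sink reports cancellationRequested() == true.
    static List<Integer> seenWithFindFirst() {
        List<Integer> seen = new ArrayList<>();
        Stream.of(1, 2, 3, 4, 5, 6).peek(seen::add).filter(x -> x > 3).findFirst();
        return seen;
    }

    // collect() is not short-circuiting: copyInto() uses forEachRemaining().
    static List<Integer> seenWithCollect() {
        List<Integer> seen = new ArrayList<>();
        Stream.of(1, 2, 3, 4, 5, 6).peek(seen::add).filter(x -> x > 3).collect(Collectors.toList());
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(seenWithFindFirst()); // [1, 2, 3, 4]
        System.out.println(seenWithCollect());   // [1, 2, 3, 4, 5, 6]
    }
}
```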
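The remaining abstract methods of AbstractPipeline, left for subclasses such as ReferencePipeline, IntPipeline, LongPipeline and DoublePipeline to implement: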
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
    abstract StreamShape getOutputShape();
    abstract <P_IN> Node<E_OUT> evaluateToNode(PipelineHelper<E_OUT> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<E_OUT[]> generator);
    abstract <P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel);
    abstract Spliterator<E_OUT> lazySpliterator(Supplier<? extends Spliterator<E_OUT>> supplier);
    abstract boolean forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink);
    abstract Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<E_OUT[]> generator);
    abstract boolean opIsStateful();
    abstract Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);

    <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper, Spliterator<P_IN> spliterator, IntFunction<E_OUT[]> generator) {
        throw new UnsupportedOperationException("Parallel evaluation is not supported");
    }

    @SuppressWarnings("unchecked")
    <P_IN> Spliterator<E_OUT> opEvaluateParallelLazy(PipelineHelper<E_OUT> helper, Spliterator<P_IN> spliterator) {
        return opEvaluateParallel(helper, spliterator, i -> (E_OUT[]) new Object[i]).spliterator();
    }
}
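The abstract method opWrapSink() mentioned here is in fact the method that adds a node to the element reference chain, and its implementation lives in the subclasses. Only part of the source of the non-specialized subclass ReferencePipeline is considered here: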
abstract class ReferencePipeline<P_IN, P_OUT>
        extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
        implements Stream<P_OUT> {
    ReferencePipeline(Supplier<? extends Spliterator<?>> source, int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }

    ReferencePipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }

    ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
        super(upstream, opFlags);
    }

    @Override
    final StreamShape getOutputShape() {
        return StreamShape.REFERENCE;
    }

    @Override
    final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<P_OUT[]> generator) {
        return Nodes.collect(helper, spliterator, flattenTree, generator);
    }

    @Override
    final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel) {
        return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);
    }

    @Override
    final Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) {
        return new StreamSpliterators.DelegatingSpliterator<>(supplier);
    }

    @Override
    final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
        boolean cancelled;
        do { } while (!(cancelled = sink.cancellationRequested()) && spliterator.tryAdvance(sink));
        return cancelled;
    }

    @Override
    final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {
        return Nodes.builder(exactSizeIfKnown, generator);
    }

    @Override
    public final Iterator<P_OUT> iterator() {
        return Spliterators.iterator(spliterator());
    }

    static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
        Head(Supplier<? extends Spliterator<?>> source, int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }

        Head(Spliterator<?> source, int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }

        @Override
        final boolean opIsStateful() {
            throw new UnsupportedOperationException();
        }

        @Override
        final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void forEach(Consumer<? super E_OUT> action) {
            if (!isParallel()) {
                sourceStageSpliterator().forEachRemaining(action);
            } else {
                super.forEach(action);
            }
        }

        @Override
        public void forEachOrdered(Consumer<? super E_OUT> action) {
            if (!isParallel()) {
                sourceStageSpliterator().forEachRemaining(action);
            } else {
                super.forEachOrdered(action);
            }
        }
    }

    abstract static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, StreamShape inputShape, int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        final boolean opIsStateful() {
            return false;
        }
    }

    abstract static class StatefulOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
        StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, StreamShape inputShape, int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        final boolean opIsStateful() {
            return true;
        }

        @Override
        abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper, Spliterator<P_IN> spliterator, IntFunction<E_OUT[]> generator);
    }
}
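Now for the most important part: the implementation of wrapSink. It is a final method defined in AbstractPipeline and inherited by ReferencePipeline: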
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);
    for (@SuppressWarnings("rawtypes") AbstractPipeline p = AbstractPipeline.this; p.depth > 0; p = p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}
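Both the parameter and the return value are Sink instances. The for loop walks backwards from the current AbstractPipeline node and exits at the node whose depth is 0. Since depth 0 means the node must be the head, the loop visits every node from the current one back to the head's successor. Sinks are therefore wrapped "backwards": the Sink of a node further down the chain is always passed as the argument to its predecessor's opWrapSink(). During sequential evaluation, once a predecessor's Sink has processed an element, it calls accept() through its downstream reference (which is simply the successor's Sink), passing in the element or the processed result and thereby activating the next Sink, and so on.

In addition, ReferencePipeline's three inner classes Head, StatelessOp and StatefulOp are the node classes of the stream. Only Head is non-abstract; it represents the head node of the pipeline (that is, of the doubly linked list). Subclasses of StatelessOp (stateless operations) and StatefulOp (stateful operations) form the operation nodes of the pipeline.

Setting aside whether an operation is stateful, ReferencePipeline is merely the carrier of the stream's data structure. The doubly linked list that is visible on the surface is not traversed node by node during evaluation. Instead, it is first converted into a multi-layer wrapped Sink, namely the element reference chain described earlier, or the Sink-based element processing and passing analyzed just above. Strictly speaking it is a Sink stack or Sink wrapper, and its structure can be compared to an onion in real life, or to the AOP programming pattern. A more visual description: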
Head(Spliterator) -> Op(filter) -> Op(map) -> Op(sorted) -> Terminal Op(forEach)
↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
forEach ele in Spliterator:
    Sink[filter](ele){
        if filter process == true:
            Sink[map](ele){
                ele = mapper(ele)
                Sink[sorted](ele){
                    var array
                    begin:
                    accept(ele):
                        add ele to array
                    end:
                        sort ele in array
                }
            }
    }
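The backwards wrapping loop can be imitated with a toy pipeline. This is a minimal sketch with hypothetical Stage and WrapSinkDemo classes, in which Consumer<Integer> stands in for Sink and stream flags are ignored. Only the shape of the loop in wrapSink() mirrors the JDK code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical toy pipeline stage; Consumer<Integer> plays the role of Sink.
abstract class Stage {
    final Stage previousStage;
    final int depth;

    Stage(Stage previousStage) {
        this.previousStage = previousStage;
        this.depth = previousStage == null ? 0 : previousStage.depth + 1;
    }

    // Counterpart of opWrapSink(): wrap the downstream sink with this stage's logic.
    abstract Consumer<Integer> opWrapSink(Consumer<Integer> downstream);

    // Same loop shape as AbstractPipeline#wrapSink(): from this stage back to the
    // head's successor; the head (depth 0) is never asked to wrap.
    final Consumer<Integer> wrapSink(Consumer<Integer> sink) {
        for (Stage p = this; p.depth > 0; p = p.previousStage) {
            sink = p.opWrapSink(sink);
        }
        return sink;
    }

    static Stage head() {
        return new Stage(null) {
            @Override
            Consumer<Integer> opWrapSink(Consumer<Integer> downstream) {
                throw new UnsupportedOperationException(); // like Head#opWrapSink()
            }
        };
    }

    Stage filter(Predicate<Integer> predicate) {
        return new Stage(this) {
            @Override
            Consumer<Integer> opWrapSink(Consumer<Integer> downstream) {
                return x -> {
                    if (predicate.test(x)) downstream.accept(x);
                };
            }
        };
    }

    Stage map(UnaryOperator<Integer> mapper) {
        return new Stage(this) {
            @Override
            Consumer<Integer> opWrapSink(Consumer<Integer> downstream) {
                return x -> downstream.accept(mapper.apply(x));
            }
        };
    }
}

class WrapSinkDemo {
    static List<Integer> run(Stage last, List<Integer> source) {
        List<Integer> out = new ArrayList<>();
        Consumer<Integer> wrapped = last.wrapSink(out::add); // outermost layer = first operation
        source.forEach(wrapped);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(Stage.head().filter(x -> x > 10).map(x -> x * 2), List.of(6, 12))); // [24]
        System.out.println(run(Stage.head().map(x -> x * 2).filter(x -> x > 10), List.of(6, 12))); // [12, 24]
    }
}
```

Swapping the order of filter and map changes the result, which confirms that the wrapped sinks run in the order in which the operations were declared, even though they are wrapped from the last stage backwards.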
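The terminal operation forEach has the simplest implementation of the source analyzed so far. The implementation details of selected terminal operations are analyzed below.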
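Source implementation of intermediate operations
Because of space constraints, only some of the intermediate Ops can be analyzed here. Intermediate operations are declared mainly in the Stream interface and implemented in ReferencePipeline. The commonly used filter, map and sorted are analyzed here. First, filter: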
abstract class ReferencePipeline<P_IN, P_OUT>
        extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
        implements Stream<P_OUT> {
    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }
}
Next comes map:
abstract class ReferencePipeline<P_IN, P_OUT>
        extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
        implements Stream<P_OUT> {
    @Override
    @SuppressWarnings("unchecked")
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }
}
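The two sinks differ in begin(). map inherits ChainedReference#begin() and forwards the known size unchanged, while filter reports -1 because it cannot know how many elements will survive. The sketch below reproduces this with hypothetical MiniSink, MiniChained and FilterMapSinks types that keep only begin/accept/end; it is not the JDK's Sink:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for java.util.stream.Sink, keeping only begin/accept/end.
interface MiniSink<T> {
    default void begin(long size) {}
    void accept(T t);
    default void end() {}
}

// Like Sink.ChainedReference: begin/end are forwarded to downstream by default.
abstract class MiniChained<T, OUT> implements MiniSink<T> {
    protected final MiniSink<? super OUT> downstream;
    MiniChained(MiniSink<? super OUT> downstream) { this.downstream = downstream; }
    @Override public void begin(long size) { downstream.begin(size); }
    @Override public void end() { downstream.end(); }
}

class FilterMapSinks {
    static <T> MiniSink<T> filter(Predicate<? super T> predicate, MiniSink<? super T> down) {
        return new MiniChained<T, T>(down) {
            // The number of survivors is unknown, hence -1 (as in filter's opWrapSink).
            @Override public void begin(long size) { downstream.begin(-1); }
            @Override public void accept(T t) { if (predicate.test(t)) downstream.accept(t); }
        };
    }

    static <T, R> MiniSink<T> map(Function<? super T, ? extends R> mapper, MiniSink<? super R> down) {
        // begin() is inherited, so the known size passes through unchanged.
        return new MiniChained<T, R>(down) {
            @Override public void accept(T t) { downstream.accept(mapper.apply(t)); }
        };
    }

    // Terminal sink that records the size announced in begin() and the received elements.
    static final class Recorder<T> implements MiniSink<T> {
        long beginSize = Long.MIN_VALUE;
        final List<T> items = new ArrayList<>();
        @Override public void begin(long size) { beginSize = size; }
        @Override public void accept(T t) { items.add(t); }
    }

    // Plays the role of copyInto(): begin(knownSize), push every element, end().
    static <T> void drive(List<T> source, MiniSink<T> sink) {
        sink.begin(source.size());
        source.forEach(sink::accept);
        sink.end();
    }

    public static void main(String[] args) {
        Recorder<Integer> viaMap = new Recorder<>();
        drive(List.of(1, 2, 3), FilterMapSinks.<Integer, Integer>map(x -> x * 10, viaMap));
        Recorder<Integer> viaFilter = new Recorder<>();
        drive(List.of(1, 2, 3), FilterMapSinks.<Integer>filter(x -> x > 1, viaFilter));
        System.out.println(viaMap.beginSize + " " + viaMap.items);       // 3 [10, 20, 30]
        System.out.println(viaFilter.beginSize + " " + viaFilter.items); // -1 [2, 3]
    }
}
```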
Then comes sorted, which is somewhat more complex:
abstract class ReferencePipeline<P_IN, P_OUT>
        extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
        implements Stream<P_OUT> {
    @Override
    public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) {
        return SortedOps.makeRef(this, comparator);
    }
}

final class SortedOps {
    static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
        return new OfRef<>(upstream, comparator);
    }

    private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
        private final boolean isNaturalSort;
        private final Comparator<? super T> comparator;

        OfRef(AbstractPipeline<?, T, ?> upstream) {
            super(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
            this.isNaturalSort = true;
            @SuppressWarnings("unchecked")
            Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
            this.comparator = comp;
        }

        OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
            super(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
            this.isNaturalSort = false;
            this.comparator = Objects.requireNonNull(comparator);
        }

        @Override
        public Sink<T> opWrapSink(int flags, Sink<T> sink) {
            Objects.requireNonNull(sink);
            if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
                return sink;
            else if (StreamOpFlag.SIZED.isKnown(flags))
                return new SizedRefSortingSink<>(sink, comparator);
            else
                return new RefSortingSink<>(sink, comparator);
        }

        @Override
        public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator, IntFunction<T[]> generator) {
            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
                return helper.evaluate(spliterator, false, generator);
            } else {
                T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
                Arrays.parallelSort(flattenedData, comparator);
                return Nodes.node(flattenedData);
            }
        }
    }

    private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
        private ArrayList<T> list;

        RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
            super(sink, comparator);
        }

        @Override
        public void begin(long size) {
            if (size >= Nodes.MAX_ARRAY_SIZE)
                throw new IllegalArgumentException(Nodes.BAD_SIZE);
            list = (size >= 0) ? new ArrayList<>((int) size) : new ArrayList<>();
        }

        @Override
        public void end() {
            list.sort(comparator);
            downstream.begin(list.size());
            if (!cancellationRequestedCalled) {
                list.forEach(downstream::accept);
            } else {
                for (T t : list) {
                    if (downstream.cancellationRequested()) break;
                    downstream.accept(t);
                }
            }
            downstream.end();
            list = null;
        }

        @Override
        public void accept(T t) {
            list.add(t);
        }
    }
}
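The sorted operation has one rather distinctive feature. An ordinary Sink, after running its own logic, activates the next Sink reference inside accept(). sorted, by contrast, only accumulates elements in accept() (element accumulation). In end() it performs the actual sort and then pushes the elements to downstream in a way that mimics Spliterator's two traversal methods: if cancellationRequestedCalled is false it pushes everything with list.forEach(...), like forEachRemaining(); otherwise it loops element by element and stops as soon as downstream.cancellationRequested() returns true, like repeated tryAdvance() calls. The diagram is as follows: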
The other intermediate operations follow broadly the same implementation logic.
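The barrier effect of sorted can be observed with the real API by logging on both sides of it with peek(). Every element passes the upstream peek() before any element reaches the downstream one, because the sorting sink only buffers in accept() and pushes in end(). SortedBarrierDemo is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class SortedBarrierDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Stream.of(3, 1, 2)
              .peek(x -> log.add("before " + x))
              .sorted() // accept() only buffers; end() sorts and pushes downstream
              .peek(x -> log.add("after " + x))
              .forEach(x -> log.add("forEach " + x));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```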
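Source implementation of terminal operations for sequential streams
Because of space constraints, only some Terminal Ops can be analyzed here, and for simplicity only sequential execution is covered. The selected operations are the most typical and most complex ones, forEach() and collect(), plus the rather distinctive toArray(). First, the implementation of forEach():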
abstract class ReferencePipeline<P_IN, P_OUT>
        extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
        implements Stream<P_OUT> {
    @Override
    public void forEach(Consumer<? super P_OUT> action) {
        evaluate(ForEachOps.makeRef(action, false));
    }

    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
}

final class ForEachOps {
    public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action, boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfRef<>(action, ordered);
    }

    abstract static class ForEachOp<T> implements TerminalOp<T, Void>, TerminalSink<T, Void> {
        private final boolean ordered;

        protected ForEachOp(boolean ordered) {
            this.ordered = ordered;
        }

        @Override
        public int getOpFlags() {
            return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
        }

        @Override
        public <S> Void evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) {
            return helper.wrapAndCopyInto(this, spliterator).get();
        }

        @Override
        public <S> Void evaluateParallel(PipelineHelper<T> helper, Spliterator<S> spliterator) {
            if (ordered)
                new ForEachOrderedTask<>(helper, spliterator, this).invoke();
            else
                new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
            return null;
        }

        @Override
        public Void get() {
            return null;
        }

        static final class OfRef<T> extends ForEachOp<T> {
            final Consumer<? super T> consumer;

            OfRef(Consumer<? super T> consumer, boolean ordered) {
                super(ordered);
                this.consumer = consumer;
            }

            @Override
            public void accept(T t) {
                consumer.accept(t);
            }
        }
    }
}
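The forEach terminal operation does not itself become part of the stream's chained structure: it is not an instance of an AbstractPipeline subclass. Instead it builds a Sink instance (more precisely a TerminalSink) that calls back the Consumer; let us call it the forEach terminal sink. The wrapSink() method of the last operation node appends the forEach terminal sink to the tail of the Sink chain, and the copyInto() method of that same node traverses the elements. Following the copyInto() pattern, as long as every layer of the wrapped Sink always activates downstream when its methods are called back, execution follows the order of the operation nodes defined in the stream chain, and the Consumer added last by forEach is always the last to be called back.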
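For a pipeline made only of stateless operations, this means each element travels through the whole Sink chain, ending at the forEach Consumer, before the next element is pulled from the source. A small check with the real API; PerElementOrderDemo is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class PerElementOrderDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Stream.of(1, 2)
              .peek(x -> log.add("peek " + x))
              .map(x -> {
                  log.add("map " + x);
                  return x * 10;
              })
              .forEach(x -> log.add("forEach " + x)); // the terminal Consumer is called last
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```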
Next, the implementation of collect(). First, the definition of the Collector interface:
public interface Collector<T, A, R> {
    Supplier<A> supplier();
    BiConsumer<A, T> accumulator();
    BinaryOperator<A> combiner();
    Function<A, R> finisher();
    Set<Characteristics> characteristics();

    enum Characteristics {
        CONCURRENT,
        UNORDERED,
        IDENTITY_FINISH
    }
}

public final class Collectors {
    static final Set<Collector.Characteristics> CH_CONCURRENT_ID
            = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.CONCURRENT,
                    Collector.Characteristics.UNORDERED,
                    Collector.Characteristics.IDENTITY_FINISH));
    static final Set<Collector.Characteristics> CH_CONCURRENT_NOID
            = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.CONCURRENT,
                    Collector.Characteristics.UNORDERED));
    static final Set<Collector.Characteristics> CH_ID
            = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));
    static final Set<Collector.Characteristics> CH_UNORDERED_ID
            = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.UNORDERED,
                    Collector.Characteristics.IDENTITY_FINISH));
    static final Set<Collector.Characteristics> CH_NOID = Collections.emptySet();
    static final Set<Collector.Characteristics> CH_UNORDERED_NOID
            = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.UNORDERED));

    private Collectors() { }

    static class CollectorImpl<T, A, R> implements Collector<T, A, R> {
        private final Supplier<A> supplier;
        private final BiConsumer<A, T> accumulator;
        private final BinaryOperator<A> combiner;
        private final Function<A, R> finisher;
        private final Set<Characteristics> characteristics;

        CollectorImpl(Supplier<A> supplier,
                      BiConsumer<A, T> accumulator,
                      BinaryOperator<A> combiner,
                      Function<A,R> finisher,
                      Set<Characteristics> characteristics) {
            this.supplier = supplier;
            this.accumulator = accumulator;
            this.combiner = combiner;
            this.finisher = finisher;
            this.characteristics = characteristics;
        }

        CollectorImpl(Supplier<A> supplier,
                      BiConsumer<A, T> accumulator,
                      BinaryOperator<A> combiner,
                      Set<Characteristics> characteristics) {
            this(supplier, accumulator, combiner, castingIdentity(), characteristics);
        }

        @Override
        public BiConsumer<A, T> accumulator() {
            return accumulator;
        }

        @Override
        public Supplier<A> supplier() {
            return supplier;
        }

        @Override
        public BinaryOperator<A> combiner() {
            return combiner;
        }

        @Override
        public Function<A, R> finisher() {
            return finisher;
        }

        @Override
        public Set<Characteristics> characteristics() {
            return characteristics;
        }
    }

    private static <I, R> Function<I, R> castingIdentity() {
        return i -> (R) i;
    }
}
The evaluation entry point of collect() is in ReferencePipeline:
abstract class ReferencePipeline<P_IN, P_OUT>
        extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
        implements Stream<P_OUT> {
    public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        A container;
        if (isParallel()
                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
            container = collector.supplier().get();
            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
            forEach(u -> accumulator.accept(container, u));
        } else {
            container = evaluate(ReduceOps.makeRef(collector));
        }
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }
}

final class ReduceOps {
    private ReduceOps() { }

    public static <T, I> TerminalOp<T, I> makeRef(Collector<? super T, I, ?> collector) {
        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
        BiConsumer<I, ? super T> accumulator = collector.accumulator();
        BinaryOperator<I> combiner = collector.combiner();
        class ReducingSink extends Box<I> implements AccumulatingSink<T, I, ReducingSink> {
            @Override
            public void begin(long size) {
                state = supplier.get();
            }

            @Override
            public void accept(T t) {
                accumulator.accept(state, t);
            }

            @Override
            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state);
            }
        }
        return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }

            @Override
            public int getOpFlags() {
                return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                       ? StreamOpFlag.NOT_ORDERED
                       : 0;
            }
        };
    }

    private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>> extends TerminalSink<T, R> {
        void combine(K other);
    }

    private abstract static class Box<U> {
        U state;

        Box() {}

        public U get() {
            return state;
        }
    }

    private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>> implements TerminalOp<T, R> {
        private final StreamShape inputShape;

        ReduceOp(StreamShape shape) {
            inputShape = shape;
        }

        public abstract S makeSink();

        @Override
        public StreamShape inputShape() {
            return inputShape;
        }

        @Override
        public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
            return helper.wrapAndCopyInto(makeSink(), spliterator).get();
        }

        @Override
        public <P_IN> R evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
            return new ReduceTask<>(this, helper, spliterator).invoke().get();
        }
    }
}
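On the sequential path, ReducingSink.begin() calls the supplier once, accept() calls the accumulator once per element, combine() is never reached, and collect() applies the finisher only when IDENTITY_FINISH is absent. The call counts can be checked with the real Collector.of factory; CollectCallsDemo is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

class CollectCallsDemo {
    // Returns how often {supplier, accumulator, combiner, finisher} were invoked.
    static int[] countCalls() {
        int[] calls = new int[4];
        Collector<String, List<String>, Integer> sizeCollector = Collector.of(
                () -> { calls[0]++; return new ArrayList<String>(); },
                (list, s) -> { calls[1]++; list.add(s); },
                (left, right) -> { calls[2]++; left.addAll(right); return left; },
                list -> { calls[3]++; return list.size(); });
        int size = Stream.of("a", "b", "c").collect(sizeCollector); // sequential stream
        if (size != 3) throw new IllegalStateException("unexpected size " + size);
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(countCalls())); // [1, 3, 0, 1]
    }
}
```

The combiner only matters for parallel evaluation, where ReduceTask merges the partial ReducingSink results through combine().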
Next, look at the static factory methods of Collectors to see how some commonly used Collector instances are built, for example Collectors.toList():
public static <T> Collector<T, ?, List<T>> toList() {
    return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
            (left, right) -> { left.addAll(right); return left; },
            CH_ID);
}
The process drawn as a flowchart:
The execution of a Collector-based Terminal Op like this can be described even more plainly in pseudocode (again using Collectors.toList() as the example):
[begin]
Supplier supplier = () -> new ArrayList<T>();
Container container = supplier.get();
Box.state = container;
[accept]
Box.state.add(element);
[end]
return Box.state; (via Box#get())
↓↓↓↓↓↓↓↓↓ Even more plainly ↓↓↓↓↓↓↓↓↓
ArrayList<T> container = new ArrayList<T>();
loop:
    container.add(element)
return container;
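The pseudocode can be turned into a runnable sketch that drives any Collector by hand in the same order as the sequential path: supplier in begin, accumulator in accept, and the finisher unless IDENTITY_FINISH is set. ManualCollect.collectSequentially is a hypothetical helper, not a JDK method:

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class ManualCollect {
    // Hypothetical helper mirroring ReducingSink + ReferencePipeline#collect() (sequential only).
    @SuppressWarnings("unchecked")
    static <T, A, R> R collectSequentially(Iterable<? extends T> source, Collector<T, A, R> collector) {
        // [begin] ReducingSink#begin(): state = supplier.get()
        A state = collector.supplier().get();
        // [accept] ReducingSink#accept(): accumulator.accept(state, t) for each element
        BiConsumer<A, T> accumulator = collector.accumulator();
        for (T t : source) {
            accumulator.accept(state, t);
        }
        // [end] collect(): the finisher is skipped for IDENTITY_FINISH collectors
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
                ? (R) state
                : collector.finisher().apply(state);
    }

    public static void main(String[] args) {
        List<Integer> list = collectSequentially(List.of(1, 2, 3), Collectors.<Integer>toList());
        String joined = collectSequentially(List.of("a", "b"), Collectors.joining(","));
        System.out.println(list + " " + joined); // [1, 2, 3] a,b
    }
}
```

toList() carries IDENTITY_FINISH, so the ArrayList held as the state is returned directly. joining() does not, so its finisher turns the accumulated StringBuilder into a String.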
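In other words, although the engineered code looks complex, the final implementation is simple: an ArrayList instance is created and held in the state field, each element is added to state during traversal, and state is returned at the end. Finally, the implementation of toArray() (the methods below are not shown at their actual locations; the author has gathered the scattered code blocks together for easier analysis):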
```java
abstract class ReferencePipeline<P_IN, P_OUT>
        extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
        implements Stream<P_OUT> {

    @Override
    @SuppressWarnings("unchecked")
    public final <A> A[] toArray(IntFunction<A[]> generator) {
        @SuppressWarnings("rawtypes")
        IntFunction rawGenerator = (IntFunction) generator;
        return (A[]) Nodes.flatten(evaluateToArrayNode(rawGenerator), rawGenerator)
                .asArray(rawGenerator);
    }

    @Override
    public final Object[] toArray() {
        return toArray(Object[]::new);
    }

    // evaluateToArrayNode() and evaluate() are declared in AbstractPipeline<E_IN, E_OUT, S>,
    // which is why they use the E_OUT type variable
    final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
        // parallel pipeline whose last intermediate operation is stateful: evaluate it directly
        if (isParallel() && previousStage != null && opIsStateful()) {
            depth = 0;
            return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
        }
        else {
            return evaluate(sourceSpliterator(0), true, generator);
        }
    }

    final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
                                      boolean flatten,
                                      IntFunction<E_OUT[]> generator) {
        if (isParallel()) {
            return evaluateToNode(this, spliterator, flatten, generator);
        }
        else {
            // sequential: choose a Node.Builder from the exact output size (-1 if unknown)
            Node.Builder<E_OUT> nb = makeNodeBuilder(
                    exactOutputSizeIfKnown(spliterator), generator);
            return wrapAndCopyInto(nb, spliterator).build();
        }
    }

    final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {
        return Nodes.builder(exactSizeIfKnown, generator);
    }
}

interface Node<T> {

    Spliterator<T> spliterator();

    void forEach(Consumer<? super T> consumer);

    default int getChildCount() {
        return 0;
    }

    default Node<T> getChild(int i) {
        throw new IndexOutOfBoundsException();
    }

    default Node<T> truncate(long from, long to, IntFunction<T[]> generator) {
        if (from == 0 && to == count())
            return this;
        Spliterator<T> spliterator = spliterator();
        long size = to - from;
        Node.Builder<T> nodeBuilder = Nodes.builder(size, generator);
        nodeBuilder.begin(size);
        for (int i = 0; i < from && spliterator.tryAdvance(e -> { }); i++) { }
        if (to == count()) {
            spliterator.forEachRemaining(nodeBuilder);
        } else {
            for (int i = 0; i < size && spliterator.tryAdvance(nodeBuilder); i++) { }
        }
        nodeBuilder.end();
        return nodeBuilder.build();
    }

    T[] asArray(IntFunction<T[]> generator);

    void copyInto(T[] array, int offset);

    default StreamShape getShape() {
        return StreamShape.REFERENCE;
    }

    long count();

    interface Builder<T> extends Sink<T> {
        Node<T> build();

        interface OfInt extends Node.Builder<Integer>, Sink.OfInt {
            @Override
            Node.OfInt build();
        }

        interface OfLong extends Node.Builder<Long>, Sink.OfLong {
            @Override
            Node.OfLong build();
        }

        interface OfDouble extends Node.Builder<Double>, Sink.OfDouble {
            @Override
            Node.OfDouble build();
        }
    }
}

final class Nodes {

    public static <T> Node<T> flatten(Node<T> node, IntFunction<T[]> generator) {
        if (node.getChildCount() > 0) {
            long size = node.count();
            if (size >= MAX_ARRAY_SIZE)
                throw new IllegalArgumentException(BAD_SIZE);
            T[] array = generator.apply((int) size);
            new ToArrayTask.OfRef<>(node, array, 0).invoke();
            return node(array);
        } else {
            return node;
        }
    }

    // exact size known and small enough -> FixedNodeBuilder, otherwise SpinedNodeBuilder
    static <T> Node.Builder<T> builder(long exactSizeIfKnown, IntFunction<T[]> generator) {
        return (exactSizeIfKnown >= 0 && exactSizeIfKnown < MAX_ARRAY_SIZE)
               ? new FixedNodeBuilder<>(exactSizeIfKnown, generator)
               : builder();
    }

    static <T> Node.Builder<T> builder() {
        return new SpinedNodeBuilder<>();
    }

    private static final class FixedNodeBuilder<T> extends ArrayNode<T> implements Node.Builder<T> {

        FixedNodeBuilder(long size, IntFunction<T[]> generator) {
            super(size, generator);
            assert size < MAX_ARRAY_SIZE;
        }

        @Override
        public Node<T> build() {
            if (curSize < array.length)
                throw new IllegalStateException(String.format("Current size %d is less than fixed size %d",
                        curSize, array.length));
            return this;
        }

        @Override
        public void begin(long size) {
            if (size != array.length)
                throw new IllegalStateException(String.format("Begin size %d is not equal to fixed size %d",
                        size, array.length));
            curSize = 0;
        }

        @Override
        public void accept(T t) {
            if (curSize < array.length) {
                array[curSize++] = t;
            } else {
                throw new IllegalStateException(String.format("Accept exceeded fixed size of %d",
                        array.length));
            }
        }

        @Override
        public void end() {
            if (curSize < array.length)
                throw new IllegalStateException(String.format("End size %d is less than fixed size %d",
                        curSize, array.length));
        }

        @Override
        public String toString() {
            return String.format("FixedNodeBuilder[%d][%s]", array.length - curSize, Arrays.toString(array));
        }
    }

    private static class ArrayNode<T> implements Node<T> {
        final T[] array;
        int curSize;

        @SuppressWarnings("unchecked")
        ArrayNode(long size, IntFunction<T[]> generator) {
            if (size >= MAX_ARRAY_SIZE)
                throw new IllegalArgumentException(BAD_SIZE);
            this.array = generator.apply((int) size);
            this.curSize = 0;
        }

        ArrayNode(T[] array) {
            this.array = array;
            this.curSize = array.length;
        }

        @Override
        public Spliterator<T> spliterator() {
            return Arrays.spliterator(array, 0, curSize);
        }

        @Override
        public void copyInto(T[] dest, int destOffset) {
            System.arraycopy(array, 0, dest, destOffset, curSize);
        }

        @Override
        public T[] asArray(IntFunction<T[]> generator) {
            if (array.length == curSize) {
                return array;
            } else {
                throw new IllegalStateException();
            }
        }

        @Override
        public long count() {
            return curSize;
        }

        @Override
        public void forEach(Consumer<? super T> consumer) {
            for (int i = 0; i < curSize; i++) {
                consumer.accept(array[i]);
            }
        }

        @Override
        public String toString() {
            return String.format("ArrayNode[%d][%s]", array.length - curSize, Arrays.toString(array));
        }
    }
}
```
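Whether the sequential branch of evaluate() gets a FixedNodeBuilder or a SpinedNodeBuilder depends on exactOutputSizeIfKnown(). That method is internal, but the public Spliterator.getExactSizeIfKnown() of a stream's spliterator() gives a rough outside view: it reports the exact size only while the SIZED flag survives the pipeline. The sketch below is based on JDK 11 behaviour, where map() keeps SIZED and filter() clears it. The helper name is illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class SizedFlagDemo {

    // Exact size as seen through the stream's spliterator, or -1 if SIZED is not known.
    public static long exactSizeOf(Stream<?> stream) {
        return stream.spliterator().getExactSizeIfKnown();
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4);
        // The source spliterator of a list knows its size.
        System.out.println(list.spliterator().getExactSizeIfKnown()); // 4
        // map() keeps the element count, so SIZED is preserved.
        System.out.println(exactSizeOf(list.stream().map(x -> x * 2))); // 4
        // filter() may drop elements, so SIZED is cleared.
        System.out.println(exactSizeOf(list.stream().filter(x -> x > 2))); // -1
    }
}
```

In the -1 case, Nodes.builder() has no exact size to preallocate, which is the situation the next paragraph describes.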
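In practice the exact output size is often unknown: the source Spliterator may not report SIZED, and intermediate operations such as filter() clear the flag. In those cases exactOutputSizeIfKnown() returns -1 and Nodes.builder() falls back to Nodes.SpinedNodeBuilder, so in many situations the Node is ultimately a SpinedNodeBuilder. SpinedNodeBuilder supports array growth and a Spliterator that splits along the underlying arrays. Its source is fairly complex (especially the spliterator() method), so only part of it is covered here. Because most of SpinedNodeBuilder's methods are inherited from its parent class SpinedBuffer, we can analyze SpinedBuffer directly: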
```java
class SpinedBuffer<E>
        extends AbstractSpinedBuffer
        implements Consumer<E>, Iterable<E> {

    // chunk currently being filled
    protected E[] curChunk;
    // all chunks (the "spine"); created lazily once the first chunk is full
    protected E[][] spine;

    @SuppressWarnings("unchecked")
    SpinedBuffer(int initialCapacity) {
        super(initialCapacity);
        curChunk = (E[]) new Object[1 << initialChunkPower];
    }

    @SuppressWarnings("unchecked")
    SpinedBuffer() {
        super();
        curChunk = (E[]) new Object[1 << initialChunkPower];
    }

    public void copyInto(E[] array, int offset) {
        long finalOffset = offset + count();
        if (finalOffset > array.length || finalOffset < offset) {
            throw new IndexOutOfBoundsException("does not fit");
        }
        if (spineIndex == 0)
            System.arraycopy(curChunk, 0, array, offset, elementIndex);
        else {
            // copy the full chunks first, then the partially filled current chunk
            for (int i = 0; i < spineIndex; i++) {
                System.arraycopy(spine[i], 0, array, offset, spine[i].length);
                offset += spine[i].length;
            }
            if (elementIndex > 0)
                System.arraycopy(curChunk, 0, array, offset, elementIndex);
        }
    }

    public E[] asArray(IntFunction<E[]> arrayFactory) {
        long size = count();
        if (size >= Nodes.MAX_ARRAY_SIZE)
            throw new IllegalArgumentException(Nodes.BAD_SIZE);
        E[] result = arrayFactory.apply((int) size);
        copyInto(result, 0);
        return result;
    }

    @Override
    public void clear() {
        if (spine != null) {
            curChunk = spine[0];
            for (int i = 0; i < curChunk.length; i++)
                curChunk[i] = null;
            spine = null;
            priorElementCount = null;
        }
        else {
            for (int i = 0; i < elementIndex; i++)
                curChunk[i] = null;
        }
        elementIndex = 0;
        spineIndex = 0;
    }

    @Override
    public void forEach(Consumer<? super E> consumer) {
        for (int j = 0; j < spineIndex; j++)
            for (E t : spine[j])
                consumer.accept(t);
        for (int i = 0; i < elementIndex; i++)
            consumer.accept(curChunk[i]);
    }

    @Override
    public void accept(E e) {
        if (elementIndex == curChunk.length) {
            // current chunk is full: make sure the spine exists and has a next chunk,
            // then switch to it (existing elements are never copied)
            inflateSpine();
            if (spineIndex + 1 >= spine.length || spine[spineIndex + 1] == null)
                increaseCapacity();
            elementIndex = 0;
            ++spineIndex;
            curChunk = spine[spineIndex];
        }
        curChunk[elementIndex++] = e;
    }
}
```
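The core idea of SpinedBuffer is that growing never copies existing elements. When the current chunk is full, a new chunk is appended to the spine, and the single copy into one contiguous array happens only at the end, in copyInto()/asArray(). The class below is a hypothetical, heavily simplified sketch of that idea, not the JDK class. It assumes a first chunk of 16 slots and doubles the size of every new chunk, whereas the JDK's AbstractSpinedBuffer has its own sizing rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

// Hypothetical simplified spined buffer: a list of chunks that never copies on growth.
public class MiniSpinedBuffer<E> implements Consumer<E> {
    private final List<Object[]> spine = new ArrayList<>(); // all chunks, in order
    private Object[] curChunk;                              // chunk being filled
    private int elementIndex;                               // next free slot in curChunk
    private long count;                                     // total elements accepted

    public MiniSpinedBuffer() {
        curChunk = new Object[16]; // assumed initial chunk size
        spine.add(curChunk);
    }

    @Override
    public void accept(E e) {
        if (elementIndex == curChunk.length) {
            // Current chunk is full: append a chunk twice as large instead of copying.
            curChunk = new Object[curChunk.length * 2];
            spine.add(curChunk);
            elementIndex = 0;
        }
        curChunk[elementIndex++] = e;
        count++;
    }

    public long count() {
        return count;
    }

    public int chunkCount() {
        return spine.size();
    }

    // Flattens all chunks into one exactly sized array, in the spirit of SpinedBuffer.asArray().
    public E[] asArray(IntFunction<E[]> generator) {
        E[] result = generator.apply((int) count);
        int offset = 0;
        for (Object[] chunk : spine) {
            int n = (chunk == curChunk) ? elementIndex : chunk.length; // last chunk may be partial
            System.arraycopy(chunk, 0, result, offset, n);
            offset += n;
        }
        return result;
    }

    public static void main(String[] args) {
        MiniSpinedBuffer<Integer> buffer = new MiniSpinedBuffer<>();
        for (int i = 0; i < 100; i++) {
            buffer.accept(i);
        }
        Integer[] array = buffer.asArray(Integer[]::new);
        System.out.println(buffer.chunkCount() + " chunks, " + array.length + " elements"); // 3 chunks, 100 elements
    }
}
```

With chunks of 16, 32 and 64 slots, 100 elements fit into three chunks. An ArrayList-style buffer would instead have reallocated and copied its backing array several times along the way.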
That essentially completes the source analysis. As before, here is an example turned into a flowchart:
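Source implementation of parallel stream execution
If a stream instance calls parallel(), the Javadoc says a parallel variant of the stream is returned. In reality no new stream is constructed: sourceStage.parallel is simply set to true. The basic process of parallel evaluation is as follows. The pipeline is built exactly as in sequential evaluation. Once the Sink chain has been built, the Spliterator splits itself via trySplit() using an algorithm chosen by the concrete implementation (ArrayList's, for example, bisects its range). After splitting, each Spliterator holds a small share of the elements, and each one is used as a sourceSpliterator to run the Sink chain in the fork-join pool. The partial results are then combined into the final result, which is handed back to the calling thread. The techniques at work here are thread confinement and fork-join. Since the parallel evaluation of the different Terminal Ops is broadly similar, only the parallel execution of forEach is analyzed here. First, here is a simple example that uses the fork-join pool: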
```java
import java.util.Objects;
import java.util.concurrent.CountedCompleter;

public class MapReduceApp {

    public static void main(String[] args) {
        // map every element with x -> x * 2, then reduce with Integer::sum
        Integer result = new MapReducer<>(new Integer[]{1, 2, 3, 4}, x -> x * 2, Integer::sum).invoke();
        System.out.println(result); // 20
    }

    interface Mapper<S, T> {
        T apply(S source);
    }

    interface Reducer<S, T> {
        T apply(S first, S second);
    }

    public static class MapReducer<T> extends CountedCompleter<T> {
        final T[] array;
        final Mapper<T, T> mapper;
        final Reducer<T, T> reducer;
        final int lo, hi;
        MapReducer<T> sibling;
        T result;

        // root task covering the whole array
        public MapReducer(T[] array, Mapper<T, T> mapper, Reducer<T, T> reducer) {
            this.array = array;
            this.mapper = mapper;
            this.reducer = reducer;
            this.lo = 0;
            this.hi = array.length;
        }

        // subtask covering [lo, hi) that completes into parent p
        public MapReducer(CountedCompleter<?> p, T[] array, Mapper<T, T> mapper,
                          Reducer<T, T> reducer, int lo, int hi) {
            super(p);
            this.array = array;
            this.mapper = mapper;
            this.reducer = reducer;
            this.lo = lo;
            this.hi = hi;
        }

        @Override
        public void compute() {
            if (hi - lo >= 2) {
                // bisect: fork the right half, compute the left half in this thread
                int mid = (lo + hi) >> 1;
                MapReducer<T> left = new MapReducer<>(this, array, mapper, reducer, lo, mid);
                MapReducer<T> right = new MapReducer<>(this, array, mapper, reducer, mid, hi);
                left.sibling = right;
                right.sibling = left;
                setPendingCount(1);
                right.fork();
                left.compute();
            } else {
                // leaf: at most one element
                if (hi > lo) {
                    result = mapper.apply(array[lo]);
                }
                tryComplete();
            }
        }

        @Override
        public T getRawResult() {
            return result;
        }

        // runs on the parent once its last pending child completes; merges the two children
        @SuppressWarnings("unchecked")
        @Override
        public void onCompletion(CountedCompleter<?> caller) {
            if (caller != this) {
                MapReducer<T> child = (MapReducer<T>) caller;
                MapReducer<T> sib = child.sibling;
                if (Objects.isNull(sib) || Objects.isNull(sib.result)) {
                    result = child.result;
                } else {
                    result = reducer.apply(child.result, sib.result);
                }
            }
        }
    }
}
```
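Here fork-join is used to write a simple MapReduce application. The main method first maps every element of the array [1,2,3,4] with i -> i * 2 and then reduces (sums) the results. The code simply bisects the original array. When a task ends up containing a single element, that is, when lo < hi and hi - lo == 1, it applies the Mapper to that element and then signals completion via tryComplete(). Completion eventually triggers the onCompletion() method, and that is where the Reducer aggregates the results. Parallel stream evaluation works in a similar way: ForEachOp ends up invoking either ForEachOrderedTask (for forEachOrdered()) or ForEachTask. ForEachTask is analyzed here: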
```java
// In the JDK, ForEachOp is a nested class of ForEachOps; it is shown separately here
abstract static class ForEachOp<T>
        implements TerminalOp<T, Void>, TerminalSink<T, Void> {

    @Override
    public <S> Void evaluateParallel(PipelineHelper<T> helper,
                                     Spliterator<S> spliterator) {
        // forEachOrdered() -> ordered == true, forEach() -> ordered == false
        if (ordered)
            new ForEachOrderedTask<>(helper, spliterator, this).invoke();
        else
            new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
        return null;
    }
}

final class ForEachOps {

    private ForEachOps() { }

    static final class ForEachTask<S, T> extends CountedCompleter<Void> {
        private Spliterator<S> spliterator;
        private final Sink<S> sink;
        private final PipelineHelper<T> helper;
        private long targetSize;

        // root task
        ForEachTask(PipelineHelper<T> helper,
                    Spliterator<S> spliterator,
                    Sink<S> sink) {
            super(null);
            this.sink = sink;
            this.helper = helper;
            this.spliterator = spliterator;
            this.targetSize = 0L;
        }

        // child task: shares the parent's Sink chain, helper and targetSize
        ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) {
            super(parent);
            this.spliterator = spliterator;
            this.sink = parent.sink;
            this.targetSize = parent.targetSize;
            this.helper = parent.helper;
        }

        public void compute() {
            Spliterator<S> rightSplit = spliterator, leftSplit;
            long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
            // the root task computes the leaf size once; children inherit it
            if ((sizeThreshold = targetSize) == 0L)
                targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
            boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
            boolean forkRight = false;
            Sink<S> taskSink = sink;
            ForEachTask<S, T> task = this;
            while (!isShortCircuit || !taskSink.cancellationRequested()) {
                // small enough, or cannot be split further: push this part through the Sink chain
                if (sizeEstimate <= sizeThreshold ||
                    (leftSplit = rightSplit.trySplit()) == null) {
                    task.helper.copyInto(taskSink, rightSplit);
                    break;
                }
                ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
                task.addToPendingCount(1);
                ForEachTask<S, T> taskToFork;
                // alternate between forking the left part and the right part
                if (forkRight) {
                    forkRight = false;
                    rightSplit = leftSplit;
                    taskToFork = task;
                    task = leftTask;
                }
                else {
                    forkRight = true;
                    taskToFork = leftTask;
                }
                taskToFork.fork();
                sizeEstimate = rightSplit.estimateSize();
            }
            task.spliterator = null;
            task.propagateCompletion();
        }
    }
}
```
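With the short-circuit checks and the forkRight alternation removed, the splitting loop in ForEachTask.compute() reduces to a common CountedCompleter pattern. Split the Spliterator with trySplit() while it is larger than a threshold, call addToPendingCount(1) and fork a child for each split-off prefix, process whatever remains in the current thread, and finish with propagateCompletion(). The class below is a hypothetical simplification written for illustration, not JDK code. In particular, the threshold is passed in rather than computed by AbstractTask.suggestTargetSize(), and the action must be thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;

// Hypothetical simplified counterpart of ForEachOps.ForEachTask.
public class SplittingForEach<T> extends CountedCompleter<Void> {
    private final Spliterator<T> spliterator;
    private final Consumer<? super T> action; // called from several threads, must be thread-safe
    private final long threshold;

    public SplittingForEach(CountedCompleter<?> parent, Spliterator<T> spliterator,
                            Consumer<? super T> action, long threshold) {
        super(parent);
        this.spliterator = spliterator;
        this.action = action;
        this.threshold = threshold;
    }

    @Override
    public void compute() {
        Spliterator<T> rightSplit = spliterator;
        Spliterator<T> leftSplit;
        // Keep splitting off prefixes and forking them until the remainder is small enough.
        while (rightSplit.estimateSize() > threshold && (leftSplit = rightSplit.trySplit()) != null) {
            addToPendingCount(1); // one more child must finish before this task can complete
            new SplittingForEach<T>(this, leftSplit, action, threshold).fork();
        }
        rightSplit.forEachRemaining(action); // process the remaining part in the current thread
        propagateCompletion();               // decrement the pending count or complete upwards
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            list.add(i);
        }
        LongAdder sum = new LongAdder();
        // invoke() runs compute() in the calling thread and blocks until the whole task tree completes
        new SplittingForEach<Integer>(null, list.spliterator(), sum::add, 10).invoke();
        System.out.println(sum.sum()); // 5050
    }
}
```

Like ForEachTask, the sketch overrides neither getRawResult() nor onCompletion(). The task tree only tracks completion, and the side effects happen inside the action.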
The source analysis above may be hard to follow, so here is a simple example:
```java
public static void main(String[] args) throws Exception {
    List<Integer> list = new ArrayList<>();
    list.add(1);
    list.add(2);
    list.add(3);
    list.add(4);
    list.stream().parallel().forEach(System.out::println);
}
```
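In this code, evaluation inside ForEachTask ends up with targetSize = sizeThreshold = 1 (AbstractTask.suggestTargetSize() yields 1 for such a small input). The calling thread takes part in the computation, and fork is executed 3 times, so there are 4 processing units in total: the original Spliterator is split into 3 brand-new Spliterator instances, which together with itself makes 4 Spliterator instances. Each processing unit handles exactly 1 element. The corresponding flowchart is as follows: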
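The final result is obtained by calling CountedCompleter.invoke(), which blocks until all subtasks have completed. The forEach terminal operation has no return value, so getRawResult() is not overridden (CountedCompleter's default returns null). invoke() is used here only to keep the calling thread blocked until every task has finished.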
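The four-way split described above can be reproduced by calling trySplit() by hand. The sketch below (helper names are illustrative) keeps splitting an ArrayList's spliterator until each piece holds at most targetSize elements, mimicking the targetSize = 1 case. For ArrayList, trySplit() returns a spliterator for the first half and keeps the second half itself, so three splits yield four single-element spliterators. It assumes JDK 9+ for List.of.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Spliterator;

public class SplitLeavesDemo {

    // Splits a spliterator until every piece reports at most targetSize elements.
    public static <T> List<Spliterator<T>> splitToLeaves(Spliterator<T> source, long targetSize) {
        List<Spliterator<T>> leaves = new ArrayList<>();
        Deque<Spliterator<T>> pending = new ArrayDeque<>();
        pending.push(source);
        while (!pending.isEmpty()) {
            Spliterator<T> current = pending.pop();
            Spliterator<T> prefix;
            if (current.estimateSize() > targetSize && (prefix = current.trySplit()) != null) {
                pending.push(current); // current now covers only the suffix
                pending.push(prefix);  // visit the prefix first to keep encounter order
            } else {
                leaves.add(current);
            }
        }
        return leaves;
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>(List.of(1, 2, 3, 4));
        List<Spliterator<Integer>> leaves = splitToLeaves(list.spliterator(), 1);
        System.out.println(leaves.size() + " spliterators"); // 4 spliterators
        for (Spliterator<Integer> leaf : leaves) {
            List<Integer> part = new ArrayList<>();
            leaf.forEachRemaining(part::add);
            System.out.println(part); // [1], then [2], [3], [4]
        }
    }
}
```

This only shows the shape of the split. In ForEachTask the split-off pieces are forked to the fork-join pool instead of being collected in a list.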
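Stateful operations and short-circuiting operations
Stream's intermediate operations can be divided into stateless operations and stateful operations, depending on whether they keep state. Its terminal operations can be divided into non-short-circuiting operations and short-circuiting operations, depending on whether they support short-circuiting. They can be understood as follows: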
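Stateless operation: once the current operation node has processed an element, it passes the result straight to the next node (provided the element meets the operation's condition). The operation has no internal state and needs to keep none, e.g. filter and map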
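Stateful operation: elements are processed by accumulating them in the node's internal state, so when a new element arrives the node can in effect see the history of every element processed so far. This "state" is closer to a buffer, e.g. sorted and limit. Taking sorted as an example, all pending elements are usually added to a container such as an ArrayList and sorted together, and only then pushed on to the next node, much as if they came from a fresh Spliterator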
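Non-short-circuiting (terminal) operation: the terminal operation cannot stop early and return on a short-circuit condition; it must process every element, e.g. forEach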
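Short-circuiting (terminal) operation: the terminal operation may stop processing early and return once a short-circuit condition is met, e.g. anyMatch. Its sink sets an internal stop flag as soon as an element matches. From then on the sink's cancellationRequested() returns true, so the traversal loop stops pulling elements from the Spliterator, and any element that still reaches Sink.accept() is skipped because of the stop flag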
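The difference between stateless and stateful intermediate operations, and the early exit of a short-circuiting terminal operation, can be observed from outside with peek(). The sketch below (method names are illustrative) assumes a sequential stream on JDK 9 or later. With map(), each element travels through the whole Sink chain before the next one starts. sorted() buffers every element and only pushes them downstream after the last one has arrived. anyMatch stops pulling from the source once it has found a match.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class StatefulShortCircuitDemo {

    // Records when each element passes the peek() before and after the middle operation.
    public static List<String> trace(boolean sortedInMiddle) {
        List<String> log = new ArrayList<>();
        Stream<Integer> s = Stream.of(3, 1, 2).peek(x -> log.add("before " + x));
        s = sortedInMiddle ? s.sorted() : s.map(x -> x); // stateful vs stateless middle stage
        s.peek(x -> log.add("after " + x)).forEach(x -> { });
        return log;
    }

    // Records which elements anyMatch actually pulled from the source.
    public static List<Integer> pulledByAnyMatch() {
        List<Integer> seen = new ArrayList<>();
        Stream.of(1, 2, 3, 4, 5).peek(seen::add).anyMatch(x -> x == 2);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(trace(false));       // [before 3, after 3, before 1, after 1, before 2, after 2]
        System.out.println(trace(true));        // [before 3, before 1, before 2, after 1, after 2, after 3]
        System.out.println(pulledByAnyMatch()); // [1, 2]
    }
}
```

Using peek() for this kind of observation is fine in a demo. Production code should not rely on peek() side effects, because the library is free to skip elements it does not need.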
The source is not analyzed in detail here. Instead, here is a summary table of commonly used Stream operations:
Two further points are worth noting:
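Judging from the source, some intermediate operations also support short-circuiting, e.g. the slice operations (SliceOps, used by limit()) and the while operations (WhileOps, used by takeWhile())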
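Judging from the source, the find terminal operations findFirst and findAny both declare and rely on StreamOpFlag.SHORT_CIRCUIT. The match terminal operations also declare the flag, but in addition track short-circuiting through their sink's temporary internal state, stop and value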
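Short-circuiting in intermediate operations is easiest to see on an infinite source. In the sketch below (method names are illustrative; takeWhile requires JDK 9+), peek() records every element actually pulled from Stream.iterate(). limit(3) stops after three elements. takeWhile stops right after the first element that fails the predicate. Both pipelines therefore terminate even though the source never ends.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class ShortCircuitIntermediateDemo {

    // limit(3) requests cancellation once three elements have passed through it.
    public static List<Integer> pulledByLimit() {
        List<Integer> seen = new ArrayList<>();
        Stream.iterate(1, x -> x + 1).peek(seen::add).limit(3).forEach(x -> { });
        return seen;
    }

    // takeWhile requests cancellation after the first element that fails the predicate.
    public static List<Integer> pulledByTakeWhile() {
        List<Integer> seen = new ArrayList<>();
        Stream.iterate(1, x -> x + 1).peek(seen::add).takeWhile(x -> x < 4).forEach(x -> { });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(pulledByLimit());     // [1, 2, 3]
        System.out.println(pulledByTakeWhile()); // [1, 2, 3, 4]
    }
}
```

takeWhile has to pull one element more than it passes on (the 4 that fails the predicate) before it knows it can stop.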
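Summary
All told I have written more than a hundred thousand characters, yet this is still only a fairly shallow introduction to the basic implementation of Stream. In my view, many of the intermediate and terminal operations not analyzed here, and especially the parallel terminal operations, are very complex. In a multi-threaded environment it takes some imagination, plus debugging at many points, to locate where code runs and to reconstruct the execution flow. A brief summary: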
The Stream implementation in the JDK is refined, highly engineered code
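Although Stream is carried by AbstractPipeline, a pipeline structure, only its shape is used: before actual evaluation it is converted into a multi-layer wrapped Sink structure, the Sink chain discussed throughout this article. In terms of programming model, it applies the Reactor pattern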
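The evaluation structure that Stream currently supports is always of the form Head(Source Spliterator) -> Op -> Op ... -> Terminal Op. This is a limitation: it cannot flexibly provide memory-view-like functionality the way LINQ can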
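Stream's current parallel evaluation scheme splits the Source Spliterator and runs ForkJoinTasks built from the Terminal Op and the fixed Sink chain. Both the calling thread and the worker threads of the fork-join pool can take part in evaluation. In my view, apart from the bit operations on flag sets, this is the most complex part of the Stream implementation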
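The functionality Stream provides is a breakthrough, although some have called it "premature". I hope the JDK keeps moving forward and developing through this spiral of contradictions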
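The second point of the summary says the pipeline is converted into a nested Sink chain before evaluation. A toy version makes this concrete. The MiniSink interface and the filter/map helpers below are hypothetical and only loosely modelled on java.util.stream.Sink and its chained wrappers. The chain is built from the terminal end backwards, so the outermost sink belongs to the first operation, and each element is then pushed through every layer in turn.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

public class MiniSinkChain {

    // Hypothetical push-style stage with a begin/accept/end protocol similar to Sink.
    interface MiniSink<T> {
        default void begin(long size) { }
        void accept(T t);
        default void end() { }
    }

    // Wraps a downstream sink so that only matching elements are pushed on.
    static <T> MiniSink<T> filter(Predicate<? super T> predicate, MiniSink<T> downstream) {
        return new MiniSink<T>() {
            @Override public void begin(long size) { downstream.begin(-1); } // size becomes unknown
            @Override public void accept(T t) { if (predicate.test(t)) downstream.accept(t); }
            @Override public void end() { downstream.end(); }
        };
    }

    // Wraps a downstream sink so that each element is transformed first.
    static <T, R> MiniSink<T> map(Function<? super T, ? extends R> mapper, MiniSink<R> downstream) {
        return new MiniSink<T>() {
            @Override public void begin(long size) { downstream.begin(size); }
            @Override public void accept(T t) { downstream.accept(mapper.apply(t)); }
            @Override public void end() { downstream.end(); }
        };
    }

    // Equivalent in spirit to source.stream().filter(x -> x % 2 == 0).map(x -> x * 10).collect(toList()).
    public static List<Integer> run(List<Integer> source) {
        List<Integer> result = new ArrayList<>();
        MiniSink<Integer> terminal = result::add;
        // Built back to front: the terminal sink is wrapped first, the first operation last.
        MiniSink<Integer> head = filter(x -> x % 2 == 0, map(x -> x * 10, terminal));
        head.begin(source.size());
        for (Integer element : source) {
            head.accept(element);
        }
        head.end();
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3, 4))); // [20, 40]
    }
}
```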
(s-a-202101005 e-a-20210822 c-14-d)