Java------Stream流式编程源码分析(二)
Java------Stream流式编程源码分析(二)
Stream执行流程
Stream是一个接口,表示流的意思,类似一个管道,数据好比流水从管道经过。用于传递数据,并不存储数据。
Stream并不会修改源数据。
Stream整体流水线包括很多操作,按阶段划分分为三部分。
1.生成流(Stream.of)
2.中间操作(过滤、转大写、排序等等,每个操作处理完数据完成后,将结果交给下一个操作)
3.终结操作(结束整个流水线,一个流只有一个终结操作,只要执行终结操作,就触发整个流的执行。,不执行终结操作,中间操作也不会执行。中间操作具有惰性特点。)
@Test
public void test1(){
String[] strs = new String[]{"hello","world","hi","com","student"};
//使用Stream流完成上边对字符串的处理需求
Stream.of(strs)//创建一个Stream流对象
.filter(x->{
System.out.println("1.过滤"+x);
return x.length()>4;
})//过滤字符串,找到长度大于4的字符串
.map(x->{
System.out.println("2.转大写"+x);
return x.toUpperCase();
})//转换大写
.forEach(x->{
System.out.println("3.遍历输出"+x);
});//遍历输出
}
输出结果:
1.过滤hello
2.转大写hello
3.遍历输出HELLO
1.过滤world
2.转大写world
3.遍历输出WORLD
1.过滤hi
1.过滤com
1.过滤student
2.转大写student
3.遍历输出STUDENT
由结果可以看到,遍历输出是按先字符串:hello走一遍之后再走world、hi、com、student。并不是先过滤hello、world、hi、com、student,再转大写,再输出。
如果把forEach注释掉,这些输出语句也不会打印出来。
Stream原理-生成流
@SafeVarargs
@SuppressWarnings("varargs") // Creating a stream from an array is safe
public static<T> Stream<T> of(T... values) {
return Arrays.stream(values);
}
//再下一步
public static <T> Stream<T> stream(T[] array) {
return stream(array, 0, array.length);
}
//再下一步
public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
}
//再下一步,重点
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
最后返回:ReferencePipeline,Head是它的内部类。
Head(Spliterator<?> source,
int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
//再往源头找
static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
发现Head继承了ReferencePipeline。
abstract class ReferencePipeline<P_IN, P_OUT>
extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
implements Stream<P_OUT> {
而ReferencePipeline实现了Stream接口,Head是它的子类。
spliterator里包含数据源数组,
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
而spliterator最终执行的是Spliterators.java中的forEachRemaining方法
@SuppressWarnings("unchecked")
@Override
public void forEachRemaining(Consumer<? super T> action) {
Object[] a; int i, hi; // hoist accesses and checks from loop
if (action == null)
throw new NullPointerException();
if ((a = array).length >= (hi = fence) &&
(i = index) >= 0 && i < (index = hi)) {
do { action.accept((T)a[i]); } while (++i < hi);
}
}
其核心代码: do { action.accept((T)a[i]); } while (++i < hi);
每遍历一个数据对数据执行一系列的操作:由action.accept()生成
Stream原理-中间操作
中间操作有很多,相当于组成一个双向链表,前一个操作将结果转给下一个操作。
action本质还是ReferencePipeline,而this标明是Head,previousStage指向上一个操作,nextStage指向下一个操作。
跟踪filter。
通过Sink接口传递数据。
Sink是一个接口,定义了每个阶段的操作规范:
begin:在开始遍历数据源之前执行方法,做一些准备工作。
end:当所有源数据遍历完成,执行此方法。
accept:由很多重载方法,上一个阶段通过下一阶段,执行此方法。
cancellationRequested:是否可以结束流水线,起到短路的功能。如果提前结束整个流水线叫短路现象。
.anyMatch
这样就只执行到world就停止,不会再往下执行了
@Test
public void test1(){
String[] strs = new String[]{"hello","world","hi","com","student"};
//使用Stream流完成上边对字符串的处理需求
Stream.of(strs)//创建一个Stream流对象
.filter(x->{
System.out.println("1.过滤"+x);
return x.length()>4;
})//过滤字符串,找到长度大于4的字符串
.map(x->{
System.out.println("2.转大写"+x);
return x.toUpperCase();
})//转换大写
.anyMatch(x->x.equals("WORLD"));//短路方法
}
有状态和无状态
中间操作分为有状态的和无状态的。
1.无状态的中间操作不依赖前面的元素是否处理完成。
2.有状态:
排序操作就是有状态,等前面元素全部处理完,才可以操作。
增加sorted操作。
@Test
public void test1(){
String[] strs = new String[]{"hello","world","hi","com","student"};
//使用Stream流完成上边对字符串的处理需求
Stream.of(strs)//创建一个Stream流对象
.filter(x->{
System.out.println("1.过滤"+x);
return x.length()>4;
})//过滤字符串,找到长度大于4的字符串
.map(x->{
System.out.println("2.转大写"+x);
return x.toUpperCase();
})//转换大写
.sorted()
.forEach(x->{
System.out.println("3.遍历输出"+x);
});//遍历输出
}
此时输出结果,遍历输出结果会在最后输出,并且是已经排过序的。
1.过滤hello
2.转大写hello
1.过滤world
2.转大写world
1.过滤hi
1.过滤com
1.过滤student
2.转大写student
3.遍历输出HELLO
3.遍历输出STUDENT
3.遍历输出WORLD
Stream原理-终结操作
终结操作要触发整个流水线的执行。
TerminalOp是所有终结操作的接口。
流式编程也可以使用多线程并行处理,isParallel()就是判断是否是并行处理。
是则执行:terminalOp.evaluateParallel
不是则执行:terminalOp.evaluateSequential
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel() \\
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
最后到:wrappedSink.begin(spliterator.getExactSizeIfKnown());,
又回到Sink的包装类,begin:在开始遍历数据源之前执行方法,做一些准备工作。
之后spliterator.forEachRemaining(wrappedSink);遍历源数据。
最后 wrappedSink.end();,结束方法
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}