余晖落尽暮晚霞,黄昏迟暮远山寻
本站
当前位置:网站首页 > 编程知识 > 正文

Stream如何提高遍历集合效率?

xiyangw 2023-09-16 15:06 17 浏览 0 评论

什么是 Stream?

现在很多大数据量系统中都存在分表分库的情况。例如,电商系统中的订单表,常常使用用户 ID 的 Hash 值来实现分表分库,这样是为了减少单个表的数据量,优化用户查询订单的速度。

但在后台管理员审核订单时,他们需要将各个数据源的数据查询到应用层之后进行合并操作。


例如,当我们需要查询出过滤条件下的所有订单,并按照订单的某个条件进行排序,单个数据源查询出来的数据是可以按照某个条件进行排序的,但多个数据源查询出来已经排序好的数据,并不代表合并后是正确的排序,所以我们需要在应用层对合并数据集合重新进行排序。


在 Java8 之前,我们通常是通过 for 循环或者 Iterator 迭代来重新排序合并数据,又或者通过重新定义 Collections.sorts 的 Comparator 方法来实现,这两种方式对于大数据量系来说,效率并不是很理想。


Java8 中添加了一个新的接口类 Stream,他和我们之前接触的字节流概念不太一样, Java8 集合中的 Stream 相当于高级版的 Iterator,他可以通过 Lambda 表达式对集合进行各种非常便利、高效的聚合操作(Aggregate Operation),或者大批量数据操作 (Bulk Data Operation)。

Stream 的聚合操作与数据库 SQL 的聚合操作 sorted、filter、map 等类似。我们在应用层就可以高效地实现类似数据库 SQL 的聚合操作了,而在数据操作方面,Stream 不仅可以通过串行的方式实现数据操作,还可以通过并行的方式处理大批量数据,提高数据的处理效率。

Stream 如何优化遍历?

上面我们初步了解了 Java8 中的 Stream API,那 Stream 是如何做到优化迭代的呢?并行又是如何实现的?

1.Stream 操作分类

在了解 Stream 的实现原理之前,我们先来了解下 Stream 的操作分类,因为他的操作分类其实是实现高效迭代大数据集合的重要原因之一。为什么这样说,分析完你就清楚了。

官方将 Stream 中的操作分为两大类:中间操作(Intermediate operations)和终结操作 (Terminal operations)。中间操作只对操作进行了记录,即只会返回一个流,不会进行计算操作,而终结操作是实现了计算操作。

中间操作又可以分为无状态(Stateless)与有状态(Stateful)操作,前者是指元素的处理不受之前元素的影响,后者是指该操作只有拿到所有元素之后才能继续下去。

终结操作又可以分为短路(Short-circuiting)与非短路(Unshort-circuiting)操作,前者是指遇到某些符合条件的元素就可以得到最终结果,后者是指必须处理完所有元素才能得到最终结果。

我们通常还会将中间操作称为懒操作,也正是由这种懒操作结合终结操作、数据源构成的处理管道(Pipeline),实现了 Stream 的高效。

2.Stream 源码实现

在了解 Stream 如何工作之前,我们先来了解下 Stream 包是由哪些主要结构类组合而成的,各个类的职责是什么。

BaseStream 和 Stream 为最顶端的接口类。BaseStream 主要定义了流的基本接口方法, 例如,spliterator、isParallel 等;Stream 则定义了一些流的常用操作方法,例如, map、filter 等。

ReferencePipeline 是一个结构类,他通过定义内部类组装了各种操作流。他定义了 Head、StatelessOp、StatefulOp 三个内部类,实现了 BaseStream 与 Stream 的接口方法。

Sink 接口是定义每个 Stream 操作之间关系的协议,他包含 begin()、end()、 cancellationRequested()、accpt() 四个方法。ReferencePipeline 最终会将整个 Stream流操作组装成一个调用链,而这条调用链上的各个 Stream 操作的上下关系就是通过 Sink接口协议来定义实现的。

3.Stream 操作叠加

一个 Stream 的各个操作是由处理管道组装,并统一完成数据处理的。在 JDK中每次的中断操作会以使用阶段(Stage)命名。

管道结构通常是由 ReferencePipeline 类实现的,前面讲解 Stream 包结构时,我提到过 ReferencePipeline 包含了 Head、StatelessOp、StatefulOp 三种内部类。Head 类主要用来定义数据源操作,在我们初次调用 names.stream() 方法时,会初次加载Head 对象,此时为加载数据源操作;接着加载的是中间操作,分别为无状态中间操作StatelessOp 对象和有状态操作 StatefulOp 对象,此时的 Stage 并没有执行,而是通过 AbstractPipeline 生成了一个中间操作 Stage 链表;当我们调用终结操作时,会生成一个 最终的 Stage,通过这个 Stage 触发之前的中间操作,从最后一个 Stage 开始,递归产生 一个 Sink 链。

下面我们再通过一个例子来感受下 Stream 的操作分类是如何实现高效迭代大数据集合

的。

List<String> names = Arrays.asList(" 张三 ", " 李四 ", " 王老五 ", " 李三 ", " 刘老四 ", "

String maxLenStartWithZ = names.stream()

.filter(name -> name.startsWith(" 张 "))

.mapToInt(String::length)

.max()

.toString()

这个例子的需求是查找出一个长度最长,并且以张为姓氏的名字。从代码角度来看,你可能会认为是这样的操作流程:首先遍历一次集合,得到以“张”开头的所有名字;然后遍历一次 filter 得到的集合,将名字转换成数字长度;最后再从长度集合中找到最长的那个名字并且返回。

实际情况并非如此。逐步分析下这个方法里所有的操作是如何执行的。

首先 ,因为 names 是 ArrayList 集合,所以 names.stream() 方法将会调用集合类基础接口 Collection 的 Stream 方法:

default Stream<E> stream() {

return StreamSupport.stream(spliterator(), false);

}

然后,Stream 方法就会调用 StreamSupport 类的 Stream 方法,方法中初始化了一个

ReferencePipeline 的 Head 内部类对象:

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {

Objects.requireNonNull(spliterator);

return new ReferencePipeline.Head<>(spliterator,

StreamOpFlag.fromCharacteristics(spliterato

parallel);

}

再调用 filter 和 map 方法,这两个方法都是无状态的中间操作,所以执行 filter 和 map操作时,并没有进行任何的操作,而是分别创建了一个 Stage 来标识用户的每一次操作。

而通常情况下 Stream 的操作又需要一个回调函数,所以一个完整的 Stage 是由数据来源、操作、回调函数组成的三元组来表示。

@Override

public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {

Objects.requireNonNull(predicate);

return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,

StreamOpFlag.NOT_SIZED) {

@Override

Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {

return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {

@Override

public void begin(long size) {

downstream.begin(-1);

}

@Override

public void accept(P_OUT u) {

if (predicate.test(u))

downstream.accept(u);

}

};

}

};

}

@Override

@SuppressWarnings("unchecked")

public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {

Objects.requireNonNull(mapper);

return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,

StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT

@Override

Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {

return new Sink.ChainedReference<P_OUT, R>(sink) {

@Override

public void accept(P_OUT u) {

downstream.accept(mapper.apply(u));

}

};

}

};

}

new StatelessOp 将会调用父类 AbstractPipeline 的构造函数,这个构造函数将前后的Stage 联系起来,生成一个 Stage 链表:

AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {

if (previousStage.linkedOrConsumed)

throw new IllegalStateException(MSG_STREAM_LINKED);

previousStage.linkedOrConsumed = true;

previousStage.nextStage = this;// 将当前的 stage 的 next 指针指向之前的 stage

this.previousStage = previousStage;// 赋值当前 stage 当全局变量 previousStage

this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;

this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combined

this.sourceStage = previousStage.sourceStage;

if (opIsStateful())

sourceStage.sourceAnyStateful = true;

this.depth = previousStage.depth + 1;

}

因为在创建每一个 Stage 时,都会包含一个 opWrapSink() 方法,该方法会把一个操作的具体实现封装在 Sink 类中,Sink 采用(处理 -> 转发)的模式来叠加操作。

当执行 max 方法时,会调用 ReferencePipeline 的 max 方法,此时由于 max 方法是终结操作,所以会创建一个 TerminalOp 操作,同时创建一个 ReducingSink,并且将操作封装在 Sink 类中。

@Override

public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {

return reduce(BinaryOperator.maxBy(comparator));

}

最后,调用 AbstractPipeline 的 wrapSink 方法,该方法会调用 opWrapSink 生成一个Sink 链表,Sink 链表中的每一个 Sink 都封装了一个操作的具体实现。

@Override

@SuppressWarnings("unchecked")

final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {

Objects.requireNonNull(sink);

for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p

sink = p.opWrapSink(p.previousStage.combinedFlags, sink);

}

return (Sink<P_IN>) sink;

}

当 Sink 链表生成完成后,Stream 开始执行,通过 spliterator 迭代集合,执行 Sink 链表中的具体操作。

@Override

final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {

Objects.requireNonNull(wrappedSink);

if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {

wrappedSink.begin(spliterator.getExactSizeIfKnown());

spliterator.forEachRemaining(wrappedSink);

wrappedSink.end();

}

else {

copyIntoWithCancel(wrappedSink, spliterator);

}

}

Java8 中的 Spliterator 的 forEachRemaining 会迭代集合,每迭代一次,都会执行一次ilter 操作,如果 filter 操作通过,就会触发 map 操作,然后将结果放入到临时数组object 中,再进行下一次的迭代。完成中间操作后,就会触发终结操作 max。

4.Stream 并行处理

Stream 处理数据的方式有两种,串行处理和并行处理。要实现并行处理,我们只需要代码中新增一个 Parallel() 方法,

Stream 的并行处理在执行终结操作之前,跟串行处理的实现是一样的。而在调用终结方法之后,实现的方式就有点不太一样,会调用 TerminalOp 的 evaluateParallel 方法进行并行处理。

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {

assert getOutputShape() == terminalOp.inputShape();

if (linkedOrConsumed)

throw new IllegalStateException(MSG_STREAM_LINKED);

linkedOrConsumed = true;

return isParallel()

? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFla

: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOp

}

这里的并行处理指的是,Stream 结合了 ForkJoin 框架,对 Stream 处理进行了分片, Splititerator 中的 estimateSize 方法会估算出分片的数据量。

ForkJoin 框架和估算算法,在这里我就不具体讲解了,如果感兴趣,你可以深入源码分析 下该算法的实现。

通过预估的数据量获取最小处理单元的阀值,如果当前分片大小大于最小处理单元的阀值, 就继续切分集合。每个分片将会生成一个 Sink 链表,当所有的分片操作完成后,ForkJoin框架将会合并分片任何结果集。

合理使用 Stream

Stream API 用起来简洁,还能并行处理,那是不是使用 Stream API,系统性能就更好呢?

我们将对常规的迭代、Stream 串行迭代以及 Stream 并行迭代进行性能测试对比,迭代循 环中,我们将对数据进行过滤、分组等操作。分别进行以下几组测试:

多核 CPU 服务器配置环境下,对比长度 100 的 int 数组的性能;

多核 CPU 服务器配置环境下,对比长度 1.00E+8 的 int 数组的性能;

多核 CPU 服务器配置环境下,对比长度 1.00E+8 对象数组过滤分组的性能;

单核 CPU 服务器配置环境下,对比长度 1.00E+8 对象数组过滤分组的性能。

由于篇幅有限,我这里直接给出统计结果,通过以上测试,我统计出的测试结果如下(迭代使用时间):

常规的迭代 <Stream 并行迭代 <Stream 串行迭代

Stream 并行迭代 < 常规的迭代 <Stream 串行迭代

Stream 并行迭代 < 常规的迭代 <Stream 串行迭代

常规的迭代 <Stream 串行迭代 <Stream 并行迭代

通过以上测试结果,我们可以看到:在循环迭代次数较少的情况下,常规的迭代方式性能反而更好;在单核 CPU 服务器配置环境中,也是常规迭代方式更有优势;而在大数据循环迭代中,如果服务器是多核 CPU 的情况下,Stream 的并行迭代优势明显。所以我们在平时 处理大数据的集合时,应该尽量考虑将应用部署在多核 CPU 环境下,并且使用 Stream 的并行迭代方式进行处理。

用事实说话,我们看到其实使用 Stream 未必可以使系统性能更佳,还是要结合应用场景进 行选择,也就是合理地使用 Stream。

相关推荐

数控系统常见术语详解,机加工人士必备资料
数控系统常见术语详解,机加工人士必备资料

增量编码器(Incrementpulsecoder)回转式位置测量元件,装于电动机轴或滚珠丝杠上,回转时发出等间隔脉冲表示位移量。由于没有记忆元件,故不能准...

2023-09-24 17:42 xiyangw

功、功率、扭矩的关系

功=功率×时间work=power×timeW=P×T功=力×距离work=force×lengthW=F×LP×T=F×LP=F×L/T=F×V(velocity)具体到电机输出轴上,圆...

Wi-Fi协议(802.11 )常见专业术语汇总
Wi-Fi协议(802.11 )常见专业术语汇总

Wi-Fi协议(802.11)常见专业术语汇总AP(Accesspoint的简称,即访问点,接入点):是一个无线网络中的特殊节点,通过这个节点,无线网络中的...

2023-09-24 17:41 xiyangw

不需要策略模式也能避免满屏if/else
不需要策略模式也能避免满屏if/else

满屏if/elsejava复制代码publicstaticvoidmain(String[]args){inta=1;if...

2023-09-24 17:41 xiyangw

喜极而泣,我终于干掉了该死的 if-else
喜极而泣,我终于干掉了该死的 if-else

推荐阅读:面试淘宝被Tomcat面到“自闭”,学习这份文档之后“吊打”面试官刷完spring+redis+负载均衡+netty+kafka面试题,再去面试BAT...

2023-09-24 17:40 xiyangw

Python中使用三元运算符简化if-else语句
Python中使用三元运算符简化if-else语句

Python是一种极简主义的编程语言,相比其他编程语言,在多个地方简化了代码的写法,可以让我们用更少的时间更简洁地完成工作。以赋值运算符为例:a=a+b简化...

2023-09-24 17:40 xiyangw

雅思课堂 | 雅思口语写作句型第二讲
雅思课堂 | 雅思口语写作句型第二讲

纯干货,无废话用最少的时间学最制胜的内容!泡图书馆泡不过学霸?碎片时间也能弯道超车!向着雅思8分行动起来吧!雅思口语写作句型1.Ipreferseeing...

2023-09-24 17:39 xiyangw

设计模式(三)——简单的状态模式代替if-else
设计模式(三)——简单的状态模式代替if-else

博主将会针对Java面试题写一组文章,包括J2ee,SQL,主流Web框架,中间件等面试过程中面试官经常问的问题,欢迎大家关注。一起学习,一起成长。前言大多数开...

2023-09-24 17:38 xiyangw

如何优化代码中大量的if/else,switch/case?

前言随着项目的迭代,代码中存在的分支判断可能会越来越多,当里面涉及到的逻辑比较复杂或者分支数量实在是多的难以维护的时候,我们就要考虑下,有办法能让这些代码变得更优雅吗?正文使用枚举这里我们简单的定义一...

优秀程序员早就学会用“状态模式”代替if-else了
优秀程序员早就学会用“状态模式”代替if-else了

2020年已经进入倒计时了,大家立好的flag完成了吗?2020实“鼠”不易,希望2021可以“牛”转乾坤。简介状态模式是行为型设计模式的一种。其设计理念是当对...

2023-09-24 17:37 xiyangw

用Select Case语句对执行多条件进行控制
用Select Case语句对执行多条件进行控制

今日的内容是"VBA之EXCEL应用"的第六章"条件判断语句(If...Then...Else)在VBA中的利用"。这讲是第三节...

2023-09-24 17:37 xiyangw

c#入门教程(四)条件判断if else

条件判断,是编程里常用的判断语句,比如某个代码如果满足条件就执行a代码块否则就执行b代码块。案例1:inti=2*5;if(a>0){执行a代码块}elseif(a<0){执行b代码块...

每日学编程之JAVA(十一)—条件语句(if……else)

一个if语句包含一个布尔表达式和一条或多条语句。如果布尔表达式的值为true,则执行if语句中的代码块,否则执行if语句块后面的代码。if语句后面可以跟else语句,当if语句...

不需要策略模式也能避免满屏if/else

除了使用策略模式以外,还可以使用其他设计模式来避免满屏if/else的问题。以下是一些可能的解决方案:工厂模式:将if/else语句移到工厂类中,由工厂类负责创建对象。这样可以将if/else语句从客...

围绕ifelse与业务逻辑的那些梗
围绕ifelse与业务逻辑的那些梗

ifelse很重要,几乎是程序员编程核心,业务逻辑与规则也通过ifelse体现出来,语句简单但是背后文章很大,先看几则幽默图:1.也许默认使用returnf...

2023-09-24 17:36 xiyangw

取消回复欢迎 发表评论: