2022年8月3日 - Chip Huyen
由于机器学习越来越实时,流处理技术对数据科学家来说变得越来越重要。像许多来自机器学习背景的人一样,我曾经很害怕流。在我们最近的调查中,几乎有一半的被问及的数据科学家表示,他们希望从批量预测转向在线预测,但却不能,因为流处理在技术上和操作上都很难。流处理社区认为理所当然的术语,如 "时间变化的结果"、"时间旅行"、"物化视图",当然没有帮助。
在过去的一年里,我在与我的合作伙伴的合作中了解到,我了解到流处理可以是相当直观的。这篇文章是试图重新表述我所学到的东西。我将尽力拿出具体的例子来说明流处理是如何帮助机器学习工作流程的。
如果幸运的话,作为一个数据科学家,你不应该自己建立或维护一个流处理系统。你的公司应该有基础设施来帮助你解决这个问题。然而,了解流处理在哪些方面是有用的,为什么流处理很难,可以帮助你评估正确的工具,并为你的需求分配足够的资源。
快速回顾:历史数据与流数据
一旦你的数据被存储在文件、数据湖或数据仓库中,它就成为历史数据。
流式数据指的是仍在系统中流动的数据,例如从一个微服务转移到另一个微服务。
批量处理与流处理
历史数据通常在批处理作业中处理,即定期启动的作业。例如,每天一次,你可能想启动一个批处理作业,为所有用户生成建议。当数据在批处理作业中被处理时,我们将其称为批处理。批量处理是几十年来的一个研究课题,公司已经提出了像MapReduce和Spark这样的分布式系统来有效地处理批量数据。
流处理指的是在流数据上进行计算。流处理是比较新的。我们将在这篇文章中讨论它。
1.流处理在机器学习中的作用是什么?
在机器靴子生产管道中,有三个地方的流处理可能非常重要:在线预测,实时观察,和持续学习。
在线预测(相对于批量预测)。
批量预测是指在预测请求出现之前,定期离线生成预测。例如,每隔4小时,Netflix可能为其所有用户生成电影推荐。这些推荐被存储起来,并在用户访问网站时显示给他们。
在线预测是指在预测请求出现后,按需生成预测。例如,一个电子商务网站可能在用户访问该网站时为其生成产品建议。因为建议是在线生成的,所以延迟是至关重要的。
想象一下,你在一个电子商务网站工作,想预测给用户的最佳折扣,以鼓励他们购买某个产品。你可能使用的特征之一是该用户在过去30分钟内所看的所有商品的平均价格。这是一个在线功能--它需要根据在线数据进行计算(而不是根据历史数据预先计算)。
一个 "简单 "的方法是启动AWS Lambda和Google Cloud Function等服务,在请求到达时计算这个功能。这是许多公司的做法,在许多简单的情况下是可行的。然而,如果你想使用复杂的在线功能,它并不能扩展。
这种 "简单 "的方式是无状态的,这意味着每次计算都是独立于前一次的。每次你计算这个特征时,你必须处理过去30分钟的所有数据,例如
- 为了计算上午10:33的这个特征,你必须处理从10:03到10:33的数据。
- 为了在10:35再次计算这个特征,你必须处理从10:05到10:35的数据。
从10:05到10:33的数据被处理了两次!
流处理也可以是无状态的,但无状态的流处理有点无聊,所以我们在这里只谈有状态的流处理。有状态的流处理可以避免冗余,因此更快、更便宜。例如,当在10:35再次计算这个特征时,你只需要处理从10:33到10:35的数据,并将新的结果与之前的结果连接起来。
实时监测(相对于批量监测)。
批量监测和实时监测之间有天壤之别,许多监测解决方案(尤其是批处理解决方案)将这两者混为一谈。
批量监测是指定期(如每天一次)运行一个脚本来计算你关心的指标,通常是在S3、BigQuery、Snowflake等仓库中的数据。批量监测很慢。你首先要等待数据到达仓库,然后等待脚本的运行。如果你的新模型在部署时失败了,你至少要等到下一批指标的计算才会发现,假设计算的指标足以发现这个问题。而且,在你发现故障之前,你无法修复它。
实时监控意味着在数据到达时对其进行计算,使你能够(接近)实时地了解你的系统。就像在线特征计算一样,实时指标计算通常可以通过流计算更有效地完成。
如果批处理监控对你有用,那很好。然而,在大多数用例中,批量监控往往最终是不够的。在软件工程中,我们知道,迭代速度就是一切。DevOps社区痴迷于加快迭代速度,二十多年来,DevOps社区一直痴迷于软件。DevOps中的两个重要指标是。
- MTTD: 平均检测时间
- MTTR: 平均反应时间
如果某样东西要失败,你希望它快速失败,而且你想快速知道它,特别是在你的客户知道之前。如果你的用户不得不在半夜给你打电话抱怨故障,或者更糟糕的是在推特上抱怨,你很可能会失去不止一个客户。
然而,大多数做MLOps的人仍然可以接受长达一天、甚至一周的反馈循环。
许多团队在没有任何ML监控系统的情况下,想从批量监控开始。批量监测仍然比没有监测好,而且可以说比实时监测更容易设置。
然而,批处理监控并不是实时监控的先决条件--今天建立批处理监控不太可能使以后建立实时监控变得更容易。先进行批处理监控意味着以后当你需要获得实时监控时,又要拆掉一个遗留系统。
持续学习(相对于人工再训练)。
Continual learning是指在需要时随时更新你的模型并快速部署这种更新的能力。快速的模型更新使我们的模型能够适应不断变化的环境和业务需求。
我曾与100多家公司的ML工程师/数据科学家交谈。几乎所有人都告诉我,持续学习太 "遥远 "了。大多数公司还在为依赖性管理和监控而挣扎,无法考虑持续学习的问题。
同时,从来没有人告诉我,他们不想要持续学习。持续学习太 "遥远 "了,因为我们还没有合适的工具来实现它。当正确的工具出现,使持续学习 "恰到好处 "时,我相信过渡会很迅速。
我们不需要流计算来不断地更新我们的模型。假设你已经有了获取新鲜数据的方法,持续学习的困难部分不在于更新模型,而在于确保更新后的模型能够发挥作用。我们需要找到一种方法来快速评估每个模型的迭代,这使我们回到了实时监控和流处理。
2.从表到日志
我希望我已经说服了你,为什么我们在ML中需要流处理,以及在哪里需要流处理。如果你不相信,我很想听听你的意见。现在,让我们来看看流处理背后的一些核心概念,我发现这些概念真的很酷。
第一篇帮助我理解流处理的文章是Jay Kreps的经典文章,他是Kafka的创建者。题外话:Kreps在一篇文章中提到 tweet中提到,他写这篇文章是为了了解人们对流处理是否有足够的兴趣,以便他的团队围绕流处理创办一家公司。这个帖子一定很受欢迎,因为他的团队从LinkedIn分离出来,成为Confluent。
这篇文章的核心思想(对我来说)是表和日志的双重性。你可能已经熟悉了数据表,比如MySQL表或pandas DataFrame。为了简单起见,考虑一下下面这个库存表。
在2022-06-19之前,3号物品的价格为30美元
比方说,在6月19日,你想对这个表做一个改动--例如,将3号商品(上衣)的价格从30美元改为35美元。有许多方法可以做到这一点,但2种常见的方法是。
- 去存储表的地方,直接进行修改(例如覆盖一个文件)。这意味着我们将没有办法恢复这个变化,但许多公司还是这样做了。
- 发送一个带有新价格的更新 {"timestamp":"2022-06-19 09:23:23", "id":3, "价格($)":35},这个更新将被应用于该表。随着时间的推移,我们会有一系列有序的更新,这被称为日志。每个更新都是一个事件的例子。日志是只能追加的。你只能将新的事件追加到你现有的日志中。你不能覆盖以前的事件。
...
{"timestamp":"2022-06-08 11:05:04", "id":3, "项目":"上衣", "价格($)":30}
{"timestamp":"2022-06-17 03:12:16", "id":4, "项目":"牛仔裤", "价格($)":40}
{"timestamp":"2022-06-19 09:23:23", "id":3, "项目":"上衣", "价格($)":35}
我使用时钟的时间戳是为了可读性。在实践中,时钟是不可靠的。分布式系统通常利用 logical clock来代替。
3号物品的价格被更新为35美元后的数据表
一张表捕获了某一时间点上的数据状态。仅仅是一张表,我们不知道一天前或一周前的数据状态是什么。如果有一个对这个表的所有变化的日志,我们可以在任何时间点重新创建这个表。
如果你用git工作,你已经对日志很熟悉了。git日志记录了对你的代码的所有修改。鉴于这个日志,你可以在任何时间点重新创建你的代码。
无状态与有状态的日志
可以只发送一个有价格差异的事件{"id":3, "行动":"增加", "价格($)":5}.对于这种类型的事件,我们需要查看以前的价格来确定更新的价格。
这被称为有状态的事件(我们需要知道以前的状态来构建当前的状态)。在#2中提到的事件类型是一个无状态的日志。有状态的事件是需要按顺序处理的--在上衣价格上增加5美元,然后再增加一倍,这与增加一倍后再增加5美元有很大不同。
有一整个行业专注于解决有状态的事件,称为 Change Data Capture (CDC)。见 Debezium, Fivetran, Striim.
实时传输和计算引擎
流系统有两个组成部分:实时传输和计算引擎。
- 实时传输,基本上是分布式日志。实时传输中的数据被称为流数据,或者Confluent公司所创造的 "运动中的数据"。例子。Kafka、AWS Kinesis、GCP Dataflow。
- 计算引擎对被传输的数据进行计算(例如,连接、聚合、过滤等)。例子。Flink, KSQL, Beam, Materialize, Decodable, Spark Streaming。
传输通常有简单计算的能力。例如,Kafka是一个传输,它也有一个叫做Kafka Streams的API,提供基本的流处理能力。然而,如果你想进行复杂的计算,你可能需要一个优化的流计算引擎(类似于酒吧通常有一些食物选择,但如果你想吃真正的食物,你会想去餐厅)。
我们已经详细地讨论了日志。从下一节开始,我们将重点讨论流计算。
3.流处理的一个简单例子
想象一下,我们想用用户看过的所有物品的平均价格作为一个特征。如果我们把所有这些物品放到一个表中,那么特征的定义可能是这样的。这就是批处理。
def average_price(table):
return sum(table.price) / len(table)
如果相反,我们有一个所有项目更新的日志,我们应该如何计算平均价格?
内部状态和检查点
一种方法是对日志中的事件进行处理(重放日志),从时间的开始重新创建所有物品的库存表。然而,如果我们只需要平均价格,实际上我们不需要跟踪整个表格--我们只需要所有物品的总价格和物品数量。平均价格可以通过这两个值计算出来。
总价和计数构成了流处理工作的**内部状态。我们说这个计算是有状态的,因为我们在计算之间保持对内部状态的跟踪。
# Pseudocode
def average_price(log):
initialize total_price, item_count
for each event in the log:
update total_price, item_count
avg_price = total_price / item_count
return avg_price
如果日志很大(在2021年。 Netflix was processing 20 trillion events a day!),从头开始重放日志的速度可能会非常慢。为了缓解这个问题,我们可以偶尔保存作业的内部状态,例如每小时一次。保存的内部状态被称为**检查点**(或保存点),这个作业可以从任何检查点恢复。上面的函数将被更新如下。
# Pseudocode
def average_price(log):
find the latest checkpoint X
load total_price, item_count from X
for each event in the log after X:
update total_price, item_count
avg_price = total_price / item_count
return avg_price
物化视图
计算出的平均价格值,如果保存下来,将成为特征平均价格的一个物化视图。每次我们从物化视图中查询时,我们不会重新计算该值,而是读取保存的值。
你可能会问。"为什么我们必须想出一个新的术语,如物化视图,而不是说保存的副本或缓存?"
因为 "保存的副本 "或 "缓存 "意味着计算值将永远不会改变。物化的视图会因为晚到而发生变化。如果一个发生在上午10点15分的事件被延迟,并在上午11点10分到达,所有可能被这个更新影响的物化视图都应该被重新计算以考虑到这个延迟的事件。我说 "应该 "是因为不是所有的实现都会这样做。
从物化视图中读取是很好的延迟,因为我们不需要重新计算该功能。然而,物化视图最终会变得过时。物化视图将需要定期(每隔几分钟)或基于变化流进行刷新(重新计算)。
当刷新物化视图时,你可以从头开始重新计算(例如,使用所有项目来计算其平均价格)。或者你可以只使用新的信息来更新它(例如,使用最新的物化平均价格+更新项目的价格)。后者被称为增量物化。对于那些感兴趣的人,Materialize有一篇很好的文章,其中有关于 materialized and incremental materialized.
4.时间旅行和回填
我们已经谈到了如何在最新的数据上计算一个特征。接下来,我们将讨论如何计算一个过去的特征。
想象一下,在我们的库存中,商品上衣的最新价格是35美元。我们想按照6月10日的情况计算avg_price特征。这个特征在过去可能有两种不同的方式。
- 数据是不同的。例如,这件衬衫的价格是30美元,而不是35美元。
- 特征逻辑是不同的。比如说。今天,avg_price功能计算所有项目的平均原始价格。6月10日,avg_price功能计算平均上市价格,而上市价格是原始价格加上折扣。
如果我们不小心使用了今天的数据或特征逻辑,而不是6月10日的数据和逻辑,这意味着有一个来自未来的泄漏。时间点正确性指的是系统准确地执行计算的能力,因为它在过去任何时候都会发生。时间点正确性意味着没有数据泄漏。
使用不同的逻辑追溯处理历史数据也被称为回填。回填是数据工作流中一个非常常见的操作,例如,请看回填在 Airflow和 Amplitude. 回填可以在批处理和流式处理中进行。
- 通过批量回填,你可以将新的逻辑(如新的特征定义)应用于过去的表。
- 通过流回填,你可以把新的逻辑应用到过去某个时间段的日志上,比如说,把它应用到2022年6月10日的日志上。
时间旅行是很难的,因为我们需要跟踪你的数据在一段时间内的状态。如果你的数据是从多个地方加入的,那会变得非常复杂。例如,平均上市价格取决于原始价格和折扣,它们都可以独立地随时间变化。
ML工作流程中的时间旅行
在ML工作流程中,有许多场景需要进行时间旅行,但这里有两个主要场景。
- 以确保在线特征的训练-预测一致性(也称为在线-离线倾斜或训练-服务倾斜)。
- 在历史数据上比较两个模型
在线特征的训练-预测一致性
在线预测会导致训练-预测的不一致。想象一下,我们的模型使用在线特征进行在线预测,其中一个是这个用户在过去30分钟内所看的所有物品的平均价格。每当有预测请求到达时,我们就使用这些物品在那一刻的价格来计算这个特征,可能使用流处理。
在训练过程中,我们想使用这个相同的功能,但要根据历史数据进行计算,比如上周的数据。然而,在过去的一周里,许多物品的价格都发生了变化,有些还变化了多次。我们需要时间旅行,以确保在每个给定的时间点上能够正确计算出历史平均价格。
生产优先的机器学习
传统上,机器学习工作流程是开发先行的。数据科学家对训练管道的新特征进行试验。特征定义,又称特征逻辑,通常是为批量处理而写的,完全没有意识到数据的时间依赖性。训练数据通常甚至没有时间戳。
这种方法可能会帮助数据科学家了解一个模型或特征在训练期间的表现,但不了解这个模型/特征在生产中的表现,包括性能方面(如准确性、F1)和操作方面(如延迟)。
在部署时,ML或运营工程师需要将这些批处理特征转化为预测管道的流特征,并针对延迟进行优化。在这个翻译过程中的错误--例如,一个管道中的变化或假设没有更新或在另一个管道中没有充分测试--会造成训练--预测的不一致。
当训练和预测管道使用不同的语言时,这种不一致性会变得更糟糕,例如,训练管道使用Python,而预测管道使用SQL或Java。
为了避免这种不一致,我们应该完全摆脱翻译过程。如果我们让数据科学家直接对预测管道的特征进行实验,会怎么样?你可以编写新的特征,因为它们将在生产中使用。为了测试这些新特征,你将它们应用于历史数据,以产生训练数据来训练模型。将新的特征逻辑应用于历史数据,正是回填的内容。有了这样的设置,ML工作流程就变成了生产优先
由于它能够同时支持流处理和批处理,SQL已经成为生产优先的ML工作流程的流行语言。在关系型数据库时代,SQL计算引擎完全是批处理模式。近年来,人们对SQL进行了大量的投资。 streaming SQL engines诸如KSQL和Apache Flink。(见 Streaming SQL to Unify Batch & Stream Processing w/ Apache Flink @Uber)
开发优先的ML工作流程:特征是为训练而编写的,并适应于在线预测。
生产优先的ML工作流程:为在线预测编写特征,并为训练回填。
用于模型比较的回测
为训练生成特征的回填是FeatureForm和Tecton等特征库确保训练-预测一致性的方式。然而,大多数特征库只支持有限的流计算引擎,当你想处理复杂的在线特征时,这可能会带来性能(延迟+成本)的挑战。
在SQL中定义特征的例子[FeatureForm]
SQL中特征定义的例子[Tecton]
想象一下,你的团队正在使用模型A为实时预测请求提供服务,而你刚刚想出了一个新的模型B。在对实时流量进行A/B测试模型B之前,你想在最近的数据上评估模型B,例如在过去一天的预测请求上。
要做到这一点,你要重新播放上一天的请求,并使用模型B对这些请求进行预测,看看如果我们使用B而不是A,性能会是怎样。
如果模型B使用的特征逻辑与模型A完全相同,那么模型B可以重复使用之前为模型A计算的特征。然而,如果模型B使用不同的特征集,你就必须回填,将模型B的特征逻辑应用于最后一天的数据,为模型B产生特征。
这也被称为回溯测试。
数据科学家的时间旅行挑战
一些数据科学团队告诉我,他们需要为他们的数据做时间旅行,但他们的数据没有时间戳。他们应该怎么做?
没有时间戳,就不可能进行时间旅行。如果不能控制数据的摄入和存储方式,这不是数据科学家可以解决的问题。
那么,为什么数据科学家不能获得带有时间戳的数据?
其中一个原因是,数据平台的设计可能采用了ML生产还没有被充分理解的时代的做法。因此,他们在设计时没有考虑到ML生产,而且时间戳也没有被认为是需要跟踪的重要工件。
另一个原因是摄入数据的人(如数据平台团队)和消费数据的人(如数据科学团队)之间缺乏沟通。许多公司已经有了应用日志,然而,在数据摄取或处理过程中,时间戳没有被保留,或者它们被保留在数据科学家无法访问的地方。
这是一个操作问题,希望现代ML工作流程的公司需要投资。如果没有一个精心设计的数据管道,数据科学家将无法建立生产就绪的数据应用。
5.为什么流处理很难?
我希望到现在为止,我已经说服了你为什么流处理对ML工作负载很重要。另一个问题是:如果流处理如此重要,为什么它在ML工作流中没有更加普遍?
因为这很难。
如果你以前做过流处理,你可能不需要任何说服力。Tyler Aikidau发表了一个精彩的演讲,内容是关于 open challenges in stream processing.在这里,我将从我作为一个数据科学家的角度来谈谈关键的挑战。这远远不是一个详尽的清单。一个流媒体系统需要能够处理所有这些挑战,甚至更多。
分布式
当你有少量的更新,并且它们是由一台机器产生和消费的时候,流是简单的,就像《从表到日志》中的库存例子。然而,一个真实世界的应用可能有成百上千的微服务,分布在不同的机器上。这些机器中的许多人可能会尝试从同一个日志中进行读写。流媒体系统是高度分布式的。直接从事流媒体工作的工程师需要对分布式系统有深刻的理解,而这并不是很多数据科学家在日常工作中会接触到的。我们需要良好的抽象,使数据科学家能够利用流,而不必处理底层的分布式系统。
时间变异的结果
时间增加了另一个层面的复杂性。如果你将同一个查询应用于同一个表两次,你会得到同样的结果。然而,如果你将同一个查询应用于同一个数据流两次,你可能会得到非常不同的结果。
在批处理中,如果一个查询失败了,你可以直接重新运行它。但是在流式处理中,由于结果的时间变化,如果查询失败,我们应该怎么做?不同的流处理引擎有不同的方法来解决这个问题。例如,Flink使用 Chandy–Lamport algorithm,由K. Mani Chandy和图灵奖得主Leslie Lamport开发。
层叠式故障
在批处理环境中,作业通常被安排在很大的间隔内,因此工作负载的波动并不是一个大问题。如果你安排一个作业每24小时运行一次,而一次运行需要6小时,而不是4小时,这额外的2小时不会影响下一次运行。
然而,流处理设置中的波动会使它很难保持持续的低延迟。假设你的流媒体系统能够处理100个事件/秒。如果流量小于100个事件/秒,你的系统就很好。如果流量突然激增到200个事件/秒,约100个事件将被延迟。如果不迅速解决,这些延迟的100秒将在下一秒超过你的系统,以此类推,导致你的系统大大减慢。
可用性与一致性
想象一下,有两台机器试图发送关于同一个项目的更新。
- 机器A在 "03:12:10 "更新了用户对产品123的看法。
- 机器B在 "03:12:16 "更新了用户查看产品234的信息
由于一些网络延迟,机器A的更新在运输过程中被延迟,比机器B的更新晚到达。在计算用户在过去30分钟内看的所有物品的平均价格之前,我们应该等待机器A多久?
如果我们等待的时间不够长,我们就会错过机器A的更新,而这个功能就会不正确。然而,如果我们等待的时间太长,延迟就会很低。一个最佳的时间窗口可能是在中间的某个地方。因此,流计算的结果往往是近似的。
这让我们看到了一个有趣的权衡。 availability vs. consistency.为了使我们的系统具有高可用性,我们希望在多台机器上重复相同的工作负载。然而,机器越多,散兵游勇也就越多,流计算的结果也就越不一致。所有大规模的分布式系统,不仅仅是流式系统,都必须做出这种权衡。
业务挑战
虽然流吹技术在技术上可能很难,但企业适应流媒体的真正挑战是在运营方面。在(接近)实时的数据流上操作意味着我们可以使事情在(接近)实时的情况下发生。不幸的是,这意味着灾难也可以在(近)实时发生。
维护一个流吹系统意味着你的工程师必须24小时待命,以快速响应事件。如果你不能找出问题并快速解决,数据流将被中断,你可能会丢失数据。因为流处理系统的风险更高,所以流处理系统的维护需要高度的自动化和成熟的DevOps实践。
我们和很多想维护自己的Kafka或Flink集群的小团队谈过,这并不美好。在流处理领域,我可以看到管理服务的很多价值(例如Confluent代替Kafka)。
总结
哇,这对我来说是一个很难写的帖子。我希望你还在我身边。
我对流处理在ML应用中的应用感到非常兴奋。我的梦想是,数据科学家应该有机会接触到一个ML平台,使他们能够利用流处理,而不必担心其底层的复杂性。这个平台需要提供足够高的抽象水平,使事情能够顺利进行,同时要有足够的灵活性,使数据科学家能够定制他们的工作流程。
我知道现在利用流处理进行ML是很难的,我相信让它更容易的价值,所以我正在努力。如果你想聊天,请联系我。
这是一个相对较新的领域,我也还在学习。如果您有任何反馈和讨论要点,我将不胜感激!