余晖落尽暮晚霞,黄昏迟暮远山寻
本站
当前位置:网站首页 > 编程知识 > 正文

Spark 2.4.0编程指南——Spark SQL UDF和UDAF

xiyangw 2022-11-25 12:01 19 浏览 0 评论

视频加载中...

Spark 2.4.0编程指南--Spark SQL UDF和UDAF

更多资源

  • github: https://github.com/opensourceteams/spark-scala-maven-2.4.0

文档

  • (官网文档): http://spark.apache.org/docs/2.4.0/sql-getting-started.html#aggregations

前置条件

  • 已安装好java(选用的是java 1.8.0_191)
  • 已安装好scala(选用的是scala 2.11.121)
  • 已安装好hadoop(选用的是Hadoop 3.1.1)
  • 已安装好spark(选用的是spark 2.4.0)

技能标签

  • 了解UDF 用户定义函数(User-defined functions, UDFs)
  • 了解UDAF (user-defined aggregate function), 用户定义的聚合函数
  • UDF示例(统计行数据字符长度)
  • UDF示例(统计行数据字符转大写)
  • UDAF示例(统计总行数)
  • UDAF示例(统计最大收入)
  • UDAF示例(统计平均收入)
  • UDAF示例(统计按性别分组的最大收入)
  • 官网: http://spark.apache.org/docs/2.4.0/sql-getting-started.html#aggregations

UDF

用户定义函数(User-defined functions, UDFs)是大多数 SQL 环境的关键特性,用于扩展系统的内置功能。 UDF允许开发人员通过抽象其低级语言实现来在更高级语言(如SQL)中启用新功能。 Apache Spark 也不例外,并且提供了用于将 UDF 与 Spark SQL工作流集成的各种选项。

  • 用户定义函数(User-defined functions, UDFs)
  • UDF对表中的单行进行转换,以便为每行生成单个对应的输出值

##示例

  • 得到SparkSession

BaseSparkSession

/**
 * 得到SparkSession
 * 首先 extends BaseSparkSession
 * 本地: val spark = sparkSession(true)
 * 集群: val spark = sparkSession()
 */
class BaseSparkSession {
 var appName = "sparkSession"
 var master = "spark://standalone.com:7077" //本地模式:local standalone:spark://master:7077
 def sparkSession(): SparkSession = {
 val spark = SparkSession.builder
 .master(master)
 .appName(appName)
 .config("spark.eventLog.enabled","true")
 .config("spark.history.fs.logDirectory","hdfs://standalone.com:9000/spark/log/historyEventLog")
 .config("spark.eventLog.dir","hdfs://standalone.com:9000/spark/log/historyEventLog")
 .getOrCreate()
 spark.sparkContext.addJar("/opt/n_001_workspaces/bigdata/spark-scala-maven-2.4.0/target/spark-scala-maven-2.4.0-1.0-SNAPSHOT.jar")
 //import spark.implicits._
 spark
 }
 def sparkSession(isLocal:Boolean = false): SparkSession = {
 if(isLocal){
 master = "local"
 val spark = SparkSession.builder
 .master(master)
 .appName(appName)
 .getOrCreate()
 //spark.sparkContext.addJar("/opt/n_001_workspaces/bigdata/spark-scala-maven-2.4.0/target/spark-scala-maven-2.4.0-1.0-SNAPSHOT.jar")
 //import spark.implicits._
 spark
 }else{
 val spark = SparkSession.builder
 .master(master)
 .appName(appName)
 .config("spark.eventLog.enabled","true")
 .config("spark.history.fs.logDirectory","hdfs://standalone.com:9000/spark/log/historyEventLog")
 .config("spark.eventLog.dir","hdfs://standalone.com:9000/spark/log/historyEventLog")
 .getOrCreate()
 // spark.sparkContext.addJar("/opt/n_001_workspaces/bigdata/spark-scala-maven-2.4.0/target/spark-scala-maven-2.4.0-1.0-SNAPSHOT.jar")
 //import spark.implicits._
 spark
 }
 }
 /**
 * 得到当前工程的路径
 * @return
 */
 def getProjectPath:String=System.getProperty("user.dir")
}

UDF (统计字段长度)

  • 对数据集中,每行数据的特定字段,计算字符长度
  • 通过 spark.sql 直接在字段查询处调用函数名称
/**
 * 自定义匿名函数
 * 功能: 得到某列数据长度的函数
 */
object Run extends BaseSparkSession{
 def main(args: Array[String]): Unit = {
 val spark = sparkSession(true)
 val ds = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json")
 ds.show()
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
 spark.udf.register("strLength",(str: String) => str.length())
 ds.createOrReplaceTempView("employees")
 spark.sql("select name,salary,strLength(name) as name_Length from employees").show()
// +-------+------+-----------+
// | name|salary|name_Length|
// +-------+------+-----------+
// |Michael| 3000| 7|
// | Andy| 4500| 4|
// | Justin| 3500| 6|
// | Berta| 4000| 5|
// +-------+------+-----------+
 spark.stop()
 }
}

UDF (字段转成大写)

  • 对数据集中,每行数据的特定字段,计算字符长度
  • 通过 dataSet.withColumn 调用column
  • Column通过udf函数转换
import com.opensource.bigdata.spark.standalone.base.BaseSparkSession
/**
 * 自定义匿名函数
 * 功能: 得到某列数据长度的函数
 */
object Run extends BaseSparkSession{
 def main(args: Array[String]): Unit = {
 val spark = sparkSession(true)
 val ds = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json")
 ds.show()
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
 import org.apache.spark.sql.functions._
 val strUpper = udf((str: String) => str.toUpperCase())
 import spark.implicits._
 ds.withColumn("toUpperCase", strUpper($"name")).show
// +-------+------+-----------+
// | name|salary|toUpperCase|
// +-------+------+-----------+
// |Michael| 3000| MICHAEL|
// | Andy| 4500| ANDY|
// | Justin| 3500| JUSTIN|
// | Berta| 4000| BERTA|
// +-------+------+-----------+
 spark.stop()
 }
}

UDAF

  • UDAF(user-defined aggregate function, 用户定义的聚合函数
  • 同时处理多行,并且返回一个结果,通常结合使用 GROUP BY 语句(例如 COUNT 或 SUM)

count

  • 统计一共有多少行数据
package com.opensource.bigdata.spark.sql.n_08_spark_udaf.n_01_spark_udaf_count
import com.opensource.bigdata.spark.standalone.base.BaseSparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
/**
 * ).initialize()方法,初使使,即没数据时的值
 * ).update() 方法把每一行的数据进行计算,放到缓冲对象中
 * ).merge() 把每个分区,缓冲对象进行合并
 * ).evaluate()计算结果表达式,把缓冲对象中的数据进行最终计算
 */
object Run2 extends BaseSparkSession{
 object CustomerCount extends UserDefinedAggregateFunction{
 //聚合函数的输入参数数据类型
 def inputSchema: StructType = {
 StructType(StructField("inputColumn",StringType) :: Nil)
 }
 //中间缓存的数据类型
 def bufferSchema: StructType = {
 StructType(StructField("sum",LongType) :: Nil)
 }
 //最终输出结果的数据类型
 def dataType: DataType = LongType
 def deterministic: Boolean = true
 //初始值,要是DataSet没有数据,就返回该值
 def initialize(buffer: MutableAggregationBuffer): Unit = {
 buffer(0) = 0L
 }
 /**
 *
 * @param buffer 相当于把当前分区的,每行数据都需要进行计算,计算的结果保存到buffer中
 * @param input
 */
 def update(buffer: MutableAggregationBuffer, input: Row): Unit ={
 if(!input.isNullAt(0)){
 buffer(0) = buffer.getLong(0) + 1
 }
 }
 /**
 * 相当于把每个分区的数据进行汇总
 * @param buffer1 分区一的数据
 * @param buffer2 分区二的数据
 */
 def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit={
 buffer1(0) = buffer1.getLong(0) +buffer2.getLong(0) // salary
 }
 //计算最终的结果
 def evaluate(buffer: Row): Long = buffer.getLong(0)
 }
 def main(args: Array[String]): Unit = {
 val spark = sparkSession(true)
 spark.udf.register("customerCount",CustomerCount)
 val df = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json")
 df.createOrReplaceTempView("employees")
 val sqlDF = spark.sql("select customerCount(name) as average_salary from employees ")
 df.show()
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
 sqlDF.show()
// +--------------+
// |average_salary|
// +--------------+
// | 4.0|
// +--------------+
 spark.stop()
 }
}

max

  • 统计收入最高的
package com.opensource.bigdata.spark.sql.n_08_spark_udaf.n_03_spark_udaf_sum
import com.opensource.bigdata.spark.standalone.base.BaseSparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
/**
 * ).initialize()方法,初使使,即没数据时的值
 * ).update() 方法把每一行的数据进行计算,放到缓冲对象中
 * ).merge() 把每个分区,缓冲对象进行合并
 * ).evaluate()计算结果表达式,把缓冲对象中的数据进行最终计算
 */
object Run extends BaseSparkSession{
 object CustomerSum extends UserDefinedAggregateFunction{
 //聚合函数的输入参数数据类型
 def inputSchema: StructType = {
 StructType(StructField("inputColumn",LongType) :: Nil)
 }
 //中间缓存的数据类型
 def bufferSchema: StructType = {
 StructType(StructField("sum",LongType) :: StructField("count",LongType) :: Nil)
 }
 //最终输出结果的数据类型
 def dataType: DataType = LongType
 def deterministic: Boolean = true
 //初始值,要是DataSet没有数据,就返回该值
 def initialize(buffer: MutableAggregationBuffer): Unit = {
 buffer(0) = 0L
 }
 /**
 *
 * @param buffer 相当于把当前分区的,每行数据都需要进行计算,计算的结果保存到buffer中
 * @param input
 */
 def update(buffer: MutableAggregationBuffer, input: Row): Unit ={
 if(!input.isNullAt(0)){
 buffer(0) = buffer.getLong(0) + input.getLong(0)
 }
 }
 /**
 * 相当于把每个分区的数据进行汇总
 * @param buffer1 分区一的数据
 * @param buffer2 分区二的数据
 */
 def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit={
 buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
 }
 //计算最终的结果
 def evaluate(buffer: Row): Long = buffer.getLong(0)
 }
 def main(args: Array[String]): Unit = {
 val spark = sparkSession(true)
 spark.udf.register("customerSum",CustomerSum)
 val df = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json")
 df.createOrReplaceTempView("employees")
 val sqlDF = spark.sql("select customerSum(salary) as average_salary from employees ")
 df.show
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
 sqlDF.show()
// +--------------+
// |average_salary|
// +--------------+
// | 15000|
// +--------------+
 spark.stop()
 }
}

average

  • 统计平均收入水平
package com.opensource.bigdata.spark.sql.n_08_spark_udaf.n_04_spark_udaf_average
import com.opensource.bigdata.spark.standalone.base.BaseSparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
object Run extends BaseSparkSession{
 object MyAverage extends UserDefinedAggregateFunction{
 //聚合函数的输入参数数据类型
 def inputSchema: StructType = {
 StructType(StructField("inputColumn",LongType) :: Nil)
 }
 //中间缓存的数据类型
 def bufferSchema: StructType = {
 StructType(StructField("sum",LongType) :: StructField("count",LongType) :: Nil)
 }
 //最终输出结果的数据类型
 def dataType: DataType = DoubleType
 def deterministic: Boolean = true
 //初始值,要是DataSet没有数据,就返回该值
 def initialize(buffer: MutableAggregationBuffer): Unit = {
 buffer(0) = 0L
 buffer(1) = 0L
 }
 /**
 *
 * @param buffer 相当于把当前分区的,每行数据都需要进行计算,计算的结果保存到buffer中
 * @param input
 */
 def update(buffer: MutableAggregationBuffer, input: Row): Unit ={
 if(!input.isNullAt(0)){
 buffer(0) = buffer.getLong(0) + input.getLong(0) // salary
 buffer(1) = buffer.getLong(1) + 1 // count
 }
 }
 /**
 * 相当于把每个分区的数据进行汇总
 * @param buffer1 分区一的数据
 * @param buffer2 分区二的数据
 */
 def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit={
 buffer1(0) = buffer1.getLong(0) +buffer2.getLong(0) // salary
 buffer1(1) = buffer1.getLong(1) +buffer2.getLong(1) // count
 }
 //计算最终的结果
 def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
 }
 def main(args: Array[String]): Unit = {
 val spark = sparkSession(true)
 spark.udf.register("MyAverage",MyAverage)
 val df = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json")
 df.createOrReplaceTempView("employees")
 val sqlDF = spark.sql("select MyAverage(salary) as average_salary from employees ")
 sqlDF.show()
 spark.stop()
 }
}

group by max

  • 按性别分组统计收入最高是多少
  • 即统计男,女,各收入最高是多少
package com.opensource.bigdata.spark.sql.n_08_spark_udaf.n_05_spark_udaf_groupby_max
import com.opensource.bigdata.spark.standalone.base.BaseSparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
/**
 * ).initialize()方法,初使使,即没数据时的值
 * ).update() 方法把每一行的数据进行计算,放到缓冲对象中
 * ).merge() 把每个分区,缓冲对象进行合并
 * ).evaluate()计算结果表达式,把缓冲对象中的数据进行最终计算
 */
object Run extends BaseSparkSession{
 object CustomerMax extends UserDefinedAggregateFunction{
 //聚合函数的输入参数数据类型
 def inputSchema: StructType = {
 StructType(StructField("inputColumn",LongType) :: Nil)
 }
 //中间缓存的数据类型
 def bufferSchema: StructType = {
 StructType(StructField("sum",LongType) :: StructField("count",LongType) :: Nil)
 }
 //最终输出结果的数据类型
 def dataType: DataType = LongType
 def deterministic: Boolean = true
 //初始值,要是DataSet没有数据,就返回该值
 def initialize(buffer: MutableAggregationBuffer): Unit = {
 buffer(0) = 0L
 }
 /**
 *
 * @param buffer 相当于把当前分区的,每行数据都需要进行计算,计算的结果保存到buffer中
 * @param input
 */
 def update(buffer: MutableAggregationBuffer, input: Row): Unit ={
 if(!input.isNullAt(0)){
 if(input.getLong(0) > buffer.getLong(0)){
 buffer(0) = input.getLong(0)
 }
 }
 }
 /**
 * 相当于把每个分区的数据进行汇总
 * @param buffer1 分区一的数据
 * @param buffer2 分区二的数据
 */
 def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit={
 if( buffer2.getLong(0) > buffer1.getLong(0)) buffer1(0) = buffer2.getLong(0)
 }
 //计算最终的结果
 def evaluate(buffer: Row): Long = buffer.getLong(0)
 }
 def main(args: Array[String]): Unit = {
 val spark = sparkSession(true)
 spark.udf.register("customerMax",CustomerMax)
 val df = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employeesCN.json")
 df.createOrReplaceTempView("employees")
 val sqlDF = spark.sql("select gender,customerMax(salary) as average_salary from employees group by gender ")
 df.show
// +------+----+------+
// |gender|name|salary|
// +------+----+------+
// | 男|小王| 30000|
// | 女|小丽| 50000|
// | 男|小军| 80000|
// | 女|小李| 90000|
// +------+----+------+
 sqlDF.show()
// +------+--------------+
// |gender|average_salary|
// +------+--------------+
// | 男| 80000|
// | 女| 90000|
// +------+--------------+
 spark.stop()
 }
}

其它支持

  • Spark SQL 支持集成现有 Hive 中的 UDF ,UDAF 和 UDTF 的(Java或Scala)实现。
  • UDTFs(user-defined table functions, 用户定义的表函数)可以返回多列和多行 end

相关推荐

想学习编程但是看不懂代码该怎么办(编程看不懂代码什么意思先看什么)
想学习编程但是看不懂代码该怎么办(编程看不懂代码什么意思先看什么)

实际上有不少编程的初学者都面临这样一个问题,自身对于编程还是比较有兴趣的,但是一看到各种程序代码就打退堂鼓了,感觉难度太大,不知道该从哪里开始学习。在学习编程的...

2023-03-21 18:18 xiyangw

入门C++,代码生成的过程你知道吗?(c代码生成流程图)
入门C++,代码生成的过程你知道吗?(c代码生成流程图)

易道云出品在此,小编与大家聊一聊代码的生成过程。[小鼓掌]代码生成主要包括三步:预编译、编译、链接和载入[奋斗]预编译:主要包括宏替换、文件包含和条件编译三个部...

2023-03-21 18:17 xiyangw

命名不善的代码会有多坑?这段Python官方入门教程的例程是个典型
命名不善的代码会有多坑?这段Python官方入门教程的例程是个典型

看过Python3官方入门的同学也许对这段代码有印象:大多数读者也许都认为这些字符串和数字并没有特别意义。如果真是这样,这个例程还不如用“dict={'a...

2023-03-21 18:17 xiyangw

从零开始学编程-2(从零开始学编程要几年)
从零开始学编程-2(从零开始学编程要几年)

<!DOCTYPEhtml><html><head></head><bo...

2023-03-21 18:17 xiyangw

千行代码入门python(一千行代码)

#_*_coding:utf-8_*_"""类型和运算----类型和运算----类型和运算----类型和运算----类型和运算----类型和运算----类型和运算----类型和运算----类型...

初学者如何更好自学Python代码?(python怎么自学,可以达到什么程度)
初学者如何更好自学Python代码?(python怎么自学,可以达到什么程度)

Python很适合初学者自学的编程语言,试着从“HelloWorld”开始,你会发现Python适合自学成才。在你慢慢熟悉Python那一刻,你很快被它通过...

2023-03-21 18:16 xiyangw

(1-2我们的第一段PHP代码)php基础php学习基础实例代码操作教程
(1-2我们的第一段PHP代码)php基础php学习基础实例代码操作教程

我们的第一段PHP代码这句代码非常神奇,一句话能变成一个网页。是我们PHP入门的第一段代码。现在你可以在网页服务器的根目录当中新建一个文件。文件的名字为:abc...

2023-03-21 18:16 xiyangw

初学JS必会的10种JavaScript代码优雅写法(炒股入门初学者基础知识)
初学JS必会的10种JavaScript代码优雅写法(炒股入门初学者基础知识)

当我们刚开始学习JS代码时,我们只需要掌握JS中对应知识点就好,随着对JS代码的熟悉程度,我们就要思考如何写出更优雅,更简洁的代码。接下来我分享10种常用JS代...

2023-03-21 18:16 xiyangw

七小时带你入门HTML+CSS网页设计,编写网页代码的思路(三)

上一篇文章中我分享了一段图文标签,这篇文章主要给大家详细解释一下这些代码的作用和意义,以及编写网页代码的格式与思路。下面我贴上html代码:<!--HTML--><div>...

C语言编程基础知识汇总学习,适合初学者!更新常量知识
C语言编程基础知识汇总学习,适合初学者!更新常量知识

(二)整型常量整型常量有3种形式:十进制整型常量、八进制整型常量和十六进制整型常量。(注意:c语言中没有直接表示二进制的整型常量,在c语言源程序中不会出现二进制...

2023-03-21 18:15 xiyangw

带你快速入门数控编程(数控编程入门教学)
带你快速入门数控编程(数控编程入门教学)

1、机床坐标系和运动方向的承认机床的直线运动X、Y、Z三个坐标系选用右手笛卡儿直角坐标系,如图11-6所示。坐标轴定义次第是先承认Z轴,再承认X轴,最终承认Y轴...

2023-03-21 18:15 xiyangw

Html5入门详细教程第一课,代码结构跟紧我免费教你建站
Html5入门详细教程第一课,代码结构跟紧我免费教你建站

大家好,通俗易懂讲营销,我是江湖哥,今天为大家分享我今天录制一个Html5入门教程第一个课的视频准备的课件,大家想学建站,想拥有自己一个简单的博客请关注我,如果...

2023-03-21 18:14 xiyangw

Python语言入门(3)-程序代码里的记忆体:变量
Python语言入门(3)-程序代码里的记忆体:变量

python上篇《Python语言入门(2)-三种核心语句》讲了python语言里的:1,赋值语句;2,循环语句;3,选择语句,介绍怎样运用这三种语句和计算机...

2023-03-21 18:14 xiyangw

零基础如何自学编程「2」(从零开始自学编程)
零基础如何自学编程「2」(从零开始自学编程)

在《零基础如何自学编程「1」》里说了一下零基础的同学要学编程的话究竟眼怎么正确开始。下面从三个方面说一下:1.选一门编程语言。2.学好数据结构和算法。3.选择一...

2023-03-21 18:13 xiyangw

没见过的 Java 编程入门教程!例程使用中文标识符代码:问个好吧
没见过的 Java 编程入门教程!例程使用中文标识符代码:问个好吧

前言Java教程用中文写(如下)更能被新手理解学习。可惜至今没有看到类似入门教程,在此敢为人先。注意:本教程的所有Java代码都可以正确运行,因为Jav...

2023-03-21 18:13 xiyangw

取消回复欢迎 发表评论: