本网站(662p.com)打包出售,且带程序代码数据,662p.com域名,程序内核采用TP框架开发,需要联系扣扣:2360248666 /wx:lianweikj
精品域名一口价出售:1y1m.com(350元) ,6b7b.com(400元) , 5k5j.com(380元) , yayj.com(1800元), jiongzhun.com(1000元) , niuzen.com(2800元) , zennei.com(5000元)
需要联系扣扣:2360248666 /wx:lianweikj
Spark 3.0—简而言之的新功能
奔跑的男人 · 366浏览 · 发布于2020-06-17 +关注

最近,Apache Spark社区发布了Spark 3.0的预览版,该预览版包含许多重要的新功能,这些功能将帮助Spark创造强大的影响力,在此大数据和数据科学时代,该产品已拥有广泛的企业用户和开发人员。

在新版本中,Spark社区已将一些功能从Spark SQL移植到了编程的Scala API(

org.apache.spark.sql.functions),这鼓励开发人员直接将此功能用作其DataFrame转换的一部分,而不是直接输入 进入SQL模式或创建视图,并使用此函数以及SQL表达式或callUDF函数。

社区还辛苦地引入了一些新的数据转换功能和partition_transforms函数,这些功能在与Spark的新DataFrameWriterv2一起使用以将数据写到某些外部存储时非常有用。

Spark 3中的一些新功能已经是Databricks Spark以前版本的一部分。 因此,如果您在Databricks云中工作,您可能会发现其中一些熟悉的功能。

本文通篇介绍了Spark SQL和Scala API中用于DataFrame操作访问的Spark新功能,以及从Spark SQL移植到Scala API以进行编程访问的功能。

Spark SQL中的Spark 3.0中引入的功能以及用于DataFrame转换的功能

from_csv

像from_json一样,此函数解析包含CSV字符串的列,并将其转换为Struct类型。 如果CSV字符串不可解析,则将返回null。

例:

  • 该函数需要一个Struct模式和一些选项,这些模式和选项指示如何解析CSV字符串。 选项与CSV数据源相同。

ss="dp-sql">ss="alt">val studentInfo = ss="string">"1,Jerin,CSE"::ss="string">"2,Jerlin,ECE"::ss="string">"3,Arun,CSE"::Nil ss="">val ss="keyword">schema = new StructType()  ss="alt">            .ss="keyword">add(ss="string">"Id",IntegerType) ss="">            .ss="keyword">add(ss="string">"Name",StringType) ss="alt">            .ss="keyword">add(ss="string">"Dept",StringType) ss="">val options = Map(ss="string">"delimiter" ->ss="string">",") ss="alt">val studentDF = studentInfo.toDF(ss="string">"Student_Info") ss="">.withColumn(ss="string">"csv_struct",from_csv('Student_Info, ss="keyword">schema,options)) ss="alt">studentDF.show()

to_csv

要将"结构类型"列转换为CSV字符串。

例:

  • 与Struct type列一起,此函数还接受可选的options参数,该参数指示如何将Struct列转换为CSV字符串。

ss="dp-sql">ss="alt">studentDF ss="">.withColumn(ss="string">"csv_string",to_csv($ss="string">"csv_struct",Map.empty[String, String].asJava)) ss="alt">.show

推断CSV字符串的模式,并以DDL格式返回模式。

例:

  • 该函数需要一个CSV字符串列和一个可选参数,其中包含如何解析CSV字符串的选项。

ss="dp-sql">ss="alt">studentDF ss="">  .withColumn(ss="string">"schema",schema_of_csv(ss="string">"csv_string")) ss="alt">  .show

for_all

将给定谓词应用于数组中的所有元素,并且仅当数组中的所有元素求值为true时返回true,否则返回false。

例:

  • 检查给定Array列中的所有元素是否均是偶数。

transform

将函数应用于数组中的所有元素后,返回一个新数组。

例:

  • 将" 1"添加到数组中的所有元素。

ss="dp-sql">ss="alt">val df = Seq((Seq(2,4,6)),(Seq(5,10,3))).toDF(ss="string">"num_array") ss="">df.withColumn(ss="string">"num_array",transform($ss="string">"num_array",x=>x+1)).show

overlay

要替换列的内容,请使用从指定字节位置到可选的指定字节长度的实际替换内容。

例:

  • 将特定人员的问候语更改为传统的" Hello World"

这里我们用世界替换人名,因为名字的起始位置是7,并且我们要在替换内容之前删除完整的姓名,需要删除的字节位置的长度应大于或等于最大值 列中名称的长度。

因此,我们将替换词传递为"world",将内容替换为" 7"的特定起始位置,从指定起始位置移除的位置数为" 12"(如果未指定,则该位置是可选的 函数只会从指定的起始位置将源内容替换为替换内容)。

覆盖替换了StringType,TimeStampType,IntegerType等中的内容。但是Column的返回类型将始终为StringType,而与Column输入类型无关。

ss="dp-sql">ss="alt">val greetingMsg = ss="string">"Hello Arun"::ss="string">"Hello Mohit Chawla"::ss="string">"Hello Shaurya"::Nil ss="">val greetingDF = greetingMsg.toDF(ss="string">"greet_msg") ss="alt">greetingDF.withColumn(ss="string">"greet_msg",overlay($ss="string">"greet_msg",lit(ss="string">"World"),lit(ss="string">"7"),lit(ss="string">"12"))) ss="">.show

分裂

根据给定的正则表达式和指定的限制拆分字符串表达式,该限制指示将正则表达式应用于给定的字符串表达式的次数。

如果指定的限制小于或等于零,则正则表达式将在字符串上应用多次,并且结果数组将根据给定的正则表达式包含所有可能的字符串拆分。

如果指定的限制大于零,则将使用不超过该限制的正则表达式

例:

  • 根据正则表达式将给定的字符串表达式拆分为两个。 即 字符串定界符。

ss="dp-sql">ss="alt">val num = ss="string">"one~two~three"::ss="string">"four~five"::Nil ss="">val numDF = num.toDF(ss="string">"numbers") ss="alt">numDF ss="">.withColumn(ss="string">"numbers",split($ss="string">"numbers",ss="string">"~",2)) ss="alt">.show

将同一个字符串表达式分成多个部分,直到出现分隔符

ss="dp-sql">ss="alt">numDF ss="">.withColumn(ss="string">"numbers",split($ss="string">"numbers",ss="string">"~",0)) ss="alt">.show

map_entries

将映射键值转换为无序的条目数组。

例:

  • 获取数组中Map的所有键和值。

ss="dp-sql">ss="alt">val df = Seq(Map(1->ss="string">"x",2->ss="string">"y")).toDF(ss="string">"key_values") ss="">df.withColumn(ss="string">"key_value_array",map_entries($ss="string">"key_values")) ss="alt">.show

map_zip_with

使用功能根据键将两个Map合并为一个。

例:

  • 要计算跨部门员工的总销售额,并通过传递一个函数,该函数将基于键汇总来自两个不同"地图"列的总销售额,从而在单个地图中获取特定员工的总销售额。

ss="dp-sql">ss="alt">val df = Seq((Map(ss="string">"EID_1"->10000,ss="string">"EID_2"->25000), ss="">             Map(ss="string">"EID_1"->1000,ss="string">"EID_2"->2500)))   .toDF(ss="string">"emp_sales_dept1",ss="string">"emp_sales_dept2") ss="alt"> ss="">df. ss="alt">withColumn(ss="string">"total_emp_sales",map_zip_with($ss="string">"emp_sales_dept1",$ss="string">"emp_sales_dept2",(k,v1,v2)=>(v1+v2))) ss="">.show

map_filter

返回仅包含满足给定谓词功能的Map值的新键值对。

例:

  • 仅筛选出销售值高于20000的员工

ss="dp-sql">ss="alt">val df = Seq(Map(ss="string">"EID_1"->10000,ss="string">"EID_2"->25000)) ss="">          .toDF(ss="string">"emp_sales") ss="alt"> ss="">df ss="alt">.withColumn(ss="string">"filtered_sales",map_filter($ss="string">"emp_sales",(k,v)=>(v>20000))) ss="">.show

transform_values

根据给定的函数操作Map列中所有元素的值。

例:

  • 通过给每个雇员加薪5000来计算雇员薪水

ss="dp-sql">ss="alt">val df = Seq(Map(ss="string">"EID_1"->10000,ss="string">"EID_2"->25000)) ss="">         .toDF(ss="string">"emp_salary") ss="alt"> ss="">df ss="alt">.withColumn(ss="string">"emp_salary",transform_values($ss="string">"emp_salary",(k,v)=>(v+5000))) ss="">.show

transform_keys

根据给定的函数操作Map列中所有元素的键。

例:

  • 要将公司名称" XYZ"添加到员工编号。

ss="dp-sql">ss="alt">val df = Seq(Map(ss="string">"EID_1" -> 10000, ss="string">"EID_2" -> 25000)) ss="">        .toDF(ss="string">"employees") ss="alt">df ss="">.withColumn(ss="string">"employees", transform_keys($ss="string">"employees", (k, v) => concat(k,lit(ss="string">"_XYZ")))) ss="alt">.show

xhash64

要计算给定列内容的哈希码,请使用64位xxhash算法并将结果返回为long。

从Spark SQL移植到Spark 3.0中的Scala API进行DataFrame转换的功能

Scala API可使用大多数Spark SQL函数,该函数可将相同的函数用作DataFrame操作的一部分。 但是仍然有一些功能不能作为编程功能使用。 要使用这些功能,必须进入Spark SQL模式并将这些功能用作SQL表达式的一部分,或使用Spark" callUDF"功能使用相同的功能。 随着功能的普及和使用不断发展,这些功能中的某些功能过去曾被移植到新版本的程序化Spark API中。 以下是从以前版本的Spark SQL函数移植到Scala API(

org.spark.apache.sql.functions)的函数

date_sub

从日期,时间戳记和字符串数据类型中减去天数。 如果数据类型为字符串,则其格式应可转换为日期" yyyy-MM-dd"或" yyyy-MM-dd HH:mm:ss.ssss"

例:

  • 从eventDateTime中减去" 1天"。

如果要减去的天数为负,则此功能会将给定的天数添加到实际日期中。

ss="dp-sql">ss="alt">var df = Seq( ss="">        (1, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-01 23:00:01")), ss="alt">        (2, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-02 12:40:32")), ss="">        (3, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-03 09:54:00")), ss="alt">        (4, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-04 10:12:43")) ss="">         ) ss="alt">     .toDF(ss="string">"typeId",ss="string">"eventDateTime") ss=""> ss="alt"> df.withColumn(ss="string">"Adjusted_Date",date_sub($ss="string">"eventDateTime",1)).show()

date_add

与date_sub相同,但是将天数添加到实际天数中。

例:

  • 将" 1天"添加到eventDateTime

ss="dp-sql">ss="alt">var df = Seq( ss="">         (1, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-01 23:00:01")), ss="alt">         (2, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-02 12:40:32")), ss="">         (3, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-03 09:54:00")), ss="alt">         (4, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-04 10:12:43")) ss="">         ) ss="alt">    .toDF(ss="string">"Id",ss="string">"eventDateTime") ss="">df ss="alt">.withColumn(ss="string">"Adjusted Date",date_add($ss="string">"eventDateTime",1)) ss="">.show()

months_add

像date_add和date_sub一样,此功能有助于添加月份。

减去月份,将要减去的月份数设为负数,因为没有单独的减去函数用于减去月份

例:

  • 从eventDateTime添加和减去一个月。

ss="dp-sql">ss="alt">var df = Seq( ss="">    (1, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-01 23:00:01")), ss="alt">    (2, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-02 12:40:32")), ss="">    (3, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-03 09:54:00")), ss="alt">    (4, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-04 10:12:43")) ss="">     ).toDF(ss="string">"typeId",ss="string">"eventDateTime") ss="alt">//ss="keyword">To ss="keyword">add one months ss=""> df ss="alt">.withColumn(ss="string">"Adjusted Date",add_months($ss="string">"eventDateTime",1)) ss="">.show() ss="alt">//ss="keyword">To subtract one months ss="">df ss="alt">.withColumn(ss="string">"Adjusted Date",add_months($ss="string">"eventDateTime",-1)) ss="">.show()

zip_with

通过应用函数合并左右数组。

此函数期望两个数组的长度都相同,如果其中一个数组比另一个数组短,则将添加null以匹配更长的数组长度。

例:

  • 将两个数组的内容相加并合并为一个

ss="dp-sql">ss="alt">val df = Seq((Seq(2,4,6),Seq(5,10,3))) ss="">         .toDF(ss="string">"array_1",ss="string">"array_2") ss="alt">   ss=""> df ss="alt">.withColumn(ss="string">"merged_array",zip_with($ss="string">"array_1",$ss="string">"array_2",(x,y)=>(x+y))) ss=""> .show

将谓词应用于所有元素,并检查数组中的至少一个或多个元素是否对谓词函数成立。

例:

  • 检查数组中至少一个元素是否为偶数。

ss="dp-sql">ss="alt">val df= Seq(Seq(2,4,6),Seq(5,10,3)).toDF(ss="string">"num_array") ss="">df.withColumn(ss="string">"flag",exists($ss="string">"num_array", x =>lit(x%2===0))) ss="alt">.show

过滤

将给定谓词应用于数组中的所有元素,并过滤掉谓词为true的元素。

例:

  • 仅过滤掉数组中的偶数元素。

ss="dp-sql">ss="alt">val df = Seq(Seq(2,4,6),Seq(5,10,3)).toDF(ss="string">"num_array") ss="">df.withColumn(ss="string">"even_array",filter($ss="string">"num_array", x =>lit(x%2===0))) ss="alt">.show

聚合 aggregate

使用给定函数将给定数组和另一个值/状态简化为单个值,并应用可选的finish函数将缩减后的值转换为另一个状态/值。

例:

  • 将10加到数组的总和并将结果乘以2

ss="dp-sql">ss="alt">val df = Seq((Seq(2,4,6),3),(Seq(5,10,3),8)) ss="">  .toDF(ss="string">"num_array",ss="string">"constant") ss="alt">df.withColumn(ss="string">"reduced_array",aggregate($ss="string">"num_array", $ss="string">"constant",(x,y)=>x+y,x => x*2)) ss="">  .show

Spark 3.0中为Spark SQL模式引入的功能

以下是新的SQL函数,您只能在Spark SQL模式下才能使用它们。

acosh

查找给定表达式的双曲余弦的倒数。

asinh

找出给定表达式的双曲正弦的逆。

atanh

查找给定表达式的双曲正切的逆。

bit_and,bit_or和bit_xor

计算按位AND,OR和XOR值

bit_count

返回计数的位数。

bool_and和bool_or

验证表达式的所有值是否为真或验证表达式中的至少一个为真。

count_if

返回一列中的真值数量

例:

  • 找出给定列中的偶数值

ss="dp-sql">ss="alt">var df = Seq((1),(2),(4)).toDF(ss="string">"num") ss=""> ss="alt"> df.createOrReplaceTempView(ss="string">"table") ss="">spark.sql(ss="string">"select count_if(num %2==0) from table").show

date_part

提取日期/时间戳的一部分,例如小时,分钟等…

div

用于将表达式或带有另一个表达式/列的列分开

every 和 sum

如果给定的表达式对每个列的所有列值都求值为true,并且至少一个值对某些值求得true,则此函数返回true。

make_date,make_interval和make_timestamp

构造日期,时间戳和特定间隔。

例:

ss="dp-sql">ss="alt">ss="keyword">SELECT make_timestamp(2020, 01, 7, 30, 45.887)

max_by和min_by

比较两列并返回与右列的最大值/最小值关联的左列的值

例:

ss="dp-sql">ss="alt">var df = Seq((1,1),(2,1),(4,3)).toDF(ss="string">"x",ss="string">"y") ss=""> ss="alt"> df.createOrReplaceTempView(ss="string">"table") ss="">spark.sql(ss="string">"select max_by(x,y) from table").show

类型

返回列值的数据类型

返回Spark版本及其git版本

justify_days,justify_hours和justify_interval

新引入的对齐功能用于调整时间间隔。

例:

  • 表示30天为一个月,

ss="dp-sql">ss="alt">ss="keyword">SELECT justify_days(interval ss="string">'30 day')

分区转换功能

从Spark 3.0及更高版本开始,存在一些新功能,这些功能有助于对数据进行分区,我将在另一篇文章中介绍。

总体而言,我们已经分析了所有数据转换和分析功能,这些功能是3.0版本中产生的火花。 希望本指南有助于您了解这些新功能。 这些功能肯定会加速火花开发工作,并有助于建立坚固有效的火花管道。

ss="dp-sql">ss="alt">val  df = Seq(Seq(2,4,6),Seq(5,10,3)).toDF(ss="string">"int_array") ss="">df.withColumn(ss="string">"flag",forall($ss="string">"int_array",(x:ss="keyword">Column)=>(lit(x%2==0)))) ss="alt">.show

相关推荐

怎么学习大数据?

追忆似水年华 · 651浏览 · 2019-05-24 16:12:08
在线制作数据库ER模型

追忆似水年华 · 1124浏览 · 2019-05-24 16:22:01
大数据云迁移的五大要点

奔跑的男人 · 671浏览 · 2019-05-29 11:21:02
大数据零基础学习hadoop入门教程

· 640浏览 · 2019-06-04 10:30:25
加载中

0评论

评论
分类专栏
小鸟云服务器
扫码进入手机网页