Spark SQL

​ Spark SQL是一个Spark模块用于结构化数据处理。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。 在内部,Spark SQL使用此额外信息来执行额外的优化。 有几种与Spark SQL交互的方法,包括SQL和Dataset API。 在使用相同的执行引擎计算结果时,与使用表达计算的API或者语言无关。 这种统一意味着开发人员可以轻松地在不同的API之间来回切换,从而提供表达给定转换的最自然的方式。

SparkSession

Spark中所有功能的入口点是SparkSession类

1
2
3
4
5
6
7
8
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example") //APP 运行时的名字
.config("spark.some.config.option", "some-value")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

Dataset &DataFrame

​ Dataset数据集是分布式数据集合。数据集是Spark 1.6中添加的一个新接口,它提供了RDD的优势(强类型,使用强大的lambda函数的能力)和Spark SQL优化执行引擎的优点。 数据集可以从JVM对象构造,然后使用功能转换(map,flatMap,filter等)进行操作。 数据集API在Scala和Java中可用。 Python没有对Dataset API的支持。 但由于Python的动态特性,数据集API的许多好处已经可用(即可以通过名称自然地访问行的字段row.columnName)。 R的情况类似。

​ DataFrame是一个组织成命名列的数据集。 它在概念上等同于关系数据库中的表或R / Python中的数据框,但在底层具有更丰富的优化。 DataFrame可以从多种来源构建,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。 DataFrame API在Scala,Java,Python和R中可用。在Scala和Java中,DataFrame由行数据集表示。 在Scala API中,DataFrame只是Dataset[Row]的类型别名。 而在Java API中,用户需要使用Dataset来表示DataFrame。

Spark SQL Dataset & DataFrame API

Actions
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Dataset[T] extends Serializable //org.apache.spark.sql.Dataset是Spark SQL中核心的类

collect(): Array[T] //返回一个数组,将Dataset所有行的数据加载进driver进程的内存。
collectAsList(): List[T] //同上,但是返回Java list。
count(): Long // 数据行数
describe(cols: String*): DataFrame //计算指定列的统计指标,包括count, mean, stddev, min, and max.
head(): T //返回第一行
head(n: Int): Array[T] //返回前N行
first(): T //返回第一行,是head()的别名。
foreach(f: (T) ⇒ Unit): Unit // 所有元素上应用f函数
foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit //所有元素分区上应用f函数
reduce(func: (T, T) ⇒ T): T//根据映射函数func,对RDD中的元素进行二元计算,返回计算结果。
show(numRows: Int, truncate: Int, vertical: Boolean): Unit//表格形式打印出数据。numRows:显示的行数,truncate:裁剪字符串类型值到指定长度,vertical:垂直打印。
summary(statistics: String*): DataFrame //计算数据集statistics指定的指标,可指定 count, mean, stddev, min, approximate quartiles (percentiles at 25%, 50%, and 75%), and max.如未指定则会计算全部。
take(n: Int): Array[T] //获取前n行
takeAsList(n: Int): List[T] //获取前n行保存为list
toLocalIterator(): Iterator[T] //返回一个所有行的迭代器
Transformations(函数)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
//Typed transformations
as[U](implicit arg0: Encoder[U]): Dataset[U] //将数据映射成指定类型U,返回新的Dataset
persist(newLevel: StorageLevel): Dataset.this.type //缓存数据,可设置缓存级别。
persist(): Dataset.this.type // 同cache方法
cache(): Dataset.this.type //缓存数据,MEMORY_AND_DISK模式。RDD的cache函数默认是MEMORY_ONLY。
checkpoint(eager: Boolean): Dataset[T]//返回一个checkpointed的Dataset,Dataset的逻辑执行计划将被截断。
checkpoint(): Dataset[T] //同上,eager=true.
columns: Array[String] //数组形式返回所有列名。
dtypes: Array[(String, String)] // 数组形式返回所有列名及类型。
createGlobalTempView(viewName: String): Unit //创建全局临时视图(view),生命周期与Spark应用一致。可以跨session访问。e.g. SELECT * FROM global_temp.view1.
createOrReplaceGlobalTempView(viewName: String): Unit //同上,已存在则替换。
createTempView(viewName: String): Unit//创建本地临时视图(view),仅当前SparkSession可访问。不跟任何库绑定,不能用db1.view1这样的形式访问。
createOrReplaceTempView(viewName: String): Unit //同上,已存在则替换。
explain(): Unit //打印物理执行计划 queryExecution变量,完整执行计划。
explain(extended: Boolean): Unit //打印物理+逻辑执行计划
hint(name: String, parameters: Any*): Dataset[T] //当前dataset指定hint。//todoe.g. df1.join(df2.hint("broadcast"))
inputFiles: Array[String] //返回组成Dataset的输入文件
isLocal: Boolean //collect和take是否可以本地执行,不需要executor.
localCheckpoint(eager: Boolean): Dataset[T] //执行本地Checkpoint,返回新dataset。
printSchema(): Unit //打印schema结构
rdd: RDD[T] //dataset转换成RDD
storageLevel: StorageLevel //当前存储等级,没有被persist则是StorageLevel.NONE
toDF(colNames: String*): DataFrame //转为DataFrame,也可以将RDD转为DataFrame。
unpersist(blocking: Boolean): Dataset.this.type //删除缓存,blocking表示是否等所有blocks删除后才返回,删除期间阻塞。
write: DataFrameWriter[T] //DataFrameWriter,非流式数据写接口。
writeStream: DataStreamWriter[T] //DataStreamWriter,流式数据写接口。
isStreaming: Boolean //是否流式数据
alias(alias: Symbol): Dataset[T] //给Dataset一个别名
alias(alias: String): Dataset[T] //给Dataset一个别名
as(alias: Symbol): Dataset[T] //给Dataset一个别名
as(alias: String): Dataset[T] //给Dataset一个别名
coalesce(numPartitions: Int): Dataset[T] // 分区合并,只能用于分区数变少时
dropDuplicates(col1: String, cols: String*): Dataset[T] //根据指定字段,对数据去重。
except(other: Dataset[T]): Dataset[T] //排除掉参数中的行
filter(func: (T) ⇒ Boolean): Dataset[T] //根据条件过滤行 e.g. peopleDs.filter($"age" > 15)
filter(conditionExpr: String): Dataset[T] //根据条件过滤行 e.g. peopleDs.filter("age > 15")
flatMap[U](func: (T) ⇒ TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U]// map处理后压扁元素
groupByKey[K](func: (T) ⇒ K)(implicit arg0: Encoder[K]): KeyValueGroupedDataset[K, T] //现根据func函数生成key,然后按key分组。
intersect(other: Dataset[T]): Dataset[T] //求两个dataset的交集,等同于INTERSECT in SQL.
joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)]//inner equi-join两个dataset
joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)]//joinType可选:inner, cross, outer, full, full_outer, left, left_outer, right, right_outer
limit(n: Int): Dataset[T]//返回前n行,与head的区别是,head是一个action,会马上返回结果数组。
map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U]//在每一个元素应用func函数,返回包含结果集的dataset。
mapPartitions[U](func: (Iterator[T]) ⇒ Iterator[U])(implicit arg0: Encoder[U]): Dataset[U]//在每一个分区应用func函数,返回包含结果集的dataset。
sort(sortCol: String, sortCols: String*): Dataset[T] //按指定列排序,默认asc。
orderBy(sortCol: String, sortCols: String*): Dataset[T]//sort的别名
sortWithinPartitions(sortCol: String, sortCols: String*): Dataset[T] //分区内排序,同"SORT BY" in SQL (Hive QL).
randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]] //按权重随机分割数据
repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] //按指定表达式,分区数,重新分区(hash),同"DISTRIBUTE BY" in SQL。默认分区数为spark.sql.shuffle.partitions
repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] //按指定表达式,分区数,重新分区,采用Range partition方式,按键范围分区。分区默认排序方式为ascending nulls first,分区内数据未排序。
sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T] //随机取样本数据
select[U1](c1: TypedColumn[T, U1]): Dataset[U1] //根据列/表达式获取列数据
transform[U](t: (Dataset[T]) ⇒ Dataset[U]): Dataset[U] //应用t函数转换Dataset。
union(other: Dataset[T]): Dataset[T] //等于UNION ALL in SQL。注意是按列位置合并:
unionByName(other: Dataset[T]): Dataset[T] //同union方法,但是按列名合并:
where(conditionExpr: String): Dataset[T] //filter的别名

//Untyped transformations
agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame//在整个dataset进行聚合。
ds.agg(...) 是 ds.groupBy().agg(...) 的简写。
e.g.
ds.agg(max($"age"), avg($"salary"))
ds.agg(Map("age" -> "max", "salary" -> "avg"))
ds.agg("age" -> "max", "salary" -> "avg")

apply(colName: String): Column //返回指定列。
col(colName: String): Column //返回指定列。
colRegex(colName: String): Column //返回指定列。
crossJoin(right: Dataset[_]): DataFrame //cross join。
cube(col1: String, cols: String*): RelationalGroupedDataset //使用指定列创建多维cube。
drop(col: Column): DataFrame //剪掉指定字段。
groupBy(col1: String, cols: String*): RelationalGroupedDataset //按指定列分组
join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame//与另一个DataFrame join。joinType:Default inner. Must be one of: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, left_anti.
na: DataFrameNaFunctions //见DataFrameNaFunctions
stat: DataFrameStatFunctions //见DataFrameStatFunctions
rollup(col1: String, cols: String*): RelationalGroupedDataset //使用指定列进行rollup聚合。
select(col: String, cols: String*): DataFrame //选取指定列、SQL表达式。
selectExpr(exprs: String*): DataFrame //选取指定列、SQL表达式。
withColumn(colName: String, col: Column): DataFrame//新增或替换一列。
withColumnRenamed(existingName: String, newName: String): DataFrame//将指定列更名。

Spark SQL 函数

​ Spark SQL的一个用途是执行SQL查询。Spark SQL还可用于从现有Hive中读取数据。 有关如何配置此功能的更多信息,请参阅Hive Tables部分。 从其他编程语言中运行SQL时,结果将作为Dataset/DataFrame返回。 还可以使用命令行或JDBC/ODBC与SQL接口进行交互

org.apache.spark.sql.functions是一个Object,提供了约两百多个函数。
大部分函数与Hive的差不多。
除UDF函数,均可在spark-sql中直接使用。
经过import org.apache.spark.sql.functions._ ,也可以用于Dataframe,Dataset。
version
2.3.0
大部分支持Column的函数也支持String类型的列名。这些函数的返回类型基本都是Column。
函数很多,都在下面了。

聚合函数(agg)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
approx_count_distinct //count_distinct近似值
avg //平均值
collect_list //聚合指定字段的值到list
collect_set //聚合指定字段的值到set
corr //计算两列的Pearson相关系数
count //计数
countDistinct //去重计数 SQL中用法
covar_pop //总体协方差(population covariance)
covar_samp //样本协方差(sample covariance)
first //一个元素
last //后一个元素
grouping
grouping_id
kurtosis //态(kurtosis)值
skewness //度(skewness)
max //最大值
min //最小值
mean //平均值
stddev //即stddev_samp
stddev_samp //样本标准偏差(sample standard deviation)
stddev_pop //总体标准偏差(population standard deviation)
sum //求和
sumDistinct //非重复值求和 SQL中用法
select sum(distinct class)
select count(distinct class)
var_pop //总体方差(population variance)
var_samp //样本无偏方差(unbiased variance)
variance //即var_samp

集合函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
array_contains(column,value) //检查array类型字段是否包含指定元素
explode //展开array或map为多行
explode_outer //同explode,但当array或map为空或null时,会展开为null。
posexplode //同explode,带位置索引。
posexplode_outer //同explode_outer,带位置索引。
from_json //解析JSON字符串为StructType or ArrayType,有多种参数形式,详见文档。
to_json //转为json字符串,支持StructType, ArrayType of StructTypes, a MapType or ArrayType of MapTypes。
get_json_object(column,path) //获取指定json路径的json对象字符串。
select get_json_object('{"a"1,"b":2}','$.a'); //[JSON Path介绍](http://blog.csdn.net/koflance/article/details/63262484)
json_tuple(column,fields) //获取json中指定字段值。select json_tuple('{"a":1,"b":2}','a','b');
map_keys //返回map的键组成的array
map_values //返回map的值组成的array
size //array or map的长度
sort_array(e: Column, asc: Boolean) //将array中元素排序(自然排序),默认asc。

时间函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
add_months(startDate: Column, numMonths: Int) //指定日期添加n月
date_add(start: Column, days: Int) //指定日期之后n天 e.g. select date_add('2018-01-01',3)
date_sub(start: Column, days: Int) //指定日期之前n天
datediff(end: Column, start: Column) //两日期间隔天数
current_date() //当前日期
current_timestamp() //当前时间戳,TimestampType类型
date_format(dateExpr: Column, format: String) //日期格式化
dayofmonth(e: Column) //日期在一月中的天数,支持 date/timestamp/string
dayofyear(e: Column) //日期在一年中的天数, 支持 date/timestamp/string
weekofyear(e: Column) //日期在一年中的周数, 支持 date/timestamp/string
from_unixtime(ut: Column, f: String)//戳转字符串格式
from_utc_timestamp(ts: Column, tz: String)//戳转指定时区时间戳
to_utc_timestamp(ts: Column, tz: String)//时区时间戳转UTF时间戳
hour(e: Column)//小时值
minute(e: Column)//分钟值
month(e: Column)//提取月份值
quarter(e: Column)//提取季度
second(e: Column)//提取秒
year(e: Column)://提取年
last_day(e: Column)//指定日期的月末日期
months_between(date1: Column, date2: Column) //计算两日期差几个月
next_day(date: Column, dayOfWeek: String) //计算指定日期之后的下一个周一、二...,dayOfWeek区分大小写,只接受 "Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"。
to_date(e: Column) //字段类型转为DateType
trunc(date: Column, format: String) //日期截断
unix_timestamp(s: Column, p: String) //指定格式的时间字符串转时间戳
unix_timestamp(s: Column) //同上,默认格式为 yyyy-MM-dd HH:mm:ss
unix_timestamp()://当前时间戳(秒),底层实现为unix_timestamp(current_timestamp(), yyyy-MM-dd HH:mm:ss)
window(timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String)//时间窗口函数,将指定时间(TimestampType)划分到窗口

数学函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
cos,sin,tan //计算角度的余弦,正弦。。。
sinh,tanh,cosh //计算双曲正弦,正切,。。
acos,asin,atan,atan2 //计算余弦/正弦值对应的角度
bin //将long类型转为对应二进制数值的字符串For example, bin("12") returns "1100".
bround //舍入,使用Decimal的HALF_EVEN模式,v>0.5向上舍入,v< 0.5向下舍入,v0.5向最近的偶数舍入。
round(e: Column, scale: Int) //HALF_UP模式舍入到scale为小数点。v>=0.5向上舍入,v< 0.5向下舍入,即四舍五入。
ceil //向上舍入
floor //向下舍入
cbrt //Computes the cube-root of the given value.
conv(num:Column, fromBase: Int, toBase: Int) //转换数值(字符串)的进制
log(base: Double, a: Column):$log_{base}(a)$
log(a: Column):$log_e(a)$
log10(a: Column):$log_{10}(a)$
log2(a: Column):$log_{2}(a)$
log1p(a: Column):$log_{e}(a+1)$
pmod(dividend: Column, divisor: Column)://Returns the positive value of dividend mod divisor.
pow(l: Double, r: Column):$r^l$ //注意r是列
pow(l: Column, r: Double):$r^l$ //注意l是列
pow(l: Column, r: Column):$r^l$ //注意r,l都是列
radians(e: Column)://角度转弧度
rint(e: Column)://Returns the double value that is closest in value to the argument and is equal to a mathematical integer.
shiftLeft(e: Column, numBits: Int)://向左位移
shiftRight(e: Column, numBits: Int)://向右位移
shiftRightUnsigned(e: Column, numBits: Int)://向右位移(无符号位)
signum(e: Column)://返回数值正负符号
sqrt(e: Column)://平方根
hex(column: Column)://转十六进制
unhex(column: Column)://逆转十六进制

混杂(misc)函数

1
2
3
4
5
crc32(e: Column)://计算CRC32,返回bigint
hash(cols: Column*)://计算 hash code,返回int
md5(e: Column)://计算MD5摘要,返回32位,16进制字符串
sha1(e: Column)://计算SHA-1摘要,返回40位,16进制字符串
sha2(e: Column, numBits: Int)://计算SHA-1摘要,返回numBits位,16进制字符串。numBits支持224, 256, 384, or 512.

其他非聚合函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
abs(e: Column) //绝对值
array(cols: Column*) //多列合并为array,cols必须为同类型
map(cols: Column*): //将多列组织为map,输入列必须为(key,value)形式,各列的key/value分别为同一类型。
bitwiseNOT(e: Column): //Computes bitwise NOT.
broadcast[T](df: Dataset[T]): Dataset[T]: //将df变量广播,用于实现broadcast join。如left.join(broadcast(right), "joinKey")
coalesce(e: Column*): //返回第一个非空值
col(colName: String): //返回colName对应的Column
column(colName: String): //col函数的别名
expr(expr: String): //解析expr表达式,将返回值存于Column,并返回这个Column。
greatest(exprs: Column*): //返回多列中的最大值,跳过Null
least(exprs: Column*): //返回多列中的最小值,跳过Null
input_file_name(): //返回当前任务的文件名 ??
isnan(e: Column): //检查是否NaN(非数值)
isnull(e: Column): //检查是否为Null
lit(literal: Any): //将字面量(literal)创建一个Column
typedLit[T](literal: T)(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[T]): //将字面量(literal)创建一个Column,literal支持 scala types e.g.: List, Seq and Map.
monotonically_increasing_id(): //返回单调递增唯一ID,但不同分区的ID不连续。ID为64位整型。
nanvl(col1: Column, col2: Column): //col1为NaN则返回col2
negate(e: Column): //负数,同df.select( -df("amount") )
not(e: Column): //取反,同df.filter( !df("isActive") )
rand(): //随机数[0.0, 1.0]
rand(seed: Long): //随机数[0.0, 1.0],使用seed种子
randn(): //随机数,从正态分布取
randn(seed: Long): //同上
spark_partition_id(): //返回partition ID
struct(cols: Column*)://多列组合成新的struct column ??
when(condition: Column, value: Any): //当condition为true返回value,如
people.select(when(people("gender") === "male", 0)
.when(people("gender") === "female", 1)
.otherwise(2))
//如果没有otherwise且condition全部没命中,则返回null.

排序函数

1
2
3
4
5
6
asc(columnName: String):正序
asc_nulls_first(columnName: String):正序,null排最前
asc_nulls_last(columnName: String):正序,null排最后
e.g.
df.sort(asc("dept"), desc("age")) //对应有desc函数
desc,desc_nulls_first,desc_nulls_last

字符串函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
ascii(e: Column): //计算第一个字符的ascii码
base64(e: Column): //base64转码
unbase64(e: Column): //base64解码
concat(exprs: Column*)://连接多列字符串
concat_ws(sep: String, exprs: Column*)://使用sep作为分隔符连接多列字符串
decode(value: Column, charset: String): //解码
encode(value: Column, charset: String): //转码,charset支持 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16'。
format_number(x: Column, d: Int)://格式化'#,###,###.##'形式的字符串
format_string(format: String, arguments: Column*): //将arguments按format格式化,格式为printf-style。
initcap(e: Column): //单词首字母大写
lower(e: Column): //转小写
upper(e: Column): //转大写
instr(str: Column, substring: String)://substring在str中第一次出现的位置
length(e: Column): //字符串长度
levenshtein(l: Column, r: Column): //计算两个字符串之间的编辑距离(Levenshtein distance)
locate(substr: String, str: Column): //substring在str中第一次出现的位置,位置编号从1开始,0表示未找到。
locate(substr: String, str: Column, pos: Int): //同上,但从pos位置后查找。
lpad(str: Column, len: Int, pad: String)://字符串左填充。用pad字符填充str的字符串至len长度。有对应的rpad,右填充。
ltrim(e: Column)://剪掉左边的空格、空白字符,对应有rtrim.
ltrim(e: Column, trimString: String)://剪掉左边的指定字符,对应有rtrim.
trim(e: Column, trimString: String)://剪掉左右两边的指定字符
trim(e: Column)://剪掉左右两边的空格、空白字符
regexp_extract(e: Column, exp: String, groupIdx: Int): //正则提取匹配的组
regexp_replace(e: Column, pattern: Column, replacement: Column): //正则替换匹配的部分,这里参数为列。
regexp_replace(e: Column, pattern: String, replacement: String): //正则替换匹配的部分
repeat(str: Column, n: Int)://将str重复n次返回
reverse(str: Column): //将str反转
soundex(e: Column): //计算桑迪克斯代码(soundex code)PS:用于按英语发音来索引姓名,发音相同但拼写不同的单词,会映射成同一个码。
split(str: Column, pattern: String): //用pattern分割str
substring(str: Column, pos: Int, len: Int): //在str上截取从pos位置开始长度为len的子字符串。
translate(src: Column, matchingString: String, replaceString: String)://把src中的matchingString全换成replaceString。

UDF函数

1
2
3
4
5
6
7
8
user-defined function.
callUDF(udfName: String, cols: Column*): //调用UDF
import org.apache.spark.sql._
val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
val spark = df.sparkSession
spark.udf.register("simpleUDF", (v: Int) => v * v)
df.select($"id", callUDF("simpleUDF", $"value"))
udf: 定义UDF

窗口函数

1
2
3
4
5
6
7
8
9
10
cume_dist(): //cumulative distribution of values within a window partition
currentRow(): //returns the special frame boundary that represents the current row in the window partition.
rank()://排名,返回数据项在分组中的排名,排名相等会在名次中留下空位 1,2,2,4。
dense_rank(): //排名,返回数据项在分组中的排名,排名相等会在名次中不会留下空位 1,2,2,3。
row_number()://行号,为每条记录返回一个数字 1,2,3,4
percent_rank()://returns the relative rank (i.e. percentile) of rows within a window partition.
lag(e: Column, offset: Int, defaultValue: Any): //offset rows before the current row
lead(e: Column, offset: Int, defaultValue: Any): //returns the value that is offset rows after the current row
ntile(n: Int): //returns the ntile group id (from 1 to n inclusive) in an ordered window partition.
unboundedFollowing()://returns the special frame boundary that represents the last row in the window partition.

Spark RDD

参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
context: SparkContext
创建RDDSparkContext

sparkContext: SparkContext
创建RDDSparkContext

dependencies: Seq[Dependency[_]]
RDD的依赖列表

getNumPartitions: Int
获取RDD的分区数

getStorageLevel: StorageLevel
获取存储等级,如果设置为none,则返回StorageLevel.NONE

id: Int
RDD的unique ID

isCheckpointed: Boolean
是否checkpointed and materialized, either reliably or locally.

name: String
RDD的名字

partitioner: Option[Partitioner]
分区器

partitions: Array[Partition]
各个分区

保存数据

1
2
3
4
5
6
saveAsObjectFile(path: String): Unit //保存为SequenceFile

saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

saveAsTextFile(path: String): Unit
保存为文本文件

键值转换操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
filter(f: (T) ⇒ Boolean): RDD[T] //过滤数据,仅留下使得f返回true的元素。
map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U]): RDD[U]
将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。
输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区。

flatMap[U](f: (T) ⇒ TraversableOnce[U])(implicit arg0: ClassTag[U]): RDD[U]
第一步和map一样,最后将所有的输出分区合并成一个。
使用flatMap时候需要注意:
flatMap会将字符串看成是一个字符数组。

mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
该函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器。如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的过。
比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection。
参数preservesPartitioning表示是否保留父RDD的partitioner分区信息。

mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
函数作用同mapPartitions,不过提供了两个参数,第一个参数为分区的索引。

keyBy[K](f: (T) ⇒ K): RDD[(K, T)]
通过f函数为每个元素生成一个KEY

sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
通过给定的函数对元素排序

zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]
与另一个RDD组合成(k,v)对。

zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) ⇒ Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

zipWithIndex(): RDD[(T, Long)]

zipWithUniqueId(): RDD[(T, Long)]

聚合相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U

treeAggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U, depth: Int = 2)(implicit arg0: ClassTag[U]): U //多层级聚合
reduce(f: (T, T) ⇒ T): T //根据映射函数f,对RDD中的元素进行二元计算,返回计算结果。
treeReduce(f: (T, T) ⇒ T, depth: Int = 2): T //多级reduce归并聚合
fold(zeroValue: T)(op: (T, T) ⇒ T): T //fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。
count(): Long //count返回RDD中的元素数量。
countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] //近似count
countApproxDistinct(relativeSD: Double = 0.05): Long
countApproxDistinct(p: Int, sp: Int): Long //近似distinct count
countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] //计算每个值出现次数
countByValueApprox(timeout: Long, confidence: Double = 0.95)(implicit ord: Ordering[T] = null)://计算每个值出现次数近似值
distinct(): RDD[T]
distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]//返回元素去重后的RDD
groupBy[K](f: (T) ⇒ K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
groupBy[K](f: (T) ⇒ K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
groupBy[K](f: (T) ⇒ K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])]
按指定函数生成key,并按key分组。
注意:性能比较差,推荐用PairRDDFunctions.reduceByKey or PairRDDFunctions.aggregateByKey.
因为reduceByKey会先在分区内做聚合,再进行数据交换(shuffle)。

glom(): RDD[Array[T]] //该函数是将RDD中每一个分区中类型为T的元素转换成Array[T],这样每一个分区就只有一个数组元素。
max()(implicit ord: Ordering[T]): T //最大的元素
min()(implicit ord: Ordering[T]): T //最小的元素

遍历元素

1
2
3
4
5
6
7
foreach(f: (T) ⇒ Unit): Unit
foreach用于遍历RDD,将函数f应用于每一个元素。
但要注意,如果对RDD执行foreach,只会在Executor端有效,而并不是Driver端。
比如:rdd.foreach(println),只会在Executor的stdout中打印出来,Driver端是看不到的。

foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
foreachPartition和foreach类似,只不过是对每一个分区使用f。

取元素相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
collect(): Array[T]
collect用于将一个RDD转换成数组。

first(): T
first返回RDD中的第一个元素,不排序。

take(num: Int): Array[T]
take用于获取RDD中从0到num-1下标的元素,不排序。

top(num: Int)(implicit ord: Ordering[T]): Array[T]
top函数用于从RDD中,按照默认(降序)或者指定的排序规则,返回前num个元素。

takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
takeOrdered和top类似,只不过以和top相反的顺序返回元素

takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
取样本元素

集合间运算

1
2
3
4
5
6
7
8
9
10
11
12
++(other: RDD[T]): RDD[T] //与另一个RDD union。
intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
intersection(other: RDD[T], numPartitions: Int): RDD[T]
intersection(other: RDD[T]): RDD[T]
取交集

subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
subtract(other: RDD[T], numPartitions: Int): RDD[T]
subtract(other: RDD[T]): RDD[T]
求差集

union(other: RDD[T]): RDD[T] //与另一个RDD合并,类似union all,不会去重。

其他

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
persist(): RDD.this.type
persist(newLevel: StorageLevel): RDD.this.type
缓存数据,可设置缓存级别(如果尚未设置过,才可以设置,本地checkpoint除外)

unpersist(blocking: Boolean = true): RDD.this.type
Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.

cache(): RDD.this.type //MEMORY_ONLY级别缓存数据

cartesian[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]: //计算两个RDD的迪卡尔积

checkpoint(): Unit //标记将该RDD进行checkpoint处理?

coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
分区合并(只能减少分区),使用HashPartitioner
第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false;

repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
调整分区数,会导致shuffle,如果是减少分区,可以使用coalesce,避免shuffle。

toDebugString: String//返回RDD依赖树/血统图

getCheckpointFile: Option[String]
获取checkpoint文件夹名称

localCheckpoint(): RDD.this.type
标记为使用本地checkpoint

isEmpty(): Boolean
是否含0个元素

iterator(split: Partition, context: TaskContext): Iterator[T]
返回迭代器,不应直接调用,而是给RDD的子类用的。

toLocalIterator: Iterator[T] //返回元素的本地迭代器

pipe(command: String): RDD[String]
pipe(command: String, env: Map[String, String]): RDD[String]
pipe(command: Seq[String], env: Map[String, String] = Map(), printPipeContext: ((String) ⇒ Unit) ⇒ Unit = null, printRDDElement: (T, (String) ⇒ Unit) ⇒ Unit = null, separateWorkingDir: Boolean = false, bufferSize: Int = 8192, encoding: String = Codec.defaultCharsetCodec.name): RDD[String]
调用外部进程处理RDD,如通过标准输入传给shell脚本。

preferredLocations(split: Partition): Seq[String]
Get the preferred locations of a partition, taking into account whether the RDD is checkpointed.

randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
按权随机将元素分组

sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
取样本/子集

setName(_name: String): RDD.this.type //设置RDD名字