Spark SQL
Spark SQL是一个Spark模块用于结构化数据处理。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。 在内部,Spark SQL使用此额外信息来执行额外的优化。 有几种与Spark SQL交互的方法,包括SQL和Dataset API。 在使用相同的执行引擎计算结果时,与使用表达计算的API或者语言无关。 这种统一意味着开发人员可以轻松地在不同的API之间来回切换,从而提供表达给定转换的最自然的方式。
SparkSession
Spark中所有功能的入口点是SparkSession类
1 2 3 4 5 6 7 8 import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example" ) .config("spark.some.config.option" , "some-value" ) .getOrCreate() import spark.implicits._
Dataset &DataFrame
Dataset数据集是分布式数据集合。数据集是Spark 1.6中添加的一个新接口,它提供了RDD的优势(强类型,使用强大的lambda函数的能力)和Spark SQL优化执行引擎的优点。 数据集可以从JVM对象构造,然后使用功能转换(map,flatMap,filter等)进行操作。 数据集API在Scala和Java中可用。 Python没有对Dataset API的支持。 但由于Python的动态特性,数据集API的许多好处已经可用(即可以通过名称自然地访问行的字段row.columnName)。 R的情况类似。
DataFrame是一个组织成命名列的数据集。 它在概念上等同于关系数据库中的表或R / Python中的数据框,但在底层具有更丰富的优化。 DataFrame可以从多种来源构建,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。 DataFrame API在Scala,Java,Python和R中可用。在Scala和Java中,DataFrame由行数据集表示。 在Scala API中,DataFrame只是Dataset[Row]的类型别名。 而在Java API中,用户需要使用Dataset来表示DataFrame。
Spark SQL Dataset & DataFrame API
Actions
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class Dataset [T ] extends Serializable collect(): Array [T ] collectAsList(): List [T ] count(): Long describe(cols: String *): DataFrame head(): T head(n: Int ): Array [T ] first(): T foreach(f: (T ) ⇒ Unit ): Unit foreachPartition(f: (Iterator [T ]) ⇒ Unit ): Unit reduce(func: (T , T ) ⇒ T ): T show(numRows: Int , truncate: Int , vertical: Boolean ): Unit summary(statistics: String *): DataFrame take(n: Int ): Array [T ] takeAsList(n: Int ): List [T ] toLocalIterator(): Iterator [T ]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 as[U ](implicit arg0: Encoder [U ]): Dataset [U ] persist(newLevel: StorageLevel ): Dataset .this .type persist(): Dataset .this .type cache(): Dataset .this .type checkpoint(eager: Boolean ): Dataset [T ] checkpoint(): Dataset [T ] columns: Array [String ] dtypes: Array [(String , String )] createGlobalTempView(viewName: String ): Unit createOrReplaceGlobalTempView(viewName: String ): Unit createTempView(viewName: String ): Unit createOrReplaceTempView(viewName: String ): Unit explain(): Unit explain(extended: Boolean ): Unit hint(name: String , parameters: Any *): Dataset [T ] inputFiles: Array [String ] isLocal: Boolean localCheckpoint(eager: Boolean ): Dataset [T ] printSchema(): Unit rdd: RDD [T ] storageLevel: StorageLevel toDF(colNames: String *): DataFrame unpersist(blocking: Boolean ): Dataset .this .type write: DataFrameWriter [T ] writeStream: DataStreamWriter [T ] isStreaming: Boolean alias(alias: Symbol ): Dataset [T ] alias(alias: String ): Dataset [T ] as(alias: Symbol ): Dataset [T ] as(alias: String ): Dataset [T ] coalesce(numPartitions: Int ): Dataset [T ] dropDuplicates(col1: String , cols: String *): Dataset [T ] except(other: Dataset [T ]): Dataset [T ] filter(func: (T ) ⇒ Boolean ): Dataset [T ] filter(conditionExpr: String ): Dataset [T ] flatMap[U ](func: (T ) ⇒ TraversableOnce [U ])(implicit arg0: Encoder [U ]): Dataset [U ] groupByKey[K ](func: (T ) ⇒ K )(implicit arg0: Encoder [K ]): KeyValueGroupedDataset [K , T ] intersect(other: Dataset [T ]): Dataset [T ] joinWith[U ](other: Dataset [U ], condition: Column ): Dataset [(T , U )] joinWith[U ](other: Dataset [U ], condition: Column , joinType: String ): Dataset [(T , U )] limit(n: Int ): Dataset [T ] map[U ](func: (T ) ⇒ U )(implicit arg0: Encoder [U ]): Dataset [U ] mapPartitions[U ](func: (Iterator [T ]) ⇒ Iterator [U ])(implicit arg0: Encoder [U ]): Dataset [U ] sort(sortCol: String , sortCols: String *): Dataset [T ] orderBy(sortCol: String , sortCols: String *): Dataset [T ] sortWithinPartitions(sortCol: String , sortCols: String *): Dataset [T ] randomSplit(weights: Array [Double ], seed: Long ): Array [Dataset [T ]] repartition(numPartitions: Int , partitionExprs: Column *): Dataset [T ] repartitionByRange(numPartitions: Int , partitionExprs: Column *): Dataset [T ] sample(withReplacement: Boolean , fraction: Double , seed: Long ): Dataset [T ] select[U1 ](c1: TypedColumn [T , U1 ]): Dataset [U1 ] transform[U ](t: (Dataset [T ]) ⇒ Dataset [U ]): Dataset [U ] union(other: Dataset [T ]): Dataset [T ] unionByName(other: Dataset [T ]): Dataset [T ] where(conditionExpr: String ): Dataset [T ] agg(aggExpr: (String , String ), aggExprs: (String , String )*): DataFrame ds.agg(...) 是 ds.groupBy().agg(...) 的简写。 e.g. ds.agg(max($"age" ), avg($"salary" )) ds.agg(Map ("age" -> "max" , "salary" -> "avg" )) ds.agg("age" -> "max" , "salary" -> "avg" ) apply(colName: String ): Column col(colName: String ): Column colRegex(colName: String ): Column crossJoin(right: Dataset [_]): DataFrame cube(col1: String , cols: String *): RelationalGroupedDataset drop(col: Column ): DataFrame groupBy(col1: String , cols: String *): RelationalGroupedDataset join(right: Dataset [_], usingColumns: Seq [String ], joinType: String ): DataFrame na: DataFrameNaFunctions stat: DataFrameStatFunctions rollup(col1: String , cols: String *): RelationalGroupedDataset select(col: String , cols: String *): DataFrame selectExpr(exprs: String *): DataFrame withColumn(colName: String , col: Column ): DataFrame withColumnRenamed(existingName: String , newName: String ): DataFrame
Spark SQL 函数
Spark SQL的一个用途是执行SQL查询。Spark SQL还可用于从现有Hive中读取数据。 有关如何配置此功能的更多信息,请参阅Hive Tables部分。 从其他编程语言中运行SQL时,结果将作为Dataset/DataFrame返回。 还可以使用命令行或JDBC/ODBC与SQL接口进行交互
org.apache.spark.sql.functions是一个Object,提供了约两百多个函数。
大部分函数与Hive的差不多。
除UDF函数,均可在spark-sql中直接使用。
经过import org.apache.spark.sql.functions._ ,也可以用于Dataframe,Dataset。
version
2.3.0
大部分支持Column的函数也支持String类型的列名。这些函数的返回类型基本都是Column。
函数很多,都在下面了。
聚合函数(agg)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 approx_count_distinct avg collect_list collect_set corr count countDistinct covar_pop covar_samp first last grouping grouping_id kurtosis skewness max min mean stddev stddev_samp stddev_pop sum sumDistinct select sum(distinct class ) select count(distinct class ) var_pop var_samp variance
集合函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 array_contains(column,value) explode explode_outer posexplode posexplode_outer from_json to_json get_json_object(column,path) select get_json_object('{"a" 1 ,"b" :2 }','$.a'); json_tuple(column,fields) map_keys map_values size sort_array(e: Column , asc: Boolean )
时间函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 add_months(startDate: Column , numMonths: Int ) date_add(start: Column , days: Int ) date_sub(start: Column , days: Int ) datediff(end: Column , start: Column ) current_date() current_timestamp() date_format(dateExpr: Column , format: String ) dayofmonth(e: Column ) dayofyear(e: Column ) weekofyear(e: Column ) from_unixtime(ut: Column , f: String ) from_utc_timestamp(ts: Column , tz: String ) to_utc_timestamp(ts: Column , tz: String ) hour(e: Column ) minute(e: Column ) month(e: Column ) quarter(e: Column ) second(e: Column ) year(e: Column ): last_day(e: Column ) months_between(date1: Column , date2: Column ) next_day(date: Column , dayOfWeek: String ) to_date(e: Column ) trunc(date: Column , format: String ) unix_timestamp(s: Column , p: String ) unix_timestamp(s: Column ) unix_timestamp(): window(timeColumn: Column , windowDuration: String , slideDuration: String , startTime: String )
数学函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 cos,sin,tan sinh,tanh,cosh acos,asin,atan,atan2 bin bround round(e: Column , scale: Int ) ceil floor cbrt conv(num:Column , fromBase: Int , toBase: Int ) log(base: Double , a: Column ):$log_{base}(a)$ log(a: Column ):$log_e(a)$ log10(a: Column ):$log_{10 }(a)$ log2(a: Column ):$log_{2 }(a)$ log1p(a: Column ):$log_{e}(a+1 )$ pmod(dividend: Column , divisor: Column ): pow(l: Double , r: Column ):$r^l$ pow(l: Column , r: Double ):$r^l$ pow(l: Column , r: Column ):$r^l$ radians(e: Column ): rint(e: Column ): shiftLeft(e: Column , numBits: Int ): shiftRight(e: Column , numBits: Int ): shiftRightUnsigned(e: Column , numBits: Int ): signum(e: Column ): sqrt(e: Column ): hex(column: Column ): unhex(column: Column ):
混杂(misc)函数
1 2 3 4 5 crc32(e: Column ): hash(cols: Column *): md5(e: Column ): sha1(e: Column ): sha2(e: Column , numBits: Int ):
其他非聚合函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 abs(e: Column ) array(cols: Column *) map(cols: Column *): bitwiseNOT(e: Column ): broadcast[T ](df: Dataset [T ]): Dataset [T ]: coalesce(e: Column *): col(colName: String ): column(colName: String ): expr(expr: String ): greatest(exprs: Column *): least(exprs: Column *): input_file_name(): isnan(e: Column ): isnull(e: Column ): lit(literal: Any ): typedLit[T ](literal: T )(implicit arg0: scala.reflect.api.JavaUniverse .TypeTag [T ]): monotonically_increasing_id(): nanvl(col1: Column , col2: Column ): negate(e: Column ): not(e: Column ): rand(): rand(seed: Long ): randn(): randn(seed: Long ): spark_partition_id(): struct(cols: Column *): when(condition: Column , value: Any ): people.select(when(people("gender" ) === "male" , 0 ) .when(people("gender" ) === "female" , 1 ) .otherwise(2 ))
排序函数
1 2 3 4 5 6 asc(columnName: String ):正序 asc_nulls_first(columnName: String ):正序,null 排最前 asc_nulls_last(columnName: String ):正序,null 排最后 e.g. df.sort(asc("dept" ), desc("age" )) desc,desc_nulls_first,desc_nulls_last
字符串函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 ascii(e: Column ): base64(e: Column ): unbase64(e: Column ): concat(exprs: Column *): concat_ws(sep: String , exprs: Column *): decode(value: Column , charset: String ): encode(value: Column , charset: String ): format_number(x: Column , d: Int ): format_string(format: String , arguments: Column *): initcap(e: Column ): lower(e: Column ): upper(e: Column ): instr(str: Column , substring: String ): length(e: Column ): levenshtein(l: Column , r: Column ): locate(substr: String , str: Column ): locate(substr: String , str: Column , pos: Int ): lpad(str: Column , len: Int , pad: String ): ltrim(e: Column ): ltrim(e: Column , trimString: String ): trim(e: Column , trimString: String ): trim(e: Column ): regexp_extract(e: Column , exp: String , groupIdx: Int ): regexp_replace(e: Column , pattern: Column , replacement: Column ): regexp_replace(e: Column , pattern: String , replacement: String ): repeat(str: Column , n: Int ): reverse(str: Column ): soundex(e: Column ): split(str: Column , pattern: String ): substring(str: Column , pos: Int , len: Int ): translate(src: Column , matchingString: String , replaceString: String ):
UDF函数
1 2 3 4 5 6 7 8 user-defined function. callUDF(udfName: String , cols: Column *): import org.apache.spark.sql._val df = Seq (("id1" , 1 ), ("id2" , 4 ), ("id3" , 5 )).toDF("id" , "value" )val spark = df.sparkSessionspark.udf.register("simpleUDF" , (v: Int ) => v * v) df.select($"id" , callUDF("simpleUDF" , $"value" )) udf: 定义UDF
窗口函数
1 2 3 4 5 6 7 8 9 10 cume_dist(): currentRow(): rank(): dense_rank(): row_number(): percent_rank(): lag(e: Column , offset: Int , defaultValue: Any ): lead(e: Column , offset: Int , defaultValue: Any ): ntile(n: Int ): unboundedFollowing():
Spark RDD
参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 context: SparkContext 创建RDD 的SparkContext sparkContext: SparkContext 创建RDD 的SparkContext dependencies: Seq [Dependency [_]] RDD 的依赖列表getNumPartitions: Int 获取RDD 的分区数 getStorageLevel: StorageLevel 获取存储等级,如果设置为none,则返回StorageLevel .NONE 。 id: Int 该RDD 的unique ID isCheckpointed: Boolean 是否checkpointed and materialized, either reliably or locally. name: String RDD 的名字partitioner: Option [Partitioner ] 分区器 partitions: Array [Partition ] 各个分区
保存数据
1 2 3 4 5 6 saveAsObjectFile(path: String ): Unit saveAsTextFile(path: String , codec: Class [_ <: CompressionCodec ]): Unit saveAsTextFile(path: String ): Unit 保存为文本文件
键值转换操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 filter(f: (T ) ⇒ Boolean ): RDD [T ] map[U ](f: (T ) ⇒ U )(implicit arg0: ClassTag [U ]): RDD [U ] 将一个RDD 中的每个数据项,通过map中的函数映射变为一个新的元素。 输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区。 flatMap[U ](f: (T ) ⇒ TraversableOnce [U ])(implicit arg0: ClassTag [U ]): RDD [U ] 第一步和map一样,最后将所有的输出分区合并成一个。 使用flatMap时候需要注意: flatMap会将字符串看成是一个字符数组。 mapPartitions[U ](f: (Iterator [T ]) ⇒ Iterator [U ], preservesPartitioning: Boolean = false )(implicit arg0: ClassTag [U ]): RDD [U ] 该函数和map函数类似,只不过映射函数的参数由RDD 中的每一个元素变成了RDD 中每一个分区的迭代器。如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的过。 比如,将RDD 中的所有数据通过JDBC 连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection。 参数preservesPartitioning表示是否保留父RDD 的partitioner分区信息。 mapPartitionsWithIndex[U ](f: (Int , Iterator [T ]) => Iterator [U ], preservesPartitioning: Boolean = false )(implicit arg0: ClassTag [U ]): RDD [U ] 函数作用同mapPartitions,不过提供了两个参数,第一个参数为分区的索引。 keyBy[K ](f: (T ) ⇒ K ): RDD [(K , T )] 通过f函数为每个元素生成一个KEY sortBy[K ](f: (T ) ⇒ K , ascending: Boolean = true , numPartitions: Int = this .partitions.length)(implicit ord: Ordering [K ], ctag: ClassTag [K ]): RDD [T ] 通过给定的函数对元素排序 zip[U ](other: RDD [U ])(implicit arg0: ClassTag [U ]): RDD [(T , U )] 与另一个RDD 组合成(k,v)对。 zipPartitions[B , V ](rdd2: RDD [B ], preservesPartitioning: Boolean )(f: (Iterator [T ], Iterator [B ]) ⇒ Iterator [V ])(implicit arg0: ClassTag [B ], arg1: ClassTag [V ]): RDD [V ] zipWithIndex(): RDD [(T , Long )] zipWithUniqueId(): RDD [(T , Long )]
聚合相关
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 aggregate[U ](zeroValue: U )(seqOp: (U , T ) ⇒ U , combOp: (U , U ) ⇒ U )(implicit arg0: ClassTag [U ]): U aggregate用户聚合RDD 中的元素,先使用seqOp将RDD 中每个分区中的T 类型元素聚合成U 类型,再使用combOp将之前每个分区聚合后的U 类型聚合成U 类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U 。 treeAggregate[U ](zeroValue: U )(seqOp: (U , T ) ⇒ U , combOp: (U , U ) ⇒ U , depth: Int = 2 )(implicit arg0: ClassTag [U ]): U reduce(f: (T , T ) ⇒ T ): T treeReduce(f: (T , T ) ⇒ T , depth: Int = 2 ): T fold(zeroValue: T )(op: (T , T ) ⇒ T ): T count(): Long countApprox(timeout: Long , confidence: Double = 0.95 ): PartialResult [BoundedDouble ] countApproxDistinct(relativeSD: Double = 0.05 ): Long countApproxDistinct(p: Int , sp: Int ): Long countByValue()(implicit ord: Ordering [T ] = null ): Map [T , Long ] countByValueApprox(timeout: Long , confidence: Double = 0.95 )(implicit ord: Ordering [T ] = null ): distinct(): RDD [T ] distinct(numPartitions: Int )(implicit ord: Ordering [T ] = null ): RDD [T ] groupBy[K ](f: (T ) ⇒ K )(implicit kt: ClassTag [K ]): RDD [(K , Iterable [T ])] groupBy[K ](f: (T ) ⇒ K , numPartitions: Int )(implicit kt: ClassTag [K ]): RDD [(K , Iterable [T ])] groupBy[K ](f: (T ) ⇒ K , p: Partitioner )(implicit kt: ClassTag [K ], ord: Ordering [K ] = null ): RDD [(K , Iterable [T ])] 按指定函数生成key,并按key分组。 注意:性能比较差,推荐用PairRDDFunctions .reduceByKey or PairRDDFunctions .aggregateByKey. 因为reduceByKey会先在分区内做聚合,再进行数据交换(shuffle)。 glom(): RDD [Array [T ]] max()(implicit ord: Ordering [T ]): T min()(implicit ord: Ordering [T ]): T
遍历元素
1 2 3 4 5 6 7 foreach(f: (T ) ⇒ Unit ): Unit foreach用于遍历RDD ,将函数f应用于每一个元素。 但要注意,如果对RDD 执行foreach,只会在Executor 端有效,而并不是Driver 端。 比如:rdd.foreach(println),只会在Executor 的stdout中打印出来,Driver 端是看不到的。 foreachPartition(f: (Iterator [T ]) ⇒ Unit ): Unit foreachPartition和foreach类似,只不过是对每一个分区使用f。
取元素相关
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 collect(): Array [T ] collect用于将一个RDD 转换成数组。 first(): T first返回RDD 中的第一个元素,不排序。 take(num: Int ): Array [T ] take用于获取RDD 中从0 到num-1 下标的元素,不排序。 top(num: Int )(implicit ord: Ordering [T ]): Array [T ] top函数用于从RDD 中,按照默认(降序)或者指定的排序规则,返回前num个元素。 takeOrdered(num: Int )(implicit ord: Ordering [T ]): Array [T ] takeOrdered和top类似,只不过以和top相反的顺序返回元素 takeSample(withReplacement: Boolean , num: Int , seed: Long = Utils .random.nextLong): Array [T ] 取样本元素
集合间运算
1 2 3 4 5 6 7 8 9 10 11 12 ++(other: RDD [T ]): RDD [T ] intersection(other: RDD [T ], partitioner: Partitioner )(implicit ord: Ordering [T ] = null ): RDD [T ] intersection(other: RDD [T ], numPartitions: Int ): RDD [T ] intersection(other: RDD [T ]): RDD [T ] 取交集 subtract(other: RDD [T ], p: Partitioner )(implicit ord: Ordering [T ] = null ): RDD [T ] subtract(other: RDD [T ], numPartitions: Int ): RDD [T ] subtract(other: RDD [T ]): RDD [T ] 求差集 union(other: RDD [T ]): RDD [T ]
其他
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 persist(): RDD .this .type persist(newLevel: StorageLevel ): RDD .this .type 缓存数据,可设置缓存级别(如果尚未设置过,才可以设置,本地checkpoint除外) unpersist(blocking: Boolean = true ): RDD .this .type Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.cache(): RDD .this .type cartesian[U ](other: RDD [U ])(implicit arg0: ClassTag [U ]): RDD [(T , U )]: checkpoint(): Unit coalesce(numPartitions: Int , shuffle: Boolean = false )(implicit ord: Ordering [T ] = null ): RDD [T ] 分区合并(只能减少分区),使用HashPartitioner 。 第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false ; repartition(numPartitions: Int )(implicit ord: Ordering [T ] = null ): RDD [T ] 调整分区数,会导致shuffle,如果是减少分区,可以使用coalesce,避免shuffle。 toDebugString: String getCheckpointFile: Option [String ] 获取checkpoint文件夹名称 localCheckpoint(): RDD .this .type 标记为使用本地checkpoint isEmpty(): Boolean 是否含0 个元素 iterator(split: Partition , context: TaskContext ): Iterator [T ] 返回迭代器,不应直接调用,而是给RDD 的子类用的。 toLocalIterator: Iterator [T ] pipe(command: String ): RDD [String ] pipe(command: String , env: Map [String , String ]): RDD [String ] pipe(command: Seq [String ], env: Map [String , String ] = Map (), printPipeContext: ((String ) ⇒ Unit ) ⇒ Unit = null , printRDDElement: (T , (String ) ⇒ Unit ) ⇒ Unit = null , separateWorkingDir: Boolean = false , bufferSize: Int = 8192 , encoding: String = Codec .defaultCharsetCodec.name): RDD [String ] 调用外部进程处理RDD ,如通过标准输入传给shell脚本。 preferredLocations(split: Partition ): Seq [String ] Get the preferred locations of a partition, taking into account whether the RDD is checkpointed.randomSplit(weights: Array [Double ], seed: Long = Utils .random.nextLong): Array [RDD [T ]] 按权随机将元素分组 sample(withReplacement: Boolean , fraction: Double , seed: Long = Utils .random.nextLong): RDD [T ] 取样本/子集 setName(_name: String ): RDD .this .type