Spark SQL DataSource V2 学习入门 + 代码模板

Data Source API V1

​ Spark 1.3 版本开始引入了 Data Source API V1,通过这个 API 我们可以很方便的读取各种来源的数据,而且 Spark 使用 SQL 组件的一些优化引擎对数据源的读取进行优化,比如列裁剪、过滤下推等等。

这个版本的 Data Source API 有以下几个优点:

  • 接口实现非常简单
  • 能够满足大部分的使用场景

同时存在一些问题:

  • 扩展能力有限,难以下推其他算子
  • 缺乏对列式存储读取的支持
  • 写操作不支持事务
  • 缺乏分区和排序信息
  • 不支持流处理

Data Source API V2

Data Source API V2为了解决 Data Source V1 的一些问题,从 Apache Spark 2.3.0 版本开始,社区引入了 Data Source API V2,在保留原有的功能之外,还解决了 Data Source API V1 存在的一些问题,比如不再依赖上层 API,扩展能力增强。

这个版本的 Data Source API V2 有以下几个优点:

  • DataSourceV2 API使用Java编写
  • 不依赖于上层API(DataFrame/RDD)
  • 易于扩展,可以添加新的优化,同时保持向后兼容
  • 提供物理信息,如大小、分区等
  • 支持Streamin Source/Sink
  • 灵活、强大和事务性的写入API

Spark2.3中V2的功能

  • 支持列扫描和行扫描
  • 列裁剪和过滤条件下推
  • 可以提供基本统计和数据分区
  • 事务写入API
  • 支持微批和连续的Streaming Source/Sink

Data Source API V2 支持读写、流数据写、微批处理读(比如 KafkaSource 就用到这个了)以及 ContinuousRead(continuous stream processing)等多种方式读。

ReadSupport & WriteSupport

​ 为了使用 Data Source API V2,我们肯定是需要使用到 Data Source API V2 包里面相关的类库,对于读取程序,我们只需要实现 ReadSupport 相关接口就行,如下:

在 reader 包里面有

  • SupportsPushDownFilters:算子下推
  • SupportsPushDownRequiredColumns :列裁剪
  • SupportsReportPartitioning :数据分区
  • SupportsReportStatistics :统计信息
  • SupportsScanColumnarBatch:批量列扫描。

代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package cn.jsledd.spark.sql

import java.util.Optional
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.sources.v2.reader.DataSourceReader
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, WriteSupport}

/**
* @author :jsledd
* @date :Created in 2019/4/5 0001 上午 9:19
* @description:自定义DataSourceV2 的数据源
* @modified By:
* @version: $version$
*/
class CustomDataSourceV2 extends DataSourceV2 with ReadSupport with WriteSupport {
/**
* 创建Reader
*
* @param dataSourceOptions 用户自定义的options
* @return 返回自定义的DataSourceReader
*/
override def createReader(dataSourceOptions: DataSourceOptions): DataSourceReader = ???
/**
* 创建Writer
*
* @param jobId jobId
* @param schema schema
* @param mode 保存模式
* @param options 用于定义的option
* @return Optional[自定义的DataSourceWriter]
*/
override def createWriter(jobId: String, schema: StructType, mode: SaveMode, options: DataSourceOptions): Optional[DataSourceWriter] = ???

}

DataSourceReader & DataSourceWriter

前面我们实现了 ReadSupport 接口,并重写了 createReader 方法。这里我们需要实现 DataSourceReader 接口相关的操作,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package cn.jsledd.spark.sql
import java.util
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition}
import org.apache.spark.sql.types.StructType
/**
* @author :jsledd
* @date :Created in 2019/4/5 0001 下午 14:12
* @description:${description}
* @modified By:
* @version: $version$
*/
case class CustomDataSourceV2Reader(options: Map[String, String]) extends DataSourceReader {
/**
* 读取的列相关信息
* @return
*/
override def readSchema(): StructType = ???

/**
* 每个分区拆分及读取逻辑
* @return
*/
override def planInputPartitions(): util.List[InputPartition[InternalRow]] = ???
}

class CustomDataSourceWriter(dataSourceOptions: DataSourceOptions) extends DataSourceWriter {
/**
* 创建RestDataWriter工厂类
*
* @return DataWriterFactory
*/
override def createWriterFactory(): DataWriterFactory[InternalRow] = ???

/**
* commit
*
* @param writerCommitMessages 所有分区提交的commit信息
* 触发一次
*/
override def commit(writerCommitMessages: Array[WriterCommitMessage]): Unit = ???

/** *
* abort
*
* @param writerCommitMessages 当write异常时调用,该方法用于事务回滚,当write方法发生异常之后触发该方法
*/
override def abort(writerCommitMessages: Array[WriterCommitMessage]): Unit = ???
}

我们可以在这里加上 SupportsPushDownFilters、SupportsPushDownRequiredColumns、SupportsReportPartitioning 等相关的优化

读写实现

​ 最后一个需要我们实现的就是分片读取,在 DataSource V1 里面缺乏分区的支持,而 DataSource V2 支持完整的分区处理,也就是上面的 planInputPartitions 方法。在那里我们可以定义使用几个分区读取数据源的数据。比如如果是 TextInputFormat,我们可以读取到对应文件的 splits 个数,然后每个 split 构成这里的一个分区,使用一个 Task 读取。为了简便起见,我这里使用了只使用了一个分区,也就是 List[InputPartition[InternalRow]].asJava。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package cn.jsledd.spark.sql

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
import org.apache.spark.sql.types._

/**
* @author :jsledd
* @date :Created in 2019/11/1 0001 下午 14:21
* @description:定义每个分区具体是如何读取的
* @modified By:
* @version: $version$
*/

case class CustomInputPartition(requiredSchema: StructType, pushed: Array[Filter], options: Map[String, String]) extends InputPartition[InternalRow] {

override def createPartitionReader(): InputPartitionReader[InternalRow] = ???
}

case class CustomInputPartitionReader(requiredSchema: StructType, pushed: Array[Filter], options: Map[String, String]) extends InputPartitionReader[InternalRow] {

override def next(): Boolean = ???

override def get(): InternalRow = ???

override def close(): Unit = ???
}
/**
* DataWriterFactory工厂类
*/
class CustomDataWriterFactory extends DataWriterFactory[Row] {
/**
* 创建DataWriter
*
* @param partitionId 分区ID
* @param taskId task ID
* @param epochId 一种单调递增的id,用于将查询分成离散的执行周期。对于非流式查询,此ID将始终为0。
*/
override def createDataWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[Row] = ???
}

/**
* RestDataWriter
*
* @param partitionId 分区ID
*/
class RestDataWriter(partitionId: Int) extends DataWriter[Row] {
/**
* write
*
* @param record 单条记录
* 每条记录都会触发该方法
*/
override def write(record: Row): Unit = ???

/**
* commit
*
* @return commit message
* 每个分区触发一次
*/
override def commit(): WriterCommitMessage = ???


/**
* 回滚:当write发生异常时触发该方法,该方法用于事务回滚,当write方法发生异常之后触发该方法
*/
override def abort(): Unit = ???
}

调用

1
2
3
4
val df = spark.read
.format("cn.jsledd.spark.sql.CustomDataSourceV2")
.option("schema", "`s` INT,`t` LONG,`v` DOUBLE")
.load()

​ 目前 DataSource API V2 还在不断演化中,不同版本的 API 可能和这里介绍的不一样,比如 Spark 2.3.x 支持分区的 API 是 createDataReaderFactories,而 Spark 2.4.x 是 planInputPartitions,详见 SPARK-24073。同时,Apache Spark DataSource API V2 是一个比较大的 Feature ,虽然早在 Spark 2.3 版本中已经引入了,但是其实还有很多功能未发布,内置的各种数据源实现基本上都是基于 DataSource API V1 实现的.
​ Spark DataSource API V2 最终稳定版以及新功能将会随着年底和 Apache Spark 3.0.0 版本一起发布,其也算是 Apache Spark 3.0.0 版本的一大新功能。

代码 Github地址:https://github.com/shadowagnoy/spark_learn/