Spark SQL DataSource V2 学习入门 + 代码模板
Data Source API V1
Spark 1.3 版本开始引入了 Data Source API V1 ,通过这个 API 我们可以很方便的读取各种来源的数据,而且 Spark 使用 SQL 组件的一些优化引擎对数据源的读取进行优化,比如列裁剪、过滤下推等等。
这个版本的 Data Source API 有以下几个优点:
同时存在一些问题:
扩展能力有限,难以下推其他算子
缺乏对列式存储读取的支持
写操作不支持事务
缺乏分区和排序信息
不支持流处理
Data Source API V2
Data Source API V2 为了解决 Data Source V1 的一些问题,从 Apache Spark 2.3.0 版本开始,社区引入了 Data Source API V2,在保留原有的功能之外,还解决了 Data Source API V1 存在的一些问题,比如不再依赖上层 API,扩展能力增强。
这个版本的 Data Source API V2 有以下几个优点:
DataSourceV2 API使用Java编写
不依赖于上层API(DataFrame/RDD)
易于扩展,可以添加新的优化,同时保持向后兼容
提供物理信息,如大小、分区等
支持Streamin Source/Sink
灵活、强大和事务性的写入API
Spark2.3中V2的功能
支持列扫描和行扫描
列裁剪和过滤条件下推
可以提供基本统计和数据分区
事务写入API
支持微批和连续的Streaming Source/Sink
Data Source API V2 支持读写、流数据写、微批处理读(比如 KafkaSource 就用到这个了)以及 ContinuousRead(continuous stream processing)等多种方式读。
ReadSupport & WriteSupport
为了使用 Data Source API V2,我们肯定是需要使用到 Data Source API V2 包里面相关的类库,对于读取程序,我们只需要实现 ReadSupport
相关接口就行,如下:
在 reader 包里面有
SupportsPushDownFilters:算子下推
SupportsPushDownRequiredColumns :列裁剪
SupportsReportPartitioning :数据分区
SupportsReportStatistics :统计信息
SupportsScanColumnarBatch:批量列扫描。
代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package cn.jsledd.spark.sqlimport java.util.Optional import org.apache.spark.sql.SaveMode import org.apache.spark.sql.sources.v2.reader.DataSourceReader import org.apache.spark.sql.sources.v2.writer.DataSourceWriter import org.apache.spark.sql.types.StructType import org.apache.spark.sql.sources.v2.{DataSourceOptions , DataSourceV2 , ReadSupport , WriteSupport }class CustomDataSourceV2 extends DataSourceV2 with ReadSupport with WriteSupport { override def createReader (dataSourceOptions: DataSourceOptions ): DataSourceReader = ??? override def createWriter (jobId: String , schema: StructType , mode: SaveMode , options: DataSourceOptions ): Optional [DataSourceWriter ] = ??? }
DataSourceReader & DataSourceWriter
前面我们实现了 ReadSupport
接口,并重写了 createReader
方法。这里我们需要实现 DataSourceReader
接口相关的操作,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package cn.jsledd.spark.sqlimport java.utilimport org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.sources.v2.reader.{DataSourceReader , InputPartition }import org.apache.spark.sql.types.StructType case class CustomDataSourceV2Reader (options: Map [String , String ] ) extends DataSourceReader { override def readSchema (): StructType = ??? override def planInputPartitions (): util.List [InputPartition [InternalRow ]] = ??? } class CustomDataSourceWriter (dataSourceOptions: DataSourceOptions ) extends DataSourceWriter { override def createWriterFactory (): DataWriterFactory [InternalRow ] = ??? override def commit (writerCommitMessages: Array [WriterCommitMessage ]): Unit = ??? override def abort (writerCommitMessages: Array [WriterCommitMessage ]): Unit = ??? }
我们可以在这里加上 SupportsPushDownFilters、SupportsPushDownRequiredColumns、SupportsReportPartitioning 等相关的优化
读写实现
最后一个需要我们实现的就是分片读取,在 DataSource V1 里面缺乏分区的支持,而 DataSource V2 支持完整的分区处理,也就是上面的 planInputPartitions 方法。在那里我们可以定义使用几个分区读取数据源的数据。比如如果是 TextInputFormat,我们可以读取到对应文件的 splits 个数,然后每个 split 构成这里的一个分区,使用一个 Task 读取。为了简便起见,我这里使用了只使用了一个分区,也就是 List[InputPartition[InternalRow]].asJava。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 package cn.jsledd.spark.sqlimport org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.sources.v2.reader.{InputPartition , InputPartitionReader }import org.apache.spark.sql.types._case class CustomInputPartition (requiredSchema: StructType , pushed: Array [Filter ], options: Map [String , String ] ) extends InputPartition [InternalRow ] { override def createPartitionReader (): InputPartitionReader [InternalRow ] = ??? } case class CustomInputPartitionReader (requiredSchema: StructType , pushed: Array [Filter ], options: Map [String , String ] ) extends InputPartitionReader [InternalRow ] { override def next (): Boolean = ??? override def get (): InternalRow = ??? override def close (): Unit = ??? } class CustomDataWriterFactory extends DataWriterFactory [Row ] { override def createDataWriter (partitionId: Int , taskId: Long , epochId: Long ): DataWriter [Row ] = ??? } class RestDataWriter (partitionId: Int ) extends DataWriter [Row ] { override def write (record: Row ): Unit = ??? override def commit (): WriterCommitMessage = ??? override def abort (): Unit = ??? }
调用
1 2 3 4 val df = spark.read .format("cn.jsledd.spark.sql.CustomDataSourceV2" ) .option("schema" , "`s` INT,`t` LONG,`v` DOUBLE" ) .load()
目前 DataSource API V2 还在不断演化中,不同版本的 API 可能和这里介绍的不一样,比如 Spark 2.3.x 支持分区的 API 是 createDataReaderFactories,而 Spark 2.4.x 是 planInputPartitions,详见 SPARK-24073。同时,Apache Spark DataSource API V2 是一个比较大的 Feature ,虽然早在 Spark 2.3 版本中已经引入了,但是其实还有很多功能未发布,内置的各种数据源实现基本上都是基于 DataSource API V1 实现的.
Spark DataSource API V2 最终稳定版以及新功能将会随着年底和 Apache Spark 3.0.0 版本一起发布,其也算是 Apache Spark 3.0.0 版本的一大新功能。