# Working with Spark Data Sources

## Data Source API Versions
Spark has two major versions of its data source APIs, known simply as “v1” and “v2”. The configuration property `spark.sql.sources.useV1SourceList` determines which API version is used when reading from data sources such as CSV, ORC, and Parquet. The default value for this property (as of Spark 3.4.0) is `"avro,csv,json,kafka,orc,parquet,text"`, meaning that all of these data sources fall back to v1 by default.
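For example, removing `"parquet"` from the list routes Parquet reads through the v2 API. A minimal sketch (the application name and local master are arbitrary):

```scala
import org.apache.spark.sql.SparkSession

// Remove "parquet" from the v1 fallback list so that Parquet reads use the
// v2 API; the other sources listed still fall back to v1.
val spark = SparkSession.builder()
  .appName("v2-parquet-demo") // arbitrary
  .master("local[*]")         // arbitrary
  .config("spark.sql.sources.useV1SourceList", "avro,csv,json,kafka,orc,text")
  .getOrCreate()
```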
When using Spark SQL (including the DataFrame API), the representation of a read in the physical plan will be different depending on the API version being used, and in the plugin we therefore have different code paths for tagging and replacing these operations.
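A quick way to see this difference is to print the physical plan. A minimal sketch, assuming a Parquet file at a hypothetical path:

```scala
// With "parquet" in spark.sql.sources.useV1SourceList, the plan contains a
// FileSourceScanExec (printed as "FileScan parquet"); with it removed, the
// plan contains a BatchScanExec (printed as "BatchScan").
val df = spark.read.parquet("/tmp/events.parquet") // hypothetical path
df.explain()
```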
## V1 API
In the v1 API, a read from a file-based data source is represented by a `FileSourceScanExec`, which wraps a `HadoopFsRelation`.

`HadoopFsRelation` is an important component in Apache Spark. It represents a relation based on data stored in the Hadoop FileSystem. In this context, the Hadoop FileSystem encompasses various Hadoop-compatible distributed storage systems, such as HDFS (Hadoop Distributed File System), Amazon S3, and others.

`HadoopFsRelation` is not tied to a specific file format. Instead, it relies on implementations of the `FileFormat` interface to read and write data. This means that file formats such as CSV, Parquet, and ORC can each provide an implementation of the `FileFormat` interface, and `HadoopFsRelation` will work with any of them.
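As a concrete illustration, here is a minimal sketch that locates the `HadoopFsRelation` behind a v1 read and inspects it (the path is hypothetical, and this assumes Parquet is still in the v1 fallback list):

```scala
import org.apache.spark.sql.execution.FileSourceScanExec

val plan = spark.read.parquet("/tmp/events.parquet").queryExecution.executedPlan
plan.collectFirst { case scan: FileSourceScanExec => scan.relation }.foreach { rel =>
  // The relation resolves files through the Hadoop FileSystem API...
  println(rel.location.rootPaths)
  // ...and delegates reading them to a FileFormat implementation.
  println(rel.fileFormat) // e.g. ParquetFileFormat
}
```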
When overriding `FileSourceScanExec` in the plugin, there are a number of different places where tagging code can be placed, depending on the file format. We start in `GpuOverrides` with a map entry, `GpuOverrides.exec[FileSourceScanExec]`, and then the hierarchical flow is typically as follows, although it may vary between shim versions:

- `FileSourceScanExecMeta.tagPlanForGpu`
- `ScanExecShims.tagGpuFileSourceScanExecSupport`
- `GpuFileSourceScanExec.tagSupport`

`GpuFileSourceScanExec.tagSupport` inspects the `FileFormat` and then calls into one of the following (a sketch of this dispatch follows the list):
- `GpuReadCSVFileFormat.tagSupport`, which calls `GpuCSVScan.tagSupport`
- `GpuReadOrcFileFormat.tagSupport`, which calls `GpuOrcScan.tagSupport`
- `GpuReadParquetFileFormat.tagSupport`, which calls `GpuParquetScan.tagSupport`
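The standalone sketch below illustrates the shape of that dispatch; the `println` calls stand in for the plugin's real tagging helpers:

```scala
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

// Illustrative only: the plugin passes a metadata wrapper and calls the
// GpuRead*FileFormat.tagSupport methods rather than printing.
def tagByFormat(scan: FileSourceScanExec): Unit = scan.relation.fileFormat match {
  case _: CSVFileFormat     => println("tag CSV read support")
  case _: OrcFileFormat     => println("tag ORC read support")
  case _: ParquetFileFormat => println("tag Parquet read support")
  case other                => println(s"not handled here: $other")
}
```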
The classes `GpuCSVScan`, `GpuParquetScan`, `GpuOrcScan`, and `GpuJsonScan` are also called from the v2 API, so this is a good place to put code that is not specific to either API version. These scan classes also call into `FileFormatChecks.tag`.
## V2 API
When using the v2 API, the physical plan will contain a `BatchScanExec`, which wraps a scan that implements the `org.apache.spark.sql.connector.read.Scan` trait. The scan implementations include `CSVScan`, `ParquetScan`, and `OrcScan`. The plugin's tagging code for these scans is shared with the v1 API and can be placed in one of the following methods:
- `GpuCSVScan.tagSupport`
- `GpuOrcScan.tagSupport`
- `GpuParquetScan.tagSupport`
When overriding v2 operators in the plugin, we can override both `BatchScanExec` and the individual scans, such as `CSVScan`.
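To tie this together, here is a sketch that switches a session to the v2 Parquet path and confirms that the plan contains a `BatchScanExec` wrapping a `ParquetScan` (the path is hypothetical):

```scala
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan

// Route Parquet through the v2 API for this session, then inspect the plan.
spark.conf.set("spark.sql.sources.useV1SourceList", "avro,csv,json,kafka,orc,text")
val plan = spark.read.parquet("/tmp/events.parquet").queryExecution.executedPlan
plan.collectFirst { case b: BatchScanExec => b.scan }.foreach {
  case _: ParquetScan => println("v2 Parquet read via BatchScanExec")
  case other          => println(s"some other v2 scan: ${other.getClass.getName}")
}
```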