Spark 与 DataFrame
前言
在 Spark 中,除了 RDD 这种数据容器外,还有一种更容易操作的一个分布式数据容器 DateFrame,它更像传统关系型数据库的二维表,除了包括数据自身以外还包括数据的结构信息(Schema),这就可以利用类似 SQL 的语言来进行数据访问。
Dataframe 读写
手动创建
1 2
| from pyspark.sql import SparkSession spark = SparkSession.builder.appName("Spark").getOrCreate()
|
创建一个列表,列表的元素是字典,将其作为输出初始化 DataFrame:
1 2 3 4 5 6
| data = [{"Category": 'A', "ID": 1, "Value": 121.44, "Truth": True}, {"Category": 'B', "ID": 2, "Value": 300.01, "Truth": False}, {"Category": 'C', "ID": 3, "Value": 10.99, "Truth": None}, {"Category": 'A', "ID": 4, "Value": 33.87, "Truth": True} ] df = spark.createDataFrame(data)
|
分别打印 Schema 和 DataFrame,可以看到创建 DataFrame 时自动分析了每列数据的类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| df.printSchema() ''' root |-- Category: string (nullable = true) |-- ID: long (nullable = true) |-- Truth: boolean (nullable = true) |-- Value: double (nullable = true) ''' df.show() ''' +--------+---+-----+------+ |Category| ID|Truth| Value| +--------+---+-----+------+ | A| 1| true|121.44| | B| 2|false|300.01| | C| 3| null| 10.99| | A| 4| true| 33.87| +--------+---+-----+------+ '''
|
读取文件创建
除了手动创建 DataFrame 之外,更常见的是通过读取文件,可以通过 spark.read
方法来实现,你也可以指定 options
添加额外选项。
1 2 3
| df = spark.read.csv('hdfs://spark1:9000/data/test.csv', header=True, inferSchema=True)
df.show()
|
类似的,你也可以直接从 json
,mysql
等数据源读取数据。
写数据
write
的使用方法与 read
相同,可以通过 format
指定写入的格式,默认为 csv
,也可以通过 options
添加额外选项。
1 2
| df.write.csv('hdfs://spark1:9000/data/test.csv')
|
写数据时,也可以先将 Pandas-on-Spark Dataframe 转化为 Pandas Dataframe,然后在保存为 csv 文件
1 2
| df.toPandas().to_csv(file_path, index=False)
|
DateFrame 操作
1 2 3 4 5 6 7 8 9
| df.show() +--------+---+-----+------+ |Category| ID|Truth| Value| +--------+---+-----+------+ | A| 1| true|121.44| | B| 2|false|300.01| | C| 3| null| 10.99| | A| 4| true| 33.87| +--------+---+-----+------+
|
select()
1 2 3 4 5 6 7 8 9 10 11
| df.select('Value').show() ''' +------+ | Value| +------+ |121.44| |300.01| | 10.99| | 33.87| +------+ '''
|
另外,你也可以使用标准的 SQL 语句来查询数据,例如:
1 2
| df.createOrReplaceTempView('table') spark.sql('select Value from table').show()
|
withColumn
whtiColumn
方法根据指定 colName
往 DataFrame 中新增一列,如果 colName
已存在,则会覆盖当前列。
1 2 3 4 5 6 7 8 9 10 11
| df.withColumn('New', df['Value'] + 50).show() ''' +--------+---+-----+------+------+ |Category| ID|Truth| Value| New| +--------+---+-----+------+------+ | A| 1| true|121.44|171.44| | B| 2|false|300.01|350.01| | C| 3| null| 10.99| 60.99| | A| 4| true| 33.87| 83.87| +--------+---+-----+------+------+ '''
|
groupby()
根据字段进行 group by 操作
1 2 3 4 5 6 7 8 9 10 11
| df.groupby('Category').mean().show() ''' +--------+-------+----------+ |Category|avg(ID)|avg(Value)| +--------+-------+----------+ | B| 2.0| 300.01| | C| 3.0| 10.99| | A| 2.5| 77.655| +--------+-------+----------+ '''
|
其他常用操作
1 2 3 4 5 6 7 8 9 10 11 12 13
| df.first() df.head(5) df.take(5)
df.count()
df.drop('Truth') df.drop_duplicates() df.dropna()
df.orderBy('Value') df.filter(df['Value'] > 100) df.withColumnRenamed('Value', 'Value_new')
|
Pandas on Spark
在 Spark 3.2 版本中,可以通过 Pandas api 直接对 DataFrame 进行操作
1 2 3 4 5 6 7 8 9 10 11
| import pyspark.pandas as ps
ps_df = ps.DataFrame(range(10))
pd_df = ps_df.to_pandas()
ps_df = ps.from_pandas(pd_df)
|
参考资料