
Connecting PySpark to MySQL via JDBC (DataFrame)


May 15, 2022

I. Preface
MySQL version: 8.0.21; Spark version: 3.1.1; Hadoop version: 2.7.5; JDBC driver: mysql-connector-java-5.1.46.tar.gz
II. Main text
1. First create a `spark` database in MySQL, create a `student` table in it, and insert a few rows
mysql> create database spark;
mysql> use spark;
mysql> create table student (id int(4), name char(20), gender char(4), age int(4));
mysql> alter table student change id id int auto_increment primary key;
mysql> insert into student values(1,'Xueqian','F',23);
mysql> insert into student values(2,'Weiliang','M',24);
mysql> select * from student;
+----+----------+--------+------+
| id | name     | gender | age  |
+----+----------+--------+------+
|  1 | Xueqian  | F      |   23 |
|  2 | Weiliang | M      |   24 |
+----+----------+--------+------+

2. Download the JDBC driver, then extract the downloaded file into the jars directory under the Spark installation directory.
3. Start the MySQL service: service mysql start
4. Start pyspark
cd /opt/module/spark
./bin/pyspark \
--jars /opt/module/spark-3.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.46-bin.jar \
--driver-class-path /opt/module/spark-3.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.46-bin.jar

The backslash at the end of a line tells the shell that the command is not finished yet.
5. Connect to the database
>>> jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://192.168.64.130:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "your MySQL password").load()
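The long chain of `.option(...)` calls above can get unwieldy. As a sketch, the same settings can be collected into a plain dict and passed in one go with `.options(**opts)` (the `jdbc_options` helper below is hypothetical, not part of PySpark; the host, user, and password are the values assumed in this walkthrough):

```python
# Hypothetical helper: assemble the JDBC settings used above into a dict,
# so the same configuration can be reused for both reads and writes, e.g.
# spark.read.format("jdbc").options(**opts).load()
def jdbc_options(host, port, db, table, user, password):
    return {
        "url": f"jdbc:mysql://{host}:{port}/{db}",
        "driver": "com.mysql.jdbc.Driver",  # driver class for Connector/J 5.1.x
        "dbtable": table,
        "user": user,
        "password": password,
    }

opts = jdbc_options("192.168.64.130", 3306, "spark", "student",
                    "root", "your MySQL password")
print(opts["url"])  # jdbc:mysql://192.168.64.130:3306/spark
```

Note that newer Connector/J 8.x releases rename the driver class to `com.mysql.cj.jdbc.Driver`; the name above matches the 5.1.46 jar used in this article.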

6. View the data
>>> jdbcDF.show()

7. Write data
>>> from pyspark.sql.types import Row
>>> from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import StringType
>>> from pyspark.sql.types import IntegerType
>>> studentRDD = spark.sparkContext.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]).map(lambda line : line.split(" "))
# Next, set up the schema
>>> schema = StructType([StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
>>> rowRDD = studentRDD.map(lambda p : Row(p[1].strip(), p[2].strip(),int(p[3])))
# Build Row objects that match the schema, i.e. pair the data with the schema
>>> studentDF = spark.createDataFrame(rowRDD, schema)
>>> prop = {}
>>> prop['user'] = 'root'
>>> prop['password'] = 'your MySQL password'
>>> prop['driver'] = "com.mysql.jdbc.Driver"
>>> studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append', prop)
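The parsing steps above can be checked locally with plain Python, without a running Spark session. Each input line splits into four space-separated fields; index 0 (the id) is deliberately dropped because the table's `id` column is auto_increment:

```python
# Mimic the split + Row construction above with plain Python tuples.
lines = ["3 Rongcheng M 26", "4 Guanhua M 27"]
rows = []
for line in lines:
    p = line.split(" ")
    # Keep (name, gender, age); skip p[0], since id is generated by MySQL.
    rows.append((p[1].strip(), p[2].strip(), int(p[3])))
print(rows)  # [('Rongcheng', 'M', 26), ('Guanhua', 'M', 27)]
```

The `'append'` mode in `studentDF.write.jdbc(...)` adds these rows to the existing table; `'overwrite'` would replace the table contents instead.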


8. Verify that the data was written successfully
mysql> select * from student;
+----+-----------+--------+------+
| id | name      | gender | age  |
+----+-----------+--------+------+
|  1 | Xueqian   | F      |   23 |
|  2 | Weiliang  | M      |   24 |
|  3 | Guanhua   | M      |   27 |
|  4 | Rongcheng | M      |   26 |
+----+-----------+--------+------+
