首页 通过sparkSQL对Mysql执行查询
文章
取消

通过sparkSQL对Mysql执行查询

创建项目

1
2
3
Maven Archetype

Archetype选择maven-archetype-quickstart

idea配置

1
2
File/Project Structure/Modules
+添加scala

项目配置

porm.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<dependencies>
// 加入以下配置
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>2.4.5</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>2.4.5</version>
    </dependency>

    <!--Java连接MySQL的驱动表-->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.22</version>
    </dependency>
</dependencies>

创建scala class对象

1
2
3
package org.example

class Sogou(ts:String, uid:String, keyword:String, rank:Int, sorder:Int, url:String)

创建package object

获得sparkSession
1
2
3
4
val sparkSession = SparkSession.builder()
      .appName("CaseClassDemo")
      .master("local[2]")
      .getOrCreate();
通过sparkSession得到sparkcontext
1
val sc = sparkSession.sparkContext;
读取文件
1
2
3
val rdd = sc.textFile("/data/sogou.500w.utf8");  # hdfs

val data = rdd.map(_.split("\t"));
获得RDD
1
val sogouRDD = data.map(x=>Sogou(x(0).toString,x(1).toString,x(2).toString,x(3).toInt,x(4).toInt,x(5).toString));
将RDD转换为DF
1
val sogouDF= sogouRDD.toDF
将DF转换成表
1
sogouDF.createOrReplaceTempView("t_sogou");
执行查询语句
1
val res = sparkSession.sql("select keyword,count(*) query_count from t_sogou  group by keyword order by query_count desc limit 50;");
创建数据库
1
2
3
4
5
6
val url="jdbc:mysql://192.168.75.129:3306/sogou?useUnicode=true&characterEncoding=utf-8&relaxAutoCommit=true&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true";  # 指定库(需要手动创建)
    val table ="sogou_top50";  # 指定用于保存查询的表的表名(不需要手动创建)
    val property = new Properties();
    property.put("user","root");
    property.put("password","root");
    property.put("driver","com.mysql.cj.jdbc.Driver")
保存结果到数据库
1
2
3
res.write.mode(SaveMode.Append).jdbc(url,table,property)
    sc.stop();
    sparkSession.stop();

使用maven打包为jar包

打开maven界面
1
idea右侧边栏有maven界面,界面内顶栏选择m键
Run Anything里写入
1
mvn clean scala:compile compile package

在虚拟机中执行jar包

将mysql-connector-java-8.0.22.jar放入spark目录下的jars文件夹内
1
使用secureFX
将jar包传入虚拟机/data目录
1
使用secureFX
进入spark目录
1
cd /home/hd/apps/spark/bin
执行代码(指定需要具体执行哪一个class,所使用的jar包,需要执行的jar包的绝对路径)
1
./spark-submit --class org.example.SogouToMySQL --jars /home/hd/apps/spark/jars/mysql-connector-java-8.0.22.jar /home/hd/data/FinalPorject-1.0-SNAPSHOT.jar
本文由作者按照 CC BY 4.0 进行授权