大数据之 Hadoop-14-Spark

本节主要学习Spark的两个核心组件 Spark RDD 和 Spark SQL 的具体使用。

一、Spark概念

Apache Spark是一个快速通用的集群计算系统。它提供了Java、Scala、 Python和R的高级API,以及一个支持通用的执行图计算的优化引擎。它还支持一组丰富的高级工具,包括使用SQL进行结构化数据处理的Spark SQL、用于机器学习的MLlib、用于图处理的GraphX,以及用于实时流处理的 Spark Streaming。

特点

1.快速

我们已经知道,MapReduce主要包括 Map'和 Reduce两种操作,且将多个任务的中间结果存储于HDFS中。与 MapReduce相比,Spark可以支持包括Map和Reduce在内的更多操作,这些操作相互连接形成一个有向无环图(Directed Acyclic Graph,DAG),各个操作的中间数据则会被保存在内存中。因此,处理速度比 MapReduce更加快。

Spark通过使用先进的DAG调度器、查询优化器和物理执行引擎,从而能够高性能地实现批处理和流数据处理。

2.易用

Spark可以使用Java、Scala、Python、R和SQL 快速编写应用程序。
Spark提供了超过80个高级算子(关于算子,本章后续会详细讲解),使用这些算子可以轻松构建并行应用程序,并且可以从Scala、Python、R和SQL 的Shell 中交互式地使用它们。

3.通用

Spark 拥有一系列库,包括SQL和 DataFrames、用于机器学习的 MLlib、GraphX、SparkStreaming。用户可以在同一个应用程序中无缝地组合这些库。

4.到处运行

Spark可以使用独立集群模式运行(使用自带的独立资源调度器,称为Standalone模式),也可以运行在Amazon EC2、Hadoop YARN、Mesos (Apache下的一个开源分布式资源管理框架)。

主要组件

如图下所示,Spark是由多个组件构成的软件栈,Spark 的核心(Spark Core)是一个对由很多计算任务组成的、运行在多个工作机器或者一个计算集群上的应用进行调度、分发以及监控的计算引擎。
在Spark Core的基础上,Spark提供了一系列面向不同应用需求的组件,例如Spark SQL结构化处理和MLlib 机器学习等。这些组件关系密切并且可以相互调用,这样可以方便地在同一应用程序中组合使用。
Spark自带一个简易的资源调度器,称为独立调度器(Standalone)。若集群中没有任何资源管理器,则可以使用自带的独立调度器。当然,Spark 也支持在其他的集群管理器上运行,包括Hadoop YARN、Apache Mesos等。
Spark本身并没有提供分布式文件系统,因此Spark 的分析大多依赖于HDFS,也可以从HBase和Amazon s3等持久层读取数据。

file

Spark Core

Spark Core是Spark的核心模块,主要包含两部分功能:一是负责任务调度、内存管理、错误恢复、与存储系统交互等;二是其包含了对弹性分布式数据集(Resilient Distributed Dataset,RDD)的API定义RDD表示分布在多个计算节点上可以并行操作的元素集合,是 Spark主要的编程抽象。Spark Core提供了创建和操作这些集合的多个API。

Spark SQL

Spark SQL是一个用于结构化数据处理的Spark 工具包,提供了面向结构化数据的SQL查询接口,使用户可以通过编写SQL或基于Apache Hive的HiveQL来方便地处理数据。当然, Spark SQL也可以查询数据仓库Hive中的数据,相当于数据仓库的查询引擎,提供了很强大的计算速度。
Spark SQL还支持开发者将SQL语句融入到Spark应用程序开发过程中,使用户可以在单个的应用中同时进行SQL查询和复杂的数据分析。

Spark Streaming

用于对实时数据进行流式计算的组件。

二、Spark运行时架构

Spark有多种运行模式,可以运行在一台计算机上,称为本地(单机)模式;也可以以YARN或Mesos作为底层资源调度系统以分布式的方式在集群中运行;还可以使用Spark自带的Standalone模式(自带资源调度系统)。

2.1 Spark Standalone模式

Spark Standalone模式为经典的Master/Slave架构,资源调度是Spark自己实现的,如下图:

file

集群的主节点称为Master节点,在集群启动时会在主节点启动一个名为“Master”的守护进程,类似YARN集群的ResourceManager;从节点称为Worker节点,在集群启动时会在各个从节点上启动一个名为“Worker”的守护进程,类似YARN集群的NodeManager。

2.2 Spark ON YARN模式

Spark On YARN模式遵循YARN的官方规范,YARN只负责资源的管理和调度,运行哪种应用程序由用户自己实现,因此可能在YARN上同时运行MapReduce程序和Spark 程序,YARN很好地对每一个程序实现了资源的隔离。这使得Spark与MapReduce可以运行于同一个集群中,共享集群存储资源与计算资源。
Spark On YARN模式与Standalone模式一样,也分为client和 cluster两种运行方式。
Spark On YARN 的 client运行方式的主要进程有:SparkSubmit、ResourceManager 、NodeManager、CoarseGrainedExecutorBackend、ExecutorLauncher,运行架构如图如下所示。

file

与Standalone模式的client运行方式类似,客户端会产生一个名为“SparkSubmit”的进程,Driver 程序则运行于该进程中,且 ResourceManager 的功能类似于·Standalone模式的 Master;NodeManager的功能类似于Standalone模式的Worker。当Spark程序运行时,ResourceManager 会在集群中选择一个NodeManager进程启动一个名为“ExecutorLauncher”的子进程,该子进程是Spark的自定义实现,承担YARN中的ApplicationMaster角色,类似 MapReduce的“MRAppMaster”进程。

三、Spark安装配置

3.1 Spark Standalone模式

Spark Standalone模式的搭建需要在集群的每个节点都安装Spark,集群角色分配如表所示。

节点 角色
centos01 Master
centos02 Worker
centos03 Worker

3.1.1 下载并解压

到清华镜像 mirrors.tuna.tsinghua.edu.cn 找到 Spark ,选择Spark版本为Spark-2.4.8,

$ cd /opt/softwares
$ wget https://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
$ tar -zxvf spark-2.4.8-bin-hadoop2.7.tgz -C /opt/modules/

3.1.2 修改配置文件

Spark的配置文件都存放于安装目录下的conf目录,进入该目录,执行以下操作:
(1)修改slaves文件。
slaves文件必须包含所有需要启动的Worker 节点的主机名,且每个主机名占一行。
执行以下命令,复制 slaves.template 文件为 slaves 文件:

$ cp slaves.template slaves

然后修改 slaves 文件,将其中默认的 localhost 改为以下内容:

centos02
centos03

上述配置表示将 centos02 和 centos03 节点设置为某集群的从节点(Worker节点)。

(2)修改 spark-env.sh 文件。
执行以下命令,复制 spark-env.sh.template 文件为 spark-env.sh 文件:

$ cp spark-env.sh.template spark-env.sh

然后修改 spark-env.sh 文件,添加以下内容:

export JAVA_HOME=/opt/modules/jdk1.8.0_211
export SPARK_MASTER_IP=centos01
export SPARK_MASTER_PORT=7077

上述配置属性解析如下。

  • JAVA_HOME:指定JAVA_HOME的路径。若集群中每个节点在 /etc/profile 文件中都配置了
    JAVA_HOME,则该选项可以省略,Spark集群启动时会自动读取。为了防止出错,建议此处将该选项配置上。
  • SPARK_MASTER_IP:指定集群主节点(Master)的主机名或IP地址。此处为centos01。
  • SPARK_MASTER_PORT:指定Master节点的访问端口。默认为7077。

3.1.3 复制Spark安装文件到其他节点

在 centos01 节点中执行以下命令,将Spark 安装文件复制到其他节点:

$ scp -r /opt/modules/spark-2.4.8-bin-hadoop2.7/ hadoop@centos02:/opt/modules/
$ scp -r /opt/modules/spark-2.4.8-bin-hadoop2.7/ hadoop@centos03:/opt/modules/

说明:hadoop用户密码为:hadoop@123

3.1.4 启动Spark集群

在centos01节点上进入Spark安装目录,执行以下命令,启动Spark集群:

$ cd /opt/modules/spark-2.4.8-bin-hadoop2.7
$ sbin/start-all.sh

查看 start-all.sh 的源码,其中有以下两条命令:

#Start Master
"${SPARK_HOME} /sbin"/start-master.sh

#Start Workers
"${SPARK_HOME}/sbin" /start-slaves.sh

可以看到,当执行 start-all.sh 命令时,会直接在本地执行 start-master.sh 命令启动Master,而Worker 的启动会读取 slaves文件中的配置。因此,为了防止后续出错,必须在 spark-env.sh 中的SPARK_MASTER_IP属性指定的节点中启动Spark集群。
启动完毕后,分别在各节点执行jps命令,查看启动的Java进程。若在centos01节点存在Master进程,centos02节点存在Worker进程,centos03节点存在Worker进程,说明集群启动成功。

[root@centos01 spark-2.4.8-bin-hadoop2.7]# sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/modules/spark-2.4.8-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.master.Master-1-centos01.out
root@centos02's password: root@centos03's password: 
root@centos02's password: centos02: Permission denied, please try again.

root@centos03's password: centos03: Permission denied, please try again.

root@centos02's password: centos02: Permission denied, please try again.

centos03: starting org.apache.spark.deploy.worker.Worker, logging to /opt/modules/spark-2.4.8-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-centos03.out

centos02: starting org.apache.spark.deploy.worker.Worker, logging to /opt/modules/spark-2.4.8-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-centos02.out
[root@centos01 spark-2.4.8-bin-hadoop2.7]# jps
7385 Jps
7325 Master

注意:三台服务器root用户的密码为:123456

此时可以在浏览器中访问网址 http:llcentos01:8080,查看Spark的Web界面,如图所示。

四、Spark应用程序的提交和Spark Shell

spark-submit

Spark提供了一个客户端应用程序提交工具spark-submit,使用该工具可以将编写好的Spark应用程序提交到Spark集群。

例如,在Standalone模式下,将Spark自带的求圆周率的程序提交到集群,并且设置Driver进程使用内存为512 MB,每个Executor进程使用内存为1 GB,每个 Executor进程所使用的CPU核心1,运行方式为cluster (即 Driver进程运行在集群的工作节点中),执行命令如下:

$ bin/spark-submit \
--master spark://centos01:7077 \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
--driver-memory 512m \
--executor-memory 1g \
--executor-cores 1 \
./examples/jars/spark-examples_2.11-2.4.8.jar

执行:

[root@centos01 spark-2.4.8-bin-hadoop2.7]# bin/spark-submit \
> --master spark://centos01:7077 \
> --deploy-mode cluster \
> --class org.apache.spark.examples.SparkPi \
> --driver-memory 512m \
> --executor-memory 1g \
> --executor-cores 1 \
> ./examples/jars/spark-examples_2.11-2.4.8.jar
log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.NativeCodeLoader).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/09/26 07:02:23 INFO SecurityManager: Changing view acls to: root
21/09/26 07:02:23 INFO SecurityManager: Changing modify acls to: root
21/09/26 07:02:23 INFO SecurityManager: Changing view acls groups to: 
21/09/26 07:02:23 INFO SecurityManager: Changing modify acls groups to: 
21/09/26 07:02:23 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/09/26 07:02:23 INFO Utils: Successfully started service 'driverClient' on port 41293.
21/09/26 07:02:23 INFO TransportClientFactory: Successfully created connection to centos01/192.168.222.10:7077 after 55 ms (0 ms spent in bootstraps)
21/09/26 07:02:24 INFO ClientEndpoint: Driver successfully submitted as driver-20210926070224-0000
21/09/26 07:02:24 INFO ClientEndpoint: ... waiting before polling master for driver state
21/09/26 07:02:29 INFO ClientEndpoint: ... polling master for driver state
21/09/26 07:02:29 INFO ClientEndpoint: State of driver-20210926070224-0000 is RUNNING
21/09/26 07:02:29 INFO ClientEndpoint: Driver running on 192.168.222.11:41313 (worker-20210926064500-192.168.222.11-41313)
21/09/26 07:02:29 INFO ShutdownHookManager: Shutdown hook called
21/09/26 07:02:29 INFO ShutdownHookManager: Deleting directory /tmp/spark-10ecd62a-f60c-4417-b87a-e362832049a6

file

file

spark-shell

  1. Spark Standalone模式下Spark Shell的启动
    在任意节点进入Spark 安装目录,执行以下命令,启动Spark Shell终端:
    $ bin/spark-shell --master spark://centos01:7077

上述命令中的--master 参数指定了Master节点的访问地址,其中的centos01为 Master所在节点的主机名。

[root@centos01 spark-2.4.8-bin-hadoop2.7]# bin/spark-shell --master spark://centos01:7077
21/09/26 07:44:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://centos01:4040
Spark context available as 'sc' (master = spark://centos01:7077, app id = app-20210926074423-0001).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.8
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_211)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 

从启动过程的输出信息可以看出,Spark Shell启动时创建了一个名为“sc”的变量,该变量为类SparkContext的实例,可以在Spark Shell 中直接使用。SparkContext存储Spark 上下文环境,是提交Spark应用程序的入口,负责与Spark集群进行交互。

若需退出Spark Shell,可以执行以下命令:

scala>:quit

五、Spark RDD

Spark提供了一种对数据的核心抽象,称为弹性分布式数据集(Resilient Distributed Dataset,RDD)。每个RDD 被分为多个分区,这些分区运行在集群中的不同节点上。也就是说,RDD是跨集群节点分区的元素集合,并且这些元素可以并行操作。
在编程时,可以把RDD看作是一个数据操作的基本单位。Spark中对数据的操作主要是对RDD的操作。

创建RDD

RDD中的数据来源可以是程序中的对象集合,也可以来源于外部存储系统中的数据集,例如共享文件系统、HDFS、HBase或任何提供Hadoop InputFormat的数据源。
下面使用 Spark Shell 讲解创建RDD的常用两种方式。

1.从对象集合创建RDD

Spark可以通过 parallelize()makeRDD() 方法将一个对象集合转化为RDD。
例如,将一个List集合转化为RDD,代码如下:

scala> val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6) 

从返回信息可以看出,上述创建的RDD中存储的是Int类型数据。实际上,RDD也是一个集合,与常用的List集合不同的是,RDD集合的数据分布于多台计算机上

2.从外部存储系统创建RDD

Spark的 textFile() 方法可以读取本地文件系统或外部其他系统中的数据,并创建RDD。所不同的是,数据的来源路径不同。
(1)读取本地系统文件。
例如,本地CentOS系统中有一个文件 /home/words.txt,该文件的内容如下:

hello wenyin
hello java
hello Spark

使用textFile()方法将上述文件内容转化为一个RDD,并使用 collect() 方法(该方法是RDD的一个行动算子,)查看RDD中的内容。代码如下:

scala> val rdd=sc.textFile("/root/words.txt")
rdd: org.apache.spark.rdd.RDD[String] = /root/words.txt MapPartitionsRDD[2] at textFile at <console>:24

rdd.collect

(2)读取HDFS 系统文件。
将本地系统文件 /home/words.txt 上传到HDFS系统的 /input 目录,然后读取文件 /input/words.txt 中的数据。代码如下:

scala> val rdd=sc.textFile("hdfs://centos01:9000/input/words.txt")
rdd:org.apache.spark.rdd.RDD[String] = hdfs://centos01:9000/input/words.txtMapPartitionsRDD[2]
scala> rdd.collect
res2: Array[String] =Array("hello hadoop ","hello java ", "scala ")

RDD算子

RDD被创建后是只读的,不允许修改。Spark提供了丰富的用于操作RDD的方法,这些方法被称为算子。一个创建完成的RDD只支持两种算子:转化(Transformation)算子行动(Action)算子

1.转化算子

转化算子负责对RDD中的数据进行计算并转化为新的RDD。Spark 中的所有转化算子都是惰性的,因为它们不会立即计算结果,而只是记住对某个RDD的具体操作过程,直到遇到行动算子才会与行动算子一起执行。
例如,map() 是一种转化算子,它接收一个函数作为参数,并把这个函数应用于RDD的每个元素,最后将函数的返回结果作为结果RDD中对应元素的值。
如下代码所示,对rdd1应用map()算子,将rdd1中的每个元素加1并返回一个名为rdd2的新RDD:

scala> val rdd1=sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> val rdd2=rdd1.map(x => x+1)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:25

scala> rdd2.collect
res3: Array[Int] = Array(2, 3, 4, 5, 6)                                         

上述代码中,向算子map()传入了一个函数x=>x+1。其中x为函数的参数名称,也可以使用其他字符,例如 a=>a+1。Spark 会将RDD 中的每个元素传入该函数的参数中。当然,也可以将参数使用下划线“_”代替。例如以下代码:

scala> val rdd1=sc.parallelize(List(1,2,3,4,5,6))
scala> val rdd2-rdd1.map(_+1)

2.行动算子

Spark中的转化算子并不会立即进行运算,而是在遇到行动算子时才会执行相应的语句,触发Spark的任务调度。
Spark 常用的行动算子及其介绍如所示。

file

下面对其中的几个行动算子进行实例讲解:

( 1 ) reduce(func)算子

将数字1到100所组成的集合转化为RDD,然后对该RDD进行 reduce() 算子计算,统计RDD中所有元素值的总和。代码如下:

scala> val rdd1 = sc.parallelize(1 to 100)
rddl: org.apache.spark.rdd.RDD[Int] = Paralle1collectionRDD[1]

scala> rdd1 .reduce(_+_)
res2 : Int = 5050

上述代码中的下画线“_”代表RDD中的元素。

六、Spark SQL

Spark SQL是Spark用于结构化数据计算的一个组件,本节主要讲解Spark SQL 的相关概念与基本使用。

DataFrame和 Dataset

DataFrame是Spark SQL提供的一个编程抽象,与RDD类似,也是一个分布式的数据集合。但与RDD不同的是,DataFrame的数据都被组织到有名字的列中,就像关系型数据库中的表一样。此外,多种数据都可以转化为DataFrame,例如Spark计算过程中生成的RDD、结构化数据文件、Hive 中的表、外部数据库等。
DataFrame在RDD的基础上添加了数据描述信息(Schema,即元信息),因此看起来更像是一张数据库表。例如,在一个RDD中有图所示的三行数据。

file

将该RDD转换成DataFrame后,其中的数据可能如图16-31所示。

file

使用DataFrame API结合SOL处理结构化数据比RDD更加容易,而且通过DataFrame API或SQL 处理数据,Spark 优化器会自动对其优化,即使你写的程序或SQL不高效,也可以运行得很快。

Dataset也是一个分布式数据集,是Spark1.6中添加的一个新的API。相对于RDD,Dataset提供了强类型支持,在RDD的每行数据加了类型约束。而且使用Dataset API同样会经过Spark SQL优化器的优化,从而提高了程序执行效率。

同样是对于图16-30中的RDD数据,将其转换为Dataset后的数据可能如图所示。
file

基本使用

Spark Shell 启动时除了默认创建一个名为“sc”的 SparkContext实例外,还创建了一个名为“spark”的SparkSession 实例,该“spark”变量也可以在Spark Shell中直接使用。
SparkSession只是在SparkContext基础上的封装,应用程序的入口仍然是SparkContext。SparkSession允许用户通过它调用DataFrame和 Dataset 相关API来编写Spark程序,支持从不同的数据源加载数据,并把数据转换成 DataFrame,然后使用SQL语句来操作数据。
例如,在 HDFS中有一个文件/input/person.txt,文件内容如下:

1, zhangsan, 25
2,lisi,22
3 ,wangwu, 30

现需要使用Spark SQL将该文件中的数据按照年龄降序排列,步骤如下。

1.加载数据为Dataset

调用SparkSession 的API read.textFile()可以读取指定路径中的文件内容,代码如下:

scala> val d1=spark.read.textFile("hdfs://centos01:9000/input/person.txt")
d1:org.apache.spark.sql. Dataset [String] =[value: string]

从变量d1的类型可以看出,textFile()方法将读取的数据转换为了Dataset。除了使用textFile()方法读取文本内容外,还可以使用csv()、jdbc()、json()等方法读取CSV文件、JDBC数据源、JSON文件等数据。

调用Dataset中的show()方法可以输出 Dataset中的数据内容。查看d1中的数据内容,代码如下:

scala> d1 .show()
+-------------+
|  value       l
+-------------+
| 1, zhangsan,25 |
| 2 ,lisi,22        |
| 3, wangwu,30|
十-------------+

从上述内容可以看出,Dataset 将文件中的每一行看作一个元素,并且所有元素组成了一列,列名默认为“value”。

2.给Dataset添加元数据信息

定义一个样例类Person,用于存放数据描述信息(Schema)。代码如下:

scala> case class Person(id: Int,name:String, age:Int)
defined class Person

导入 SparkSession 的隐式转换,以便后续可以使用Dataset的算子。代码如下:

scala> import spark.implicits._

3.将Dataset转换为 DataFrame

Spark SQL查询的是DataFrame中的数据,因此需要将存有元数据信息的Dataset 转换为DataFrame。
调用Dataset的 toDF()方法,将存有元数据的Dataset转换为DataFrame。代码如下:

scala> val pdf = personDataset.toDF()
pdf: org.apache.spark.sql.DataFrame =[id:int,name: string ... 1 more field]

4.执行SQL查询

在 DataFrame 上创建一个临时视图“v_person”。代码如下:

scala> pdf.createTempView("v_person")

使用SparkSession对象执行SQL查询。代码如下:

scala> val result = spark.sql("select * from v_person order by age desc")
result:org.apache.spark.sql . DataFrame =[id: int,name: string ... 1 morefield]

调用show()方法输出结果数据,代码如下:

file

六、SparkSQL与Hive整合

Spark SQL与Hive整合后,可以在Spark SQL中使用HiveQL轻松操作数据仓库。与Hive不同的是,Hive的执行引擎为MapReduce,而 Spark SQL 的执行引擎为Spark RDD。
Spark sQL 与 Hive的整合比较简单,总体来说只需要以下两步:

1、将 $HIVE_HOME/conf 中的 hive-site.xml 文件复制到 $SPARK_HOME/conf 中。
2、在Spark配置文件 spark-env.sh 中指定Hadoop及其配置文件的主目录。

Hive的安装不是必须的,如果没有安装 Hive,可以手动在$SPARK_HOME/conf 中创建hive-site.xml,并加入相应配置信息。Spark SQL相当于一个命令执行的客户端,只在一台计算机上配置即可。
本例以MySOL作为元数据库配置Spark SQL与Hive整合,Spark集群使用Standalone模式,且集群中未安装Hive客户端。
在 Spark集群中选择一个节点作为Spark SQL客户端,进行以下操作:

为者常成,行者常至