SparkSQL_"spark.sql(\"update usr_info set 'age_range'=-1 wh-程序员宅基地

技术标签: PySpark  Spark  Sql  

1. 预览

  • Spark SQL是Spark处理结构化数据的模块
  • Spark SQL的功能之一是执行SQL查询, 也支持Hive, 查询结果以Dataset/DataFrame的形式返回
  • 一个DataFrame是一个Dataset组成的指定列

2. 开始入门

2.1 起始点: SparkSession

Spark SQL中所有功能的入口是SparkSession类.

使用SparkSession.builder创建一个SparkSession

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName('appName').config('spark.some.config.option', 'some-value').getOrCreate()
>>> spark
SparkSession - in-memory

SparkContext

Spark UI

Version
v2.2.1
Master
local[*]
AppName
appName

2.2 创建数据框

可以从以下对象中创建DataFrame:
- 从一个已经存在的RDD
- 从Spark数据源
- 从Hive表

示例: 基于一个JSON文件创建一个DataFrame

>>> df = spark.read.json(r"C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\people.json")
>>> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

2.3 DataFrame相关操作

举例简单介绍一下DataFrame的操作, 完整列表请参考DataFrame函数指南

>>> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

>>> df.select('name').show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

>>> df.select(df['name'], df['age']+1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

>>> df.filter(df['age']>12).show()
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+

>>> df.groupBy('age').count().show()
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

2.4 运行SQL查询

SparkSession的sql函数可以运行SQL查询, 返回DataFrame

>>> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
>>> df.createOrReplaceTempView('people')
>>> sqlDF = spark.sql('select * from people')
>>> sqlDF.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

2.5 全局临时视图

  • Spark SQL中的临时视图是session级别的, 会随着session的消失而消失.
  • 如果想让一个临时视图在所有session中相互传递并且可用, 直到Spark应用退出, 可以创建一个全局的临时视图.
  • 全局的临时视图存在于系统数据库global_temp中, 我们必须加上库名去引用
    select * from global_temp.table1
>>> df.createGlobalTempView('people')
>>> spark.sql('select * from global_temp.people').show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
>>> spark.newSession().sql('select * from global_temp.people').show()  # 新建一个session结果还是一样, 说明是全局的
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

2.6 创建Datasets(Java and Scala) pass

2.7 将RDD转化为DataFrame的两种方式

  • textfile => [Row1, Row2, …] => DataFrame
>>> from pyspark.sql import Row
>>> sc = spark.sparkContext

>>> lines = sc.textFile(r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\people.txt')
>>> parts = lines.map(lambda x: x.split(","))
>>> people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
>>> people.collect()
[Row(age=29, name='Michael'),
 Row(age=30, name='Andy'),
 Row(age=19, name='Justin')]
>>> schemaPeople = spark.createDataFrame(people)
>>> schemaPeople.show()
+---+-------+
|age|   name|
+---+-------+
| 29|Michael|
| 30|   Andy|
| 19| Justin|
+---+-------+
>>> schemaPeople.createOrReplaceTempView('people')
>>> teenagers = spark.sql('select * from people where age between 13 and 19')
>>> teenagers.show()
+---+------+
|age|  name|
+---+------+
| 19|Justin|
+---+------+
  • textfile => [data, schema] => DataFrame
    • RDD从原始的RDD穿件一个RDD的toples或者一个列表;
    • Step 1 被创建后, 创建 Schema 表示一个 StructType 匹配 RDD 中的结构.
    • 通过 SparkSession 提供的 createDataFrame 方法应用 Schema 到 RDD .
>>> from pyspark.sql.types import StringType, IntegerType, StructType, StructField
>>> sc = spark.sparkContext
>>> lines = sc.textFile(r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\people.txt')
>>> parts = lines.map(lambda x: x.split(','))
>>> people = parts.map(lambda p: (p[0], p[1].strip()))
>>> fields = [StructField(name, StringType(), True) for name in ['name', 'age']]
>>> schema = StructType(fields)
>>> schemaPeople = spark.createDataFrame(people, schema=schema)
>>> result = spark.sql('select name from people')
>>> result.show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

2.8 Aggregations(Java and Scala) pass

  • Untyped User-Defined Aggregate Functions
  • Type-Safe User-Defined Aggregate Functions

3. 数据源

  • spark.read.load(path): 读取parquet文件为DataFrame
  • spark.write.save(path): 保存DataFrame为parquet文件
  • spark.read.load(path, format="json"): 指定源数据类型为给定格式
    • json
    • parquet
    • jdbc
    • orc
    • libsvm
    • csv
    • text
  • spark.write.save(path, format="json")
  • spark.sql(“select * from parquet.examples/src/main/resources/users.parquet“): 直接在文件上运行SQL
>>> df = spark.read.load(r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\users.parquet')
>>> df.select('name', 'favorite_color').write.save('nameAndFavColors.parquet')
  • 保存模式(Scala and Java) pass

3.1.4 保存到持久表

>>> df = spark.read.load(r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\users.parquet')
>>> df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+
>>> # df.write.option('path1', 'path2').saveAsTable()

DataFrames 也可以使用 saveAsTable 命令作为 persistent tables (持久表)保存到 Hive metastore 中. 请注意, existing Hive deployment (现有的 Hive 部署)不需要使用此功能. Spark 将为您创建默认的 local Hive metastore (本地 Hive metastore)(使用 Derby ). 与 createOrReplaceTempView 命令不同, saveAsTable 将 materialize (实现) DataFrame 的内容, 并创建一个指向 Hive metastore 中数据的指针. 即使您的 Spark 程序重新启动, Persistent tables (持久性表)仍然存在, 因为您保持与同一个 metastore 的连接. 可以通过使用表的名称在 SparkSession 上调用 table 方法来创建 persistent tabl (持久表)的 DataFrame .

对于 file-based (基于文件)的 data source (数据源), 例如 text, parquet, json等, 您可以通过 path 选项指定 custom table path (自定义表路径), 例如 df.write.option(“path”, “/some/path”).saveAsTable(“t”) . 当表被 dropped (删除)时, custom table path (自定义表路径)将不会被删除, 并且表数据仍然存在. 如果未指定自定义表路径, Spark 将把数据写入 warehouse directory (仓库目录)下的默认表路径. 当表被删除时, 默认的表路径也将被删除.

从 Spark 2.1 开始, persistent datasource tables (持久性数据源表)将 per-partition metadata (每个分区元数据)存储在 Hive metastore 中. 这带来了几个好处:

由于 metastore 只能返回查询的必要 partitions (分区), 因此不再需要将第一个查询上的所有 partitions discovering 到表中.
Hive DDLs 如 ALTER TABLE PARTITION … SET LOCATION 现在可用于使用 Datasource API 创建的表.
请注意, 创建 external datasource tables (外部数据源表)(带有 path 选项)的表时, 默认情况下不会收集 partition information (分区信息). 要 sync (同步) metastore 中的分区信息, 可以调用 MSCK REPAIR TABLE .

3.1.5 分桶, 排序和分区

对于基于文件的数据源, 也可以对输出进行bucket, sort和partition操作, 前二者仅适用于持久表

>>> df.write.bucketBy(42, 'name').sortBy('age').saveAsTable('pepple_buckted')

在使用Dataset API时, partitioning可以同时和save和saveAsTable一起使用

df.write.partitionBy('favorite_color').format('parquet').save('namesPartyByColor.parquet')

可以为单个表使用partitioning和bucketing

df = spark.read.parquet(r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\users.parquet')
df.write.partitionBy('favorite_color').bucketBy(42, 'name').saveAsTable('people_partitioned_bucketed')

partitionBy 创建一个 directory structure (目录结构), 如 Partition Discovery 部分所述. 因此, 对 cardinality (基数)较高的 columns 的适用性有限. 相反, bucketBy 可以在固定数量的 buckets 中分配数据, 并且可以在 a number of unique values is unbounded (多个唯一值无界时)使用数据.

3.2 Parquet Files

3.2.1 以编程的方式加载数据

>>> peopleDF = spark.read.json(r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\people.json')
>>> peopleDF.write.parquet('people.parquet')
>>> parquetFile = spark.read.parquet('people.parquet')
>>> parquetFile.createOrReplaceTempView('parquetFile')
>>> spark.sql('select name from parquetFile where age between 13 and 19').show()
+------+
|  name|
+------+
|Justin|
+------+

3.2.2 分区发现

3.2.3 模式合并

3.2.4 Hive metastore Oarquet table conversion

3.2.5 配置

可以使用SparkSession上的setConf方法或使用SQL运行set key=value命令来完成parquet的设置

参数名称 默认值 含义
spark.sql.parquet.binaryAsString false 一些其他 Parquet-producing systems (Parquet 生产系统), 特别是 Impala, Hive 和旧版本的 Spark SQL , 在 writing out (写出) Parquet schema 时, 不区分 binary data (二进制数据)和 strings (字符串). 该 flag 告诉 Spark SQL 将 binary data (二进制数据)解释为 string (字符串)以提供与这些系统的兼容性.
spark.sql.parquet.int96AsTimestamp true 一些 Parquet-producing systems , 特别是 Impala 和 Hive , 将 Timestamp 存入INT96 . 该 flag 告诉 Spark SQL 将 INT96 数据解析为 timestamp 以提供与这些系统的兼容性.
spark.sql.parquet.cacheMetadata true 打开 Parquet schema metadata 的缓存. 可以加快查询静态数据.
spark.sql.parquet.compression.codec snappy 在编写 Parquet 文件时设置 compression codec (压缩编解码器)的使用. 可接受的值包括: uncompressed, snappy, gzip, lzo .
spark.sql.parquet.filterPushdown true 设置为 true 时启用 Parquet filter push-down optimization .
spark.sql.hive.convertMetastoreParquet true 当设置为 false 时, Spark SQL 将使用 Hive SerDe 作为 parquet tables , 而不是内置的支持.
spark.sql.parquet.mergeSchema false 当为 true 时, Parquet data source (Parquet 数据源) merges (合并)从所有 data files (数据文件)收集的 schemas , 否则如果没有可用的 summary file , 则从 summary file 或 random data file 中挑选 schema .
spark.sql.optimizer.metadataOnly true 如果为 true , 则启用使用表的 metadata 的 metadata-only query optimization 来生成 partition columns (分区列)而不是 table scans (表扫描). 当 scanned (扫描)的所有 columns (列)都是 partition columns (分区列)并且 query (查询)具有满足 distinct semantics (不同语义)的 aggregate operator (聚合运算符)时, 它将适用.

3.3 Json数据集

通过spark.read.json方法将数据集加载为DataFrame, 支持两种格式:
- 常规的多行json文件, 如\examples\src\main\resources\people.json
- 独立有效的json对象rdd, 如['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']

>>> sc = spark.sparkContext
>>> path = r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\people.json'
>>> peopleDF = spark.read.json(path)
>>> peopleDF.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
>>> peopleDF.createOrReplaceTempView('people')
>>> spark.sql('select name from people where age between 13 and 19').show()
+------+
|  name|
+------+
|Justin|
+------+

>>> jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
>>> rdd = sc.parallelize(jsonStrings)
>>> people2 = spark.read.json(rdd)
>>> people2.show()
+---------------+----+
|        address|name|
+---------------+----+
|[Columbus,Ohio]| Yin|
+---------------+----+

3.4 Hive表

  • Spark SQL还支持读取和写入Hive中的数据.
  • 但是, Hive中有大量的依赖关系, 因此这些依赖关系不包含在默认Spark分发中.
  • 如果在类路径中找到Hive依赖项, Spark将自动加载它们.
  • 请注意, 这些Hive依赖关系也必须存在于所有工作节点上, 因为它们将需要访问Hive序列化和反序列化库SerDes, 以访问存储在Hive中的数据
  • 通过修改hive-site.xml, core-site.xml, hdfs-site.xml文件来完成配置
  • 当使用Hive时, 必须用Hive支持实例化SparkSession, 包括:
    • 连接到持续的Hive转移
    • 支持Hive serdes
    • 支持Hive用户自定义功能
  • 没有现有Hive部署的用户仍可以启用Hive支持
  • 当hive-site.xml未配置时, 上下文会自动在当前目录创建metastore_db, 并创建有spark.sql.warehouse.dir配置的目录, 该目录默认为Sprk应用程序当前目录中的spark-warehouse目录
  • 请注意,自从2.0.0以来,hive-site.xml 中的 hive.metastore.warehouse.dir 属性已被弃用。 而是使用 spark.sql.warehouse.dir 来指定仓库中数据库的默认位置
  • 您可能需要向启动 Spark 应用程序的用户授予写权限
from os.path import expanduser, join, abspath
from pyspark.sql import SparkSession, Row
warehouse_location=abspath('spark-warehouse')
spark = SparkSession.builder.config('spark.sql.warehouse.dir', warehouse_location).enableHiveSupport().getOrCreate()
spark.sql('CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive')
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

3.4.1 指定Hive表的存储格式

创建Hive表时
- 需要定义输入格式和输出格式
- 还需要定义该表如何将数据反序列化为行, 或将行序列化为数据, 即serde
- 以下选项可用于指定存储格式 (“serde”, “input format”, “output format”),例如,CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')
- 默认情况下,我们将以纯文本形式读取表格文件
- 请注意,Hive 存储处理程序在创建表时不受支持,您可以使用 Hive 端的存储处理程序创建一个表,并使用 Spark SQL 来读取它

Property Name Meaning
fileFormat fileFormat是一种存储格式规范的包,包括 “serde”,”input format” 和 “output format”。 目前我们支持6个文件格式:’sequencefile’,’rcfile’,’orc’,’parquet’,’textfile’和’avro’。
inputFormat, outputFormat 这两个选项将相应的 “InputFormat” 和 “OutputFormat” 类的名称指定为字符串文字,例如: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat。 这两个选项必须成对出现,如果您已经指定了 “fileFormat” 选项,则无法指定它们。
serde 此选项指定 serde 类的名称。 当指定 fileFormat 选项时,如果给定的 fileFormat 已经包含 serde 的信息,那么不要指定这个选项。 目前的 “sequencefile”, “textfile” 和 “rcfile” 不包含 serde 信息,你可以使用这3个文件格式的这个选项。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 这些选项只能与 “textfile” 文件格式一起使用。它们定义如何将分隔的文件读入行。

3.4.2 与不同版本的Hive Metastore进行交互

3.5 JDBC连接其他数据库

  • Spark SQL可以使用JDBC读取其他数据库的数据, 这个功能优于使用JdbcRDD(因为直接返回DataFrame结果)
  • 要开始使用, 你需要在Spark类路径中包含特定数据库的JDBC driver程序. 例如, 要从Spark Shell连接到postgres, 运行命令:spark-shell –driver-class-path postgresql-9.4.1207.jar –jars postgresql-9.4.1207.jar
  • Spark支持以下option
    • url: 要连接的JDBC URL, 如jdbc:postgresql://localhost/test?user=fred&password=secret
    • dbtable: 需读取的JDBC表
    • driver: 用于连接到此URL的JDBC driver程序的类名
    • partitionColumn, lowerBound, upperBound:如果指定了这些选项,则必须指定这些选项。 另外,必须指定 numPartitions. 他们描述如何从多个 worker 并行读取数据时将表给分区。partitionColumn 必须是有问题的表中的数字列。 请注意,lowerBound 和 upperBound 仅用于决定分区的大小,而不是用于过滤表中的行。 因此,表中的所有行将被分区并返回。此选项仅适用于读操作。
    • numPartitions: 在表读写中可以用于并行度的最大分区数。这也确定并发JDBC连接的最大数量。 如果要写入的分区数超过此限制,则在写入之前通过调用 coalesce(numPartitions) 将其减少到此限制。
    • fetchsize:JDBC 抓取的大小,用于确定每次数据往返传递的行数。 这有利于提升 JDBC driver 的性能,它们的默认值较小(例如: Oracle 是 10 行)。 该选项仅适用于读取操作。
    • batchsize: JDBC 批处理的大小,用于确定每次数据往返传递的行数。 这有利于提升 JDBC driver 的性能。 该选项仅适用于写操作。默认值为 1000.
    • isolationLevel: 事务隔离级别,适用于当前连接。 它可以是 NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, 或 SERIALIZABLE 之一,对应于 JDBC 连接对象定义的标准事务隔离级别,默认为 READ_UNCOMMITTED。 此选项仅适用于写操作。请参考 java.sql.Connection 中的文档。
    • truncate: 这是一个与 JDBC 相关的选项。 启用 SaveMode.Overwrite 时,此选项会导致 Spark 截断现有表,而不是删除并重新创建。 这可以更有效,并且防止表元数据(例如,索引)被移除。 但是,在某些情况下,例如当新数据具有不同的模式时,它将无法工作。 它默认为 false。 此选项仅适用于写操作。
    • createTableOptions: 这是一个与JDBC相关的选项。 如果指定,此选项允许在创建表时设置特定于数据库的表和分区选项(例如:CREATE TABLE t (name string) ENGINE=InnoDB. )。此选项仅适用于写操作。
    • createTableColumnTypes: 使用数据库列数据类型而不是默认值,创建表时。 数据类型信息应以与 CREATE TABLE 列语法相同的格式指定(例如:”name CHAR(64), comments VARCHAR(1024)”)。 指定的类型应该是有效的 spark sql 数据类型。此选项仅适用于写操作。
jdbcDF = spark.read.format('jdbc').option('url', 'jdbc:postgresql:dbserver').option('dbtable', 'schema.tablename').option('user', 'username').optioni('password', 'password').load()

jdbcDF2 = spark.read.jdbc('jdbc:postgresql:dbserver', 'schema.tablename', properise={
   'user': 'username', 'password': 'password'})

jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={
   "user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={
   "user": "username", "password": "password"})

3.6 故障排除

JDBC driver 程序类必须对客户端会话和所有执行程序上的原始类加载器可见。 这是因为 Java 的 DriverManager 类执行安全检查,导致它忽略原始类加载器不可见的所有 driver 程序,当打开连接时。一个方便的方法是修改所有工作节点上的compute_classpath.sh 以包含您的 driver 程序 JAR。
一些数据库,例如 H2,将所有名称转换为大写。 您需要使用大写字母来引用 Spark SQL 中的这些名称。

4. 性能调优

对于某些工作负载,可以通过缓存内存中的数据或打开一些实验选项来提高性能。

4.1 在内存中缓存数据

Spark SQL 可以通过调用 spark.catalog.cacheTable(“tableName”) 或 dataFrame.cache() 来使用内存中的列格式来缓存表。 然后,Spark SQL 将只扫描所需的列,并将自动调整压缩以最小化内存使用量和 GC 压力。 您可以调用 spark.catalog.uncacheTable(“tableName”) 从内存中删除该表

内存缓存的配置可以使用 SparkSession 上的 setConf 方法或使用 SQL 运行 SET key=value 命令来完成。

属性名称 默认 含义
spark.sql.inMemoryColumnarStorage.compressed true 当设置为 true 时,Spark SQL 将根据数据的统计信息为每个列自动选择一个压缩编解码器。
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制批量的柱状缓存的大小。更大的批量大小可以提高内存利用率和压缩率,但是在缓存数据时会冒出 OOM 风险。

4.2 其他配置选项

以下选项也可用于调整查询执行的性能。这些选项可能会在将来的版本中被废弃,因为更多的优化是自动执行的

属性名称 默认值 含义
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 在读取文件时,将单个分区打包的最大字节数。
spark.sql.files.openCostInBytes 4194304 (4 MB) 按照字节数来衡量的打开文件的估计费用可以在同一时间进行扫描。 将多个文件放入分区时使用。最好过度估计,那么具有小文件的分区将比具有较大文件的分区(首先计划的)更快。
spark.sql.broadcastTimeout 300 广播连接中的广播等待时间超时(秒)
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置执行连接时将广播给所有工作节点的表的最大大小(以字节为单位)。 通过将此值设置为-1可以禁用广播。 请注意,目前的统计信息仅支持 Hive Metastore 表,其中已运行命令 ANALYZE TABLE COMPUTE STATISTICS noscan。
spark.sql.shuffle.partitions 200 Configures the number of partitions to use when shuffling data for joins or aggregations.

5. 分布式SQL引擎

Spark SQL 也可以充当使用其 JDBC/ODBC 或命令行界面的分布式查询引擎。 在这种模式下,最终用户或应用程序可以直接与 Spark SQL 交互运行 SQL 查询,而不需要编写任何代码。

5.1 运行Thrift JDBC/ODBC

这里实现的 Thrift JDBC/ODBC 服务器对应于 Hive 1.2 中的 HiveServer2。 您可以使用 Spark 或 Hive 1.2.1 附带的直线脚本测试 JDBC 服务器。

要启动 JDBC/ODBC 服务器,请在 Spark 目录中运行以下命令:

./sbin/start-thriftserver.sh

此脚本接受所有 bin/spark-submit 命令行选项,以及 –hiveconf 选项来指定 Hive 属性。 您可以运行 ./sbin/start-thriftserver.sh –help 查看所有可用选项的完整列表。 默认情况下,服务器监听 localhost:10000. 您可以通过环境变量覆盖此行为,即:

export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
  --master <master-uri> \
  ...

or system properties:

./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
  ...

现在,您可以使用 beeline 来测试 Thrift JDBC/ODBC 服务器:./bin/beeline

使用 beeline 方式连接到 JDBC/ODBC 服务器:
beeline> !connect jdbc:hive2://localhost:10000

Beeline 将要求您输入用户名和密码。 在非安全模式下,只需输入机器上的用户名和空白密码即可。 对于安全模式,请按照 beeline 文档 中的说明进行操作。

配置Hive是通过将 hive-site.xml, core-site.xml 和 hdfs-site.xml 文件放在 conf/ 中完成的。

您也可以使用 Hive 附带的 beeline 脚本。

Thrift JDBC 服务器还支持通过 HTTP 传输发送 thrift RPC 消息。 使用以下设置启用 HTTP 模式作为系统属性或在 conf/ 中的 hive-site.xml 文件中启用:

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

要测试,请使用 beeline 以 http 模式连接到 JDBC/ODBC 服务器:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

5.2 运行Spark SQL CLI

Spark SQL CLI 是在本地模式下运行 Hive 转移服务并执行从命令行输入的查询的方便工具。 请注意,Spark SQL CLI 不能与 Thrift JDBC 服务器通信。

要启动 Spark SQL CLI,请在 Spark 目录中运行以下命令:

./bin/spark-sql
配置 Hive 是通过将 hive-site.xml, core-site.xml 和 hdfs-site.xml 文件放在 conf/ 中完成的。 您可以运行 ./bin/spark-sql –help 获取所有可用选项的完整列表。

6. 支持的Hive特性

Spark SQL 支持绝大部分的 Hive 功能,如:

  • Hive query(查询)语句, 包括:
    • SELECT
    • GROUP BY
    • ORDER BY
    • CLUSTER BY
    • SORT BY
  • 所有 Hive 操作, 包括:
    • 关系运算符 (=, ⇔, ==, <>, <, >, >=, <=, 等等)
    • 算术运算符 (+, -, *, /, %, 等等)
    • 逻辑运算符 (AND, &&, OR, ||, 等等)
    • 复杂类型的构造
    • 数学函数 (sign, ln, cos, 等等)
    • String 函数 (instr, length, printf, 等等)
  • 用户定义函数 (UDF)
  • 用户定义聚合函数 (UDAF)
  • 用户定义 serialization formats (SerDes)
  • 窗口函数
  • Joins
    • JOIN
    • {LEFT|RIGHT|FULL} OUTER JOIN
    • LEFT SEMI JOIN
    • CROSS JOIN
  • Unions
  • Sub-queries(子查询)
    • SELECT col FROM ( SELECT a + b AS col from t1) t2
  • Sampling
  • Explain
  • Partitioned tables including dynamic partition insertion
  • View
  • 所有的 Hive DDL 函数, 包括:
    • CREATE TABLE
    • CREATE TABLE AS SELECT
    • ALTER TABLE
      大部分的 Hive Data types(数据类型), 包括:
    • TINYINT
    • SMALLINT
    • INT
    • BIGINT
    • BOOLEAN
    • FLOAT
    • DOUBLE
    • STRING
    • BINARY
    • TIMESTAMP
    • DATE
    • ARRAY<>
    • MAP<>
    • STRUCT<>

6. 参考

6.1 数据类型

Spark SQL 和 DataFrames 支持下面的数据类型:

  • Numeric types
    • ByteType: Represents 1-byte signed integer numbers. The range of numbers is from -128 to 127.
    • ShortType: Represents 2-byte signed integer numbers. The range of numbers is from -32768 to 32767.
    • IntegerType: Represents 4-byte signed integer numbers. The range of numbers is from -2147483648 to 2147483647.
    • LongType: Represents 8-byte signed integer numbers. The range of numbers is from -9223372036854775808 to 9223372036854775807.
    • FloatType: Represents 4-byte single-precision floating point numbers.
    • DoubleType: Represents 8-byte double-precision floating point numbers.
    • DecimalType: Represents arbitrary-precision signed decimal numbers. Backed internally by java.math.BigDecimal. A BigDecimal consists of an arbitrary precision integer unscaled value and a 32-bit integer scale.
  • String type
    • StringType: Represents character string values.
  • Binary type
    • BinaryType: Represents byte sequence values.
  • Boolean type
    • BooleanType: Represents boolean values.
  • Datetime type
    • TimestampType: Represents values comprising values of fields year, month, day, hour, minute, and second.
    • DateType: Represents values comprising values of fields year, month, day.
  • Complex types
    • ArrayType(elementType, containsNull): Represents values comprising a sequence of elements with the type of elementType. containsNull is used to indicate if elements in a ArrayType value can have null values.
    • MapType(keyType, valueType, valueContainsNull): Represents values comprising a set of key-value pairs. The data type of keys are described by keyType and the data type of values are described by valueType. For a MapType value, keys are not allowed to have null values. valueContainsNull is used to indicate if values of a MapType value can have null values.
    • StructType(fields): Represents values with the structure described by a sequence of StructFields (fields).
      • StructField(name, dataType, nullable): Represents a field in a StructType. The name of a field is indicated by name. The data type of a field is indicated by dataType. nullable is used to indicate if values of this fields can have null values.

Spark SQL中的数据类型都在pyspark.sql.types的包中, 通过以下方式访问:

from pysparl.sql.types import *
Data type Value type in Python API to access or create a data type
ByteType int or long, Note: Numbers will be converted to 1-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -128 to 127. ByteType()
ShortType int or long, Note: Numbers will be converted to 2-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -32768 to 32767. ShortType()
IntegerType int or long IntegerType()
LongType long, Note: Numbers will be converted to 8-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -9223372036854775808 to 9223372036854775807. Otherwise, please convert data to decimal.Decimal and use DecimalType. LongType()
FloatType float, Note: Numbers will be converted to 4-byte single-precision floating point numbers at runtime. FloatType()
DoubleType float DoubleType()
DecimalType decimal.Decimal DecimalType()
StringType string StringType()
BinaryType bytearray BinaryType()
BooleanType bool BooleanType()
TimestampType datetime.datetime TimestampType()
DateType datetime.date DateType()
ArrayType list, tuple, or array ArrayType(elementType, [containsNull]), Note: The default value of containsNull is True.
MapType dict MapType(keyType, valueType, [valueContainsNull]), Note: The default value of valueContainsNull is True.
StructType list or tuple StructType(fields), Note: fields is a Seq of StructFields. Also, two fields with the same name are not allowed.
StructField The value type in Python of the data type of this field (For example, Int for a StructField with the data type IntegerType) StructField(name, dataType, [nullable]), Note: The default value of nullable is True.

6.2 NaN Semantics

当处理一些不符合标准浮点数语义的 float 或 double 类型时,对于 Not-a-Number(NaN) 需要做一些特殊处理. 具体如下:

  • NaN = NaN 返回 true.
  • 在 aggregations(聚合)操作中,所有的 NaN values 将被分到同一个组中.
  • 在 join key 中 NaN 可以当做一个普通的值.
  • NaN 值在升序排序中排到最后,比任何其他数值都大.
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/xiligey1/article/details/80267873

智能推荐

使用nginx解决浏览器跨域问题_nginx不停的xhr-程序员宅基地

文章浏览阅读1k次。通过使用ajax方法跨域请求是浏览器所不允许的,浏览器出于安全考虑是禁止的。警告信息如下:不过jQuery对跨域问题也有解决方案,使用jsonp的方式解决,方法如下:$.ajax({ async:false, url: 'http://www.mysite.com/demo.do', // 跨域URL ty..._nginx不停的xhr

在 Oracle 中配置 extproc 以访问 ST_Geometry-程序员宅基地

文章浏览阅读2k次。关于在 Oracle 中配置 extproc 以访问 ST_Geometry,也就是我们所说的 使用空间SQL 的方法,官方文档链接如下。http://desktop.arcgis.com/zh-cn/arcmap/latest/manage-data/gdbs-in-oracle/configure-oracle-extproc.htm其实简单总结一下,主要就分为以下几个步骤。..._extproc

Linux C++ gbk转为utf-8_linux c++ gbk->utf8-程序员宅基地

文章浏览阅读1.5w次。linux下没有上面的两个函数,需要使用函数 mbstowcs和wcstombsmbstowcs将多字节编码转换为宽字节编码wcstombs将宽字节编码转换为多字节编码这两个函数,转换过程中受到系统编码类型的影响,需要通过设置来设定转换前和转换后的编码类型。通过函数setlocale进行系统编码的设置。linux下输入命名locale -a查看系统支持的编码_linux c++ gbk->utf8

IMP-00009: 导出文件异常结束-程序员宅基地

文章浏览阅读750次。今天准备从生产库向测试库进行数据导入,结果在imp导入的时候遇到“ IMP-00009:导出文件异常结束” 错误,google一下,发现可能有如下原因导致imp的数据太大,没有写buffer和commit两个数据库字符集不同从低版本exp的dmp文件,向高版本imp导出的dmp文件出错传输dmp文件时,文件损坏解决办法:imp时指定..._imp-00009导出文件异常结束

python程序员需要深入掌握的技能_Python用数据说明程序员需要掌握的技能-程序员宅基地

文章浏览阅读143次。当下是一个大数据的时代,各个行业都离不开数据的支持。因此,网络爬虫就应运而生。网络爬虫当下最为火热的是Python,Python开发爬虫相对简单,而且功能库相当完善,力压众多开发语言。本次教程我们爬取前程无忧的招聘信息来分析Python程序员需要掌握那些编程技术。首先在谷歌浏览器打开前程无忧的首页,按F12打开浏览器的开发者工具。浏览器开发者工具是用于捕捉网站的请求信息,通过分析请求信息可以了解请..._初级python程序员能力要求

Spring @Service生成bean名称的规则(当类的名字是以两个或以上的大写字母开头的话,bean的名字会与类名保持一致)_@service beanname-程序员宅基地

文章浏览阅读7.6k次,点赞2次,收藏6次。@Service标注的bean,类名:ABDemoService查看源码后发现,原来是经过一个特殊处理:当类的名字是以两个或以上的大写字母开头的话,bean的名字会与类名保持一致public class AnnotationBeanNameGenerator implements BeanNameGenerator { private static final String C..._@service beanname

随便推点

二叉树的各种创建方法_二叉树的建立-程序员宅基地

文章浏览阅读6.9w次,点赞73次,收藏463次。1.前序创建#include&lt;stdio.h&gt;#include&lt;string.h&gt;#include&lt;stdlib.h&gt;#include&lt;malloc.h&gt;#include&lt;iostream&gt;#include&lt;stack&gt;#include&lt;queue&gt;using namespace std;typed_二叉树的建立

解决asp.net导出excel时中文文件名乱码_asp.net utf8 导出中文字符乱码-程序员宅基地

文章浏览阅读7.1k次。在Asp.net上使用Excel导出功能,如果文件名出现中文,便会以乱码视之。 解决方法: fileName = HttpUtility.UrlEncode(fileName, System.Text.Encoding.UTF8);_asp.net utf8 导出中文字符乱码

笔记-编译原理-实验一-词法分析器设计_对pl/0作以下修改扩充。增加单词-程序员宅基地

文章浏览阅读2.1k次,点赞4次,收藏23次。第一次实验 词法分析实验报告设计思想词法分析的主要任务是根据文法的词汇表以及对应约定的编码进行一定的识别,找出文件中所有的合法的单词,并给出一定的信息作为最后的结果,用于后续语法分析程序的使用;本实验针对 PL/0 语言 的文法、词汇表编写一个词法分析程序,对于每个单词根据词汇表输出: (单词种类, 单词的值) 二元对。词汇表:种别编码单词符号助记符0beginb..._对pl/0作以下修改扩充。增加单词

android adb shell 权限,android adb shell权限被拒绝-程序员宅基地

文章浏览阅读773次。我在使用adb.exe时遇到了麻烦.我想使用与bash相同的adb.exe shell提示符,所以我决定更改默认的bash二进制文件(当然二进制文件是交叉编译的,一切都很完美)更改bash二进制文件遵循以下顺序> adb remount> adb push bash / system / bin /> adb shell> cd / system / bin> chm..._adb shell mv 权限

投影仪-相机标定_相机-投影仪标定-程序员宅基地

文章浏览阅读6.8k次,点赞12次,收藏125次。1. 单目相机标定引言相机标定已经研究多年,标定的算法可以分为基于摄影测量的标定和自标定。其中,应用最为广泛的还是张正友标定法。这是一种简单灵活、高鲁棒性、低成本的相机标定算法。仅需要一台相机和一块平面标定板构建相机标定系统,在标定过程中,相机拍摄多个角度下(至少两个角度,推荐10~20个角度)的标定板图像(相机和标定板都可以移动),即可对相机的内外参数进行标定。下面介绍张氏标定法(以下也这么称呼)的原理。原理相机模型和单应矩阵相机标定,就是对相机的内外参数进行计算的过程,从而得到物体到图像的投影_相机-投影仪标定

Wayland架构、渲染、硬件支持-程序员宅基地

文章浏览阅读2.2k次。文章目录Wayland 架构Wayland 渲染Wayland的 硬件支持简 述: 翻译一篇关于和 wayland 有关的技术文章, 其英文标题为Wayland Architecture .Wayland 架构若是想要更好的理解 Wayland 架构及其与 X (X11 or X Window System) 结构;一种很好的方法是将事件从输入设备就开始跟踪, 查看期间所有的屏幕上出现的变化。这就是我们现在对 X 的理解。 内核是从一个输入设备中获取一个事件,并通过 evdev 输入_wayland

推荐文章

热门文章

相关标签