技术标签: spark
package testhbase;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class HbaseTest {
/**
* 配置ss
*/
static Configuration config = null;
private Connection connection = null;
private Table table = null;
@Before
public void init() throws Exception {
config = HBaseConfiguration.create();// 配置都封装成<k,v>
config.set("hbase.zookeeper.quorum", "mini1,mini2,mini3,mini4");// zookeeper地址
config.set("hbase.zookeeper.property.clientPort", "2181");// zookeeper端口
connection = ConnectionFactory.createConnection(config);
/*connection.getTable(TableName.valueOf("test"))这种方式获得的连接是一种连接池的方式,
也可以使用new HTable()的方式创建一个单连接,
明显用连接池可以控制多个线程同时连接hbase的情况,优于new HTable()的方式创建一个单连接
*/
table = connection.getTable(TableName.valueOf("test"));
}
/**
* 创建一个表
* 类似于shell命令中的create 'test3','info1','info2' --创建test表和info1族与info2族
* @throws Exception
*/
@Test
public void createTable() throws Exception {
// 创建表管理类
HBaseAdmin admin = new HBaseAdmin(config); // hbase表管理
// 创建表描述类
TableName tableName = TableName.valueOf("test2"); // 表名称
HTableDescriptor desc = new HTableDescriptor(tableName);
// 创建列族的描述类
HColumnDescriptor family = new HColumnDescriptor("info"); // 列族
// 将列族添加到表中
desc.addFamily(family);
HColumnDescriptor family2 = new HColumnDescriptor("info2"); // 列族
// 将列族添加到表中
desc.addFamily(family2);
// 创建表
admin.createTable(desc); // 创建表
}
/*
删除表test2
* */
@Test
@SuppressWarnings("deprecation")
public void deleteTable() throws MasterNotRunningException,
ZooKeeperConnectionException, Exception {
HBaseAdmin admin = new HBaseAdmin(config);
//删除表之前必须将表disabled
admin.disableTable("test2");
admin.deleteTable("test2");
admin.close();
}
/**
* 向hbase中增加数据
*
* @throws Exception
*/
@SuppressWarnings({
"deprecation", "resource" })
@Test
public void insertData() throws Exception {
Put put = new Put(Bytes.toBytes("11"));
put.add(Bytes.toBytes("info1"),Bytes.toBytes("name"),Bytes.toBytes("zhangsan"));
//插入多个put时可以new List<put>,然后table.put(list);
//插入的数据是字典序排序的,后面添加的数据会覆盖原来的数据
table.put(put);
}
/**
* 修改数据
*
* @throws Exception
*/
@Test
public void updateData() throws Exception {
Put put = new Put(Bytes.toBytes("10"));
put.add(Bytes.toBytes("info1"), Bytes.toBytes("name"), Bytes.toBytes("lisi1234"));
put.add(Bytes.toBytes("info2"), Bytes.toBytes("age"), Bytes.toBytes(1234));
//插入数据
table.put(put);
//提交
table.flushCommits();
}
/**
* 删除数据
*
* @throws Exception
*/
@Test
public void deleteDate() throws Exception {
//删除某个id的一行数据
/*Delete delete = new Delete(Bytes.toBytes("1234"));
table.delete(delete);
table.flushCommits();*/
//删除某个列族,或者删除某个列
Delete delete = new Delete(Bytes.toBytes("1234"));
//删除列族
delete.addFamily(Bytes.toBytes("info1"));
//删除列
delete.addColumn(Bytes.toBytes("info2"),Bytes.toBytes("age"));
table.delete(delete);
table.flushCommits();
}
/**
* 单条查询
*
* @throws Exception
*/
@Test
public void queryData() throws Exception {
//查询一行行键为10的数据
Get get = new Get(Bytes.toBytes("10"));
Result result = table.get(get);
//result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"))获取info1:name这一列的数据
System.out.println("info1:name为"+Bytes.toString(result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"))));
System.out.println("info2:age为"+Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age"))));
System.out.println("-------------------------分隔符--------------------------");
//只查询某一行
Get get2 = new Get(Bytes.toBytes("10"));
get2.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("name"));
Result result2 = table.get(get2);
//result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"))获取info1:name这一列的数据
System.out.println("info1:name为"+Bytes.toString(result2.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"))));
System.out.println("info2:age为"+Bytes.toString(result2.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age"))));
}
/**
* 全表扫描
*
* @throws Exception
*/
@Test
public void scanData() throws Exception {
Scan scan = new Scan();
//下面两行代码表示:从主键为10扫描到11就停止了
scan.setStartRow(Bytes.toBytes("10"));
scan.setStopRow(Bytes.toBytes("12"));
//全表扫描某一个列:此时除了info1:name列,其他列都为空
scan.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("name"));
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"))));
System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age"))));
System.out.println();
}
}
/**
* 全表扫描的过滤器
* 列值过滤器:用来定义“列”的<列名:列值>过滤规则:如过滤列info1:name=zhangsan1的列,
* 凡是info1:name=zhangsan1的列所在的行都过滤掉出来
* @throws Exception
*/
@Test
public void scanDataByFilter1() throws Exception {
// 创建全表扫描的scan
Scan scan = new Scan();
//过滤器:列值过滤器:参数1:过滤的列族 参数2:过滤的列名 参数3:过滤的规则(大于等于小于) 参数4:与参数3比较的值
//CompareFilter.CompareOp.EQUAL表示行健等于10
SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("info1"),
Bytes.toBytes("name"), CompareFilter.CompareOp.EQUAL,
Bytes.toBytes("zhangsan"));
// 设置过滤器
scan.setFilter(filter);
// 打印结果集
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
System.out.println("id" + Bytes.toString(result.getRow()));
System.out.println("info1:name:" + Bytes.toString(result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"))));
System.out.println("info2:age" + Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age"))));
System.out.println();
}
}
/**
* rowkey过滤器:列名过滤器,过滤主键(正则表达式)
* @throws Exception
*/
@Test
public void scanDataByFilter2() throws Exception {
// 创建全表扫描的scan
Scan scan = new Scan();
//匹配rowkey以wangsenfeng开头的
RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("[^1]"));
// 设置过滤器
scan.setFilter(filter);
// 打印结果集
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
System.out.println("id= " + Bytes.toString(result.getRow()));
System.out.println("info1:name=" + Bytes.toString(result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"))));
System.out.println("info2:age=" + Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age"))));
System.out.println();
}
}
/**
* 匹配列名前缀
* @throws Exception
*/
@Test
public void scanDataByFilter3() throws Exception {
// 创建全表扫描的scan
Scan scan = new Scan();
//匹配列名中含有以na开头的列(如:含有info1:name的行)所在的行
ColumnPrefixFilter columnPrefixFilter = new ColumnPrefixFilter(Bytes.toBytes("na"));
// 设置过滤器
scan.setFilter(columnPrefixFilter);
// 打印结果集
ResultScanner results = table.getScanner(scan);
for (Result result:results){
System.out.println("id=" + Bytes.toString(result.getRow()));
System.out.println("info1:name=" + Bytes.toString(result.getValue(Bytes.toBytes("info1"),Bytes.toBytes("name"))));
System.out.println("info2:age=" + Bytes.toString(result.getValue(Bytes.toBytes("info2"),Bytes.toBytes("age"))));
System.out.println();
}
}
/**
* 匹配列名多个前缀
* @throws Exception
*/
@Test
public void scanDataByFilter4() throws Exception {
// 创建全表扫描的scan
Scan scan = new Scan();
//匹配列名中含有以nam开头的列和ag(如:含有info1:name的行和info2:age的行)所在的行
byte[][] bytes = new byte[][]{
Bytes.toBytes("nam"),Bytes.toBytes("ag")};
MultipleColumnPrefixFilter mcpfilter = new MultipleColumnPrefixFilter(bytes);
// 设置过滤器
scan.setFilter(mcpfilter);
// 打印结果集
ResultScanner results = table.getScanner(scan);
for (Result result:results){
System.out.println("id=" + Bytes.toString(result.getRow()));
System.out.println("info1:name=" + Bytes.toString(result.getValue(Bytes.toBytes("info1"),Bytes.toBytes("name"))));
System.out.println("info2:age=" + Bytes.toString(result.getValue(Bytes.toBytes("info2"),Bytes.toBytes("age"))));
System.out.println();
}
}
/**
* 过滤器集合:同时使用多个过滤器,有两种方式:多个过滤器是MUST_PASS_ALL(and),MUST_PASS_ONE(or)
* @throws Exception
*/
@Test
public void scanDataByFilter5() throws Exception {
// 创建全表扫描的scan
Scan scan = new Scan();
//过滤器集合:MUST_PASS_ALL(and),MUST_PASS_ONE(or)
FilterList filterList = new FilterList(Operator.MUST_PASS_ONE);
//匹配rowkey以wangsenfeng开头的
RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("^10"));//以20开头的
//匹配name的值等于wangsenfeng
SingleColumnValueFilter filter2 = new SingleColumnValueFilter(Bytes.toBytes("info1"),
Bytes.toBytes("name"), CompareFilter.CompareOp.EQUAL,
Bytes.toBytes("zhangsan"));//“info1:name = zhangsan”的行
filterList.addFilter(filter);
filterList.addFilter(filter2);
// 设置过滤器
scan.setFilter(filterList);
// 打印结果集
ResultScanner scanner = table.getScanner(scan);
for (Result result:scanner) {
System.out.println("id=" + Bytes.toString(result.getRow()));
System.out.println("info1:name=" + Bytes.toString(result.getValue(Bytes.toBytes("info1"),Bytes.toBytes("name"))));
System.out.println("info2:age=" + Bytes.toString(result.getValue(Bytes.toBytes("info2"),Bytes.toBytes("age"))));
System.out.println();
}
}
@After
public void close() throws Exception {
table.close();
connection.close();
}
}
然后可以在scala代码编写的DAO层中使用如下方式调用hbase
/**
* 保存数据到HBase
* @param list CourseClickCount集合
*/
def save(list: ListBuffer[CourseClickCount]): Unit = {
// HBaseUtils使用单例模式产生一个HBaseUtils实例,其中封装了getTable()等增删改查的方法,和相关过滤器
val table = HBaseUtils.getInstance().getTable(tableName)
//将CourseClickCount这个pojo的数据保存到tableName
for(ele <- list) {
table.incrementColumnValue(Bytes.toBytes(ele.day_course),
Bytes.toBytes(cf),
Bytes.toBytes(qualifer),
ele.click_count)
//incrementColumnValue方法可以直接通过闯入的参数,直接用数据对ele.click_count自加
}
}
Spark中内置提供了两个方法可以将数据写入到Hbase:
(1)saveAsHadoopDataset
(2)saveAsNewAPIHadoopDataset
pom.xml中
<!--增加远程下载仓库-->
<repositories>
<repository>
<id> central-repos1</id>
<name>Central Repository 2</name>
<url>http://repo.hortonworks.com/content/groups/public/</url>
<!--<url>http://repo1.maven.org/maven2</url>-->
</repository>
</repositories>
<properties>
<spark.version>2.4.0</spark.version>
<hadoop.version>2.7.3</hadoop.version>
<hbase.version>0.99.2</hbase.version>
</properties>
<dependencies>
<!--spark hbase connnector-->
<dependency>
<groupId>com.hortonworks</groupId>
<artifactId>shc-core</artifactId>
<version>1.1.1-2.1-s_2.11</version>
</dependency>
<!--spark-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<!--hbase-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<!--hadoop-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
数据
aaaa 1234
bbbb 4366
cccc 6577
dddd 1234
eeee 4366
ffff 7577
测试代码:
import org.apache.hadoop.hbase.{
HConstants, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{
SparkConf, SparkContext}
object SparkToHBase {
def main(args: Array[String]) {
if (args.length < 1) {
System.err.println("Usage: SparkToHBase <input file>")
System.exit(1)
}
val conf = new SparkConf().setAppName("SparkToHBase")
val sc = new SparkContext(conf)
val input = sc.textFile(args(0))
//创建HBase配置
val hConf = HBaseConfiguration.create()
hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")
//创建JobConf,设置输出格式和表名
val jobConf = new JobConf(hConf, this.getClass)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")
val data = input.map {
item =>
val Array(key, value) = item.split("\t")
val rowKey = key.reverse
val put = new Put(Bytes.toBytes(rowKey))
put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
(new ImmutableBytesWritable, put)
}
//保存到HBase表
data.saveAsHadoopDataset(jobConf)
sc.stop()
}
}
这个 TableInputFormat 有一些缺点:
通过这个类库,我们可以直接使用 Spark SQL 将 DataFrame 中的数据写入到 HBase 中;而且我们也可以使用 Spark SQL 去查询 HBase 中的数据,在查询 HBase 的时候充分利用了 catalyst 引擎做了许多优化,比如分区修剪(partition pruning),列修剪(column pruning),谓词下推(predicate pushdown)和数据本地性(data locality)等等。因为有了这些优化,通过 Spark 查询 HBase 的速度有了很大的提升。
import org.apache.spark.{
SparkConf, SparkContext}
import org.apache.spark.sql.{
DataFrame, SparkSession}
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
object spark_hbase2 {
def main(args: Array[String]): Unit = {
//hbase catalog
//1.表名test
//2.rowkey为id
//3.列族为info
val catalog = s"""{
|"table":{
"namespace":"default", "name":"test"},
|"rowkey":"id",
|"columns":{
|"col0":{
"cf":"rowkey", "col":"id", "type":"String"},
|"col1":{
"cf":"info", "col":"value", "type":"String"},
|}
|}""".stripMargin
val spark = SparkSession.builder()
.appName("WriteHBase")
.master("local")
.config(new SparkConf().set("spark.testing.memory", "512000000"))
.getOrCreate()
val sc = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
// 模拟一批数据
val data = Array(("spark hbase connector1","1")).map(x => HBaseRecord(x._1,x._2))
//写数据,跟新
sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> catalog))
.format("org.apache.spark.sql.execution.datasources.hbase")
.save()
// 写数据
def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
}
//dataframe操作hbase
val df = withCatalog(catalog)
//SQL example
df.createOrReplaceTempView("table")
sqlContext.sql("select count(info) from table").show
spark.stop()
}
}
case class HBaseRecord(col0: String, // sql: string
col1: String)
文章浏览阅读606次。为什么80%的码农都做不了架构师?>>> ..._google trace batch job
文章浏览阅读2.6k次,点赞3次,收藏3次。'''题目描述1、对输入的字符串进行加解密,并输出。2加密方法为:当内容是英文字母时则用该英文字母的后一个字母替换,同时字母变换大小写,如字母a时则替换为B;字母Z时则替换为a;当内容是数字时则把该数字加1,如0替换1,1替换2,9替换0;其他字符不做变化。s'''#-*-coding:utf-8-*-importre#判断是否是字母defisLetter(letter):iflen..._编写函数fun2实现字符串加密,加密规则为:如果是字母,将其进行大小写转换;如果
文章浏览阅读4.4k次,点赞6次,收藏8次。下面列出了所有集合的类图:每个接口做的事情非常明确,比如 Serializable,只负责序列化,Cloneable 只负责拷贝,Map 只负责定义 Map 的接口,整个图看起来虽然接口众多,但职责都很清晰;复杂功能通过接口的继承来实现,比如 ArrayList 通过实现了 Serializable、Cloneable、RandomAccess、AbstractList、List 等接口,从而拥有了序列化、拷贝、对数组各种操作定义等各种功能;上述类图只能看见继承的关系,组合的关系还看不出来,比如说_迭代器是否可以保证容器删除和修改安全操作
在科技金融、绿色金融、普惠金融、养老金融、数字金融这“五篇大文章”中,养老金融以其独特的社会价值和深远影响,占据着不可或缺的地位。通过政策引导与市场机制的双重驱动,激发金融机构创新养老服务产品,如推出更多针对不同年龄层、风险偏好的个性化养老金融产品,不仅能提高金融服务的可获得性,还能增强民众对养老规划的主动参与度,从而逐步建立起适应中国国情、满足人民期待的养老金融服务体系。在人口老龄化的全球趋势下,中国养老金融的发展不仅仅是经济议题,更关乎社会的稳定与进步。养老金融:民生之需,国计之重。
在需要使用图片的地方使用下面的代码,注意xib可以直接设置图片。将相应的图片资源文件放到bundle文件中。
文章浏览阅读3.6k次,点赞4次,收藏66次。目录九、多元统计分析介绍九、多元统计分析介绍_r语言多元统计分析
文章浏览阅读623次。MATLAB课程设计-基于PSK和DPSK的matlab仿真 (41页) 本资源提供全文预览,点击全文预览即可全文预览,如果喜欢文档就下载吧,查找使用更方便哦!9.90 积分武汉理工大学MATLAB课程设计.目录摘要 1Abstract 21.设计目的与要求 32.方案的选择 42.1调制部分 42.2解调部分 43.单元电路原理和设计 63.1PCM编码原理及设计 63.1.1PCM编码原理 ..._通信原理课程设计(基于matlab的psk,dpsk仿真)(五篇模版)
文章浏览阅读3.5k次,点赞6次,收藏28次。腾讯微搭小程序获取微信用户信息无论你对低代码开发的爱与恨, 微信生态的强大毋庸置疑. 因此熟悉微搭技术还是很有必要的! 在大多数应用中, 都需要获取和跟踪用户信息. 本文就微搭中如何获取和存储用户信息进行详细演示, 因为用户信息的获取和存储是应用的基础.一. 微搭每个微搭平台都宣称使用微搭平台可以简单拖拽即可生成一个应用, 这种说法我认为是"夸大其词". 其实微搭优点大致来说, 前端定义了很多组件, 为开发人员封装组件节省了大量的时间,这是其一; 其二对后端开发来说, 省去了服务器的部署(并没有省去后_微搭 用微信号登录
sql中索引的使用分析
文章浏览阅读8.9k次,点赞16次,收藏108次。因为呢,termux作者,不希望让termux变成脚本小子的黑客工具,于是把msf , sqlmap等包删了。至于如何安装metasploit呢。apt update -y && apt upgrade -y #更新升级更新升级之后要安装一个叫 git 的安装包apt install git -y然后我们就开始//这里的话建议把手机放到路由器旁边,保持网络的优良。或者科学上网。//git clone https://github.com/gushmazuko/metaspl_termux安装metasploit
文章浏览阅读141次。一、Docker支持4种网络模式Bridge(默认)--network默认网络,Docker启动后创建一个docker0网桥,默认创建的容器也是添加到这个网桥中;IP地址段是172.17.0.1/16 独立名称空间 docker0桥,虚拟网桥的工作方式和物理交换机类似,这样主机上的所有容器就通过交换机连在了一个二层网络中。host容器不会获得一个独立的network namespace,而是与宿主..._armbian 172.17.0.1
Ansible-Tower安装破解。