flink写入hbase-程序员宅基地

技术标签: flink  hbase  

参考:  https://www.cnblogs.com/swordfall/p/10527423.html

flink 流处理写入数据到hbase. 采用的是批量写入(500条数据写入一次)。

 

HBaseWriter.java

package com.flink;

import com.flink.model.DeviceData;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 *
 * 写入HBase
 * 继承RichSinkFunction重写父类方法
 *
 * 写入hbase时500条flush一次, 批量插入, 使用的是writeBufferSize
 */
class HBaseWriter extends RichSinkFunction<DeviceData>{
    private static final Logger logger = LoggerFactory.getLogger(HBaseWriter.class);

    private static org.apache.hadoop.conf.Configuration configuration;
    private static Connection connection = null;
    private static BufferedMutator mutator;
    private static int count = 0;

    @Override
    public void open(Configuration parameters) throws Exception {
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.master", "192.168.3.101:60020");
        configuration.set("hbase.zookeeper.quorum", "192.168.3.101");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        try {
            connection = ConnectionFactory.createConnection(configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("t1"));
        params.writeBufferSize(2 * 1024 * 1024);
        mutator = connection.getBufferedMutator(params);
    }

    @Override
    public void close() throws IOException {
        if (mutator != null) {
            mutator.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public void invoke(DeviceData values, Context context) throws Exception {
        //Date 1970-01-06 11:45:55  to 445555000
        long unixTimestamp= 0;
        try {
            String gatherTime = values.GatherTime;
            //毫秒和秒分开处理
            if (gatherTime.length() > 20) {
                long ms = Long.parseLong(gatherTime.substring(20, 23));
                Date date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(gatherTime);
                unixTimestamp = date.getTime() + ms;
            } else {
                Date date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(gatherTime);
                unixTimestamp = date.getTime();
            }
        } catch (ParseException e) {
            e.printStackTrace();
        }
        String RowKey = values.MachID + String.valueOf(unixTimestamp);
        String Key = values.OperationValue;
        String Value = values.OperationData;
        System.out.println("Column Family=f1,  RowKey=" + RowKey + ", Key=" + Key + " ,Value=" + Value);
        Put put = new Put(RowKey.getBytes());
        put.addColumn("f1".getBytes(), Key.getBytes(), Value.getBytes());
        mutator.mutate(put);
        //每满500条刷新一下数据
        if (count >= 500){
            mutator.flush();
            count = 0;
        }
        count = count + 1;
    }
}

Main.java

//写入hbase
dataStream.addSink(new HBaseWriter());
DeviceData.java
package com.flink.model;

/**
 * 设备数据的数据结构
 */
class DeviceData {
    String compID;
    String machID;
    String Type;
    String gateMac;
    String operationValue;
    String operationData;
    String gatherTime;
}

 

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/u012447842/article/details/90203512

智能推荐

给大家说明。Oracle修改ip或主机名后重建em和监听_oracle修改监听host需要重启吗-程序员宅基地

文章浏览阅读2.5k次。oracle数据库修改ip和主机名的详细操作修改ip操作1.修改hosts文件中主机名对应的ip地址。2.重启服务器网络 service network restart3.一定要先删除原来的监听 emca -deconfig dbcontrol db -repos drop (如果不记得sysman的密码,可以先修改下alter user sy_oracle修改监听host需要重启吗

分享:给入职新人的IDEA培训教程。-程序员宅基地

文章浏览阅读153次。点击上方“方志朋”,选择“设为星标”回复”666“获取新整理的面试资料作者:Richard_Yi链接:http://1t.click/aGJW# 前言工欲善其事必先利其器最近受部门的邀请,..._idea研究院入职流程

智能运维都有哪些工作?智能运维哪些领域好_ai运维平台项目工作内容-程序员宅基地

文章浏览阅读367次,点赞8次,收藏10次。预防性维护:该工作内容涉及通过对设备的运行数据进行实时监控和分析,预测设备的故障和维护需求,提前制定维护计划,降低设备故障率和维护成本。某互联网公司使用智能运维技术对其数据库性能进行实时监控和分析,发现并解决了多个慢查询问题,提高了数据库的访问速度和响应时间。某金融机构使用智能运维技术对其服务器性能进行实时监控和分析,发现并解决了多个性能瓶颈问题,提高了系统的运行效率和用户体验。某大型电商企业使用智能运维技术对其网络流量进行实时监控和分析,发现并拦截了大量的网络攻击行为,保障了网络安全和稳定运行。_ai运维平台项目工作内容

梯度下降法——手工求解,Excel求解,python求解_梯度下降法手算-程序员宅基地

文章浏览阅读773次。目录一、梯度下降法原理1.形象的例子2.什么是梯度3.梯度下降算法原理二、手工求解三、Excel求解四、pyhton求解五、参考资料一、梯度下降法原理梯度下降法(Gradient Descent,GD)是一种常用的求解无约束最优化问题的方法,在最优化、统计学以及机器学习等领域有着广泛的应用。1.形象的例子假设这样一个场景:一个人需要从山的某处开始下山,尽快到达山底。在下山之前他需要确认两件事:下山的方向下山的距离这是因为下山的路有很多,他必须利用一些信息,找到从该处开始最陡峭的方向下山,这_梯度下降法手算

51Nod 2654 最小距离最大 c/c++ 题解_c++给出n个位置(数轴上的坐标值),从中选出k个,让这k个位置相邻两个之间的距离(相-程序员宅基地

文章浏览阅读1.5k次。题目描述给出n个位置,从中选出k个,让这k个位置相邻两个之间的距离尽可能的大,尽可能大的意思是这k-1个距离的最小值尽量大。输出这个最大的最小值。样例解释:选位置:1 5 9。输入第一行:2个数n和k(2 <= n <= 100000, 2 <= k <= 10000, k <= n)后面n行:每行一个数Pi,表示具体位置(0 <= Pi <=..._c++给出n个位置(数轴上的坐标值),从中选出k个,让这k个位置相邻两个之间的距离(相

DataMatrix 码提取流程_dm二维码-程序员宅基地

文章浏览阅读2.7k次。DM 码是一种二维矩阵码,由深浅两种不同灰度的码块组成棋盘的样式,其具有正方形和长方形两种符号形式。图1表示了一个DM 码的区域划分。DM 码由定位图形和数据区域组成,定位图形又由一组垂直的实线边缘和虚线边缘组成,实线边缘形成一个“L”形的特征边,用于提供条码的位置基准和方向基准,虚线边缘由深浅两色码块切换组成,其提供二维条码的尺寸信息;数据区域分布着深色和浅色的码块,分别代表数据位“1”和“0”,通过对条码以编码规则顺序读出每个码块代表的数据位信息,得到一组承载一定信息的数据流,按DM 码的编码协议可译码_dm二维码

随便推点

数据中台概述-程序员宅基地

文章浏览阅读7.2k次,点赞6次,收藏27次。数据中台定义数据中台概念由阿里提出,即实现数据分层和水平解耦,沉淀公共数据能力,提供数据模型、数据服务与数据开发功能。数据中台到底是什么?是一种产品?还是一种解决方案型产品?数据中台其实更像一种企业架构方法论,是以"共享"(Sharing)为目标的"业务流程再造"(Business Process Re-engineering)和"企业组织重构"(Organizational Restructuring)过程。数据中台不单单指系统或者工具,而是一个职能部门,通过一系列平台、工具、流程、规范来为整个组织_数据中台

HDFS_副本和认证_一证三副本-程序员宅基地

文章浏览阅读342次。一、 三副本策略三副本策略的含义1)如果写请求方所在机器是其中一个DataNode,则直接存放在本地,否则随机在集群中选择一个DataNode2)第二个副本存放在不同于第一个副本所在的机架3)第三个副本存放于第二个副本所在的机架,但是属于不同的节点三副本策略的使用需要开启机架感知功能,才能正常使用副本放置策略:net.topology.script.file.name_一证三副本

SpringBoot项目中static和template文件夹的区别-程序员宅基地

文章浏览阅读1.5k次。SpringBoot项目创建后,resources下默认有两个文件夹static和template.一般static存放静态资源,template存放动态资源. 在static文件夹下新建cc.html,浏览器输入localhost:8080/cc.html可以正常访问.同样的操作在访问te..._springboot的static与templates目录编译后

[译]MQTT 通配符_为什么mqtt首页加了/之后会多个空格行-程序员宅基地

文章浏览阅读1.4w次,点赞13次,收藏13次。原文Appendix A - Topic wildcards译文一个订阅可能包含特殊字符,允许你一次定义多个主题。主题层次分隔符被用来在主题中引入层次。多层的通配符和单层通配符可以被使用,但他们不能被使用来做发布者的消息。主题层级分隔符// 被用来分割主题树的每一层,并给主题空间提供分等级的结构。当两个通配符在一个主题中出现的时候,主题层次分隔符的使用是很重要的。多层通配符## 是一个匹配主题中任意_为什么mqtt首页加了/之后会多个空格行

排球计分程序(记分员)-程序员宅基地

文章浏览阅读181次。1.计划:做这个任务大概需要六天。2.需求: 作为一名现场记分员,我希望详细记录比赛现场比分增长情况,以便观众及运动员、教练员及时掌握比赛状况。3.生成设计文档: 1) 建一个数据库,将各个国家的比赛情况记录其中。 2)对每场比赛进行计分记录。 3)记录每个球员的进球得分,队伍中担任位置,失误。 4)对个人及每场比赛进行累计计算。4..._排球记分牌c语言程序

通过Visualizing Representations来理解Deep Learning、Neural network、以及输入样本自身的高维空间结构...-程序员宅基地

文章浏览阅读907次。catalogue1. 引言2. Neural Networks Transform Space - 神经网络内部的空间结构3. Understand the data itself by visualizing high-dimensional input dataset - 输入样本内隐含的空间结构4. Example 1: Word Embeddings in NLP -..._如何写visualisation and deep learning