Hive和Hbase数据互通(用户画像)

数据智能相依偎 2024-03-16 10:28:00

背景

依旧是用户画像的项目,现在标签化的数据存放在hive中,而查询是要在hbase上进行查询,所以需要将hive的数据导入hbase中。

方案:

1、hive和hbase的表建立映射关系,读取的是同一份HDFS文件,只是在上层建立hbase到hive表的映射。

优点:一份数据存储,两种查询模式,数据存储最低;

缺点:底层还是格式化的HDFS文件,查询需要进行映射转换,效率较低;

2、将hive的数据通过生成hfile,通过bulkload导入到hbase,这样底层数据的格式会转变成Hfile存储在hbase中,将hbase完全作为一个数据库去查询

优点:查询效率高;

缺点:同一份数据,两份存储格式,空间换取时间;

介绍

1、环境问题

之前因为各种操作,导致hive的对应的数据存储路径被删了,所以先对hive的环境进行重新配置,主要配置和mysql的互通;

1、删除mysql对应的hive库;2、执行schematool -dbType mysql -initSchema3、重启hive4、查看hive-site的配置 <property> <name>hive.metastore.warehouse.dir</name> <value>/user/hive/warehouse</value> <description>location of default database for the warehouse</description> </property>

2、spark运行环境的配置

在测试的时候,spark的运行环境出现了很多问题,主要是jar包冲突和找不到类的问题。

所以基于hbase的类主要是:

<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-protocol</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.1.2</version> </dependency>

同时spark的代码框架中要加入resouces包,并将hive-site.xml、core-site.xml、hdfs-site.xml、hbase.xml配置文件扔进去,方便spark运行是能够找到依赖的环境。

3、hive映射hbase的表。

在Spark读写Hbase(用户画像)将如何像hbase写数据方式介绍了,而且在hbase中建立了一张表:TEST.USER_INFO

现在将这张吧映射到hive中:

建立hive映射表:

CREATE EXTERNAL TABLE IF NOT EXISTS test_user_info(key string,C1 string,C2 string,C3 string)stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'with serdeproperties ("hbase.columns.mapping" = ":key,INFO:C1,INFO:C2,INFO:C3")tblproperties("hbase.table.name" = "TEST.USER_INFO");stored by指定数据的存储方式。SERDEPROPERTIES:表示字段映射,对应hive中的表字段的顺序,需要注意的是 :key指的是Hbase中的rowdy,hive表中要有一个key字段与之对应,否则会报错的。TBLPROPERTIES:表示表名映射,指定需要映射的Hbase表名。

具体的映射规则:

hbase中的空cell在hive中会补null。hive和hbase中不匹配的字段会补null。hive内部表的数据,由hive自己管理,因此删除hive表,则对应的Hbase表也会被删除。hbase对应的hive没有时间戳概念,默认返回最新版本的值。由于HBase中没有数据类型信息,所以在存储数据的时候都转化为String类型。建表如果没有指定:key,则第一列默认为行健。

建表语句:

查询结果:

在hbase中新增只有两个列的rowKey。

查询结果:

可以看到在不匹配的列中会自动补NULL。

4、整个hbase的map映射到hive

规则和上面基本一样,只不过建立hive表的时候指定的列的类型修改一下。

CREATE EXTERNAL TABLE test_user_info_2(key string,value map<string,string>)STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,INFO:")tblproperties("hbase.table.name" = "TEST.USER_INFO");

查询结果:

5、spark生成hive表数据

val RDD = spark.sparkContext.textFile("hdfs://localhost:9000/data/user/*") import spark.implicits._ val DF = RDD.map(f => (f.split(",")(0), f.split(",")(1), f.split(",")(2), f.split(",")(3), f.split(",")(4), f.split(",")(5), f.split(",")(6), f.split(",")(7))). toDF("uid","date_create","create_type","level","follow_num","first_follow_time","last_follow_time","follow_dur") RDD.foreach(println) DF.write.mode("overwrite").insertInto("default.user_message_1") //todo 查询hive表数据 spark.sql("select * from default.user_message_1").show

在hive上建立hbase的映射表:

CREATE TABLE user_message(uid string,date_create string,create_type int,level string, follow_num int, first_follow_time string, last_follow_time string, follow_dur bigint)stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'with serdeproperties ("hbase.columns.mapping" = ":key,user_info:date_create,user_info:create_type,user_info:level,follow_info:follow_num,follow_info:first_follow_time,follow_info:last_follow_time,follow_info:follow_dur")tblproperties("hbase.table.name" = "default:TEST.user_message","hbase.mapred.output.outputtable" = "default:TEST.user_message");

查看hbase

将hive中user_message_1中的数据导入user_message中

insert overwrite table user_message select * from user_message_1;

hive中查询结果:

Hbase中查询结果:

这样两边的数据映射成功。

6、查询hive数据写入Hbase

package sparkTestimport org.apache.hadoop.fs.{FileSystem, Path}import org.apache.hadoop.hbase.client.ConnectionFactoryimport org.apache.hadoop.hbase.{HBaseConfiguration, TableName}import org.apache.hadoop.hbase.io.ImmutableBytesWritableimport org.apache.hadoop.hbase.mapreduce.TableInputFormatimport org.apache.log4j.{Level, Logger}import org.apache.spark.sql.SparkSessionimport org.apache.hadoop.conf.Configurationimport org.apache.hadoop.hbase._import org.apache.hadoop.hbase.client._import org.apache.hadoop.hbase.mapred.TableOutputFormatimport org.apache.hadoop.hbase.util.Bytesimport org.apache.hadoop.mapred.JobConf/** * * * @autor gaowei * @Date 2020-08-06 09:53 */object HfiletoHbase { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) val spark = SparkSession .builder() .appName("HfiletoHbase") .enableHiveSupport() .config("spark.master", "local") .getOrCreate() val sc = spark.sparkContext val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) hiveContext.sql("SET hive.exec.dynamic.partition = true") hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict ") hiveContext.sql("SET mapreduce.input.fileinputformat.input.dir.recursive = true") hiveContext.sql("SET hive.input.dir.recursive = true") hiveContext.sql("SET hive.mapred.supports.subdirectories = true") hiveContext.sql("SET hive.supports.subdirectories = true") val tablename = "TEST.user_message_test_c" val conf = HBaseConfiguration.create() //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置 conf.set("hbase.zookeeper.quorum","localhost") //设置zookeeper连接端口,默认2181 conf.set("hbase.zookeeper.property.clientPort", "2181") creteHTable(tablename, conf) conf.set(TableInputFormat.INPUT_TABLE, tablename) val DF = spark.sql( s""" |select uid, |date_create, |create_type, |level, |ifnull(follow_num,0) as follow_num, |first_follow_time, |last_follow_time, |ifnull(follow_dur,0) as follow_dur |from user_message_1 """.stripMargin) val RDD = DF.rdd.map(f => (f.getAs[String]("uid"), f.getAs[String]("date_create"), f.getAs[Int]("create_type").toString, f.getAs[String]("level"), f.getAs[Int]("follow_num").toString, f.getAs[String]("first_follow_time"), f.getAs[String]("last_follow_time"), f.getAs[Long]("follow_dur").toString)) for(arr <- RDD.collect()){println(arr)} val jobConf = new JobConf() jobConf.setOutputFormat(classOf[TableOutputFormat]) jobConf.set(TableOutputFormat.OUTPUT_TABLE,tablename) RDD.map{f => { val put = new Put(Bytes.toBytes(f._1)) put.add(Bytes.toBytes("user_info"),Bytes.toBytes("date_create"),Bytes.toBytes(f._2)) put.add(Bytes.toBytes("user_info"),Bytes.toBytes("create_type"),Bytes.toBytes(f._3)) put.add(Bytes.toBytes("user_info"),Bytes.toBytes("level"),Bytes.toBytes(f._4)) put.add(Bytes.toBytes("follow_info"),Bytes.toBytes("follow_num"),Bytes.toBytes(f._5)) put.add(Bytes.toBytes("follow_info"),Bytes.toBytes("first_follow_time"),Bytes.toBytes(f._6)) put.add(Bytes.toBytes("follow_info"),Bytes.toBytes("last_follow_time"),Bytes.toBytes(f._7)) put.add(Bytes.toBytes("follow_info"),Bytes.toBytes("follow_dur"),Bytes.toBytes(f._8)) (new ImmutableBytesWritable,put) }}.saveAsHadoopDataset(jobConf) sc.stop()} def creteHTable(tableName: String, hBaseConf : Configuration) = { val connection = ConnectionFactory.createConnection(hBaseConf) val hBaseTableName = TableName.valueOf(tableName) val admin = connection.getAdmin if (!admin.tableExists(hBaseTableName)) { val tableDesc = new HTableDescriptor(hBaseTableName) tableDesc.addFamily(new HColumnDescriptor("user_info".getBytes)) tableDesc.addFamily(new HColumnDescriptor("follow_info".getBytes)) admin.createTable(tableDesc) } connection.close() }}

结果:

0 阅读:3

数据智能相依偎

简介:感谢大家的关注