Integrating HBase with MapReduce


The code below is built against Hadoop 2.6.5 and HBase 2.0.6. The Maven dependencies are as follows:

pom.xml

 
        
    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.0.6</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-annotations</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-auth</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-common</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.5</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-common</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>2.6.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>2.0.6</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-common</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-auth</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-hdfs</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-mapreduce-client-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hbase</groupId>
                    <artifactId>hbase-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hbase</groupId>
                    <artifactId>hbase-common</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hbase</groupId>
                    <artifactId>hbase-hadoop-compat</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hbase</groupId>
                    <artifactId>hbase-hadoop2-compat</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hbase</groupId>
                    <artifactId>hbase-protocol</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.0.6</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.13</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.26</version>
        </dependency>
    </dependencies>

Read from HDFS, compute with MR, write the result to HBase

This example is again a word count; the steps are as follows:

  • Read the input file from HDFS, one line at a time (the default)
  • Send each line to the Mapper task, which splits it into words and writes them to the Context
  • The Reducer receives the grouped data, sums the counts per word, and writes the final result to HBase (a small end-to-end illustration follows this list)
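
To make the flow concrete, here is a hypothetical illustration (the file contents below are an assumption, not part of the project): if wc.txt held the two lines shown, the finished job would leave one row per word in the wordcount:wordcount table, with the sum stored in the word:count column.

wc.txt (assumed sample input):

    hello world
    hello hbase

Resulting rows in wordcount:wordcount (row key = word, column word:count = total):

    hbase -> 1
    hello -> 2
    world -> 1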

WordCountFromHdfsMapper

package org.hadoop.learn.word.count.from.dfs;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Splits each input line into words and emits (word, 1) pairs.
 */
public class WordCountFromHdfsMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    private Text outKey = new Text();
    private LongWritable outValue = new LongWritable(1); // every word is emitted with a count of 1

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        if (null == line || line.trim().equals("")) {
            return;
        }

        String[] words = line.split(" ");
        for (String word : words) {
            String s = word.toLowerCase().replaceAll("![a-z]]", "");
            outKey.set(s);
            context.write(outKey, outValue);
        }
    }
}

WordCountFromHdfsReducer

package org.hadoop.learn.word.count.from.dfs;

import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.Iterator;

public class WordCountFromHdfsReducer implements TableReduce<Text, LongWritable> {
    private static final String NAMESPACE = "wordcount";
    private static final String TABLE = "wordcount";

    @Override
    public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }

        // Text.getBytes() may return a padded buffer, so copy only the valid bytes for the row key
        byte[] rowKey = Bytes.toBytes(key.toString());
        Put put = new Put(rowKey);
        put.addColumn(Bytes.toBytes("word"), Bytes.toBytes("count"), Bytes.toBytes(sum));
        output.collect(new ImmutableBytesWritable(rowKey), put);
    }

    @Override
    public void close() throws IOException {

    }

    @Override
    public void configure(JobConf job) {
        // try-with-resources so the admin connection is closed once the namespace and table exist
        try (Connection connection = ConnectionFactory.createConnection(job);
             Admin admin = connection.getAdmin()) {
            initNamespace(admin);
            initTable(admin);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void initTable(Admin admin) {
        TableName tableName = TableName.valueOf(NAMESPACE, TABLE);
        try {
            boolean isExits = admin.tableExists(tableName);
            if (!isExits) {
                TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
                tableDescriptorBuilder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("word")).build());
                admin.createTable(tableDescriptorBuilder.build());
                System.out.println("表创建成功: " + TABLE);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void initNamespace(Admin admin) {
        try {
            admin.getNamespaceDescriptor(NAMESPACE);
        } catch (NamespaceNotFoundException e) {
            try {
                admin.createNamespace(NamespaceDescriptor.create(NAMESPACE).build());
                System.out.println("namespace创建成功: " + NAMESPACE);
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

WordCountFromHdfsMain

package org.hadoop.learn.word.count.from.dfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

/**
 * Implements the HDFS -> MR -> HBase flow: data is read from HDFS and written out to HBase by the Reduce task.
 */
public class WordCountFromHdfsMain {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        System.setProperty("HADOOP_HOME", "H:\\xianglujun\\hadoop-2.6.5");
        System.setProperty("hadoop.home.dir", "H:\\xianglujun\\hadoop-2.6.5");
        System.setProperty("HADOOP_USER_NAME", "root");

        Configuration cfg = new Configuration(true);
        cfg.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
        // run the job locally
        cfg.set("mapreduce.framework.name", "local");
        // the HDFS FileSystem (fs.defaultFS) is taken from the configuration files on the classpath


        JobConf jobConf = new JobConf(cfg, WordCountFromHdfsMain.class);
        FileInputFormat.addInputPath(jobConf, new Path("/xianglujun/wordcount/wc.txt"));
        FileOutputFormat.setOutputPath(jobConf, new Path("/xianglujun/wordcount/wc_out_1231"));
        jobConf.setJobName("mp任务从hdfs中获取数据并存储到hbase");
        jobConf.setMapOutputKeyClass(Text.class);
        jobConf.setMapOutputValueClass(LongWritable.class);

        TableMapReduceUtil.initTableReduceJob(
                "wordcount:wordcount", // 需要操作的表的名称
                WordCountFromHdfsReducer.class, // reduce任务
                jobConf,
                null, // 分区类
                false// 是否上传Hbase的包到分布式存储
        );

        jobConf.setJarByClass(WordCountFromHdfsMain.class);

        Job job = Job.getInstance(jobConf);
        job.setMapperClass(WordCountFromHdfsMapper.class);
        boolean completed = job.waitForCompletion(true);
        System.out.println("任务是否执行完成: " + completed);
    }
}

When configuring the job, most of the work is done by the TableMapReduceUtil helper class (the org.apache.hadoop.hbase.mapred version for the old API), which wires up the Reducer-related parameters. Its source looks like this:

public static void initTableReduceJob(String table, Class reducer, JobConf job, Class partitioner, boolean addDependencyJars) throws IOException {
        job.setOutputFormat(TableOutputFormat.class);
        job.setReducerClass(reducer);
        job.set("hbase.mapred.outputtable", table);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);
        job.setStrings("io.serializations", new String[]{job.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName()});
        if (partitioner == HRegionPartitioner.class) {
            job.setPartitionerClass(HRegionPartitioner.class);
            int regions = MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
            if (job.getNumReduceTasks() > regions) {
                job.setNumReduceTasks(regions);
            }
        } else if (partitioner != null) {
            job.setPartitionerClass(partitioner);
        }

        if (addDependencyJars) {
            addDependencyJars(job);
        }

        initCredentials(job);
    }

In this code, two points deserve attention (a sketch of the equivalent manual configuration follows the list):

  • setOutputFormat() sets the output format, which determines where the data is ultimately written; here it is HBase's TableOutputFormat
  • setOutputValueClass() declares the output value type; it is set to Put, which is exactly the object an HBase write request needs
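
For reference, here is a minimal hand-written sketch of what initTableReduceJob() sets up, assuming the same jobConf and wordcount:wordcount table as in WordCountFromHdfsMain (TableOutputFormat is the org.apache.hadoop.hbase.mapred one, ImmutableBytesWritable comes from org.apache.hadoop.hbase.io, Put from org.apache.hadoop.hbase.client):

        // equivalent manual configuration (sketch); normally initTableReduceJob does this for you
        jobConf.setOutputFormat(TableOutputFormat.class);               // write through HBase's TableOutputFormat
        jobConf.set("hbase.mapred.outputtable", "wordcount:wordcount"); // target HBase table
        jobConf.setOutputKeyClass(ImmutableBytesWritable.class);        // row key wrapper
        jobConf.setOutputValueClass(Put.class);                         // each reduce output is an HBase Put
        jobConf.setReducerClass(WordCountFromHdfsReducer.class);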

Run the code above; when the job finishes, the MapReduce client prints the job status and counters. If no such log output appears, configure a log4j.properties file yourself.

Then check whether HBase now contains the data, for example with the verification sketch below. If the rows are there, the job that writes the word counts to HBase is done.
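
A minimal verification sketch, assuming the same ZooKeeper quorum as in the main class (the class name WordCountVerifier is made up for this example); it scans wordcount:wordcount and decodes each stored count:

package org.hadoop.learn.word.count.from.dfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Scans wordcount:wordcount and prints each word with its count.
 */
public class WordCountVerifier {
    public static void main(String[] args) throws Exception {
        Configuration cfg = HBaseConfiguration.create();
        cfg.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
        try (Connection conn = ConnectionFactory.createConnection(cfg);
             Table table = conn.getTable(TableName.valueOf("wordcount", "wordcount"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                String word = Bytes.toString(result.getRow());
                // the reducer stored the count with Bytes.toBytes(int), so decode it with Bytes.toInt
                int count = Bytes.toInt(result.getValue(Bytes.toBytes("word"), Bytes.toBytes("count")));
                System.out.println(word + " -> " + count);
            }
        }
    }
}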

Read from HBase, compute with MR, write the output back to HBase

This example is again a word count; the steps are as follows:

  • Prepare a sentences table in the wordcount namespace; each row holds one sentence
  • Read the rows of the sentences table and feed them to the Mapper task
  • The Mapper splits each sentence and emits the individual words
  • The Reducer sums the grouped data and writes the final counts to the wordcount2 table

SentencesInitializer

This class initializes the input data, making sure the HBase table the job depends on contains the required rows. The code is as follows:

package org.hadoop.learn.word.count.from.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

/**
 * Initializes the sentences table with sample data.
 */
public class SentencesInitializer {
    private String namespace;
    private String tableName;
    private TableName tbName;
    private Connection connection;

    public SentencesInitializer(String namespace, String tableName, Configuration cfg) throws IOException {
        this.namespace = namespace;
        this.tableName = tableName;
        this.tbName = TableName.valueOf(namespace, tableName);
        this.connection = ConnectionFactory.createConnection(cfg);
    }

    public void init() throws IOException {
        try {
            initNamespace();
            initTable();

            // read the sample file and write each non-empty line into the table
            int key = 1;
            InputStream is = this.getClass().getResourceAsStream("sentences.txt");
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
                String line = null;
                BufferedMutator bufferedMutator = connection.getBufferedMutator(this.tbName);
                while ((line = reader.readLine()) != null) {
                    if (!line.trim().equals("")) {
                        Put put = new Put(Bytes.toBytes(key));
                        put.addColumn(Bytes.toBytes("base"), Bytes.toBytes("line"), Bytes.toBytes(line));
                        bufferedMutator.mutate(put);
                        key++;
                    }
                }
                bufferedMutator.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    private void initTable() throws IOException {
        Admin admin = connection.getAdmin();
        boolean b = admin.tableExists(this.tbName);
        if (b) {
            admin.disableTable(this.tbName);
            admin.truncateTable(this.tbName, true);

            return;
        }
        TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(this.tbName);
        tableDescriptorBuilder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("base")).build());
        admin.createTable(tableDescriptorBuilder.build());
    }

    private void initNamespace() throws IOException {
        Admin admin = connection.getAdmin();
        try {
            admin.getNamespaceDescriptor(this.namespace);
        } catch (NamespaceNotFoundException e) {
            e.printStackTrace();
            admin.createNamespace(NamespaceDescriptor.create(this.namespace).build());
        }
    }
}

This class reads a sentences.txt file located next to the class itself (the file's content is up to you) and loads it into the given namespace:table. A hypothetical layout and sample content follow.
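
Since getResourceAsStream("sentences.txt") with a relative name resolves against the class's own package, the file needs to sit on the classpath next to the class. A hypothetical layout and file content, assuming a standard Maven project (both are illustrations only):

src/main/resources/org/hadoop/learn/word/count/from/hbase/sentences.txt

    hello hbase hello hadoop
    mapreduce reads sentences from hbase and writes counts back

Each non-empty line becomes one row in wordcount:sentences, keyed by an incrementing integer, with the sentence stored in the base:line column.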

WordCountFromHbaseMapper

package org.hadoop.learn.word.count.from.hbase;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.List;

/**
 * Reads each sentence row from HBase and emits (word, 1) pairs.
 */
public class WordCountFromHbaseMapper implements TableMap<Text, LongWritable> {

    private Text outKey = new Text();
    private LongWritable outValue = new LongWritable(1);

    @Override
    public void map(ImmutableBytesWritable key, Result value, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
        byte[] bases = Bytes.toBytes("base");
        List<Cell> columnCells = value.getColumnCells(bases, Bytes.toBytes("line"));
        for (Cell cell : columnCells) {
            String line = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
            String[] words = line.split(" ");
            for (String word : words) {
                String s = word.toLowerCase().replaceAll("!([a-z]+)", "");
                outKey.set(s);
                output.collect(outKey, outValue);
            }
        }
    }

    @Override
    public void close() throws IOException {

    }

    @Override
    public void configure(JobConf job) {

    }
}

WordCountFromHbaseReducer

package org.hadoop.learn.word.count.from.hbase;

import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.Iterator;

public class WordCountFromHbaseReducer implements TableReduce<Text, LongWritable> {
    private static final String NAMESPACE = "wordcount";
    private static final String TABLE = "wordcount2";

    @Override
    public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }

        // Text.getBytes() may return a padded buffer, so copy only the valid bytes for the row key
        byte[] rowKey = Bytes.toBytes(key.toString());
        Put put = new Put(rowKey);
        put.addColumn(Bytes.toBytes("base"), Bytes.toBytes("count"), Bytes.toBytes(sum));
        output.collect(new ImmutableBytesWritable(rowKey), put);
    }

    @Override
    public void close() throws IOException {

    }

    @Override
    public void configure(JobConf job) {
        // try-with-resources so the admin connection is closed once the namespace and table exist
        try (Connection connection = ConnectionFactory.createConnection(job);
             Admin admin = connection.getAdmin()) {
            initNamespace(admin);
            initTable(admin);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void initTable(Admin admin) {
        TableName tableName = TableName.valueOf(NAMESPACE, TABLE);
        try {
            boolean isExits = admin.tableExists(tableName);
            if (!isExits) {
                TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
                tableDescriptorBuilder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("base")).build());
                admin.createTable(tableDescriptorBuilder.build());
                System.out.println("表创建成功: " + TABLE);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void initNamespace(Admin admin) {
        try {
            admin.getNamespaceDescriptor(NAMESPACE);
        } catch (NamespaceNotFoundException e) {
            try {
                admin.createNamespace(NamespaceDescriptor.create(NAMESPACE).build());
                System.out.println("namespace创建成功: " + NAMESPACE);
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

WordCountFromHbaseMain

package org.hadoop.learn.word.count.from.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class WordCountFromHbaseMain {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        System.setProperty("HADOOP_HOME", "H:\\xianglujun\\hadoop-2.6.5");
        System.setProperty("hadoop.home.dir", "H:\\xianglujun\\hadoop-2.6.5");
        System.setProperty("HADOOP_USER_NAME", "root");

        Configuration cfg = new Configuration(true);
        cfg.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
        // run the job locally
        cfg.set("mapreduce.framework.name", "local");

        JobConf jobConf = new JobConf(cfg, WordCountFromHbaseMain.class);
//        FileInputFormat.addInputPath(jobConf, new Path("/xianglujun/wordcount/wc.txt"));
        FileOutputFormat.setOutputPath(jobConf, new Path("/xianglujun/wordcount/wc_out_1231"));
        jobConf.setJobName("mp任务从hdfs中获取数据并存储到hbase");
        jobConf.setMapOutputKeyClass(Text.class);
        jobConf.setMapOutputValueClass(LongWritable.class);
        jobConf.set("mapred.mapper.new-api", "false");
        jobConf.set("mapreduce.map.compatibility.major.version", "false");

        //        jobConf.set("mapreduce.job.map.class", WordCountFromHbaseMapper.class.getName());
        TableMapReduceUtil.initTableMapJob(
                "/wordcount:sentences",
                "base:line",
                WordCountFromHbaseMapper.class,
                Text.class,
                LongWritable.class,
                jobConf,
                false
        );

        TableMapReduceUtil.initTableReduceJob(
                "wordcount:wordcount2", // 需要操作的表的名称
                WordCountFromHbaseReducer.class, // reduce任务
                jobConf,
                null, // 分区类
                false// 是否上传Hbase的包到分布式存储
        );

        jobConf.setJarByClass(WordCountFromHbaseMain.class);

        Job job = Job.getInstance(jobConf);
//        job.setMapperClass(WordCountFromHdfsMapper.class);
        new SentencesInitializer("wordcount", "sentences", cfg).init();
        boolean completed = job.waitForCompletion(true);
        System.out.println("任务是否执行完成: " + completed);
    }
}

Run the code above; when the job finishes, the client prints whether it completed successfully.

Then read the wordcount:wordcount2 table to confirm the data looks right, for example with the sketch below.
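
A minimal read sketch, assuming the same ZooKeeper quorum as above and that the word "hello" occurs in sentences.txt (the class name WordCountReader is made up for this example); it fetches one row from wordcount:wordcount2 and decodes the count:

package org.hadoop.learn.word.count.from.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Fetches a single word's count from wordcount:wordcount2.
 */
public class WordCountReader {
    public static void main(String[] args) throws Exception {
        Configuration cfg = HBaseConfiguration.create();
        cfg.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
        try (Connection conn = ConnectionFactory.createConnection(cfg);
             Table table = conn.getTable(TableName.valueOf("wordcount", "wordcount2"))) {
            Result result = table.get(new Get(Bytes.toBytes("hello")));
            if (result.isEmpty()) {
                System.out.println("no row found for 'hello'");
            } else {
                // the reducer wrote the count with Bytes.toBytes(int)
                int count = Bytes.toInt(result.getValue(Bytes.toBytes("base"), Bytes.toBytes("count")));
                System.out.println("hello -> " + count);
            }
        }
    }
}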

That covers integrating HBase with Hadoop MapReduce jobs.
