An HBase MapReduce example
13 July 2016

A map-only data import: the mapper writes Puts directly to an HBase table through TableOutputFormat, so no reduce phase is needed. The driver class comes first; the Mapper follows.

package bi.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HBaseImportJob extends Configured implements Tool {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseImportJob.class);

    public static void main(String[] args) {
        try {
            int ret = ToolRunner.run(new HBaseImportJob(), args);
            System.exit(ret);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            // exit non-zero so the scheduler can see the failure
            System.exit(1);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 10) {
            System.err.println("Usage: HBaseImportJob <sourceTableColumns> <inputPath> <targetTableCFName> "
                    + "<targetTableName> <partitionMip> <partitionDt> <partitionDh> <zkHosts> <mapNum> <zkNodeParent>");
            return 1;
        }
        String sourceTableColumns = args[0]; // comma-separated column names of the source data
        String inputPath = args[1];          // HDFS input path
        String targetTableCFName = args[2];  // column family of the target HBase table
        String targetTableName = args[3];    // target HBase table name
        String partitionMip = args[4];
        String partitionDt = args[5];
        String partitionDh = args[6];
        String zkHosts = args[7];            // ZooKeeper quorum of the HBase cluster
        String mapNum = args[8];             // hint for the number of map tasks
        String zkNodeParent = args[9];       // HBase znode parent, e.g. /hbase

        // use getConf() so -D options parsed by ToolRunner are honored
        Job job = Job.getInstance(getConf());
        Configuration conf = job.getConfiguration();
        // parameters read back by HBaseImportMapper.setup()
        conf.set("source.table.columns", sourceTableColumns);
        conf.set("target.table.family.name", targetTableCFName);
        conf.set("target.table.name", targetTableName);
        conf.set("partition.mip", partitionMip);
        conf.set("partition.dt", partitionDt);
        conf.set("partition.dh", partitionDh);

        // HBase and MapReduce runtime settings
        conf.set("hbase.zookeeper.quorum", zkHosts);
        conf.set("mapreduce.job.maps", mapNum);
        conf.set("zookeeper.znode.parent", zkNodeParent);
        conf.set(TableOutputFormat.OUTPUT_TABLE, targetTableName); // "hbase.mapred.outputtable"

        job.setJobName("HBaseImportJob-" + targetTableName);
        job.setJarByClass(HBaseImportJob.class);
        job.setMapperClass(HBaseImportMapper.class);

        // output goes straight from the mappers to the HBase table; no reduce phase
        job.setOutputFormatClass(TableOutputFormat.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Mutation.class);
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }
}
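
For the output wiring, HBase also ships TableMapReduceUtil, which configures TableOutputFormat, the output table and the HBase dependency jars in one call. Below is a minimal alternative sketch of the same map-only setup; HBaseImportJobAlt and its argument order are made up for the example, and the mapper's own parameters (source.table.columns and friends) would still have to be set on the configuration exactly as in the driver above.

package bi.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class HBaseImportJobAlt {
    // args: <inputPath> <zkHosts> <targetTableName>  (order is just for this sketch)
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create(); // picks up hbase-site.xml if present
        conf.set("hbase.zookeeper.quorum", args[1]);
        // the mapper's parameters (source.table.columns, target.table.family.name, ...)
        // would still need to be set on conf here, as in HBaseImportJob above

        Job job = Job.getInstance(conf, "HBaseImportJobAlt-" + args[2]);
        job.setJarByClass(HBaseImportJobAlt.class);
        job.setMapperClass(HBaseImportMapper.class);

        // configures TableOutputFormat, the output table and the HBase dependency jars;
        // a null reducer class keeps the job map-only
        TableMapReduceUtil.initTableReducerJob(args[2], null, job);
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}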




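The Mapper below splits each \001-delimited line (\001 is Hive's default field delimiter) into fields and turns it into one Put, writing each field as a column under the configured family: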
package bi.etl;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
    
public class HBaseImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Mutation> {
    // all of these are populated from the job configuration in setup()
    private String[] columns;
    private String familyName;
    private String tableName;
    private String mipString;
    private String dtString;
    private String dhString;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        Configuration conf = context.getConfiguration();
        String columnsStr = conf.get("source.table.columns");
        if (StringUtils.isNotBlank(columnsStr)) {
            columns = columnsStr.trim().split(",");
        }
        familyName = conf.get("target.table.family.name");
        tableName = conf.get("target.table.name");
        mipString = conf.get("partition.mip");
        dtString = conf.get("partition.dt");
        dhString = conf.get("partition.dh");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        if (StringUtils.isBlank(line) || columns == null) {
            return;
        }
        // every record of this run goes to the same row: table name plus the partition values
        String rowKey = tableName + "#" + mipString + "#" + dtString + "#" + dhString;
        // \001 (Ctrl-A) is the field delimiter of the input files
        String[] items = line.split("\\001");
        if (items.length != columns.length) {
            // skip malformed lines whose field count does not match the column list
            return;
        }
        Put put = new Put(Bytes.toBytes(rowKey));
        for (int i = 0; i < columns.length; ++i) {
            String columnName = columns[i];
            if (StringUtils.isNotBlank(columnName)) {
                put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName), Bytes.toBytes(items[i]));
            }
        }
        // TableOutputFormat ignores the key, but emitting the row key is clearer than writing null
        context.write(new ImmutableBytesWritable(put.getRow()), put);
    }
}
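
Since the row key only contains the table name and the partition values, all Puts of one run go to the same row, so the table keeps one row per partition. A quick way to check the result after the job is to fetch that row back. This is only a sketch against the HBase 1.x client API, and HBaseImportCheck is a made-up class name; the arguments mirror the ones the driver received.

package bi.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseImportCheck {
    // args: <targetTableName> <mip> <dt> <dh> <zkHosts>
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", args[4]);

        // same row key the mapper builds: table#mip#dt#dh
        String rowKey = args[0] + "#" + args[1] + "#" + args[2] + "#" + args[3];
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf(args[0]))) {
            Result result = table.get(new Get(Bytes.toBytes(rowKey)));
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneQualifier(cell))
                        + " = " + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }
}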
–EOF–