package bi.etl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
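/**
 * MapReduce driver that loads Hive-style delimited text files from HDFS into
 * an HBase table via {@link TableOutputFormat}. A sketch of a typical
 * invocation; the jar name and argument values below are illustrative, not
 * fixed by this code:
 *
 *   hadoop jar bi-etl.jar bi.etl.HBaseImportJob \
 *       "id,name,amount" /warehouse/orders/dt=20240101 cf orders \
 *       1 20240101 00 zk1:2181,zk2:2181 4 /hbase
 */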
public class HBaseImportJob extends Configured implements Tool {

    private static final Logger LOG = LoggerFactory.getLogger(HBaseImportJob.class);

    public static void main(String[] args) {
        try {
            int ret = ToolRunner.run(new HBaseImportJob(), args);
            System.exit(ret);
        } catch (Exception e) {
            LOG.error("HBaseImportJob failed", e);
            // Exit non-zero so callers can detect the failure.
            System.exit(1);
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 10) {
            System.err.println("Usage: HBaseImportJob <sourceTableColumns> <inputPath> "
                    + "<targetTableCFName> <targetTableName> <partitionMip> <partitionDt> "
                    + "<partitionDh> <zkHosts> <mapNum> <zkNodeParent>");
            return 1;
        }
        String sourceTableColumns = args[0];
        String inputPath = args[1];
        String targetTableCFName = args[2];
        String targetTableName = args[3];
        String partitionMip = args[4];
        String partitionDt = args[5];
        String partitionDh = args[6];
        String zkHosts = args[7];
        String mapNum = args[8];
        String zkNodeParent = args[9];
        // Build the job on top of the Tool's configuration so generic options
        // parsed by ToolRunner (-D, -conf, ...) are honored, and give the job
        // a readable name instead of Configuration#toString().
        Job job = Job.getInstance(getConf(), "HBaseImportJob-" + targetTableName);
        Configuration conf = job.getConfiguration();
        conf.set("source.table.columns", sourceTableColumns);
        conf.set("target.table.family.name", targetTableCFName);
        conf.set("target.table.name", targetTableName);
        conf.set("partition.mip", partitionMip);
        conf.set("partition.dt", partitionDt);
        conf.set("partition.dh", partitionDh);
        conf.set("hbase.zookeeper.quorum", zkHosts);
        // A hint only: the actual map count is driven by the input splits.
        conf.set("mapreduce.job.maps", mapNum);
        conf.set("zookeeper.znode.parent", zkNodeParent);
        conf.set(TableOutputFormat.OUTPUT_TABLE, targetTableName);

        job.setJarByClass(HBaseImportJob.class);
        job.setMapperClass(HBaseImportMapper.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Mutation.class);
        // Map-only job: Puts go straight from the mappers to HBase.
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(inputPath));

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }
}
package bi.etl;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
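/**
 * Maps one line of Hive-delimited text (default field separator \001) to an
 * HBase {@link Put}, writing one cell per configured source column. Runs
 * map-only; the driver sets the reduce count to zero.
 */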
public class HBaseImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Mutation> {

    private String[] columns;
    private String familyName;
    private String tableName;
    private String mipString;
    private String dtString;
    private String dhString;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        Configuration conf = context.getConfiguration();
        String columnsStr = conf.get("source.table.columns");
        if (StringUtils.isNotBlank(columnsStr)) {
            columns = columnsStr.trim().split(",");
        }
        familyName = conf.get("target.table.family.name");
        tableName = conf.get("target.table.name");
        mipString = conf.get("partition.mip");
        dtString = conf.get("partition.dt");
        dhString = conf.get("partition.dh");
    }
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (StringUtils.isBlank(line)) {
            return;
        }
        // The original row key was constant per partition, so every Put
        // collapsed onto a single HBase row. Appending the map input key (the
        // byte offset of the line) is one way to keep rows distinct; adjust
        // this suffix to your real row-key design.
        String rowKey = tableName + "#" + mipString + "#" + dtString + "#" + dhString
                + "#" + key.get();
        // Hive's default field delimiter is \001; the -1 limit preserves
        // trailing empty fields so the field count still matches the columns.
        String[] items = line.split("\\001", -1);
        if (columns != null && items.length == columns.length) {
            Put put = new Put(Bytes.toBytes(rowKey));
            for (int i = 0; i < columns.length; i++) {
                String columnName = columns[i];
                if (StringUtils.isNotBlank(columnName)) {
                    put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName),
                            Bytes.toBytes(items[i]));
                }
            }
            context.write(new ImmutableBytesWritable(put.getRow()), put);
        }
    }
}