当前位置: 首页 > 数据库 > 其它数据库 > 正文

如何利用MapReduce访问HBase数据

时间:2015-01-29
package com.mr.test;  
      
import java.io.IOException;  
import java.util.Iterator;  
      
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.hbase.HBaseConfiguration;  
import org.apache.hadoop.hbase.KeyValue;  
import org.apache.hadoop.hbase.client.Result;  
import org.apache.hadoop.hbase.client.Scan;  
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;  
import org.apache.hadoop.hbase.mapreduce.TableMapper;  
import org.apache.hadoop.hbase.util.Bytes;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapred.MapReduceBase;  
import org.apache.hadoop.mapred.OutputCollector;  
import org.apache.hadoop.mapred.Reducer;  
import org.apache.hadoop.mapred.Reporter;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
import org.apache.log4j.Logger;  
      
      
/**
 * MapReduce job that scans one column of an HBase table and writes it out as text.
 *
 * <p>The job scans the table {@code person}, restricted to the column
 * {@link #family}:{@link #col}, and emits one {@code (cell value, row key)} pair per
 * row through {@link TextOutputFormat} into the directory given as the first
 * command-line argument.
 *
 * <p>Usage: {@code hadoop jar <jar> com.mr.test.MRHbase <output-path>}
 */
public class MRHbase {
    private static final Logger log = Logger.getLogger(MRHbase.class);

    /** Column family that is scanned. */
    public static String family = "charactor";
    /** Column qualifier that is scanned. */
    public static String col = "hobby";

    /** Name of the HBase table the job reads from. */
    private static final String TABLE_NAME = "person";

    /**
     * Mapper that turns every scanned row into a {@code (cell value, row key)} pair.
     */
    public static class HMap extends TableMapper<Text, Text> {
        /**
         * Emits the value of {@code family:col} as the key and the row key as the value.
         *
         * @param key     row key supplied by the table input format (unused; the row is
         *                read from {@code value})
         * @param value   the scanned row
         * @param context task context the pair is written to
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Context context) throws IOException, InterruptedException {
            // Bytes.toBytes encodes as UTF-8; String.getBytes() would depend on the
            // platform default charset of whichever node runs the task.
            byte[] cell = value.getValue(Bytes.toBytes(family), Bytes.toBytes(col));
            byte[] row = value.getRow();
            if (cell == null || row == null) {
                // The row does not carry the column (or is empty). A Text cannot be
                // built from a missing value, so skip the row instead of failing the task.
                return;
            }
            context.write(new Text(Bytes.toString(cell)), new Text(Bytes.toString(row)));
        }
    }

    /**
     * Identity reducer: forwards every value unchanged under its key.
     *
     * <p>This class implements the old {@code org.apache.hadoop.mapred} API and is not
     * registered on the job built by {@link MRHbase#main(String[])}.
     */
    public static class Reduce extends MapReduceBase implements
            Reducer<Text, Text, Text, Text> {
        /**
         * Collects each incoming value under the unchanged key.
         *
         * @param key      key shared by all {@code values}
         * @param values   values grouped under {@code key}
         * @param output   collector receiving the pairs
         * @param reporter progress reporter (unused)
         * @throws IOException if collecting a pair fails
         */
        @Override
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            while (values.hasNext()) {
                output.collect(key, values.next());
            }
        }
    }

    /**
     * Configures and runs the job, blocking until it finishes.
     *
     * <p>Exits the JVM with status 2 when the output path argument is missing and with
     * status 1 when the job fails or cannot be submitted; returns normally on success.
     *
     * @param args {@code args[0]} is the output directory for the text files
     */
    public static void main(String[] args) {
        if (args == null || args.length < 1) {
            System.err.println("Usage: MRHbase <output-path>");
            System.exit(2);
        }

        Configuration conf = HBaseConfiguration.create();
        boolean succeeded = false;
        try {
            Job job = new Job(conf, "hbase test");
            job.setJarByClass(MRHbase.class);

            // Restrict the scan to the single column the mapper reads.
            Scan scan = new Scan();
            scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(col));
            TableMapReduceUtil.initTableMapperJob(TABLE_NAME, scan, HMap.class,
                    Text.class, Text.class, job);

            job.setOutputFormatClass(TextOutputFormat.class);
            FileOutputFormat.setOutputPath(job, new Path(args[0]));

            succeeded = job.waitForCompletion(true);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the surrounding runtime can observe it.
            Thread.currentThread().interrupt();
            log.error("Job '" + TABLE_NAME + "' scan was interrupted", e);
        } catch (Exception e) {
            // Top-level boundary: report the failure and fall through to a non-zero exit.
            log.error("Job failed for output path " + args[0], e);
        }

        if (!succeeded) {
            // Let shell scripts and schedulers see the failure.
            System.exit(1);
        }
    }
}

注:要把ZooKeeper的jar包添加到master和所有slave节点的hadoop/lib目录下。

更多精彩内容:http://www.bianceng.cn/database/extra/