
Open Source Cloud Computing Technology Series (4): Hands-on with Cloudera

[Date: 2009-09-02] Author: 清

Cloudera positions itself as:

Bringing Big Data to the Enterprise with Hadoop

Cloudera aims to standardize Hadoop deployment: it helps enterprises install, configure, and run hadoop for large-scale data processing and analysis.

Since it targets enterprise use, Cloudera's distribution is not built on the latest hadoop 0.20; instead it packages the Hadoop 0.18.3-12.cloudera.CH0_3 release and bundles Facebook's hive and Yahoo's pig, higher-level query layers on top of hadoop (hive exposes a SQL-like interface, pig its own dataflow language), so that installing, configuring, and using these tools becomes cheaper and standardized. Beyond integrating and packaging these mature tools, one of Cloudera's more interesting tools is sqoop, which is not yet available as a standalone download. That is the starting point for this full tour of Cloudera: to try out sqoop's convenience for ourselves.

Sqoop ("SQL-to-Hadoop") is "a tool designed to easily import information from SQL databases into your Hadoop cluster." With sqoop you can conveniently pull data from a traditional RDBMS such as mysql or oracle into the hadoop cluster: the whole export-and-import is a single command, and you can filter which tables and columns come over (a sketch follows below). Compared with the common, mature approach of staging through text files or pipes, the gain in development efficiency and the simplicity of configuration are what set this tool apart.
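
For illustration, a minimal sketch of such a one-command import (the hostname, database, credentials, and column list are placeholders, and flag availability differs between sqoop builds, so run sqoop --help to confirm what yours supports):

sqoop --connect jdbc:mysql://dbhost/salesdb --username reporter --password xxx \
      --table ORDERS --columns order_id,customer_id,amount --hive-import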

Sqoop can:

  • Imports individual tables or entire databases to files in HDFS
  • Generates Java classes to allow you to interact with your imported data
  • Provides the ability to import from SQL databases straight into your Hive data warehouse

After setting up an import job in Sqoop, you can get started working with SQL database-backed data from your Hadoop MapReduce cluster in minutes.

Here we first get a quick taste of sqoop through an example, and then introduce the configuration of this complete cloud computing environment.

The example demonstrates how to bring a customer table onto the hadoop cluster for analysis: we export the USERS table, have it imported into hive automatically, and then run ad-hoc SQL analysis through hive. This shows off hadoop's data processing muscle without putting any load on the production database.

First, create a test USERS table:

mysql> CREATE TABLE USERS (
    ->   user_id INTEGER NOT NULL PRIMARY KEY,
    ->   first_name VARCHAR(32) NOT NULL,
    ->   last_name VARCHAR(32) NOT NULL,
    ->   join_date DATE NOT NULL,
    ->   zip INTEGER,
    ->   state CHAR(2),
    ->   email VARCHAR(128),
    ->   password_hash CHAR(64));
Query OK, 0 rows affected (0.00 sec)

Insert a row of test data:

insert into USERS (user_id,first_name,last_name,join_date,zip,state,email,password_hash) values (1,'a','b','20080808',330440,'ha','test@test.com','xxxx');       
Query OK, 1 row affected, 1 warning (0.00 sec)

mysql> select * from USERS;
+---------+------------+-----------+------------+--------+-------+---------------+---------------+
| user_id | first_name | last_name | join_date  | zip    | state | email         | password_hash |
+---------+------------+-----------+------------+--------+-------+---------------+---------------+
|       1 | a          | b         | 2008-08-08 | 330440 | ha    | test@test.com | xxxx          |
+---------+------------+-----------+------------+--------+-------+---------------+---------------+
1 row in set (0.00 sec)

Next, we use sqoop to import the USERS table from the mysql database into hive:

sqoop --connect jdbc:mysql://localhost/test --username root --password xxx --local --table USERS --hive-import
09/06/20 18:43:50 INFO sqoop.Sqoop: Beginning code generation
09/06/20 18:43:50 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM USERS AS t WHERE 1 = 1
09/06/20 18:43:50 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM USERS AS t WHERE 1 = 1
09/06/20 18:43:50 INFO orm.CompilationManager: HADOOP_HOME is /usr/lib/hadoop
09/06/20 18:43:50 INFO orm.CompilationManager: Found hadoop core jar at: /usr/lib/hadoop/hadoop-0.18.3-12.cloudera.CH0_3-core.jar
09/06/20 18:43:50 INFO orm.CompilationManager: Invoking javac with args: -sourcepath ./ -d /tmp/sqoop/compile/ -classpath /etc/hadoop/conf:/home/hadoop/jdk1.6/lib/tools.jar:/usr/lib/hadoop:/usr/lib/hadoop/hadoop-0.18.3-12.cloudera.CH0_3-core.jar:/usr/lib/hadoop/lib/commons-cli-2.0-SNAPSHOT.jar:/usr/lib/hadoop/lib/commons-codec-1.3.jar:/usr/lib/hadoop/lib/commons-httpclient-3.0.1.jar:/usr/lib/hadoop/lib/commons-logging-1.0.4.jar:/usr/lib/hadoop/lib/commons-logging-api-1.0.4.jar:/usr/lib/hadoop/lib/commons-net-1.4.1.jar:/usr/lib/hadoop/lib/hadoop-0.18.3-12.cloudera.CH0_3-fairscheduler.jar:/usr/lib/hadoop/lib/hadoop-0.18.3-12.cloudera.CH0_3-scribe-log4j.jar:/usr/lib/hadoop/lib/hsqldb.jar:/usr/lib/hadoop/lib/jets3t-0.6.1.jar:/usr/lib/hadoop/lib/jetty-5.1.4.jar:/usr/lib/hadoop/lib/junit-4.5.jar:/usr/lib/hadoop/lib/kfs-0.1.3.jar:/usr/lib/hadoop/lib/libfb303.jar:/usr/lib/hadoop/lib/libthrift.jar:/usr/lib/hadoop/lib/log4j-1.2.15.jar:/usr/lib/hadoop/lib/mysql-connector-java-5.0.8-bin.jar:/usr/lib/hadoop/lib/oro-2.0.8.jar:/usr/lib/hadoop/lib/servlet-api.jar:/usr/lib/hadoop/lib/slf4j-api-1.4.3.jar:/usr/lib/hadoop/lib/slf4j-log4j12-1.4.3.jar:/usr/lib/hadoop/lib/xmlenc-0.52.jar:/usr/lib/hadoop/lib/jetty-ext/commons-el.jar:/usr/lib/hadoop/lib/jetty-ext/jasper-compiler.jar:/usr/lib/hadoop/lib/jetty-ext/jasper-runtime.jar:/usr/lib/hadoop/lib/jetty-ext/jsp-api.jar:/usr/lib/hadoop/hadoop-0.18.3-12.cloudera.CH0_3-core.jar:/usr/lib/hadoop/contrib/sqoop/hadoop-0.18.3-12.cloudera.CH0_3-sqoop.jar ./USERS.java
09/06/20 18:43:51 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop/compile/USERS.jar
09/06/20 18:43:51 INFO manager.LocalMySQLManager: Beginning mysqldump fast path import
09/06/20 18:43:51 INFO manager.LocalMySQLManager: Performing import of table USERS from database test
09/06/20 18:43:52 INFO manager.LocalMySQLManager: Transfer loop complete.
09/06/20 18:43:52 INFO hive.HiveImport: Loading uploaded data into Hive
09/06/20 18:43:52 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM USERS AS t WHERE 1 = 1
09/06/20 18:43:52 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM USERS AS t WHERE 1 = 1
09/06/20 18:43:52 WARN hive.TableDefWriter: Column join_date had to be cast to a less precise type in Hive
09/06/20 18:43:53 INFO hive.HiveImport: Hive history file=/tmp/root/hive_job_log_root_200906201843_1606494848.txt
09/06/20 18:44:00 INFO hive.HiveImport: OK
09/06/20 18:44:00 INFO hive.HiveImport: Time taken: 5.916 seconds
09/06/20 18:44:00 INFO hive.HiveImport: Loading data to table users
09/06/20 18:44:00 INFO hive.HiveImport: OK
09/06/20 18:44:00 INFO hive.HiveImport: Time taken: 0.344 seconds
09/06/20 18:44:01 INFO hive.HiveImport: Hive import complete.

The import succeeded. (Note the WARN line above: this Hive release has no exact counterpart for the mysql DATE type, so join_date was stored as a less precise type, in practice a string.) Let's verify the result inside hive.

hive
Hive history file=/tmp/root/hive_job_log_root_200906201844_376630602.txt
hive> select * from USERS;
OK
1       'a'     'b'     '2008-08-08'    330440  'ha'    'test@test.com' 'xxxx'
Time taken: 5.019 seconds
hive>

As you can see, the data matches the mysql table (the string fields keep the single quotes introduced along the mysqldump fast path).
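
With the table now in Hive, ad-hoc analysis is ordinary SQL at the hive prompt. A minimal sketch (the aggregate is trivial with our single test row; older Hive releases accept COUNT(1) where COUNT(*) may not yet be supported, and queries like this are compiled into MapReduce jobs):

hive> SELECT state, COUNT(1) FROM users GROUP BY state;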

With that, we have completed the import from the mysql database into HDFS and Hive.

Sqoop also generated a USERS.java class that can be used in MapReduce analysis:

more USERS.java
// ORM class for USERS
// WARNING: This class is AUTO-GENERATED. Modify at your own risk.
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import org.apache.hadoop.sqoop.lib.JdbcWritableBridge;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
public class USERS implements DBWritable, Writable {
  public static final int PROTOCOL_VERSION = 1;
  private Integer user_id;
  public Integer get_user_id() {
    return user_id;
  }
  private String first_name;
  public String get_first_name() {
    return first_name;
  }
  private String last_name;
  public String get_last_name() {
    return last_name;
  }
  private java.sql.Date join_date;
  public java.sql.Date get_join_date() {
    return join_date;
  }
  private Integer zip;
  public Integer get_zip() {
    return zip;
  }
  private String state;
  public String get_state() {
    return state;
  }
  private String email;
  public String get_email() {
    return email;
  }
  private String password_hash;
  public String get_password_hash() {
    return password_hash;
  }
  public void readFields(ResultSet __dbResults) throws SQLException {
    this.user_id = JdbcWritableBridge.readInteger(1, __dbResults);
    this.first_name = JdbcWritableBridge.readString(2, __dbResults);
    this.last_name = JdbcWritableBridge.readString(3, __dbResults);
    this.join_date = JdbcWritableBridge.readDate(4, __dbResults);
    this.zip = JdbcWritableBridge.readInteger(5, __dbResults);
    this.state = JdbcWritableBridge.readString(6, __dbResults);
    this.email = JdbcWritableBridge.readString(7, __dbResults);
    this.password_hash = JdbcWritableBridge.readString(8, __dbResults);
  }
  public void write(PreparedStatement __dbStmt) throws SQLException {
    JdbcWritableBridge.writeInteger(user_id, 1, 4, __dbStmt);
    JdbcWritableBridge.writeString(first_name, 2, 12, __dbStmt);
    JdbcWritableBridge.writeString(last_name, 3, 12, __dbStmt);
    JdbcWritableBridge.writeDate(join_date, 4, 91, __dbStmt);
    JdbcWritableBridge.writeInteger(zip, 5, 4, __dbStmt);
    JdbcWritableBridge.writeString(state, 6, 1, __dbStmt);
    JdbcWritableBridge.writeString(email, 7, 12, __dbStmt);
    JdbcWritableBridge.writeString(password_hash, 8, 1, __dbStmt);
  }
  public void readFields(DataInput __dataIn) throws IOException {
    if (__dataIn.readBoolean()) {
        this.user_id = null;
    } else {
    this.user_id = Integer.valueOf(__dataIn.readInt());
    }
    if (__dataIn.readBoolean()) {
        this.first_name = null;
    } else {
    this.first_name = Text.readString(__dataIn);
    }
    if (__dataIn.readBoolean()) {
        this.last_name = null;
    } else {
    this.last_name = Text.readString(__dataIn);
    }
    if (__dataIn.readBoolean()) {
        this.join_date = null;
    } else {
    this.join_date = new Date(__dataIn.readLong());
    }
    if (__dataIn.readBoolean()) {
        this.zip = null;
    } else {
    this.zip = Integer.valueOf(__dataIn.readInt());
    }
    if (__dataIn.readBoolean()) {
        this.state = null;
    } else {
    this.state = Text.readString(__dataIn);
    }
    if (__dataIn.readBoolean()) {
        this.email = null;
    } else {
    this.email = Text.readString(__dataIn);
    }
    if (__dataIn.readBoolean()) {
        this.password_hash = null;
    } else {
    this.password_hash = Text.readString(__dataIn);
    }
  }
  public void write(DataOutput __dataOut) throws IOException {
    if (null == this.user_id) {
        __dataOut.writeBoolean(true);
    } else {
        __dataOut.writeBoolean(false);
    __dataOut.writeInt(this.user_id);
    }
    if (null == this.first_name) {
        __dataOut.writeBoolean(true);
    } else {
        __dataOut.writeBoolean(false);
    Text.writeString(__dataOut, first_name);
    }
    if (null == this.last_name) {
        __dataOut.writeBoolean(true);
    } else {
        __dataOut.writeBoolean(false);
    Text.writeString(__dataOut, last_name);
    }
    if (null == this.join_date) {
        __dataOut.writeBoolean(true);
    } else {
        __dataOut.writeBoolean(false);
    __dataOut.writeLong(this.join_date.getTime());
    }
    if (null == this.zip) {
        __dataOut.writeBoolean(true);
    } else {
        __dataOut.writeBoolean(false);
    __dataOut.writeInt(this.zip);
    }
    if (null == this.state) {
        __dataOut.writeBoolean(true);
    } else {
        __dataOut.writeBoolean(false);
    Text.writeString(__dataOut, state);
    }
    if (null == this.email) {
        __dataOut.writeBoolean(true);
    } else {
        __dataOut.writeBoolean(false);
    Text.writeString(__dataOut, email);
    }
    if (null == this.password_hash) {
        __dataOut.writeBoolean(true);
    } else {
        __dataOut.writeBoolean(false);
    Text.writeString(__dataOut, password_hash);
    }
  }
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append("" + user_id);
    sb.append(",");
    sb.append(first_name);
    sb.append(",");
    sb.append(last_name);
    sb.append(",");
    sb.append("" + join_date);
    sb.append(",");
    sb.append("" + zip);
    sb.append(",");
    sb.append(state);
    sb.append(",");
    sb.append(email);
    sb.append(",");
    sb.append(password_hash);
    return sb.toString();
  }
}

As you can see, the auto-generated code is very readable and can serve as the basis for custom secondary development.
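
As a taste of that secondary development, here is a hedged sketch of a MapReduce job that counts users per state using the old Hadoop 0.18 "mapred" API. The input path /user/root/USERS and the comma-separated field layout (mirroring the field order in USERS.toString()) are assumptions about where and how sqoop laid the rows out in HDFS, not something the log above confirms:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class UsersByState {

  // Mapper: parse one imported USERS row and emit (state, 1).
  public static class StateMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);

    public void map(LongWritable key, Text value,
        OutputCollector<Text, IntWritable> out, Reporter reporter)
        throws IOException {
      // Assumed field order, as in USERS.toString():
      // user_id,first_name,last_name,join_date,zip,state,email,password_hash
      String[] fields = value.toString().split(",");
      if (fields.length >= 6) {
        out.collect(new Text(fields[5]), ONE); // state is the sixth field
      }
    }
  }

  // Reducer: sum the per-state counts.
  public static class SumReducer extends MapReduceBase
      implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
        OutputCollector<Text, IntWritable> out, Reporter reporter)
        throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      out.collect(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(UsersByState.class);
    conf.setJobName("users-by-state");
    conf.setMapperClass(StateMapper.class);
    conf.setReducerClass(SumReducer.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    FileInputFormat.setInputPaths(conf, new Path("/user/root/USERS"));
    FileOutputFormat.setOutputPath(conf, new Path("/user/root/users_by_state"));
    JobClient.runJob(conf);
  }
}

Note that the quoted string values visible in the hive output above would surface here too (a key of 'ha' rather than ha); a real job would strip the quotes while parsing.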
