python 异步执行hive查询

在开始之前,我们需要具备一些基础知识:

什么是hive?

hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。

hive对外暴露出来的用法,基本上和Mysql相同,但是它能做到Mysql做不到的对大数据处理,例如几十亿条数据,放在Mysql中,必然不堪重负,但在hive中,举重若轻。

如何使用?

所需环境:

pyhive

安装方式:

pip install pyhive 

hive-thriftserver

hive具有一个可选的组件叫HiveThrift,其允许通过指定端口,通过thrift方式访问hive。
启动Thrift Server:
进入hive安装目录,使用如下命令开启服务

hive --service hiveserver &

检查HiveServer是否启动成功:

netstat -apn | grep 10000

具体思路

hive的thriftserver,支持我们在连接中异步地提交sql,并且通过查询对应的cursor来实时获取任务的执行状态。所以我们可以在提交后将cursor保留,并定时地获取sql的状态,当执行完成后,则fetch结果

python代码

#提交sql,并保存对应的cursor和connection
def execute_async(sql):
    conn = hive.connect(host = '127.0.0.1', port = 10000)#建立连接
    cursor = conn.cursor()#取得cursor
    cursor.execute(sql, async = True)
    return cursor, conn

#检查任务状态,将上面执行后返回的cursor作为参数
def check_state(cursor):
    job_handler = cursor.poll()
    state = job_handler.operationState
    if state in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE, TOperationState.PENDING_STATE):
        return "RUNNING"
    elif state in (TOperationState.CANCELED_STATE, TOperationState.ERROR_STATE, TOperationState.UKNOWN_STATE):
        return "ERROR"
    elif state == TOperationState.FINISHED_STATE:
        return "FINISHED"
    elif state == TOperationState.CLOSED_STATE:
        return "CLOSED"
    return "UNKNOW"

上面的代码只是异步执行的核心代码。具体的逻辑还需要结合自身需求进行填充。当然,查询完毕之后,不要忘记最重要的一步:释放资源

cursor.close()
conn.close()