简介
Apache Hive 是一个基于 Hadoop 的数据仓库工具,它提供了一种类似于 SQL 的查询语言(HiveQL),使用户能够在大规模分布式存储和计算框架上进行复杂的数据分析。
主要特点
- SQL-Like 查询语言: Hive 使用类似于 SQL 的查询语言(HiveQL),这使得用户无需学习新的语法就能够进行数据查询和分析。
- 分布式存储和计算: Hive建立在Hadoop生态系统之上,利用Hadoop的HDFS(分布式文件系统)和MapReduce(分布式计算模型)来处理大规模数据。
- Schema on Read: 与传统的关系型数据库不同,Hive 使用"Schema on Read"的模型,这意味着数据在存储时不需要预定义的模式,而是在读取时根据用户的查询动态解析。
- 支持分区表: Hive 支持分区表,可以根据表中的某个列将数据分隔成不同的子目录,提高查询性能。
- 用户自定义函数(UDF): Hive 允许用户编写自定义函数(User Defined Functions),这样用户可以根据自己的需求扩展 Hive 的功能。
- 元数据存储: Hive 使用元数据存储来记录表结构、分区信息等,这使得它能够对数据进行元数据管理。
- 优化器: Hive 包含一个查询优化器,它可以优化用户的查询计划以提高查询性能。
- 可扩展性: Hive 是一个可扩展的框架,可以通过添加新的存储插件(如 HBase、Apache Cassandra)和执行引擎(如 Apache Tez)来扩展其功能。
架构组成
- Hive CLI(Command Line Interface): 提供了一个命令行界面,允许用户通过命令行输入 HiveQL 查询。
- Hive Metastore: 存储 Hive 的元数据信息,包括表结构、分区信息等。默认情况下,元数据存储在关系型数据库中(如 MySQL)。
- Hive Server: 提供了 Thrift 和 JDBC 接口,允许远程客户端连接到 Hive 并执行查询。
- Hive Query Processor: 包括查询编译器和执行引擎,负责将 HiveQL 查询转换为 MapReduce 任务或其他执行引擎的任务。
- Hadoop Distributed File System (HDFS): 作为底层存储系统,存储 Hive 表的数据。
- MapReduce: 在较早的版本中,Hive 使用 MapReduce 作为执行引擎,但随着发展,它也支持其他执行引擎,如 Apache Tez。
HiveQL
-- 创建表
CREATE TABLE employees (
id INT,
name STRING,
salary FLOAT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
-- 加载数据
LOAD DATA LOCAL INPATH '/path/to/employees.csv' INTO TABLE employees;
-- 查询数据
SELECT name, AVG(salary) FROM employees GROUP BY name;
上述 HiveQL 查询演示了创建表、加载数据以及执行聚合查询的基本语法。
nodejs连接hive
以下是个简单的查询工具实例,涉及到了http和websocket两种连接方式,由于hive的慢查询性质,需要维护连接池,建议使用异步连接,并且使用模板过滤掉不友好的输入
npm install express body-parser node-thrift-hive ws generic-pool
<!-- frontend/index.html -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Hive Query</title>
</head>
<body>
<textarea id="queryInput" rows="5" cols="50"></textarea>
<button onclick="executeQuery()">Execute Query</button>
<div id="queryResult"></div>
<script>
const socket = new WebSocket('ws://localhost:3000');
function executeQuery() {
const queryInput = document.getElementById('queryInput');
const query = queryInput.value;
// 发送查询到服务器
socket.send(JSON.stringify({ query }));
}
socket.onmessage = function(event) {
// 处理从服务器接收到的查询结果
const queryResult = document.getElementById('queryResult');
queryResult.innerText = `Query Result: ${event.data}`;
};
</script>
</body>
</html>
// backend/server.js
const express = require('express');
const http = require('http');
const WebSocket = require('ws');
const bodyParser = require('body-parser');
const thrift = require('thrift');
const Hive = require('node-thrift-hive');
const genericPool = require('generic-pool');
const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });
// 创建 Hive 连接池
const hivePool = genericPool.createPool({
create: function () {
return new Promise((resolve, reject) => {
const connection = thrift.createConnection('localhost', 10000, {
transport: thrift.TBufferedTransport,
protocol: thrift.TBinaryProtocol,
});
const client = thrift.createClient(Hive, connection);
connection.on('connect', function () {
console.log('Connected to Hive');
resolve({ connection, client });
});
connection.on('error', function (err) {
console.error('Error connecting to Hive:', err);
reject(err);
});
});
},
destroy: function (resource) {
resource.connection.end();
},
}, {
max: 10, // 最大连接数
min: 2, // 最小连接数
});
// 处理静态文件
app.use(express.static('frontend'));
app.use(bodyParser.json());
// 处理 HTTP 请求
app.post('/executeQuery', async (req, res) => {
const query = req.body.query;
if (!query) {
return res.status(400).json({ error: 'Query is required' });
}
let connection;
try {
// 从连接池中获取连接
connection = await hivePool.acquire();
// 执行 Hive 查询
connection.client.execute(query, function (err, response) {
if (err) {
console.error('Error executing Hive query:', err);
res.status(500).json({ error: 'Error executing Hive query' });
} else {
console.log('Hive query result:', response);
res.json({ result: response });
}
});
} catch (err) {
console.error('Error acquiring Hive connection:', err);
res.status(500).json({ error: 'Error acquiring Hive connection' });
} finally {
// 将连接释放回连接池
if (connection) {
hivePool.release(connection);
}
}
});
// 处理 WebSocket 连接
wss.on('connection', (ws) => {
console.log('WebSocket connection established');
ws.on('message', (message) => {
const { query } = JSON.parse(message);
// 在这里可以执行一些逻辑,然后将查询结果通过 WebSocket 推送给前端
ws.send(`Query Result: ${query}`);
});
});
// 启动服务器
const PORT = 3000;
server.listen(PORT, () => {
console.log(`Server is running on port ${PORT}`);
});
const express = require('express');
const bodyParser = require('body-parser');
const thrift = require('thrift');
const Hive = require('node-thrift-hive');
const genericPool = require('generic-pool');
const app = express();
const port = 3000;
const pool = genericPool.createPool({
create: function () {
return new Promise((resolve, reject) => {
const connection = thrift.createConnection('localhost', 10000, {
transport: thrift.TBufferedTransport,
protocol: thrift.TBinaryProtocol,
});
const client = thrift.createClient(Hive, connection);
connection.on('connect', function () {
console.log('Connected to Hive');
resolve({ connection, client });
});
connection.on('error', function (err) {
console.error('Error connecting to Hive:', err);
reject(err);
});
});
},
destroy: function (resource) {
resource.connection.end();
},
}, {
max: 10, // 最大连接数
min: 2, // 最小连接数
});
app.use(bodyParser.json());
// 提供查询的HTTP接口
app.post('/executeQuery', async (req, res) => {
const query = req.body.query;
if (!query) {
return res.status(400).json({ error: 'Query is required' });
}
let connection;
try {
// 从连接池中获取连接
connection = await pool.acquire();
// 执行Hive查询
connection.client.execute(query, function (err, response) {
if (err) {
console.error('Error executing Hive query:', err);
res.status(500).json({ error: 'Error executing Hive query' });
} else {
console.log('Hive query result:', response);
res.json({ result: response });
}
});
} catch (err) {
console.error('Error acquiring Hive connection:', err);
res.status(500).json({ error: 'Error acquiring Hive connection' });
} finally {
// 将连接释放回连接池
if (connection) {
pool.release(connection);
}
}
});
// 启动服务器
app.listen(port, () => {
console.log(`Server is running on port ${port}`);
});