OLAP: Online Analytical Processing
OLTP: Online Transaction Processing (relational databases such as Oracle and MySQL)
Both OLTP and OLAP: TiDB
Apache Phoenix enables OLTP and operational analytics in Hadoop for low-latency applications, combining:
the power of standard SQL (apart from its UPSERT syntax) and JDBC APIs with full ACID transaction capabilities
the flexibility of late-bound, schema-on-read capabilities from the NoSQL world by leveraging HBase as its backing store
full integration with other Hadoop products such as Spark, Hive, Pig, Flume, and MapReduce
# Server side: copy the Phoenix server jar into HBase's lib directory
cp ~/app/apache-phoenix-4.14.0-cdh5.14.2-bin/phoenix-4.14.0-cdh5.14.2-server.jar ~/app/hbase-1.2.0-cdh5.16.2/lib/
# Edit the configuration
cd /home/hadoop/app/hbase-1.2.0-cdh5.16.2/conf
vim hbase-site.xml
cd /home/hadoop/app/hbase-1.2.0-cdh5.16.2
./bin/stop-hbase.sh
./bin/start-hbase.sh
# In the Phoenix bin directory (which ships its own hbase-site.xml), back it up and symlink HBase's config instead
cp hbase-site.xml hbase-site.xml.bak
ln -s ~/app/hbase-1.2.0-cdh5.16.2/conf/hbase-site.xml hbase-site.xml
# Note: what about HDFS HA? The client also needs the HDFS configs so the nameservice can be resolved
# CDH: /etc/hdfs/conf/core-site.xml
#      /etc/hdfs/conf/hdfs-site.xml
# Note: sqlline.py requires Python 2.7
python -V
# Start sqlline
cd /home/hadoop/app/apache-phoenix-4.14.0-cdh5.14.2-bin/bin
./sqlline.py hadoop000:2181
<property>
<name>hbase.table.sanity.checks</name>
<value>false</value>
</property>
<property>
<name>hbase.regionserver.wal.codec</name>
<value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>
<property>
<name>phoenix.schema.isNamespaceMappingEnabled</name>
<value>true</value>
</property>
<property>
<name>phoenix.schema.mapSystemTablesToNamespace</name>
<value>true</value>
</property>
INTEGER
BIGINT
FLOAT
DOUBLE
DECIMAL
VARCHAR
CHAR
TIMESTAMP (maps to MySQL datetime/timestamp)
Adding a column is straightforward with the ADD syntax
MODIFY is not supported; to change a column you can only DROP it and then ADD it back
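For example (a minimal sketch; the phone column is made up purely for illustration, applied to the rzdata.test table created below):
alter table rzdata.test add phone varchar(20);    -- ADD works directly
alter table rzdata.test drop column phone;        -- no MODIFY: drop the column ...
alter table rzdata.test add phone varchar(50);    -- ... then add it back with the new definition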
-- A Phoenix table must declare a primary key, which maps to the HBase rowkey
-- Phoenix schema => HBase namespace => database (in RDBMS terms)
-- list_namespace (in hbase shell)
create schema rzdata;
create table rzdata.test(id bigint not null primary key, name varchar(255), age integer);
!tables
-- UPSERT merges INSERT and UPDATE: the row is inserted if it does not exist and updated if it does
-- scan 'RZDATA:TEST' (in hbase shell)
upsert into rzdata.test values(1, 'rz', 18);
upsert into rzdata.test values(2, '狗狗', 16);
-- overwrite the existing row
upsert into rzdata.test values(1, 'rz2', 18);
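UPSERT also accepts an explicit column list, so only some columns need to be written (a small sketch):
upsert into rzdata.test (id, name) values (3, 'partial');  -- age is simply left NULL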
<property>
<name>phoenix.schema.isNamespaceMappingEnabled</name>
<value>true</value>
</property>
<property>
<name>phoenix.schema.mapSystemTablesToNamespace</name>
<value>true</value>
</property>
<!-- For these two properties, check the docs and note whether each one must be set on the client side, the server side, or both -->
# Create an index table
create index test_idx on rzdata.test(name,age);
explain select name from rzdata.test where age = 18; # FULL SCAN OVER RZDATA:TEST_IDX
# RANGE SCAN OVER RZDATA:TEST_IDX ['rz'] (better)
## on the index table, filter on the first indexed column to read the second
explain select age from rzdata.test where name = 'rz';
Salting prepends a bucket byte (a hash of the rowkey modulo SALT_BUCKETS) to every rowkey
Intended for large data volumes
Pros: data lands evenly across regions (and therefore region server nodes), which improves write performance
Cons: query performance drops somewhat
1-1,000 rows, unsalted table: data sits on one region server, queries are fast
1-1,000 rows, salted table: data is spread over several region servers, queries are slower
1-10,000,000 rows, unsalted table: data sits on one region server, queries are slow
1-10,000,000 rows, salted table: data is spread over several region servers, queries are fast
Pitfalls:
create table rzdata.testsalt (
id integer primary key,
name varchar,
age integer,
address varchar
) salt_buckets=20;
upsert into rzdata.testsalt values(1, 'rz', 17, '北疆');
upsert into rzdata.testsalt values(2, 'jj', 17, '上海');
# Inspect the salted table in hbase shell (rowkeys carry the salt prefix byte)
scan 'RZDATA:TESTSALT'
# e.g. with SALT_BUCKETS set to 20, the table ends up with 20(+1) regions spread across the region server nodes
# SPLITS must be written in uppercase
create 'testsalt2', 'order', SPLITS => ['a','b','c','d'] # pre-split table (hbase shell)
list # list all tables
Secondary indexes are an orthogonal way to access data from its primary access path. In HBase, you have a single index that is lexicographically sorted on the primary row key. Access to records in any way other than through the primary row requires scanning over potentially all the rows in the table to test them against your filter. With secondary indexing, the columns or expressions you index form an alternate row key to allow point lookups and range scans along this new axis.
Summary
create index testsalt_idx on rzdata.testsalt(name, age);
drop index testsalt_idx on rzdata.testsalt;
-- Query the tables
!tables
select * from rzdata.testsalt_idx;
+---------+--------+------+
| 0:NAME | 0:AGE | :ID |
+---------+--------+------+
| j | 17 | 2 |
| rz | 17 | 1 |
+---------+--------+------+
# 0 --> the default column family Phoenix assigns when the DDL does not specify one
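A column family can also be named explicitly in the DDL (a minimal sketch; rzdata.testcf is a made-up table for illustration):
create table rzdata.testcf (id integer primary key, f1.name varchar, f2.age integer);
-- NAME is stored in column family F1 and AGE in F2 instead of the default 0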
# full table scan
explain select * from rzdata.testsalt;
explain select /*+ INDEX(rzdata.testsalt testsalt_idx) */ * from rzdata.testsalt; # force the index with a hint
# select address
## full table scan (address is not covered by the index)
explain select address from rzdata.testsalt where age=17;
## force the index with a hint
explain select /*+ INDEX(rzdata.testsalt testsalt_idx) */ address from rzdata.testsalt where age=17;
# uses the index table
## RANGE SCAN OVER RZDATA:TESTSALT_IDX SERVER FILTER BY FIRST KEY ONLY AND TO_INTEGER("AGE") = 17
explain select name from rzdata.testsalt where age=17;
Write-heavy, read-light: local indexes are optimized for write-heavy workloads
local indexes: index data and table data co-reside on the same server, preventing any network overhead during writes
From 4.8.0 onwards all local index data is stored in separate shadow column families in the same data table
Essence: the index lives in a shadow column family of the data table rather than in a separate index table
Summary: prefer local indexes when writes dominate and some read overhead is acceptable
Pros: index and data are co-located on the same region server, so writes incur no extra network hops
Cons: the index is not globally sorted, so reads may need to consult every region
# 3 split points -> 3+1 = 4 regions
# SPLIT ON creates a pre-split table
create table rzdata.testlocal (
id integer primary key,
name varchar,
age integer,
address varchar
) split on (1,2,3);
# Insert data
upsert into rzdata.testlocal values(1, 'rz', 17, '北疆');
upsert into rzdata.testlocal values(2, 'j', 17, '上海');
# Create a local index
create local index testlocal_idx on rzdata.testlocal(name,age);
# full table scan
explain select * from rzdata.testlocal;
explain select /*+ INDEX(rzdata.testlocal testlocal_idx) */ * from rzdata.testlocal;
# uses the index: RANGE SCAN
explain select address from rzdata.testlocal where age=17;
explain select /*+ INDEX(rzdata.testlocal testlocal_idx) */ address from rzdata.testlocal where age=17;
describe 'RZDATA:TESTLOCAL'
# Result: two column families; L#0 is the shadow column family holding the local index data
COLUMN FAMILIES DESCRIPTION
{NAME => '0', BLOOMFILTER => 'NONE', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'FAST_DIFF', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
{NAME => 'L#0', BLOOMFILTER => 'NONE', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'FAST_DIFF', TTL => 'FOREVER',COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
# Pre-split table
# write: nothing is added to the rowkey; read: nothing needs to be stripped
value 1 -> \x80\x80\x00\x01
region     \x80\x80\x00\x01
# A salted table is a special kind of pre-split table
# write: the salt prefix byte is prepended; read: the prefix is stripped off again
\x60\x00\x80\x80\x00
\x60\x80\x00\x00\x01
Essence: a covered (include) index copies the INCLUDE columns into the index table alongside the indexed columns
Pros: queries that only touch indexed and included columns are answered entirely from the index table, with no jump back to the data table
Cons: the index table is larger and every write must maintain the extra copies
create table rzdata.testinclude
(id integer primary key,
name varchar,
age integer,
address varchar,
salary double) split on (1,2,3);
upsert into rzdata.testinclude (id, name, age, address) values(1, 'rz', 17, '北疆');
upsert into rzdata.testinclude (id, name, age, address) values(2, 'j', 17, '上海');
create index testinclude_idx on rzdata.testinclude(name,age) include(address);
select * from rzdata.testinclude_idx;
+---------+--------+------+------------+
| 0:NAME | 0:AGE | :ID | 0:ADDRESS |
+---------+--------+------+------------+
| j | 17 | 2 | 上海 |
| rz | 17 | 1 | 北疆 |
+---------+--------+------+------------+
# full table scan
explain select * from rzdata.testinclude;
explain select address,salary from rzdata.testinclude where age=17; # salary is not in the index table, so the index cannot cover this query
explain select /*+ INDEX(rzdata.testinclude testinclude_idx) */ * from rzdata.testinclude;
# served by the index
explain select address from rzdata.testinclude where age=17;
explain select address, id from rzdata.testinclude where age=17;
explain select /*+ INDEX(rzdata.testinclude testinclude_idx) */ * from rzdata.testinclude where age=17;
explain select /*+ INDEX(rzdata.testinclude testinclude_idx) */ salary from rzdata.testinclude where age=17;
-- functional index: the index is built on an expression rather than a plain column
create index upper_name_idx on emp (upper(first_name||' '||last_name));
select emp_id from emp where upper(first_name||' '||last_name)='JOHN';
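As with the other indexes, an explain should confirm the functional index is picked up (a sketch; the exact plan text may differ):
explain select emp_id from emp where upper(first_name||' '||last_name)='JOHN';
-- expected: a RANGE SCAN over the UPPER_NAME_IDX index table rather than a full scan of EMP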
create table rzdata.testimmu (
id integer primary key,
name varchar,
age integer,
address varchar
) IMMUTABLE_ROWS=TRUE split on (1,2,3);
create index testimmu_idx on rzdata.testimmu(name,age) include(address);
upsert into rzdata.testimmu values(1, 'rz', 17, '北疆');
upsert into rzdata.testimmu values(2, 'j', 17, '上海');
upsert into rzdata.testimmu values(2, 'j2', 177, '上海'); -- re-upserting id=2 breaks the IMMUTABLE_ROWS assumption; stale entries are left behind, as the output below shows
select * from rzdata.testimmu;
+-----+-------+------+----------+
| ID | NAME | AGE | ADDRESS |
+-----+-------+------+----------+
| 2 | j | 17 | 上海 |
| 2 | j2 | 177 | 上海 |
| 1 | rz | 17 | 北疆 |
+-----+-------+------+----------+
select * from rzdata.testimmu_idx;
+---------+--------+------+------------+
| 0:NAME | 0:AGE | :ID | 0:ADDRESS |
+---------+--------+------+------------+
| j | 17 | 2 | 上海 |
| j2 | 177 | 2 | 上海 |
| rz | 17 | 1 | 北疆 |
+---------+--------+------+------------+
Async index use case: building an index on a large existing table; the index is declared ASYNC and then populated offline by a MapReduce job (IndexTool):
create index async_index on my_schema.my_table(v) async;
${HBASE_HOME}/bin/hbase org.apache.phoenix.mapreduce.index.IndexTool \
  --schema my_schema \
  --data-table my_table \
  --index-table async_index \
  --output-path async_index_hfiles
mvn clean package -DskipTests
Increase some of the parameters (2x, 3x, or 10x)
./sqlline.py hadoop000:2181
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-client</artifactId>
<version>4.14.1-HBase-1.4</version>
<scope>system</scope>
<systemPath>/path/to/xx.jar</systemPath>
</dependency>
-- schema(phoenix) <--> namespace(hbase) <--> database(MySQL)
create schema ruozedata;
-- Create the table
create table ruozedata.test
(id bigint not null primary key, name varchar(255), age integer);
-- Load data
upsert into ruozedata.test values(1, 'rz', 18);
upsert into ruozedata.test values(2, 'j', 16);
select * from ruozedata.test;
-- Create an index
create index test_idx on ruozedata.test(name,age);
select * from ruozedata.test_idx;
explain select name from ruozedata.test where age = 18;
-- ROUND ROBIN FULL SCAN OVER RUOZEDATA:TEST_IDX
explain select age from ruozedata.test where name = 'rz';
-- RANGE SCAN OVER RUOZEDATA:TEST_IDX ['rz']
-- In production, prefer query plans that avoid full scans
hbase shell
list_namespace
list_namespace_tables 'RUOZEDATA'
scan 'RUOZEDATA:TEST'
# Fetch the Phoenix client jar
scp phoenix-4.14.0-cdh5.14.2-client.jar
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<target.java.version>1.8</target.java.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<encoding>UTF-8</encoding>
<log4j.version>2.12.1</log4j.version>
<hadoop.version>2.6.3</hadoop.version>
<spark.version>3.1.1</spark.version>
<hbase.version>1.2.0</hbase.version>
<phoenix.version>4.14.0</phoenix.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>4.14.0-HBase-1.2</version>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>4.14.0-HBase-1.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>
log4j.rootLogger=WARN, stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p [%c:%L] - %m%n
import org.apache.hadoop.conf.Configuration
import org.apache.phoenix.spark._
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName(getClass.getSimpleName).master("local[2]").getOrCreate()
val conf = new Configuration()
conf.set("phoenix.schema.isNamespaceMappingEnabled", "true")
// application code goes here
spark.stop()
// read the table as an RDD
spark.sparkContext.phoenixTableAsRDD(
"rzdata.TESTLOCAL",
Seq("ID", "NAME", "AGE", "ADDRESS"),
zkUrl = Some("hadoop000:2181"),
conf = conf
).foreach(println)
// write data to Phoenix from an RDD
spark.sparkContext.parallelize(List((3, "aa", 22, "test"), (4, "aa22", 212, "test")))
.saveToPhoenix(
"rzdata.TESTLOCAL",
Seq("ID", "NAME", "AGE", "ADDRESS"),
zkUrl = Some("hadoop000:2181"),
conf = conf
)
val df = spark.read.format("org.apache.phoenix.spark")
.option("table", "rzdata.TESTLOCAL")
.option("zkUrl", "hadoop000:2181")
.option("phoenix.schema.isNamespaceMappingEnabled", "true")
.load()