** :重点知识点
*** : 重点应用
非关系型数据库
区别 | SQL | NoSQL |
---|---|---|
数据结构 | 结构化 | 非结构化 |
数据关联 | 关联的,多表中的外键约束 | 无关联的 |
SQL查询 | 使用固定的(sql)语句查询,通用 | 相对简单,不统一 |
事务特性 | ACID:原子性,一致性,隔离性,持久性 | BASE(基本可用,允许中间态,最终一致性 |
存储方式 | 磁盘 | 内存 |
垂直 | 水平 | |
使用场景 | 1. 数据结构固定 2.相关业务对数据安全性、一致性要求较高 | 1. 数据结构不固定 2. 对一致性、安全性要求不高 3. 对性能要求 |
特征
Redis是一个key-value的数据库,key一般是String类型,不过value的类型多种多样:
通用指令是部分数据类型的,都可以使用的指令,常见的有:help commands——帮助文档
String类型,也就是字符串类型,是Redis中最简单的存储类型。
其value是字符串,不过根据字符串的格式不同,又可以分为3类:
不管是哪种格式,底层都是字节数组形式存储,只不过是编码方式不同。字符串类型的最大空间不能超过512m
常见命令
Redis的key允许有多个单词形成层级结构,多个单词之间用':'隔开,格式如下: 项目名:业务名:类型:id 这个格式并非固定,也可以根据自己的需求来删除或添加词条。
Hash类型,也叫散列,其value是一个无序字典,类似于引ava中的HashMap结构。
常见命令
Redis中的List类型与ava中的LinkedList类似,可以看做是一个双向链表结构。既可以支持正向检索和也可以支持反向检索。
特征
常见命令
问题
Redis的Set结构与java中的HashSet:类似,可以看做是一个value为null的HashMap。因为也是一个hash表,因此具备与HashSet类似的特征:
常见命令
单集合操作
SADD key member..:向set中添加一个或多个元素
SREM key member.:移除set中的指定元素
SCARD key:返回set中元素的个数
SISMEMBER key member:判断一个元素是否存在于set中
SMEMBERS:获取set中的所有元素
多集合操作
Redist的SortedSet是一个可排序的set集合,与java中的TreeSet有些类似,但底层数据结构却差别很大。SortedSet中的每一个元素都带有一个score属性,可以基于score属性对元素排序,底层的实现是一个跳表(SkipList)加hash表。
特性
因为SortedSet的可排序特性,经常被用来实现 排行榜 这样的功能。
常见命令
ZADD key score member:添加一个或多个元素到sorted set,如果已经存在则更新其score值
ZREM key member:删除sorted set中的一个指定元素
ZSCORE key member:获取sorted set中的指定元素的score值
ZRANK key member:获取sorted set中的指定元素的排名
ZCARD key:获取sorted set中的元素个数
ZCOUNT key min max:统计score值在给定范围内的所有元素的个数
ZINCRBY key increment member:让sorted set中的指定元素自增,步长为指定的increment值
ZRANGE key min max :按照score排序后,获取指定排名范围内的元素(按角标 0 ~ n)
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]:按照score:排序后,获取指定score范围内的元素(按分数查,offset:第一次参数的偏移量,count:查几条)
ZDIFF、ZINTER、ZUNION:求差集、交集、并集
注意:所有的排名默认都是升序,如果要降序则在命令的 Z 后面添加 REV 即可
Jedis.本身是线程不安全的,并且频繁的创建和销毁连接会有性能损耗,因此我们推荐大家使用 jedis连接池 代替 jedis 的直连方式
SpringData:是Spring中数据操作的模块,包含对各种数据库的集成,其中对Redis的集成模块就叫做SpringDataRedis,官网地址:https:/spring.io/projects/spring-data-redis
spring:
redis:
host: 192.168.153.128
port: 6379
password:
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 0
max-wait: 100
RedisTemplate可以接收任意Object作为值写入Redis,只不过写入前会把Object/序列化为字节形式,默认是采用JDK序列化,得到的结果是这样的:
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
// 创建 RedisTemplate 对象
RedisTemplate<String, Object> template = new RedisTemplate<>();
// 设置连接工厂
template.setConnectionFactory(connectionFactory);
// 创建 JSON 序列化工具
GenericJackson2JsonRedisSerializer jackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
// 设置 key 序列化
template.setKeySerializer(RedisSerializer.string());
template.setHashKeySerializer(RedisSerializer.string());
// 设置 value 序列化
template.setValueSerializer(RedisSerializer.json());
template.setValueSerializer(RedisSerializer.json());
//返回
return template;
}
}
为了在 反序列化 时知道对象的类型,JSON序列化器会将类的class类型写入json结果中,存入Redis,会带来额外的内存开销。
为了节省内存空间,我们并不会使用 JSON 序列化器来处理 value ,而是**统一使用String序列化器**,要求只能存储 String 类型的key和value。当需要存储 Java 对象时,手动完成对象的序列化和反序列化。
Spring默认提供了一个StringRedisTemplate:类,它的key和value的序列化方式默认就是String方式。省去了我们自定义RedisTemplate的过程:
@Resource
private StringRedisTemplate stringRedisTemplate;
@Test
void testSaveUser() {
User user = new User("yoshino", 22);
// 手动序列化
Gson gson = new Gson();
String json = gson.toJson(user);
stringRedisTemplate.opsForValue().set("user:100", json);
String jsonUser = stringRedisTemplate.opsForValue().get("user:100");
// 手动反序列化
User user1 = gson.fromJson(jsonUser, User.class);
System.out.println(json);
System.out.println(user1);
}
数据库
后端直接导入,修改 yml 文件
输入:localhost:8081/shop-type/list 测试
前端导入
//需要引入
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.17</version>
</dependency>
session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));
session共享问题:多台Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题。
方案应该满足:
// 7.2将 User 对象转为 Hash 存储
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));
Redis代替session需要考虑的问题:
问题
缓存就是数据交换的缓冲区(称作 Cache),是存贮数据的临时地方,一般读写性能较高
给店铺类型查询业务添加缓存
业务场景
主动更新策略
3:停电数据丢失,数据库的数据非准确的
选择 1 策略
修改ShopController中的业务逻辑,满足下面的需求:
缓存穿透 是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
解决方案
缓存穿透产生的原因是什么?
缓存穿透的解决方案有哪些?
**缓存雪崩 **是指在同一时段大量的缓存 key 同时失效或者 Redis 服务宕机,导致大量请求到达数据库,带来巨大压力。
解决方案
缓存击穿问题也叫热点 Key 问题,就是一个被**高并发访问并且缓存重建业务较复杂**的 key 突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
解决方案
需求:修改根据d查询商铺的业务,基于互斥锁方式来解决缓存击穿问题
需求:修改根据d查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题
活动开始前,就已经存好(已经预热过的,在活动中的热点店铺)
基于StringRedisTemplate:封装一个缓存工具类,满足下列需求:
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:
全局引D生成器,是一种在分布式系统下用来生成全局唯一 ID 的工具,一般要满足下列特性:
为了增加 ID 的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:
ID的组成部分:
全局唯一 ID 生成策略
Redis 自增 ID 策略
每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购:
表关系如下:
下单时需要判断两点:
测试
使用 JMeter 测试
错误流程图
超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是 加锁:
乐观锁的关键是判断之前查询得到的数据是否有被修改过,常见的方式有两种:
stock = 原来的,改为 stock > 1
超卖这样的线程安全问题,解决方案有哪些?
需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单
通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。
Ctrl + D 实现
出现一人多单的问题
什么是分布式锁
分布式锁:满足分布式系统或集群模式下 多进程可见 并且 互斥 的锁。
实现分布式锁时需要实现的两个基本方法:
获取锁
释放锁
需求:定义一个类,实现下面接口,利用Redis.实现分布式锁功能。
public interface ILock {
/**
* 尝试获取锁
* @param timeoutSec 锁持有的超时时间,过期后自动释放
* @return true代表获取锁成功;false代表获取锁失败
*/
boolean tryLock(long timeoutSec);
/**
* 释放锁
*/
void unlock();
}
单台 jvm 适用
误删问题
需求:修改之前的分布式锁实现,满足:
误删问题
Redis提供了Lua脚本功能,在一个脚本中编写多条Redist命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:https:/www.runoob.com/lua/lua-tutorial.html
写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:
**释放锁的业务流程**是这样的:
Lua 脚本表示:
-- 获取锁中的线程标示 get key
local id = redis.call('get', KEYS[1])
-- 比较线程标示与锁中的标示是否一致
if (id == ARGV[1]) then
-- 释放锁
return redis.call('del', KEYS[1])
end
return 0
实现
基于setnx实现的分布式锁存在下面的问题:
Redisson:是一个在Redis的基础上实现的 java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 java 常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
官网地址:https://redisson.org GitHub地址:https://github.com/redisson/redisson
<!--https://github.com/redisson/redisson#quick-start-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.17.5</version>
</dependency>
/**
* @author yoshino
**/
@Configuration
@ConfigurationProperties(prefix = "spring.redis")
@Data
public class RedissonConfig {
private String host;
private String port;
@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
String redisAddress = String.format("redis://%s:%s", host, port);
config.useSingleServer().setAddress(redisAddress);
// 创建 RedissonClient 对象
return Redisson.create(config);
}
}
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1];-- 线程唯一标识
local releaseTime = ARGV[2];-- 锁的自动释放时间
-- 判断惭是否存往
if (redis.call('exists', key) == 0) then
-- 不存在,获取锁
redis.call('hincrby', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return nil; -- 返回结果
end;
-- 锁已经存在,判断threadId是否是自己
if (redis.call('hexists', key, threadId) == 1)
then
-- 存在,获取锁,重入次数 + 1
redis.call('hincrby', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return nil;--返回结果
end ;
return redis.call('pttl', key);--代码走到这里,说明获取锁的不是自己,获取锁失败,返回锁的剩余有效期
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1];-- 线程唯一标识
local releaseTime = ARGV[2];-- 锁的自动释放时间
-- 判断当前锁是否还是被自己持有
if (redis.call('HEXISTS', key, threadId) == 0)
then
return nil;-- 如果已经不是自己,则直接返回
end
--是自己的锁,则重入次数-1
local count = redis.call('HINCRBY', key, threadId, -1);
-- 判断是否重入次数是否已经为0
if (count > 0) then
-- 大于0说明不能释放锁,
-- 重置有效期然后返回
redis.call('EXPIRE', key, releaseTime);
return 0;
else
-- 等于0说明可以释放锁直接删除
redis.call('DEL', key);
rediscall('publish',KEYS[2],threadId); -- 发布消息通知
return 1;
end
return nil;
优化 setnx 实现的分布式锁带来的问题
**trylock 携带 等待时间时 (锁重试): **利用 消息队列 和 信号量 方式
**trylock 不携带 释放时间时(-1) (锁重试):**看门狗机制 每 10s 更新一次 ttl
如何取消(释放锁时)
lock = redissonClient.getMultiLock(lock1, lock2, lock3);
必须获取所有 Node 的锁才算成功 获取,否则算失败(联锁)
不可重入Redis分布式锁:
可重入的Redis:分布式锁:
Redisson的multiLock:
原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
缺陷:运维成本高、实现复杂
上图,为串联的执行顺序,效率低
解决方案
redis 的数据结构
redis 中的操作(lua 脚本)
tomcat 中的操作
需求:
消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
Redis提供了三种不同的方式来实现消息队列:
消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
队列是入口和出口不在一边,因此我们可以利用:LPUSH结合RPOP、或者RPUSH结合LPOP来实现。
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回 null,并不像 JVM 的阻塞队列那样会阻塞并等待消息。
因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。
基于Lst的消息队列有哪些优缺点?
优点:
缺点:
**PubSub(发布订阅)**是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
基于PubSub的消息队列有哪些优缺点?
优点:
缺点:
Stream是Redis5.0引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
发送消息的命令
阻塞时间 :0 表示等待时间无限长
!注意
当我们指定起始 ID 为$时,代表读取最新的消息,如果我们处理一条消息的过程中又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现 漏读消息 的问题。
STREAM:类型消息队列的XREAD命令特点:
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
创建消费者组
从消费者组读取消息
确认消息
读取pending-list中的消息(消费但未确认)
XPENDING s1 g1 - + 10
:读10条,- + 全部
STREAM类型消息队列的XREADGROUP命令特点:
需求:
127.0.0.1:6379> XGROUP CREATE stream.orders g1 0 MKSTREAM
探店笔记类似点评网站的评价,往往是图文结合。对应的表有两个:
点击首页最下方菜单栏中的+按钮,即可发布探店图文:
需求:点击首页的探店笔记,会进入详情页面,实现该页面的查询接口:
在首页的探店笔记排行榜和探店图文详情页面都有点赞的功能:
需求
在探店笔记的详情页面,应该把给该笔记点赞的人显示出来,比如最早点赞的TOP5,形成点赞排行榜:
实现查询点赞排行榜的接口 需求:按照点赞时间先后排序,返回Top5的用户
问题
数据库查询时,是使用 in 来获取,导致点赞顺序的排序问题
解决:WHERE id IN ( 1010 , 1 ) ORDER BY FIELD(id, 1010, 1)
拼接:String idStr = StrUtil.join(",", ids);
查询语句:query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list()
需求:基于该表数据结构,实现两个接口:
关注是User之间的关系,是博主与粉丝的关系,数据库中有一张tb follow表来标示:
需求:利用Rdis中恰当的数据结构,实现共同关注功能。在博主个人页面展示出当前用户与博主的共同好友。
关注推送也叫做Feed流,直译为 投喂 。为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。
Feed流产品有两种常见模式:
Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈
优点:信息全面,不会有缺失。并且实现也相对简单
缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户
本例中的个人页面,是基于关注的好友来做Feed流,因此采用 Timeline 的模式。该模式的实现方案有三种:
需求:
Feed流的分页问题
Feed流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式。
Feed 流的滚动分页
需求:在个人主页的“关注”卡片中,查询并展示推送的blog信息:
GEO就是Geolocation的简写形式,代表地理坐标。Redis在3.2版本中加入了对GEO的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。常见的命令有:
在首页中点击某个频道,即可看到频道下的商户:
按照商户类型做分组,类型相同的商户作为同一组,以 typeld 为 key 存入同一个GEO集合中即可
SpringDataRedisl的2.3.9版本并不支持Redis6.2提供的GEOSEARCH命令,因此我们需要提示其版本,修改自己的POM文件,内容如下:
把每一个bit位对应当月的每一天,形成了映射关系。用0和1标示业务状态,这种思路就称为位图(BitMap)
Redis中是利用string类型数据结构实现BitMap,因此最大上限是512M,转换为bit则是2^32个bit位
需求:实现签到接口,将当前用户当天签到信息保存到Redis中
首先我们搞懂两个概念:
UV:全称 Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次。
PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。
数据丢失问题:Reds是内存存储,服务重启可能会丢失数据
并发能力问题:单节点Redis并发能力虽然不错,但也无法满足如618这样的高并发场景
故障恢复问题:如果Redis:宕机,则服务不可用,需要一种自动的故障恢复手段
存储能力问题:Redis基于内存,单节点能存储的数据量难以满足海量数据需求
方法:
实现 Redis 数据持久化
搭建主从集群,实现读写分离
利用 Redis 哨兵,实现健康检测和自动恢复
搭建分片集群,利用插槽机制实现动态扩容
RDB全称Redis Database Backup file(Redis数据备份文件),也被叫做Redis数据快照。简单来说就是把内存中的所有数据都记录到磁盘中。当Redis实例故障重启后,从磁盘读取快照文件,恢复数据。
快照文件称为RDB文件,默认是保存在当前运行目录。
Redis停机时会执行一次RDB。保存到当前运行的目录下
配置
原理
bgsave开始时会 fork 主进程得到子进程,子进程**共享**主进程的内存数据。完成 fork 后读取内存数据并写入RDB文件。
fork采用的是copy-on-write技术:
总结
RDB方式bgsave的基本流程?
RDB会在什么时候执行?Save 60 1000代表什么含义?
RDB的缺点?
AOF全称为Append Only File(追加文件)。Redis处理的每一个写命令都会记录在AOF文件,可以看做是命令日志文件。
配置
AOF默认是关闭的,需要修改redis.conf配置文件来开启AOF:
AOF的命令记录的频率也可以通过redis.conf文件来配:
因为是记录命令,AOF文件会比RDB文件大的多。而且AOF会记录对同一个key的多次写操作,但只有最后一次写操作才有意义。通过执行 bgrewriteaof 命令,可以让AOF文件执行重写功能,用最少的命令达到相同效果。
Redis也会在触发阈值时自动去重写AOF文件。阈值也可以在redis.conf中配置:
单节点Redis的并发能力是有上限的,要进一步提高Redist的并发能力,就需要搭建主从集群,实现读写分离。
视频 P102
假设有A、B两个Redis:实例,如何让B作为A的slave节点?
master如何判断slave是不是第一次来同步数据?这里会用到两个很重要的概念:
因此 slave 做数据同步,必须向 master 声明自己的replication id和offset,master才可以判断到底需要同步哪些数据
slave 第一次同步前,自身就是 master,replid 记录为本身
简述全量同步的流程?
简述全量同步和增量同步区别?
什么时候执行全量同步?
什么时候执行增量同步?
slave节点宕机恢复后可以找master节点同步数据,那master节点宕机怎么办?
Redis:提供了哨兵(Sentinel)机制来实现主从集群的自动故障恢复。哨兵的结构和作用如下:
服务状态监控
Sentinel的三个作用是什么?
Sentinel如何判断一个redis实例是否健康?
故障转移步骤有哪些?
视频 p106
在Sentinel集群监管下的Redis主从集群,其节点会因为自动故障转移而发生变化,Redis的客户端必须感知这种变化,及时更新连接信息。Spring的RedisTemplate底层利用lettuce实现了节点的感知和自动切换。
Sentinel是特殊的Redis节点,也能发布订阅;
Sentinel没有主从之分;
Sentinel订阅所有Redis节点的_sentinel_:hello频道,并在上线时给所有Redis节点的_sentinel_:hello频道发送消息,包括自己的host、进程ID(runid)、以及Master配置,让其他Sentinel感知自己,更新存储的Sentinel列表(如果是新的host、新的进程号,则进行添加;如果已经有host相同,但进程ID不同的Sentinel,代表原有的Sentinel重启了,会进行替换),如果Master配置不同,代表其他的Sentinel的Master配置的版本低,也会进行替换。
Sentinel上线后会每隔1秒给其他Sentinel发送消息。
主从和哨兵可以解决高可用、高并发读的问题。但是依然有两个问题没有解决:
使用分片集群可以解决上述问题,分片集群特征:
p108
Redis会把每一个 master 节点映射到 0~16383 共16384个插槽(hash slot)上,查看集群信息时就能看到:
数据key不是与节点绑定,而是与插槽绑定。redis会根据key的有效部分计算插槽值,分两种情况:
例如:key是num,那么就根据num计算,如果是{itcast}num,则根据itcasti计算。计算方式是利用CRC16算法得到一个hash值,然后对16384取余,得到的结果就是slot值。
Redis如何判断某个key应该在哪个实例?
如何将同一类数据固定的保存在同一个Redis实例?
向集群中添加一个新的masteri节点,并向其中存储num=10
需求:
在7002这个slave节点执行**手动故障转移**,重新夺回master:地位 步骤如下:
传统的缓存策略一般是请求到达 Tomcat 后,先查询 Redis ,如果未命中则查询数据库,存在下面的问题:
多级缓存就是充分利用请求处理的每个环节,分别添加缓存,减轻Tomcat压力,提升服务性能:
用作缓存的Nginx是业务Nginx,需要部署为集群,再有专门的Nginx用来做反向代理:
安装 mysql ,导入工程
本地进程缓存
缓存在日常开发中启动至关重要的作用,由于是存储在内存中,数据的读取速度是非常快的,能大量减少对数据库的访问,减少数据库的压力。我们把缓存分为两类:
**Caffeine**是一个基于刊 java8 开发的,提供了近乎最佳命中率的高性能的本地缓存库。目前Spring内部的缓存使用的就是Caffeine。GitHub地址:https://github.com/ben-manes/caffeine
<!-- https://mvnrepository.com/artifact/com.github.ben-manes.caffeine/caffeine -->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
</dependency>
For Java 11 or above, use
3.x
otherwise use2.x
.
Caffeine 的驱逐策略
实现商品的查询的本地进程缓存
利用Caffeine实现下列需求:
配置 Caffeine
通过注入,使用 get 方法即可
Lua是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放,其设计目的是为了嵌入应用程序中,从而为应用程序提供灵话的扩展和定制功能。官网:https://www.lua.org/
function printArr(arr)
for index,value in ipairs(arr)do
print(value)
end
end
OpenResty是一个基于Nginx的高性能Web平台,用于方便地搭建能够处理超高并发、扩展性极高的动态Web应用、Web服务和动态网关。具备下列特点:
官方网站:https://openresty.org/cn/
实现商品详情页数据查询
步骤一:修改nginx.conf文件
步骤二:编写item.lua文件
获取请求路径中的商品id信息,拼接到json结果中返回
--获取路径参数
local id = ngx.var[1]
--返回结果
ngx.say ('{"id":'.. id ..'}')
获取请求路径中的商品id信息,根据id向Tomcat查询商品信息
这里要修改item.lua,满足下面的需求:
nginx内部发送Http请求
nginx提供了内部API用以发送http请求:
返回的响应内容包括:
注意:这里的path是路径,并不包含lP和端口。这个请求会被nginx内部的server监听并处理。
封装http查询的函数
我们可以把http查询的请求封装为一个函数,放到OpenResty函数库中,方便后期使用。
JSON结果处理
OpenResty 提供了一个 cjson 的模块用来处理 JSON 的序列化和反序列化。
官方地址:https://github.com/openresty/lua-cjson/
使用Http函数查询数据
item.lua
--导入common函数库
local common = require('common')
local read_http = common.read_http
--导入 cjson 库
local cjson = require('cjson')
--获取路径参数
local id = ngx.var[1]
--查询商品信息
local itemJSON = read_http("/item/" .. id, nil)
--查询库存信息
local stockJSON = read_http("/item/stock/" .. id, nil)
-- JSON 转为 lua 的 table
local item = cjosn.decode(itemJSON)
local stock = cjson.decoode(stockJSON)
--组合数据
item.stock = stock.stock
item.sold = stock.sold
-- 把 item 序列化为 JSON 返回结果
ngx.say(cjson.encode(item))
Tomcat:集群的负载均衡
hash :表示会根据请求路径url 进行hash运算,使得同一个访问地址指向同一个 tomcat 服务器
冷启动与缓存预热
冷启动:服务刚刚启动时,Redis中并没有缓存,如果所有商品数据都在第一次查询时添加缓存,可能会给数据库带来较大压力。
缓存预热:在实际开发中,我们可以利用大数据统计用户访问的热点数据,在项目启动时将这些热点数据提前查询并保存到Redis中。
我们数据量较少,可以在启动时将所有数据都放入缓存中。
private static final ObjectMapper MAPPER = new objectMapper();
@Override
public void afterPropertiesSet()throws Exception {
//初始化缓存
//1.查询商品信息
List<Item> itemList = itemService.list();
//2.放入缓存
for (Item item : itemList) {
//2.1.item 序列化为切 JSON
String json = MAPPER.writeValueAsString(item);
//2.2.存入redis
redisTemplate.opsForValue().set("item:id:" + item.getId(),json);
}
}
注意 :需要实现 InitializingBean 接口,重写 afterPropertiesSet() 方法,会在 RedisHandler 创建后,成员变量完成初始化后 执行
OpenResty的 Redis 模块
common.lua
--导入redis
local redis = require('resty.redis')
--初始化redis
Local red = redis:new()
red:set_timeouts(1000, 1000, 1000)
--关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
local pool_max_idLe_time = l0000 --连接的空闲时间,单位是毫秒
local poo1_size = 100 --连接池大小
Local ok,err = red:set_keepalive(pool_max_idle_time, pool_size)
if not ok then
ngx.log(ngx.ERR,"放入redis连接池失败:",err)
end
end
--查询redis的方法ip和port是redis,地址,key是查询的key
local function read_redis(ip,port,key)
--获取一个连接
local ok,err = red:connect(ip,port)
if not ok then
ngx.log(ngx.ERR,"连接redis:失败:",err)
return nil
end
--查询redis
Local resp,err = red:get(key)
--查询失败处理
if not resp then
ngx.log(ngx.ERR,"查询Redis失败:",err,",key=",key)
end
--得到的数据为空处理
if resp == ngx.null then
resp = nil
ngx.log(ngx.ERR,"查询Redis数据为空,key=",key)
end
close_redis(red)
return resp
end
--封装函数,发送http请求,并解析响应
Local function = read_http(path,params)
local resp = ngx.location.capture(path,{
method = ngx.HTTP_GET,
args = params,
})
if not resp then
--记录错误信息,返回404
ngx.log(ngx.ERR,"http查询失败,path:",path,",args:",args)
ngx.exit(404)
end
return resp.body
end
--一将方法导出
local _M = {
read_http = read_http,
read_redis = read_redis
}
return _M
查询商品时,优先Redis缓存查询
需求:
修改item.lua,封装一个函数read data,实现先查询Redis,如果未命中,再查询tomcat
修改item.lua,查询商品和库存时都调用read data这个函数
OpenResty为 Nginx 提供了 shard dict 的功能,可以在 nginx 的多个worker之间共享数据,实现缓存功能。
在查询商品时,优先查询OpenResty的本地缓存
需求:
缓存数据同步的常见方式有三种:
初识Canal
Canal,译意为水道/管道/沟渠,canal是阿里巴巴旗下的一款开源项目,基于刊 java 开发。基于数据库增量日志解析,提供增量数据订阅&消费。GitHub的地址:https://github.com/alibaba/canal
Canal就是把自己 伪装成MySQL的一个slave节点 ,从而监听masterl的binary log变化。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。
安装和配置Canal P131
Canal客户端
Canal 提供了各种语言的客户端,当Canall监听到 bin_log 变化时,会通知 Canal 的客户端。
不过这里我们会使用GitHub上的第三方开源的canal-starter。地址:https://github.com/NormanGyllenhaal/canal-client
编写监听器,监听 Canal 消息:
Canal推送给canal-client的是被修改的这一行数据(row),而我们引入的canal-client则会帮我们把行数据封装到 Item 实体类中。这个过程中需要知道数据库与实体的映射关系,要用到到PA的几个注解:
Redis的Key虽然可以自定义,但最好遵循下面的几个最佳实践约定:
BigKey通常以**Key的大小和Key中成员的数量**来综合判定,例如:
查询一个 key 的准确大小
看元素长度
Big Key的危害
对BigKey执行读请求时,少量的QPS就可能导致带宽使用率被占满,导致Redis实例,乃至所在物理机变慢
BigKey所在的Redis实例内存使用率远超其他实例,无法使数据分片的内存资源达到均衡
对元素较多的hash、list、Zset等做运算会耗时较旧,使主线程被阻塞
对BigKey的数据序列化和反序列化会导致CPU的使用率飙升,影响Redis实例和本机其它应用
如何发现BigKey
利用redis-cli提供的-bigkeys参数,可以遍历分析所有key,并返回Key的整体统计信息与每个数据的Top1的big key
自己编程,利用scan扫描Redist中的所有key,利用strlen、hlen等命令判断key的长度(此处不建议使用MEMORY USAGE)
利用第三方工具,如 Redis-Rdb-Tools 分析RDB快照文件,全面分析内存使用情况
自定义工具,监控进出Redis的网络数据,超出预警值时主动告警
如何删除 BigKey
Key的最佳实践:
Value的最佳实践:
结论:批量执行的效率大大提高
MSET
Redis提供了很多MXX这样的命令,可以实现批量插入数据,例如:
不要在一次批处理中传输太多命令,否则单次命令占用宽带过多,会导致网络阻塞
不同
Pipeline没有限制,可以任何数据结构做组合
mset 是 redis 原生操作,具有原子性,会一次执行完毕。
总结
批量处理的方案:
注意事项:
如 MSET 或 Pipeline 这样的批处理需要在一次请求中携带多条命令,而此时如果 Redis 是一个集群,那批处理命令的**多个key必须落在一个插槽**中,否则就会导致执行失败。
Spring 已经提供 并行slot 的方式实现批处理,在集群模式下
StringRedisTemplate中,multiSet 方法,已实现并行slot方法
首先会按照 传递数组中的 key值(entrySet),来计算 插槽 得到 map<Integer,List> partitioned,然后遍历 map,将插槽一样的数据放入一个 map 集合中并开启异步任务,依次遍历完 partitioned 中的所有插槽。
Rdis的持久化虽然可以保证数据安全,但也会带来很多额外的开销,因此持久化请遵循下列建议:
慢查询:在Redis执行时耗时超过某个阈值的命令,称为慢查询。
Redis会绑定在0.0.0.0:6379,这样将会将Redis服务暴露到公网上,而Redis如果没有做身份认证,会出现严重的安全漏洞.漏洞重现方式:https://cloud.tencent.com/developer/article/1039000
漏洞出现的核心的原因有以下几点:
为了避免这样的漏洞,这里给出一些建议:
Redis一定要设置密码
禁止线上使用下面命令:keys、flushall、flushdb、config set等命令。可以利用rename-command禁用。
bind:限制网卡,禁止外网网卡访问
开启防火墙
不要使用Root账户启动Redis
尽量不是有默认的端口
当Redis内存不足时,可能导致Key频繁被删除、响应时间变长、QPS不稳定等问题。当内存使用率达到90%以上时就需要我们警惕,并快速定位到内存占用的原因。
数据内存的问题
Redis:提供了一些命令,可以查看到Redis目前的内存分配状态:
内存缓存区配置
内存缓存区常见的有三种:
集群虽然具备高可用特性,能实现自动故障恢复,但是如果使用不当,也会存在一些问题:
集群完整性问题
在Rdis的默认配置中,如果发现任意一个插槽不可用,则整个集群都会停止对外服务:
为了保证高可用特性,这里建议将cluster-require-full-coverage配置为 no
集群带宽问题
集群节点之间会不断的互相Ping来确定集群中其它节点的状态。每次Ping携带的信息至少包括:
集群中节点越多,集群状态信息数据量也越大,10个节点的相关信息可能达到1kb,此时每次集群互通需要的带宽会非常高。
解决途径:
数据倾斜问题
客户端性能问题
命令的集群兼容性问题
mset 的 key 问题,插槽是否在一个结点上
lua和事务问题
mset 的 key 问题,插槽是否在一个结点上
我们都知道Redis中保存的Key是字符串,value往往是字符串或者字符串的集合。可见字符串是Redis中最常用的一种数据结构。
Redis构建了一种新的字符串结构,称为简单动态字符串(Simple Dynamic String),简称 SDS。
IntSet是Redist中set集合的一种实现方式,基于整数数组来实现,并且具备 长度可变、有序 等特征。
IntSet 升级
总结
Intset可以看做是特殊的整数数组,具备一些特点:
我们知道Redis是一个键值型(Key-Value Pair)的数据库,我们可以根据键实现快速的增删改查。而键与值的映射关系正是通过Dict来实现的。
Dict由三部分组成,分别是:哈希表(DictHashTable)、哈希节点(DictEntry)、字典(Dict)
size 必须为 2 的 n 次幂,用于得到掩码 做 & 运算(和 % 的效果一致)。
当我们向Dict添加键值对时,Redis首先根据 key 计算出hash值(h),然后利用 h & sizemask 来计算元素应该存储到数组中的哪个索引位置。
Dict 的扩容
Dict 的收缩
Dict 的 rehash
由于为需要主线程操作,若是一次性复制完,十分影响性能,故采用渐进式rehsh
每次增伤改查,只迁移旧数组中的一列
总结
Dict的结构:
Dict的伸缩;
zipList 是一种特殊的 “双端链表”,由一系列特殊编码的 **连续内存块组成 **。可以在任意一端进行压入/弹出操作,并且该操作的时间复杂度为O(1)。
ZipList 的连锁更新问题
ZipList的每个Entry都包含previous_entry_length来记录上一个节点的大小,长度是1个或5个字节:
**注意:**并未解决,因为发生的概率十分低,必须连续的字节长度为 250~253 的 entry
总结
ZipList特性:
总结
QuickList的特点:
SkipList**(跳表)**首先是链表,但与传统链表相比有几点差异:
总结
skipList的特点:
Redis中的任意数据类型的键和值都会被封装为一个RedisObject,也叫做Redis对象,源码如下:
encoding
String是Redis中最常见的数据存储类型:
其基本编码方式是 RAW,基于 *简单动态字符串(SDS)*实现,存储上限为512mb。
如果存储的 SDS 长度小于44字节,则会采用 EMBSTR 编码,此时object head与SDS是一段连续空间。申请内存时只需要调用一次内存分配函数,效率更高。
原因:Redis 内部内存分配,每次分配的空间为 2 的 n 次幂
如果存储的字符串是整数值,并且大小在 LONG_MAX 范围内,则会采用 INT 编码:直接将数据保存在 RedisObject 的 ptr 指针位置(刚好8字节),不再需要SDS了。
Redis的List类型可以从首、尾操作列表中的元素
LinkedList:普通链表,可以从双端访问,内存占用较高,内存碎片较多.
ZipList :压缩列表,可以从双端访问,内存占用低,存储上限低
QuickList : LinkedList + ZipList,可以从双端访问,内存占用较低,包含多个ZipList,存储上限高
在3.2版本之前,Redis采用ZipList和LinkedList来实现List,当元素数量小于512并且元素大小小于64字节时采用zipList编码,超过则采用LinkedList编码。
在3.2版本之后,Redis统一采用QuickList来实现List
Set是Redis中的单列集合,满足下列特点:
可以看出,Set对查询元素的效率要求非常高,思考一下,什么样的数据结构可以满足?
Set是Redis中的集合,不一定确保元素有序,可以满足元素唯一、查询效率要求极高
ZSet也就是SortedSet,其中每一个元素都需要指定一个score值和member值:
因此,zset底层数据结构必须满足 键值存储、键必须唯一、可排序 这几个需求。之前学习的哪种编码结构可以满足?
当元素数量不多时,HT 和 SkipList 的优势不明显,而且更耗内存。因此zset还会采用 ZipList 结构来节省内存,不过需要同时满足两个条件:
ziplist本身没有排序功能,而且没有键值对的概念,因此需要有zset通过编码实现:
ZipList 是连续内存,因此 score 和 element 是紧挨在一起的两个 entry,element 在前 score 在后
score越小越接近队首,score越大越接近队尾,按照score值升序排列
Hash结构与Redis中的Zset非常类似:
区别如下:
zset的键是member,值是score; hash的键和值都是任意值
zset要根据score排序; hash则无需排序
因此,Hash底层采用的编码与Zset也基本一致,只需要把排序有关的SkipList去掉即可:
Hash结构默认采用ZipList编码,用以节省内存。ZipList中相邻的两个entry分别保存field和value
当数据量较大时,Hash结构会转为HT编码,也就是Dict,触发条件有两个:
ZipList中的元素数量超过了hash-max-ziplist-entries(默认512)
在 插入元素后 进行判断,若超过,则执行转换成 HT
ZipList中的任意entry大小超过了hash-max-ziplist-value (默认64字节)
在 插入元素前 就先进行一次集体插入元素的判断,若中间出现超过的情况,则执行转换;全部判断完,在执行插入指令
用户态 和 内核态 切换
无论是阻塞IO还是非阻塞lO,用户应用在一阶段都需要调用 recvfrom 来获取数据,差别在于无数据时的处理方案:
比如服务端处理客户端Socket请求时,在单线程情况下,只能依次处理每一个socket,如果正在处理的socket恰好未就绪(数据不可读或不可写),线程就会被阻塞,所有其它客户端socket都必须等待,性能自然会很差。
那么问题来了:用户进程 如何知道内核中数据是否就绪 呢?
文件描述符(File Descriptor):简称FD,是一个从0开始递增的无符号整数,用来关联Linux中的一个文件。在Linux中,一切皆文件,例如常规文件、视频、硬件设备等,当然也包括网络套接字(Socket)。
IO多路复用︰是利用单个线程来同时监听多个FD,并在某个FD可读、可写时得到通知,从而避免无效的等待,充分利用CPU资源
不过监听FD的方式、通知的方式又有多种实现,常见的有:
差异:
用户态 和 核心态 内存空间不共享
区别
select:每次调用,都得把 监听的fd数组,拷贝到内核;结果也需要全部拷贝到用户空间。
epoll:
select模式存在的三个问题:
poll模式的问题:
epoll模式中如何解决这些问题的?
当FD有数据可读时,我们调用epoll_wait就可以得到通知。但是事件通知的模式有两种:
LevelTriggered:简称LT。当FD有数据可读时,会重复通知多次,直至数据处理完成。是Epoll的默认模式。
EdgeTriggered:简称ET。当FD有数据可读时,只会被通知一次,不管数据是否处理完成
结论:
Redis到底是单线程还是多线程?
在Redis版本迭代过程中,在两个重要的时间节点上引入了多线程的支持:
来看下Redis 单线程网络模型 的整个流程:
服务器初始化:
Redis是一个CS架构的软件,通信一般分两步(不包括pipeline和PubSub) :
因此客户端发送命令的格式、服务端响应结果的格式必须有一个规范,这个规范就是通信协议。
而在Redis中采用的是RESP ( Redis Serialization Protocol)协议:
但目前,默认使用的依然是RESP2协议,也是我们要学习的协议版本(以下简称RESP)。
public class Main {
static Socket s;
static PrintWriter writer;
static BufferedReader reader;
public static void main ( String[] args) {
try {
//1.建立连接
String host = "192.168.150.101";
int port = 6379;
s = new Socket(host,port);
//2.获取输出流、输入流
writer = new Printwriter(new OutputStreamwriter( s.getOutputStream(),StandardCharsets.UTF_8));
reader = new BufferedReader(new InputStreamReader(s.getInputStream(),StandardCharsets.UTF_8));
//3.发出请求
//3.1.获取授权
sendRequest("auth", "123456");
Object obj = handleResponse();
System.out.println( "obj = " + obj);
//3.2.set name 虎哥
sendRequest("set", "name", "yoshino");
//4.解析响应
obj = handleResponse();
System.out.println( "obj = " + obj);
sendRequest("get", "name");
Object obj = handleResponse();
System.out.println( "obj = " + obj);
} catch (IOException e) {
e.printStackTrace();
} finally {
//5.释放连接
try {
if ( reader != null) reader.close();
if (writer != null) writer.close();
if (s != null) s.close();
}catch (IOException e) {
e.printStackTrace();
}
}
}
private static Object handleResponse() throws IOException {
//读取首字节
int prefix = reader.read();
//判断数据类型标示
switch (prefix) {
case '+'://单行字符串,直接读一行
return reader.readLine();
case '-'://异常,也读一行
throw new RuntimeException(reader.readLine());
case ':'://数字
return Long.parseLong( reader.readLine());
case '$'://多行字符串
//先读长度
int len = Integer.parseInt(reader.readLine());
if ( len == -1){
return null;
}
if (len == 0){
return "";
}
//再读数据,读len个字节。我们假设没有特殊字符,所以读一行(简化)
return reader.readLine();
case '*':
return readBulkString();
default:
throw new RuntimeException("错误的数据格式! ");
}
return null;
}
private static Object readBulkString() throws IOException {
//获取数组大小
int len = Integer.parseInt(reader.readLine());
if (len <= 0) {
return null;
}
//遍历,依次读取每个元素
List<0bject> list = new ArrayList<> (len);//遍历,依次读取每个元素
for (int i = 0; i < len; i++){
list.add (handleResponse());
}
return list;
}
private static void sendRequest(String ... args) {
writer.println("*" + args.length);
for(String arg : args) {
writer.println("$" + arg.getBytes (StandardCharsets.UTF_8).length);
writer.println(arg);
}
writer.flush();
}
}
Redis内存回收
Redis之所以性能强,最主要的原因就是基于内存存储。然而单节点的Redis其内存大小不宜过大,会影响持久化或主从同步性能。
我们可以通过修改配置文件来设置Redis的最大内存:
# 格式:
# maxmemory <bytes>
#例如∶
maxmemory 1gb
当内存使用达到上限时,就无法存储更多数据了。
这里有两个问题需要我们思考:
Redis是如何知道一个key是否过期呢?
利用两个Dict分别记录key-value对及key-ttl对
是不是TTL到期就立即删除了呢?
DB结构
总结
RedisKey的TTL记录方式:
过期 key 的删除策略:
定期清理的两种模式:
内存淘汰︰就是当Redis内存使用达到设置的阈值时,Redis 主动挑选部分key 删除以释放更多内存的流程。
performEvictions() 方法 流程图
事务实现需要依赖其代理对象(由spring管理创建)中的方法
该图调用是使用 this. 获取的,事务失效
还需要额外添加:
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
@PostMapping("blog")
public Result uploadImage(@RequestParam("file") MultipartFile image) {
try {
// 获取原始文件名称
String originalFilename = image.getOriginalFilename();
// 生成新文件名
String fileName = createNewFileName(originalFilename);
// 保存文件
image.transferTo(new File(SystemConstants.IMAGE_UPLOAD_DIR, fileName));
// 返回结果
log.debug("文件上传成功,{}", fileName);
return Result.ok(fileName);
} catch (IOException e) {
throw new RuntimeException("文件上传失败", e);
}
}
@GetMapping("/blog/delete")
public Result deleteBlogImg(@RequestParam("name") String filename) {
File file = new File(SystemConstants.IMAGE_UPLOAD_DIR, filename);
if (file.isDirectory()) {
return Result.fail("错误的文件名称");
}
FileUtil.del(file);
return Result.ok();
}
private String createNewFileName(String originalFilename) {
// 获取后缀
String suffix = StrUtil.subAfter(originalFilename, ".", true);
// 生成目录
String name = UUID.randomUUID().toString();
int hash = name.hashCode();
int d1 = hash & 0xF;
int d2 = (hash >> 4) & 0xF;
// 判断目录是否存在
File dir = new File(SystemConstants.IMAGE_UPLOAD_DIR, StrUtil.format("/blogs/{}/{}", d1, d2));
if (!dir.exists()) {
dir.mkdirs();
}
// 生成文件名
return StrUtil.format("/blogs/{}/{}/{}.{}", d1, d2, name, suffix);
}
public class SystemConstants {
public static final String IMAGE_UPLOAD_DIR = " \\nginx-1.18.0\\html\\hmdp\\imgs\\"; // nginx 前端服务器存图片的位置
public static final String USER_NICK_NAME_PREFIX = "user_";
public static final int DEFAULT_PAGE_SIZE = 5;
public static final int MAX_PAGE_SIZE = 10;
}
fork 是linux指令,创建子进程,只复制页表数据给子进程
主进程 - 页表虚拟内存 - 真实内存
fork采用的是copy-on-write技术:
当主进程执行读操作时,访问共享内存;
当主进程执行写操作时,则会拷贝一份数据,执行写操作。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。