Redis网络模型
主从复制原理
-
建立连接
- 从节点在配置了 replicaof 配置了主节点的ip和port
- 从库执行replicaof 并发送psync命令
-
同步数据到从库
- 主库bgsave生成RDB文件,并发送给从库,同时为每一个slave开辟一块 replication buffer 缓冲区记录从生成rdb文件开始收到的所有写命令。
- 从库清空数据并加载rdb
-
发送新写的命令给从库
- 从节点加载 RDB 完成后,主节点将 replication buffer 缓冲区的数据(增量写命令)发送到从节点,slave 接收并执行,从节点同步至主节点相同的状态。
-
基于长连接传播
- 方便后续命令传输,除了写命令,还维持着心跳机制
网络模型
-
阻塞IO
-
非阻塞IO
- 没啥用,还是需要等待数据准备就绪
-
IO多路复用
-
信号驱动IO
-
异步IO
IO多路复用
文件描述符:简称FD
select
流程:
-
创建fd_set rfds
- 假设要监听的fd为1,2,5
-
执行select(5+1,null,null,3)
- 第一个参数为最大长度+1,后面是等待时间
-
内核遍历fd
-
没有数据,休眠
- 等待数据就绪或者超时
-
-
遍历fd_set 找到就绪的数据
存在的问题
- 需要将整个fd_set从用户空间拷贝到内核空间,select结束后还要拷贝回用户空间
- 需要遍历一次
- 最大为监听1024
poll
将数据改为了链表,还是需要遍历
epoll
- epoll_create 创建epoll实例
- epoll_ctl 添加需要监听的fd,关联callback
- epoll_wait 等待fd就绪
epoll_wait有两种通知模式
- levelTriggered 简称LT 当FD有数据可读的时候,会重复通知多次,直到数据处理完成。是epoll的默认模式
- EdgeTriggered 简称ET 。当FD有数据可读的时候只会被通知一次,不管数据是否处理完成
ET模式避免了LT的惊群效应。
ET模式最好结合非阻塞IO。
淘汰策略
分类:全体,ttl
LRU
抽样LRU 不是严格的
LFU
RedisObject中使用逻辑访问次数
访问频率越高,增加的越小,还会衰减(16位记录时间,8位记录逻辑访问次数)
总结
附带的tcp连接redis客户端 使用go编写的
package main
import (
"bufio"
"errors"
"fmt"
"io"
"net"
"strconv"
)
// RedisClient 封装用于连接
type RedisClient struct {
conn net.Conn
//包装一层 方便读写
writer *bufio.Writer
reader *bufio.Reader
}
func main() {
//连接redis
conn, err := net.Dial("tcp", "127.0.0.1:6379")
if err != nil {
fmt.Println("连接redis失败", err)
}
client := RedisClient{conn: conn, writer: bufio.NewWriter(conn), reader: bufio.NewReader(conn)}
defer client.conn.Close()
//发送命令
client.sendRequest([]string{"set", "name", "方块"})
//zrange boards:2024-4 0 -1
//client.sendRequest([]string{"zrange", "boards:2024-4", "0", "-1"})
//LRANGE tlist 0 -1
client.sendRequest([]string{"LRANGE", "tlist", "0", "-1"})
}
func (client *RedisClient) sendRequest(args []string) interface{} {
length := len(args)
firstCommand := fmt.Sprintf("%s%d", "*", length)
client.writeCommand(firstCommand)
for _, s := range args {
n := len(s)
client.writeCommand("$" + strconv.Itoa(n))
client.writeCommand(s)
println(n, s)
}
response := client.handleResponse()
//fmt.Printf("%v", response)
return response
}
// 写命令
func (client *RedisClient) writeCommand(s string) {
client.conn.Write([]byte(s + "\r\n"))
}
// 解析返回结果
func (client *RedisClient) handleResponse() interface{} {
//先读取第一个字符
r, _, _ := client.reader.ReadRune()
flag := string(r)
fmt.Println("第一个操作数:" + flag)
switch flag {
case "+":
//一行字符串
return client.ReadLine()
case "-":
//异常
return client.ReadLine()
case ":":
//数字
line := client.ReadLine()
res, _ := strconv.Atoi(line)
return res
case "$":
// 多行字符串
//readRune, _, _ := client.reader.ReadRune()
//去掉换行
readRune := client.ReadLine()
length := string(readRune)
if length == "-1" {
return nil
} else if length == "0" {
return ""
}
lll, _ := strconv.Atoi(length)
//+2是跳过\r\n
bytes := make([]byte, lll+2)
n, _ := client.reader.Read(bytes)
return string(bytes[:n])
case "*":
//多行字符串 递归获取
return client.readBulkString()
default:
return errors.New("错误")
}
}
// 读一行
func (client *RedisClient) ReadLine() string {
bytes, _, _ := client.reader.ReadLine()
return string(bytes)
}
// 读到末尾 估计有点问题
func (client *RedisClient) ReadToEnd() string {
var size = 1024
bytes := make([]byte, size)
var temp = ""
for {
n, err := client.reader.Read(bytes)
temp += string(bytes[:n])
//n, err := client.conn.Read(bytes)
if err == io.EOF || n == 0 || n < size {
break
}
}
return temp
}
func (client *RedisClient) readBulkString() interface{} {
counts, _ := strconv.Atoi(client.ReadLine())
if counts <= 0 {
return nil
}
//var resList = list.List{}
var lists []interface{}
for i := 0; i < counts; i++ {
res := client.handleResponse()
lists = append(lists, res)
//fmt.Println("多行结果:" + fmt.Sprintf("%v", res))
}
return lists
}