Redis 高级数据类型详解
除了基础数据类型,Redis 还提供了 Bitmap、HyperLogLog、GEO、Stream 等高级数据结构。本文将深入这些高级类型的原理和实战应用。
一、Bitmap 位图
1.1 什么是 Bitmap
Bitmap 是位数组,通过 bit 位操作实现高效存储:
- 每个 bit 位只有 0 或 1
- 极度节省空间
- 适合布尔值存储
# 设置位
SETBIT users:online 0 1 # 用户 0 在线
SETBIT users:online 1 0 # 用户 1 离线
SETBIT users:online 2 1 # 用户 2 在线
# 获取位
GETBIT users:online 0 # 1
# 统计在线人数
BITCOUNT users:online # 2
1.2 Java 实现签到系统
import redis.clients.jedis.Jedis;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
public class CheckInSystem {
private Jedis jedis;
public CheckInSystem(Jedis jedis) {
this.jedis = jedis;
}
/**
* 用户签到
*/
public void checkIn(String userId, LocalDate date) {
String key = buildKey(userId, date);
int dayOfMonth = date.getDayOfMonth();
jedis.setbit(key, dayOfMonth - 1, true);
}
/**
* 检查是否签到
*/
public boolean isCheckedIn(String userId, LocalDate date) {
String key = buildKey(userId, date);
int dayOfMonth = date.getDayOfMonth();
return jedis.getbit(key, dayOfMonth - 1);
}
/**
* 统计签到天数
*/
public long getCheckInDays(String userId, LocalDate date) {
String key = buildKey(userId, date);
return jedis.bitcount(key);
}
/**
* 获取连续签到天数
*/
public int getContinuousCheckInDays(String userId, LocalDate endDate) {
String key = buildKey(userId, endDate);
int continuous = 0;
for (int i = endDate.getDayOfMonth() - 1; i >= 0; i--) {
if (jedis.getbit(key, i)) {
continuous++;
} else {
break;
}
}
return continuous;
}
private String buildKey(String userId, LocalDate date) {
String dateStr = date.format(DateTimeFormatter.ofPattern("yyyyMM"));
return "checkin:" + userId + ":" + dateStr;
}
public static void main(String[] args) {
Jedis jedis = new Jedis("localhost", 6379);
CheckInSystem checkIn = new CheckInSystem(jedis);
LocalDate today = LocalDate.now();
// 签到
checkIn.checkIn("user:1001", today);
// 检查是否签到
boolean checked = checkIn.isCheckedIn("user:1001", today);
System.out.println("Checked in: " + checked);
// 统计签到天数
long days = checkIn.getCheckInDays("user:1001", today);
System.out.println("Total days: " + days);
// 连续签到
int continuous = checkIn.getContinuousCheckInDays("user:1001", today);
System.out.println("Continuous days: " + continuous);
}
}
1.3 Golang 实现用户活跃统计
package main
import (
"fmt"
"github.com/go-redis/redis/v8"
"context"
)
type ActiveUserStats struct {
rdb *redis.Client
}
func NewActiveUserStats(rdb *redis.Client) *ActiveUserStats {
return &ActiveUserStats{rdb: rdb}
}
// 记录用户活跃
func (aus *ActiveUserStats) RecordActive(date string, userID int64) {
ctx := context.Background()
key := fmt.Sprintf("active:%s", date)
aus.rdb.SetBit(ctx, key, userID, 1)
}
// 统计活跃用户数
func (aus *ActiveUserStats) CountActive(date string) int64 {
ctx := context.Background()
key := fmt.Sprintf("active:%s", date)
count, _ := aus.rdb.BitCount(ctx, key, nil).Result()
return count.Val()
}
// 检查用户是否活跃
func (aus *ActiveUserStats) IsActive(date string, userID int64) bool {
ctx := context.Background()
key := fmt.Sprintf("active:%s", date)
active, _ := aus.rdb.GetBit(ctx, key, userID).Result()
return active == 1
}
// 统计两天都活跃的用户(位与运算)
func (aus *ActiveUserStats) CountBothActive(date1, date2 string) int64 {
ctx := context.Background()
key1 := fmt.Sprintf("active:%s", date1)
key2 := fmt.Sprintf("active:%s", date2)
// 位与运算
result, _ := aus.rdb.BitOpAnd(ctx, "active:temp", key1, key2).Result()
// 统计结果
count, _ := aus.rdb.BitCount(ctx, "active:temp", nil).Result()
// 清理临时 key
aus.rdb.Del(ctx, "active:temp")
return count.Val()
}
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
stats := NewActiveUserStats(rdb)
// 记录活跃用户
stats.RecordActive("2024-01-01", 1)
stats.RecordActive("2024-01-01", 2)
stats.RecordActive("2024-01-01", 3)
stats.RecordActive("2024-01-02", 2)
stats.RecordActive("2024-01-02", 3)
// 统计活跃用户数
count1 := stats.CountActive("2024-01-01")
count2 := stats.CountActive("2024-01-02")
fmt.Printf("2024-01-01 active: %d\n", count1) // 3
fmt.Printf("2024-01-02 active: %d\n", count2) // 2
// 两天都活跃的用户
both := stats.CountBothActive("2024-01-01", "2024-01-02")
fmt.Printf("Both days active: %d\n", both) // 2
}
二、HyperLogLog 基数统计
2.1 什么是 HyperLogLog
HyperLogLog 是概率数据结构,用于基数统计:
- 固定 12KB 内存
- 误差率约 0.81%
- 适合海量数据去重
# 添加元素
PFADD hll user1 user2 user3
# 统计基数
PFCOUNT hll # 3
# 合并多个 HLL
PFMERGE dest source1 source2
2.2 Java 实现 UV 统计
import redis.clients.jedis.Jedis;
public class UVStats {
private Jedis jedis;
public UVStats(Jedis jedis) {
this.jedis = jedis;
}
/**
* 记录页面访问
*/
public void recordVisit(String pageId, String userId) {
String key = "page:" + pageId + ":uv";
jedis.pfadd(key, userId);
}
/**
* 获取 UV 数量
*/
public long getUV(String pageId) {
String key = "page:" + pageId + ":uv";
return jedis.pfcount(key);
}
/**
* 合并多页面 UV
*/
public long getMergedUV(String... pageIds) {
String destKey = "merged:uv";
String[] sourceKeys = new String[pageIds.length];
for (int i = 0; i < pageIds.length; i++) {
sourceKeys[i] = "page:" + pageIds[i] + ":uv";
}
jedis.pfmerge(destKey, sourceKeys);
long uv = jedis.pfcount(destKey);
// 清理临时 key
jedis.del(destKey);
return uv;
}
public static void main(String[] args) {
Jedis jedis = new Jedis("localhost", 6379);
UVStats uvStats = new UVStats(jedis);
// 记录访问
uvStats.recordVisit("article:1001", "user:1");
uvStats.recordVisit("article:1001", "user:2");
uvStats.recordVisit("article:1001", "user:1"); // 重复访问
// 获取 UV
long uv = uvStats.getUV("article:1001");
System.out.println("UV: " + uv); // 2
// 合并统计
uvStats.recordVisit("article:1002", "user:2");
uvStats.recordVisit("article:1002", "user:3");
long totalUV = uvStats.getMergedUV("1001", "1002");
System.out.println("Total UV: " + totalUV); // 3
}
}
2.3 Golang 实现设备统计
package main
import (
"fmt"
"github.com/go-redis/redis/v8"
"context"
)
type DeviceStats struct {
rdb *redis.Client
}
func NewDeviceStats(rdb *redis.Client) *DeviceStats {
return &DeviceStats{rdb: rdb}
}
// 记录设备访问
func (ds *DeviceStats) RecordDevice(date string, deviceID string) {
ctx := context.Background()
key := fmt.Sprintf("devices:%s", date)
ds.rdb.PFAdd(ctx, key, deviceID)
}
// 统计设备数量
func (ds *DeviceStats) CountDevices(date string) int64 {
ctx := context.Background()
key := fmt.Sprintf("devices:%s", date)
count, _ := ds.rdb.PFCount(ctx, key).Result()
return count
}
// 合并多天统计
func (ds *DeviceStats) MergeDays(dates []string) int64 {
ctx := context.Background()
destKey := "devices:merged"
sourceKeys := make([]string, len(dates))
for i, date := range dates {
sourceKeys[i] = fmt.Sprintf("devices:%s", date)
}
ds.rdb.PFMerge(ctx, destKey, sourceKeys...)
count, _ := ds.rdb.PFCount(ctx, destKey).Result()
// 清理
ds.rdb.Del(ctx, destKey)
return count
}
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
stats := NewDeviceStats(rdb)
// 记录设备
stats.RecordDevice("2024-01-01", "device:1")
stats.RecordDevice("2024-01-01", "device:2")
stats.RecordDevice("2024-01-01", "device:1") // 重复
stats.RecordDevice("2024-01-02", "device:2")
stats.RecordDevice("2024-01-02", "device:3")
// 统计
count1 := stats.CountDevices("2024-01-01")
count2 := stats.CountDevices("2024-01-02")
fmt.Printf("2024-01-01 devices: %d\n", count1) // 2
fmt.Printf("2024-01-02 devices: %d\n", count2) // 2
// 合并统计
total := stats.MergeDays([]string{"2024-01-01", "2024-01-02"})
fmt.Printf("Total unique devices: %d\n", total) // 3
}
三、GEO 地理位置
3.1 什么是 GEO
GEO 是地理位置数据类型:
- 存储经纬度
- 支持距离计算
- 支持附近搜索
# 添加位置
GEOADD cities:china 116.40 39.90 beijing
GEOADD cities:china 121.47 31.23 shanghai
# 获取位置
GEOPOS cities:china beijing
# 计算距离
GEODIST cities:china beijing shanghai km
# 附近搜索
GEORADIUS cities:china 116.40 39.90 500 km
3.2 Java 实现附近的人
import redis.clients.jedis.GeoCoordinate;
import redis.clients.jedis.GeoRadiusResponse;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.GeoRadiusParam;
import java.util.List;
public class NearbyPeople {
private Jedis jedis;
public NearbyPeople(Jedis jedis) {
this.jedis = jedis;
}
/**
* 添加用户位置
*/
public void addUserLocation(String userId, double longitude, double latitude) {
String key = "users:location";
jedis.geoadd(key, longitude, latitude, userId);
}
/**
* 查找附近的人
*/
public List<GeoRadiusResponse> findNearby(double longitude, double latitude, double radius, int count) {
String key = "users:location";
GeoRadiusParam param = new GeoRadiusParam()
.count(count)
.sortAscending()
.withDist()
.withCoord();
return jedis.georadius(key, longitude, latitude, radius, param);
}
/**
* 计算两人距离
*/
public double getDistance(String userId1, String userId2) {
String key = "users:location";
return jedis.geodist(key, userId1, userId2).doubleValue();
}
public static void main(String[] args) {
Jedis jedis = new Jedis("localhost", 6379);
NearbyPeople nearby = new NearbyPeople(jedis);
// 添加用户位置
nearby.addUserLocation("user:1", 116.40, 39.90); // 北京
nearby.addUserLocation("user:2", 116.39, 39.91); // 附近
nearby.addUserLocation("user:3", 121.47, 31.23); // 上海
// 查找附近的人(5 公里内)
List<GeoRadiusResponse> results = nearby.findNearby(116.40, 39.90, 5, 10);
for (GeoRadiusResponse response : results) {
System.out.println("User: " + response.getMemberByString());
System.out.println("Distance: " + response.getDistance() + " km");
}
// 计算距离
double distance = nearby.getDistance("user:1", "user:3");
System.out.println("Beijing to Shanghai: " + distance + " m");
}
}
3.3 Golang 实现附近店铺
package main
import (
"fmt"
"github.com/go-redis/redis/v8"
"context"
)
type NearbyShops struct {
rdb *redis.Client
}
func NewNearbyShops(rdb *redis.Client) *NearbyShops {
return &NearbyShops{rdb: rdb}
}
// 添加店铺位置
func (ns *NearbyShops) AddShop(shopID string, longitude, latitude float64) {
ctx := context.Background()
ns.rdb.GeoAdd(ctx, "shops:location", &redis.GeoLocation{
Name: shopID,
Longitude: longitude,
Latitude: latitude,
})
}
// 查找附近店铺
func (ns *NearbyShops) FindNearby(longitude, latitude float64, radius float64, count int64) []redis.GeoLocation {
ctx := context.Background()
locations, _ := ns.rdb.GeoRadius(ctx, "shops:location", &redis.GeoRadiusQuery{
Longitude: longitude,
Latitude: latitude,
Radius: radius,
Unit: "km",
Count: count,
WithDist: true,
WithCoord: true,
}).Result()
return locations
}
// 计算店铺间距离
func (ns *NearbyShops) GetDistance(shopID1, shopID2 string) float64 {
ctx := context.Background()
distance, _ := ns.rdb.GeoDist(ctx, "shops:location", shopID1, shopID2, "km").Result()
return distance
}
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
nearby := NewNearbyShops(rdb)
// 添加店铺
nearby.AddShop("shop:1", 116.40, 39.90) // 北京
nearby.AddShop("shop:2", 116.39, 39.91) // 附近
nearby.AddShop("shop:3", 121.47, 31.23) // 上海
// 查找附近店铺
shops := nearby.FindNearby(116.40, 39.90, 10, 10)
for _, shop := range shops {
fmt.Printf("Shop: %s, Distance: %.2f km\n", shop.Name, shop.Dist)
}
// 计算距离
distance := nearby.GetDistance("shop:1", "shop:3")
fmt.Printf("Distance: %.2f km\n", distance)
}
四、Stream 流
4.1 什么是 Stream
Stream 是 Redis 5.0+ 的日志结构:
- 有序的消息列表
- 支持消费者组
- 支持消息确认
# 添加消息
XADD mystream * field1 value1 field2 value2
# 读取消息
XREAD COUNT 2 STREAMS mystream 0
# 消费者组
XGROUP CREATE mystream group1 0
# 消费消息
XREADGROUP GROUP group1 consumer1 COUNT 1 STREAMS mystream >
# 确认消息
XACK mystream group1 message_id
4.2 Java 实现消息队列
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XReadParams;
import java.util.*;
public class StreamQueue {
private Jedis jedis;
public StreamQueue(Jedis jedis) {
this.jedis = jedis;
}
/**
* 发布消息
*/
public StreamEntryID publish(String stream, Map<String, String> data) {
return jedis.xadd(stream, null, data);
}
/**
* 创建消费者组
*/
public void createGroup(String stream, String group) {
try {
jedis.xgroupCreate(stream, group, new StreamEntryID("0-0"), true);
} catch (Exception e) {
// 组已存在
}
}
/**
* 消费消息
*/
public List<Map.Entry<String, Map<String, String>>> consume(
String stream, String group, String consumer, int count) {
Map<String, StreamEntryID> streams = new HashMap<>();
streams.put(stream, new StreamEntryID(">"));
return jedis.xreadGroup(group, consumer,
XReadParams.create().count(count).block(5000),
streams);
}
/**
* 确认消息
*/
public void ack(String stream, String group, StreamEntryID... ids) {
jedis.xack(stream, group, ids);
}
public static void main(String[] args) throws InterruptedException {
Jedis jedis = new Jedis("localhost", 6379);
StreamQueue queue = new StreamQueue(jedis);
String stream = "tasks";
String group = "workers";
// 创建消费者组
queue.createGroup(stream, group);
// 生产者
Map<String, String> task = new HashMap<>();
task.put("task_id", "1");
task.put("action", "send_email");
queue.publish(stream, task);
// 消费者
List<Map.Entry<String, Map<String, String>>> messages =
queue.consume(stream, group, "worker:1", 1);
for (Map.Entry<String, Map<String, String>> entry : messages) {
System.out.println("Received: " + entry.getValue());
// 确认消息
StreamEntryID id = new StreamEntryID(entry.getKey());
queue.ack(stream, group, id);
}
}
}
五、数据类型对比
5.1 去重方案对比
| 方案 | 内存 | 精度 | 适用场景 |
|---|---|---|---|
| Set | 高 | 100% | 小数据量 |
| Bitmap | 最低 | 100% | 连续整数 |
| HyperLogLog | 固定 12KB | 99.19% | 海量数据 |
5.2 选择指南
去重统计:
- 数据量小 → Set
- 连续整数 → Bitmap
- 海量数据 → HyperLogLog
地理位置:
- 附近搜索 → GEO
消息队列:
- 简单队列 → List
- 可靠队列 → Stream
总结
Redis 高级数据类型核心要点:
| 类型 | 特点 | 适用场景 |
|---|---|---|
| Bitmap | 位操作,省空间 | 签到、状态标记 |
| HyperLogLog | 基数统计,固定内存 | UV 统计 |
| GEO | 地理位置,距离计算 | 附近的人/店铺 |
| Stream | 消息流,消费者组 | 消息队列 |
最佳实践:
- 根据场景选择合适的数据类型
- 注意内存占用和精度权衡
- 定期清理过期数据
掌握高级数据类型,扩展 Redis 应用场景!