Skip to content
清晨的一缕阳光
返回

Redis 高级数据类型详解

Redis 高级数据类型详解

除了基础数据类型,Redis 还提供了 Bitmap、HyperLogLog、GEO、Stream 等高级数据结构。本文将深入这些高级类型的原理和实战应用。

一、Bitmap 位图

1.1 什么是 Bitmap

Bitmap 是位数组,通过 bit 位操作实现高效存储:

# 设置位
SETBIT users:online 0 1  # 用户 0 在线
SETBIT users:online 1 0  # 用户 1 离线
SETBIT users:online 2 1  # 用户 2 在线

# 获取位
GETBIT users:online 0    # 1

# 统计在线人数
BITCOUNT users:online    # 2

1.2 Java 实现签到系统

import redis.clients.jedis.Jedis;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class CheckInSystem {
    private Jedis jedis;
    
    public CheckInSystem(Jedis jedis) {
        this.jedis = jedis;
    }
    
    /**
     * 用户签到
     */
    public void checkIn(String userId, LocalDate date) {
        String key = buildKey(userId, date);
        int dayOfMonth = date.getDayOfMonth();
        jedis.setbit(key, dayOfMonth - 1, true);
    }
    
    /**
     * 检查是否签到
     */
    public boolean isCheckedIn(String userId, LocalDate date) {
        String key = buildKey(userId, date);
        int dayOfMonth = date.getDayOfMonth();
        return jedis.getbit(key, dayOfMonth - 1);
    }
    
    /**
     * 统计签到天数
     */
    public long getCheckInDays(String userId, LocalDate date) {
        String key = buildKey(userId, date);
        return jedis.bitcount(key);
    }
    
    /**
     * 获取连续签到天数
     */
    public int getContinuousCheckInDays(String userId, LocalDate endDate) {
        String key = buildKey(userId, endDate);
        int continuous = 0;
        
        for (int i = endDate.getDayOfMonth() - 1; i >= 0; i--) {
            if (jedis.getbit(key, i)) {
                continuous++;
            } else {
                break;
            }
        }
        
        return continuous;
    }
    
    private String buildKey(String userId, LocalDate date) {
        String dateStr = date.format(DateTimeFormatter.ofPattern("yyyyMM"));
        return "checkin:" + userId + ":" + dateStr;
    }
    
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379);
        CheckInSystem checkIn = new CheckInSystem(jedis);
        
        LocalDate today = LocalDate.now();
        
        // 签到
        checkIn.checkIn("user:1001", today);
        
        // 检查是否签到
        boolean checked = checkIn.isCheckedIn("user:1001", today);
        System.out.println("Checked in: " + checked);
        
        // 统计签到天数
        long days = checkIn.getCheckInDays("user:1001", today);
        System.out.println("Total days: " + days);
        
        // 连续签到
        int continuous = checkIn.getContinuousCheckInDays("user:1001", today);
        System.out.println("Continuous days: " + continuous);
    }
}

1.3 Golang 实现用户活跃统计

package main

import (
    "fmt"
    "github.com/go-redis/redis/v8"
    "context"
)

type ActiveUserStats struct {
    rdb *redis.Client
}

func NewActiveUserStats(rdb *redis.Client) *ActiveUserStats {
    return &ActiveUserStats{rdb: rdb}
}

// 记录用户活跃
func (aus *ActiveUserStats) RecordActive(date string, userID int64) {
    ctx := context.Background()
    key := fmt.Sprintf("active:%s", date)
    aus.rdb.SetBit(ctx, key, userID, 1)
}

// 统计活跃用户数
func (aus *ActiveUserStats) CountActive(date string) int64 {
    ctx := context.Background()
    key := fmt.Sprintf("active:%s", date)
    count, _ := aus.rdb.BitCount(ctx, key, nil).Result()
    return count.Val()
}

// 检查用户是否活跃
func (aus *ActiveUserStats) IsActive(date string, userID int64) bool {
    ctx := context.Background()
    key := fmt.Sprintf("active:%s", date)
    active, _ := aus.rdb.GetBit(ctx, key, userID).Result()
    return active == 1
}

// 统计两天都活跃的用户(位与运算)
func (aus *ActiveUserStats) CountBothActive(date1, date2 string) int64 {
    ctx := context.Background()
    key1 := fmt.Sprintf("active:%s", date1)
    key2 := fmt.Sprintf("active:%s", date2)
    
    // 位与运算
    result, _ := aus.rdb.BitOpAnd(ctx, "active:temp", key1, key2).Result()
    
    // 统计结果
    count, _ := aus.rdb.BitCount(ctx, "active:temp", nil).Result()
    
    // 清理临时 key
    aus.rdb.Del(ctx, "active:temp")
    
    return count.Val()
}

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    
    stats := NewActiveUserStats(rdb)
    
    // 记录活跃用户
    stats.RecordActive("2024-01-01", 1)
    stats.RecordActive("2024-01-01", 2)
    stats.RecordActive("2024-01-01", 3)
    
    stats.RecordActive("2024-01-02", 2)
    stats.RecordActive("2024-01-02", 3)
    
    // 统计活跃用户数
    count1 := stats.CountActive("2024-01-01")
    count2 := stats.CountActive("2024-01-02")
    
    fmt.Printf("2024-01-01 active: %d\n", count1)  // 3
    fmt.Printf("2024-01-02 active: %d\n", count2)  // 2
    
    // 两天都活跃的用户
    both := stats.CountBothActive("2024-01-01", "2024-01-02")
    fmt.Printf("Both days active: %d\n", both)  // 2
}

二、HyperLogLog 基数统计

2.1 什么是 HyperLogLog

HyperLogLog 是概率数据结构,用于基数统计:

# 添加元素
PFADD hll user1 user2 user3

# 统计基数
PFCOUNT hll  # 3

# 合并多个 HLL
PFMERGE dest source1 source2

2.2 Java 实现 UV 统计

import redis.clients.jedis.Jedis;

public class UVStats {
    private Jedis jedis;
    
    public UVStats(Jedis jedis) {
        this.jedis = jedis;
    }
    
    /**
     * 记录页面访问
     */
    public void recordVisit(String pageId, String userId) {
        String key = "page:" + pageId + ":uv";
        jedis.pfadd(key, userId);
    }
    
    /**
     * 获取 UV 数量
     */
    public long getUV(String pageId) {
        String key = "page:" + pageId + ":uv";
        return jedis.pfcount(key);
    }
    
    /**
     * 合并多页面 UV
     */
    public long getMergedUV(String... pageIds) {
        String destKey = "merged:uv";
        String[] sourceKeys = new String[pageIds.length];
        
        for (int i = 0; i < pageIds.length; i++) {
            sourceKeys[i] = "page:" + pageIds[i] + ":uv";
        }
        
        jedis.pfmerge(destKey, sourceKeys);
        long uv = jedis.pfcount(destKey);
        
        // 清理临时 key
        jedis.del(destKey);
        
        return uv;
    }
    
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379);
        UVStats uvStats = new UVStats(jedis);
        
        // 记录访问
        uvStats.recordVisit("article:1001", "user:1");
        uvStats.recordVisit("article:1001", "user:2");
        uvStats.recordVisit("article:1001", "user:1");  // 重复访问
        
        // 获取 UV
        long uv = uvStats.getUV("article:1001");
        System.out.println("UV: " + uv);  // 2
        
        // 合并统计
        uvStats.recordVisit("article:1002", "user:2");
        uvStats.recordVisit("article:1002", "user:3");
        
        long totalUV = uvStats.getMergedUV("1001", "1002");
        System.out.println("Total UV: " + totalUV);  // 3
    }
}

2.3 Golang 实现设备统计

package main

import (
    "fmt"
    "github.com/go-redis/redis/v8"
    "context"
)

type DeviceStats struct {
    rdb *redis.Client
}

func NewDeviceStats(rdb *redis.Client) *DeviceStats {
    return &DeviceStats{rdb: rdb}
}

// 记录设备访问
func (ds *DeviceStats) RecordDevice(date string, deviceID string) {
    ctx := context.Background()
    key := fmt.Sprintf("devices:%s", date)
    ds.rdb.PFAdd(ctx, key, deviceID)
}

// 统计设备数量
func (ds *DeviceStats) CountDevices(date string) int64 {
    ctx := context.Background()
    key := fmt.Sprintf("devices:%s", date)
    count, _ := ds.rdb.PFCount(ctx, key).Result()
    return count
}

// 合并多天统计
func (ds *DeviceStats) MergeDays(dates []string) int64 {
    ctx := context.Background()
    destKey := "devices:merged"
    
    sourceKeys := make([]string, len(dates))
    for i, date := range dates {
        sourceKeys[i] = fmt.Sprintf("devices:%s", date)
    }
    
    ds.rdb.PFMerge(ctx, destKey, sourceKeys...)
    count, _ := ds.rdb.PFCount(ctx, destKey).Result()
    
    // 清理
    ds.rdb.Del(ctx, destKey)
    
    return count
}

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    
    stats := NewDeviceStats(rdb)
    
    // 记录设备
    stats.RecordDevice("2024-01-01", "device:1")
    stats.RecordDevice("2024-01-01", "device:2")
    stats.RecordDevice("2024-01-01", "device:1")  // 重复
    
    stats.RecordDevice("2024-01-02", "device:2")
    stats.RecordDevice("2024-01-02", "device:3")
    
    // 统计
    count1 := stats.CountDevices("2024-01-01")
    count2 := stats.CountDevices("2024-01-02")
    
    fmt.Printf("2024-01-01 devices: %d\n", count1)  // 2
    fmt.Printf("2024-01-02 devices: %d\n", count2)  // 2
    
    // 合并统计
    total := stats.MergeDays([]string{"2024-01-01", "2024-01-02"})
    fmt.Printf("Total unique devices: %d\n", total)  // 3
}

三、GEO 地理位置

3.1 什么是 GEO

GEO 是地理位置数据类型:

# 添加位置
GEOADD cities:china 116.40 39.90 beijing
GEOADD cities:china 121.47 31.23 shanghai

# 获取位置
GEOPOS cities:china beijing

# 计算距离
GEODIST cities:china beijing shanghai km

# 附近搜索
GEORADIUS cities:china 116.40 39.90 500 km

3.2 Java 实现附近的人

import redis.clients.jedis.GeoCoordinate;
import redis.clients.jedis.GeoRadiusResponse;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.GeoRadiusParam;
import java.util.List;

public class NearbyPeople {
    private Jedis jedis;
    
    public NearbyPeople(Jedis jedis) {
        this.jedis = jedis;
    }
    
    /**
     * 添加用户位置
     */
    public void addUserLocation(String userId, double longitude, double latitude) {
        String key = "users:location";
        jedis.geoadd(key, longitude, latitude, userId);
    }
    
    /**
     * 查找附近的人
     */
    public List<GeoRadiusResponse> findNearby(double longitude, double latitude, double radius, int count) {
        String key = "users:location";
        
        GeoRadiusParam param = new GeoRadiusParam()
            .count(count)
            .sortAscending()
            .withDist()
            .withCoord();
        
        return jedis.georadius(key, longitude, latitude, radius, param);
    }
    
    /**
     * 计算两人距离
     */
    public double getDistance(String userId1, String userId2) {
        String key = "users:location";
        return jedis.geodist(key, userId1, userId2).doubleValue();
    }
    
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379);
        NearbyPeople nearby = new NearbyPeople(jedis);
        
        // 添加用户位置
        nearby.addUserLocation("user:1", 116.40, 39.90);  // 北京
        nearby.addUserLocation("user:2", 116.39, 39.91);  // 附近
        nearby.addUserLocation("user:3", 121.47, 31.23);  // 上海
        
        // 查找附近的人(5 公里内)
        List<GeoRadiusResponse> results = nearby.findNearby(116.40, 39.90, 5, 10);
        
        for (GeoRadiusResponse response : results) {
            System.out.println("User: " + response.getMemberByString());
            System.out.println("Distance: " + response.getDistance() + " km");
        }
        
        // 计算距离
        double distance = nearby.getDistance("user:1", "user:3");
        System.out.println("Beijing to Shanghai: " + distance + " m");
    }
}

3.3 Golang 实现附近店铺

package main

import (
    "fmt"
    "github.com/go-redis/redis/v8"
    "context"
)

type NearbyShops struct {
    rdb *redis.Client
}

func NewNearbyShops(rdb *redis.Client) *NearbyShops {
    return &NearbyShops{rdb: rdb}
}

// 添加店铺位置
func (ns *NearbyShops) AddShop(shopID string, longitude, latitude float64) {
    ctx := context.Background()
    ns.rdb.GeoAdd(ctx, "shops:location", &redis.GeoLocation{
        Name:      shopID,
        Longitude: longitude,
        Latitude:  latitude,
    })
}

// 查找附近店铺
func (ns *NearbyShops) FindNearby(longitude, latitude float64, radius float64, count int64) []redis.GeoLocation {
    ctx := context.Background()
    
    locations, _ := ns.rdb.GeoRadius(ctx, "shops:location", &redis.GeoRadiusQuery{
        Longitude: longitude,
        Latitude:  latitude,
        Radius:    radius,
        Unit:      "km",
        Count:     count,
        WithDist:  true,
        WithCoord: true,
    }).Result()
    
    return locations
}

// 计算店铺间距离
func (ns *NearbyShops) GetDistance(shopID1, shopID2 string) float64 {
    ctx := context.Background()
    distance, _ := ns.rdb.GeoDist(ctx, "shops:location", shopID1, shopID2, "km").Result()
    return distance
}

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    
    nearby := NewNearbyShops(rdb)
    
    // 添加店铺
    nearby.AddShop("shop:1", 116.40, 39.90)  // 北京
    nearby.AddShop("shop:2", 116.39, 39.91)  // 附近
    nearby.AddShop("shop:3", 121.47, 31.23)  // 上海
    
    // 查找附近店铺
    shops := nearby.FindNearby(116.40, 39.90, 10, 10)
    
    for _, shop := range shops {
        fmt.Printf("Shop: %s, Distance: %.2f km\n", shop.Name, shop.Dist)
    }
    
    // 计算距离
    distance := nearby.GetDistance("shop:1", "shop:3")
    fmt.Printf("Distance: %.2f km\n", distance)
}

四、Stream 流

4.1 什么是 Stream

Stream 是 Redis 5.0+ 的日志结构

# 添加消息
XADD mystream * field1 value1 field2 value2

# 读取消息
XREAD COUNT 2 STREAMS mystream 0

# 消费者组
XGROUP CREATE mystream group1 0

# 消费消息
XREADGROUP GROUP group1 consumer1 COUNT 1 STREAMS mystream >

# 确认消息
XACK mystream group1 message_id

4.2 Java 实现消息队列

import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XReadParams;
import java.util.*;

public class StreamQueue {
    private Jedis jedis;
    
    public StreamQueue(Jedis jedis) {
        this.jedis = jedis;
    }
    
    /**
     * 发布消息
     */
    public StreamEntryID publish(String stream, Map<String, String> data) {
        return jedis.xadd(stream, null, data);
    }
    
    /**
     * 创建消费者组
     */
    public void createGroup(String stream, String group) {
        try {
            jedis.xgroupCreate(stream, group, new StreamEntryID("0-0"), true);
        } catch (Exception e) {
            // 组已存在
        }
    }
    
    /**
     * 消费消息
     */
    public List<Map.Entry<String, Map<String, String>>> consume(
        String stream, String group, String consumer, int count) {
        
        Map<String, StreamEntryID> streams = new HashMap<>();
        streams.put(stream, new StreamEntryID(">"));
        
        return jedis.xreadGroup(group, consumer, 
            XReadParams.create().count(count).block(5000), 
            streams);
    }
    
    /**
     * 确认消息
     */
    public void ack(String stream, String group, StreamEntryID... ids) {
        jedis.xack(stream, group, ids);
    }
    
    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis("localhost", 6379);
        StreamQueue queue = new StreamQueue(jedis);
        
        String stream = "tasks";
        String group = "workers";
        
        // 创建消费者组
        queue.createGroup(stream, group);
        
        // 生产者
        Map<String, String> task = new HashMap<>();
        task.put("task_id", "1");
        task.put("action", "send_email");
        queue.publish(stream, task);
        
        // 消费者
        List<Map.Entry<String, Map<String, String>>> messages = 
            queue.consume(stream, group, "worker:1", 1);
        
        for (Map.Entry<String, Map<String, String>> entry : messages) {
            System.out.println("Received: " + entry.getValue());
            
            // 确认消息
            StreamEntryID id = new StreamEntryID(entry.getKey());
            queue.ack(stream, group, id);
        }
    }
}

五、数据类型对比

5.1 去重方案对比

方案内存精度适用场景
Set100%小数据量
Bitmap最低100%连续整数
HyperLogLog固定 12KB99.19%海量数据

5.2 选择指南

去重统计:
- 数据量小 → Set
- 连续整数 → Bitmap
- 海量数据 → HyperLogLog

地理位置:
- 附近搜索 → GEO

消息队列:
- 简单队列 → List
- 可靠队列 → Stream

总结

Redis 高级数据类型核心要点:

类型特点适用场景
Bitmap位操作,省空间签到、状态标记
HyperLogLog基数统计,固定内存UV 统计
GEO地理位置,距离计算附近的人/店铺
Stream消息流,消费者组消息队列

最佳实践

  1. 根据场景选择合适的数据类型
  2. 注意内存占用和精度权衡
  3. 定期清理过期数据

掌握高级数据类型,扩展 Redis 应用场景!

参考资料


分享这篇文章到:

上一篇文章
飞越疯人院剧情分析
下一篇文章
RAG 分块策略与优化实战