banner

问题

假如有10亿的用户,每个用户有自己的分数(score),请你设计一个排行榜,可以根据用户的分数进行排名,每个用户可以知道自己在排行榜中的排名情况。

分析

一说到排行榜,我们的第一反应肯定是使用Redis的ZSET结构实现,那么使用ZSET如何实现对10亿用户进行排序?

ZSET单key

第一种思路,使用ZSET单key,也就是将10亿用户都放入一个ZSET key中,简单明了,这样设计是最简单的,但查询的性能是否可以接受?我们来计算一下:

ZSET结构使用SkipList实现,查询的时间复杂度:ZREVRANK:O(log(N))

查询单个排名的理论时间复杂度:O(log(10^9)) ≈ 30次比较

内存占用分析:

public class PerformanceAnalysis {
    // 1. 内存占用(假设每个成员)
    private static final long MEMORY_PER_MEMBER = 
        + 8    // score (double)
        + 20   // member (假设userId字符串)
        + 8    // 指针
        + 32;  // 跳表节点开销
    
    // 10亿用户预估内存
    private static final long TOTAL_MEMORY = 
        MEMORY_PER_MEMBER * 1_000_000_000L; // 约68GB
        
    // 2. 跳表层数
    private static final int SKIP_LIST_LEVELS = 
        (int) Math.log(1_000_000_000L) / Math.log(2); // 约30层
}

假定每个成员对象约68字节,那么10亿用户约需要68GB内存,当然实际场景下,可以考虑对单个成员对象再进行压缩,内存占用可以再小一点。

如果仅从查询的效率考虑,如果仅查询单个成员的排名,Redis的查询耗时是不用担心的,但如果要进行范围查询,那么性能则非常不乐观了。

在实际的生产环境中,如此大的单Key我们一般称为GodKey或者BigKey,是需要极力避免的,我们都知道Redis是纯内存的数据库,执行命令是单线程执行的,对于内存操作是非常的快的,但是如果单个Key值特别大时,容易阻塞线程,影响其他命令的执行,同时对于Redis服务器的网卡也是一个巨大的挑战,试想如果对于68G的 ZSET Key进行范围查询的话,如果1W QPS,Redis怕是直接会被打爆。

如此看来,这个使用单Key存储全部的用户数据,应该行不通的。

ZSET多Key

“一个篱笆三个桩,一个好汉三个帮”,那么既然一个key搞不定,那我们就分多个key就好了吧?那么我们该如何将10亿用户存储到多个key中呢?

正常第一反应,我们就搞10000个key,然后使用userId取模,路由到对应的子key就好了吧?但是我们的需求是,需要可以知道每个用户在排行榜中的排名,显然这种路由的方式,是无法满足需求的。

拆分多个子key,根据某种规则路由的思路是完全没问题的,不能根据userId路由,那我们可否换一种思路,以其他维度进行路由?

分片

换一个思路,如果以分数作为分桶逻辑,初始化N个分桶,每个分桶的划定范围以分数区间划定,当新用户获得分数后,根据其分数路由到所在的分桶中。

那么,用户的排名该如何计算?

get rank

计算用户的排名,我们肯定已经知道用户所在的分桶,可以直接获得在当前分桶中的排名,同时比当前分桶分数大的分桶名称我们肯定也可以知道,那么将所有的前面的分桶中的总数量累加,再加上当前分桶的排名,就是该用户的总排名。

问题

综上,多个子key也就是数据分桶的逻辑,基本上是可以满足我们的需求的,但是还有一个问题,就是数据分布不均匀的问题,我们设定的多个分桶,但是每个分桶中的数据量一定不是均匀的,可能会出现数据倾斜的问题,如何解决?

对此,较为简单的处理方式,当某个数据桶的数据量超过一定的阈值后,可以进行拆分,缩小分桶的数值范围,分割成两个子桶,如此循环往复,直到无法再切分为止。

实现demo

下面的Claude给出的代码实现Demo,逻辑可能不严谨,仅作参考:

import lombok.Data;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ScoreBucketRankingSystem {

    @Data
    public class UserScore {
        private long userId;
        private double score;
        private long timestamp;
    }

    @Data
    public class RankInfo {
        private long userId;
        private double score;
        private long rank;
        private String bucketKey;
    }

    @Data
    public class ScoreBucket {
        private String bucketKey;
        private double minScore;
        private double maxScore;
        private long userCount;
    }

    private final RedisTemplate<String, String> redisTemplate;
    private final String BUCKET_PREFIX = "rank:bucket:";
    private final String BUCKET_META_KEY = "rank:bucket:meta";
    
    // 缓存分桶信息
    private final ConcurrentHashMap<String, ScoreBucket> bucketCache = new ConcurrentHashMap<>();
    
    // 初始化分桶配置
    private final List<ScoreBucket> initBuckets = new ArrayList<>() ;

    public ScoreBucketRankingSystem(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
        initBuckets();
    }

    // 初始化分桶
    private void initBuckets() {
        for (ScoreBucket bucket : initBuckets) {
            String bucketKey = BUCKET_PREFIX + bucket.getBucketKey();
            bucketCache.put(bucketKey, bucket);
            // 存储分桶元信息
            redisTemplate.opsForHash().put(
                BUCKET_META_KEY,
                bucketKey,
                JSON.toJSONString(bucket)
            );
        }
    }

    // 更新用户分数
    public void updateScore(long userId, double newScore) {
        // 1. 找到用户当前所在的分桶
        String oldBucketKey = findUserCurrentBucket(userId);
        
        // 2. 计算用户应该在的分桶
        String newBucketKey = calculateBucketKey(newScore);
        
        // 3. 如果分桶发生变化,需要迁移
        if (oldBucketKey != null && !oldBucketKey.equals(newBucketKey)) {
            moveUserToBucket(userId, oldBucketKey, newBucketKey, newScore);
        } else {
            // 4. 直接更新分数
            updateUserScore(userId, newBucketKey, newScore);
        }
    }

    // 查找用户当前所在的分桶
    private String findUserCurrentBucket(long userId) {
        for (String bucketKey : bucketCache.keySet()) {
            Double score = redisTemplate.opsForZSet().score(bucketKey, String.valueOf(userId));
            if (score != null) {
                return bucketKey;
            }
        }
        return null;
    }

    // 根据分数计算应该所在的分桶
    private String calculateBucketKey(double score) {
        for (ScoreBucket bucket : bucketCache.values()) {
            if (score >= bucket.getMinScore() && score <= bucket.getMaxScore()) {
                return BUCKET_PREFIX + bucket.getBucketKey();
            }
        }
        throw new IllegalArgumentException("Score out of range: " + score);
    }

    // 将用户从旧分桶移动到新分桶
    private void moveUserToBucket(long userId, String oldBucketKey, String newBucketKey, double newScore) {
        redisTemplate.execute(new SessionCallback<List<Object>>() {
            @Override
            public List<Object> execute(RedisOperations operations) {
                operations.multi();
                
                // 从旧分桶删除
                operations.opsForZSet().remove(oldBucketKey, String.valueOf(userId));
                
                // 更新分桶计数
                operations.opsForHash().increment(BUCKET_META_KEY, oldBucketKey + ":count", -1);
                operations.opsForHash().increment(BUCKET_META_KEY, newBucketKey + ":count", 1);
                
                // 添加到新分桶
                operations.opsForZSet().add(newBucketKey, String.valueOf(userId), newScore);
                
                return operations.exec();
            }
        });
    }

    // 更新用户分数
    private void updateUserScore(long userId, String bucketKey, double newScore) {
        redisTemplate.opsForZSet().add(bucketKey, String.valueOf(userId), newScore);
    }

    // 获取用户排名信息
    public RankInfo getUserRankInfo(long userId) {
        String bucketKey = findUserCurrentBucket(userId);
        if (bucketKey == null) {
            return null;
        }

        Double score = redisTemplate.opsForZSet().score(bucketKey, String.valueOf(userId));
        Long rank = redisTemplate.opsForZSet().reverseRank(bucketKey, String.valueOf(userId));
        
        if (score == null || rank == null) {
            return null;
        }

        // 计算全局排名
        long globalRank = calculateGlobalRank(bucketKey, rank);

        RankInfo rankInfo = new RankInfo();
        rankInfo.setUserId(userId);
        rankInfo.setScore(score);
        rankInfo.setRank(globalRank);
        rankInfo.setBucketKey(bucketKey);
        
        return rankInfo;
    }

    // 计算全局排名
    private long calculateGlobalRank(String currentBucketKey, long rankInBucket) {
        long globalRank = rankInBucket;
        
        // 加上更高分数段的用户数
        for (ScoreBucket bucket : bucketCache.values()) {
            String bucketKey = BUCKET_PREFIX + bucket.getBucketKey();
            if (bucket.getMaxScore() > bucketCache.get(currentBucketKey).getMaxScore()) {
                globalRank += bucket.getUserCount();
            }
        }
        
        return globalRank;
    }

    // 获取排行榜区间
    public List<RankInfo> getRankRange(int start, int end) {
        List<RankInfo> result = new ArrayList<>();
        int currentPosition = 0;
        
        // 从高分到低分遍历分桶
        for (ScoreBucket bucket : bucketCache.values()) {
            String bucketKey = BUCKET_PREFIX + bucket.getBucketKey();
            
            // 获取分桶内的排名区间
            Set<ZSetOperations.TypedTuple<String>> ranksInBucket = 
                redisTemplate.opsForZSet().reverseRangeWithScores(
                    bucketKey, 
                    Math.max(0, start - currentPosition),
                    end - currentPosition
                );
                
            if (ranksInBucket != null) {
                for (ZSetOperations.TypedTuple<String> rank : ranksInBucket) {
                    if (currentPosition >= start && currentPosition <= end) {
                        RankInfo rankInfo = new RankInfo();
                        rankInfo.setUserId(Long.parseLong(rank.getValue()));
                        rankInfo.setScore(rank.getScore());
                        rankInfo.setRank(currentPosition + 1);
                        rankInfo.setBucketKey(bucketKey);
                        result.add(rankInfo);
                    }
                    currentPosition++;
                }
            }
            
            if (currentPosition > end) {
                break;
            }
        }
        
        return result;
    }

    // 获取分桶统计信息
    public List<ScoreBucket> getBucketStats() {
        List<ScoreBucket> stats = new ArrayList<>();
        
        for (ScoreBucket bucket : bucketCache.values()) {
            String bucketKey = BUCKET_PREFIX + bucket.getBucketKey();
            Long count = redisTemplate.opsForZSet().zCard(bucketKey);
            
            ScoreBucket stat = new ScoreBucket();
            stat.setBucketKey(bucket.getBucketKey());
            stat.setMinScore(bucket.getMinScore());
            stat.setMaxScore(bucket.getMaxScore());
            stat.setUserCount(count != null ? count : 0);
            
            stats.add(stat);
        }
        
        return stats;
    }

    // 定期优化分桶
    public void optimizeBuckets() {
        List<ScoreBucket> stats = getBucketStats();
        
        // 检查是否需要分裂或合并分桶
        for (ScoreBucket bucket : stats) {
            if (bucket.getUserCount() > 100_000_000) { // 1亿用户
                // 分裂分桶
                splitBucket(bucket);
            } else if (bucket.getUserCount() < 1_000_000) { // 100万用户
                // 考虑合并分桶
                mergeBucket(bucket);
            }
        }
    }

    // 分裂分桶
    private void splitBucket(ScoreBucket bucket) {
        double mid = (bucket.getMinScore() + bucket.getMaxScore()) / 2;
        
        ScoreBucket lowerBucket = new ScoreBucket();
        lowerBucket.setBucketKey(bucket.getBucketKey() + "_lower");
        lowerBucket.setMinScore(bucket.getMinScore());
        lowerBucket.setMaxScore(mid);
        
        ScoreBucket upperBucket = new ScoreBucket();
        upperBucket.setBucketKey(bucket.getBucketKey() + "_upper");
        upperBucket.setMinScore(mid + 1);
        upperBucket.setMaxScore(bucket.getMaxScore());
        
        // 重新分配用户到新分桶
        redistributeUsers(bucket, lowerBucket, upperBucket);
    }

    // 合并分桶
    private void mergeBucket(ScoreBucket bucket) {
        // 实现合并逻辑
    }

    // 重新分配用户
    private void redistributeUsers(ScoreBucket oldBucket, ScoreBucket lowerBucket, ScoreBucket upperBucket) {
        String oldBucketKey = BUCKET_PREFIX + oldBucket.getBucketKey();
        String lowerBucketKey = BUCKET_PREFIX + lowerBucket.getBucketKey();
        String upperBucketKey = BUCKET_PREFIX + upperBucket.getBucketKey();
        
        long cursor = 0;
        ScanOptions options = ScanOptions.scanOptions().count(1000).build();
        
        do {
            ScanResult<ZSetOperations.TypedTuple<String>> scanResult = 
                redisTemplate.opsForZSet().scan(oldBucketKey, options);
                
            for (ZSetOperations.TypedTuple<String> tuple : scanResult.getContent()) {
                String userId = tuple.getValue();
                double score = tuple.getScore();
                
                if (score <= lowerBucket.getMaxScore()) {
                    redisTemplate.opsForZSet().add(lowerBucketKey, userId, score);
                } else {
                    redisTemplate.opsForZSet().add(upperBucketKey, userId, score);
                }
            }
            
            cursor = scanResult.getCursor();
        } while (cursor != 0);
        
        // 更新分桶缓存
        bucketCache.put(lowerBucketKey, lowerBucket);
        bucketCache.put(upperBucketKey, upperBucket);
        bucketCache.remove(oldBucketKey);
        
        // 删除旧分桶
        redisTemplate.delete(oldBucketKey);
    }
}