亿级排行榜设计
亿级排行榜设计
问题
假如有10亿的用户,每个用户有自己的分数(score),请你设计一个排行榜,可以根据用户的分数进行排名,每个用户可以知道自己在排行榜中的排名情况。
分析
一说到排行榜,我们的第一反应肯定是使用Redis的ZSET结构实现,那么使用ZSET如何实现对10亿用户进行排序?
ZSET单key
第一种思路,使用ZSET单key,也就是将10亿用户都放入一个ZSET key中,简单明了,这样设计是最简单的,但查询的性能是否可以接受?我们来计算一下:
ZSET结构使用SkipList实现,查询的时间复杂度:ZREVRANK:O(log(N))
查询单个排名的理论时间复杂度:O(log(10^9)) ≈ 30次比较
内存占用分析:
public class PerformanceAnalysis {
// 1. 内存占用(假设每个成员)
private static final long MEMORY_PER_MEMBER =
+ 8 // score (double)
+ 20 // member (假设userId字符串)
+ 8 // 指针
+ 32; // 跳表节点开销
// 10亿用户预估内存
private static final long TOTAL_MEMORY =
MEMORY_PER_MEMBER * 1_000_000_000L; // 约68GB
// 2. 跳表层数
private static final int SKIP_LIST_LEVELS =
(int) Math.log(1_000_000_000L) / Math.log(2); // 约30层
}
假定每个成员对象约68字节,那么10亿用户约需要68GB内存,当然实际场景下,可以考虑对单个成员对象再进行压缩,内存占用可以再小一点。
如果仅从查询的效率考虑,如果仅查询单个成员的排名,Redis的查询耗时是不用担心的,但如果要进行范围查询,那么性能则非常不乐观了。
在实际的生产环境中,如此大的单Key我们一般称为GodKey或者BigKey,是需要极力避免的,我们都知道Redis是纯内存的数据库,执行命令是单线程执行的,对于内存操作是非常的快的,但是如果单个Key值特别大时,容易阻塞线程,影响其他命令的执行,同时对于Redis服务器的网卡也是一个巨大的挑战,试想如果对于68G的 ZSET Key进行范围查询的话,如果1W QPS,Redis怕是直接会被打爆。
如此看来,这个使用单Key存储全部的用户数据,应该行不通的。
ZSET多Key
“一个篱笆三个桩,一个好汉三个帮”,那么既然一个key搞不定,那我们就分多个key就好了吧?那么我们该如何将10亿用户存储到多个key中呢?
正常第一反应,我们就搞10000个key,然后使用userId取模,路由到对应的子key就好了吧?但是我们的需求是,需要可以知道每个用户在排行榜中的排名,显然这种路由的方式,是无法满足需求的。
拆分多个子key,根据某种规则路由的思路是完全没问题的,不能根据userId路由,那我们可否换一种思路,以其他维度进行路由?
换一个思路,如果以分数作为分桶逻辑,初始化N个分桶,每个分桶的划定范围以分数区间划定,当新用户获得分数后,根据其分数路由到所在的分桶中。
那么,用户的排名该如何计算?
计算用户的排名,我们肯定已经知道用户所在的分桶,可以直接获得在当前分桶中的排名,同时比当前分桶分数大的分桶名称我们肯定也可以知道,那么将所有的前面的分桶中的总数量累加,再加上当前分桶的排名,就是该用户的总排名。
问题
综上,多个子key也就是数据分桶的逻辑,基本上是可以满足我们的需求的,但是还有一个问题,就是数据分布不均匀的问题,我们设定的多个分桶,但是每个分桶中的数据量一定不是均匀的,可能会出现数据倾斜的问题,如何解决?
对此,较为简单的处理方式,当某个数据桶的数据量超过一定的阈值后,可以进行拆分,缩小分桶的数值范围,分割成两个子桶,如此循环往复,直到无法再切分为止。
实现demo
下面的Claude给出的代码实现Demo,逻辑可能不严谨,仅作参考:
import lombok.Data;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class ScoreBucketRankingSystem {
@Data
public class UserScore {
private long userId;
private double score;
private long timestamp;
}
@Data
public class RankInfo {
private long userId;
private double score;
private long rank;
private String bucketKey;
}
@Data
public class ScoreBucket {
private String bucketKey;
private double minScore;
private double maxScore;
private long userCount;
}
private final RedisTemplate<String, String> redisTemplate;
private final String BUCKET_PREFIX = "rank:bucket:";
private final String BUCKET_META_KEY = "rank:bucket:meta";
// 缓存分桶信息
private final ConcurrentHashMap<String, ScoreBucket> bucketCache = new ConcurrentHashMap<>();
// 初始化分桶配置
private final List<ScoreBucket> initBuckets = new ArrayList<>() ;
public ScoreBucketRankingSystem(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
initBuckets();
}
// 初始化分桶
private void initBuckets() {
for (ScoreBucket bucket : initBuckets) {
String bucketKey = BUCKET_PREFIX + bucket.getBucketKey();
bucketCache.put(bucketKey, bucket);
// 存储分桶元信息
redisTemplate.opsForHash().put(
BUCKET_META_KEY,
bucketKey,
JSON.toJSONString(bucket)
);
}
}
// 更新用户分数
public void updateScore(long userId, double newScore) {
// 1. 找到用户当前所在的分桶
String oldBucketKey = findUserCurrentBucket(userId);
// 2. 计算用户应该在的分桶
String newBucketKey = calculateBucketKey(newScore);
// 3. 如果分桶发生变化,需要迁移
if (oldBucketKey != null && !oldBucketKey.equals(newBucketKey)) {
moveUserToBucket(userId, oldBucketKey, newBucketKey, newScore);
} else {
// 4. 直接更新分数
updateUserScore(userId, newBucketKey, newScore);
}
}
// 查找用户当前所在的分桶
private String findUserCurrentBucket(long userId) {
for (String bucketKey : bucketCache.keySet()) {
Double score = redisTemplate.opsForZSet().score(bucketKey, String.valueOf(userId));
if (score != null) {
return bucketKey;
}
}
return null;
}
// 根据分数计算应该所在的分桶
private String calculateBucketKey(double score) {
for (ScoreBucket bucket : bucketCache.values()) {
if (score >= bucket.getMinScore() && score <= bucket.getMaxScore()) {
return BUCKET_PREFIX + bucket.getBucketKey();
}
}
throw new IllegalArgumentException("Score out of range: " + score);
}
// 将用户从旧分桶移动到新分桶
private void moveUserToBucket(long userId, String oldBucketKey, String newBucketKey, double newScore) {
redisTemplate.execute(new SessionCallback<List<Object>>() {
@Override
public List<Object> execute(RedisOperations operations) {
operations.multi();
// 从旧分桶删除
operations.opsForZSet().remove(oldBucketKey, String.valueOf(userId));
// 更新分桶计数
operations.opsForHash().increment(BUCKET_META_KEY, oldBucketKey + ":count", -1);
operations.opsForHash().increment(BUCKET_META_KEY, newBucketKey + ":count", 1);
// 添加到新分桶
operations.opsForZSet().add(newBucketKey, String.valueOf(userId), newScore);
return operations.exec();
}
});
}
// 更新用户分数
private void updateUserScore(long userId, String bucketKey, double newScore) {
redisTemplate.opsForZSet().add(bucketKey, String.valueOf(userId), newScore);
}
// 获取用户排名信息
public RankInfo getUserRankInfo(long userId) {
String bucketKey = findUserCurrentBucket(userId);
if (bucketKey == null) {
return null;
}
Double score = redisTemplate.opsForZSet().score(bucketKey, String.valueOf(userId));
Long rank = redisTemplate.opsForZSet().reverseRank(bucketKey, String.valueOf(userId));
if (score == null || rank == null) {
return null;
}
// 计算全局排名
long globalRank = calculateGlobalRank(bucketKey, rank);
RankInfo rankInfo = new RankInfo();
rankInfo.setUserId(userId);
rankInfo.setScore(score);
rankInfo.setRank(globalRank);
rankInfo.setBucketKey(bucketKey);
return rankInfo;
}
// 计算全局排名
private long calculateGlobalRank(String currentBucketKey, long rankInBucket) {
long globalRank = rankInBucket;
// 加上更高分数段的用户数
for (ScoreBucket bucket : bucketCache.values()) {
String bucketKey = BUCKET_PREFIX + bucket.getBucketKey();
if (bucket.getMaxScore() > bucketCache.get(currentBucketKey).getMaxScore()) {
globalRank += bucket.getUserCount();
}
}
return globalRank;
}
// 获取排行榜区间
public List<RankInfo> getRankRange(int start, int end) {
List<RankInfo> result = new ArrayList<>();
int currentPosition = 0;
// 从高分到低分遍历分桶
for (ScoreBucket bucket : bucketCache.values()) {
String bucketKey = BUCKET_PREFIX + bucket.getBucketKey();
// 获取分桶内的排名区间
Set<ZSetOperations.TypedTuple<String>> ranksInBucket =
redisTemplate.opsForZSet().reverseRangeWithScores(
bucketKey,
Math.max(0, start - currentPosition),
end - currentPosition
);
if (ranksInBucket != null) {
for (ZSetOperations.TypedTuple<String> rank : ranksInBucket) {
if (currentPosition >= start && currentPosition <= end) {
RankInfo rankInfo = new RankInfo();
rankInfo.setUserId(Long.parseLong(rank.getValue()));
rankInfo.setScore(rank.getScore());
rankInfo.setRank(currentPosition + 1);
rankInfo.setBucketKey(bucketKey);
result.add(rankInfo);
}
currentPosition++;
}
}
if (currentPosition > end) {
break;
}
}
return result;
}
// 获取分桶统计信息
public List<ScoreBucket> getBucketStats() {
List<ScoreBucket> stats = new ArrayList<>();
for (ScoreBucket bucket : bucketCache.values()) {
String bucketKey = BUCKET_PREFIX + bucket.getBucketKey();
Long count = redisTemplate.opsForZSet().zCard(bucketKey);
ScoreBucket stat = new ScoreBucket();
stat.setBucketKey(bucket.getBucketKey());
stat.setMinScore(bucket.getMinScore());
stat.setMaxScore(bucket.getMaxScore());
stat.setUserCount(count != null ? count : 0);
stats.add(stat);
}
return stats;
}
// 定期优化分桶
public void optimizeBuckets() {
List<ScoreBucket> stats = getBucketStats();
// 检查是否需要分裂或合并分桶
for (ScoreBucket bucket : stats) {
if (bucket.getUserCount() > 100_000_000) { // 1亿用户
// 分裂分桶
splitBucket(bucket);
} else if (bucket.getUserCount() < 1_000_000) { // 100万用户
// 考虑合并分桶
mergeBucket(bucket);
}
}
}
// 分裂分桶
private void splitBucket(ScoreBucket bucket) {
double mid = (bucket.getMinScore() + bucket.getMaxScore()) / 2;
ScoreBucket lowerBucket = new ScoreBucket();
lowerBucket.setBucketKey(bucket.getBucketKey() + "_lower");
lowerBucket.setMinScore(bucket.getMinScore());
lowerBucket.setMaxScore(mid);
ScoreBucket upperBucket = new ScoreBucket();
upperBucket.setBucketKey(bucket.getBucketKey() + "_upper");
upperBucket.setMinScore(mid + 1);
upperBucket.setMaxScore(bucket.getMaxScore());
// 重新分配用户到新分桶
redistributeUsers(bucket, lowerBucket, upperBucket);
}
// 合并分桶
private void mergeBucket(ScoreBucket bucket) {
// 实现合并逻辑
}
// 重新分配用户
private void redistributeUsers(ScoreBucket oldBucket, ScoreBucket lowerBucket, ScoreBucket upperBucket) {
String oldBucketKey = BUCKET_PREFIX + oldBucket.getBucketKey();
String lowerBucketKey = BUCKET_PREFIX + lowerBucket.getBucketKey();
String upperBucketKey = BUCKET_PREFIX + upperBucket.getBucketKey();
long cursor = 0;
ScanOptions options = ScanOptions.scanOptions().count(1000).build();
do {
ScanResult<ZSetOperations.TypedTuple<String>> scanResult =
redisTemplate.opsForZSet().scan(oldBucketKey, options);
for (ZSetOperations.TypedTuple<String> tuple : scanResult.getContent()) {
String userId = tuple.getValue();
double score = tuple.getScore();
if (score <= lowerBucket.getMaxScore()) {
redisTemplate.opsForZSet().add(lowerBucketKey, userId, score);
} else {
redisTemplate.opsForZSet().add(upperBucketKey, userId, score);
}
}
cursor = scanResult.getCursor();
} while (cursor != 0);
// 更新分桶缓存
bucketCache.put(lowerBucketKey, lowerBucket);
bucketCache.put(upperBucketKey, upperBucket);
bucketCache.remove(oldBucketKey);
// 删除旧分桶
redisTemplate.delete(oldBucketKey);
}
}