联通的网站是谁做的,手游源码平台,艺腾青岛网站建设,赢了网站怎么做的一、理解Django缓存与原生Redis的区别Django缓存APIRedis原生数据类型用途键值对存储字符串(String)简单缓存不支持列表(List)消息队列、最新列表不支持集合(Set)去重、共同好友不支持有序集合(Sorted Set)排行榜、优先级队列不支持哈希(Hash)对象存储、多个字段二、获取原生Re…一、理解Django缓存与原生Redis的区别Django缓存APIRedis原生数据类型用途键值对存储字符串(String)简单缓存不支持列表(List)消息队列、最新列表不支持集合(Set)去重、共同好友不支持有序集合(Sorted Set)排行榜、优先级队列不支持哈希(Hash)对象存储、多个字段二、获取原生Redis连接要在Django中使用Redis的所有数据类型需要获取原生Redis连接# 方法1通过django-redis获取连接fromdjango_redisimportget_redis_connection# 获取默认缓存对应的Redis连接redis_connget_redis_connection(default)# 获取特定缓存的连接session_connget_redis_connection(session)# 方法2直接创建连接不推荐缺少连接池管理importredis redis_clientredis.Redis(hostlocalhost,port6379,db0,passwordNone)三、字符串类型String操作虽然Django缓存API支持字符串但Redis有更多功能1. 带过期时间的操作defstring_operations():Redis字符串操作redis_connget_redis_connection(default)# 1. 设置带过期时间的键redis_conn.setex(session:user:123,3600,session_data)# 1小时后过期# 2. 设置多个键redis_conn.mset({key1:value1,key2:value2})# 3. 获取字符串的一部分redis_conn.set(message,Hello World)partredis_conn.getrange(message,0,4)# Hello# 4. 追加字符串redis_conn.append(message, Redis!)# 5. 获取字符串长度lengthredis_conn.strlen(message)# 6. 设置键的值并返回旧值old_valueredis_conn.getset(counter,100)# 7. 位操作redis_conn.setbit(user:online:2023-10-01,123,1)# 用户123在线redis_conn.getbit(user:online:2023-10-01,123)# 检查是否在线return{part:part,length:length,old_value:old_value}四、列表类型List操作1. 基本列表操作deflist_operations():Redis列表操作redis_connget_redis_connection(default)# 1. 左侧推入元素redis_conn.lpush(recent_users,user_123,user_456,user_789)# 2. 右侧推入元素redis_conn.rpush(task_queue,task1,task2,task3)# 3. 左侧弹出元素first_userredis_conn.lpop(recent_users)# 4. 右侧弹出元素last_taskredis_conn.rpop(task_queue)# 5. 获取列表长度lengthredis_conn.llen(recent_users)# 6. 获取指定范围的元素usersredis_conn.lrange(recent_users,0,9)# 前10个元素# 7. 移除元素redis_conn.lrem(recent_users,1,user_123)# 删除1个user_123# 8. 通过索引获取元素userredis_conn.lindex(recent_users,0)# 获取第一个# 9. 设置指定索引的值redis_conn.lset(recent_users,0,new_user)return{first_user:first_user,last_task:last_task,length:length,users:users}2. 实战消息队列classMessageQueue:基于Redis列表的消息队列def__init__(self,queue_namedefault_queue):self.redis_connget_redis_connection(default)self.queue_namequeue_namedefenqueue(self,message):入队importjson message_strjson.dumps(message)returnself.redis_conn.rpush(self.queue_name,message_str)defdequeue(self,timeout0):出队阻塞importjson resultself.redis_conn.blpop(self.queue_name,timeouttimeout)ifresult:queue_name,message_strresultreturnjson.loads(message_str)returnNonedefsize(self):队列大小returnself.redis_conn.llen(self.queue_name)defpeek(self):查看但不移除importjson message_strself.redis_conn.lindex(self.queue_name,0)ifmessage_str:returnjson.loads(message_str)returnNone# 使用示例defprocess_messages():处理消息队列queueMessageQueue(email_queue)# 生产者queue.enqueue({to:userexample.com,subject:Welcome,body:Hello!})# 消费者whileTrue:messagequeue.dequeue(timeout5)# 阻塞5秒ifmessage:# 处理消息send_email(**message)else:# 没有消息休息一下importtime time.sleep(1)五、集合类型Set操作1. 基本集合操作defset_operations():Redis集合操作redis_connget_redis_connection(default)# 1. 添加元素redis_conn.sadd(user_tags:123,python,django,redis)redis_conn.sadd(user_tags:456,python,javascript,vue)# 2. 获取所有元素tags_123redis_conn.smembers(user_tags:123)# 3. 判断元素是否存在is_memberredis_conn.sismember(user_tags:123,python)# 4. 移除元素redis_conn.srem(user_tags:123,redis)# 5. 获取集合大小sizeredis_conn.scard(user_tags:123)# 6. 随机获取元素random_tagredis_conn.srandmember(user_tags:123,1)# 7. 集合运算# 交集都喜欢的标签common_tagsredis_conn.sinter(user_tags:123,user_tags:456)# 并集所有标签all_tagsredis_conn.sunion(user_tags:123,user_tags:456)# 差集用户123有但456没有的标签diff_tagsredis_conn.sdiff(user_tags:123,user_tags:456)return{tags_123:tags_123,is_member:is_member,size:size,common_tags:common_tags}2. 实战用户标签系统classUserTagSystem:基于Redis集合的用户标签系统def__init__(self):self.redis_connget_redis_connection(default)defadd_tag(self,user_id,tag):为用户添加标签returnself.redis_conn.sadd(fuser_tags:{user_id},tag)defremove_tag(self,user_id,tag):移除用户标签returnself.redis_conn.srem(fuser_tags:{user_id},tag)defget_user_tags(self,user_id):获取用户所有标签returnself.redis_conn.smembers(fuser_tags:{user_id})defhas_tag(self,user_id,tag):检查用户是否有某个标签returnself.redis_conn.sismember(fuser_tags:{user_id},tag)deffind_users_with_tag(self,tag):查找具有某个标签的所有用户# 需要遍历所有用户实际应用中可以使用反向索引patternuser_tags:*matching_users[]# 使用SCAN避免阻塞cursor0whileTrue:cursor,keysself.redis_conn.scan(cursor,pattern,count100)forkeyinkeys:ifself.redis_conn.sismember(key,tag):user_idkey.decode().split(:)[1]matching_users.append(user_id)ifcursor0:breakreturnmatching_usersdefget_common_tags(self,user_ids):获取多个用户的共同标签iflen(user_ids)2:return[]keys[fuser_tags:{user_id}foruser_idinuser_ids]returnself.redis_conn.sinter(*keys)defget_recommended_tags(self,user_id):根据相似用户推荐标签user_tagsself.get_user_tags(user_id)ifnotuser_tags:return[]# 查找有相同标签的用户similar_users[]fortaginuser_tags:users_with_tagself.find_users_with_tag(tag)similar_users.extend(users_with_tag)# 去重并排除自己similar_usersset(similar_users)-{user_id}# 收集相似用户的标签all_tagsset()forsimilar_userinlist(similar_users)[:10]:# 只看前10个相似用户tagsself.get_user_tags(similar_user)all_tags.update(tags)# 排除用户已有的标签recommendedall_tags-set(user_tags)returnlist(recommended)[:10]# 返回前10个推荐标签六、有序集合类型Sorted Set操作1. 基本有序集合操作defsorted_set_operations():Redis有序集合操作redis_connget_redis_connection(default)# 1. 添加元素带分数redis_conn.zadd(leaderboard,{player1:1000,player2:1500,player3:800,player4:2000})# 2. 增加分数redis_conn.zincrby(leaderboard,100,player1)# player1增加100分# 3. 按分数升序获取ascendingredis_conn.zrange(leaderboard,0,-1,withscoresTrue)# 4. 按分数降序获取descendingredis_conn.zrevrange(leaderboard,0,-1,withscoresTrue)# 5. 获取排名rankredis_conn.zrevrank(leaderboard,player1)# 降序排名0为第一# 6. 获取分数scoreredis_conn.zscore(leaderboard,player1)# 7. 获取分数范围内的元素top_playersredis_conn.zrangebyscore(leaderboard,1000,3000,withscoresTrue)# 8. 移除元素redis_conn.zrem(leaderboard,player4)# 9. 获取集合大小sizeredis_conn.zcard(leaderboard)# 10. 统计分数区间的元素数量countredis_conn.zcount(leaderboard,1000,3000)return{rank:rank,score:score,top_3:descending[:3],count:count}2. 实战排行榜系统classLeaderboard:基于Redis有序集合的排行榜系统def__init__(self,leaderboard_namedefault_leaderboard):self.redis_connget_redis_connection(default)self.nameleaderboard_namedefadd_score(self,player_id,score):添加或更新分数returnself.redis_conn.zincrby(self.name,score,player_id)defget_top_n(self,n10):获取前N名returnself.redis_conn.zrevrange(self.name,0,n-1,withscoresTrue)defget_player_rank(self,player_id):获取玩家排名从1开始rankself.redis_conn.zrevrank(self.name,player_id)ifrankisnotNone:returnrank1returnNonedefget_player_score(self,player_id):获取玩家分数returnself.redis_conn.zscore(self.name,player_id)defget_players_around(self,player_id,range_size5):获取玩家附近的排名rankself.get_player_rank(player_id)ifrankisNone:return[]startmax(0,rank-1-range_size)endrank-1range_size playersself.redis_conn.zrevrange(self.name,start,end,withscoresTrue)return[{player:player,score:score,rank:idxstart1}foridx,(player,score)inenumerate(players)]defget_season_leaderboard(self,season_id,top_n100):获取赛季排行榜season_keyf{self.name}:season:{season_id}# 检查是否有缓存的赛季排行榜cachedself.redis_conn.get(season_key)ifcached:importjsonreturnjson.loads(cached)# 计算赛季排行榜season_endint(season_id)# 假设season_id是时间戳season_startseason_end-86400*30# 30天# 获取赛季期间有分数的玩家all_playersself.redis_conn.zrevrangebyscore(f{self.name}:daily:{season_end},season_start,season_end,withscoresFalse)# 计算每个玩家的赛季总分pipeself.redis_conn.pipeline()forplayerinall_players:pipe.zscore(f{self.name}:daily,player)scorespipe.execute()# 创建有序集合season_data{player:scoreforplayer,scoreinzip(all_players,scores)ifscore}ifseason_data:season_leaderboard_keyf{self.name}:season_temp:{season_id}self.redis_conn.zadd(season_leaderboard_key,season_data)# 获取前N名top_playersself.redis_conn.zrevrange(season_leaderboard_key,0,top_n-1,withscoresTrue)# 删除临时键self.redis_conn.delete(season_leaderboard_key)# 缓存结果importjson self.redis_conn.setex(season_key,3600,# 缓存1小时json.dumps(top_players))returntop_playersreturn[]# 游戏积分系统示例classGameScoreSystem:游戏积分系统def__init__(self,game_id):self.redis_connget_redis_connection(default)self.game_idgame_id self.leaderboardLeaderboard(fgame:{game_id}:leaderboard)defrecord_score(self,user_id,score,level,time_spent):记录游戏得分importtime timestampint(time.time())# 1. 更新总排行榜self.leaderboard.add_score(user_id,score)# 2. 记录详细得分使用哈希score_keyfgame:{self.game_id}:score:{user_id}:{timestamp}self.redis_conn.hmset(score_key,{score:score,level:level,time_spent:time_spent,timestamp:timestamp})# 设置24小时过期self.redis_conn.expire(score_key,86400)# 3. 更新每日排行榜date_strtime.strftime(%Y%m%d)daily_keyfgame:{self.game_id}:daily:{date_str}self.redis_conn.zincrby(daily_key,score,user_id)# 4. 更新关卡排行榜level_keyfgame:{self.game_id}:level:{level}self.redis_conn.zincrby(level_key,1,user_id)# 通关次数return{rank:self.leaderboard.get_player_rank(user_id),score:self.leaderboard.get_player_score(user_id)}七、哈希类型Hash操作1. 基本哈希操作defhash_operations():Redis哈希操作redis_connget_redis_connection(default)# 1. 设置单个字段redis_conn.hset(user:123,name,张三)# 2. 设置多个字段redis_conn.hmset(user:123,{age:25,email:zhangsanexample.com,city:北京})# 3. 获取单个字段nameredis_conn.hget(user:123,name)# 4. 获取多个字段fieldsredis_conn.hmget(user:123,[name,age,email])# 5. 获取所有字段all_fieldsredis_conn.hgetall(user:123)# 6. 获取所有字段名field_namesredis_conn.hkeys(user:123)# 7. 获取所有字段值field_valuesredis_conn.hvals(user:123)# 8. 检查字段是否存在existsredis_conn.hexists(user:123,name)# 9. 删除字段redis_conn.hdel(user:123,city)# 10. 获取字段数量field_countredis_conn.hlen(user:123)# 11. 增加数值字段的值redis_conn.hincrby(user:123,age,1)# 增加1岁return{name:name,fields:fields,exists:exists,field_count:field_count}2. 实战购物车系统classShoppingCart:基于Redis哈希的购物车系统def__init__(self,cart_idNone):self.redis_connget_redis_connection(default)self.cart_idcart_idorfcart:{int(time.time())}defadd_item(self,product_id,quantity1,priceNone):添加商品到购物车cart_keyfcart:items:{self.cart_id}# 获取当前数量current_quantityself.redis_conn.hget(cart_key,product_id)ifcurrent_quantity:quantityint(current_quantity)quantity# 更新购物车self.redis_conn.hset(cart_key,product_id,quantity)# 如果提供了价格存储到商品信息ifpriceisnotNone:product_keyfcart:product_info:{self.cart_id}self.redis_conn.hset(product_key,product_id,price)returnquantitydefremove_item(self,product_id,quantityNone):从购物车移除商品cart_keyfcart:items:{self.cart_id}ifquantityisNone:# 移除整个商品self.redis_conn.hdel(cart_key,product_id)# 同时移除价格信息product_keyfcart:product_info:{self.cart_id}self.redis_conn.hdel(product_key,product_id)else:# 减少数量current_quantityself.redis_conn.hget(cart_key,product_id)ifcurrent_quantity:new_quantityint(current_quantity)-quantityifnew_quantity0:self.redis_conn.hset(cart_key,product_id,new_quantity)else:self.remove_item(product_id)defget_items(self):获取购物车所有商品cart_keyfcart:items:{self.cart_id}itemsself.redis_conn.hgetall(cart_key)product_keyfcart:product_info:{self.cart_id}pricesself.redis_conn.hgetall(product_key)result[]forproduct_id,quantityinitems.items():product_idproduct_id.decode()ifisinstance(product_id,bytes)elseproduct_id quantityint(quantity)priceprices.get(product_id.encode()ifisinstance(product_id,str)elseproduct_id)ifprice:pricefloat(price.decode()ifisinstance(price,bytes)elseprice)result.append({product_id:product_id,quantity:quantity,price:price,subtotal:price*quantityifpriceelseNone})returnresultdefget_total(self):计算购物车总价itemsself.get_items()returnsum(item.get(subtotal,0)foriteminitems)defclear(self):清空购物车cart_keyfcart:items:{self.cart_id}product_keyfcart:product_info:{self.cart_id}self.redis_conn.delete(cart_key,product_key)defget_item_count(self):获取购物车商品总数cart_keyfcart:items:{self.cart_id}itemsself.redis_conn.hvals(cart_key)returnsum(int(q)forqinitems)ifitemselse0defcheckout(self):结账itemsself.get_items()ifnotitems:return{success:False,message:购物车为空}# 生成订单importuuid order_idstr(uuid.uuid4())# 保存订单order_keyforder:{order_id}self.redis_conn.hmset(order_key,{cart_id:self.cart_id,total:self.get_total(),created_at:int(time.time()),status:pending})# 保存订单商品foriteminitems:self.redis_conn.hset(f{order_key}:items,item[product_id],item[quantity])# 清空购物车self.clear()return{success:True,order_id:order_id,total:self.get_total(),item_count:len(items)}八、高级数据结构组合应用1. 社交关系系统classSocialNetwork:基于Redis的社交关系系统def__init__(self):self.redis_connget_redis_connection(default)deffollow(self,follower_id,followee_id):关注用户# 添加到关注列表集合self.redis_conn.sadd(fuser:{follower_id}:following,followee_id)self.redis_conn.sadd(fuser:{followee_id}:followers,follower_id)# 记录时间线有序集合importtime timestamptime.time()# 获取关注用户的动态posts_keyfuser:{followee_id}:postspostsself.redis_conn.zrevrangebyscore(posts_key,timestamp-86400*7,timestamp,# 最近7天的帖子withscoresTrue)# 添加到关注者的时间线timeline_keyfuser:{follower_id}:timelineforpost_id,post_timeinposts:self.redis_conn.zadd(timeline_key,{post_id:post_time})# 限制时间线长度self.redis_conn.zremrangebyrank(timeline_key,0,-1000)# 只保留最新的1000条defunfollow(self,follower_id,followee_id):取消关注self.redis_conn.srem(fuser:{follower_id}:following,followee_id)self.redis_conn.srem(fuser:{followee_id}:followers,follower_id)defget_followers(self,user_id,page1,per_page20):获取粉丝列表keyfuser:{user_id}:followersstart(page-1)*per_page endstartper_page-1# 使用集合获取粉丝followersself.redis_conn.smembers(key)# 获取粉丝详细信息followers_list[]forfollower_idinlist(followers)[start:end1]:user_infoself.redis_conn.hgetall(fuser:{follower_id})followers_list.append(user_info)return{followers:followers_list,total:self.redis_conn.scard(key),page:page,per_page:per_page}defget_mutual_followers(self,user1_id,user2_id):获取共同关注的人key1fuser:{user1_id}:followingkey2fuser:{user2_id}:followingreturnself.redis_conn.sinter(key1,key2)defpost_update(self,user_id,content):发布动态importtimeimportuuid post_idstr(uuid.uuid4())timestamptime.time()# 存储帖子内容哈希post_keyfpost:{post_id}self.redis_conn.hmset(post_key,{id:post_id,user_id:user_id,content:content,timestamp:timestamp,likes:0,comments:0})# 添加到用户的帖子列表有序集合user_posts_keyfuser:{user_id}:postsself.redis_conn.zadd(user_posts_key,{post_id:timestamp})# 添加到粉丝的时间线followers_keyfuser:{user_id}:followersfollowersself.redis_conn.smembers(followers_key)forfollower_idinfollowers:timeline_keyfuser:{follower_id}:timelineself.redis_conn.zadd(timeline_key,{post_id:timestamp})self.redis_conn.zremrangebyrank(timeline_key,0,-1000)returnpost_iddefget_timeline(self,user_id,page1,per_page20):获取时间线timeline_keyfuser:{user_id}:timelinestart(page-1)*per_page endstartper_page-1# 获取帖子IDpost_idsself.redis_conn.zrevrange(timeline_key,start,end)# 获取帖子详情posts[]forpost_idinpost_ids:postself.redis_conn.hgetall(fpost:{post_id})ifpost:posts.append(post)return{posts:posts,total:self.redis_conn.zcard(timeline_key),page:page,per_page:per_page}2. 实时统计系统classRealTimeStatistics:实时统计系统def__init__(self):self.redis_connget_redis_connection(default)defrecord_event(self,event_type,user_idNone,dataNone):记录事件importtime timestampint(time.time())# 1. 记录到全局事件流列表event_idf{timestamp}:{user_id}:{event_type}self.redis_conn.lpush(events:stream,event_id)# 2. 按类型计数哈希todaytime.strftime(%Y%m%d)daily_keyfstats:events:{today}self.redis_conn.hincrby(daily_key,event_type,1)# 3. 记录用户事件有序集合ifuser_id:user_events_keyfuser:{user_id}:eventsself.redis_conn.zadd(user_events_key,{event_type:timestamp})# 用户最后活跃时间self.redis_conn.hset(fuser:{user_id}:activity,last_active,timestamp)# 4. 按小时统计哈希hourtime.strftime(%Y%m%d%H)hour_keyfstats:hourly:{hour}self.redis_conn.hincrby(hour_key,event_type,1)returnevent_iddefget_daily_stats(self,date_strNone):获取每日统计importtimeifnotdate_str:date_strtime.strftime(%Y%m%d)keyfstats:events:{date_str}returnself.redis_conn.hgetall(key)defget_hourly_trend(self,event_type,hours24):获取小时趋势importtimefromdatetimeimportdatetime,timedelta nowdatetime.now()trend[]foriinrange(hours):hournow-timedelta(hoursi)hour_strhour.strftime(%Y%m%d%H)keyfstats:hourly:{hour_str}countself.redis_conn.hget(key,event_type)trend.append({hour:hour_str,count:int(count)ifcountelse0})returnlist(reversed(trend))# 从早到晚defget_active_users(self,minutes5):获取活跃用户最近N分钟有活动的用户importtime# 使用集合存储活跃用户active_users_keyusers:active# 获取所有用户patternuser:*:activityactive_usersset()cursor0whileTrue:cursor,keysself.redis_conn.scan(cursor,pattern,count100)forkeyinkeys:# 解析用户IDuser_idkey.decode().split(:)[1]# 检查最后活跃时间last_activeself.redis_conn.hget(key,last_active)iflast_active:last_activeint(last_active)iftime.time()-last_activeminutes*60:active_users.add(user_id)ifcursor0:breakreturnlist(active_users)九、性能优化技巧1. 使用管道Pipelinedefuse_pipeline():使用管道批量操作redis_connget_redis_connection(default)# 创建管道piperedis_conn.pipeline()# 批量添加命令foriinrange(100):pipe.set(fkey:{i},fvalue:{i})pipe.expire(fkey:{i},3600)# 一次执行所有命令resultspipe.execute()returnlen(results)# 200个结果2. 使用Lua脚本defuse_lua_script():使用Lua脚本实现原子操作redis_connget_redis_connection(default)# 原子增加分数并更新排行榜lua_script local player KEYS[1] local score tonumber(ARGV[1]) local leaderboard KEYS[2] -- 增加分数 local new_score redis.call(ZINCRBY, leaderboard, score, player) -- 获取排名 local rank redis.call(ZREVRANK, leaderboard, player) -- 返回结果 return {new_score, rank} # 执行脚本resultredis_conn.eval(lua_script,2,# 2个KEYSplayer_123,# KEYS[1]leaderboard,# KEYS[2]100# ARGV[1])return{new_score:float(result[0]),rank:int(result[1])1ifresult[1]isnotNoneelseNone}十、总结通过原生Redis连接你可以在Django中使用Redis的所有数据类型数据类型主要用途Django对应字符串简单缓存、计数器Django缓存API列表消息队列、时间线无需手动实现集合标签、好友关系无需手动实现有序集合排行榜、优先级队列无需手动实现哈希对象存储、购物车无需手动实现最佳实践简单缓存使用Django缓存API复杂数据结构使用原生Redis连接批量操作使用管道原子操作使用Lua脚本合理设置过期时间避免内存泄漏这样你就可以充分利用Redis的所有功能来构建高性能的Django应用了。