网站的网络公司,wordpress换行不显示,it外包数据,深圳网站 商城制作这是一份关于 “短链接访问统计系统”#xff08;基于 RocketMQ#xff09;的笔记#xff0c;整合了我们之前讨论的所有核心知识点、代码逻辑、设计思想和技术细节#xff0c;方便你系统复习和查阅。短链接访问统计系统#xff08;基于 RocketMQ#xff09;笔记一、系统核…这是一份关于 “短链接访问统计系统”基于 RocketMQ的笔记整合了我们之前讨论的所有核心知识点、代码逻辑、设计思想和技术细节方便你系统复习和查阅。短链接访问统计系统基于 RocketMQ笔记一、系统核心目标核心功能记录短链接的每一次访问并进行多维度的统计分析PV/UV/UIP、地域、设备、浏览器、操作系统等。核心挑战高性能短链接跳转是核心入口必须保证用户访问速度快。高并发可能面临瞬间大量的访问请求如热点链接。数据可靠每一次访问的统计数据都不能丢失也不能重复计算。系统解耦统计功能不能影响核心的跳转功能。二、技术架构与核心组件整个系统采用生产者 - 消费者Producer-Consumer模式核心组件如下生产者ProducerShortLinkStatsSaveProducer角色在短链接被访问时负责收集访问数据并发送到消息队列。核心任务将同步的统计入库操作转化为异步的消息发送。消息队列Message QueueRocketMQ角色作为生产者和消费者之间的 “桥梁”存储和转发消息。核心任务解耦、削峰填谷、保证消息可靠传输。消费者ConsumerShortLinkStatsSaveConsumer角色监听消息队列消费统计消息并将数据持久化到数据库。核心任务执行耗时的统计入库操作保证数据最终一致性。幂等处理器Idempotent HandlerMessageQueueIdempotentHandler角色基于 Redis 实现防止同一条消息被重复消费导致统计数据重复。核心任务保证消费的幂等性。分布式锁Distributed LockRedisson RReadWriteLock角色在消费者入库时保证并发场景下数据的一致性。核心任务防止在统计过程中短链接的 GID 被修改导致数据归属错误。三、核心流程详解1. 生产者流程 (ShortLinkStatsSaveProducer)java运行public void send(MapString, String producerMap) { // 1. 生成唯一的消息KeyUUID用于幂等性保证 String keys UUID.randomUUID().toString(); producerMap.put(keys, keys); // 2. 构建RocketMQ消息设置消息体和消息头 MessageMapString, String build MessageBuilder .withPayload(producerMap) // 消息体包含统计数据 .setHeader(MessageConst.PROPERTY_KEYS, keys) // 消息头设置消息Key .build(); try { // 3. 同步发送消息到指定的Topic SendResult sendResult rocketMQTemplate.syncSend(statsSaveTopic, build, 2000L); log.info(消息发送成功ID: {}, Keys: {}, sendResult.getMsgId(), keys); } catch (Throwable ex) { log.error(消息发送失败, ex); // 可扩展发送失败后的重试或告警逻辑 } }关键操作生成消息 Key使用UUID.randomUUID()确保每条消息的唯一性是实现幂等的基础。同步发送 (syncSend)最可靠的发送方式。生产者会等待 Broker 返回发送结果确保消息至少被 Broker 接收一次。设置超时时间2000L2 秒防止生产者无限期阻塞。2. 消费者流程 (ShortLinkStatsSaveConsumer)java运行Override public void onMessage(MapString, String producerMap) { String keys producerMap.get(keys); // 核心步骤1幂等校验 if (!messageQueueIdempotentHandler.isMessageProcessed(keys)) { if (messageQueueIdempotentHandler.isAccomplish(keys)) { return; // 消息已处理完成直接返回 } throw new ServiceException(消息处理中需要重试); // 触发MQ重试 } try { String fullShortUrl producerMap.get(fullShortUrl); if (StrUtil.isNotBlank(fullShortUrl)) { String gid producerMap.get(gid); ShortLinkStatsRecordDTO statsRecord JSON.parseObject(producerMap.get(statsRecord), ShortLinkStatsRecordDTO.class); // 核心步骤2执行业务逻辑 actualSaveShortLinkStats(fullShortUrl, gid, statsRecord); } } catch (Throwable ex) { log.error(消费异常, ex); try { // 核心步骤3异常处理 messageQueueIdempotentHandler.delMessageProcessed(keys); // 删除幂等标识允许重试 } catch (Throwable remoteEx) { log.error(删除幂等标识失败, remoteEx); } throw ex; // 抛出异常触发RocketMQ重试 } // 核心步骤4标记完成 messageQueueIdempotentHandler.setAccomplish(keys); // 标记消息处理完成 }关键操作幂等校验通过MessageQueueIdempotentHandler确保消息只被处理一次。执行业务逻辑调用actualSaveShortLinkStats方法将统计数据入库。异常处理消费失败时必须删除幂等标识否则消息将无法被重试。抛出异常触发 RocketMQ 的重试机制。标记完成消费成功后标记消息为 “已完成”防止后续重复消费。3. 实际入库逻辑 (actualSaveShortLinkStats)java运行public void actualSaveShortLinkStats(String fullShortUrl, String gid, ShortLinkStatsRecordDTO statsRecord) { // 核心步骤1加分布式读锁 RReadWriteLock readWriteLock redissonClient.getReadWriteLock(String.format(LOCK_GID_UPDATE_KEY, fullShortUrl)); RLock rLock readWriteLock.readLock(); rLock.lock(); // 加锁 try { // 1. 补全GID如果生产者未传入 if (StrUtil.isBlank(gid)) { ShortLinkGotoDO shortLinkGotoDO shortLinkGotoMapper.selectOne(Wrappers.lambdaQuery(ShortLinkGotoDO.class) .eq(ShortLinkGotoDO::getFullShortUrl, fullShortUrl)); gid shortLinkGotoDO.getGid(); } // 2. 解析时间维度小时、星期 int hour DateUtil.hour(new Date(), true); int weekValue DateUtil.dayOfWeekEnum(new Date()).getIso8601Value(); // 核心步骤2多维度统计入库 // a. PV/UV/UIP统计 LinkAccessStatsDO linkAccessStatsDO LinkAccessStatsDO.builder()...build(); linkAccessStatsMapper.shortLinkStats(linkAccessStatsDO); // 自定义的增量更新方法 // b. 地域统计调用高德API MapString, Object localeParamMap new HashMap(); localeParamMap.put(key, statsLocaleAmapKey); localeParamMap.put(ip, statsRecord.getRemoteAddr()); String localeResultStr HttpUtil.get(AMAP_REMOTE_URL, localeParamMap); // ... 解析结果并入库 ... // c. 操作系统、浏览器、设备、网络等统计类似 LinkOsStatsDO linkOsStatsDO LinkOsStatsDO.builder()...build(); linkOsStatsMapper.shortLinkOsState(linkOsStatsDO); // ... // d. 原始访问日志 LinkAccessLogsDO linkAccessLogsDO LinkAccessLogsDO.builder()...build(); linkAccessLogsMapper.insert(linkAccessLogsDO); // e. 更新短链接核心表的总统计 shortLinkMapper.incrementStats(gid, fullShortUrl, 1, ...); // f. 今日统计 LinkStatsTodayDO linkStatsTodayDO LinkStatsTodayDO.builder()...build(); linkStatsTodayMapper.shortLinkTodayState(linkStatsTodayDO); } catch (Throwable ex) { log.error(统计入库异常, ex); } finally { // 核心步骤3释放锁 rLock.unlock(); // 最终释放锁避免死锁 } }关键操作加分布式读锁锁的 KeyLOCK_GID_UPDATE_KEY fullShortUrl保证锁的粒度是单个短链接避免全局锁。读锁 (Read Lock)允许多个读操作并发执行多个统计线程可以同时处理同一个短链接。阻塞写操作修改 GID 的操作保证在统计过程中 GID 不会被修改。多维度统计入库增量更新大部分统计表如link_access_stats使用自定义的shortLinkStats方法实现 “不存在则插入存在则更新累加” 的逻辑避免全量更新的性能问题。原始日志link_access_logs表直接插入原始访问记录用于后续的明细查询和数据分析。释放锁在finally块中释放锁确保无论代码是否异常锁都能被释放防止死锁。4. 幂等处理器 (MessageQueueIdempotentHandler)基于 Redis 的SETNXsetIfAbsent命令实现保证分布式环境下的原子性。方法名作用Redis KeyValue核心逻辑isMessageProcessed判断消息是否可被处理short-link:idempotent:{keys}0使用setIfAbsent尝试设置 Key。-trueKey 不存在消息可处理设置 Value 为0处理中。-falseKey 已存在消息不可处理。isAccomplish判断消息是否处理完成short-link:idempotent:{keys}1检查 Key 对应的 Value 是否为1。setAccomplish标记消息处理完成short-link:idempotent:{keys}1将 Value 设置为1并设置过期时间。delMessageProcessed删除幂等标识short-link:idempotent:{keys}-删除 Key允许消息被重试。核心思想先占坑后处理。处理前用SETNX占坑Value0。处理中其他线程看到坑被占要么等待要么拒绝。处理成功将坑标记为完成Value1。处理失败把坑让出来删除 Key。四、为什么选择 RocketMQ异步解耦将耗时的统计入库操作从同步的跳转流程中剥离出来极大提升了核心接口的响应速度。削峰填谷面对突发的高并发访问RocketMQ 可以缓冲大量消息避免直接冲击数据库保证系统稳定。消息可靠性生产者同步发送确保消息至少被 Broker 接收一次。Broker 持久化消息存储在磁盘即使 Broker 宕机消息也不会丢失。消费者重试机制消费失败时RocketMQ 会自动重试保证消息最终被处理。负载均衡RocketMQ 的消费者组Consumer Group机制可以轻松实现多个消费者实例共同消费一个 Topic 的消息提高处理能力。可扩展性水平扩展可以通过增加 Broker 节点和消费者实例来提升系统的吞吐量。功能扩展RocketMQ 支持定时消息、事务消息等高级特性便于未来功能扩展。五、核心技术亮点与设计模式读写锁分离使用 Redisson 的RReadWriteLock统计入库时加读锁修改 GID 时加写锁。好处允许多个统计操作并发执行同时保证 GID 不被并发修改兼顾了性能和数据一致性。幂等性设计基于 Redis 的SETNX命令是分布式系统中实现幂等的经典方案。好处有效防止了因网络抖动或 MQ 重试导致的重复消费问题保证了统计数据的准确性。增量更新统计表的INSERT OR UPDATE操作如linkAccessStatsMapper.shortLinkStats。好处相比先查询后更新减少了一次数据库交互提升了入库性能。最小锁粒度锁的 Key 是fullShortUrl而不是全局锁。好处只对同一个短链接的操作进行同步不同短链接的操作互不影响最大化了并发性能。异常处理与重试消费失败时删除幂等标识并抛出异常触发 MQ 重试。好处保证了消息的最终一致性即使中间环节出错数据也不会丢失。六、总结这套短链接统计系统是一个高性能、高可用、高并发的分布式系统设计典范。它巧妙地运用了 RocketMQ 实现异步解耦和削峰填谷通过 Redis 实现了分布式锁和幂等性保证最终达到了 “用户访问快、统计数据准、系统运行稳” 的目标。核心设计思想可以概括为将同步操作异步化将串行操作并行化在性能和数据一致性之间找到最佳平衡点。