system design 俄罗斯大叔系列
分布消息队列

假设有个producer和consumer,需要互相通信。

两个选项:
① 直接 synchronous communication,等response
pro:简单 可快速实现
con:处理失败更困难
→ 要考虑失败重试、请求过多、加快速度

② 引入可以异步通信的新组件,called 队列
它是分布式的,因为数据存在n台机器上
但是和标题有点不同,标题的情况会发给n个subscriber,这里一个queue有且只有一个customer

Requirement

Functional:

  • sendMessage(messageBody)
  • receiveMessage
  • 可能:添加/删除api

Non-Functional:

  • scalable
  • highly available(survives failures)
  • highly performant
  • durable persistence(可靠不丢)
  • 可能:不重复提交/安全性/cost

High-level Architecture

需要一个虚拟ip,把hostname解析给LB system。
所以也需要一个LB
next, 一个前端服务,负责处理身份验证等
一个metadata database,负责存一些信息(名称、日期等)
db之前需要一个service,处理对db的调用
需要一个地方来存queue message,所以需要一个backend web service,负责消息的persistence and processing

Virtual IP & Load Balancer

(LB很大,除非问否则不要偏离主题)

但是需要解释LB为什么可以帮助高throughput和availability。

domain name被hit时,request 被transferred to 其中一个在DNS注册过的VirtualIP上,然后VirtuaIP被resolve to一个LB service上,LB知道这些hostname

LB似乎是single point of failure,怎么办?

如果LB到达负载上限limits are reached怎么办?

解决high availability:

  • 用primary and secondary nodes的概念
  • primary node 接受连接并服务请求,secondary node 监视primary node
  • 如果主节点挂了,辅助节点上。

解决scalability:

  • 使用多个visual IPs,called vip partitioning
  • 把A记录的副本们分配给多个LB服务的同个DNS name上,这样请求就被分区到多个LB

FrontEnd web service

  • 轻量级 lightweight web service
  • 由位于多个数据中心的无状态机器stateless service组成deploy

负责:

  • request validation 请求验证
    有助于确保请求中存在所有必需的参数,并且这些参数的值符合约束条件,并且消息大小不超过指定阈值

  • authentication / authorization 身份验证和授权
    验证消息发送者是否message queue服务的registered customer
    是否可以发到它想发的那个队列

  • TLS(SSL) termination SSL 终止
    TLS是一种提供隐私和数据完整性的协议
    TLS 终止是指解密请求并将解密的请求传递给后端服务的过程
    最好在frontend 上进行这步,因为LB的TLS很贵
    通常不是由 FrontEnd 服务本身处理,而是由一个单独运行在同一主机上的 HTTP 进程代理处理。

  • server-side data encryption 服务器端数据加密
    保证消息安全存在后端,所以frontend收到消息就加密 encrypt
    只在return 给customer时解密

  • caching 缓存
    缓存副本。
    缓存常用queue的metadata,和用来身份验证和授权的用户身份信息

  • rate limiting (also known as throttling) 速率限制(也称为限制)
    可以用漏桶算法

  • request dispatching 请求调度
    frontend服务至少远程调用两个web:metadata service & backend service
    并为它俩创建http client,且保证它们isolated,means 如果一个服务速度下降,另一个不受影响impacted
    一些常见的模式,like bulkhead he circuit breaker可以帮忙隔离,当远程调用挂的时候让系统更有弹性

  • request deduplication 请求重复数据删除
    如果消息没到达client可能有重复请求,就可能有重复数据。
    所以cache提前存用过的request ID,来过滤

  • usage data collection 使用数据收集

Metadata Service:

每次创建queue时,就把它的信息存在DB里

而这个service就是frontend和DB之间的cache层

它读多写少,只有创建新queue时写,而每次消息到达时读

强一致性不强求

有不同的organize缓存集群的方法

  1. 缓存小:将整个数据set存在每个cluster的node上
    frontend host可以随便调一个metadata service host,因为所有cache cluter上都一样有完整信息
  2. data大:分片
    frontend知道哪个分片存了数据并直接调用这个分片
  3. 仍然分片,但frontend不知道地址
    so frontend随机调metadata service host,host知道转发到哪里

对①,可以引入LB,因为传到所有host都一样
对②和③,metadata host代表一个一致性哈希环

** 刚刚这个组件集:

visual IP + LB + frontend web service + metadata web service,一个在DB之上的缓存层,非常流行,广泛适用。

Backend Service

如果卡住,先问问题:like:

  • where and how do we store messages?
    → DB可以吗?可以但不优。因为分布message queue有高throughput,都会被store到DB里。
    so 问题变成:构建能处理high throughput的DB的问题
    这种DB有现成的。初级回答:用第三方db。高级:需要解释如何构建分布式db
    → 那么就用RAM 和 local disk of a backend host
    长久存的用本地disk,短暂存的用memory

  • how do we replicate data?
    → copies of messages send给其它hots 备用
    replicate within a group of hosts 在集群内发送消息的拷贝,防止单点故障

  • frontend host 怎么知道从哪搜?怎么知道data send to哪
    → metadata service

    message到达frontend,frontend问metadata service发到哪个backend,message发过去并复制
    当接收消息来的时候,frontend talks to Metadata service确定存的是哪个backend host

后端host如何互相connect

两种

① Leader-Follower 关系

这个queue的所有请求(send & receive)都传到这个leader实例上

  • send message传到frontend,到达q1队列 ——> 前端服务call metadata service来识别这个q的后端leader实例,like B ——> message传给leader B,B负责复制
  • receive message到达frontend ——> 也发个request to metadata找这个q的leader ——>从leader instance里面检索消息,leader负责清理用过的data

需要一个组件,帮忙进行leader的选举和管理,called in-cluster manager
它负责维护一个在queues、leaders、followers之间的mapping
这个组件很复杂,得reliable scalable performant

② 一个小型集群,每个cluster都有分布在多个数据中心的3-4个机器

  • send message来时,as before 也要调用metadata service识别哪个cluster ——> 然后调这个cluster的随便一个instance,这个instance负责复制这个cluster所有node
  • receive message来时,仍然调用metadata service确定哪个cluster ——> 调用这个cluster随便一个host,搜消息,选定的host负责清理message

不需要选leader的组件,但是仍然需要组件帮忙管理queue到cluster的分配,called out-cluster manager

它负责维护一个queue和cluster之间的mapping

in-cluster manager VS out-cluster manager

  • in管理cluster内的队列分配,out管理跨cluster的队列分配

  • in需要了解cluster里的每个instance,out不了解instance但需要了解每个cluster

  • in监听来自instance的heartbeat,out监控每个cluster的运行状况

  • in 处理host failures,instance可能死亡或新添,

    out 追踪每个cluster的利用率,处理过热cluster,如果容量到上限了就不添加

big queue?

当一个queue来了太多messages,leader (in A)或cluster(in B)无法负载时:

in将queue拆成partitions,每个partition都有一个leader

out可以跨cluster拆分queue,so queue可以在多个cluster之间均分

其它

  • queue 创建和删除
    可以自动创建,when 第一条message到达frontend时,也可以创建create API
    delete 要谨慎,所以一般不公开接口,用命令行删

  • 消息 删除
    ① 消费完不立刻删除
    所以consumer得一直知道它们消费了什么
    所以必须维持有序,并跟踪offset
    几天之后mssage可以通过一些job删除掉
    kafka就这样
    ② 也不删除,而是立刻标记为不可见,让其他consumer无法检索到
    amazon SQS就这样
    收到消息的consumer,需要call deleteAPI,从后端来删除,如果没删,message就会可见,可能被重复消费

  • message replication
    可以同步or异步复制
    同步:复制完才return给producer
    持久性高,延迟也高
    异步:存完一份就返回,稍后复制
    性能高,但可能丢

  • messages delivery semantics 传递保证
    三种,只有一次,最多一次,最少一次
    为啥要这么多种?因为分布式系统可能有很多地方出bug,消息没办法处理
    所以要最少一次,保证了持久、可用、性能之间的平衡
    防丢:
    生产消费端:确认机制
    中间:批量,副本 cluster leader
    防重:=幂等
    生产及过程中:全局唯一id
    消费:要考虑入库时丢失:①事务 ② 乐观锁:version

  • push vs pull

    • push:新消息到达message时,通知consumer
      queue大量写,写能力 存储能力:

      • 多线程 写入性能好的db引擎
      • 定期清理
      • 读之前先判断
        适合订阅/粉丝量小的业务
    • pull:consumer不断发送检索消息的请求,当queue里有新消息时,发给consumer
      解决了推的延迟,存储成本降低,扩展性更好
      但 查询聚合成本高:缓存聚合.
      优化:
      ①只缓存最近的
      ②增加多个副本
      pull对queue来说简单,push对consumer来说简单

  • FIFO
    分布式里很难维持order
    所以要么不要求顺序,要么不要求速度

  • security
    通过https使用ssl加密
    还可以在后端存的时候加密

  • monitoring
    monitor每个组件:fronted ,metadata,backend
    要监控系统的health 并让consumer track它们的队列状态
    每个服务都必须emit metrics且写log
    还要能设置警报,所以要和监控系统集成

总结

可扩展吗?yes 每个组件都

高可用吗?yes 没有单点,每个组件都部在多个数据中心

高性能吗?depend on 软硬件和网络

耐用吗?yes,有复制数据,保证不丢