system design 俄罗斯大叔系列
top k 问题
很常见的需求,like google一百个搜索次数最多的key word,或者y2b上观看最多的视频,转发最多的推等
mapreduce可以解决但不够
需要尽可能快,返回实时列表,比如最近几分钟的最热。
是一个流问题stream processing
Founctional:
- topK(k,startTime,endTime)
Non-Functional:
-
scalable(数据增长时候可以scale)
-
highly available(容错)
-
highly performance(few tens of millisecondes to return top 100 list)
高性能暗示:数据应该已经提前算好了。
-
accurate
从单host开始:
假设所有data可以导入它的内存。like video观看量
每当有用户打开视频,就在log里记:把它的video id添加到list
hash保存:{id:count} —>
① sort 这个hash O(nlogn)
② 把它加入head(更快)O(nlogk)
不能scale。如何scale:
引入LB,将每个事件分发到集群中的主机上, called processor hosts,算好再集中存。
吞吐增加了,但是内存不够。
这些都是在内存中算的,hash表会变很大。
和分布式缓存的解决方法一样,这里选择将数据分成更小的块。
引入新组件,叫 Data Partitioner。用来把每个video id路由到它自己的processor host,这样每个processor host只存部分data
(每个host)仍然建一个hash,建一个heap,把所有数据放进去,
这样每个host都内部排好序
然后——需要合并n个排序列表。
如果hash所有的数据量可能很大,内存放不下。
综上:增加了scalability和throughput,但是还有缺点。
缺点:目前为止的数据集都是有限的,但是数据流本质是无限的。
主机只能在一段时间内累积数据。假设1分钟内的前k个,那么k之后的其它信息都会丢失。如果所有数据都存,就会又存不下。所以才会有mapreduce,用来将所有数据存在磁盘上,并用批处理框架计算一个top k列表。
这种架构的另一个问题,不太简单:
- 每次引入数据分区,都需要处理数据复制,使得每个分区的副本都存储在
多个节点。 - 需要考虑重新平衡,将新节点添加到集群或从中删除。
- 需要处理热分区。
但是不重要,先跳过。
研究一下:前k问题有更简便解法吗?有。如果准确性不太重要的话。
called count-min sketch 一个二维数组
width有几千,height很小代表hash functions
(原理待补)
如何选择宽高:一些公式
用这个sketch代替可能会变大的hash
目前为止的high-level architecture
用户点击视频→请求进入网关→网关将请求路由到后端
网关:关注一下日志:
log generatioin:通常用来监控、审计、计费
这里用来计算 视频点击数
实现一个后台进程从log里面读数据,进行一些初始聚合,然后发送给后面进一步处理
在api gateway host的memory里分配一个缓冲区,读取日志,构建上面那种hash来计数
缓冲区大小有限制,满了会刷新,如果没满就根据时间刷新flush
也可以 用其它方案,like
即时聚合,就不写入log。
或者完全跳过网关,直接把video信息往后传
** 最好把数据序列化成二进制,节省资源
这些都基于api上的资源: memory, CPU, network or disk IO.
→ 初步聚合的数据传到分布消息系统里,like kafka。在kafka里消息被拆到多个分区,每个分区可能在不同的机器上。拆分方法没有特别的,随机比较均匀。
→ 数据管道分成两部分:快道和慢道
快道:计算近似值,几秒钟完事
有个服务called fast processor,可以在很短的时间间隔内创建 count-min 草图并聚合数据。
由于内存不再是问题,所有不需要再分区。
只要在内存里存数据,即使时间再短也要考虑replication,否则就不高可用。
因为count-min 已经算出了近似答案,所以偶尔丢数据可接受,只要slow path不丢,几小时后就会有精确数据。
→ 每隔几秒,结果就会刷新到storage里。可能会延迟几分钟。
另外一个pro/con:storage组件是一种db,存最终的前k的结果list存like1分钟or5分钟
** pipeline如何降低请求率?【对流处理和数据聚合很重要】
点击几百万→落在gateway主机上(集群可能很大including数千台)→每台主机预聚合(几秒钟但减小很多)→ fast proceesor(比api gateway集群小的多)进一步聚合(又几秒钟)→ 到达db,只有一小部分
慢道:计算精确值,几分钟或几小时
仍要聚合,但要精确:
①用mapreduce。所有数据存到distributed file system like HDFS 或 对象存储like S3
运行两个mapreduce,一个计数,一个计算实际的top K list
有点慢
② 用data partitioner批量读数据并解析成单个事件
partitioner获取每个视频并将消息发给另一个kafka集群中的对应分区
** 每次分区时都要考虑hot shard。交给partitioner service处理。
好,kafka/kinesis里的每个分区都存一个数据子集,kafka之流会复制数据
需要一个组件来从每个分区读数据并进一步聚合,called partition processor,在几分钟之内聚合数据,处理成预定义大小的文件,并发到分布式文件系统,给mapreduce下一步处理。
partition processor还可以将汇总信息发到storage service
——> storage 现在有三个path给它发数据
可以根据实际情况调整,like 如果不要求精确,可以不要slow path;如果要准确且要快,就要对数据分区并在内存里聚合;如果不要求时间、要准且数据量大,就用mapreduce
数据流:
假设user1点开ABC,user2点开AC
每个请求都发到一个api gateway host上,信息记在log里。然后在这里聚合观看数据,并传到分布式消息系统上。
聚合就是说记在一个hash表里,计算在最后几秒钟内看了几次。
当哈希表到达size上限或指定时限时,就发送结果、清空,并建立新表。
所以host1发A和B,host2发A和C,host3发A
fast:
fast processor1接收并处理消息1,fast processor2 do 消息2,几个fast processor合一起就是结果
slow:
用data partitioner。like,message1去partitioner1和2,message2去p2和p3,m3去p2
计数用mapreduce读这些文件并按某个时间间隔(例如 1 小时)创建一个最终列表。
计算前k的mapreduce计算前k
mapreduce job:
计数 MR:
input: 一组文件
split:文件被并行任务分成独立的块
map:record reader解析数据,以键值对形式传给mapper,在mapper转成中间对intermediate pairs
然后partitioner从mapper里面拿到键值对并分区写入每个映射器的本地磁盘,分区时,确保同一个键的值都加在一起了,并且去同一个reducer
shuffle and sort:mapredcue获取partitioners生成的输出文件,并通过http将它们下载到reducer machine里,sort把keys合在一起。
reducer:轻松迭代值
output:reduce task的结果写到file system
topk MR:
input:上面的output,拆分为块发到本地mapper,
map:分布计算本地前k list
reduce:聚合上个结果
output:输出最终结果
data retrieval 数据检索:
API Gateway 收到检索请求,把数据检索的调用路由到存储service
storage 从底层db检索数据。
假设fast path每隔一分钟创建一个近似的topKlist,slow path每隔一小时创建精确的topKlist。
user请求一个最后五分钟的topK,就just merge5个1min的list,结果是近似的。
user请求一个一小时topK,返回精确表
但是user要请i去2小时topK,没办法返回精确表。如果merger两个小时表,结果就不精确了。
其它:
API Gateway work a lot: Authentication身份验证, SSL terminationSSL 终止, rate limiting 速率限制, request routing请求路由, response caching 响应缓存 are among the few
如果它资源不够work了怎么办?
一般都会有一个进程专门记录日志,发送到专门存的地方进一步处理
所以可以run日志,并在专门的cluster处理聚合事宜。其余设置不变
count-min有其它替代方法吗?
有。like Lossy Counting, Space Saving and Sticky Sampling. 有损计数、节省空间和粘性采样
k有多大?
k到处可见:fastpath:检索时合并前k;slowpath:mapper把topK发到reducer
所以k不能任意arbitrary大。thousand OK,但是上万就会性能下降。
如果有更大需求,需要带宽和存储
这个设计的缺点是什么?
本系统也叫lambda 架构,which是一种在 MapReduce 和流处理引擎之上构建流处理应用程序的方法。
事件并行被批处理和流处理,在查询时将两个系统的结果拼接在一起。
缺点就是复杂。
如果需求不那么高,Kafka + Spark就可以搞定。
spark内部也是这样的:数据分区+堆计算topKlist→内存聚合
知识点:count-min sketch 数据结构、MapReduce 范式、数据聚合原理
topk问题可以解决:
“流行什么”服务 like Google 趋势、Twitter 趋势
热门产品列表
如何识别交易最活跃的股票
防止 DDoS 攻击的系统:重击者是一些 IP 地址