system design 俄罗斯大叔系列
top k 问题

很常见的需求,like google一百个搜索次数最多的key word,或者y2b上观看最多的视频,转发最多的推等

mapreduce可以解决但不够

需要尽可能快,返回实时列表,比如最近几分钟的最热。

是一个流问题stream processing

Founctional:

  • topK(k,startTime,endTime)

Non-Functional:

  • scalable(数据增长时候可以scale)

  • highly available(容错)

  • highly performance(few tens of millisecondes to return top 100 list)

    高性能暗示:数据应该已经提前算好了。

  • accurate

从单host开始:

假设所有data可以导入它的内存。like video观看量

每当有用户打开视频,就在log里记:把它的video id添加到list

hash保存:{id:count} —>

① sort 这个hash O(nlogn)

② 把它加入head(更快)O(nlogk)

不能scale。如何scale:

引入LB,将每个事件分发到集群中的主机上, called processor hosts,算好再集中存。

Untitled

  吞吐增加了,但是内存不够。

这些都是在内存中算的,hash表会变很大。

和分布式缓存的解决方法一样,这里选择将数据分成更小的块。

Untitled

引入新组件,叫 Data Partitioner。用来把每个video id路由到它自己的processor host,这样每个processor host只存部分data

(每个host)仍然建一个hash,建一个heap,把所有数据放进去,

这样每个host都内部排好序

然后——需要合并n个排序列表。

如果hash所有的数据量可能很大,内存放不下。

综上:增加了scalability和throughput,但是还有缺点。

缺点:目前为止的数据集都是有限的,但是数据流本质是无限的。

主机只能在一段时间内累积数据。假设1分钟内的前k个,那么k之后的其它信息都会丢失。如果所有数据都存,就会又存不下。所以才会有mapreduce,用来将所有数据存在磁盘上,并用批处理框架计算一个top k列表。

这种架构的另一个问题,不太简单:

  • 每次引入数据分区,都需要处理数据复制,使得每个分区的副本都存储在
    多个节点。
  • 需要考虑重新平衡,将新节点添加到集群或从中删除。
  • 需要处理热分区。

但是不重要,先跳过。

研究一下:前k问题有更简便解法吗?有。如果准确性不太重要的话。

called count-min sketch 一个二维数组

Untitled

width有几千,height很小代表hash functions

(原理待补)

如何选择宽高:一些公式

用这个sketch代替可能会变大的hash

目前为止的high-level architecture

Untitled

用户点击视频→请求进入网关→网关将请求路由到后端

网关:关注一下日志:

log generatioin:通常用来监控、审计、计费

这里用来计算 视频点击数

实现一个后台进程从log里面读数据,进行一些初始聚合,然后发送给后面进一步处理

在api gateway host的memory里分配一个缓冲区,读取日志,构建上面那种hash来计数

缓冲区大小有限制,满了会刷新,如果没满就根据时间刷新flush

也可以 用其它方案,like

即时聚合,就不写入log。

或者完全跳过网关,直接把video信息往后传

** 最好把数据序列化成二进制,节省资源

这些都基于api上的资源: memory, CPU, network or disk IO.

→ 初步聚合的数据传到分布消息系统里,like kafka。在kafka里消息被拆到多个分区,每个分区可能在不同的机器上。拆分方法没有特别的,随机比较均匀。

→ 数据管道分成两部分:快道和慢道

快道:计算近似值,几秒钟完事

有个服务called fast processor,可以在很短的时间间隔内创建 count-min 草图并聚合数据。

由于内存不再是问题,所有不需要再分区。

只要在内存里存数据,即使时间再短也要考虑replication,否则就不高可用。

因为count-min 已经算出了近似答案,所以偶尔丢数据可接受,只要slow path不丢,几小时后就会有精确数据。

→ 每隔几秒,结果就会刷新到storage里。可能会延迟几分钟。

另外一个pro/con:storage组件是一种db,存最终的前k的结果list存like1分钟or5分钟

** pipeline如何降低请求率?【对流处理和数据聚合很重要】

点击几百万→落在gateway主机上(集群可能很大including数千台)→每台主机预聚合(几秒钟但减小很多)→ fast proceesor(比api gateway集群小的多)进一步聚合(又几秒钟)→ 到达db,只有一小部分

慢道:计算精确值,几分钟或几小时

仍要聚合,但要精确:

①用mapreduce。所有数据存到distributed file system like HDFS 或 对象存储like S3

运行两个mapreduce,一个计数,一个计算实际的top K list

有点慢

② 用data partitioner批量读数据并解析成单个事件

partitioner获取每个视频并将消息发给另一个kafka集群中的对应分区

** 每次分区时都要考虑hot shard。交给partitioner service处理。

好,kafka/kinesis里的每个分区都存一个数据子集,kafka之流会复制数据

需要一个组件来从每个分区读数据并进一步聚合,called partition processor,在几分钟之内聚合数据,处理成预定义大小的文件,并发到分布式文件系统,给mapreduce下一步处理。

partition processor还可以将汇总信息发到storage service

——> storage 现在有三个path给它发数据

可以根据实际情况调整,like 如果不要求精确,可以不要slow path;如果要准确且要快,就要对数据分区并在内存里聚合;如果不要求时间、要准且数据量大,就用mapreduce

数据流:

假设user1点开ABC,user2点开AC

每个请求都发到一个api gateway host上,信息记在log里。然后在这里聚合观看数据,并传到分布式消息系统上。

聚合就是说记在一个hash表里,计算在最后几秒钟内看了几次。

当哈希表到达size上限或指定时限时,就发送结果、清空,并建立新表。

所以host1发A和B,host2发A和C,host3发A

fast:

fast processor1接收并处理消息1,fast processor2 do 消息2,几个fast processor合一起就是结果

Untitled

slow:

用data partitioner。like,message1去partitioner1和2,message2去p2和p3,m3去p2

计数用mapreduce读这些文件并按某个时间间隔(例如 1 小时)创建一个最终列表。

计算前k的mapreduce计算前k

Untitled

mapreduce job:

Untitled

计数 MR:

input: 一组文件

split:文件被并行任务分成独立的块

map:record reader解析数据,以键值对形式传给mapper,在mapper转成中间对intermediate pairs

然后partitioner从mapper里面拿到键值对并分区写入每个映射器的本地磁盘,分区时,确保同一个键的值都加在一起了,并且去同一个reducer

shuffle and sort:mapredcue获取partitioners生成的输出文件,并通过http将它们下载到reducer machine里,sort把keys合在一起。

reducer:轻松迭代值

output:reduce task的结果写到file system

topk MR:

input:上面的output,拆分为块发到本地mapper,

map:分布计算本地前k list

reduce:聚合上个结果

output:输出最终结果

data retrieval 数据检索:

Untitled

API Gateway 收到检索请求,把数据检索的调用路由到存储service

storage 从底层db检索数据。

假设fast path每隔一分钟创建一个近似的topKlist,slow path每隔一小时创建精确的topKlist。

user请求一个最后五分钟的topK,就just merge5个1min的list,结果是近似的。

user请求一个一小时topK,返回精确表

但是user要请i去2小时topK,没办法返回精确表。如果merger两个小时表,结果就不精确了。

其它:

API Gateway work a lot: Authentication身份验证, SSL terminationSSL 终止, rate limiting 速率限制, request routing请求路由, response caching 响应缓存 are among the few

如果它资源不够work了怎么办?

一般都会有一个进程专门记录日志,发送到专门存的地方进一步处理

所以可以run日志,并在专门的cluster处理聚合事宜。其余设置不变

count-min有其它替代方法吗?

有。like Lossy Counting, Space Saving and Sticky Sampling. 有损计数、节省空间和粘性采样

k有多大?

k到处可见:fastpath:检索时合并前k;slowpath:mapper把topK发到reducer

所以k不能任意arbitrary大。thousand OK,但是上万就会性能下降。

如果有更大需求,需要带宽和存储

这个设计的缺点是什么?

本系统也叫lambda 架构,which是一种在 MapReduce 和流处理引擎之上构建流处理应用程序的方法。

事件并行被批处理和流处理,在查询时将两个系统的结果拼接在一起。

缺点就是复杂。

如果需求不那么高,Kafka + Spark就可以搞定。

spark内部也是这样的:数据分区+堆计算topKlist→内存聚合

知识点:count-min sketch 数据结构、MapReduce 范式、数据聚合原理

topk问题可以解决:

“流行什么”服务 like Google 趋势、Twitter 趋势

热门产品列表

如何识别交易最活跃的股票

防止 DDoS 攻击的系统:重击者是一些 IP 地址