17: Top K Leaderboard | Systems Design Interview Questions With Ex-Google SWE

Views: 23,189

Jordan has no life

A day ago

Comments: 162
@idiot7leon 9 months ago
Brief Outline
00:00:23 Find the Top K...
00:01:16 Problem Requirement
00:02:16 Overall Approach
00:03:23 Precise Solution
00:06:11 Counting Phase
00:08:57 Local Top K Step
00:10:51 Global Top K
00:12:27 Speeding Things Up - Approximation
00:15:07 Windowed Top K, How To Calculate
00:18:10 Windowed Top K, How To Read
00:19:45 Publishing Faster Results
00:21:11 Count Min Sketch
00:23:19 Count Min Sketch - Continued
00:26:01 Count Min Sketch - Continued
00:27:44 Count Min Sketch - Fault Tolerance
00:28:48 Final Diagram - Top K Problem
Thanks, Jordan~
@jordanhasnolife5163 8 months ago
Thank you!!
@chaitanyatanwar8151 1 month ago
Thank you! The videos and the discussions in the comments make this channel the best source for system design.
@skinnycucumber 9 months ago
i have my first system design interview tomorrow and i've been cramming all your videos. great content!
@jordanhasnolife5163 9 months ago
Good luck, let us know how it goes!
@yashbansal1038 4 months ago
how did it go boy?
@bingqinghuang9318 1 month ago
Are you at Amazon now? :)
@aliz166 9 months ago
I love your videos man! Any chance you might cover an ad click aggregation system soon? It's a bit similar to this actually, but of course has its own nuances. Thanks again for your help!
@jordanhasnolife5163 9 months ago
Hey Ali! I'd just watch this one and the tinyurl video, curious to hear what else you feel this incorporates beyond that
@totsubo2000 1 month ago
Great video! One small question: Why introduce Spark as an intermediate step for the top K aggregation instead of having Flink consume directly from the initial Kafka stream and handle the top K calculation? Wouldn't using Flink throughout simplify the pipeline and reduce the need for multiple aggregation stages?
@jordanhasnolife5163 1 month ago
Flink can only hold so much data, and it isn't really meant for ad hoc querying! If you give me an arbitrary window in the past where you want the top K, I'll have to run some distributed query over all of the data to find it.
@mimanshabhargav3266 8 months ago
I got this problem for my on-site today. Thanks to you I had some idea on how to do it.
@Meck-ow2cj 5 months ago
Great video Jordan. To avoid contention on the count min sketch service, we can use an event loop within the service. That way we only have one thread writing to the count min sketch. Of course we'll still have a thread pool doing the I/O with Kafka, but those threads will have to put events on an internal queue, where the single writer thread reads them off and updates the count min sketch. It has its own trade-offs, but it is a potential solution.
@jordanhasnolife5163 5 months ago
Yep! Agreed
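The single-writer idea from this exchange can be sketched roughly as follows. This is a minimal illustration, not the service from the video: the class name `SingleWriterCounter` is hypothetical, a plain dict stands in for the count min sketch table, and ordinary threads stand in for the Kafka I/O pool.

```python
import queue
import threading


class SingleWriterCounter:
    """Many I/O threads enqueue event IDs; exactly one thread applies updates."""

    _STOP = object()  # sentinel that tells the writer thread to exit

    def __init__(self):
        self.counts = {}  # stand-in for the count min sketch table (assumption)
        self._inbox = queue.Queue()
        self._writer = threading.Thread(target=self._run, daemon=True)
        self._writer.start()

    def submit(self, event_id):
        # Safe to call from any thread; it never touches self.counts directly.
        self._inbox.put(event_id)

    def _run(self):
        # Only this thread mutates self.counts, so no lock is needed around it.
        while True:
            item = self._inbox.get()
            if item is self._STOP:
                break
            self.counts[item] = self.counts.get(item, 0) + 1

    def close(self):
        self._inbox.put(self._STOP)
        self._writer.join()


def demo_single_writer():
    counter = SingleWriterCounter()
    producers = [
        threading.Thread(
            target=lambda: [counter.submit("song-1") for _ in range(1000)]
        )
        for _ in range(4)
    ]
    for t in producers:
        t.start()
    for t in producers:
        t.join()
    counter.close()
    return counter.counts
```

The trade-off mentioned above shows up here as well: the internal queue becomes the bottleneck if the single writer cannot keep up with the producers.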
@truptijoshi2535 6 months ago
Hi Jordan! I have some questions.
1. For the precise solution, the Spark node aggregates the event IDs. Why are we aggregating again in Hadoop?
2. After aggregating, we do a local top k and then a global top k. Suppose the local top k lists of 3 nodes look like this: 1000->999->998->997->996 etc., 100->99->98->97, and 200->300->400. If we only take the top 3 from each node, we might miss some of the bigger values like 997 and 996, right?
@jordanhasnolife5163 6 months ago
1) The stream consumer is just putting tuples of (eventId, timestamp) into hadoop via parquet files. 2) We do a local top k after aggregating as you mentioned, which means we did a shuffle to get all events with the same id on the same node. So we have the full counts of each event. Then, once we get the local top k of the eventIds that were hashed to a given node, we can do a global top k.
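The shuffle, local top k, and global top k steps described in this reply can be sketched in plain Python. This is only an in-memory illustration under assumptions: a list of `Counter` objects stands in for the nodes, `hash(event_id) % num_partitions` stands in for the shuffle, and the function name `exact_top_k` is made up for the example.

```python
import heapq
from collections import Counter


def exact_top_k(events, k, num_partitions):
    # Shuffle: every event with the same ID lands on the same partition,
    # so each partition holds the full count for the IDs it owns.
    partitions = [Counter() for _ in range(num_partitions)]
    for event_id in events:
        partitions[hash(event_id) % num_partitions][event_id] += 1

    # Local top k on each partition.
    local = [
        heapq.nlargest(k, part.items(), key=lambda kv: kv[1])
        for part in partitions
    ]

    # Global top k over the (at most k * num_partitions) local winners.
    merged = [kv for part in local for kv in part]
    return heapq.nlargest(k, merged, key=lambda kv: kv[1])
```

Because a key's count is never split across partitions, any key in the true global top k is also in the top k of the partition that owns it, which is why the merge stays exact.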
@jordiesteve8693 5 months ago
Great work! Hope I can add some value to the video. In the exact offline solution, there can be very popular keys (e.g., songs) that make the shuffle phase very slow or even infeasible: 1) the shuffle phase is only as fast as its slowest sub-shuffle task, and 2) all the information for a key might not fit in a single partition, so Spark can start spilling to disk or just throw an OOM. One common technique to solve this is called salting (plenty of info out there), and the good thing is that Spark handles these skewed distributions on its own in its most recent versions (afaik from version 3). Edit: Having a "fast" batch phase would allow us to combine batch and online computations (aka lambda architecture). Edit 2: Just saw that in the final architecture you propose combining offline and online, exact and approximate results. Anyhow, hope my small deep dive into the batch layer is helpful to someone!
@jordanhasnolife5163 5 months ago
Nice, thanks for the contribution! Will look into salting sometime, I assume it's mainly just re-splitting into more partitions and then aggregating again.
@jordiesteve8693 5 months ago
@jordanhasnolife5163 indeed!
@jordiesteve8693 5 months ago
@jordanhasnolife5163 indeed, that's exactly what it is!
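Salting as summarized in this thread (re-split a hot key into more partitions, then aggregate again) might look like the sketch below. It is plain Python rather than Spark, and the function name `salted_counts`, the `num_salts` parameter, and the fixed seed are assumptions made for the illustration.

```python
import random
from collections import Counter


def salted_counts(events, num_salts, seed=0):
    rng = random.Random(seed)

    # Stage 1: attach a random salt so one hot key spreads over num_salts
    # sub-keys, each of which can be counted on a different partition.
    partial = Counter((event_id, rng.randrange(num_salts)) for event_id in events)

    # Stage 2: drop the salt and sum the partial counts per original key.
    totals = Counter()
    for (event_id, _salt), count in partial.items():
        totals[event_id] += count
    return partial, totals
```

The second stage only has to combine at most `num_salts` partial counts per key, so it is cheap even for a very hot key.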
@harman654321 6 months ago
I think you can use Redis to store an intermediate count min sketch for each Flink job, and then aggregate.
@jordanhasnolife5163 6 months ago
Can you elaborate on this? I'm not sure in the flow of the problem where it belongs.
@nalamda3682 3 months ago
Did we consider the fact that summing the local top k counts may not give the global top k? E.g., local A1's top 4 are x1:50, x2:40, x3:40, x4:39 and local A2's top 4 are y1:50, y2:40, y3:40, x4:39. Ideally x4, with a count of 78, should win, but because we took the top 3 on each node we never added it up. I may be wrong, but I'm not sure what exactly I missed. Could you help?
@jordanhasnolife5163 3 months ago
Yes, we considered this
@vincentchee4108 9 months ago
Thanks for the video. I'm a bit curious about the need for a TSDB. I assume a Postgres DB with time range partitioning would suffice for this use case. With time range partitioning, you can still do: WITH cte AS (SELECT event_id, SUM(count) AS sum FROM table WHERE timestamp BETWEEN t1 AND t2 GROUP BY event_id) SELECT * FROM cte ORDER BY cte.sum DESC LIMIT k
@jordanhasnolife5163 9 months ago
I imagine this would work, but would be curious to see how the benchmarks actually look between them
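For anyone who wants to try the range query from this exchange, here is a small runnable sketch. It uses an in-memory SQLite database as a stand-in for Postgres, so it says nothing about partitioning or benchmarks. The table name `event_counts` and the columns `ts` and `cnt` are hypothetical, and the ordering is descending so that the largest sums come first.

```python
import sqlite3


def top_k_in_range(rows, t1, t2, k):
    """rows: iterable of (event_id, ts, cnt) tuples; returns the k largest sums."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE event_counts (event_id TEXT, ts INTEGER, cnt INTEGER)")
    conn.executemany("INSERT INTO event_counts VALUES (?, ?, ?)", rows)
    query = """
        WITH cte AS (
            SELECT event_id, SUM(cnt) AS total
            FROM event_counts
            WHERE ts BETWEEN ? AND ?
            GROUP BY event_id
        )
        SELECT event_id, total FROM cte ORDER BY total DESC LIMIT ?
    """
    result = conn.execute(query, (t1, t2, k)).fetchall()
    conn.close()
    return result
```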
@ankur20010 1 month ago
I think that you can't control which HDFS node the top K list is written to. The distribution is handled by HDFS, and all that is visible to you is a directory structure. The same directory may be distributed across different data nodes.
@jordanhasnolife5163 1 month ago
Can you elaborate on this a bit more? In HDFS I'm running a spark job to just aggregate counts, and then from there creating local top k counts on each hadoop node, and from there merging all of those together
@ankur20010 1 month ago
@jordanhasnolife5163 What I mean is that HDFS exposes a directory-like structure, and you can write to a particular directory. But where the file blocks in that directory go is decided by HDFS. It is possible that you write the top k records (let us say k = 100 for the time being) to one directory but they end up saved on 2 nodes (50 records on each). Also, writing to different directories does not guarantee that they are written to different nodes. In this case, how can we distribute the top k to each Hadoop node? In a Spark cluster, too, this Hadoop system acts as one big storage drive. We don't see individual nodes. Is there a particular way of partitioning that ensures each local top k's records go to a single node in the Hadoop cluster?
@jordanhasnolife5163 1 month ago
@ankur20010 Ah yeah, I guess my explanation better suits how Spark will run this type of thing internally. In reality, this is one command lol: spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.takeOrdered.html
@varunigarg6492 7 months ago
Why can't we maintain state in Flink for some x hours and run the aggregation there itself? This would remove the need for Spark Streaming, right?
@jordanhasnolife5163 7 months ago
Well what about after those few hours? How do we combine hourly intervals to get the top k over the aggregate?
@varunigarg6492 7 months ago
In Flink, we can create 2 process functions: 1) one that computes the top N at an hourly cadence (can be used to serve fast viz use cases via Druid, for example) and 2) one that computes the top N from the last 24 hourly results. The eviction policy is time based. To store state we can use the RocksDB state backend, which stores state as serialized bytes on disk with an in-memory cache. So all the functionality is encapsulated within Flink.
@jordanhasnolife5163 7 months ago
@varunigarg6492 Sure, but these aren't accurate results. If I want to know the top K for the last week, how would I do that? You could combine the top k from each smaller interval within that week, but then you don't account for the fact that an event that wasn't in the top k of any mini interval could still be in the total top k.
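A tiny made-up example shows the failure mode this reply describes. The two hourly counters and their numbers are invented purely for illustration: event `c` is never first in any single hour, yet it has the highest total.

```python
from collections import Counter


def top_k(counts, k):
    return [event_id for event_id, _ in counts.most_common(k)]


# Invented per-hour counts: "c" is runner-up in both hours.
hour_1 = Counter({"a": 10, "c": 9})
hour_2 = Counter({"b": 10, "c": 9})
k = 1

# Approximate: keep only each hour's top k, then merge what survived.
merged_from_top_k = Counter()
for hour in (hour_1, hour_2):
    for event_id in top_k(hour, k):
        merged_from_top_k[event_id] += hour[event_id]
approx = top_k(merged_from_top_k, k)

# Exact: merge the full counts first, then take the top k.
exact = top_k(hour_1 + hour_2, k)
```

Here `exact` picks `c` (18 events in total), while `approx` never sees `c` at all because it was dropped from both hourly lists.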
@yunfu518 5 months ago
@jordanhasnolife5163 But that is the same problem for the Spark Streaming pipeline, right? Unless you are talking about kicking off another Spark job just to aggregate results from the Parquet files in HDFS. I think what she means is: why don't we use Flink in place of the Spark Streaming pipeline? One more pro I can see for Flink is that we don't need ZooKeeper to keep track of how many Spark consumers we have. We can simply aggregate the results from several Flink jobs based on an event-time window with a specific out-of-order time limit.
@guitarMartial 2 months ago
Thanks Jordan! What would the data model for the TSDB be like in these cases? Also, the heap implementation: isn't that germane to Apache Spark itself?
@jordanhasnolife5163 2 months ago
Hey, it would just be like: window time, eventId, count, rank.
As for the heap point, I'm actually not 100% sure. I see there's an approximate top k algorithm, but I'm not a big enough user of Spark to know this one off the top of my head. Would recommend throwing that question into ChatGPT.
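One way to picture the row shape described in this reply (window time, eventId, count, rank) is the sketch below. The class name `TopKRow`, the field names, and the choice of epoch seconds for the window start are assumptions for illustration and are not tied to any particular time series database.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TopKRow:
    window_start: int  # start of the window, epoch seconds (assumed unit)
    event_id: str
    count: int
    rank: int  # 1 = most frequent event in this window


def rows_for_window(window_start, counts, k):
    """Turn a {event_id: count} mapping for one window into k ranked rows."""
    ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)[:k]
    return [
        TopKRow(window_start, event_id, count, rank)
        for rank, (event_id, count) in enumerate(ranked, start=1)
    ]
```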
@jordiesteve8693 5 months ago
My Kafka/streaming knowledge is limited, so bear with me. To avoid congestion on the CMS service, we could start aggregating counts by key in Kafka, so there are fewer updates to the count min sketch index. What do people think?
@jordanhasnolife5163 5 months ago
Not sure how well aggregation within kafka itself works. It's mainly supposed to just be a log. I do think this is like technically a feature it may support but I'm skeptical there
@jordiesteve8693 5 months ago
@jordanhasnolife5163 There's this thing called Kafka Streams, where you can do some aggregations and processing, but I've never gotten my hands deep into it.
@jordiesteve8693 5 months ago
@jordanhasnolife5163 I agree. I once worked on a project where we were aggregating user clicks in Kafka. I wasn't involved much in that layer, but the on-calls and bugs were quite funny, to say the least.
@capriworld 3 months ago
Thanks again. In the precise solution, if we use eventId as the Kafka partition key, it may create skewed partitions, and the Spark consumers may also end up as hot nodes because of hot keys: some keys are hot and some rarely get any events. Not sure whether I'm thinking about this correctly or missing something, so I just wanted to know your thoughts on this. Thanks.
@jordanhasnolife5163 3 months ago
For the "precise" solution, we don't have to shard by event id, because our spark job will eventually do a shuffle step. For the streaming solution, we do have to. I don't have a good solution for this, other than starting out with small fixed sized partitions, and gradually adjust which kafka consumers are responsible for which partition over time. It may require a dynamic partitioning solution.
@capriworld 3 months ago
@jordanhasnolife5163 Thanks.
@lastjoker99pm 3 months ago
@jordanhasnolife5163 The Kafka queue doesn't need to use the event IDs as the key, right? We can store . The publishing service can batch for 10 sec and set the count value before publishing to Kafka. For non-popular videos the count will be 1; popular videos will have a value >> 1. This still keeps the top-k solution precise but cuts down the write volume by ~10x. Another solution could be sampling. For popular videos, we can randomly drop 90% of the events and then adjust for the drop by multiplying the count by 10. This makes the count approximate for popular videos but solves the hot partition/node problem. We will also need the video ID as the key here. Even separate Kafka topics for popular and non-popular items might work. The popular topic needs a different kind of partitioning, with salting/appending random numbers to the key. However, I can't figure out how to cleanly resolve the hot Spark node problem. We can use a consumer group with multiple nodes, but then we will need another level of aggregation over the clean video IDs.
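The batching idea in this comment (count locally for about 10 seconds, then publish one message per event ID) could be sketched like this. Everything here is an assumption for illustration: the class name `BatchingPublisher`, the `publish` callback (which in a real service might wrap a Kafka producer), and the `(event_id, count)` message shape.

```python
from collections import Counter


class BatchingPublisher:
    """Pre-aggregates events for a fixed interval before publishing them."""

    def __init__(self, publish, flush_interval_s=10):
        self._publish = publish  # hypothetical callback taking (event_id, count)
        self._interval = flush_interval_s
        self._pending = Counter()
        self._window_start = None

    def record(self, event_id, now_s):
        if self._window_start is None:
            self._window_start = now_s
        elif now_s - self._window_start >= self._interval:
            self.flush()
            self._window_start = now_s
        self._pending[event_id] += 1

    def flush(self):
        # One message per distinct ID instead of one per raw event.
        for event_id, count in self._pending.items():
            self._publish((event_id, count))
        self._pending.clear()
```

Because counts are summed rather than sampled, the totals downstream stay exact; only the number of messages shrinks.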
@tengyuanye8897 4 months ago
I have a question on how to get the TOP-K event_id list from the count-min sketch table? Let's say we save count-min sketch table into storage, it only has the 2D array with the frequency of each hash value. Based on those hash_values, we still don't know the event_id. I feel like we need to store those event_id somewhere, then using those event_id we can calculate its frequency, but if we store those event_ids, why not just use hashtable?
@jordanhasnolife5163 4 months ago
Yeah fair point, you'd likely have to hit a DB for those, or potentially keep a local top K heap as you're building out the CMS (probably more viable)
@ShreeharshaV 5 months ago
Great content as usual, Jordan. I have watched several of your videos and noticed that you take the time to answer each and every question or remark from viewers. Absolutely commendable. I appreciate the time and effort you put in for the entire community. On that note, I have a question: each of the 3 proposed solutions, the count min sketch node (superfast but possibly inaccurate), the Flink + Spark Streaming node (stream processing, fast and nearly accurate), and Spark Streaming + HDFS (batch processing, slow and very accurate), is going to give us the top k entities in x minutes and store them in the time series DB. Which result will be shown to the user, given that all 3 streams are updating the DB? Can you please share how the data will be read in that case?
@jordanhasnolife5163 5 months ago
I don't have an exact answer for this - I think that this would depend on the specific user + application and their specific needs. In theory you could run all three, and let a user decide which route they want to read from.
@Anonymous-ym6st 3 months ago
Rewatched again, and learnt new things... I always find top k a tricky question. Some questions:
1. Can we use stream processing but still sort over the full data? (Keeping a local top K is an optimization for speed, but could we just count in different Flink nodes and do the heap top k only in the last aggregating Flink node?)
2. The reason we introduce count min sketch is to reduce network calls, but why do Spark/Flink need multiple nodes? (Thinking of 4B YouTube videos, we just need to store a count for each, which should fit in a single node's storage.)
Thanks as always!!!
@jordanhasnolife5163 3 months ago
I think both of your questions touch on the same idea, which is whether we can handle all events on one node. From a size perspective, maybe so, but what about from a load perspective? Think about how many events we have to process.
@Anonymous-ym6st 3 months ago
@jordanhasnolife5163 Yay! The load makes sense: if there are millions of like updates per second, a single Flink node won't be enough even just for counting. Still a bit unsure about the first one: can we have many Flink nodes for counting, and just one Flink node for the later aggregation and top k heap maintenance?
@jordanhasnolife5163 3 months ago
@Anonymous-ym6st That's basically what I propose in the video I believe, we just do a top k on each one and then aggregate those on another
@stong4320 4 months ago
Apologies if these are really newb questions, but:
1. What are the scenarios where you'd use Spark over Flink? It looks like in this case you used Spark because of micro-batching, so you can send fewer packets. Would Flink have been OK to use as well?
2. After we have the counts in HDFS, what exactly coordinates the count, shuffle, and get top k phases? Is this a MapReduce job? Is it just a normal job you defined? What's orchestrating these?
@jordanhasnolife5163 4 months ago
1) Yeah that was basically my logic. I think Flink would realistically be fine to use too. 2) A spark job. I guess "what's orchestrating this" is a non trivial question and I'd recommend looking into spark architecture.
@williamzhang5336 1 month ago
Hi Jordan, thanks for the great video! One point confuses me a little, please educate me. In the Windowed Top K section, for the 2nd Kafka queue: 1) Why do we use Kafka there? It looks like it just holds the local top K from each partition, so could we use any in-memory queue for speed and convenience if the number of partitions and K are not that large? Or is there a benefit to using Kafka here? 2) Technically Flink can hold as much data as we need as long as we have enough resources on a single node or scale out to many nodes. Is the purpose of putting a queue in front of it to save nodes, so that the merging logic runs on a single node and merges the elements from the queue one by one? Thanks again!
@jordanhasnolife5163 1 month ago
Hey Will! The reason we use Kafka into Flink is that 1) we may literally have to, I can't remember, and 2) the main benefit that we get out of Flink is the ability to checkpoint state and replay it in the event of a failure. We need Kafka with that to give us the ability to replay log messages.
@jdxxmxnd 2 months ago
Thanks for the vid. I may have missed it, but how would we get the top k for windows shorter than an hour? If we wanted to support minute granularity, would we just change our stream consumers to aggregate by minute (and deduce hours from those minutes)? Or would the consumers aggregate on both minute and hour intervals and store both results in the TSDB?
@jordanhasnolife5163 2 months ago
I suppose you could do both, as you alluded to. But it's definitely the case that if you're only aggregating by hour there's no good way to do a historical minute by minute lookup. These are the tradeoffs here, you sacrifice lookup flexibility in exchange for decreased load/storage usage.
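The first option in this exchange, keeping minute-level counts and deriving hourly ones from them, can be sketched as a simple roll-up. The function name `roll_up`, the `(bucket_start_seconds, event_id)` key shape, and the use of epoch seconds are assumptions for the example.

```python
from collections import Counter


def roll_up(minute_counts, bucket_s=3600):
    """Sum {(minute_start_s, event_id): count} into buckets of bucket_s seconds."""
    rolled = Counter()
    for (minute_start, event_id), count in minute_counts.items():
        bucket_start = minute_start - minute_start % bucket_s
        rolled[(bucket_start, event_id)] += count
    return dict(rolled)
```

The reverse direction is not possible: hourly rows cannot be split back into minutes, which is the flexibility versus storage trade-off mentioned in the reply.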
@alphabeta644 5 months ago
At 9:31 minutes, when we are computing the local top-K, is it still about precise solution or is that part of approximate results? My thinking is that if we find local top-K, and merge them at global level, then the answer of top-k will not be precise/accurate. Is that correct?
@alphabeta644 5 months ago
Answering my own question, in case it helps future readers. Merging local top-K lists would not be an approximation, because the "local" data still has the global picture for its set of event_ids over the given time duration. It would be an approximation if we were chunking the counts per hour, but the precise method computes the event counts over the whole time duration, not per chunk/hour etc.
@jordanhasnolife5163 5 months ago
Precise. Each key and its corresponding count is just present on one node, as a result finding the top k locally and merging globally should get us a global result.
@akshatshetty7103 1 month ago
Noob question: If we're partitioning Kafka by eventId, how would it scale to a million or 10 million songs/videos? Wouldn't we need to partition by some consistent hash of the eventId? PS: I've been watching all your content for a while, very informative!!
@jordanhasnolife5163 1 month ago
That's basically what I mean when I say this. Keep all events with the same event ID on the same partition.
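Keeping all events with the same ID on the same partition usually comes down to hashing the key and taking it modulo the partition count. Below is a minimal sketch of that idea. MD5 is only a stand-in (Kafka's default partitioner hashes the key bytes with murmur2), the function name `partition_for` is made up, and this is plain modulo hashing rather than consistent hashing.

```python
import hashlib


def partition_for(event_id, num_partitions):
    # A stable hash of the key: the same ID always maps to the same partition,
    # no matter how many distinct IDs exist.
    digest = hashlib.md5(event_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions
```

With this scheme, millions of IDs share a fixed number of partitions, so the partition count does not need to grow with the number of songs or videos.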
@hrjhonor 9 months ago
Great video! Two questions: what's the downside of having the first Spark Streaming job, which outputs local top k results, write directly to the time series DB, given that the TSDB has many aggregation capabilities built in? The other question: is it possible to use Redis + CDC/Kafka, where the real-time result is computed from Redis?
@hrjhonor 9 months ago
to extend the first question, why can't we use the TSDB in place of the HDFS? Is it mainly concerning the ingestion rate?
@jordanhasnolife5163 9 months ago
1) You actually probably could do all of the writes to the time series database, and then run a big query on it! But keep in mind that we'd then have to store each of the events with their timestamps in the TSDB, which is a lot of data. Also, I do wonder (maybe it wouldn't) if the TSDB would try to perform some locking for a read-only query (Hadoop definitely wouldn't, as the files are immutable). Great point though. I also think that a Spark job allows us to be specific about how we want to do the counting and joins, whereas I'm not sure how optimized a TSDB query would be for this.
2) Yeah, you could probably just put all of the local top k counts in the TSDB, but I'd hope that you'd only actually store the global top K (meaning the local top k's should overwrite one another). Otherwise we have ourselves a pretty big storage cost. I was thinking about overwriting data to merge sorted lists, though, and I felt that doing this in a DB on disk is probably quite a bit slower than doing it in memory and then sinking it to a DB.
@hrjhonor 9 months ago
Thanks for the thoughts!
@mcee311 9 months ago
In the time series DB, how are you differentiating the results from count min sketch from the ones from Spark Streaming? Does the Spark result just override the count min sketch one?
@jordanhasnolife5163 9 months ago
I think you could do either implementation but considering the count min sketch data is pretty small I figure you may as well keep it around
@kevinding0218 8 months ago
Thanks for the great content! I have a question about maintaining a K-size min-heap on a Spark streaming node. Since this is a streaming process, it's difficult to know the exact count of a specific keyword in real time as I assume the count may increase over time. If we only maintain a K-size heap, could we end up losing earlier counts for some keywords? Please correct me if I'm wrong.
@jordanhasnolife5163 8 months ago
Yup you can't do this if you have active incoming counts - you'd need to keep a sorted list (or treeset or something) of all incoming events to be precise
@dinar.mingaliev 9 months ago
Hi Jordan, hope you still have more DMs than Megan :) Could you please clarify how data from the count min sketch goes to the time series DB? Basically it is a matrix with dimensions (number of hash functions) x (modulus we use for hashing). The bigger the matrix, the more counts we can store there. Kind of like a dictionary. But it does not store keys, which are the event types we want to count occurrences of. How do we reconstruct those IDs from the count min sketch?
@dinar.mingaliev 9 months ago
For example, if we keep counts per type of video in the count min sketch, we need to know all possible types and query each of them. If we keep video IDs to approximately count every video, we again have to query our CM sketch as many times as there are videos. Right? We can also shard the count min sketch :)
@jordanhasnolife5163 9 months ago
Hi Dinar! In the export to TSDB phase, we need to basically loop over every possible ID, and get a count for it, and then export that count to the tsdb. We could always pull these Ids from a database or something.
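To make the point in this thread concrete, here is a minimal count min sketch together with the export loop over a known list of IDs. It is a sketch under assumptions: the class and function names are made up, SHA-256 with a row prefix stands in for a family of independent hash functions, and `known_ids` is assumed to come from some external store because the sketch itself never keeps keys.

```python
import hashlib


class CountMinSketch:
    def __init__(self, depth=4, width=1024):
        self.depth, self.width = depth, width
        self.table = [[0] * width for _ in range(depth)]

    def _columns(self, key):
        # One column per row, derived from a row-specific hash of the key.
        for row in range(self.depth):
            digest = hashlib.sha256(f"{row}:{key}".encode("utf-8")).digest()
            yield row, int.from_bytes(digest[:8], "big") % self.width

    def add(self, key, count=1):
        for row, col in self._columns(key):
            self.table[row][col] += count

    def estimate(self, key):
        # Collisions only ever inflate a cell, so the minimum is the best
        # guess and is never below the true count.
        return min(self.table[row][col] for row, col in self._columns(key))


def export_counts(sketch, known_ids):
    # The table holds counters only, so the IDs must be supplied from outside.
    return {event_id: sketch.estimate(event_id) for event_id in known_ids}
```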
@jordanhasnolife5163 9 months ago
Good point though! It may be nice to have a second set of nodes listening to Kafka to ensure that we have a list of all the keys in some sort of database
@savy6682 4 months ago
@jordanhasnolife5163 Wouldn't it be better to simply keep a top K min heap alongside the CM sketch instead of looping over all keys? Our original approach was to store a hash map of keys and their counts, along with a top K min heap. But given a large enough number of keys this could be huge. So we replace our hash map with a CM sketch, which reduces the memory required drastically (at the expense of some accuracy, of course). But if we start storing all keys in some DB and looping over them, wouldn't that sort of defeat the purpose?
@jordanhasnolife5163 4 months ago
@savy6682 yeah good point, agree
@soumik76 5 months ago
Hey Jordan, slightly confused here. We are writing both count min sketch, as well as windowed top k to TSDB. I probably missed why exactly. Is it to answer queries like fetch top k in last 10 mins (in which case CMS will be used) and fetch top k in last 3 hours (in which case windowed top k will be used)?
@jordanhasnolife5163 5 months ago
I think CMS is mainly just an approximation approach for when you want to limit the amount of compute resources you're devoting to solving this problem. As for the latter solution, it may also take a bit longer to aggregate and populate in the TSDB due to having to merge the results, and as a result CMS may give you access to the data sooner.
@AP-eh6gr 8 months ago
17:42 each listening to a partition on a kafka broker to be more precise I guess..? correct me if wrong
@jordanhasnolife5163 8 months ago
that works
@nalamda3682 2 months ago
Jordan, another stupid question. Count min sketch is a table, and to get counts out of it I have to query it for every unique key. If I have one billion YouTube videos, do I need to loop over all 1 billion every time I store the data from the CMS to the TSDB?
@jordanhasnolife5163 2 months ago
Keep a separate k sized heap in memory, and upon putting an element into the count min sketch, get its count, and add it to the heap if it's in the top k
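The approach in this reply, tracking the current top k while inserting instead of scanning every key later, might be sketched as follows. Two simplifications are assumptions of this example and not what the reply specifies: an exact `Counter` stands in for the count min sketch estimates, and a dict of at most k entries replaces the heap so that the count of an ID already in the top k can be updated in place. The class name `TopKTracker` is hypothetical.

```python
from collections import Counter


class TopKTracker:
    def __init__(self, k):
        self.k = k
        self.estimates = Counter()  # stand-in for the count min sketch (assumption)
        self.top = {}  # event_id -> latest estimate, never more than k entries

    def add(self, event_id):
        self.estimates[event_id] += 1
        estimate = self.estimates[event_id]
        if event_id in self.top or len(self.top) < self.k:
            self.top[event_id] = estimate
            return
        # Compare against the weakest member of the current top k.
        weakest = min(self.top, key=self.top.get)
        if estimate > self.top[weakest]:
            del self.top[weakest]
            self.top[event_id] = estimate

    def top_k(self):
        return sorted(self.top.items(), key=lambda kv: kv[1], reverse=True)
```

Exporting to the TSDB then means writing out `top_k()`, which touches k entries rather than every ID ever seen.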
@parthpanchmatia224 9 months ago
Hi Jordan, what iPad app do you use to draw the system design diagrams and present them? I have an iPad so this info will help me put my system design learnings to test
@jordanhasnolife5163 9 months ago
I wanna say OneNote
@jiananstrackjourney370 6 months ago
Thanks Jordan, I really appreciate the video. One question: is there any specific reason Kafka has to partition on event ID? Even if it doesn't, the downstream Spark Streaming job will still be able to count and aggregate the data, right?
@jordanhasnolife5163 6 months ago
Yes but it's going to take a lot less data shuffling if we partition by event Id
@sahilkalamkar5332 2 months ago
What if my query pattern is of the type last 5 minutes, last 1 hour, last 1 day? How can we modify the above approach? My thought process for the precise solution is that we can have minute-granularity Parquet files, plus hourly and daily jobs that roll up the minute files. What about real-time approximate results? Also, what advantage do we get from partitioning by eventId in Kafka, since we are re-aggregating in the slow path after shuffling anyway?
@jordanhasnolife5163 2 months ago
1) I agree with your approach using 1 minute small windows and doing aggregations 2) Can you elaborate on your question? 3) When you say slow path, I'm not sure if you mean the spark case or the flink case, but either way there's far less data to send over the network when you do as much aggregation of the same event as possible early on
@sahilkalamkar5332 2 months ago
@jordanhasnolife5163 For point 2, I mean: what if I need real-time results for the last 5 mins, 1 hour, and 1 day? How can we store the data effectively to cater to this use case?
@GuntherJones 8 months ago
For the top k don't you want a max heap? Thank you so much for these.
@jordanhasnolife5163 8 months ago
Nope min heap! If I want to get the top k I want to quickly compare with the smallest element of the current top k
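A short sketch of why a min heap fits here: the root of a size-k min heap is the smallest member of the current top k, which is exactly the element a new candidate has to beat. The function name `top_k` and the `(count, event_id)` tuple layout are choices made for this illustration.

```python
import heapq


def top_k(counts, k):
    """Return the k largest (count, event_id) pairs from a {event_id: count} map."""
    heap = []  # min heap; heap[0] is the smallest entry of the current top k
    for event_id, count in counts.items():
        if len(heap) < k:
            heapq.heappush(heap, (count, event_id))
        elif count > heap[0][0]:
            # The newcomer beats the weakest member, so swap it in.
            heapq.heapreplace(heap, (count, event_id))
    return sorted(heap, reverse=True)
```

Each candidate costs one comparison against `heap[0]` and at most O(log k) work, so the whole pass is O(n log k) with only k items held in memory.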
@marksun6420 3 months ago
Hey Jordan! Great video! I actually got a simpler question in an interview, where the problem becomes top k games by their score. So there is no need to count or aggregate. In this case, do we still need a streaming system, or we just need to run a sort by query in the DB? To speed up, we can further use Redis to store the top k.
@jordanhasnolife5163 3 months ago
Sort by query in the DB: too slow, I'd guess. If you have billions of game scores, these will be on different partitions, so you have to aggregate the results from all of them. Realistically, I think the solution here is still stream processing based.
1) Randomly send each score to a Kafka broker so their loads are balanced
2) Use a stream consumer to maintain a top k heap for each Kafka partition
3) If an event is in the top k for one of the stream consumers, send it to another stream consumer (via Kafka for message durability) and create the global top k there
4) Make queries from the global top k cache
Do you have to use stream processing for any of that? No, and perhaps I'm overcomplicating. Maybe a DB trigger could be enough, with a Redis cache to store the final results.
@Kazhiyur 2 months ago
A bit confused about the use of the heap when querying. We are doing our approximation by caching the previous hour and aggregating with Flink, correct? (For the last two hours, find the top k by merging the last hour and the current hour.) If so, what is the need for the heap? Is the time series DB not already storing the top k per hour? Is it that we use the min heap to query the top k? I saw your comment but am still confused about why it is a min heap and not a max heap.
@jordanhasnolife5163 2 months ago
1) The heap is useful for the actual computation of the top K within the last hour due to the min heap algorithm, after that sure it is sufficient to put the top K for each hour in a TSDB 2) leetcode.com/problems/top-k-frequent-elements/editorial/
@WallaceSui 2 months ago
Thanks for your video! It's super great! I have read multiple resources related to this problem, and I have some questions:
1: Should we always aggregate counts and top K only per hour in each Flink job and write that into the TSDB? If we need the top K for the last 5 hours, 2 days, or 2 months, should we always use the leaderboard service to do a hashmap + heap calculation that aggregates the per-hour data from the TSDB into these time windows? That would mean we never calculate counts or top K for the last 5 hours/2 days/2 months in the Flink nodes, because doing so would require holding 5 hours/2 days/2 months of data in Flink, which is far too large. Is that the reason? Thanks a lot.
2: In the leaderboard service, since it does the hashmap count + top K calculation for arbitrary time windows, why not just use a NoSQL or SQL query directly against the TSDB, which may be much faster? Thanks a lot.
3: In your approach, you split time into per-hour chunks and then aggregate count/top K per hour into the TSDB. Would it be possible to use a sliding window with two pointers to dynamically track fixed time ranges like the last 1 hour or the last 1 day? That way, every time new data comes in, we just keep updating the hashmap for each fixed time range, and in the DB we don't need to store each per-hour chunk, just the hashmap state. The storage will be super low but the read/write load will be higher.
@jordanhasnolife5163
@jordanhasnolife5163 2 ай бұрын
1) For your example, as long as the window time is a multiple of 1 hour we can use the TSDB to compute it for us. I agree we shouldn't use flink here. Agreed that increasing the window means caching more data and decreasing our flexibility for future queries. 2) What if the random window provided is only 5 minutes long, but our TSDB has data at hour intervals? 3) Can you elaborate on what you're saying? You just want to do multiple time windows in flink at once?
@WallaceSui
@WallaceSui 2 ай бұрын
@@jordanhasnolife5163 Thanks for your reply! For Q2, maybe I missed something in the video. From my understanding, if the granularity of our aggregation is per hour, then we cannot answer per-minute queries unless we change the granularity to per minute. I'm not sure how your question relates to whether we use the leaderboard service or query the TSDB directly with SQL/NoSQL. For Q3, yes, I mean that if we must find a way to do multiple time windows in Flink, could we consider a sliding window with two pointers? I think this way we can only offer a fixed set of time windows (last 5 hours, last 1 day, last 5 days), not arbitrary ones. Since we would need multiple time windows in Flink, maybe this avoids caching a lot of data? But I don't think anybody in industry uses this idea. Does that mean that in most industry cases we separate the query path into something like the leaderboard service, which alone aggregates across multiple time windows? Thanks a lot.
@jordanhasnolife5163
@jordanhasnolife5163 Ай бұрын
@@WallaceSui My point was that, if need be, we can run a Spark query for exact time intervals when the windows don't work for us.
@ankur20010
@ankur20010 Ай бұрын
How easy is it to register a Spark consumer with ZooKeeper? Spark consumers are configured and managed by the Spark driver program. Does the Spark controller expose this info via an API? And even if it does, since the number of executors is dynamic, how will Flink manage the merge of the sorted lists?
@jordanhasnolife5163
@jordanhasnolife5163 Ай бұрын
For the most part when I say stuff like this it's generally inaccurate, I guess the point is that for example kafka consumer groups are using zookeeper, you can use a "spark streaming like technology that registers itself to zookeeper to support partitioning"
@ankur20010
@ankur20010 Ай бұрын
@jordanhasnolife5163 just out of curiosity, is it required in an interview to name such a technology? Or just mentioning the required properties of such service is sufficient?
@jordanhasnolife5163
@jordanhasnolife5163 Ай бұрын
@@ankur20010 I imagine that depends on your interviewer and the level of role that you're going for, but I think having an idea of both is fairly important
@andrewcisternino8271
@andrewcisternino8271 6 ай бұрын
Hi Jordan... So I'm under the impression the time series db is storing top k per hr on each chunk. Yet, your slide for consumption via leaderboard service has us using a hashmap to aggregate from 3 separate hrs. Wouldn't the consumption be same process as 'Global top k' utilizing the 'merge sorted lists' leetcode problem since we are pulling 3 separate top k lists (1 per hr)? I guess I'm also wondering the format TSDB is storing data. Thanks for videos.
@jordanhasnolife5163
@jordanhasnolife5163 6 ай бұрын
This is a bit different here because the top K from those hour intervals likely overlap so you'll need to use a hashmap to merge the total counts together first. Then from there, nothing is sorted, so we'll just have to use a heap of size k and loop over the events to figure out the new top k in the 3 hour interval.
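The two steps described above (sum overlapping keys in a hashmap, then select the top k with a bounded heap) can be sketched as follows. The function name and the sample hourly lists are hypothetical, and `heapq.nlargest` stands in for the explicit heap of size k.

```python
import heapq
from collections import Counter

def merge_hourly_top_k(hourly_lists, k):
    """Merge per-hour top-k lists into an approximate top k for the whole range."""
    totals = Counter()
    for hour in hourly_lists:
        for key, count in hour:
            totals[key] += count  # keys that appear in several hours are summed
    # Select the k largest totals; ties are broken by key for determinism.
    return heapq.nlargest(k, totals.items(), key=lambda kv: (kv[1], kv[0]))

hours = [
    [("x", 5), ("y", 4)],
    [("y", 3), ("z", 2)],
    [("x", 1), ("z", 6)],
]
merged = merge_hourly_top_k(hours, 2)
print(merged)
```

The result is only as good as the per-hour lists: a key that never made any hourly top k is invisible to this merge.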
@andrewcisternino8271
@andrewcisternino8271 6 ай бұрын
@@jordanhasnolife5163 This makes sense. Thanks.
@andreybraslavskiy522
@andreybraslavskiy522 7 ай бұрын
Hi Jordan, thank you for the great video. Can you please help me understand how the count-min sketch and the Spark jobs work together? Do we use the count-min sketch only for the current, unfinished hour, and as soon as the Spark job finishes, it overwrites the sketch's result with the more precise one? Do we reset the count-min sketch every hour? If a user queries the last 3 hours, does that mean the Spark job results are always available for previous hours and we don't need the sketch's results at all? I can't see the scenario where data from the count-min sketch is actually used. Should it be combined with the Spark results somehow?
@jordanhasnolife5163
@jordanhasnolife5163 7 ай бұрын
I'd think which implementation/how you combine them comes down to the application itself. It's your choice whether this is a worthy enough problem to even run a spark job for. If it is, it depends how often you run it. I think having a count min sketch available just makes the hour by hour computation a bit cheaper compared to something like flink where we actually would perform the counts on a per hour basis. I don't think you'd really ever combine the results from spark, if you have the spark results, you have the real answer.
@andreybraslavskiy522
@andreybraslavskiy522 7 ай бұрын
@@jordanhasnolife5163 Thank you!
@Anonymous-ym6st
@Anonymous-ym6st 8 ай бұрын
Thanks for the great content as always! One quick question (I might have missed it in the video): in the fast approximate solution, why don't we store more aggregated data for every hour (the full counts rather than just the top k)? It wouldn't be more data than the exact solution stores, right? And it wouldn't sacrifice accuracy? Or is the trade-off for fast reads not in the pre-computation but at query time: for, say, a 3-hour query, summing up more than each hour's top k would be expensive?
@jordanhasnolife5163
@jordanhasnolife5163 8 ай бұрын
Having a little bit of trouble understanding you but: 1) That's a ton of data to store 2) Now to calculate the top k over all of those windows I need to aggregate the counts for every event, which is slow!
@tunepa4418
@tunepa4418 7 ай бұрын
Thanks for the video. How do we get the top k from a count-min sketch? I assume a count-min sketch is just for counting, not for getting the top k.
@jordanhasnolife5163
@jordanhasnolife5163 7 ай бұрын
In theory you can maintain an in memory heap of size k with the counts of the top k elements as you compute the count of each while using the count min sketch
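A rough sketch of that idea, under several assumptions: the class names, the sketch dimensions, and the SHA-256 based row hashing are all illustrative choices, and a small dict of at most k tracked keys stands in for the heap because Python's `heapq` has no update-key operation. Each event updates the sketch, and its fresh estimate decides whether the key enters the tracked set.

```python
import hashlib

class CountMinSketch:
    def __init__(self, width=1024, depth=4):
        self.width = width
        self.depth = depth
        self.rows = [[0] * width for _ in range(depth)]

    def _index(self, row, key):
        # One hash function per row, derived by salting the key with the row number.
        digest = hashlib.sha256(f"{row}:{key}".encode()).digest()
        return int.from_bytes(digest[:8], "big") % self.width

    def add(self, key):
        """Increment the key's counters and return its new estimated count."""
        estimate = None
        for row in range(self.depth):
            i = self._index(row, key)
            self.rows[row][i] += 1
            cell = self.rows[row][i]
            estimate = cell if estimate is None else min(estimate, cell)
        return estimate  # never below the true count, may be above on collisions

class SketchTopK:
    def __init__(self, k, sketch):
        self.k = k
        self.sketch = sketch
        self.best = {}  # tracked key -> latest estimate (at most k entries)

    def add(self, key):
        estimate = self.sketch.add(key)
        if key in self.best or len(self.best) < self.k:
            self.best[key] = estimate
            return
        weakest = min(self.best, key=self.best.get)
        if estimate > self.best[weakest]:
            del self.best[weakest]
            self.best[key] = estimate

    def top(self):
        return sorted(self.best.items(), key=lambda kv: (-kv[1], kv[0]))

tracker = SketchTopK(k=2, sketch=CountMinSketch())
for event in ["a", "b", "a", "c", "a", "b", "a", "b", "a"]:
    tracker.add(event)
print(tracker.top())
```

Memory stays fixed at width × depth counters plus k tracked keys, regardless of how many distinct events arrive.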
@rogermarin1712
@rogermarin1712 6 ай бұрын
Why do we need flink? Can flink be replaced by another spark streaming job?
@jordanhasnolife5163
@jordanhasnolife5163 6 ай бұрын
Sure
@maxvettel7337
@maxvettel7337 6 ай бұрын
I am not sure, but does this question overlap with Twitter Search? You had such a video in the previous season, and the functional requirements look very similar.
@jordanhasnolife5163
@jordanhasnolife5163 6 ай бұрын
Probably, which is why I didn't make it in 2.0. Twitter search also does a lot of focusing on the search engine piece too.
@maxvettel7337
@maxvettel7337 6 ай бұрын
​@@jordanhasnolife5163 I think I finally got the difference. In Twitter Search problem we search entities by term and get the result in chronological order. In Top K problem we get the result by popularity. That's why here we can't use ElasticSearch. Is my logic correct?
@tunepa4418
@tunepa4418 6 ай бұрын
Thanks for the video! For a problem like top k Spotify songs in the last 24 hours which approach is the best? I am assuming the batch processing path is enough as we can get accurate counts without the granularity requirement of last 5 minutes or seconds (which may require us to use flink or Count Min Sketch)
@tunepa4418
@tunepa4418 6 ай бұрын
Just to clarify please, how would I approach this. Let's say the leaderboard will be updated every 24hrs and the current time is 12noon. To update the leaderboard at 12 noon tomorrow, the MR job would be run after 12 noon tomorrow (since we need the data in the 24hr window) which means there will be a bit of delay to get the leaderboard result but it will be accurate. Another approach is to pre calculate the top songs for every 1 hr window and aggregate that to get the 24hr window but that will not be exactly accurate too but faster than the first suggestion. What do you think?
@jordanhasnolife5163
@jordanhasnolife5163 6 ай бұрын
I think that for the purposes of spotify, running this on a batch job is probably fine. Not like getting this data is too latency sensitive for any musician!
@tunepa4418
@tunepa4418 6 ай бұрын
@@jordanhasnolife5163 that makes sense. I think precise path will be the best here as users don’t really expect real time update of the top playlists
@harshbhatt_
@harshbhatt_ 9 ай бұрын
Hi Jordan, love the content! Quick question - how would this solution change if we need to find the top K for each user? Ex: Rather than finding the top 10 songs globally in the past 24 hours; we want to find the top 10 songs for each user in past 1day/7days/etc? (No complicated ML model; just based on counts of each song played)
@jordanhasnolife5163
@jordanhasnolife5163 9 ай бұрын
Makes sense! This probably becomes a bit easier then actually in the sense that you could potentially store all of this data on a single node per user. Basically, we can sink all data to a time series database, partitioning it by user id and ordered by timestamp. Then we can just do the exact computation, since we're hitting just one node at a time.
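As a toy illustration of the per-user layout described above, the sketch below uses an in-memory dict as a stand-in for a time series database partitioned by user id and ordered by timestamp. All names here (`plays_by_user`, `record_play`, `user_top_k`) are hypothetical. Because one user's plays live in one place, the exact count over a time range is a single range scan.

```python
import bisect
import heapq
from collections import Counter, defaultdict

# user_id -> list of (timestamp, song), kept sorted by timestamp
plays_by_user = defaultdict(list)

def record_play(user_id, timestamp, song):
    bisect.insort(plays_by_user[user_id], (timestamp, song))

def user_top_k(user_id, start, end, k):
    """Exact top k songs for one user over the half-open range [start, end)."""
    plays = plays_by_user[user_id]
    lo = bisect.bisect_left(plays, (start,))  # first play with timestamp >= start
    hi = bisect.bisect_left(plays, (end,))    # first play with timestamp >= end
    counts = Counter(song for _, song in plays[lo:hi])
    return heapq.nlargest(k, counts.items(), key=lambda kv: (kv[1], kv[0]))

record_play("u1", 1, "s1")
record_play("u1", 2, "s2")
record_play("u1", 3, "s1")
record_play("u1", 10, "s3")
record_play("u2", 2, "s9")
print(user_top_k("u1", 0, 5, 1))
```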
@saber3112
@saber3112 9 ай бұрын
We can use Flink's windowing capabilities to create time windows for the specified time ranges (e.g., 1 day, 7 days). Aggregate the data within each window to compute the count of each song played by each user. Then, for each window and each user, compute the top N songs based on their play count.
@gabbah79
@gabbah79 9 ай бұрын
Love the content. Just wish you would use a more legible font, it really slows down my reading. 😢
@jordanhasnolife5163
@jordanhasnolife5163 9 ай бұрын
Sorry will try to work on this and make it a bit more clear
@Henry-wq7ki
@Henry-wq7ki 9 ай бұрын
Why count min sketch server in addition to global top k in the time series DB? They are redundant right ?
@jordanhasnolife5163
@jordanhasnolife5163 9 ай бұрын
In theory the count min sketch server would be able to expose metrics within a shorter period of time, albeit with less accuracy than the global top k
@monihazarika5628
@monihazarika5628 9 ай бұрын
I really liked the content and the way technical concepts around building approx vs real time leaderboard considerations were made. I am looking for some insights on how the fantasy gaming platform like Dream11 works and it will be a great video for your india fans Jordan !!
@jordanhasnolife5163
@jordanhasnolife5163 9 ай бұрын
Thanks for the kind words! I've never heard of dream11 so I'd have to do some research there!
@MrBrown78
@MrBrown78 9 ай бұрын
Got an interview today, been spamming your videos and they've been really helpful! Hopefully I can perform tho XD
@jordanhasnolife5163
@jordanhasnolife5163 9 ай бұрын
Best of luck! Make sure to pop a Viagra before
@MrBrown78
@MrBrown78 9 ай бұрын
@@jordanhasnolife5163 Popped them shits like they were fruit gummies but ended up getting rejected. Man fuck, time to drink the pain away🥳
@jordanhasnolife5163
@jordanhasnolife5163 9 ай бұрын
@@MrBrown78 lol no worries, what was the problem and how do you feel it could have gone better
@MrBrown78
@MrBrown78 9 ай бұрын
​@@jordanhasnolife5163 Asked me to design a system to allow employees within an organization to connect with those they normally wouldn't contact. I aimed for a simple messaging platform because I didn't really understand what they were looking for and they said it was supposed to be built with a timeline of 2 weeks. I approached it same way you designed whatsapp, but I think I forgot some design choices during it.
@rakeshvarma8091
@rakeshvarma8091 7 ай бұрын
As usual, great video Jordan.
@wenqingliu4275
@wenqingliu4275 4 ай бұрын
can't we randomly throw away 90% of the events and count?
@jordanhasnolife5163
@jordanhasnolife5163 4 ай бұрын
You could, that's sub sampling. If you want to be exact, obviously that won't work.
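For reference, sub-sampling can be sketched like this: keep each event with probability `rate` and scale the surviving counts back up by `1 / rate`. The function name and the seeded generator are illustrative assumptions. The scaled counts are estimates, which is why this cannot serve the exact path.

```python
import random
from collections import Counter

def sampled_counts(events, rate, seed=0):
    """Estimate per-key counts from a random sample of the event stream."""
    rng = random.Random(seed)  # seeded only so the sketch is repeatable
    kept = Counter(e for e in events if rng.random() < rate)
    # Scale up: each kept event represents roughly 1 / rate original events.
    return {key: count / rate for key, count in kept.items()}

events = ["a"] * 900 + ["b"] * 100
estimates = sampled_counts(events, rate=0.1)
print(estimates)
```

Rare keys are the ones most likely to be dropped entirely, so the error is worst near the bottom of the leaderboard.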
@harshitnarang6354
@harshitnarang6354 9 ай бұрын
Chad content, thanks Jordan ! Will it be okay for you to share the notes ?
@jordanhasnolife5163
@jordanhasnolife5163 9 ай бұрын
Thanks Chad! I'll share them, but planning to do so in batch in a few weeks. I'm out of the country at the moment so I don't have much time to export them.
@SunilKumar-qh2ln
@SunilKumar-qh2ln 9 ай бұрын
finally this problem, Giga Chad of system design for a reason
@jordanhasnolife5163
@jordanhasnolife5163 9 ай бұрын
With great power (a 3 inch schlong) comes great responsibility
@oakvillian5
@oakvillian5 9 ай бұрын
Bro couldn't stay away from amsterdam
@jordanhasnolife5163
@jordanhasnolife5163 9 ай бұрын
Bro had to go back for his top two hobbies
@aa-kj5xi
@aa-kj5xi 9 ай бұрын
any plans to start a merch shop for feet pics, farts, or bath water?
@jordanhasnolife5163
@jordanhasnolife5163 9 ай бұрын
Yeah thinking bath water may scale nicely. Any thoughts on pricing? Currently going for $10, and $20 for my post lift bath
@Hangar1318
@Hangar1318 6 ай бұрын
If it doesn't scale, you could always shart or fartition.
@TheMdaliazhar
@TheMdaliazhar 5 ай бұрын
Please use better diagram tool. yours is unreadable. and you are already doing so much of work. if you can share final system design diagram link that will be a great help. Thanks.
@jordanhasnolife5163
@jordanhasnolife5163 5 ай бұрын
Check the channel description
@raxcoins
@raxcoins 3 ай бұрын
nice
@explorer9004
@explorer9004 9 ай бұрын
let's get to Google bros
@jordanhasnolife5163
@jordanhasnolife5163 9 ай бұрын
Best of luck my friend - just remember don't harp on one dream company and take the best offer you get :)
@mohitjn91
@mohitjn91 17 күн бұрын
The design has some inconsistencies and does not look rock solid. My observations: 1) In the Local Top K and Global Top K sections: using a min heap on individual nodes can only provide the top k elements as of that moment. However, you mentioned that any arbitrary time range can be given to fetch the top k elements in that range. I think your design fails at that step. 2) In the section Windowed Top K, How To Read: you say that we can fetch the top k elements from different hourly partitions and then merge them, but this step is also wrong. For example, consider fetching the top 2 elements over the last 2 hours with [T-1: (A: 10, B: 7, C: 6, D: 2, E: 1), T: (A: 1, B: 2, C: 7, D: 8, E: 9)]. The answer should contain C: 13, but your solution will only merge (A: 10, B: 7) and (D: 8, E: 9).
@jordanhasnolife5163
@jordanhasnolife5163 13 күн бұрын
1) For any given time, we have to use batch processing, it is inevitable. The Flink stuff is all approximate. 2) Once again, this is approximate. The only way to get exact results is by doing a batch computation over all events.
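The gap between the two paths can be checked with the numbers from the comment above. This small sketch (variable names are illustrative) computes the exact top 2 from full counts and the approximate top 2 from merged per-hour top-2 lists.

```python
import heapq
from collections import Counter

hour_prev = {"A": 10, "B": 7, "C": 6, "D": 2, "E": 1}  # hour T-1
hour_now = {"A": 1, "B": 2, "C": 7, "D": 8, "E": 9}    # hour T
K = 2

def top_k(counts, k):
    return heapq.nlargest(k, counts.items(), key=lambda kv: (kv[1], kv[0]))

# Exact path: sum the full counts for both hours, then take the top k.
exact = top_k(Counter(hour_prev) + Counter(hour_now), K)

# Approximate path: keep only each hour's top k, then merge those lists.
approx_totals = Counter()
for hour in (hour_prev, hour_now):
    for key, count in top_k(hour, K):
        approx_totals[key] += count
approx = top_k(approx_totals, K)

print(exact)   # [('C', 13), ('A', 11)]
print(approx)  # [('A', 10), ('E', 9)]
```

C is second-tier in both hours but first overall, so it is exactly the kind of key the hourly merge drops. Storing more than k entries per hour narrows the gap without closing it.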
@AP-eh6gr
@AP-eh6gr 8 ай бұрын
red light district 15 times 😄 someones an addict, jk
@loshmiification
@loshmiification 9 ай бұрын
kzbin.info/www/bejne/pILTnIWFg994h6s Kafka limits each partition to one consumer per consumer group. Either you are not sharding Kafka using partitions, or you cannot have more than one consumer reading there. Care to elaborate on it (either the sharding mechanism or the consumer limit)? Thank you. Great content otherwise!
@jordanhasnolife5163
@jordanhasnolife5163 9 ай бұрын
To be fair, I am using one consumer per Kafka partition
@jordanhasnolife5163
@jordanhasnolife5163 9 ай бұрын
If you're referring to state machine replication the count min sketch replicas could always read from the replica partitions
@kbman99
@kbman99 6 ай бұрын
Assuming you have a kafka topic with 1 partition, you can use 2 consumers as long as they are both set up with different consumer groups. In this case that would be what you want because you want both CMS state machines to receive the same data. In this manner, with different consumer groups, we are effectively implementing a one-to-many pub-sub.