Advancing Spark - Delta Merging with Structured Streaming Data

19,458 views

Advancing Analytics

1 day ago

Comments: 59
@taikoktsui_sithlord (1 year ago)
newbie data engineer here, just the thing I needed for this task I'm assigned!
@jonathanbergenblom9888 (4 years ago)
You’re targeting everything I could ever ask for. Best azure/dataengineer topic channel out there.
@AdvancingAnalytics (4 years ago)
Wahey! Glad to hear it's hitting the mark - hit us up if there are any topics we should add to the backlog! Simon
@NeumsFor9 (3 years ago)
Mohit Batra's Pluralsight course on this subject is a great complement to this video. Good work. Whatley + Batra = All Bases Covered :)
@alifrad (2 years ago)
very happy that I found your great channel... very educational and... thank you Advancing Analytics
@mnseshan (3 years ago)
Brilliant video my friend. Necessary & sufficient ..👍🙏
@AdvancingAnalytics (3 years ago)
Thanks
@AvrtblrBM (1 year ago)
Awesome work. Keep going 👍
@richardgriffiths3277 (3 years ago)
Awesome. Something was nagging me all this week after a hack at work to get data streaming into our lake - for append-only data it was great... but we had one regular batch process from a SQL db. I thought "well, everything listening to this further downstream in the lake can't take advantage of the self-producing watermarks of delta streams... there must be a way." This is the way! Thank you. Basically: for one-off historical loads, use batch to get the data into Delta; for ongoing batches that need to keep a fluid stream to the next process, use this :)
@zhakunigor (2 years ago)
Finally I got the solution for deduplication in streaming!! Omg! Thanks!
@AdvancingAnalytics (2 years ago)
Haha, no problem! Glad it helped!
@anandladda (2 years ago)
Hi Simon - Great videos & helpful content in understanding core spark and Db concepts. Do you maintain a repo someplace of the various notebooks you use in your videos?
@nikolaychalkanov896 (1 year ago)
Learning a lot from your channel Simon. Thank you! Is there a way we can further improve the merge into the foreachBatch func - like partition pruning towards destination write?
@fb-gu2er (3 years ago)
I feel the deduplication might cause issues. You should only deduplicate on the primary key alone, not on both the primary and change key, because you may end up writing to the same target row and Delta will throw an exception. I know it's a simple example. In production it's best to append a timestamp to every row and deduplicate only by the primary key, selecting the most recent row with the largest timestamp.
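A minimal sketch of that production pattern, assuming hypothetical column names (pk for the primary key, ingest_ts for the per-row timestamp) that aren't taken from the video:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

def latest_per_key(df, key_col="pk", ts_col="ingest_ts"):
    # Rank rows within each primary key by descending timestamp,
    # then keep only the most recent row per key before merging.
    w = Window.partitionBy(key_col).orderBy(F.col(ts_col).desc())
    return (df.withColumn("_rn", F.row_number().over(w))
              .filter(F.col("_rn") == 1)
              .drop("_rn"))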
@AdvancingAnalytics (3 years ago)
Aha, yep, that's very true. Our standard pattern, if we de-dupe at all, just uses the Primary Key. At the time I made this video, a client specifically had issues with conflicting updates and the received file was the only timestamp - therefore we couldn't determine a "winner" and they wanted the update to fail. Lazy example, just to demonstrate what can be done inside the microbatching function :) Simon
@koleaby4 (3 years ago)
A very useful approach - thanks for sharing 👍 One issue I am facing in our systems is merge conflicts - when there are several different updates in the source which target the same record in the target table. Have you been in a similar situation? If so, how did you approach it?
An idea for future topics (something I found very limited documentation for) - joining streaming DFs:
* How to join 2 streaming DFs before writing them into a sink?
* Is it possible to aggregate one of the streams before joining the two?
* etc.
Thanks again
@AdvancingAnalytics (3 years ago)
Yep, that scenario comes up fairly often with engineered merges. You need to remove the duplicate matches somehow - either by changing the match key, or by removing them from the incoming stream. As long as you put the dedupe logic (dropDuplicates or a target filter, depending on requirements) inside the foreachBatch function you'll be fine!
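A rough illustration of keeping the dedupe inside the micro-batch function (the table path, key column, and function name below are placeholders, not the video's notebook code):

from delta.tables import DeltaTable

def merge_microbatch(batch_df, batch_id):
    # De-duplicate the incoming micro-batch on the primary key first,
    # so the merge never sees two source rows aimed at the same target row.
    deduped = batch_df.dropDuplicates(["pk"])

    target = DeltaTable.forPath(spark, "/mnt/lake/silver/my_table")
    (target.alias("t")
           .merge(deduped.alias("s"), "t.pk = s.pk")
           .whenMatchedUpdateAll()
           .whenNotMatchedInsertAll()
           .execute())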
@koleaby4 (3 years ago)
@@AdvancingAnalytics yes, that's our current approach - thanks for validating my thought process 👍🏼
@danielguillermopericosanch9103 (3 years ago)
I'm curious about the print statement that you added in the foreachBatch function. I don't see it anywhere... 🤔. Thank you Simon!
@AdvancingAnalytics (3 years ago)
Yeah, you don't see the output of that nested function, that was an oversight when I put the demo together. But you can output the statement to the spark logs, or call a logging function to pass the data back to log analytics etc! Would be nice if any cell output fed directly into the streaming microbatch outputs...
@saeedrahman8362 (3 years ago)
This is very helpful, thanks. But one issue I found is that I can't save the Delta table with partitioning if we are using foreachBatch - so how can we make sure the data is still partitioned when saved?
@mannammanojkumar2822 (3 years ago)
Hi, excellent presentation. If we create multiple dataframes between the read and write streams, will changes continuously be reflected in those dataframes as well?
@AdvancingAnalytics (3 years ago)
You can join together different streaming/non-streaming dataframes but there are complexities around state. Each time you call an action on a streaming dataframe (i.e. if you wanted multiple writes), each will create its own streaming query and trigger as specified.
@stvv5546 (3 years ago)
Hey there, been following for months now - this is indeed the best channel on all things Spark/Databricks/Delta Lake for sure! Many thanks for providing the knowledge, appreciate it!
We had a setup where we extracted data from a Delta table in batch mode, saved it as Delta in a target table and then wrote it to an Azure SQL Server database table. That is about to change: we should start streaming from the same source table (since it will get refreshed every 3-4 min or so), then save the data in the same target Delta table and from there load it into the same SQL table we used in the batch scenario.
So is it actually possible to write the results of the streaming query directly to a database such as SQL Server on Azure, and will it bottleneck at write time (especially when dealing with a huge number of streamed rows)? Also, is it possible to tell Spark streaming from which point in time I want my former batch table (now a streaming source) to be streamed? I don't want to start from scratch and stream it fully - I just want to start the stream where the old batch process ended. (We are talking about a billion-record source table here!) Many thanks!
@AdvancingAnalytics (3 years ago)
Hey hey - glad the videos are hitting the spot! So - writing to the SQL DB will indeed bottleneck; we tend to find you hit DTU limits quicker than you hit Databricks cluster limits, but both can be tweaked to overcome it. Streaming doesn't support SQL DB as a native sink, but you can use the foreachBatch() approach to do this - it's described on this page: docs.databricks.com/spark/latest/structured-streaming/foreach.html
In terms of starting points, you can provide a timestamp and/or Delta transaction log version that the stream should start from, then all progress is logged in the checkpoint anyway, so it should be a pretty efficient cut-over. Simon
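For the cut-over point, a sketch of the relevant read-side options (the path and values are placeholders); startingVersion / startingTimestamp are standard options on a Delta streaming source:

stream_df = (spark.readStream
                  .format("delta")
                  .option("startingTimestamp", "2021-06-01")  # or .option("startingVersion", "1234")
                  .load("/mnt/lake/silver/source_table"))
# The write side is unchanged - foreachBatch can then push each micro-batch to Azure SQL,
# and the checkpoint tracks progress from that starting point onwards.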
@stvv5546 (3 years ago)
@@AdvancingAnalytics Thank you!
@PakIslam2012 (4 years ago)
Hi, great work... Love the channel, keep doing the videos - you make it so entertaining and easy to understand. Could you do a video on how to determine the best and most cost-effective cluster configuration on Azure Databricks when running our batch or streaming jobs on the platform?
@Nehmaiz (3 years ago)
Thank you for this video, super informative! one question though, where do I see the output of the print statements in the mergetoDF() function?
@AdvancingAnalytics (3 years ago)
They're not exposed in the notebook output directly, but you can either 1) view the outputs in the logs, or 2) call a logging function to pass the message to log analytics etc, or even just an audit delta table
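A minimal sketch of the audit-table option, with a hypothetical path and schema:

from datetime import datetime
from pyspark.sql import Row

def log_batch(batch_id, row_count, message):
    # Append one audit row per micro-batch to a Delta table instead of relying on print().
    audit_df = spark.createDataFrame([Row(batch_id=batch_id,
                                          row_count=row_count,
                                          message=message,
                                          logged_at=datetime.utcnow())])
    audit_df.write.format("delta").mode("append").save("/mnt/lake/audit/stream_merge_log")

Called from inside the foreachBatch function, e.g. log_batch(batch_id, batch_df.count(), "merge complete").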
@Nehmaiz (3 years ago)
@@AdvancingAnalytics Thanks - they don't show up in the logs, hence my question. I actually created an audit delta table as you mentioned.
@avnish.dixit_ (3 years ago)
Very Helpful...
@marcocaviezel2672 (3 years ago)
Hi Simon! Great video! A more general question. Do you know if there is a “when not matched by source” in Databricks? Because I want to achieve that data which isn’t in the “new data” is deleted in the original table. Unfortunately I don’t have a delete flag in the new data.
@AdvancingAnalytics (3 years ago)
Hey Marco, sorry, missed this one! There isn't a "not matched by source" in the default merge criteria, which is a little annoying. If you're not maintaining history, then you could just overwrite the table rather than merging, but I'm assuming you want something more along the SCD Type 2 route. The main scenario I've seen is a little messy - You do an initial query to bring back any records in the destination that aren't in the update, append those logical deletions into the update dataframe, then handle those as one of the matched elements in your merge statement. Not the cleanest and feels "hacky", but the examples I've seen from Databricks use that pattern! Their SCD 2 example notebook is a good starting point - docs.databricks.com/_static/notebooks/merge-in-scd-type-2.html Simon
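A rough sketch of that anti-join-then-union shape, with placeholder names (updates_df is the incoming dataframe, pk the key, is_deleted a soft-delete flag; schemas are assumed to match) - not the video's or Databricks' exact code:

from delta.tables import DeltaTable
from pyspark.sql import functions as F

target_path = "/mnt/lake/silver/dim_customer"                 # placeholder path
target_df = spark.read.format("delta").load(target_path)

# 1. Rows in the destination that no longer appear in the update (left anti join).
deletions = (target_df.join(updates_df, on="pk", how="left_anti")
                      .withColumn("is_deleted", F.lit(True)))

# 2. Append those logical deletions to the update dataframe.
merge_source = updates_df.withColumn("is_deleted", F.lit(False)).unionByName(deletions)

# 3. Handle them as one of the matched branches of the merge.
tgt = DeltaTable.forPath(spark, target_path)
(tgt.alias("t")
    .merge(merge_source.alias("s"), "t.pk = s.pk")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())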
@marcocaviezel2672 (3 years ago)
Hey Simon! Thanks a lot for this source! So far I worked with a logical delete after the merge statement, but I will try to implement your suggestion.
-- Logical delete
UPDATE testdb.test_data_delta
SET DeleteFlag = 1
WHERE testdb.test_data_delta.Index IN (
  SELECT D.Index
  FROM testdb.test_data_delta AS D
  LEFT JOIN upsert_data AS S ON (S.Index = D.Index)
  WHERE S.Index IS NULL
)
Thanks a lot for your great channel! It really helps to learn Databricks :-)
@avnish.dixit_ (3 years ago)
What if we want to modify the data in the DataFrame but don't want a large amount of latency? I mean, is there any other approach we can use instead of foreachBatch?
@fb-gu2er (3 years ago)
You shouldn't be updating too much in this case. The best use case for Delta merges is to deduplicate, maybe add some metadata columns like a timestamp, etc., and merge the results. For what you want, you should do it before you get to the foreachBatch merge.
@marellasrikanth8497 (4 years ago)
Thanks for sharing knowledge! Can this approach apply to other targets like Snowflake or Azure SQL as well?
@AdvancingAnalytics (4 years ago)
Hey! So from Databricks to other services, the foreachBatch can use any valid datawriter. So yep, you can microbatch to SQLDB, Snowflake, whatever you fancy. Whether the streaming capabilities of snowflake have a similar function to spark streaming, I have no idea! :) Simon
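As a hedged sketch of the micro-batch-to-external-sink idea, using a generic JDBC writer (connection details and names are placeholders; Snowflake would usually go through its own Spark connector rather than plain JDBC):

def write_to_sqldb(batch_df, batch_id):
    # Inside foreachBatch the micro-batch is a normal static DataFrame,
    # so any batch writer (JDBC, Snowflake connector, etc.) can be used.
    (batch_df.write
             .format("jdbc")
             .option("url", "jdbc:sqlserver://<server>.database.windows.net:1433;database=<db>")
             .option("dbtable", "dbo.stream_target")
             .option("user", "<user>")
             .option("password", "<password>")
             .mode("append")
             .save())

(source_stream.writeStream            # source_stream: a hypothetical streaming DataFrame
              .foreachBatch(write_to_sqldb)
              .option("checkpointLocation", "/mnt/lake/_checkpoints/to_sqldb")
              .start())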
@NORCupcake (3 years ago)
If I wish to fetch data through APIs using GET requests (e.g. from an accounting system), would you create a script in Databricks that handles the ETL process on a scheduled basis, or are there other platforms that would be more efficient and a better fit? Thank you for sharing great content!
@shanhuahuang3063 (1 year ago)
Why do I get errors like AttributeError: 'DataFrame' object has no attribute 'merge'?
@AdvancingAnalytics (1 year ago)
DataFrames don't have the merge function - you need to create a DeltaTable object using DeltaTable.forPath() then run the merge command on that new object!
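In other words, something along these lines (the path and key are placeholders, and updates_df stands in for your incoming DataFrame):

from delta.tables import DeltaTable

delta_tbl = DeltaTable.forPath(spark, "/mnt/lake/silver/my_table")   # a DeltaTable, not a DataFrame

(delta_tbl.alias("t")
          .merge(updates_df.alias("s"), "t.pk = s.pk")
          .whenMatchedUpdateAll()
          .whenNotMatchedInsertAll()
          .execute())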
@shanhuahuang3063 (1 year ago)
@@AdvancingAnalytics Thanks! I suggest you make a video on how to get Key Vault secrets into Databricks - that would be great. At my workplace we set up key vaults using dutils.getkeys() and then print something out; I am sure a lot of people want to know about that.
@shanhuahuang3063 (1 year ago)
@@AdvancingAnalytics It is like setting up the storage account connection string and the key vault, and running some setup script. Please let me know if you can make a video of it - I will share it around as I am very interested in it. We are using Databricks at work.
@sid0000009 (4 years ago)
In foreachBatch, how do we control the batch size we want it to read at a given time? Like, can we set a threshold in some way? Thank you
@AdvancingAnalytics (4 years ago)
Same way you work with normal streaming queries! You can use the .trigger() function on the writeStream action and the maxFilesPerTrigger/maxBytesPerTrigger readStream options to tweak how the batches are structured. Simon
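Roughly like this (values are illustrative; maxFilesPerTrigger / maxBytesPerTrigger are Delta source options, and the merge function name is a placeholder):

stream = (spark.readStream
               .format("delta")
               .option("maxFilesPerTrigger", 50)        # cap the files picked up per micro-batch
               .option("maxBytesPerTrigger", "1g")      # or cap by approximate data volume
               .load("/mnt/lake/bronze/source_table"))

(stream.writeStream
       .foreachBatch(merge_microbatch)                  # placeholder merge function
       .trigger(processingTime="5 minutes")             # how often a micro-batch fires
       .option("checkpointLocation", "/mnt/lake/_checkpoints/merge")
       .start())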
@sid0000009 (4 years ago)
@@AdvancingAnalytics thanks
@giovanicamargodosanjos6659 (2 years ago)
What if I want to pass another param to mergetoDF function?
@AdvancingAnalytics (2 years ago)
Hrm, haven't tried overloading it with additional parameters - it should have context for other variables defined in the session though, without you needing to pass them in explicitly? Not great from a coding standpoint but should work :)
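Two ways that could look, sketched with placeholder names (table_path, merge_with_path, and df are not from the video): either let the function pick the extra value up from the session via closure, or wrap the call in a lambda so foreachBatch still only sees the (dataframe, batch_id) pair:

table_path = "/mnt/lake/silver/my_table"   # defined in the session

def merge_with_path(batch_df, batch_id, path):
    # placeholder body - run your merge against `path` here
    # (alternatively, drop the `path` parameter and reference table_path directly via closure)
    ...

query = (df.writeStream                     # df: a hypothetical streaming DataFrame
           .foreachBatch(lambda batch_df, batch_id: merge_with_path(batch_df, batch_id, table_path))
           .option("checkpointLocation", "/mnt/lake/_checkpoints/merge")
           .start())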
@kasthurisomshekar3717 (2 years ago)
Hey Simon, I watched all of your videos on Auto Loader - awesome work. I need one bit of help: I want to use dropDuplicates on a dataframe in Auto Loader with a watermark value. Can you please provide example syntax for that? It would be really helpful.
@kuldipjoshi1406 (3 years ago)
Is there any way to directly upsert data to ADLS Gen2 with structured streaming?
@AdvancingAnalytics (3 years ago)
In this example, the delta table is stored within ADLS Gen 2, so it's upserting via the merge clause. If you want to put the data directly to the lake without a delta table, you can write to one of the native output sinks in adls gen 2, but you'll need to be careful about the output mode (append/update/complete) depending on the aggregations you're using. The databricks streaming examples are a pretty good starting place - docs.databricks.com/spark/latest/structured-streaming/demo-notebooks.html#structured-streaming-demo-python-notebook
@kuldipjoshi1406 (3 years ago)
@@AdvancingAnalytics Yeah, this was really helpful. If I want to send data in real time to downstream Kafka or Event Hubs, should I read from the final merged table, or should I consider: 1) one staging table in append mode, writing rows downstream directly while merging into the merged table in parallel, OR 2) reading the merged table and writing to the downstream sink (with ignoreChanges=true, where I will need to handle a bunch of re-delivered data downstream, as the docs say)?
@sid0000009 (4 years ago)
If we add ignoreChanges = true on the read stream, would it make things more optimised? A file-level CDC read before updating the target? (Needless to say, I am a big fan of your videos :))
@AdvancingAnalytics (4 years ago)
Without ignore changes, you'll get errors if you ever need to re-write files in the source delta. Basically kicks out the whole updated file with unchanged and updated rows combined - it's not particularly efficient as you might get 10,000 unchanged rows with a single updated row, but yes, you would only see the files that were changed in the source table. As with most things delta, there's a lot of data re-writing going on under the hood to make things nice and simple from a code perspective!
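For reference, the option sits on the Delta readStream (the path is a placeholder); downstream logic then has to tolerate the re-delivered unchanged rows, e.g. by merging on the key:

changes = (spark.readStream
                .format("delta")
                .option("ignoreChanges", "true")   # re-emits whole rewritten files, updates included
                .load("/mnt/lake/silver/merged_table"))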
@gauravkumar796 (4 years ago)
Hi buddy, I have been following your channel and it has been very helpful in my Databricks journey at our organization. Currently we are using Databricks for data ingestion and creating Delta tables as part of batch jobs. Now business users want to use this data for BI and analytics, primarily in Tableau or another interface (might be Redash in future). My question: is Databricks sufficient for BI and does it provide good performance, or do we need another tool like Qubole or Dremio that provides a semantic layer on top of the ADLS data without any data ingestion? Please let me know.
@AdvancingAnalytics (4 years ago)
Depends what "good performance" is - it's still a distributed compute platform, so it'll always take a couple of seconds to return data - if your users are looking for millisecond latency, they'll need some kind of cached semantic layer (either inside Tableau server, Power BI etc). If they're happy that queries may take a few seconds to return, then you can likely tweak and tune a cluster to be sufficient directly. All depends on requirements - the real benefit of Databricks is the ability to run HUGE queries over massive datasets in reasonable times :)
@bhaveshpatelaus (4 years ago)
@@AdvancingAnalytics We are using the delta lake with Power BI Premium for our BI and Analytics workload and the performance is pretty good so far.
@gauravkumar796 (2 years ago)
@@bhaveshpatelaus Do you connect Power BI via a serverless cluster or a SQL endpoint?