Newbie data engineer here - just the thing I needed for the task I've been assigned!
@jonathanbergenblom9888 · 4 years ago
You're covering everything I could ever ask for. Best Azure/data engineering channel out there.
@AdvancingAnalytics · 4 years ago
Wahey! Glad to hear it's hitting the mark - hit us up if there are any topics we should add to the backlog! Simon
@NeumsFor9 · 3 years ago
Mohit Batra's Pluralsight course on this subject is a great complement to this video. Good work. Whatley + Batra = All Bases Covered :)
@alifrad · 2 years ago
very happy that I found your great channel... very educational and... thank you Advancing Analytics
@mnseshan · 3 years ago
Brilliant video, my friend. Necessary & sufficient 👍🙏
@AdvancingAnalytics · 3 years ago
Thanks
@AvrtblrBM · a year ago
Awesome work. Keep going 👍
@richardgriffiths3277 · 3 years ago
Awesome. Something had been nagging me all week after a hack at work to get data streaming into our lake - great for all the append-only data, but we had one regular batch process from a SQL db. I thought, "well, everything listening to this further downstream in the lake can't take advantage of the self-producing watermarks of delta streams... there must be a way." This is the way! Thank you. Basically: one-off historical loads use batch to get the data into delta, then for ongoing batches, to keep that fluid stream to the next process, use this :)
@zhakunigor · 2 years ago
Finally I got the solution for deduplication in streaming!! Omg! Thanks!
@AdvancingAnalytics · 2 years ago
Haha, no problem! Glad it helped!
@anandladda · 2 years ago
Hi Simon - Great videos & helpful content for understanding core Spark and Databricks concepts. Do you maintain a repo somewhere of the various notebooks you use in your videos?
@nikolaychalkanov896 · a year ago
Learning a lot from your channel, Simon. Thank you! Is there a way to further improve the merge inside the foreachBatch function - e.g. partition pruning on the destination write?
@fb-gu2er · 3 years ago
I feel the deduplication might cause issues. You should deduplicate on the primary key alone, not on both the primary key and the change key, because you may end up writing to the same target row and Delta will throw an exception. I know it's a simple example. In production it's best to append a timestamp to every row and deduplicate only by the primary key, keeping the most recent row (the one with the largest timestamp).
@AdvancingAnalytics · 3 years ago
Aha, yep, that's very true. Our standard pattern, if we de-dupe at all, just uses the primary key. At the time I made this video, a client specifically had issues with conflicting updates and the file-received time was the only timestamp available - therefore we couldn't determine a "winner" and they wanted the update to fail. Lazy example, just to demonstrate what can be done inside the microbatching function :) Simon
@koleaby4 · 3 years ago
A very useful approach - thanks for sharing 👍
One issue I am facing in our systems is merge conflicts - when there are several different updates in the source targeting the same record in the target table. Have you been in a similar situation? If so, how did you approach it?
An idea for future topics (something I found very limited documentation for) - joining streaming DFs:
* How to join two streaming DFs before writing them to a sink?
* Is it possible to aggregate one of the streams before joining the two?
* etc.
Thanks again
@AdvancingAnalytics · 3 years ago
Yep, that scenario comes up fairly often with engineered merges. You need to remove the duplicate matches somehow - either by changing the match key, or by removing them from the incoming stream - and as long as you put the dedupe logic (dropDuplicates or a target filter, depending on requirements) inside the foreachBatch function you'll be fine!
@koleaby4 · 3 years ago
@@AdvancingAnalytics yes, that's our current approach - thanks for validating my thought process 👍🏼
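For reference, a rough sketch of the dedupe-then-merge pattern discussed above - the paths, the CustomerID key and the EventTimestamp column are all hypothetical, and it assumes a Databricks notebook where spark is already in scope:

```python
from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.window import Window

target_path = "/mnt/lake/silver/customers"   # hypothetical Delta target

def merge_microbatch(batch_df, batch_id):
    # keep only the latest row per primary key, using an assumed EventTimestamp column
    latest_first = Window.partitionBy("CustomerID").orderBy(F.col("EventTimestamp").desc())
    deduped = (batch_df
               .withColumn("_rn", F.row_number().over(latest_first))
               .filter("_rn = 1")
               .drop("_rn"))

    # merge the deduplicated micro-batch into the Delta target
    target = DeltaTable.forPath(spark, target_path)
    (target.alias("t")
           .merge(deduped.alias("s"), "t.CustomerID = s.CustomerID")
           .whenMatchedUpdateAll()
           .whenNotMatchedInsertAll()
           .execute())

(spark.readStream.format("delta").load("/mnt/lake/bronze/customers")
      .writeStream
      .foreachBatch(merge_microbatch)
      .option("checkpointLocation", "/mnt/lake/_checkpoints/customers_merge")
      .start())
```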
@danielguillermopericosanch9103 · 3 years ago
I'm curious about the print statement that you added in the foreachBatch function. I don't see it anywhere... 🤔. Thank you Simon!
@AdvancingAnalytics · 3 years ago
Yeah, you don't see the output of that nested function - that was an oversight when I put the demo together. But you can write the statement to the Spark logs, or call a logging function to pass the data back to Log Analytics etc! Would be nice if any cell output fed directly into the streaming microbatch outputs...
@saeedrahman8362 · 3 years ago
This is very helpful, thanks. But one issue I found is that I can't save the Delta table with partitioning if we are using foreachBatch, so how can we make sure that the data is still partitioned when saved?
@mannammanojkumar2822 · 3 years ago
Hi, excellent presentation. If we create multiple DataFrames between the read and write streams, will changes continuously be reflected in those DataFrames as well?
@AdvancingAnalytics · 3 years ago
You can join together different streaming/non-streaming dataframes but there are complexities around state. Each time you call an action on a streaming dataframe (i.e. if you wanted multiple writes), each will create its own streaming query and trigger as specified.
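As a small illustration of the stream-static case (table paths and the join key are made up) - the static side is simply re-read against each micro-batch, whereas joining two streaming DataFrames additionally needs watermarks to bound the state Simon mentions:

```python
# streaming fact data joined to a static dimension; the static table is re-read each micro-batch
orders_stream = spark.readStream.format("delta").load("/mnt/lake/bronze/orders")
customers_dim = spark.read.format("delta").load("/mnt/lake/silver/customers")

enriched = orders_stream.join(customers_dim, "CustomerID", "left")
```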
@stvv5546 · 3 years ago
Hey there, been following for months now - this is indeed the best channel on all things Spark/Databricks/Delta Lake for sure! Many thanks for providing the knowledge, appreciate it!
I've got a situation in which we were extracting data from a delta table in batch mode, saving it as delta in a target table and then writing to an Azure SQL Server database table. Now the situation is about to change and we should start streaming from the same source table (since it will get refreshed every 3-4 min or so), then save the data in the same target delta table and from there load it into the same SQL table we used in the batch scenario above.
So is it actually possible to write the results of the streaming query directly to a database, for example SQL Server on Azure, and will it bottleneck at the time of writing (especially when dealing with a huge number of rows streamed)?
Also, is it possible to tell Spark streaming from which point in time I want my former batch table (now a streaming source) to be streamed, since I do not want to start from scratch and stream it fully - I just want to start the stream where the old batch process ended? (We are talking about a billion-record source table here!) Many thanks!
@AdvancingAnalytics · 3 years ago
Hey hey - glad the videos are hitting the spot! So - writing to the SQL DB will indeed bottleneck; we tend to find you hit DTU limits quicker than you hit Databricks cluster limits, but both can be tweaked to overcome it. Streaming doesn't support SQL DB as a native sink, but you can use the foreachBatch() approach to do this - it's described on this page: docs.databricks.com/spark/latest/structured-streaming/foreach.html
In terms of starting points, you can provide a timestamp and/or delta transaction log version that the stream should start from, then all progress is logged in the checkpoint anyway, so it should be a pretty efficient cut-over. Simon
@stvv5546 · 3 years ago
@@AdvancingAnalytics Thank you!
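A sketch of what that cut-over could look like - the server, database, secret scope and paths are all placeholders, and the plain JDBC writer is just one way to do the SQL DB write inside foreachBatch:

```python
def write_to_sqldb(batch_df, batch_id):
    (batch_df.write
             .format("jdbc")
             .option("url", "jdbc:sqlserver://<server>.database.windows.net:1433;database=<db>")
             .option("dbtable", "dbo.TargetTable")
             .option("user", dbutils.secrets.get("kv-scope", "sql-user"))
             .option("password", dbutils.secrets.get("kv-scope", "sql-password"))
             .mode("append")
             .save())

(spark.readStream
      .format("delta")
      .option("startingTimestamp", "2021-06-01")   # or .option("startingVersion", <version>) to skip history
      .load("/mnt/lake/silver/big_table")
      .writeStream
      .foreachBatch(write_to_sqldb)
      .option("checkpointLocation", "/mnt/lake/_checkpoints/big_table_to_sql")
      .start())
```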
@PakIslam2012 · 4 years ago
Hi, great work... Love the channel, keep doing the videos - you make it so entertaining and easy to understand. Could you do a video on how to determine the best and most cost-effective cluster configuration on Azure Databricks when running batch or streaming jobs on the platform?
@Nehmaiz · 3 years ago
Thank you for this video, super informative! One question though: where do I see the output of the print statements in the mergetoDF() function?
@AdvancingAnalytics · 3 years ago
They're not exposed in the notebook output directly, but you can either 1) view the outputs in the logs, or 2) call a logging function to pass the message to log analytics etc, or even just an audit delta table
@Nehmaiz · 3 years ago
@@AdvancingAnalytics Thanks - they don't show up in the logs, hence my question. I actually created an audit delta table as you mentioned.
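For anyone following along, a rough sketch of that audit-table approach - the path and columns are made up and the merge itself is elided:

```python
from pyspark.sql import functions as F

def merge_and_audit(batch_df, batch_id):
    row_count = batch_df.count()

    # ... the usual merge logic goes here ...

    # append a small audit record per micro-batch instead of relying on print()
    audit = (spark.createDataFrame([(batch_id, row_count)], "batch_id long, row_count long")
                  .withColumn("logged_at", F.current_timestamp()))
    audit.write.format("delta").mode("append").save("/mnt/lake/_audit/stream_batches")
```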
@avnish.dixit_ · 3 years ago
Very Helpful...
@marcocaviezel2672 · 3 years ago
Hi Simon! Great video! A more general question: do you know if there is a "when not matched by source" in Databricks? I want data which isn't in the "new data" to be deleted from the original table. Unfortunately I don't have a delete flag in the new data.
@AdvancingAnalytics · 3 years ago
Hey Marco, sorry, missed this one! There isn't a "not matched by source" in the default merge criteria, which is a little annoying. If you're not maintaining history, then you could just overwrite the table rather than merging, but I'm assuming you want something more along the SCD Type 2 route. The main scenario I've seen is a little messy - you do an initial query to bring back any records in the destination that aren't in the update, append those logical deletions to the update dataframe, then handle those as one of the matched elements in your merge statement. Not the cleanest and it feels "hacky", but the examples I've seen from Databricks use that pattern! Their SCD 2 example notebook is a good starting point - docs.databricks.com/_static/notebooks/merge-in-scd-type-2.html Simon
@marcocaviezel2672 · 3 years ago
Hey Simon! Thanks a lot for this source! So far I have worked with a logical delete after the merge statement, but I will try to implement your suggestion.

-- Logical delete
UPDATE testdb.test_data_delta
SET DeleteFlag = 1
WHERE testdb.test_data_delta.Index IN (
  SELECT D.Index
  FROM testdb.test_data_delta AS D
  LEFT JOIN upsert_data AS S ON (S.Index = D.Index)
  WHERE S.Index IS NULL
)

Thanks a lot for your great channel! It really helps to learn Databricks :-)
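And a PySpark sketch of the anti-join workaround Simon describes above - assuming the incoming updates dataframe shares the target's columns apart from a boolean DeleteFlag maintained on the target, with a hypothetical CustomerID key (newer Delta Lake releases have since added native WHEN NOT MATCHED BY SOURCE clauses):

```python
from delta.tables import DeltaTable
from pyspark.sql import functions as F

updates_df = spark.read.format("delta").load("/mnt/lake/bronze/customer_updates")
target = DeltaTable.forPath(spark, "/mnt/lake/silver/customers")
target_df = target.toDF()

# records in the destination that aren't in the update -> treat as logical deletions
deletions = (target_df.join(updates_df, "CustomerID", "left_anti")
                      .withColumn("DeleteFlag", F.lit(True)))

upserts = updates_df.withColumn("DeleteFlag", F.lit(False))
combined = upserts.unionByName(deletions)

(target.alias("t")
       .merge(combined.alias("s"), "t.CustomerID = s.CustomerID")
       .whenMatchedUpdate(condition="s.DeleteFlag = true", set={"DeleteFlag": "true"})  # soft delete
       .whenMatchedUpdateAll(condition="s.DeleteFlag = false")                          # normal update
       .whenNotMatchedInsertAll(condition="s.DeleteFlag = false")                       # normal insert
       .execute())
```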
@avnish.dixit_ · 3 years ago
What if we want to modify data present in the DataFrame but we don't want a large amount of latency? I mean, is there any other approach we can use instead of foreachBatch?
@fb-gu2er · 3 years ago
You shouldn't be updating too much in this case. The best use case for delta merges is to deduplicate, maybe add some metadata columns like a timestamp, etc., and merge the results. For what you want, you should do it before you get to the merge in foreachBatch.
@marellasrikanth8497 · 4 years ago
Thanks for sharing the knowledge! Can this approach be applied to other targets like Snowflake or Azure SQL as well?
@AdvancingAnalytics · 4 years ago
Hey! So from Databricks to other services, foreachBatch can use any valid dataframe writer. So yep, you can microbatch to SQL DB, Snowflake, whatever you fancy. Whether the streaming capabilities of Snowflake have a similar function to Spark streaming, I have no idea! :) Simon
@NORCupcake · 3 years ago
If I wish to fetch data through APIs using GET requests (e.g. from an accounting system), would you create a script in Databricks that handles the ETL process on a schedule, or are there other platforms that would be more efficient and a better fit? Thank you for sharing great content!
@shanhuahuang3063 · a year ago
Why do I get errors like AttributeError: 'DataFrame' object has no attribute 'merge'?
@AdvancingAnalytics · a year ago
DataFrames don't have the merge function - you need to create a DeltaTable object using DeltaTable.forPath() then run the merge command on that new object!
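A minimal sketch of that fix - the path, key column and updates dataframe are placeholders:

```python
from delta.tables import DeltaTable

updates_df = spark.read.format("delta").load("/mnt/lake/bronze/customer_updates")

# a DeltaTable object, not a DataFrame, is what exposes .merge()
target = DeltaTable.forPath(spark, "/mnt/lake/silver/customers")

(target.alias("t")
       .merge(updates_df.alias("s"), "t.CustomerID = s.CustomerID")
       .whenMatchedUpdateAll()
       .whenNotMatchedInsertAll()
       .execute())
```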
@shanhuahuang3063 · a year ago
@@AdvancingAnalytics Thanks! A suggestion: if you could make a video on how to use Key Vault with Databricks, that would be great. At my workplace we set up key vaults using dbutils and then print something out to check it - I'm sure a lot of people would want to know about that.
@shanhuahuang3063 · a year ago
@@AdvancingAnalytics It's things like setting up the storage account connection string and key vault, and running some setup scripts. Please let me know if you can make a video on it - I will share it around as I am very interested in it. We are using Databricks at work.
@sid0000009 · 4 years ago
In foreachBatch, how do we control the batch size we want it to read at a given time... like, can we set a threshold in some way? Thank you
@AdvancingAnalytics · 4 years ago
Same way you work with normal streaming queries! You can use the .trigger() function on the writeStream action and the maxFilesPerTrigger/maxBytesPerTrigger readStream options to tweak how the batches are structured. Simon
@sid0000009 · 4 years ago
@@AdvancingAnalytics thanks
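For illustration, a sketch with both of those knobs in place (paths and values are arbitrary):

```python
(spark.readStream
      .format("delta")
      .option("maxFilesPerTrigger", 100)        # cap how many files each micro-batch picks up
      # .option("maxBytesPerTrigger", "1g")     # or cap by data volume instead
      .load("/mnt/lake/bronze/customers")
      .writeStream
      .trigger(processingTime="5 minutes")      # how often a new micro-batch fires
      .format("delta")
      .option("checkpointLocation", "/mnt/lake/_checkpoints/throttled_copy")
      .start("/mnt/lake/silver/customers_copy"))
```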
@giovanicamargodosanjos6659 · 2 years ago
What if I want to pass another parameter to the mergetoDF function?
@AdvancingAnalytics · 2 years ago
Hrm, haven't tried overloading it with additional parameters - it should have context for other variables defined in the session though, without you needing to pass them in explicitly? Not great from a coding standpoint but should work :)
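If you'd rather pass the extra arguments explicitly than rely on variables captured from the notebook session, functools.partial (or a lambda) works, since foreachBatch just needs a callable that accepts a dataframe and a batch id - the function and parameter names here are illustrative:

```python
from functools import partial

def merge_to_df(batch_df, batch_id, target_path, merge_key):
    # ... merge logic using target_path and merge_key ...
    pass

df = spark.readStream.format("delta").load("/mnt/lake/bronze/customers")

(df.writeStream
   .foreachBatch(partial(merge_to_df,
                         target_path="/mnt/lake/silver/customers",
                         merge_key="CustomerID"))
   .option("checkpointLocation", "/mnt/lake/_checkpoints/customers_param")
   .start())
```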
@kasthurisomshekar3717 · 2 years ago
Hey Simon... I watched all of your videos on Autoloader - awesome work. I need one bit of help: I want to use dropDuplicates on a dataframe in Autoloader with a watermark value. Can you please provide example syntax for that? It would be really helpful.
@kuldipjoshi1406 · 3 years ago
Is there any way to directly upsert data to ADLS Gen2 with structured streaming?
@AdvancingAnalytics · 3 years ago
In this example, the delta table is stored within ADLS Gen 2, so it's upserting via the merge clause. If you want to put the data directly to the lake without a delta table, you can write to one of the native output sinks in adls gen 2, but you'll need to be careful about the output mode (append/update/complete) depending on the aggregations you're using. The databricks streaming examples are a pretty good starting place - docs.databricks.com/spark/latest/structured-streaming/demo-notebooks.html#structured-streaming-demo-python-notebook
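For the simple append case, a sketch of writing straight to a file sink in ADLS Gen2 - the abfss account, container and paths are placeholders, and note that file sinks only support append output mode:

```python
events = spark.readStream.format("delta").load("/mnt/lake/bronze/events")

(events.writeStream
       .format("parquet")                      # a plain file sink rather than a Delta merge
       .outputMode("append")                   # file sinks only support append
       .option("path", "abfss://lake@<storageaccount>.dfs.core.windows.net/raw/events")
       .option("checkpointLocation", "abfss://lake@<storageaccount>.dfs.core.windows.net/_checkpoints/events")
       .start())
```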
@kuldipjoshi1406 · 3 years ago
@@AdvancingAnalytics Yeah, this was really helpful. If I want to send data in real time downstream to Kafka or Event Hubs, should I read from the final merged table, or should I consider:
1) one staging table in append mode - write rows directly to the downstream sink and merge into the merged table in parallel, OR
2) read from the merged table and write to the downstream sink (with ignoreChanges=true, where I will need to handle the extra re-emitted data downstream, as the docs say)?
@sid0000009 · 4 years ago
If we add ignoreChanges = True on the read stream, would that make it more optimized - i.e. a file-level CDC read before updating the target? (Needless to say, I'm a big fan of your videos :))
@AdvancingAnalytics · 4 years ago
Without ignore changes, you'll get errors if you ever need to re-write files in the source delta. Basically kicks out the whole updated file with unchanged and updated rows combined - it's not particularly efficient as you might get 10,000 unchanged rows with a single updated row, but yes, you would only see the files that were changed in the source table. As with most things delta, there's a lot of data re-writing going on under the hood to make things nice and simple from a code perspective!
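For reference, a minimal sketch of that read option (the path is a placeholder):

```python
# tolerate rewritten files in the source Delta table; downstream logic must
# de-duplicate the unchanged rows that get re-emitted alongside the updates
changes = (spark.readStream
                .format("delta")
                .option("ignoreChanges", "true")
                .load("/mnt/lake/silver/customers"))
```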
@gauravkumar796 · 4 years ago
Hi buddy, I have been following your channel and it has been very helpful in my Databricks journey at our organization. Currently we are using Databricks for data ingestion and creating delta tables as part of batch jobs. Now business users want to use this data for BI and analytics, primarily in Tableau or another interface (might be Redash in future). My question is: if Databricks is used for BI, does it provide good performance, or do we need another tool like Qubole or Dremio which provides a semantic layer on top of ADLS data without any data ingestion? Please let me know.
@AdvancingAnalytics · 4 years ago
Depends what "good performance" is - it's still a distributed compute platform, so it'll always take a couple of seconds to return data - if your users are looking for millisecond latency, they'll need some kind of cached semantic layer (either inside Tableau server, Power BI etc). If they're happy that queries may take a few seconds to return, then you can likely tweak and tune a cluster to be sufficient directly. All depends on requirements - the real benefit of Databricks is the ability to run HUGE queries over massive datasets in reasonable times :)
@bhaveshpatelaus · 4 years ago
@@AdvancingAnalytics We are using the delta lake with Power BI Premium for our BI and Analytics workload and the performance is pretty good so far.
@gauravkumar796 · 2 years ago
@@bhaveshpatelaus do you connect powerbi via serveless cluster or SQL endpoint?