Parallel table ingestion with a Spark Notebook (PySpark + Threading)

13,055 views

Dustin Vannoy

1 day ago

Comments: 61
@GrantDoyle-n4e 9 months ago
I was having so many issues using the other Threadpool library in a notebook. It cut my notebook runtime down by 70% but I couldn't get it to run in a Databricks job. Your solution worked perfectly! Thank you so much!
@vygrys 8 months ago
Great video tutorial. Clear explanation. Thank you.
@Druidofwind 11 months ago
This is great. Have you compared pyspark.InheritableThread vs. a plain Python thread? I read that the PySpark version keeps the thread in sync between the PVM and the JVM.
@saipremikak5049 1 month ago
Wonderful tutorial, Thank you! This approach works effectively for running multiple tables in parallel when using spark.read and spark.write to a table. However, if the process involves reading with spark.read and then merging the data into a table based on a condition, one thread interferes with another, leading to thread failure. Is there any workaround for this?
@VishalMishra-d3f 1 month ago
Nice observation. I am also facing this issue. Were you able to figure out a solution? How do you know that "merging the data into a table based on a condition" is the issue?
@DustinVannoy 22 days ago
I don't think I follow. Is there a code example you can send along? For Databricks I sometimes just set this up as separate parallel workflow tasks, but you may be describing other challenges. If there is an error message you encounter, please share it.
@shamalsamal5461 5 months ago
thanks so much for your help
@maoraharon3201 4 months ago
Hey, great video! Small question: why not just use the FAIR scheduler, which does that automatically?
@DustinVannoy 1 month ago
@maoraharon3201 On Databricks you can now submit multiple tasks in parallel from a workflow/job, which is my preferred approach in many cases.
@chrishassan8766 6 months ago
Hi Dustin, thank you for sharing this approach. I am going to use it for training Spark ML models. I had a question on using the daemon option. My understanding is that these threads will never terminate until the script ends. When do they terminate in this example? At the end of the cell? Or after .join(), that is, when all items in the queue have completed? I really appreciate any explanation you can provide.
@haoyuwang5995 1 year ago
Hey, quick question. I have a toxic-permission DataFrame whose rows are combinations of permissions. Is there a way to run JOINs in parallel, using each permission combination to filter the users who have all of those toxic permissions and then joining against that user DataFrame, in a PySpark-native way? Right now I can use multithreading to run the function, but when I try something like map and foreach, it gives me an error. Any idea how I should achieve this? Open to any ideas.
@christoffermunch7983 2 years ago
Great video. It would be great if you could also explain how to do error handling for each of the threads.
@DustinVannoy 2 years ago
Thanks for the feedback, will put that on my to do list.
@DustinVannoy 1 year ago
I have addressed your question in this video: kzbin.info/www/bejne/pYCpo4ija692Y5I
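The follow-up video is not reproduced in this thread. As a rough sketch of one common way to capture failures per thread, the standard-library `concurrent.futures` module lets each task's exception be read back from its future. The names `run_all` and `demo_load` are hypothetical; in a notebook the load function would wrap the `spark.read` / `spark.write` calls.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_all(tables, load_table, max_workers=4):
    """Call load_table(table) for each table and report per-table outcomes."""
    succeeded = []
    failed = {}  # table name -> exception raised while loading it
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(load_table, table): table for table in tables}
        for future in as_completed(futures):
            table = futures[future]
            try:
                future.result()  # re-raises any exception from the worker thread
                succeeded.append(table)
            except Exception as exc:
                failed[table] = exc
    return succeeded, failed


def demo_load(table):
    # Stand-in for the real load; fails on one table to show the error path.
    if table == "bad_table":
        raise ValueError(f"could not load {table}")


succeeded, failed = run_all(["customers", "orders", "bad_table"], demo_load)
print(sorted(succeeded), list(failed))
```

One failing table does not stop the others, and the caller can decide afterwards whether to retry or fail the whole run.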
@Jolu140 6 months ago
Hi, thanks for the informative video! I have a question: instead of sending a list to the notebook, I send a single table to the notebook using a ForEach activity (Synapse allows a maximum of 50 concurrent iterations). What would the difference be? Which would be more efficient? And what is best practice in this case? Thanks in advance!
@sujitunim 2 years ago
Thanks for sharing 👍
@shubhamsannyasi794 2 years ago
We have to process 800 files that insert data into 800 separate Delta tables. We used ThreadPoolExecutor. Using the thread pool, we call a child notebook that processes a single file. In production the code became driver intensive and executor utilization remained low. After investigation we realised Python multithreading was driver intensive and we were not able to leverage the resources of the multiple available workers. Is there an alternative to multithreading in PySpark that passes the load to the workers?
@DustinVannoy 1 year ago
In my tests it is using the workers as long as you use spark.read and spark.write, but it does keep threads going on the driver. I wouldn't make the worker count too high and maybe scale up the driver (if using Databricks). You could submit each as a separate job if using Databricks, but that will still put load on the driver and 800 is a lot. Either way you will need to throttle (not kick off 800 at once). However, if all files have the same schema you may be able to read them all together by using a wildcard (glob) pattern for the path and then partitionBy in Spark to save under separate folders. It would be registered as a single Delta table though, so it may not fit your requirements.
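To make the throttling point concrete, here is a minimal sketch that drains a queue with a fixed number of worker threads, so only `worker_count` items are in flight at any time. `run_throttled` and the file names are hypothetical, and the `work` callable stands in for whatever processes one file.

```python
import queue
import threading


def run_throttled(items, work, worker_count=8):
    """Process every item with work(item), using at most worker_count threads."""
    pending = queue.Queue()
    for item in items:
        pending.put(item)

    finished = []
    lock = threading.Lock()

    def worker():
        while True:
            try:
                item = pending.get_nowait()
            except queue.Empty:
                return  # nothing left to do, so this thread exits
            try:
                work(item)
                with lock:
                    finished.append(item)
            except Exception as exc:
                print(f"{item} failed: {exc}")

    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for every worker to finish
    return finished


files = [f"file_{i}" for i in range(20)]
finished = run_throttled(files, lambda name: None, worker_count=4)
print(len(finished))
```

The right `worker_count` depends on the driver size and the cluster, so it is worth treating as a tunable parameter rather than a fixed value.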
@AgusParrotta 1 year ago
Great video! Congrats! I have a couple of questions. The multithreading is being carried out only by the driver, right? If I want to improve, for example, API calls, I assume the best idea is to use multithreading on each core (using a UDF) instead of this approach. What do you think?
@DustinVannoy 1 year ago
The driver would manage threads, any Spark actions should run on executors. So you don’t want too many threads started for the cores available on the driver.
@SreenathaReddyKR 1 year ago
100th like given by me. Thank you for the insightful tutorial.
@DustinVannoy 1 year ago
Wow, 100 likes? Thanks for the heads up and for being number 100. Very humbled that this content is useful for people.
@JasonZhang-se2jo 1 year ago
Thank you for your video, which is very informative and helpful. In your demo code you are using multithreading to do the job. Could you advise whether we could use multiprocessing at the same time to launch further concurrent jobs? One more question: how do we tune the ideal number of concurrent jobs? Does that depend on the cores of the driver node or on the utilization of the worker nodes?
@DustinVannoy 1 year ago
How many workers varies but you do want to be sure not to overload the driver. If using Databricks, you have some easier options of defining workflows that run steps in parallel either on the same or separate job clusters. This type of approach was really most helpful when everything had to be driven from a single notebook or when working in Synapse when it didn't let you share the driver/workers across different notebooks.
@JasonZhang-se2jo 1 year ago
@DustinVannoy Thank you for your help on this. You are right: we could run multiple notebooks concurrently on the same cluster by starting several notebook runs at the same time. Do you have any idea or experience of how to wrap several notebooks in a for loop? I am not quite sure whether it would be a good idea to set up a workflow in Databricks that wraps the other notebook runs with parameters from each iteration of the for loop. My use case is that we want every iteration of a for loop to run at the same time, so that we can finish our model runs as soon as possible. Thank you for addressing the overloading of the driver node. We have been experiencing this recently. Do you have any idea how to mitigate this problem, other than increasing the size of the driver node? Thank you for your help again. It would be appreciated if you could share some videos that demonstrate how to analyze and optimize complicated DAGs / execution plans in the SQL tab of the Spark UI. If you have any paid courses, please let me know. I am keen to learn more from you, thank you.
@monicawtavares 2 years ago
Hi Dustin, thanks for this incredible explanation. I'm using Synapse and I have twenty-five Parquet files in an Azure container that I need to copy every day to another container. This process takes around 9 hours and I need to parallelize it. Can I use this methodology to improve performance? Thanks.
@DustinVannoy 1 year ago
It probably can help. Are you copying files using mssparkutils or using spark.read? Are all of the files part of the same spark table (same schema and directory)? If using spark.read and it is all the same table, then you would not scale this way but instead want to add more executors or look at the execution plan to see where the slowness is.
@monicawtavares 1 year ago
@DustinVannoy I'm using mssparkutils and all of the files are part of the same Spark tables.
@DustinVannoy 1 year ago
If you are running a recursive copy I am not sure if that is happening in parallel already but I think it is. If calling the cp command once per file, you may try comparing a recursive copy to using concurrency code like I show in the video. Example of recursive copy... mssparkutils.fs.cp("wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/yellow/puYear=2019", "/nyctaxi_raw/parquet/yellow/puYear=2019/", recurse=True)
@sayedmohammad2394 2 years ago
Could you please share the syntax if the load_table function takes more than one parameter, like load_table(x, y)?
@DustinVannoy 1 year ago
Watch this video for the answer: kzbin.info/www/bejne/pYCpo4ija692Y5I
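The linked video is not reproduced here, but in plain Python one way to do this is to pass a tuple through the `args` parameter of `threading.Thread`. The sketch below assumes a two-parameter `load_table(source, target)`; its body and the table names are hypothetical placeholders.

```python
import threading

loaded = []
lock = threading.Lock()


def load_table(source, target):
    # Placeholder body; a real version would read `source` and write `target`.
    with lock:
        loaded.append((source, target))


# Each job is one tuple of positional arguments for load_table.
jobs = [("raw.customers", "bronze.customers"), ("raw.orders", "bronze.orders")]

threads = [threading.Thread(target=load_table, args=job) for job in jobs]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(loaded))
```

With a queue-based worker, the same idea applies: put the tuple on the queue and unpack it in the worker with `load_table(*job)`.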
@saeedp62 1 year ago
Thanks, Dustin, for the video. One question I have is how to speed up the read/write process when the table is very big. Is there a difference when the number of writers (creating multiple files) is increased compared to writing to a single Parquet/Delta file?
@DustinVannoy 1 year ago
It's a separate topic from the parallel run of Spark notebooks. What you expect to see for larger table writes is multiple tasks that are writing the data. Usually the more you can avoid shuffle/exchange and spill, the better off you will be. I have some videos on using the Spark UI but others go more in depth on optimizations. Maybe in the future I will cover more about that here.
@ManavMishra-by6ls 1 month ago
@DustinVannoy Thanks, have you made any content on this yet?
@rohitSingh-ow5st 1 year ago
Why are you making the thread a daemon thread? Is there any specific reason behind that?
@DustinVannoy 1 year ago
I don't have a specific reason for setting that. I don't recall if I tried it multiple ways.
@rohitSingh-ow5st 1 year ago
@DustinVannoy Thank you for your time and for making such a good and informative video, I was lucky that I found this one while looking for my answers. I was just curious because of the behavior of a daemon thread and in our case ingestion is a critical process and we would like to exit only when all the processing is completed. So a non-daemon thread would be more apt in this scenario I think.
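To illustrate the daemon point: in standard Python the interpreter does not wait for daemon threads when it exits, while `join()` waits for a thread whether or not it is a daemon. A minimal sketch, with a hypothetical `ingest` function standing in for the table load:

```python
import threading

completed = []


def ingest(table):
    # Stand-in for a table load; list.append is safe to call from several threads.
    completed.append(table)


tables = ["customers", "orders", "payments"]
# daemon=False: Python will not exit while these threads are still running.
threads = [threading.Thread(target=ingest, args=(t,), daemon=False) for t in tables]
for t in threads:
    t.start()
for t in threads:
    t.join()  # block until this thread has finished, daemon or not

print(sorted(completed))
```

In a notebook the Python process usually outlives the cell, so the daemon flag probably matters most when the same code runs as a script or job; that is an assumption about the runtime and worth checking on your platform.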
@willweatherley4411 9 months ago
Will this work if you read in a file, do some minor transformations and then save to ADLS? Would it work if we add in transformations basically?
@DustinVannoy 8 months ago
Yes. If the transformations are different per source table you may want to provide the correct transformation function as an argument also. Or have something like a dictionary that maps source table to transformation logic.
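The dictionary idea could look roughly like the sketch below. Plain lists of dicts stand in for DataFrames so the example runs without Spark; in a notebook each function would take and return a DataFrame instead. All table and function names are hypothetical.

```python
def passthrough(rows):
    return rows


def drop_empty(rows):
    return [row for row in rows if row]


def uppercase_names(rows):
    return [{**row, "name": row["name"].upper()} for row in rows]


# Hypothetical mapping from source table to its transformation.
TRANSFORMS = {
    "customers": uppercase_names,
    "orders": drop_empty,
}


def transform_table(table, rows):
    # Tables without an entry fall back to no transformation.
    transform = TRANSFORMS.get(table, passthrough)
    return transform(rows)


print(transform_table("customers", [{"name": "ada"}]))
```

Each thread then only needs the table name, and the lookup picks the matching logic.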
@linuxzinho7620 1 year ago
Awesome! One question: you are using threads, and threads in Python are concurrent, not parallel. When a thread starts Spark processing and Python moves on to another table and starts another thread, does Spark keep running the tasks of the threads that are blocked? And do you know if there is any way to do this with processes so it runs completely in parallel?
@DustinVannoy 1 year ago
It is concurrent.
@DustinVannoy 1 year ago
Spark tasks will keep processing, but they are sharing a cluster and driver with this pattern. You could use Databricks Workflows or Azure Data Factory / Synapse Pipelines to kick off notebooks in parallel, each on their own cluster. You can create a single-node environment or fairly small clusters in Databricks. In Synapse the pool allocates quite a bit of resources to the driver, which can be overkill for a small job.
@linuxzinho7620 1 year ago
Thanks for answering this. I'm using Synapse, and the driver really is a bit heavy to start more drivers just to run simple tasks. I have a daily ETL of hundreds of small tables and I'm testing your thread technique to load more tables in parallel, and it works very well! The ETL is running 30-50% faster with this approach. I was only thinking about using the Python multiprocessing library to start processes inside the driver to run the threads in parallel, but that sounds weird to do in a notebook and could compromise the driver (maybe I will try it out of curiosity). Thanks for answering me, I really appreciate your work. You helped me a lot, and sorry for my English, I'm from Brazil.
@linuxzinho7620 1 year ago
Threads fit the situation well because the driver only needs to send the tasks to the executors. The thread has to wait for the task to finish anyway, and meanwhile another thread can send its requests. So concurrent threads are a good approach in this case. Thanks for the video.
@SuperLano98 2 years ago
And what if, in the for loop, we add pools, something like this, with a different pool for each list item: spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")? Would we achieve the same result as using threads?
@DustinVannoy 2 years ago
I have not tried changing the pool that way so I am not sure. I expect the calls are still blocking / sequential if you don't use threads.
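A sketch of combining the two ideas, assuming the FAIR scheduler is enabled on the cluster and a recent PySpark version: `setLocalProperty` is documented as affecting jobs submitted from the calling thread, so each worker thread sets its own pool before running its load. `load_in_pool`, `run_in_pools`, `load_table`, and the pool names are hypothetical; `spark` is the notebook's existing SparkSession, passed in explicitly. This has not been verified against the setup in the video.

```python
import threading


def load_in_pool(spark, table, pool_name, load_table):
    # Assign this thread's Spark jobs to a scheduler pool, then run the load.
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", pool_name)
    load_table(table)


def run_in_pools(spark, tables, load_table):
    threads = []
    for i, table in enumerate(tables):
        pool_name = f"pool{i + 1}"  # hypothetical naming: one pool per table
        t = threading.Thread(
            target=load_in_pool, args=(spark, table, pool_name, load_table)
        )
        t.start()
        threads.append(t)
    for t in threads:
        t.join()  # wait until every table has been loaded
```

In a notebook this would be called as `run_in_pools(spark, table_list, load_table)`. Without the threads, the loop would still run one table at a time regardless of the pool setting.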
@gardnmi 1 year ago
I had a project where I needed to scrape over 30,000 JSON files that took around 3 hours each, which weren't Spark friendly and required a JSON streaming library. I ended up using the concurrent.futures module and the ThreadPoolExecutor class, which is available in Python 3.2 and above and makes threading a bit easier. I still ran into resource and memory issues that caused me headaches, but it's a good starting point. I tried a bit to hack the Spark cluster into running the code concurrently but never managed to get it working.
@DustinVannoy 1 year ago
For that many files I would instead look for a way to save files with a common schema and read as one big table so Spark can work more naturally at parallelizing the work. Ideally they could be compacted into larger newline delimited files though. There is a function to get source file name so you could add a column that tracks source file and use that when storing the output. Not sure if that works for your use case though.
@Sundar25 5 months ago
You can also run the driver program with multiple threads like this:

    from threading import Thread
    from time import sleep  # only used to simulate work in this demo

    worker_count = 3  # number of threads to run at a time

    def display(tablename):
        # Read a table from schema X and load it into schema Y
        try:
            # spark.table(tablename).write.format('delta').mode('overwrite').saveAsTable(f'{tablename}_target')
            print(f'Data copy from {tablename} to {tablename}_target is completed.')
        except Exception as exc:
            print(f'Data copy failed for {tablename}: {exc}')
        sleep(3)

    # list of tables to process
    tables = ['Table1', 'Table2', 'Table3', 'Table4', 'Table5', 'Table6', 'Table7', 'Table8']

    batch = []
    for table in tables:
        thread = Thread(target=display, args=(table,), name=table)  # thread is named after its table
        thread.start()
        batch.append(thread)
        if len(batch) == worker_count:
            for thread in batch:
                thread.join()  # hold until every thread in this batch completes
            batch = []

    for thread in batch:
        thread.join()  # wait for the final, partial batch
@wangwu9299 1 year ago
Is it running on all executors, or only on the driver?
@DustinVannoy 1 year ago
When I have done this and used spark.read and spark.write, it is using executors. Smaller tables tend to use 1 task each which means only one executor is used for a single table, but it should not be restricted to that. I have seen it use multiple tasks and executors for a single table in some Spark environments I have tried this on.
@arshadshaik3020 2 years ago
Are all these threads created on the driver node? Or will it simultaneously use both driver and executor nodes for processing. I remember once trying to use multithreading, but found all the threads were being created and processed on driver node which was creating a bottleneck on driver node. What would be a way to utilise both driver and executor nodes while creating threads?
@DustinVannoy 2 years ago
My understanding is the driver is hosting each thread, but the calls to Spark (like spark.read) are making use of the executors. So running with 4 threads is similar to submitting 4 separate Spark jobs on Databricks. With a YARN runtime like the one Azure Synapse uses, it is actually putting more work on the driver than if you ran 4 separate Spark jobs, but that can be a good thing if your driver is more powerful than you need for a single table.
@arshadshaik3020 2 years ago
@DustinVannoy Thank you for the reply. I am slowly trying to understand how multithreading works in a Spark cluster. It looks like all my threads are being created on the driver node and, as you say, if I use Spark DataFrame methods inside them, the work is distributed across the executor nodes. However, I was wondering what would be a good way to distribute thread creation across different nodes. I would appreciate any help or suggestions on this. Right now I am trying to understand RDDs and experimenting a little to see if using UDFs distributes thread creation across different nodes.
@DustinVannoy 2 years ago
@arshadshaik3020 Why do you want thread creation distributed? And are you running on Databricks? You are correct that work done as a UDF or within a map task will be distributed across executors rather than running on the driver. If you use DataFrames and stick to built-in functions then Spark typically optimizes execution better, but that does not cover all custom transformation use cases.
@arshadshaik3020 2 years ago
@DustinVannoy I was actually making thousands of calls to an API server, which seems to be throttling me because the requests were coming from a single driver node IP. I want to distribute the calls because each node will have a different IP, so I will be able to make more calls without being throttled by the server. I found a few more things in the Python requests library where I can provide IP pools and back-off parameters to overcome this. I am just trying out all the different possibilities.
@gardnmi 1 year ago
@arshadshaik3020 There is a project called Sparkler that is a web crawler built on Spark.
@nikunjkakadiya2011 2 years ago
Is there something similar to this that we can do in Scala?
@DustinVannoy 2 years ago
Databricks shares a way of doing it that uses the same concepts, but they use the run command to kick off a separate notebook instead of just calling a function. If you download the notebook archive, it contains Scala code that uses futures to achieve this. Would it be helpful for me to do a video covering a Scala version? docs.databricks.com/notebooks/notebook-workflows.html#run-multiple-notebooks-concurrently
@DustinVannoy 1 year ago
I talked through the Databricks example in this video: kzbin.info/www/bejne/pYCpo4ija692Y5I