112. Databricks | Pyspark | Spark Reader: Skip First N Records While Reading CSV File

5,075 views

Raja's Data Engineering

1 day ago

Comments
@percyjackson1662 · 1 year ago
For the quiz question: we first read the whole CSV, then use the row_number function to filter out the records we need.
@rajasdataengineering7585 · 1 year ago
Super quick! Thanks for sharing your approach. Yes, this is one approach. Since Databricks is a distributed platform, when row numbers are generated across partitions there is no guarantee that the first few records of the file get the first few row numbers. So although this approach can be used, we can still think of other approaches that guarantee removing only a specific range of rows.
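For reference, a minimal runnable sketch of that row_number idea (assuming df is the DataFrame already read from the CSV and the quiz asks to drop rows 11 to 20; as noted above, ordering the window by a constant gives no guarantee that the numbering follows the file order):

from pyspark.sql.functions import col, lit, row_number
from pyspark.sql.window import Window

# Constant ordering key: arbitrary, but lets row_number() run; it pulls all rows into one window partition
w = Window.orderBy(lit("A"))
df_skipped = (df.withColumn("ROWNUM", row_number().over(w))
                .filter((col("ROWNUM") < 11) | (col("ROWNUM") > 20))  # keep everything except rows 11-20
                .drop("ROWNUM"))
df_skipped.show()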
@percyjackson1662 · 1 year ago
@@rajasdataengineering7585 I assumed there would be some timestamp column we could use in the ORDER BY clause of the window spec. My bad! 😁
@rajasdataengineering7585 · 1 year ago
That's the right approach if we have a timestamp 👍🏻
@abhishekrajoriya2507 · 1 year ago
Sir, when we use this skipRows option the header also gets replaced by a wrong header. What should we do in this case?
@rajasdataengineering7585 · 1 year ago
Good question. For that scenario we need to skip records 2 to 5 instead of the first 4 rows. I will post a video handling that scenario next.
@abhishekrajoriya2507 · 1 year ago
@@rajasdataengineering7585 You are the best.
@AdityaKakaraparthi · 11 months ago
I am trying this while reading a CSV file in my Databricks notebook, but the skip is not working.
@AnuragsMusicChannel · 4 months ago
Use the window function row_number, then use filter to drop row numbers 11 to 20:
w = Window.orderBy(lit("A"))
df.withColumn("ROWNUM", row_number().over(w)).filter((col("ROWNUM") < 11) | (col("ROWNUM") > 20)).drop("ROWNUM").show()
@AshokKumar-ji3cs · 1 year ago
Thanks for the best video with very useful content.
@rajasdataengineering7585 · 1 year ago
Thanks, and welcome!
@NaveenKumar-kb2fm · 1 year ago
Hi, can you implement a real-time scenario of applying SCD Type 2 on multiple source tables dynamically, i.e. without writing a script for each table? Could you take an example and demonstrate this?
@rajasdataengineering7585 · 1 year ago
Sure, I will create a video for this requirement.
@NaveenKumar-kb2fm · 1 year ago
@@rajasdataengineering7585 Thank you very much. We take data from an on-premise client database using ADF and load it into ADLS (source-to-landing pipeline), then another pipeline loads from ADLS into staging tables in the Synapse DWH. Then comes the SCD layer (a one-to-one copy from the staging table). The source has more than 50 tables, staging is refreshed every day, and we have to load new updates and inserts into SCD Type 2 tables with a flag column. Since we can't create a script for each table, we want to do it dynamically: get all the tables from staging and apply SCD Type 2 to all of them.
@rajunaik8803 · 1 year ago
@@NaveenKumar-kb2fm You can create a metadata ingestion framework. Basically, capture the metadata of all tables (source table name, target table name, key columns, etc.) in one metadata table. Then build a generic stored procedure or Spark SQL (a MERGE statement to achieve SCD Type 2 using that metadata table). Finally, loop through this procedure for all the 50-odd entries in the metadata table, as in the sketch below.
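A rough sketch of that metadata-driven loop (the metadata table and the hash_value/current_flag/end_date columns are hypothetical names, not from the video; a full SCD Type 2 also needs a second step to insert the new version of changed keys, which this single MERGE alone does not cover):

# Hypothetical metadata-driven SCD2 loop; table and column names are illustrative.
meta_rows = spark.table("control.scd2_metadata").collect()

for m in meta_rows:
    src = m["source_table"]
    tgt = m["target_table"]
    keys = [k.strip() for k in m["key_columns"].split(",")]
    join_cond = " AND ".join(f"t.{k} = s.{k}" for k in keys)

    # Expire current versions that changed and insert brand-new keys.
    # INSERT * assumes matching schemas; real tables usually list columns explicitly.
    spark.sql(f"""
        MERGE INTO {tgt} AS t
        USING {src} AS s
        ON {join_cond} AND t.current_flag = 'Y'
        WHEN MATCHED AND t.hash_value <> s.hash_value THEN
          UPDATE SET t.current_flag = 'N', t.end_date = current_date()
        WHEN NOT MATCHED THEN
          INSERT *
    """)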
@kumarvummadi3772 · 1 year ago
What is the OS area in Spark? What is the OS-controlled area in the data node? Is off-heap memory controlled by the executor or by the OS-controlled area? If off-heap memory is controlled by the OS area, how does the executor utilize it? And what kinds of optimizations can we do using off-heap memory? Please make a video on these concepts, sir.
@rajasdataengineering7585 · 1 year ago
On-heap memory is controlled by the executor JVM, while off-heap memory is controlled by the OS of the worker node. I have already posted a video about on-heap vs off-heap memory: kzbin.info/www/bejne/mYXNeaKhpN1of9U
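As a reference point, off-heap memory in Spark is opt-in and is sized through configuration (the values below are only illustrative):

from pyspark.sql import SparkSession

# Illustrative only: give each executor 2 GB of off-heap memory, managed outside the JVM heap.
spark = (SparkSession.builder
         .appName("offheap-demo")
         .config("spark.memory.offHeap.enabled", "true")
         .config("spark.memory.offHeap.size", "2g")
         .getOrCreate())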
@kumarvummadi3772 · 1 year ago
@@rajasdataengineering7585 Sir, could you please explain how the JVM accesses objects from off-heap memory and how objects get stored in on-heap memory? If possible, please make a video, sir.
@mankaransingh981 · 1 year ago
Sir, in this example the header is also getting skipped. How can we keep the header and then skip the rows?
@rajasdataengineering7585 · 1 year ago
Hi Mankaran, I have given a workaround for this requirement in my next video. Please watch video 113.
@mankaransingh981 · 1 year ago
@@rajasdataengineering7585 Sure, thanks for the quick revert. Appreciate your work.
@bhaskarjha4673 · 1 year ago
Take the first 10 rows using head and keep them in one DataFrame; take all rows after skipping 20 rows using the skipRows option and keep them in another DataFrame; union both DataFrames to get the desired result. Is this approach correct, Raja sir?
@rajasdataengineering7585 · 1 year ago
Hi Bhaskar, this approach is very close to the solution. Good thinking! One minor issue might arise while using the head command. The head command ensures we take the first 10 records of the DataFrame, but it is not guaranteed that the first 10 records of the CSV file will be the first 10 records of the DataFrame, since this is a distributed system and the CSV file would have been split into partitions after being brought into the Spark environment. Thanks for sharing your approach 👍🏻
@bhaskarjha4673 · 1 year ago
Read the first 10 rows using head into a pandas DataFrame and then convert it into a Spark DataFrame, as in the sketch below.
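A sketch of that combination (the file path is hypothetical, skipRows is the Databricks CSV reader option shown in the video, and the column names/types of the two DataFrames may need aligning before the union):

import pandas as pd

# pandas reads the file sequentially, so the first 10 data rows really are the file's first 10 rows
pdf_head = pd.read_csv("/dbfs/FileStore/baby_names.csv", nrows=10)
df_head = spark.createDataFrame(pdf_head)

# Remaining rows after skipping the first 20 data rows (header handling is discussed in video 113)
df_rest = (spark.read
           .option("header", "true")
           .option("skipRows", 20)
           .csv("dbfs:/FileStore/baby_names.csv"))

df_result = df_head.unionByName(df_rest)
df_result.display()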
@kumarvummadi3772 · 1 year ago
Thank you very much, sir, for making this video. I request you to make a video on one more concept; I have posted some questions related to it in this comment section, sir. Please see them.
@rajasdataengineering7585 · 1 year ago
Noted
@rmrz2225 · 1 year ago
Hi, could you share the quiz solution please? Thanks
@rajasdataengineering7585 · 1 year ago
Hi, you can find the solution in the next video, 113.
@sureshraina321 · 1 year ago
How do we skip the last few lines?
@prabhatgupta6415 · 1 year ago
Sir, today you forgot to share the dataset :D
@rajasdataengineering7585 · 1 year ago
I will add the dataset to the description now.
@aravind5310 · 1 year ago
from pyspark.sql.functions import monotonically_increasing_id
df1 = df.coalesce(1).select("*", monotonically_increasing_id().alias("pk"))
df1.display()
from pyspark.sql.functions import col
df2 = df1.filter(~col('pk').between(4, 7))
df2.display()
@sabesanj5509 · 1 year ago
Awesome explanation, Raja bro...
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv('baby_names.csv', header=True, inferSchema=True, ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True, skiprows=lambda i: i >= 11 and i
@rajasdataengineering7585 · 1 year ago
Happy to see the approaches. Good try! But skiprows expects just an integer; it does not accept a lambda function. Thanks for sharing your approach.
@sabesanj5509 · 1 year ago
@@rajasdataengineering7585 Thanks, Raja, for the reply. Would the below logic be correct for the quiz?
filtered_df = df.filter((df['Id'] < 11) | (df['Id'] > 20))
@rajasdataengineering7585 · 1 year ago
Yes, this will produce the required output. But in this example we happen to have an ID column, which is not the case in all real-time scenarios. So we should be able to produce the expected output even without an ID column, for example as in the sketch below.
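One common ID-free alternative (a sketch, not from the video): attach a positional index with zipWithIndex, which numbers rows in the order the partitions are read, and then filter on that index.

# Drop rows 11-20 without relying on an existing ID column.
rdd_indexed = df.rdd.zipWithIndex()                          # pairs of (Row, 0-based position)
kept_rdd = (rdd_indexed
            .filter(lambda pair: not (10 <= pair[1] <= 19))  # positions 10..19 are rows 11..20
            .map(lambda pair: pair[0]))
df_without_11_to_20 = spark.createDataFrame(kept_rdd, df.schema)
df_without_11_to_20.display()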
@9039522806 · 1 year ago
Hello, is this skipRows option applicable only to Databricks Spark applications, or does it also work with non-Databricks Spark applications like Cloudera etc.?
@rajasdataengineering7585 · 1 year ago
Hi, it works for all Spark-related solutions like Databricks, open-source Spark, Azure Synapse Spark pools, etc.
@SubashSubash-v2l · 1 month ago
@@rajasdataengineering7585 Hi, this is not working in open-source Spark. Could you please help?