
How to make the first row as header when reading a file in PySpark and converting it to Pandas Dataframe

Hello guys, how are you all? Hope you are all fine. Today we are going to learn how to make the first row the header when reading a file in PySpark and converting it to a pandas DataFrame in Python. Here I explain all the possible methods.

Without wasting your time, let's start this article.

How to make the first row as header when reading a file in PySpark and converting it to Pandas Dataframe?

Method 1

There are a couple of ways to do that, depending on the exact structure of your data. Since you do not give any details, I'll try to show it using a data file, nyctaxisub.csv, that you can download.

If your file is in csv format, you should use the relevant spark-csv package, provided by Databricks. No need to download it explicitly, just run pyspark as follows:

$ pyspark --packages com.databricks:spark-csv_2.10:1.3.0

and then

>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.types import *
>>> sqlContext = SQLContext(sc)

>>> df = sqlContext.read.load('file:///home/vagrant/data/nyctaxisub.csv', 
                      format='com.databricks.spark.csv', 
                      header='true', 
                      inferSchema='true')

>>> df.count()
249999

The file has 250,000 rows including the header, so 249,999 is the correct number of actual records. Here is the schema, as inferred automatically by the package:

>>> df.dtypes
[('_id', 'string'),
 ('_rev', 'string'),
 ('dropoff_datetime', 'string'),
 ('dropoff_latitude', 'double'),
 ('dropoff_longitude', 'double'),
 ('hack_license', 'string'),
 ('medallion', 'string'),
 ('passenger_count', 'int'),
 ('pickup_datetime', 'string'),
 ('pickup_latitude', 'double'),
 ('pickup_longitude', 'double'),
 ('rate_code', 'int'),
 ('store_and_fwd_flag', 'string'),
 ('trip_distance', 'double'),
 ('trip_time_in_secs', 'int'),
 ('vendor_id', 'string')]

You can see more details in my relevant blog post.
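
Since the end goal is a pandas DataFrame, the Spark DataFrame loaded above can be converted directly with toPandas() (a minimal sketch; this assumes the roughly 250,000 rows fit comfortably in driver memory):

>>> pandas_df = df.toPandas()  # collects all rows to the driver as a pandas DataFrame
>>> pandas_df.shape  # expect (249999, 16), matching the count and schema shown above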

If, for whatever reason, you cannot use the spark-csv package, you’ll have to subtract the first row from the data and then use it to construct your schema. Here is the general idea, and you can again find a full example with code details in another blog post of mine:

>>> taxiFile = sc.textFile("file:///home/ctsats/datasets/BDU_Spark/nyctaxisub.csv")
>>> taxiFile.count()
250000
>>> taxiFile.take(5)
[u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"',
 u'"29b3f4a30dea6688d4c289c9672cb996","1-ddfdec8050c7ef4dc694eeeda6c4625e","2013-01-11 22:03:00",+4.07033460000000E+001,-7.40144200000000E+001,"A93D1F7F8998FFB75EEF477EB6077516","68BC16A99E915E44ADA7E639B4DD5F59",2,"2013-01-11 21:48:00",+4.06760670000000E+001,-7.39810790000000E+001,1,,+4.08000000000000E+000,900,"VTS"',
 u'"2a80cfaa425dcec0861e02ae44354500","1-b72234b58a7b0018a1ec5d2ea0797e32","2013-01-11 04:28:00",+4.08190960000000E+001,-7.39467470000000E+001,"64CE1B03FDE343BB8DFB512123A525A4","60150AA39B2F654ED6F0C3AF8174A48A",1,"2013-01-11 04:07:00",+4.07280540000000E+001,-7.40020370000000E+001,1,,+8.53000000000000E+000,1260,"VTS"',
 u'"29b3f4a30dea6688d4c289c96758d87e","1-387ec30eac5abda89d2abefdf947b2c1","2013-01-11 22:02:00",+4.07277180000000E+001,-7.39942860000000E+001,"2D73B0C44F1699C67AB8AE322433BDB7","6F907BC9A85B7034C8418A24A0A75489",5,"2013-01-11 21:46:00",+4.07577480000000E+001,-7.39649810000000E+001,1,,+3.01000000000000E+000,960,"VTS"',
 u'"2a80cfaa425dcec0861e02ae446226e4","1-aa8b16d6ae44ad906a46cc6581ffea50","2013-01-11 10:03:00",+4.07643050000000E+001,-7.39544600000000E+001,"E90018250F0A009433F03BD1E4A4CE53","1AFFD48CC07161DA651625B562FE4D06",5,"2013-01-11 09:44:00",+4.07308080000000E+001,-7.39928280000000E+001,1,,+3.64000000000000E+000,1140,"VTS"']

# Construct the schema from the header 
>>> header = taxiFile.first()
>>> header
u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"'
>>> schemaString = header.replace('"','')  # get rid of the double-quotes
>>> schemaString
u'_id,_rev,dropoff_datetime,dropoff_latitude,dropoff_longitude,hack_license,medallion,passenger_count,pickup_datetime,pickup_latitude,pickup_longitude,rate_code,store_and_fwd_flag,trip_distance,trip_time_in_secs,vendor_id'
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(',')]
>>> schema = StructType(fields)

# Subtract header and use the above-constructed schema:
>>> taxiHeader = taxiFile.filter(lambda l: "_id" in l) # taxiHeader needs to be an RDD - the string we constructed above will not do the job
>>> taxiHeader.collect() # for inspection purposes only
[u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"']
>>> taxiNoHeader = taxiFile.subtract(taxiHeader)
>>> taxi_df = taxiNoHeader.toDF(schema)  # Spark dataframe
>>> import pandas as pd
>>> taxi_DF = taxi_df.toPandas()  # pandas dataframe 

For brevity, all columns here end up as type string, but in the blog post I show in detail how you can further refine the desired data types (and names) for specific fields.
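
As a rough sketch of that kind of refinement (my own illustration using a couple of example columns, not the blog post's exact code), you could cast individual columns on the Spark DataFrame before converting to pandas:

>>> from pyspark.sql.functions import col
>>> taxi_df = taxi_df.withColumn('passenger_count', col('passenger_count').cast('int')) \
...                  .withColumn('trip_distance', col('trip_distance').cast('double')) \
...                  .withColumn('trip_time_in_secs', col('trip_time_in_secs').cast('int'))
>>> taxi_DF = taxi_df.toPandas()  # pandas dataframe, now with numeric types for the cast columns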

Method 2

The simple answer would be to set header='true' when reading the CSV.

E.g.:

df = spark.read.csv('housing.csv', header='true')

or

df = spark.read.option("header","true").format("csv").schema(myManualSchema).load("maestraDestacados.csv")
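
Putting it together with the conversion to pandas (a minimal sketch reusing the housing.csv example above; inferSchema is optional but usually convenient):

df = spark.read.csv('housing.csv', header=True, inferSchema=True)  # first row becomes the header
pandas_df = df.toPandas()  # convert the Spark DataFrame to a pandas DataFrame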

Summary

That's all about this issue. I hope one of the methods above helped you. Comment below with your thoughts and queries, and let me know which method worked for you. Thank you.
