Quick exploration of data using spark shell
2017/02/21
Data analytics has never been easier than in the last decade, thanks to open source projects like Hadoop, Spark and many others.
In this post we are going to use spark-shell to read a CSV file and analyze it by running SQL queries against it. The CSV file contains the list of restaurant inspections in NYC and is freely available on the NYC Open Data website. You can download it at: https://data.cityofnewyork.us/Health/DOHMH-New-York-City-Restaurant-Inspection-Results/xx67-kt59/data
Click on Export, then CSV.
Installing spark-shell
To explore the dataset, we use spark-shell.
Download Spark 2.0.2 at http://spark.apache.org/downloads.html
Uncompress it:
tar xvfz Downloads/spark-2.0.2-bin-hadoop2.7.tgz
Run spark-shell:
cd spark-2.0.2-bin-hadoop2.7
bin/spark-shell
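If you plan to cache a sizeable dataset later on, you can give the shell more driver memory when starting it (--driver-memory is a standard spark-shell option; the 4g value below is just an example):
bin/spark-shell --driver-memory 4g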
To load the inspection file into a DataFrame:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
val csvFile = "/Users/chimpler/Downloads/DOHMH_New_York_City_Restaurant_Inspection_Results.csv"
val inspections = spark.read.option("header",true).csv(csvFile)
Now let’s have a quick look at the dataframe content:
// columns
scala> inspections.printSchema
root
 |-- CAMIS: string (nullable = true)
 |-- DBA: string (nullable = true)
 |-- BORO: string (nullable = true)
 |-- BUILDING: string (nullable = true)
 |-- STREET: string (nullable = true)
 |-- ZIPCODE: string (nullable = true)
 |-- PHONE: string (nullable = true)
 |-- CUISINE DESCRIPTION: string (nullable = true)
 |-- INSPECTION DATE: string (nullable = true)
 |-- ACTION: string (nullable = true)
 |-- VIOLATION CODE: string (nullable = true)
 |-- VIOLATION DESCRIPTION: string (nullable = true)
 |-- CRITICAL FLAG: string (nullable = true)
 |-- SCORE: string (nullable = true)
 |-- GRADE: string (nullable = true)
 |-- GRADE DATE: string (nullable = true)
 |-- RECORD DATE: string (nullable = true)
 |-- INSPECTION TYPE: string (nullable = true)

// size of the dataset
scala> inspections.count
res42: Long = 436999
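All the columns are read as strings. If you want Spark to guess the column types (for example to get a numeric SCORE column), you can ask it to infer the schema, at the cost of an extra pass over the file. A quick sketch:
// re-read the CSV and let Spark infer the column types
val typedInspections = spark.read
  .option("header", true)
  .option("inferSchema", true)
  .csv(csvFile)
typedInspections.printSchema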
Let’s run some SQL queries on the dataset:
// register the table so we can access it from SQL
scala> inspections.registerTempTable("inspections")
scala> val df = sql("""
SELECT
`CUISINE DESCRIPTION`,
COUNT(1) AS Num
FROM inspections
GROUP BY `CUISINE DESCRIPTION`
ORDER BY Num DESC
LIMIT 10
""");
scala> df.show
+--------------------+------+
| CUISINE DESCRIPTION|   Num|
+--------------------+------+
|            American|101265|
|             Chinese| 47391|
|Latin (Cuban, Dom...| 20778|
|               Pizza| 20763|
|             Italian| 20084|
|             Mexican| 16424|
|     Café/Coffee/Tea| 16016|
|            Japanese| 15624|
|           Caribbean| 13615|
|              Bakery| 13200|
+--------------------+------+
As expected, since the inspections take place in New York, the top cuisines served in restaurants are American, followed by Chinese, Latin, Pizza and Italian.
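The same aggregation can also be expressed with the DataFrame API instead of SQL. A minimal equivalent sketch:
import org.apache.spark.sql.functions.desc

// top 10 cuisines by number of inspection rows
inspections
  .groupBy("CUISINE DESCRIPTION")
  .count()
  .orderBy(desc("count"))
  .limit(10)
  .show(false)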
Now let’s look at the distribution of the grades that the inspectors are giving to the restaurants:
scala> val df2 = sql("""
SELECT
GRADE, COUNT(1) AS Num FROM inspections
GROUP BY GRADE
ORDER BY Num DESC
""");
scala> df2.show()
+--------------+------+
|         GRADE|   Num|
+--------------+------+
|          null|228417|
|             A|160200|
|             B| 33794|
|             C|  8551|
|             Z|  2689|
|Not Yet Graded|  1943|
|             P|  1405|
+--------------+------+
It shows that most of the inspection records have no grade yet and that, among the graded ones, most are graded A.
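A quick way to check the share of graded rows (a small sketch using the DataFrame API; note that it counts inspection rows, not distinct restaurants):
// fraction of inspection rows that carry a grade
val graded = inspections.filter("GRADE IS NOT NULL").count()
val total = inspections.count()
println(f"graded: $graded out of $total rows (${graded.toDouble / total * 100}%.1f%%)")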
Now let’s look at the number of A, B, C grades for each cuisine:
scala> val df3 = sql("""
SELECT
`CUISINE DESCRIPTION`,
SUM(IF(GRADE='A', 1, 0)) AS A,
SUM(IF(GRADE='B', 1, 0)) AS B,
SUM(IF(GRADE='C', 1, 0)) AS C
FROM inspections
GROUP BY `CUISINE DESCRIPTION`
""");
scala> df3.show()
+-------------------+-----+----+----+
|CUISINE DESCRIPTION|    A|   B|   C|
+-------------------+-----+----+----+
|   Pancakes/Waffles|  128|  10|   0|
|   Chinese/Japanese|  342| 106|  24|
|            Mexican| 5210|1471| 412|
|      Jewish/Kosher| 2281| 445| 134|
|             Bakery| 4937|1027| 250|
|            Turkish|  413|  86|  18|
|       Scandinavian|   49|   5|   0|
...
We can create a view out of the result of this query:
scala> df3.registerTempTable("grades")
Note that this view is not materialized, so a query on this “grades” table will be run against the underlying inspections table.
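You can check this by looking at the execution plan: the physical plan of a query against “grades” still contains a scan of the original CSV file (explain() is a standard DataFrame method):
// the plan still shows a scan over the CSV-backed inspections table
sql("SELECT * FROM grades").explain()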
From this, let’s find the cuisines with the worst ratio of Grade C. We filter out the cuisines with fewer than 1,000 graded inspections to get meaningful results:
scala> val query = """
SELECT
`CUISINE DESCRIPTION`,
C / (A + B + C) AS Ratio,
A + B + C AS Num
FROM grades
WHERE A + B + C > 1000
ORDER BY Ratio DESC
LIMIT 5
"""
scala> sql(query).show()
+---------------------+-------------------+----+
|  CUISINE DESCRIPTION|              Ratio| Num|
+---------------------+-------------------+----+
|                Asian|0.08726249120337791|2842|
|               Korean|0.07679395719681074|2383|
|              Spanish|0.06973732889382168|5406|
| Latin (Cuban, Dom...| 0.0690481625199726|8762|
|               Indian|0.06473293243721259|2827|
+---------------------+-------------------+----+
Speeding up the queries
Running a SQL query takes some time; you can make the query faster by caching the intermediate result:
// Without caching:
scala> val startTime = System.currentTimeMillis ; sql(query).collect() ; println(s"Took ${System.currentTimeMillis - startTime}")
Took 1454
// With caching
scala> df3.cache()
// Because the caching is lazy, the first time we run the query is slow
scala> val startTime = System.currentTimeMillis ; sql(query).collect() ; println(s"Took ${System.currentTimeMillis - startTime}")
Took 1848
scala> val startTime = System.currentTimeMillis ; sql(query).collect() ; println(s"Took ${System.currentTimeMillis - startTime}")
Took 446
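To avoid repeating the timing boilerplate, you could wrap it in a small helper (a hypothetical convenience function, not something provided by spark-shell):
// time an arbitrary block of code and print the elapsed milliseconds
def timed[T](label: String)(block: => T): T = {
  val start = System.currentTimeMillis
  val result = block
  println(s"$label took ${System.currentTimeMillis - start} ms")
  result
}

timed("grades query") { sql(query).collect() }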
If the data cannot be cached because it is too large to fit into memory, you can use a different storage format than CSV. Parquet and ORC store data in a columnar format, which allows the engine to scan only the columns required by the query rather than all the data.
// unfortunately, parquet does not play well with spaces in column names, so
// we have to define aliases for those columns
sql("""
CREATE TABLE inspections_parquet USING parquet AS
SELECT
CAMIS, DBA, BORO, BUILDING, STREET, ZIPCODE, PHONE,
`CUISINE DESCRIPTION` AS CUISINE_DESCRIPTION,
`INSPECTION DATE` AS INSPECTION_DATE, ACTION,
`VIOLATION CODE` AS VIOLATION_CODE,
`VIOLATION DESCRIPTION` AS VIOLATION_DESCRIPTION,
`CRITICAL FLAG` AS CRITICAL_FLAG,
SCORE, GRADE,
`GRADE DATE` AS GRADE_DATE,
`RECORD DATE` AS RECORD_DATE,
`INSPECTION TYPE` AS INSPECTION_TYPE
FROM inspections""")
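As an alternative to the SQL statement above, you could rename the columns programmatically and write the Parquet files directly with the DataFrame API. A sketch (the output path and table name are just examples):
// replace spaces in the column names, then save the dataset as Parquet files
val renamed = inspections.columns.foldLeft(inspections) { (df, name) =>
  df.withColumnRenamed(name, name.replace(" ", "_"))
}
renamed.write.parquet("/tmp/inspections_parquet")

// read the Parquet files back and register them as a table
spark.read.parquet("/tmp/inspections_parquet").registerTempTable("inspections_pq")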
Now let’s run the same query as before, but on this “inspections_parquet” table:
val df4 = sql("""
SELECT
CUISINE_DESCRIPTION,
SUM(IF(GRADE='A', 1, 0)) AS A,
SUM(IF(GRADE='B', 1, 0)) AS B,
SUM(IF(GRADE='C', 1, 0)) AS C
FROM inspections_parquet
GROUP BY CUISINE_DESCRIPTION
""");
scala> df4.registerTempTable("grades_parquet")
scala> val query = """
SELECT
CUISINE_DESCRIPTION,
C / (A + B + C) AS Ratio,
A + B + C AS Num
FROM grades_parquet
WHERE A + B + C > 1000
ORDER BY Ratio DESC
LIMIT 5
"""
scala> sql(query).show()
scala> val startTime = System.currentTimeMillis ; sql(query).collect() ; println(s"Took ${System.currentTimeMillis - startTime}")
Took 549
// with caching
scala> df4.cache()
scala> val startTime = System.currentTimeMillis ; sql(query).collect() ; println(s"Took ${System.currentTimeMillis - startTime}")
Took 626
scala> val startTime = System.currentTimeMillis ; sql(query).collect() ; println(s"Took ${System.currentTimeMillis - startTime}")
Took 263
Without caching, the query now runs in 549 ms (Parquet) vs 1,454 ms (CSV).
With caching, it runs in 263 ms (Parquet) vs 446 ms (CSV). Not too shabby for a simple change of storage format.
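When you are done with a cached DataFrame, you can release the memory it occupies:
// free the memory used by the cached DataFrames
df3.unpersist()
df4.unpersist()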
Conclusion
In this post, we read a CSV file and analyzed it using spark-shell. We saw some techniques to make queries faster, by caching the data in memory or by using a different storage format.
Another way to speed up the queries is to run them on a Spark cluster; that could be the subject of another post.
As an alternative to spark-shell, you can also use spark-notebook or Apache Zeppelin, web-based notebooks that let you display grids and charts in the browser using Scala and SQL code.
Frederic Dang Ngoc
@fredang