Data analysis sample with Apache Spark

In this post I will walk through a sample of how to analyze data in Apache Spark. The data set is a sample from the Instagram feeds that I talked about in my previous post. This post has two parts: the first is preparing the data, and the second is answering questions.

Preparing the data:

I will use Spark SQL to read the JSON file.


scala> val df = sqlContext.read.json("C:\\json_data.json");
scala> df.printSchema()
root
|-- Caption: struct (nullable = true)
| |-- Created_Time: long (nullable = true)
| |-- From: struct (nullable = true)
| | |-- Bio: string (nullable = true)
| | |-- Counts: string (nullable = true)
| | |-- Full_Name: string (nullable = true)
| | |-- Id: long (nullable = true)
| | |-- Profile_Picture: string (nullable = true)
| | |-- Username: string (nullable = true)
| | |-- Website: string (nullable = true)
| |-- Text: string (nullable = true)
| |-- id: string (nullable = true)
|-- Comments: struct (nullable = true)
| |-- Count: long (nullable = true)
| |-- Data: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- Created_time: long (nullable = true)
| | | |-- From: struct (nullable = true)
| | | | |-- Bio: string (nullable = true)
| | | | |-- Counts: string (nullable = true)
| | | | |-- Full_Name: string (nullable = true)
| | | | |-- Id: long (nullable = true)
| | | | |-- Profile_Picture: string (nullable = true)
| | | | |-- Username: string (nullable = true)
| | | | |-- Website: string (nullable = true)
| | | |-- Id: string (nullable = true)
| | | |-- Text: string (nullable = true)
|-- Created_Time: long (nullable = true)
|-- Filter: string (nullable = true)
|-- Id: string (nullable = true)
|-- Images: struct (nullable = true)
| |-- Low_Resolution: struct (nullable = true)
| | |-- Height: long (nullable = true)
| | |-- Url: string (nullable = true)
| | |-- Width: long (nullable = true)
| |-- Standard_Resolution: struct (nullable = true)
| | |-- Height: long (nullable = true)
| | |-- Url: string (nullable = true)
| | |-- Width: long (nullable = true)
| |-- Thumbnail: struct (nullable = true)
| | |-- Height: long (nullable = true)
| | |-- Url: string (nullable = true)
| | |-- Width: long (nullable = true)
|-- Likes: struct (nullable = true)
| |-- Count: long (nullable = true)
| |-- Data: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- Bio: string (nullable = true)
| | | |-- Counts: string (nullable = true)
| | | |-- Full_Name: string (nullable = true)
| | | |-- Id: long (nullable = true)
| | | |-- Profile_Picture: string (nullable = true)
| | | |-- Username: string (nullable = true)
| | | |-- Website: string (nullable = true)
|-- Link: string (nullable = true)
|-- Location: struct (nullable = true)
| |-- Id: long (nullable = true)
| |-- Latitude: double (nullable = true)
| |-- Longitude: double (nullable = true)
| |-- Name: string (nullable = true)
|-- Tags: array (nullable = true)
| |-- element: string (containsNull = true)
|-- Type: string (nullable = true)
|-- User: struct (nullable = true)
| |-- Bio: string (nullable = true)
| |-- Counts: string (nullable = true)
| |-- Full_Name: string (nullable = true)
| |-- Id: long (nullable = true)
| |-- Profile_Picture: string (nullable = true)
| |-- Username: string (nullable = true)
| |-- Website: string (nullable = true)
|-- User_Has_Liked: boolean (nullable = true)
|-- Users_In_Photo: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Position: struct (nullable = true)
| | | |-- X: double (nullable = true)
| | | |-- Y: double (nullable = true)
| | |-- User: struct (nullable = true)
| | | |-- Bio: string (nullable = true)
| | | |-- Counts: string (nullable = true)
| | | |-- Full_Name: string (nullable = true)
| | | |-- Id: long (nullable = true)
| | | |-- Profile_Picture: string (nullable = true)
| | | |-- Username: string (nullable = true)
| | | |-- Website: string (nullable = true)
|-- Videos: struct (nullable = true)
| |-- Low_Resolution: struct (nullable = true)
| | |-- Height: long (nullable = true)
| | |-- Url: string (nullable = true)
| | |-- Width: long (nullable = true)
| |-- Standard_Resolution: struct (nullable = true)
| | |-- Height: long (nullable = true)
| | |-- Url: string (nullable = true)
| | |-- Width: long (nullable = true)
|-- city: string (nullable = true)

 

I will not use all of the fields in the JSON data, so I will read the file with a user-defined schema instead.


scala> import org.apache.spark.sql.types.{StructType, StructField, StringType}
scala> val schemaString = "city Type Caption Filter Tags"
scala> val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
scala> val df = sqlContext.read.schema(schema).json("C:\\json_data.json")

So we have the data frame ready; let's start asking it questions.

Asking questions:

  • How many media items per city?
    We have two ways to get the result: either reduceByKey or groupByKey:


    scala> df.map(x => (x(0),1)).reduceByKey( _ + _ ).collect()
    scala> df.map(x => (x(0),1)).groupByKey().map(t => (t._1, t._2.sum)).collect()
    res154: Array[(Any, Int)] = Array((london,4281), (seoul,3437), (berlin,1048), (tehran,752), (stockholm,718), (tokyo,3122), (moscow,3343), (rome,1587), (paris,2174), (newyork,1102))

    reduceByKey vs groupByKey:

    While both of these functions produce the correct answer, the reduceByKey version scales much better on a large dataset. That's because Spark can combine output that shares a key within each partition before shuffling the data across the network.
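
    The difference can be sketched with plain Scala collections. This is a local simulation of two hypothetical partitions, not Spark code:

```scala
// Two hypothetical "partitions" of (city, 1) pairs.
val partitions = Seq(
  Seq(("london", 1), ("seoul", 1), ("london", 1)),
  Seq(("london", 1), ("seoul", 1))
)

// reduceByKey-style: combine locally inside each partition first, so only
// one (key, partialSum) pair per key per partition crosses the "shuffle".
val locallyCombined = partitions.map(
  _.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }.toSeq)
val reduced = locallyCombined.flatten
  .groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }

// groupByKey-style: every single pair crosses the "shuffle" before summing.
val grouped = partitions.flatten
  .groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }

println(reduced)  // both give london -> 3, seoul -> 2
println(grouped)  // same result, but more data moved in the groupByKey case
```

    In practice the DataFrame API sidesteps the choice entirely: df.groupBy("city").count() performs the partial aggregation for you.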

 

  • How many media items have no caption?


    scala> df.count()
    res158: Long = 21564
    scala> df.filter("Caption is null").count()
    res159: Long = 2648

 

  • Number of media items per type and city?


    scala> df.map(x => ((x(0).toString,x(1).toString),1)).reduceByKey( _ + _ ).collect().sortBy(c => c._2).sortBy(c => c._1._2)
    res162: Array[((String, String), Int)] = Array(((stockholm,image),682)... ((london,image),4114), ((tehran,video),28), ... ((london,video),167))
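
    The two chained sortBy calls work because Scala's sortBy is a stable sort: after the first pass orders rows by count, the second pass by type preserves that count order within each type. A minimal illustration with made-up rows in the same shape:

```scala
// Made-up ((city, type), count) rows, in the shape the query above produces.
val rows = Array(
  (("london", "image"), 4114),
  (("tehran", "video"), 28),
  (("stockholm", "image"), 682),
  (("london", "video"), 167))

// First sort by count; the second, stable sort by type keeps the ascending
// count order within each type group.
val sorted = rows.sortBy(_._2).sortBy(_._1._2)
sorted.foreach(println)
// ((stockholm,image),682)
// ((london,image),4114)
// ((tehran,video),28)
// ((london,video),167)
```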

 

  • Most used filter for images and videos?


    scala> df.map(x => ((x(1).toString,x(3).toString),1)).reduceByKey( _ + _ ).collect().sortBy(c=>c._2).sortBy(c=>c._1._1)

    The full result is long, but the interesting part is that the most used filter is 'Normal': most of the time users don't apply a filter at all (or they apply filters in a different application).
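
    To extract only the top filter per media type from such an array of ((type, filter), count) pairs, plain Scala is enough. The counts below are illustrative, not the real results:

```scala
// Illustrative ((type, filter), count) pairs, not the real query output.
val counts = Array(
  (("image", "Normal"), 900),
  (("image", "Lark"), 120),
  (("video", "Normal"), 80),
  (("video", "Clarendon"), 15))

// For each media type, keep the filter with the highest count.
val topPerType = counts
  .groupBy { case ((tpe, _), _) => tpe }
  .map { case (tpe, rows) => (tpe, rows.maxBy(_._2)._1._2) }
// topPerType maps each type to its most used filter
```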

 

  • Average number of tags?


    scala> df.map(_(4)).map(_.toString).map(_.length).mean()
    res169: Double = 42.39111482099796
    scala> df.map(_(4)).map(_.toString).map(_.length).variance()
    res170: Double = 4825.149477722114
    scala> df.map(_(4)).map(_.toString).map(_.length).stdev()
    res171: Double = 69.46329590310349
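
One caveat: because the schema above declares every field, including Tags, as a StringType, x(4) is the string rendering of the tag array, so the numbers above are statistics over string lengths rather than tag counts. A sketch of counting actual tags from such a string, assuming a bracketed, comma-separated rendering (the helper name and the format are my assumptions, not from the data set):

```scala
// Hypothetical helper: count tags in a string like "[food, travel, sunset]".
// Assumes a bracketed, comma-separated rendering; adjust for the real format.
def tagCount(tags: String): Int = {
  val inner = tags.stripPrefix("[").stripSuffix("]").trim
  if (inner.isEmpty) 0 else inner.split(",").length
}

// With Spark this would feed the same statistics, e.g.:
// df.map(x => tagCount(x(4).toString).toDouble).mean()
println(tagCount("[food, travel, sunset]"))  // 3
println(tagCount("[]"))                      // 0
```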
