Deploying to Spark (by Ali Zaidi)

Azure HDInsight Spark Clusters

Azure HDInsight is a managed Hadoop offering for the cloud. It provides enterprise-ready Hadoop solutions without the hassle of installation, management, and configuration. Developers and data scientists can immediately create and deploy big data solutions on the cloud with the latest and greatest of the Hadoop ecosystem.

Dealing with data in distributed storage and programming with concurrent systems often requires learning complicated new paradigms and techniques. Statisticians and data scientists familiar with R are unlikely to have much experience with such systems. Fortunately, the RevoScaleR package abstracts away the difficult portions of distributed computation and allows the user to focus on building R code that can be automatically deployed in distributed environments.

As we will see in this module, we can reuse the majority of the code we developed in the previous sections that was meant to run locally, and have it deploy automagically in our new Spark environment. RevoScaleR will automatically transfer the computation from a single local machine to a network of concurrent systems.

Loading Libraries

When working with MRS in distributed environments, we will need to install and load packages across the entire cluster, not just the current client node (edge node).

In order to install a package across your HDInsight cluster, you will need to use a script action that you can deploy from the cluster's dashboard. See the instructions here.

Before installing the packages, please run the following script action on all worker nodes: install libgeos-dev.

In order to run this chapter, you will need to add the following parameters to the package installation script:

useCRAN dplyr, stringr, lubridate, rgeos, sp, maptools, ggmap, ggplot2, gridExtra, ggrepel, tidyr, seriation

options(max.print = 1000, scipen = 999, width = 90, 
        continue = " ")
rxOptions(reportProgress = 3) # report rows processed and all timings (the most verbose level; use 0 or 1 for less output)

Set Up HDFS Paths and Spark Context

In order to connect to HDFS, we have to set up pointers to our Hadoop file system. In the case of HDInsight clusters, the file system is actually the fully HDFS-compatible Azure Blob Storage, and fortunately MRS makes it very easy to connect to with simple R statements.

First, we will make pointers to our files as though they were on local storage.

data_path <- file.path("/user/RevoShare/alizaidi")
taxi_path <- file.path(data_path, "nyctaxi/data")
hdfs_ls <- paste0("hadoop fs -ls ", taxi_path)
system(hdfs_ls)

taxi_xdf <- file.path(data_path, "TaxiXdf")

nyc_sample_df <- read.csv("data/yellow_tripdata_2016-05.csv", nrows = 1000)

Next, we will define an HDFS variable, which tells RevoScaleR to look for the files under the Hadoop file system, not the local file system. Then we will create our Spark context, which tells MRS to execute the computations on the Spark cluster. We will use all the default arguments, except for insisting that RevoScaleR reuse the existing Spark application whenever possible (the persistentRun parameter), and that Spark attempt to restart tasks that appear to be lagging (the extraSparkConfig value).

myNameNode <- "default"
myPort <- 0
hdfsFS <- RxHdfsFileSystem(hostName = myNameNode, 
                           port = myPort)

taxi_text <- RxTextData(taxi_path, fileSystem = hdfsFS)
taxi_xdf <- RxXdfData(taxi_xdf, fileSystem = hdfsFS)

spark_cc <- RxSpark(
                    nameNode = myNameNode,
                    port = myPort,
                    persistentRun = TRUE, 
                    extraSparkConfig = "--conf spark.speculation=true"
                    )


rxSetComputeContext(spark_cc)


system.time(rxImport(inData = taxi_text,
                     outFile = taxi_xdf))
##    user  system elapsed 
##   1.804   3.176 386.016

Now that we have our pointers and environment variables for Spark set, we can immediately query our data just as before!

rxGetInfo(taxi_xdf, getVarInfo = TRUE, numRows = 5)
## File name: /user/RevoShare/alizaidi/TaxiXdf 
## Number of composite data files: 48 
## Number of observations: 138413407 
## Number of variables: 19 
## Number of blocks: 289 
## Compression type: zlib 
## Variable information: 
## Var 1: VendorID, Type: integer, Low/High: (1, 2)
## Var 2: tpep_pickup_datetime, Type: character
## Var 3: tpep_dropoff_datetime, Type: character
## Var 4: passenger_count, Type: integer, Low/High: (0, 9)
## Var 5: trip_distance, Type: numeric, Storage: float32, Low/High: (-3390583.7500, 198623008.0000)
## Var 6: pickup_longitude, Type: numeric, Storage: float32, Low/High: (-736.6166, 172.6000)
## Var 7: pickup_latitude, Type: numeric, Storage: float32, Low/High: (-78.1947, 80.6025)
## Var 8: RatecodeID, Type: integer, Low/High: (1, 99)
## Var 9: store_and_fwd_flag, Type: character
## Var 10: dropoff_longitude, Type: numeric, Storage: float32, Low/High: (-781.8333, 172.6000)
## Var 11: dropoff_latitude, Type: numeric, Storage: float32, Low/High: (-78.1947, 480.7333)
## Var 12: payment_type, Type: integer, Low/High: (1, 5)
## Var 13: fare_amount, Type: numeric, Storage: float32, Low/High: (-957.6000, 825998.6250)
## Var 14: extra, Type: numeric, Storage: float32, Low/High: (-58.5000, 648.8700)
## Var 15: mta_tax, Type: numeric, Storage: float32, Low/High: (-3.0000, 91.0000)
## Var 16: tip_amount, Type: numeric, Storage: float32, Low/High: (-440.0000, 1200.8000)
## Var 17: tolls_amount, Type: numeric, Storage: float32, Low/High: (-99.9900, 1901.4000)
## Var 18: improvement_surcharge, Type: numeric, Storage: float32, Low/High: (-0.3000, 137.6300)
## Var 19: total_amount, Type: numeric, Storage: float32, Low/High: (-958.4000, 826040.0000)
## Data (5 rows starting with row 1):
##   VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
## 1        1  2015-07-01 00:00:00   2015-07-01 00:15:26               1
## 2        1  2015-07-01 00:00:00   2015-07-01 00:22:22               1
## 3        1  2015-07-01 00:00:00   2015-07-01 00:07:42               1
## 4        1  2015-07-01 00:00:00   2015-07-01 00:39:37               1
## 5        1  2015-07-01 00:00:00   2015-07-01 00:05:34               1
##   trip_distance pickup_longitude pickup_latitude RatecodeID
## 1           3.5        -73.99416        40.75113          1
## 2           3.9        -73.98466        40.76849          1
## 3           2.3        -73.97889        40.76229          1
## 4           9.2        -73.99279        40.74276          1
## 5           1.1        -73.91243        40.76981          1
##   store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type
## 1                  N         -73.97682         40.78857            2
## 2                  N         -74.00013         40.73490            2
## 3                  N         -74.00422         40.75253            2
## 4                  N         -73.97151         40.63715            1
## 5                  N         -73.92033         40.75744            1
##   fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge
## 1          14   0.5     0.5       0.00            0                   0.3
## 2          17   0.5     0.5       0.00            0                   0.3
## 3           9   0.5     0.5       0.00            0                   0.3
## 4          33   0.5     0.5       8.55            0                   0.3
## 5           6   0.5     0.5       2.00            0                   0.3
##   total_amount
## 1        15.30
## 2        18.30
## 3        10.30
## 4        42.85
## 5         9.30

Observe the new field in the metadata section: Number of composite data files: 48. Previously we worked with XDF files, which were single file objects comprising multiple blocks; now we are working with blocked datasets that reside in distributed storage. Our data therefore has two layers of sharding: multiple blocks per HDFS chunk, and multiple HDFS chunks per dataset. We call such data XDFD, to emphasize its distributed nature. This distinction won't be too important for you to understand as a developer, but it is useful to keep in mind.
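To make the two layers concrete, here is a small base R sketch that mimics the layout on a toy data frame. It is only an illustration of the idea, not RevoScaleR code: the helper shard_rows() and its arguments n_files and block_size are names made up for this example.

```r
# Toy illustration of two-level sharding in base R (no RevoScaleR needed).
# shard_rows(), n_files and block_size are hypothetical names for this sketch.
shard_rows <- function(df, n_files, block_size) {
  n <- nrow(df)
  # Level 1: assign rows to "composite files" in contiguous runs
  file_id <- ceiling(seq_len(n) * n_files / n)
  files <- split(df, file_id)
  # Level 2: split each file into blocks of at most block_size rows
  lapply(files, function(f) {
    block_id <- ceiling(seq_len(nrow(f)) / block_size)
    split(f, block_id)
  })
}

toy <- data.frame(id = 1:20, fare = seq(5, 24.5, length.out = 20))
sharded <- shard_rows(toy, n_files = 2, block_size = 4)

n_files_out <- length(sharded)            # number of composite files
blocks_per_file <- sapply(sharded, length) # number of blocks in each file
total_rows <- sum(sapply(sharded, function(f) sum(sapply(f, nrow))))

n_files_out
blocks_per_file
total_rows
```

Every row ends up in exactly one block of exactly one file, which is the property that lets each node work on its own pieces independently.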

Even though the data is now saved across multiple nodes in a distributed environment, it is compressed to improve read performance, and metadata is saved alongside it, which speeds up queries for simple statistics.

system.time(
  rxsum_xdf <- rxSummary( ~ fare_amount, taxi_xdf)
)
##    user  system elapsed 
##   0.444   0.432  42.425
rxsum_xdf
## Call:
## rxSummary(formula = ~fare_amount, data = taxi_xdf)
## 
## Summary Statistics Results for: ~fare_amount
## Data: taxi_xdf (RxXdfData Data Source)
## File name: /user/RevoShare/alizaidi/TaxiXdf
## Number of valid observations: 138413407 
##  
##  Name        Mean     StdDev  Min    Max      ValidObs  MissingObs
##  fare_amount 13.04811 142.761 -957.6 825998.6 138413407 0
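A summary like this can be computed without gathering all rows in one place, because statistics such as the mean and standard deviation can be assembled from small per-block partial results. The base R sketch below illustrates that general technique on made-up fares; it is not a description of how rxSummary is implemented internally, and block_stats() and combine_stats() are hypothetical helper names.

```r
# Each block reports only a handful of numbers; the combiner never sees raw rows.
block_stats <- function(x) {
  x <- x[!is.na(x)]
  c(n = length(x), sum = sum(x), sumsq = sum(x^2), min = min(x), max = max(x))
}

combine_stats <- function(parts) {
  parts <- do.call(rbind, parts)
  n <- sum(parts[, "n"])
  mean_x <- sum(parts[, "sum"]) / n
  # Sample variance from pooled sums of squares (simple textbook formula;
  # fine for a sketch, though less numerically robust than streaming updates)
  var_x <- (sum(parts[, "sumsq"]) - n * mean_x^2) / (n - 1)
  c(Mean = mean_x, StdDev = sqrt(var_x),
    Min = min(parts[, "min"]), Max = max(parts[, "max"]), ValidObs = n)
}

set.seed(1)
fares <- round(runif(1000, min = 2.5, max = 60), 2)  # made-up data
blocks <- split(fares, rep(1:4, each = 250))         # pretend these live on 4 nodes

res <- combine_stats(lapply(blocks, block_stats))
res
all.equal(unname(res["Mean"]), mean(fares))
all.equal(unname(res["StdDev"]), sd(fares))
```

The combined result agrees with mean() and sd() computed on the full vector, even though each "node" only ever looked at its own 250 values.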

Transformations with XDFDs

An important distinction between data stored in distributed file systems like HDFS and data residing on a single disk is its mutability. In general, it's much more difficult to overwrite data in distributed storage, as it requires rewriting multiple non-contiguous blocks.

Therefore, it is often better practice to write to a new location when working with XDFDs than to overwrite existing directories.

taxi_tip <- RxXdfData("/user/RevoShare/alizaidi/taxitipXdf",
                      fileSystem = hdfsFS)

rxDataStep(taxi_xdf, taxi_tip,
           transforms = list(tip_percent = ifelse(fare_amount > 0, 
                                                  tip_amount/fare_amount,
                                                  NA)))


system.time(rxSummary( ~ tip_percent, taxi_tip))
##    user  system elapsed 
##   0.436   0.480  48.519
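The ifelse() guard in the transform is worth a closer look: it leaves tip_percent missing whenever the fare is zero or negative, rather than producing Inf, NaN, or a misleading negative ratio. A minimal base R sketch on a made-up chunk (the values below are invented for illustration) shows the behavior:

```r
# Toy data frame standing in for one chunk of the taxi data (made-up values)
chunk <- data.frame(fare_amount = c(14, 0, -3, 33),
                    tip_amount  = c(0, 1, 0, 8.25))

# Same expression as in the transforms list above
chunk$tip_percent <- ifelse(chunk$fare_amount > 0,
                            chunk$tip_amount / chunk$fare_amount,
                            NA)
chunk
```

Rows with a non-positive fare come back as NA, so they are counted as missing observations in later summaries instead of distorting the mean.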

Similarly, we can do cross-tabulations and slightly more involved transforms across the Spark cluster the same way we did in a local compute context:

rxCrossTabs( ~ month:year, taxi_tip,
             transforms = list(
               year = as.integer(substr(tpep_pickup_datetime, 1, 4)),
               month = as.integer(substr(tpep_pickup_datetime, 6, 7)),
               year = factor(year, levels = 2014:2016),
               month = factor(month, levels = 1:12)))
## Call:
## rxCrossTabs(formula = ~month:year, data = taxi_tip, transforms = list(year = as.integer(substr(tpep_pickup_datetime, 
##     1, 4)), month = as.integer(substr(tpep_pickup_datetime, 6, 
##     7)), year = factor(year, levels = 2014:2016), month = factor(month, 
##     levels = 1:12)))
## 
## Cross Tabulation Results for: ~month:year
## Data: taxi_tip (RxXdfData Data Source)
## File name: /user/RevoShare/alizaidi/taxitipXdf
## Number of valid observations: 138413407
## Number of missing observations: 0 
## Statistic: counts 
##  
## month:year (counts):
##      year
## month 2014     2015     2016
##    1     0        0 10906858
##    2     0        0 11382049
##    3     0        0 12210952
##    4     0        0 11934338
##    5     0        0 11836853
##    6     0        0 11135470
##    7     0 11562783        0
##    8     0 11130304        0
##    9     0 11225063        0
##    10    0 12315488        0
##    11    0 11312676        0
##    12    0 11460573        0
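The transforms in this call can be tried out in plain R on a few timestamps. The sketch below uses made-up timestamps in the same layout as tpep_pickup_datetime and base R's table() in place of rxCrossTabs; it is meant only to show why the factor levels are fixed explicitly.

```r
# A few made-up timestamps in the same "YYYY-MM-DD HH:MM:SS" layout as the taxi data
stamps <- c("2015-07-01 00:00:00", "2015-12-31 23:59:59",
            "2016-01-01 00:00:01", "2016-05-15 12:30:00")

year  <- as.integer(substr(stamps, 1, 4))   # characters 1-4 hold the year
month <- as.integer(substr(stamps, 6, 7))   # characters 6-7 hold the month

# Fixing the levels up front keeps every chunk's table the same shape,
# even if a chunk happens to contain no trips for some month or year.
year  <- factor(year,  levels = 2014:2016)
month <- factor(month, levels = 1:12)

counts <- table(month, year)
dim(counts)
counts["7", "2015"]
```

Because the levels are declared rather than inferred from the data, the 2014 column is present (and all zeros) just as in the cluster output above.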

If we want to utilize a function that depends on an R package that is not already installed in our cluster, including all worker nodes, we have to make sure we first install those packages. You can use the instructions here to install packages throughout your cluster using a script action.

rxCrossTabs( ~ month:year, taxi_tip,
             transforms = list(
               date = ymd_hms(tpep_pickup_datetime), 
               year = factor(year(date), levels = 2014:2016), 
               month = factor(month(date), levels = 1:12)), 
             transformPackages = "lubridate")
## Call:
## rxCrossTabs(formula = ~month:year, data = taxi_tip, transforms = list(date = ymd_hms(tpep_pickup_datetime), 
##     year = factor(year(date), levels = 2014:2016), month = factor(month(date), 
##         levels = 1:12)), transformPackages = "lubridate")
## 
## Cross Tabulation Results for: ~month:year
## Data: taxi_tip (RxXdfData Data Source)
## File name: /user/RevoShare/alizaidi/taxitipXdf
## Number of valid observations: 138413407
## Number of missing observations: 0 
## Statistic: counts 
##  
## month:year (counts):
##      year
## month 2014     2015     2016
##    1     0        0 10906858
##    2     0        0 11382049
##    3     0        0 12210952
##    4     0        0 11934338
##    5     0        0 11836853
##    6     0        0 11135470
##    7     0 11562783        0
##    8     0 11130304        0
##    9     0 11225063        0
##    10    0 12315488        0
##    11    0 11312676        0
##    12    0 11460573        0
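Since transformPackages only loads packages that are already installed, it can be handy to check availability before launching a long job. The following base R sketch reports which packages cannot be loaded in the current session; missing_packages() is a hypothetical helper name, and note that it only inspects the node it runs on, so it does not replace the script action for installing across the cluster.

```r
# Report which of the requested packages cannot be loaded in this R session.
# missing_packages() is a hypothetical helper name used only in this sketch.
missing_packages <- function(pkgs) {
  ok <- vapply(pkgs, requireNamespace, logical(1), quietly = TRUE)
  pkgs[!ok]
}

# Base packages ship with R, so nothing should be reported as missing here
missing_packages(c("stats", "utils"))
```

An empty result means every listed package is available on this node; any names returned would need to be installed before they can be passed to transformPackages.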

More Complicated Transforms

We saw in the previous sections how to write user-defined functions (UDFs) and apply them to each chunk of our data to create new columns. The same mechanism is available on a Spark cluster, which greatly expands the capabilities of MRS there.

xforms <- function(data) { # transformation function for extracting some date and time features
  # require(lubridate)
  weekday_labels <- c('Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat')
  cut_levels <- c(1, 5, 9, 12, 16, 18, 22)
  hour_labels <- c('1AM-5AM', '5AM-9AM', '9AM-12PM', '12PM-4PM', '4PM-6PM', '6PM-10PM', '10PM-1AM')

  pickup_datetime <- lubridate::ymd_hms(data$tpep_pickup_datetime, tz = "UTC")
  pickup_hour <- addNA(cut(hour(pickup_datetime), cut_levels))
  pickup_dow <- factor(wday(pickup_datetime), levels = 1:7, labels = weekday_labels)
  levels(pickup_hour) <- hour_labels
  # 
  dropoff_datetime <- lubridate::ymd_hms(data$tpep_dropoff_datetime, tz = "UTC")
  dropoff_hour <- addNA(cut(hour(dropoff_datetime), cut_levels))
  dropoff_dow <- factor(wday(dropoff_datetime), levels = 1:7, labels = weekday_labels)
  levels(dropoff_hour) <- hour_labels
  # 
  data$pickup_hour <- pickup_hour
  data$pickup_dow <- pickup_dow
  data$dropoff_hour <- dropoff_hour
  data$dropoff_dow <- dropoff_dow
  data$trip_duration <- as.integer(lubridate::interval(pickup_datetime, dropoff_datetime))

  return(data)
}
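The hour binning inside xforms relies on a small trick with cut(), addNA(), and positional level renaming. The base R sketch below isolates that step on a handful of hand-picked hour values so the mapping is easy to inspect; the cut points and labels are copied from the function above.

```r
cut_levels  <- c(1, 5, 9, 12, 16, 18, 22)
hour_labels <- c('1AM-5AM', '5AM-9AM', '9AM-12PM', '12PM-4PM',
                 '4PM-6PM', '6PM-10PM', '10PM-1AM')

hours <- c(0, 1, 2, 5, 6, 12, 13, 22, 23)

# cut() builds six right-closed intervals: (1,5], (5,9], ..., (18,22].
# Hours 23, 0 and 1 fall outside all of them and come back as NA.
binned <- cut(hours, cut_levels)

# addNA() turns NA into a seventh level, which is then renamed to '10PM-1AM'
# because the labels are assigned to the levels by position.
binned <- addNA(binned)
levels(binned) <- hour_labels

data.frame(hour = hours, bin = binned)
```

Note that the intervals are closed on the right, so an hour value of 1 lands in the '10PM-1AM' bin and an hour value of 5 lands in '1AM-5AM'.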

We defined our function above, and now we apply it to our dataset. In order to ensure our function works on our data, we could try it out on a sample dataset locally first:

x <- head(taxi_tip)
rxSetComputeContext("local")

rxDataStep(inData = x, 
           outFile = NULL, 
           transformFunc = xforms, 
           transformPackages = "lubridate")
## Rows Read: 6, Total Rows Processed: 6, Total Chunk Time: 0.018 seconds
##   VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
## 1        1  2015-07-01 00:00:00   2015-07-01 00:15:26               1
## 2        1  2015-07-01 00:00:00   2015-07-01 00:22:22               1
## 3        1  2015-07-01 00:00:00   2015-07-01 00:07:42               1
## 4        1  2015-07-01 00:00:00   2015-07-01 00:39:37               1
## 5        1  2015-07-01 00:00:00   2015-07-01 00:05:34               1
## 6        1  2015-07-01 00:00:00   2015-07-01 00:06:46               2
##   trip_distance pickup_longitude pickup_latitude RatecodeID
## 1           3.5        -73.99416        40.75113          1
## 2           3.9        -73.98466        40.76849          1
## 3           2.3        -73.97889        40.76229          1
## 4           9.2        -73.99279        40.74276          1
## 5           1.1        -73.91243        40.76981          1
## 6           1.0        -73.95916        40.77343          1
##   store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type
## 1                  N         -73.97682         40.78857            2
## 2                  N         -74.00013         40.73490            2
## 3                  N         -74.00422         40.75253            2
## 4                  N         -73.97151         40.63715            1
## 5                  N         -73.92033         40.75744            1
## 6                  N         -73.96935         40.76925            2
##   fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge
## 1        14.0   0.5     0.5       0.00            0                   0.3
## 2        17.0   0.5     0.5       0.00            0                   0.3
## 3         9.0   0.5     0.5       0.00            0                   0.3
## 4        33.0   0.5     0.5       8.55            0                   0.3
## 5         6.0   0.5     0.5       2.00            0                   0.3
## 6         6.5   0.5     0.5       0.00            0                   0.3
##   total_amount tip_percent pickup_hour pickup_dow dropoff_hour dropoff_dow
## 1        15.30   0.0000000    10PM-1AM        Wed     10PM-1AM         Wed
## 2        18.30   0.0000000    10PM-1AM        Wed     10PM-1AM         Wed
## 3        10.30   0.0000000    10PM-1AM        Wed     10PM-1AM         Wed
## 4        42.85   0.2590909    10PM-1AM        Wed     10PM-1AM         Wed
## 5         9.30   0.3333333    10PM-1AM        Wed     10PM-1AM         Wed
## 6         7.80   0.0000000    10PM-1AM        Wed     10PM-1AM         Wed
##   trip_duration
## 1           926
## 2          1342
## 3           462
## 4          2377
## 5           334
## 6           406
rxSetComputeContext(spark_cc)

We used a local compute context to try our function on the sample locally, then reverted to the Spark context for doing our analysis on the cluster.
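A complementary check, sketched here in base R under made-up names, is to confirm that a transform gives the same answer when applied chunk by chunk as when applied to the whole sample, since on the cluster it only ever sees one chunk at a time. The functions add_duration() and apply_in_chunks() are hypothetical stand-ins for illustration, not part of RevoScaleR.

```r
# A stand-in transform: adds a duration column computed row by row.
add_duration <- function(data) {
  pickup  <- as.POSIXct(data$pickup,  tz = "UTC")
  dropoff <- as.POSIXct(data$dropoff, tz = "UTC")
  data$trip_duration <- as.integer(difftime(dropoff, pickup, units = "secs"))
  data
}

# Apply f to consecutive chunks of at most chunk_size rows and stack the results.
apply_in_chunks <- function(df, f, chunk_size) {
  chunk_id <- ceiling(seq_len(nrow(df)) / chunk_size)
  out <- do.call(rbind, lapply(split(df, chunk_id), f))
  rownames(out) <- NULL
  out
}

sample_df <- data.frame(
  pickup  = c("2015-07-01 00:00:00", "2015-07-01 00:00:00", "2015-07-01 00:00:00"),
  dropoff = c("2015-07-01 00:15:26", "2015-07-01 00:22:22", "2015-07-01 00:07:42"),
  stringsAsFactors = FALSE
)

whole   <- add_duration(sample_df)
chunked <- apply_in_chunks(sample_df, add_duration, chunk_size = 2)

whole$trip_duration
all.equal(whole, chunked)
```

A transform that depends on whole-column quantities, for example one that subtracts mean(data$x), would fail this comparison because the mean differs from chunk to chunk.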

taxi_date <- RxXdfData("/user/RevoShare/alizaidi/TaxiDatesTranf",
                       fileSystem = hdfsFS)
rxDataStep(inData = taxi_tip, 
           outFile = taxi_date, 
           transformFunc = xforms, 
           transformPackages = "lubridate")
rxGetInfo(taxi_date, numRows = 5, getVarInfo = TRUE)
## File name: /user/RevoShare/alizaidi/TaxiDatesTranf 
## Number of composite data files: 48 
## Number of observations: 138413407 
## Number of variables: 25 
## Number of blocks: 289 
## Compression type: zlib 
## Variable information: 
## Var 1: VendorID, Type: integer, Low/High: (1, 2)
## Var 2: tpep_pickup_datetime, Type: character
## Var 3: tpep_dropoff_datetime, Type: character
## Var 4: passenger_count, Type: integer, Low/High: (0, 9)
## Var 5: trip_distance, Type: numeric, Storage: float32, Low/High: (-3390583.7500, 198623008.0000)
## Var 6: pickup_longitude, Type: numeric, Storage: float32, Low/High: (-736.6166, 172.6000)
## Var 7: pickup_latitude, Type: numeric, Storage: float32, Low/High: (-78.1947, 80.6025)
## Var 8: RatecodeID, Type: integer, Low/High: (1, 99)
## Var 9: store_and_fwd_flag, Type: character
## Var 10: dropoff_longitude, Type: numeric, Storage: float32, Low/High: (-781.8333, 172.6000)
## Var 11: dropoff_latitude, Type: numeric, Storage: float32, Low/High: (-78.1947, 480.7333)
## Var 12: payment_type, Type: integer, Low/High: (1, 5)
## Var 13: fare_amount, Type: numeric, Storage: float32, Low/High: (-957.6000, 825998.6250)
## Var 14: extra, Type: numeric, Storage: float32, Low/High: (-58.5000, 648.8700)
## Var 15: mta_tax, Type: numeric, Storage: float32, Low/High: (-3.0000, 91.0000)
## Var 16: tip_amount, Type: numeric, Storage: float32, Low/High: (-440.0000, 1200.8000)
## Var 17: tolls_amount, Type: numeric, Storage: float32, Low/High: (-99.9900, 1901.4000)
## Var 18: improvement_surcharge, Type: numeric, Storage: float32, Low/High: (-0.3000, 137.6300)
## Var 19: total_amount, Type: numeric, Storage: float32, Low/High: (-958.4000, 826040.0000)
## Var 20: tip_percent, Type: numeric, Low/High: (-1.0000, 54900.0012)
## Var 21: pickup_hour
##        7 factor levels: 1AM-5AM 5AM-9AM 9AM-12PM 12PM-4PM 4PM-6PM 6PM-10PM 10PM-1AM
## Var 22: pickup_dow
##        7 factor levels: Sun Mon Tue Wed Thu Fri Sat
## Var 23: dropoff_hour
##        7 factor levels: 1AM-5AM 5AM-9AM 9AM-12PM 12PM-4PM 4PM-6PM 6PM-10PM 10PM-1AM
## Var 24: dropoff_dow
##        7 factor levels: Sun Mon Tue Wed Thu Fri Sat
## Var 25: trip_duration, Type: integer, Low/High: (-631148790, 29227264)
## Data (5 rows starting with row 1):
##   VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
## 1        1  2015-07-01 00:00:00   2015-07-01 00:15:26               1
## 2        1  2015-07-01 00:00:00   2015-07-01 00:22:22               1
## 3        1  2015-07-01 00:00:00   2015-07-01 00:07:42               1
## 4        1  2015-07-01 00:00:00   2015-07-01 00:39:37               1
## 5        1  2015-07-01 00:00:00   2015-07-01 00:05:34               1
##   trip_distance pickup_longitude pickup_latitude RatecodeID
## 1           3.5        -73.99416        40.75113          1
## 2           3.9        -73.98466        40.76849          1
## 3           2.3        -73.97889        40.76229          1
## 4           9.2        -73.99279        40.74276          1
## 5           1.1        -73.91243        40.76981          1
##   store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type
## 1                  N         -73.97682         40.78857            2
## 2                  N         -74.00013         40.73490            2
## 3                  N         -74.00422         40.75253            2
## 4                  N         -73.97151         40.63715            1
## 5                  N         -73.92033         40.75744            1
##   fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge
## 1          14   0.5     0.5       0.00            0                   0.3
## 2          17   0.5     0.5       0.00            0                   0.3
## 3           9   0.5     0.5       0.00            0                   0.3
## 4          33   0.5     0.5       8.55            0                   0.3
## 5           6   0.5     0.5       2.00            0                   0.3
##   total_amount tip_percent pickup_hour pickup_dow dropoff_hour dropoff_dow
## 1        15.30   0.0000000    10PM-1AM        Wed     10PM-1AM         Wed
## 2        18.30   0.0000000    10PM-1AM        Wed     10PM-1AM         Wed
## 3        10.30   0.0000000    10PM-1AM        Wed     10PM-1AM         Wed
## 4        42.85   0.2590909    10PM-1AM        Wed     10PM-1AM         Wed
## 5         9.30   0.3333333    10PM-1AM        Wed     10PM-1AM         Wed
##   trip_duration
## 1           926
## 2          1342
## 3           462
## 4          2377
## 5           334

Tabulate Counts by Day of Week and Hour

In order to get a sense of the distribution of counts, let's use rxSummary to tabulate the counts of trips by day of week and by pickup hour.

rxs1 <- rxSummary( ~ pickup_hour + pickup_dow + trip_duration, taxi_date)
# we can add a column for proportions next to the counts
rxs1$categorical <- lapply(rxs1$categorical, 
                           function(x) cbind(x, prop = round(prop.table(x$Counts), 2)))
rxs1
## Call:
## rxSummary(formula = ~pickup_hour + pickup_dow + trip_duration, 
##     data = taxi_date)
## 
## Summary Statistics Results for: ~pickup_hour + pickup_dow +
##     trip_duration
## Data: taxi_date (RxXdfData Data Source)
## File name: /user/RevoShare/alizaidi/TaxiDatesTranf
## Number of valid observations: 138413407 
##  
##  Name          Mean     StdDev   Min        Max      ValidObs  MissingObs
##  trip_duration 943.0364 88419.66 -631148790 29227264 138413405 2         
## 
## Category Counts for pickup_hour
## Number of categories: 7
## Number of valid observations: 138413407
## Number of missing observations: 0
## 
##  pickup_hour Counts   prop
##  1AM-5AM      7806258 0.06
##  5AM-9AM     20901140 0.15
##  9AM-12PM    19532254 0.14
##  12PM-4PM    26733342 0.19
##  4PM-6PM     15569260 0.11
##  6PM-10PM    32235206 0.23
##  10PM-1AM    15635947 0.11
## 
## Category Counts for pickup_dow
## Number of categories: 7
## Number of valid observations: 138413407
## Number of missing observations: 0
## 
##  pickup_dow Counts   prop
##  Sun        18596548 0.13
##  Mon        17837556 0.13
##  Tue        19304158 0.14
##  Wed        20287441 0.15
##  Thu        20754418 0.15
##  Fri        20762408 0.15
##  Sat        20870878 0.15
rxs2 <- rxSummary( ~ pickup_dow:pickup_hour, taxi_date)
rxs2 <- tidyr::spread(rxs2$categorical[[1]], key = 'pickup_hour', value = 'Counts')
row.names(rxs2) <- rxs2[ , 1]
rxs2 <- as.matrix(rxs2[ , -1])
rxs2
##     1AM-5AM 5AM-9AM 9AM-12PM 12PM-4PM 4PM-6PM 6PM-10PM 10PM-1AM
## Sun 2130664 1492981  2760776  3928348 2030550  3443306  2809923
## Mon  609012 3223524  2545550  3647412 2223534  4240647  1347877
## Tue  578949 3608877  2785066  3737400 2262601  4791170  1540095
## Wed  676308 3685648  2906996  3805880 2280082  5080753  1851774
## Thu  770857 3658090  2887979  3827390 2268847  5200092  2141163
## Fri 1084527 3382232  2757791  3720367 2254199  4925703  2637589
## Sat 1955941 1849788  2888096  4066545 2249447  4553535  3307526
library(lattice)  # provides levelplot()
levelplot(prop.table(rxs2, 2), cuts = 4, xlab = "", ylab = "", 
          main = "Distribution of taxis by day of week")
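The second argument of prop.table() decides what the plotted proportions mean, so it helps to see both choices side by side. The sketch below uses a tiny made-up count matrix laid out like rxs2 (days in rows, hour bins in columns).

```r
# Small made-up count matrix in the same layout as rxs2 (rows: days, columns: hours)
counts <- matrix(c(10, 30,
                   40, 20),
                 nrow = 2, byrow = TRUE,
                 dimnames = list(c("Sat", "Sun"), c("5AM-9AM", "6PM-10PM")))

by_col <- prop.table(counts, 2)  # each column sums to 1: share of each day within an hour bin
by_row <- prop.table(counts, 1)  # each row sums to 1: share of each hour bin within a day

by_col
colSums(by_col)
rowSums(by_row)
```

With margin 2, as used in the plot above, each cell answers "of all trips in this hour bin, what fraction happened on this day"; margin 1 would instead describe how each day's trips are spread across the hour bins.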

Join in Neighborhoods from Shapefile

Just as we did in the prior chapters, we will add categorical neighborhood features to our dataset by merging in data from a shapefile containing New York City data.

library(rgeos)
library(sp)
library(maptools)
library(stringr)
library(ggplot2)

nyc_shapefile <- readShapePoly('ZillowNeighborhoods-NY/ZillowNeighborhoods-NY.shp')
mht_shapefile <- subset(nyc_shapefile, str_detect(CITY, 'New York City-Manhattan'))

mht_shapefile@data$id <- as.character(mht_shapefile@data$NAME)
mht.points <- fortify(gBuffer(mht_shapefile, byid = TRUE, width = 0), region = "NAME")

library(dplyr)
mht.df <- inner_join(mht.points, mht_shapefile@data, by = "id")
mht.cent <- mht.df %>%
  group_by(id) %>%
  summarize(long = median(long), lat = median(lat))

library(ggrepel)
ggplot(mht.df, aes(long, lat, fill = id)) + 
  geom_polygon() +
  geom_path(color = "white") +
  coord_equal() +
  theme(legend.position = "none") +
  geom_text_repel(aes(label = id), data = mht.cent, size = 2)

data_coords <- transmute(nyc_sample_df,
                         long = ifelse(is.na(pickup_longitude), 0, pickup_longitude),
                         lat = ifelse(is.na(pickup_latitude), 0, pickup_latitude)
)
# we specify the columns that correspond to the coordinates
coordinates(data_coords) <- c('long', 'lat')
# returns the neighborhoods based on coordinates
nhoods <- over(data_coords, mht_shapefile)
# rename the column names in nhoods
names(nhoods) <- paste('pickup', tolower(names(nhoods)), sep = '_')
# combine the neighborhood information with the original data
nyc_sample_df <- cbind(nyc_sample_df, nhoods[, grep('name|city', names(nhoods))])
head(nyc_sample_df)
##   VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
## 1        1  2016-05-01 00:00:00   2016-05-01 00:17:31               1
## 2        2  2016-05-01 00:00:00   2016-05-01 00:07:31               1
## 3        2  2016-05-01 00:00:00   2016-05-01 00:07:01               6
## 4        2  2016-05-01 00:00:00   2016-05-01 00:19:47               1
## 5        2  2016-05-01 00:00:00   2016-05-01 00:06:39               1
## 6        2  2016-05-01 00:00:00   2016-05-01 00:05:19               2
##   trip_distance pickup_longitude pickup_latitude RatecodeID
## 1          3.60        -73.98590        40.76804          1
## 2          1.68        -73.99158        40.74475          1
## 3          1.09        -73.99307        40.74157          1
## 4          4.21        -73.99194        40.68460          1
## 5          0.56        -74.00528        40.74019          1
## 6          0.63        -73.97929        40.75576          1
##   store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type
## 1                  N         -73.98399         40.73010            1
## 2                  N         -73.97570         40.76547            1
## 3                  N         -73.98100         40.74463            1
## 4                  N         -74.00226         40.73300            1
## 5                  N         -73.99750         40.73756            1
## 6                  N         -73.98801         40.75847            1
##   fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge
## 1        15.0   0.5     0.5       1.50            0                   0.3
## 2         7.5   0.5     0.5       0.88            0                   0.3
## 3         6.5   0.5     0.5       1.56            0                   0.3
## 4        17.0   0.5     0.5       3.66            0                   0.3
## 5         6.0   0.5     0.5       1.46            0                   0.3
## 6         5.0   0.5     0.5       0.00            0                   0.3
##   total_amount             pickup_city  pickup_name
## 1        17.80 New York City-Manhattan      Midtown
## 2         9.68 New York City-Manhattan      Chelsea
## 3         9.36 New York City-Manhattan      Chelsea
## 4        21.96                    <NA>         <NA>
## 5         8.76 New York City-Manhattan West Village
## 6         6.30 New York City-Manhattan      Midtown

Let's create our merge function and ensure that it works with our sample data.frame in a local compute context.

find_nhoods <- function(data) {

  # extract pick-up lat and long and find their neighborhoods
  pickup_longitude <- ifelse(is.na(data$pickup_longitude), 0, data$pickup_longitude)
  pickup_latitude <- ifelse(is.na(data$pickup_latitude), 0, data$pickup_latitude)
  data_coords <- data.frame(long = pickup_longitude, lat = pickup_latitude)
  coordinates(data_coords) <- c('long', 'lat')
  nhoods <- over(data_coords, shapefile)

  ## add only the pick-up neighborhood and city columns to the data
  data$pickup_nhood <- nhoods$NAME
  data$pickup_borough <- nhoods$CITY

  # extract drop-off lat and long and find their neighborhoods
  dropoff_longitude <- ifelse(is.na(data$dropoff_longitude), 0, data$dropoff_longitude)
  dropoff_latitude <- ifelse(is.na(data$dropoff_latitude), 0, data$dropoff_latitude)
  data_coords <- data.frame(long = dropoff_longitude, lat = dropoff_latitude)
  coordinates(data_coords) <- c('long', 'lat')
  nhoods <- over(data_coords, shapefile)

  ## add only the drop-off neighborhood and city columns to the data  
  data$dropoff_nhood <- nhoods$NAME
  data$dropoff_borough <- nhoods$CITY

  ## return the data with the new columns added in
  data
}

# test the function on a data.frame using rxDataStep

rxSetComputeContext("local")
head(rxDataStep(nyc_sample_df, transformFunc = find_nhoods, transformPackages = c("sp", "maptools"), 
                transformObjects = list(shapefile = mht_shapefile)))
## Rows Read: 1000, Total Rows Processed: 1000, Total Chunk Time: 0.049 seconds
##   VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
## 1        1  2016-05-01 00:00:00   2016-05-01 00:17:31               1
## 2        2  2016-05-01 00:00:00   2016-05-01 00:07:31               1
## 3        2  2016-05-01 00:00:00   2016-05-01 00:07:01               6
## 4        2  2016-05-01 00:00:00   2016-05-01 00:19:47               1
## 5        2  2016-05-01 00:00:00   2016-05-01 00:06:39               1
## 6        2  2016-05-01 00:00:00   2016-05-01 00:05:19               2
##   trip_distance pickup_longitude pickup_latitude RatecodeID
## 1          3.60        -73.98590        40.76804          1
## 2          1.68        -73.99158        40.74475          1
## 3          1.09        -73.99307        40.74157          1
## 4          4.21        -73.99194        40.68460          1
## 5          0.56        -74.00528        40.74019          1
## 6          0.63        -73.97929        40.75576          1
##   store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type
## 1                  N         -73.98399         40.73010            1
## 2                  N         -73.97570         40.76547            1
## 3                  N         -73.98100         40.74463            1
## 4                  N         -74.00226         40.73300            1
## 5                  N         -73.99750         40.73756            1
## 6                  N         -73.98801         40.75847            1
##   fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge
## 1        15.0   0.5     0.5       1.50            0                   0.3
## 2         7.5   0.5     0.5       0.88            0                   0.3
## 3         6.5   0.5     0.5       1.56            0                   0.3
## 4        17.0   0.5     0.5       3.66            0                   0.3
## 5         6.0   0.5     0.5       1.46            0                   0.3
## 6         5.0   0.5     0.5       0.00            0                   0.3
##   total_amount             pickup_city  pickup_name pickup_nhood
## 1        17.80 New York City-Manhattan      Midtown      Midtown
## 2         9.68 New York City-Manhattan      Chelsea      Chelsea
## 3         9.36 New York City-Manhattan      Chelsea      Chelsea
## 4        21.96                    <NA>         <NA>         <NA>
## 5         8.76 New York City-Manhattan West Village West Village
## 6         6.30 New York City-Manhattan      Midtown      Midtown
##            pickup_borough     dropoff_nhood         dropoff_borough
## 1 New York City-Manhattan      East Village New York City-Manhattan
## 2 New York City-Manhattan      Central Park New York City-Manhattan
## 3 New York City-Manhattan          Gramercy New York City-Manhattan
## 4                    <NA> Greenwich Village New York City-Manhattan
## 5 New York City-Manhattan Greenwich Village New York City-Manhattan
## 6 New York City-Manhattan           Midtown New York City-Manhattan
rxSetComputeContext(spark_cc)

Having tested the function locally and switched back to the Spark compute context, we can now apply it across the cluster:

taxi_hoods <- RxXdfData("/user/RevoShare/alizaidi/TaxiHoodsXdf",
                       fileSystem = hdfsFS)

st <- Sys.time()
rxDataStep(taxi_date, taxi_hoods, 
           transformFunc = find_nhoods, 
           transformPackages = c("sp", "maptools", "rgeos"), 
           transformObjects = list(shapefile = mht_shapefile))
Sys.time() - st
## Time difference of 7.93051 mins
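rxDataStep hands the transform function one chunk of rows at a time, as a list of columns, and writes out whatever the function returns. The base R sketch below imitates that pattern on a small data frame. `apply_in_chunks` and `add_flag` are hypothetical helpers written for this illustration, not RevoScaleR functions, and the chunk size is arbitrary.

```r
# Hypothetical stand-in for a chunked data step: split the rows into
# blocks, run the transform on each block, and stack the results.
apply_in_chunks <- function(df, transform_func, chunk_size = 3) {
  block_id <- ceiling(seq_len(nrow(df)) / chunk_size)
  chunks <- split(df, block_id)
  out <- lapply(chunks, function(chunk) {
    as.data.frame(transform_func(as.list(chunk)), stringsAsFactors = FALSE)
  })
  result <- do.call(rbind, out)
  rownames(result) <- NULL
  result
}

# Like find_nhoods, the transform receives a list of columns and returns
# the same list with a new column added.
add_flag <- function(data) {
  data$long_trip <- data$trip_distance > 2
  data
}

trips <- data.frame(trip_distance = c(3.6, 1.68, 1.09, 4.21, 0.56, 0.63, 2.5))
flagged <- apply_in_chunks(trips, add_flag)
print(flagged)
```

Because each chunk is handled on its own, the transform should not rely on rows outside the chunk, and any extra object it needs (such as the shapefile) has to be passed in explicitly, which is the role of `transformObjects` in the call above.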
rxGetInfo(taxi_hoods, numRows = 5)
## File name: /user/RevoShare/alizaidi/TaxiHoodsXdf 
## Number of composite data files: 48 
## Number of observations: 138413407 
## Number of variables: 29 
## Number of blocks: 289 
## Compression type: zlib 
## Data (5 rows starting with row 1):
##   VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
## 1        1  2015-07-01 00:00:00   2015-07-01 00:15:26               1
## 2        1  2015-07-01 00:00:00   2015-07-01 00:22:22               1
## 3        1  2015-07-01 00:00:00   2015-07-01 00:07:42               1
## 4        1  2015-07-01 00:00:00   2015-07-01 00:39:37               1
## 5        1  2015-07-01 00:00:00   2015-07-01 00:05:34               1
##   trip_distance pickup_longitude pickup_latitude RatecodeID
## 1           3.5        -73.99416        40.75113          1
## 2           3.9        -73.98466        40.76849          1
## 3           2.3        -73.97889        40.76229          1
## 4           9.2        -73.99279        40.74276          1
## 5           1.1        -73.91243        40.76981          1
##   store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type
## 1                  N         -73.97682         40.78857            2
## 2                  N         -74.00013         40.73490            2
## 3                  N         -74.00422         40.75253            2
## 4                  N         -73.97151         40.63715            1
## 5                  N         -73.92033         40.75744            1
##   fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge
## 1          14   0.5     0.5       0.00            0                   0.3
## 2          17   0.5     0.5       0.00            0                   0.3
## 3           9   0.5     0.5       0.00            0                   0.3
## 4          33   0.5     0.5       8.55            0                   0.3
## 5           6   0.5     0.5       2.00            0                   0.3
##   total_amount tip_percent pickup_hour pickup_dow dropoff_hour dropoff_dow
## 1        15.30   0.0000000    10PM-1AM        Wed     10PM-1AM         Wed
## 2        18.30   0.0000000    10PM-1AM        Wed     10PM-1AM         Wed
## 3        10.30   0.0000000    10PM-1AM        Wed     10PM-1AM         Wed
## 4        42.85   0.2590909    10PM-1AM        Wed     10PM-1AM         Wed
## 5         9.30   0.3333333    10PM-1AM        Wed     10PM-1AM         Wed
##   trip_duration     pickup_nhood          pickup_borough     dropoff_nhood
## 1           926 Garment District New York City-Manhattan   Upper West Side
## 2          1342          Midtown New York City-Manhattan Greenwich Village
## 3           462          Midtown New York City-Manhattan           Chelsea
## 4          2377          Chelsea New York City-Manhattan              <NA>
## 5           334             <NA>                    <NA>              <NA>
##           dropoff_borough
## 1 New York City-Manhattan
## 2 New York City-Manhattan
## 3 New York City-Manhattan
## 4                    <NA>
## 5                    <NA>
system.time(
  rxs_all <- rxSummary( ~ ., taxi_hoods)
)
##    user  system elapsed 
##   0.464   0.876  44.699
head(rxs_all$sDataFrame)
##                    Name       Mean       StdDev           Min         Max
## 1              VendorID   1.529642 4.991206e-01        1.0000         2.0
## 2  tpep_pickup_datetime         NA           NA            NA          NA
## 3 tpep_dropoff_datetime         NA           NA            NA          NA
## 4       passenger_count   1.672253 1.320966e+00        0.0000         9.0
## 5         trip_distance   7.300553 1.756544e+04 -3390583.7500 198623008.0
## 6      pickup_longitude -72.925354 8.744200e+00     -736.6166       172.6
##    ValidObs MissingObs
## 1 138413407          0
## 2         0          0
## 3         0          0
## 4 138413407          0
## 5 138413407          0
## 6 138413407          0
nhoods_by_borough <- rxCrossTabs( ~ pickup_nhood:pickup_borough, taxi_hoods)
nhoods_by_borough <- nhoods_by_borough$counts[[1]]
nhoods_by_borough <- as.data.frame(nhoods_by_borough)

# get the neighborhoods by borough
lnbs <- lapply(names(nhoods_by_borough), 
               function(vv) subset(nhoods_by_borough, 
                                   nhoods_by_borough[ , vv] > 0, 
                                   select = vv, drop = FALSE))
lapply(lnbs, head)
## [[1]]
## [1] Albany
## <0 rows> (or 0-length row.names)
## 
## [[2]]
## [1] Buffalo
## <0 rows> (or 0-length row.names)
## 
## [[3]]
## [1] New York City-Bronx
## <0 rows> (or 0-length row.names)
## 
## [[4]]
## [1] New York City-Brooklyn
## <0 rows> (or 0-length row.names)
## 
## [[5]]
##               New York City-Manhattan
## Battery Park                  1282503
## Carnegie Hill                 1532962
## Central Park                  1882914
## Chelsea                       9237282
## Chinatown                      438817
## Clinton                       4213741
## 
## [[6]]
## [1] New York City-Queens
## <0 rows> (or 0-length row.names)
## 
## [[7]]
## [1] New York City-Staten Island
## <0 rows> (or 0-length row.names)
## 
## [[8]]
## [1] Rochester
## <0 rows> (or 0-length row.names)
## 
## [[9]]
## [1] Syracuse
## <0 rows> (or 0-length row.names)
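
The cross-tabulation above has one row per neighborhood and one column per borough, and the `lapply` call keeps, for each borough, the neighborhoods with a non-zero count. A minimal base R sketch of the same idea follows, using `table` in place of `rxCrossTabs`. The toy rows are made up for illustration.

```r
# Toy data: each row is one pickup with its neighborhood and borough.
toy <- data.frame(
  pickup_nhood   = c("Chelsea", "Midtown", "Chelsea", "Astoria"),
  pickup_borough = c("Manhattan", "Manhattan", "Manhattan", "Queens"),
  stringsAsFactors = TRUE
)

# Wide table of counts: neighborhoods in rows, boroughs in columns.
counts <- as.data.frame.matrix(table(toy$pickup_nhood, toy$pickup_borough))

# For each borough, keep only the neighborhoods that have any pickups.
by_borough <- lapply(names(counts), function(vv) {
  subset(counts, counts[, vv] > 0, select = vv, drop = FALSE)
})
names(by_borough) <- names(counts)
print(by_borough)
```

In the real output only the Manhattan column has non-empty rows, because the shapefile used for the lookup covers Manhattan neighborhoods.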

Filter to Manhattan Neighborhoods

The majority of the data lies in the borough of Manhattan. Let's refactor our neighborhood columns so that any pickup or dropoff outside Manhattan is encoded as NA.
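
Re-encoding a column with `factor(x, levels = ...)` turns every value that is not among the listed levels into NA, which is the mechanism this step relies on. A small base R illustration, with neighborhood names chosen only as example values:

```r
# Values outside the requested levels become NA.
nhoods <- c("Chelsea", "Astoria", "Midtown", "Williamsburg", "Chelsea")
manhattan_only <- c("Chelsea", "Midtown")

nhood_nb <- factor(nhoods, levels = manhattan_only)
print(nhood_nb)
print(table(nhood_nb, useNA = "ifany"))
```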

manhattan_nhoods <- rownames(nhoods_by_borough)[nhoods_by_borough$`New York City-Manhattan` > 0]

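## re-encode the neighborhood columns using only the Manhattan levels;
## nhoods_levels is supplied through transformObjects
refactor_columns <- function(dataList) {
  dataList$pickup_nb <- factor(dataList$pickup_nhood, levels = nhoods_levels)
  dataList$dropoff_nb <- factor(dataList$dropoff_nhood, levels = nhoods_levels)
  dataList
}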

mht_hoods <- RxXdfData("/user/RevoShare/alizaidi/MhtHoodsXdf",
                       fileSystem = hdfsFS)

system.time(rxDataStep(taxi_hoods, mht_hoods, 
           transformFunc = refactor_columns,
           transformObjects = list(nhoods_levels = manhattan_nhoods)))
##    user  system elapsed 
##   2.100   6.428 396.368
system.time(rxs_pickdrop <- rxSummary( ~ pickup_nb:dropoff_nb, mht_hoods))
##    user  system elapsed 
##   0.452   0.636  30.343
head(rxs_pickdrop$categorical[[1]])
##       pickup_nb   dropoff_nb Counts
## 1  Battery Park Battery Park  39435
## 2 Carnegie Hill Battery Park   5148
## 3  Central Park Battery Park   7585
## 4       Chelsea Battery Park 122936
## 5     Chinatown Battery Park   7851
## 6       Clinton Battery Park  52650
system.time(rxHistogram( ~ trip_distance, 
             mht_hoods, startVal = 0, endVal = 25, histType = "Percent", numBreaks = 20))

##    user  system elapsed 
##   1.400   1.232  57.149
system.time(rxs <- rxSummary( ~ pickup_nhood:dropoff_nhood, 
                  mht_hoods, 
                  rowSelection = (trip_distance > 15 & trip_distance < 22)))
##    user  system elapsed 
##   0.972   0.836  37.138
head(arrange(rxs$categorical[[1]], desc(Counts)), 10)
##          pickup_nhood      dropoff_nhood Counts
## 1             Midtown            Midtown   2048
## 2     Upper East Side    Upper East Side    920
## 3     Upper West Side    Upper West Side    607
## 4            Gramercy           Gramercy    585
## 5             Chelsea            Chelsea    460
## 6    Garment District   Garment District    388
## 7         Murray Hill        Murray Hill    373
## 8             Clinton            Clinton    341
## 9  Financial District Financial District    291
## 10    Lower East Side    Lower East Side    259

Filter Data to Manhattan Only

Now that the non-Manhattan rides are encoded as NA, we can filter them out along with other outlier rides.

mht_xdf <- RxXdfData("/user/RevoShare/alizaidi/ManhattanXdf",
                     fileSystem = hdfsFS)

st <- Sys.time()
rxDataStep(mht_hoods, mht_xdf, 
           rowSelection = (
             passenger_count > 0 &
               trip_distance >= 0 & trip_distance < 30 &
               trip_duration > 0 & trip_duration < 60*60*24 &
               str_detect(pickup_borough, 'Manhattan') &
               str_detect(dropoff_borough, 'Manhattan') &
               !is.na(pickup_nb) &
               !is.na(dropoff_nb) &
               fare_amount > 0), 
           transformPackages = "stringr",
           varsToDrop = c('extra', 'mta_tax', 'improvement_surcharge', 'total_amount', 
                          'pickup_borough', 'dropoff_borough', 'pickup_nhood', 'dropoff_nhood'))

Sys.time() - st
## Time difference of 1.782435 mins
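
The `rowSelection` expression above keeps a row only when every condition holds. The same logic can be tried locally on a small data frame before launching the cluster job. This is a sketch in base R with made-up rows, and it uses `grepl` where the original uses `str_detect`.

```r
toy <- data.frame(
  passenger_count = c(1, 0, 2, 1),
  trip_distance   = c(2.5, 1.0, 45.0, 3.2),
  trip_duration   = c(600, 300, 5000, 900),   # seconds
  fare_amount     = c(10.5, 6.0, 120.0, 12.0),
  pickup_borough  = c("New York City-Manhattan", "New York City-Manhattan",
                      "New York City-Manhattan", "New York City-Queens"),
  stringsAsFactors = FALSE
)

# A row survives only if all conditions are TRUE.
keep <- with(toy,
  passenger_count > 0 &
    trip_distance >= 0 & trip_distance < 30 &
    trip_duration > 0 & trip_duration < 60 * 60 * 24 &
    grepl("Manhattan", pickup_borough, fixed = TRUE) &
    fare_amount > 0)

filtered <- toy[keep, ]
print(filtered)
```

Here the second row is dropped for having no passengers, the third for its 45-mile distance, and the fourth for its non-Manhattan pickup.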

Visualize Trip Routes

Let's visualize how trip distance, speed, and tipping vary between pickup and dropoff neighborhoods.

system.time(rxct <- rxCrossTabs(trip_distance ~ pickup_nb:dropoff_nb, mht_xdf))
##    user  system elapsed 
##   0.456   0.640  26.394
res <- rxct$sums$trip_distance / rxct$counts$trip_distance

library(seriation)
res[which(is.nan(res))] <- mean(res, na.rm = TRUE)
nb_order <- seriate(res)
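
Dividing the matrix of summed distances by the matrix of trip counts gives the average trip distance for each pickup and dropoff pair. Pairs with no trips yield 0/0, which is NaN, and those cells are filled with the overall mean so that the matrix can be passed to `seriate`. A base R sketch of that computation on a toy 2 x 2 example (the numbers are made up):

```r
nb <- c("A", "B")
sums   <- matrix(c(10, 0, 6, 8), nrow = 2, dimnames = list(nb, nb))
counts <- matrix(c(5, 0, 2, 4), nrow = 2, dimnames = list(nb, nb))

avg <- sums / counts            # cell (B, A) is 0 / 0 = NaN
print(avg)

# Replace empty cells with the mean of the observed cells.
avg[is.nan(avg)] <- mean(avg, na.rm = TRUE)
print(avg)
```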

system.time(rxc1 <- rxCube(trip_distance ~ pickup_nb:dropoff_nb, mht_xdf))
##    user  system elapsed 
##   0.388   0.708  26.401
system.time(rxc2 <- rxCube(minutes_per_mile ~ pickup_nb:dropoff_nb, mht_xdf,
               transforms = list(minutes_per_mile = (trip_duration / 60) / trip_distance)))
##    user  system elapsed 
##   0.948   0.780  33.072
system.time(rxc3 <- rxCube(tip_percent ~ pickup_nb:dropoff_nb, mht_xdf))
##    user  system elapsed 
##   0.396   0.664  24.390
res <- bind_cols(list(rxc1, rxc2, rxc3))
res <- res[, c('pickup_nb', 'dropoff_nb', 
               'trip_distance', 'minutes_per_mile', 'tip_percent')]
head(res)
## # A tibble: 6 × 5
##       pickup_nb   dropoff_nb trip_distance minutes_per_mile tip_percent
##          <fctr>       <fctr>         <dbl>            <dbl>       <dbl>
## 1  Battery Park Battery Park     0.9881965        12.394359   0.3446432
## 2 Carnegie Hill Battery Park     8.5299689         3.825631   0.1497566
## 3  Central Park Battery Park     6.2392669         5.190193   0.1150186
## 4       Chelsea Battery Park     2.9915740         5.213426   0.1457605
## 5     Chinatown Battery Park     1.7717210         8.369832   0.1245773
## 6       Clinton Battery Park     3.9961512         4.892761   0.1151952
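
Each `rxCube` call above returns one row per pickup and dropoff pair with the cell mean of the response. A rough local analogue in base R uses `aggregate`. The toy rows are made up, and `minutes_per_mile` is derived the same way as in the `transforms` argument above.

```r
toy <- data.frame(
  pickup_nb     = c("A", "A", "B", "B"),
  dropoff_nb    = c("B", "B", "A", "A"),
  trip_distance = c(2, 4, 1, 3),           # miles
  trip_duration = c(600, 1200, 240, 1080)  # seconds
)

# Convert seconds to minutes, then divide by miles travelled.
toy$minutes_per_mile <- (toy$trip_duration / 60) / toy$trip_distance

# Mean of each measure within every pickup/dropoff cell.
cell_means <- aggregate(
  cbind(trip_distance, minutes_per_mile) ~ pickup_nb + dropoff_nb,
  data = toy, FUN = mean)
print(cell_means)
```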
library(ggplot2)
ggplot(res, aes(pickup_nb, dropoff_nb)) +
  geom_tile(aes(fill = trip_distance), colour = "white") +
  theme(axis.text.x = element_text(angle = 60, hjust = 1)) +
  scale_fill_gradient(low = "white", high = "steelblue") +
  coord_fixed(ratio = .9)

newlevs <- levels(res$pickup_nb)[unlist(nb_order)]
res$pickup_nb <- factor(res$pickup_nb, levels = unique(newlevs))
res$dropoff_nb <- factor(res$dropoff_nb, levels = unique(newlevs))

library(ggplot2)
ggplot(res, aes(pickup_nb, dropoff_nb)) +
  geom_tile(aes(fill = trip_distance), colour = "white") +
  theme(axis.text.x = element_text(angle = 60, hjust = 1)) +
  scale_fill_gradient(low = "white", high = "steelblue") +
  coord_fixed(ratio = .9)

ggplot(res, aes(pickup_nb, dropoff_nb)) +
  geom_tile(aes(fill = minutes_per_mile), colour = "white") +
  theme(axis.text.x = element_text(angle = 60, hjust = 1)) +
  scale_fill_gradient(low = "white", high = "steelblue") +
  coord_fixed(ratio = .9)

res %>%
  mutate(tip_color = cut(tip_percent, c(0, 5, 8, 12, 15, 100)/100)) %>%
  ggplot(aes(pickup_nb, dropoff_nb)) +
  geom_tile(aes(fill = tip_color)) +
  theme(axis.text.x = element_text(angle = 60, hjust = 1)) +
  coord_fixed(ratio = .9)
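
The last plot bins `tip_percent` with `cut` before mapping it to the fill colour. Because `tip_percent` is stored as a fraction, the percentage breaks are divided by 100. A short base R illustration with made-up values:

```r
tip_percent <- c(0.03, 0.07, 0.10, 0.14, 0.34)
breaks <- c(0, 5, 8, 12, 15, 100) / 100

# Intervals are right-closed by default, e.g. (0.05, 0.08].
tip_color <- cut(tip_percent, breaks)
print(data.frame(tip_percent, tip_color))
```

With these defaults a tip of exactly 0 falls outside the first interval and becomes NA, unless `include.lowest = TRUE` is passed to `cut`.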

Refactor Neighborhoods by Distance

mht_factor_xdf <- RxXdfData("/user/RevoShare/alizaidi/MhtFactorXdf",
                            fileSystem = hdfsFS)


system.time(rxDataStep(inData = mht_xdf, outFile = mht_factor_xdf,
           transforms = list(pickup_nb = factor(pickup_nb, levels = newlevels),
                             dropoff_nb = factor(dropoff_nb, levels = newlevels)),
           transformObjects = list(newlevels = unique(newlevs))))
##    user  system elapsed 
##   0.732   2.364  95.143

Visualizing Patterns

Spatial Patterns

system.time(rxc <- rxCube( ~ pickup_nb:dropoff_nb, mht_factor_xdf))
##    user  system elapsed 
##   0.380   0.764  24.450
rxc <- as.data.frame(rxc)

library(dplyr)
rxc %>%
  filter(Counts > 0) %>%
  mutate(pct_all = Counts / sum(Counts) * 100) %>%
  group_by(pickup_nb) %>%
  mutate(pct_by_pickup_nb = Counts / sum(Counts) * 100) %>%
  group_by(dropoff_nb) %>%
  mutate(pct_by_dropoff_nb = Counts / sum(Counts) * 100) %>%
  group_by() %>%
  arrange(desc(Counts)) -> rxcs

head(rxcs)
## # A tibble: 6 × 6
##          pickup_nb      dropoff_nb  Counts  pct_all pct_by_pickup_nb
##             <fctr>          <fctr>   <dbl>    <dbl>            <dbl>
## 1  Upper East Side Upper East Side 6309081 5.513185         36.08561
## 2          Midtown         Midtown 4370662 3.819300         21.70553
## 3  Upper West Side Upper West Side 3687383 3.222217         34.27800
## 4  Upper East Side         Midtown 3201768 2.797862         18.31293
## 5          Midtown Upper East Side 3198060 2.794622         15.88217
## 6 Garment District         Midtown 2131961 1.863012         28.72311
## # ... with 1 more variables: pct_by_dropoff_nb <dbl>
ggplot(rxcs, aes(pickup_nb, dropoff_nb)) +
  geom_tile(aes(fill = pct_all), colour = "white") +
  theme(axis.text.x = element_text(angle = 60, hjust = 1)) +
  scale_fill_gradient(low = "white", high = "black") +
  coord_fixed(ratio = .9)

ggplot(rxcs, aes(pickup_nb, dropoff_nb)) +
  geom_tile(aes(fill = pct_by_pickup_nb), colour = "white") +
  theme(axis.text.x = element_text(angle = 60, hjust = 1)) +
  scale_fill_gradient(low = "white", high = "steelblue") +
  coord_fixed(ratio = .9)

ggplot(rxcs, aes(pickup_nb, dropoff_nb)) +
  geom_tile(aes(fill = pct_by_dropoff_nb), colour = "white") +
  theme(axis.text.x = element_text(angle = 60, hjust = 1)) +
  scale_fill_gradient(low = "white", high = "red") +
  coord_fixed(ratio = .9)
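
The three percentage columns plotted above differ only in their denominator: all trips, trips from the same pickup neighborhood, or trips to the same dropoff neighborhood. A base R sketch with `ave` standing in for the `group_by` and `mutate` steps, on made-up counts:

```r
rxc_toy <- data.frame(
  pickup_nb  = c("A", "A", "B", "B"),
  dropoff_nb = c("A", "B", "A", "B"),
  Counts     = c(60, 20, 10, 10)
)

# Share of all trips.
rxc_toy$pct_all <- rxc_toy$Counts / sum(rxc_toy$Counts) * 100
# Share of trips leaving the same pickup neighborhood.
rxc_toy$pct_by_pickup_nb <-
  rxc_toy$Counts / ave(rxc_toy$Counts, rxc_toy$pickup_nb, FUN = sum) * 100
# Share of trips arriving in the same dropoff neighborhood.
rxc_toy$pct_by_dropoff_nb <-
  rxc_toy$Counts / ave(rxc_toy$Counts, rxc_toy$dropoff_nb, FUN = sum) * 100
print(rxc_toy)
```

Within each pickup neighborhood the `pct_by_pickup_nb` values sum to 100, and likewise for `pct_by_dropoff_nb` within each dropoff neighborhood.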

Temporal Patterns

system.time(res1 <- rxCube(tip_percent ~ pickup_dow:pickup_hour, mht_factor_xdf))
##     user   system  elapsed 
##    8.828   17.456 2161.295
system.time(res2 <- rxCube(fare_amount / (trip_duration / 60) ~ pickup_dow:pickup_hour, 
                           mht_factor_xdf))
##    user  system elapsed 
##   0.888   0.320  32.652
names(res2)[3] <- 'fare_per_minute'
res <- bind_cols(list(res1, res2))
res <- res[, c('pickup_dow', 'pickup_hour', 'fare_per_minute', 'tip_percent', 'Counts')]
library(ggplot2)
ggplot(res, aes(pickup_dow, pickup_hour)) +
  geom_tile(aes(fill = fare_per_minute), colour = "white") +
  theme(axis.text.x = element_text(angle = 60, hjust = 1)) +
  scale_fill_gradient(low = "white", high = "steelblue") +
  geom_text(aes(label = sprintf('%dK riders\n (%d%% tip)', signif(Counts / 1000, 2), round(tip_percent*100, 0))), size = 2.5) +
  coord_fixed(ratio = .9)
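
The tile labels are built with `sprintf`. In R, `%d` accepts a double only when its value is a whole number, so the rounding inside the call matters. A minimal sketch with made-up values follows. Converting explicitly with `as.integer` and rounding to the nearest thousand (rather than to two significant digits, as the plot code does) are choices made for this illustration.

```r
Counts <- c(1234567, 987654)
tip_percent <- c(0.1372, 0.1519)

# Round first, then convert, so %d always receives an integer.
labels <- sprintf("%dK rides\n (%d%% tip)",
                  as.integer(round(Counts / 1000)),
                  as.integer(round(tip_percent * 100)))
cat(labels, sep = "\n")
```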

Training Statistical Models in a Spark Compute Context

Let's train a few statistical models to predict the probability of a tip greater than 10% and compare their performance.

model_xdf <- RxXdfData("/user/RevoShare/alizaidi/ModelXdf",
                       fileSystem = hdfsFS)

system.time(rxDataStep(inData = mht_factor_xdf,
             outFile = model_xdf,
             transforms = list(
               split = factor(ifelse(rbinom(.rxNumRows, size = 1, prob = 0.75), 
                                     "train", "test")),
               good_tip = ifelse(tip_percent > 0.1, 1, 0)))
            )
##    user  system elapsed 
##   1.056   1.376 183.439
rxSummary(~split + good_tip, model_xdf)
## Call:
## rxSummary(formula = ~split + good_tip, data = model_xdf)
## 
## Summary Statistics Results for: ~split + good_tip
## Data: model_xdf (RxXdfData Data Source)
## File name: /user/RevoShare/alizaidi/ModelXdf
## Number of valid observations: 114436219 
##  
##  Name     Mean      StdDev    Min Max ValidObs  MissingObs
##  good_tip 0.5865863 0.4924457 0   1   114436219 0         
## 
## Category Counts for split
## Number of categories: 2
## Number of valid observations: 114436219
## Number of missing observations: 0
## 
##  split Counts  
##  test  28611296
##  train 85824923
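
The transform above draws one Bernoulli value per row with probability 0.75 and labels the row accordingly, so roughly three quarters of the rows land in the training set. The counts in the summary (85,824,923 of 114,436,219 rows, about 75%) bear this out. A local sketch in base R follows; the seed, the sample size, and the uniform `tip_percent` values are assumptions made only so the illustration is repeatable.

```r
set.seed(42)  # assumed seed, only so the sketch is repeatable
n <- 10000
tip_percent <- runif(n, min = 0, max = 0.3)

# One 0/1 draw per row decides which set the row belongs to.
draw <- rbinom(n, size = 1, prob = 0.75)
split <- factor(ifelse(draw == 1, "train", "test"), levels = c("test", "train"))
good_tip <- ifelse(tip_percent > 0.1, 1, 0)

print(prop.table(table(split)))
print(mean(good_tip))
```

Fixing the factor levels explicitly keeps the column consistent even if some block of rows happens to contain only one of the two labels.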

Now that we have created the train/test split column, we can train several models simultaneously with the rxExec function. rxExec takes a single function and distributes its execution across the worker nodes. Here we distribute the training of three models: a single decision tree, a random forest, and gradient boosted trees.

list_models <- list(rxDTree, rxDForest, rxBTrees)

train_model <- function(model = rxDTree,
                        xdf_data = model_xdf) {

  form <- formula(good_tip ~ pickup_nb + dropoff_nb + pickup_hour + pickup_dow)

  rx_model <- model(form, data = xdf_data, 
                    rowSelection = (split == "train"),
                    method = "class")


  return(rx_model)  
}

system.time(trained_models <- rxExec(train_model, model = rxElemArg(list_models), xdf_data = model_xdf))
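
`rxElemArg` tells `rxExec` to call `train_model` once per element of `list_models`, so each call receives a different modelling function. The pattern resembles mapping a fitting wrapper over a list of functions. The base R sketch below shows it sequentially with `lapply`, using `lm` and `glm` on the built-in `mtcars` data as stand-ins for the RevoScaleR tree learners; `fit_model` is a hypothetical wrapper for this illustration.

```r
# Stand-in learners; any function taking (formula, data) would do.
toy_models <- list(lm = lm, glm = glm)

fit_model <- function(model, data = mtcars) {
  form <- mpg ~ wt + hp
  model(form, data = data)
}

# One call per list element, each with a different `model` argument.
fitted_models <- lapply(toy_models, fit_model)
print(sapply(fitted_models, function(m) class(m)[1]))
```

The result is a list with one fitted model per learner, which can then be scored on the held-out rows.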
