S3のデータをRStudioとsparklyrで分析する

RStudio社が提供しているsparklyrを使うと、Sparkクラスターに格納されている大規模なデータに対して、普段お使いのR言語から簡単に処理をすることが出来ます。

sparklyrとは、大規模なデータに対してもRを使い容易に操作できるパッケージです。Rユーザーに人気のdplyrと呼ばれるパッケージのバックエンドとして動き、Sparkを直接意識することなく大規模なデータを扱うことが出来ます。Clouderaでは、Pythonのデータ分析用のライブラリpandasからImpalaを使ってデータ分析をしやすくしたIbisというパッケージを開発していますが、これのR+Spark版と言っても過言ではないでしょう。

sparklyrに興味をもったなら、公式ドキュメントから始めるといいでしょう。 もしくは、Cloudera DirectorでSparkクラスターを簡単につくり、それとsparklyrをつなげても良いでしょう。sparklyrを使う上で、チートシートが非常に役に立ちます。

Cloudera Directorを使ってSparkとsparklyrクラスタを起動する

Cloudera Directorを使ったsparklyrクラスタの起動方法は、こちらのCloudera Blogの方法を試しても良いですし、Cloudera Director clientがあれば以下のレポジトリのcluster.confを使うこともできます。

ただ、私の方で試したときには、先のブログの方法でGUIベースではうまく行かなかったので、今回は前述のレポジトリのcluster.confを使います。このコンフィグファイルはCloudera Director 2.3以降で動きます。

AWSの東京リージョンを前提にしていますが、必要なサブネットやセキュリティグループを用意していれば、必要な設定を書き換えた上で以下のコマンドだけでクラスタを起動することが可能です。

$ cloudera-director bootstrap cluster.conf

Cloudera Directorのサーバがある場合は、以下のようなコマンドでデプロイが可能です。

$ cloudera-director bootstrap-remote cluster.conf –lp.remote.username=<director-username> –lp.remote.password=<director-password> –lp.remote.hostAndPort=<director-server-hostname>:7189

Cloudera Director clientがインストールされていない場合でも、Dockerベースのツールを使うことで、sparklyrのクラスタが起動できます。

sparklyrとRStudioによるAmazon S3上のデータの分析

今回は、アメリカのフライト情報についてsparklyrを使って、可視化、予測モデルの構築を行います。

地図の可視化はこちらのブログを参考にしました。 http://flowingdata.com/2011/05/11/how-to-map-connections-with-great-circles/

このドキュメントでは以下のテーブルを使います:

  • Airlines dataairlines_bi_pq という名前のテーブルで格納しています。S3においてあるデータを使いますが、HDFSにあっても構いません. 詳細は Ibis project をご覧ください.
  • Airports dataairports_new_pqという名前のテーブルにparquet形式で保存しています。 詳細は 2009 ASA Data Expo をご覧ください.

これらのテーブルはHueなどを使って、別途作成しておいてください。テーブル作成のためのSQLはこちらを参考にしてください。

まず、先程立ち上げたクラスタのゲートウェイサーバにRStudio Serverが入っています。ポートが開放されている場合は、<gateway-server-host>:8787でブラウザからアクセスできます。お手持ちのブラウザで<gateway-server-host>:8787を開いてください。そうでなければ8787番のポートをsshなどでフォワードし、localhost:8787にアクセスします。ブラウザからRStudio Serverにアクセスできます。スクリプトの設定だと、ユーザ名rsuserパスワードclouderaでRStudioにログインできます。

Sparkに接続する

sparklyr を使いApache Sparkクラスタに接続します。今回のコードは別途入れておいたSpark 2.0を使っています。

# Load libraries
library(ggplot2)
library(maps)
library(geosphere)
## Loading required package: sp
library(sparklyr)
library(dplyr)
## 
## Attaching package: 'dplyr'
## The following objects are masked from 'package:stats':
##
## filter, lag
## The following objects are masked from 'package:base':
##
## intersect, setdiff, setequal, union
# Configure cluster
config <- spark_config()
config$spark.driver.cores <- 4
config$spark.executor.cores <- 4
config$spark.executor.memory <- "4G"
#spark_home <- "/opt/cloudera/parcels/CDH/lib/spark"
#spark_version <- "1.6.2"
spark_home <- "/opt/cloudera/parcels/SPARK2/lib/spark2"
spark_version <- "2.0.0"
sc <- spark_connect(master="yarn-client", version=spark_version, config=config, spark_home=spark_home)

S3にあるテーブルのデータを読み込みプロットする

まずは、airlines_bi_pqというHiveのテーブルにあるフライト数を年毎に集計します。 元データは、こちらからダウンロードできます。

airlines <- tbl(sc, "airlines_bi_pq")
airlines
## Source:   query [1.235e+08 x 30]
## Database: spark connection master=yarn-client app=sparklyr local=FALSE
##
## year month day dayofweek dep_time crs_dep_time arr_time crs_arr_time
## <int> <int> <int> <int> <int> <int> <int> <int>
## 1 2006 6 3 6 840 830 1006 1007
## 2 2006 6 4 7 830 830 958 1007
## 3 2006 6 5 1 827 830 1004 1007
## 4 2006 6 6 2 830 830 1006 1007
## 5 2006 6 7 3 831 830 1005 1007
## 6 2006 6 8 4 826 830 958 1007
## 7 2006 6 9 5 826 830 958 1007
## 8 2006 6 10 6 845 830 1011 1007
## 9 2006 6 11 7 828 830 950 1007
## 10 2006 6 12 1 829 830 956 1007
## # ... with 1.235e+08 more rows, and 22 more variables: carrier <chr>,
## # flight_num <int>, tail_num <int>, actual_elapsed_time <int>,
## # crs_elapsed_time <int>, airtime <int>, arrdelay <int>, depdelay <int>,
## # origin <chr>, dest <chr>, distance <int>, taxi_in <int>,
## # taxi_out <int>, cancelled <int>, cancellation_code <chr>,
## # diverted <int>, carrier_delay <int>, weather_delay <int>,
## # nas_delay <int>, security_delay <int>, late_aircraft_delay <int>,
## # date_yyyymm <chr>
airline_counts_by_year <- airlines %>% group_by(year) %>% summarise(count=n()) %>% collect
airline_counts_by_year %>% tbl_df %>% print(n=nrow(.))
## # A tibble: 22 × 2
## year count
## <int> <dbl>
## 1 1990 5270893
## 2 2003 6488540
## 3 2007 7453215
## 4 2006 7141922
## 5 1997 5411843
## 6 1988 5202096
## 7 1994 5180048
## 8 2004 7129270
## 9 1991 5076925
## 10 1996 5351983
## 11 1989 5041200
## 12 1998 5384721
## 13 1987 1311826
## 14 1995 5327435
## 15 2001 5967780
## 16 1992 5092157
## 17 2005 7140596
## 18 2000 5683047
## 19 2008 7009728
## 20 1999 5527884
## 21 2002 5271359
## 22 1993 5070501

これをグラフにプロットしてみましょう。

g <- ggplot(airline_counts_by_year, aes(x=year, y=count))
g <- g + geom_line(
colour = "magenta",
linetype = 1,
size = 0.8
)
g <- g + xlab("Year")
g <- g + ylab("Flight number")
g <- g + ggtitle("US flights")
plot(g)
Flight number of US flights

2001–2003年のフライト数を見てみる

2002年のフライト数が激減しているのがわかります。何が起こったのでしょうか?2001年〜2003年のフライト数を見てみましょう。

airline_counts_by_month <- airlines %>% filter(year>= 2001 & year<=2003) %>% group_by(year, month) %>% summarise(count=n()) %>% collect
g <- ggplot(
airline_counts_by_month,
aes(x=as.Date(sprintf("%d-%02d-01", airline_counts_by_month$year, airline_counts_by_month$month)), y=count)
)
g <- g + geom_line(
colour = "magenta",
linetype = 1,
size = 0.8
)
g <- g + xlab("Year/Month")
g <- g + ylab("Flight number")
g <- g + ggtitle("US flights")
plot(g)
2001年から2003年のフライト数

2001年の9月以降、フライト数が激減しているのがグラフから読み取れます。これは、9.11の影響を受けて以降2002年の間、フライト数が減少していると考えられます。この記事ではこれ以上原因については深く追求はしませんが、このように探索的な分析を行うことで、普段と異なるデータの傾向から、何が起こったのかを紐解くことができます。

フライトデータを年、キャリア、出発地、到着地で集計する

次に、キャリア・年ごとのフライト数を集計してみます。

flights <- airlines %>% group_by(year, carrier, origin, dest) %>% summarise(count=n()) %>% collect
flights
## Source: local data frame [123,737 x 5]
## Groups: year, carrier, origin [24,636]
##
## year carrier origin dest count
## <int> <chr> <chr> <chr> <dbl>
## 1 2006 WN LAX ELP 1070
## 2 2006 WN MDW RDU 1370
## 3 2006 US CLT BNA 1095
## 4 2007 DL JFK FLL 1394
## 5 2007 DL RSW ATL 2520
## 6 1992 CO DEN MSY 700
## 7 1992 CO SEA DEN 1732
## 8 2006 US TPA CLT 2754
## 9 2006 WN BNA OAK 365
## 10 2006 WN BNA PVD 400
## # ... with 123,727 more rows
airports <- tbl(sc, "airports_new_pq") %>% collect

その中から、2007年のアメリカン航空のフライト情報を抽出します。

flights_aa <- flights %>% filter(year==2007) %>% filter(carrier=="AA") %>% arrange(count)
flights_aa
## Source: local data frame [460 x 5]
## Groups: year, carrier, origin [82]
##
## year carrier origin dest count
## <int> <chr> <chr> <chr> <dbl>
## 1 2007 AA BOS JFK 1
## 2 2007 AA MIA FLL 1
## 3 2007 AA BOS SDF 1
## 4 2007 AA MIA XNA 1
## 5 2007 AA BNA IAD 1
## 6 2007 AA XNA MIA 2
## 7 2007 AA OGG ORD 8
## 8 2007 AA ORD OGG 8
## 9 2007 AA EGE LGA 17
## 10 2007 AA MTJ ORD 17
## # ... with 450 more rows

アメリカン航空のフライトのマップを出力する

では、2007年のアメリカン航空のフライトを地図上に可視化してみましょう。フィルターするデータを変えれば他の航空会社でもプロット可能です。

# draw map with line of AA
xlim <- c(-171.738281, -56.601563)
ylim <- c(12.039321, 71.856229)
# Color settings
#pal <- colorRampPalette(c("#f2f2f2", "red"))
pal <- colorRampPalette(c("#333333", "white", "#1292db"))
colors <- pal(100)
#map("world", col="#f2f2f2", fill=TRUE, bg="white", lwd=0.05, xlim=xlim, ylim=ylim)
map("world", col="#6B6363", fill=TRUE, bg="#000000", lwd=0.05, xlim=xlim, ylim=ylim)
#carriers <- unique(flights$carrier)
maxcnt <- max(flights_aa$count)
for (j in 1:length(flights_aa$carrier)) {
air1 <- airports[airports$iata == flights_aa[j,]$origin,]
air2 <- airports[airports$iata == flights_aa[j,]$dest,]

inter <- gcIntermediate(c(air1[1,]$longitude, air1[1,]$latitude), c(air2[1,]$longitude, air2[1,]$latitude), n=100, addStartEnd=TRUE)
colindex <- round( (flights_aa[j,]$count / maxcnt) * length(colors) )

lines(inter, col=colors[colindex], lwd=0.8)
}
Flight map of AA 2007

遅延時間を予測する線形回帰モデルを学習する

最後に、MLlibの線形回帰モデルを使い、遅延を予測するモデルを構築してみましょう。

なお、sparklyrでカテゴリカルモデルを扱う際には、ft_string_indexerを使い変換しておきます。

# build predictive model with linear regression
partitions <- airlines %>%
filter(arrdelay >= 5) %>%
sdf_mutate(
carrier_cat = ft_string_indexer(carrier),
origin_cat = ft_string_indexer(origin),
dest_cat = ft_string_indexer(dest)
) %>%
mutate(hour = floor(dep_time/100)) %>%
sdf_partition(training = 0.5, test = 0.5, seed = 1099)
fit <- partitions$training %>%
ml_linear_regression(
response = "arrdelay",
features = c(
"month", "hour", "dayofweek", "carrier_cat", "depdelay", "origin_cat", "dest_cat", "distance"
)
)
## * Dropped 44822 rows with 'na.omit' (22519745 => 22474923)
fit
## Call: ml_linear_regression(., response = "arrdelay", features = c("month", "hour", "dayofweek", "carrier_cat", "depdelay", "origin_cat", "dest_cat", "distance"))
##
## Coefficients:
## (Intercept) month hour dayofweek carrier_cat
## 11.778682025 -0.046258441 -0.150788013 -0.235037195 0.147276224
## depdelay origin_cat dest_cat distance
## 0.903219105 -0.005712402 -0.023306845 0.001430704
summary(fit)
## Call: ml_linear_regression(., response = "arrdelay", features = c("month", "hour", "dayofweek", "carrier_cat", "depdelay", "origin_cat", "dest_cat", "distance"))
##
## Deviance Residuals: (approximate):
## Min 1Q Median 3Q Max
## -1297.944 -7.889 -2.038 4.579 1316.740
##
## Coefficients:
## Estimate Std. Error t value Pr(>|t|)
## (Intercept) 1.1779e+01 1.6486e-02 714.468 < 2.2e-16 ***
## month -4.6258e-02 9.8378e-04 -47.021 < 2.2e-16 ***
## hour -1.5079e-01 7.4880e-04 -201.373 < 2.2e-16 ***
## dayofweek -2.3504e-01 1.7646e-03 -133.195 < 2.2e-16 ***
## carrier_cat 1.4728e-01 7.2567e-04 202.951 < 2.2e-16 ***
## depdelay 9.0322e-01 8.7020e-05 10379.398 < 2.2e-16 ***
## origin_cat -5.7124e-03 9.8910e-05 -57.753 < 2.2e-16 ***
## dest_cat -2.3307e-02 9.4674e-05 -246.179 < 2.2e-16 ***
## distance 1.4307e-03 6.4905e-06 220.431 < 2.2e-16 ***
## ---
## Signif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
##
## R-Squared: 0.8333
## Root Mean Squared Error: 16.33

このように、線形回帰モデルが学習され、どの特徴量がどういった係数か、ということがわかります。

また、今回の分析はRPubsにもRmarkdownを公開しています。よろしければそちらもご覧になってください。

まとめ

この記事では、sparklyrを使ってAmazon S3にあるアメリカの航空データの可視化、および予測モデルの構築を行いました。sparklyrを使うことでRに馴染み深い方にはいつもと同じ感覚でS3のデータに対して分析を行うことができます。また、Cloudera Directorを使うことで簡単にSparkクラスタを構築することができるので、是非試してみてください。