EMRでSparkサンプル

emr-4.2.0をベースにAdvancedOptionでSpark1.5.2を追加しクラスターを作成しておきます

今回はPiをモンテカルロシミュレーションで計算するSpark付属のサンプルプログラムをちょっと改造して使用します

build.sbt

build.sbtはこんな感じ

name := "spark_sample"

version := "1.0-SNAPSHOT"

scalaVersion := "2.11.7"

// additional libraries
libraryDependencies ++= Seq(
	"org.apache.spark" %% "spark-core" % "1.5.2",
	"org.apache.spark" %% "spark-sql" % "1.5.2",
	"org.apache.spark" %% "spark-mllib" % "1.5.2"
)

SparkPi

SparkConfを作成する際のここがポイントです

		val conf = new SparkConf().setAppName("SparkPi").setMaster("yarn-cluster") // ここがポイント
package sample

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.	See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.	You may obtain a copy of the License at
 *
 *		http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import scala.math.random

import org.apache.spark.mllib.util.MLUtils
import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
	def main(args: Array[String]) {
		val conf = new SparkConf().setAppName("SparkPi").setMaster("yarn-cluster") // ここがポイント
		val spark = new SparkContext(conf)
		val slices = 2
		val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
		val count = spark.parallelize(1 until n, slices).map { i =>
			val x = random * 2 - 1
			val y = random * 2 - 1
			if (x * x + y * y < 1) 1 else 0
		}.reduce(_ + _)
		println("Pi is roughly " + 4.0 * count / n)

		val outputLocation = args(0) // s3n://bucket/

		val pi = 4.0 * count / n
		val data = spark.makeRDD(Seq(pi))

		println(pi)
		data.saveAsTextFile(outputLocation)
		spark.stop()
	}
}

ビルド

$ sbt packge

Jarファイルが作成されたらS3にアップしておきます

EMRでの実行

AWSコンソールからEMRで作成したクラスターを選択し、AddStepで先ほどアップしたJarファイルを指定し追加します

step typeにはSpark applicationを選択、

Spark-submit optionsに

--class sample.SparkPi --verbose

Argumentsに出力を保存するS3のロケーションを入れておきます。すでにフォルダがあるとエラーになるので注意

s3n://bucketname/output

実行後、出力先にファイルが作成されます