AWSlambdaでkerasを動かす

AWSのlambdaはサーバレスアーキテクチャで非常に扱いがいいのですが、一つ難点があって、アップロードできるファイルサイズが限られていることです。
最大50MのZIPファイルまでなのですが、裏技を使って展開時に500M以上のファイルを使うことができます。

そのカラクリは、Lambdaの/tmp以下は500Mまで使うことができます。この領域に、Lambdaが起動するときにファイルをS3から読み込めば、非常に大きなライブラリが必要なプログラムも動かすことができます。
以下は、pythonにてkeras+tensorflow+opencvを動かすサンプルです。

ライブラリ準備

python3を使用します。
ライブラリは、あらかじめ、EC2でAmazonLinuxを使って作成しておきます。

requirements.txt

keras
opencv-python
tensorflow
numpy
mkdir lib
pip3 install -U -r requirements.txt -t lib

このlib以下にライブラリがインストールされます。

find lib -name "*.pyc" -exec rm -rf {} ¥;

pycは削除しておきます。しなくてもいいですが。。。

これを全てLambdaの起動時に追加してもいいのですが、ロードが面倒になるので2つに分けます。

mv lib lib_other
mkdir lib
cd lib_other
mv cv2* ../lib
mv h5py* ../lib
mv keras* ../lib
mv tensor* ../lib
cd ..

libを固めてS3にアップ

zip -r lib.zip lib
aws s3 cp s3://bucketname/

lambdaディレクトリ

lambda用のディレクトリに、先ほど分けたlib_otherのライブラリを入れておきます。
これ全部を入れてようやく50Mぐらいまでに収まります。

mkdir lambda_dir
cd lambda_dir
mkdir vendor
cd vendor
cp -pr ../../lib_other/* .

lambdaプログラム

lambdaのプログラムには少し工夫が必要です。
起動時に、S3から先ほどアップしたlib.zipをダウンロードし展開、ロードします。

lambda_functioy.py


import sys
import os
sys.path.append(os.path.join(os.path.abspath(os.path.dirname(__file__)), './vendor'))

import boto3
import zipfile
import importlib

def lambda_handler(event, context):
    s3 = boto3.resource("s3")

    def load_zip(file):
        print("load_zip:"+file)
        s3.Bucket(bucket_name).download_file(libfiledir+"/"+file,"/tmp/"+file)
        with zipfile.ZipFile("/tmp/"+file) as zip:
            zip.extractall("/tmp")
        os.remove("/tmp/"+file)

    if os.path.exists("/tmp/lib") is False:  # 2回目以降はスキップ
        load_zip("lib.zip")

    sys.path.append('/tmp/lib')
    importlib.import_module("numpy")
    importlib.import_module("scipy")
    importlib.import_module("six")
    importlib.import_module("yaml")
    importlib.import_module("enum")
    importlib.import_module("h5py")
    importlib.import_module("absl")
    importlib.import_module("astor")
    importlib.import_module("bleach")
    importlib.import_module("external")
    importlib.import_module("gast")
    importlib.import_module("google.protobuf")
    importlib.import_module("grpc")
    importlib.import_module("html5lib")
    importlib.import_module("markdown")
    importlib.import_module("werkzeug")
    importlib.import_module("wheel")
    importlib.import_module("cv2")
    importlib.import_module("tensorflow")
    importlib.import_module("keras")

    import cv2
    import keras
    import numpy as np

こんな感じでコードを書きます。
Lambdaは一度起動すると、しばらくはインスタンスが残っているので、2回目以降に再度S3からダウンロードするのを防ぎます。

こんな感じで、500M以上のモジュールをつかったLambdaファンクションが作ることができます。

AmazonLinux(CentOS)にMPIを入れる

yumでインストール

sudo yum install openmpi openmpi-devel
sudo yum install mpich2 mpich2-devel # これは不要かも?

パス追加

~/.bashrcに追加

export PATH=$PATH:/usr/lib64/openmpi/bin
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib64/openmpi/lib
export C_INCLUDE_PATH=$C_INCLUDE_PATH:/usr/include/openmpi-x86_64
export CPLUS_INCLUDE_PATH=$C_INCLUDE_PATH

python

ライブラリを追加

sudo yum install gcc gcc-c++ make git openssl-devel bzip2-devel zlib-devel readline-devel sqlite-devel bzip2 sqlite
sudo yum -y install  zlib-devel bzip2 bzip2-devel readline-devel sqlite sqlite-devel openssl-devel

pyenv

git clone https://github.com/yyuu/pyenv.git ~/.pyenv

.bashrcに追加 

export PYENV_ROOT=$HOME/.pyenv
export PATH=$PYENV_ROOT/bin:$PATH
eval "$(pyenv init -)"

python install

. ~/.bashrc
pyenv install 3.6.2
pyenv global 3.6.2

mpi4pyを導入

pip install mpi4py

chainermn

wget https://github.com/NVIDIA/nccl/archive/v1.2.3-1+cuda7.5.tar.gz
tar zxvfp v1.2.3-1+cuda7.5.tar.gz 
cd nccl-1.2.3-1-cuda7.5/
make 
sudo make install
pip install chainer python chainermn

lambdaのAPIにキーを付与する

AWSの仕様がすぐ変わるのでメモ。

こちらの記事を参考にAPIGatewayにAPIキーをつけようとしたのですが、2017.5.16現在、仕様が変わってしまっているようでそのままでは設定できませんでした。

記事の中頃「API Keyの追加」の部分ですが、API Stage AssociationはAPI Keyのページには存在しません。

以下の手順となります

  • 左メニューのUsagePlansというメニューをクリック
  • createでNameに何か適当な名前を入れる。
  • リクエスト数にリミットをつけないのならば、Enable throttlingとEnable Quotaはチェックを外す
  • Add API Stageでデプロイ済みのAPIとStageを選択しチェックマークをクリック。Nextをクリック
  • Add API Key to Usage Planをクリックし、作成済みのAPI Keyの名前を入力。チェックマークをクリックしDone

これでAPIキーが付加されます。

AWSのlambdaでJavaを使う

ちょっとハマったのでメモ。JavaはMavenでコンパイルします。

Test.java


package com.example;

import java.util.Iterator;
import java.util.Map;

import com.amazonaws.services.lambda.runtime.Context;

public class Test {
	
	//http://stackoverflow.com/questions/35545642/error-executing-hello-world-for-aws-lambda-in-java
    public String handler(Map input, Context context){
    	String ret="";
    	Iterator ite=input.keySet().iterator();
    	while(ite.hasNext()){
    		String key=ite.next();
    		String val=input.get(key).toString();
    		ret+="("+key+","+val+"),";
    	}
    	return ret;
    }
}

pom.xml



  4.0.0

  com.example
  test
  0.0.1-SNAPSHOT
  jar

  Test
  http://example.com

  
    UTF-8
  

  
    
      junit
      junit
      3.8.1
      test
    
    
    
	    com.amazonaws
	    aws-lambda-java-core
	    1.0.0
	
  
  
  
    
    
      
        org.apache.maven.plugins
        maven-shade-plugin
        2.3
        
          false
        
        
          
            package
            
              shade
            
          
        
      
    
  


コンパイル

mvn package

Lambdaへアップロード

  • zipファイルをアップロード
  • Configulationを設定
  • RuntimeJava 8
    Handlercom.example.Test::handler

     これで動きます。テストの際に与えるパラメタの値が表示されます

UCD-DISKIO-MIB::diskIODevice = No more variables left in this MIB View (It is past the end of the MIB tree)エラー

Hinemos5.0でサーバ監視をしているのですが、EC2のAmazonLinuxで作成したサーバのリソース監視がデフォルトではうまくいかなかったのでメモ。

* Amazon Linux AMI release 2015.09

AmazonLinuxは作成すると自動で、SNMPが起動されています。そこに対してHinemosから監視をかけるわけですが、タイムアウトというエラーが出たりします。

まず確認用に監視サーバからコマンドを打ちます

$ snmpwalk -c public -v 2c IPADDRESS 1.3.6.1.4.1.2021.13.15.1.1.2
UCD-DISKIO-MIB::diskIODevice = No more variables left in this MIB View (It is past the end of the MIB tree)

こんな感じのエラー。
こちらに対処法が書いてありましたので実行してみます。

被監視サーバの設定を修正します

$ sudo echo "view systemview included .1.3.6.1." >> emacs /etc/snmp/snmpd.conf
$ sudo service snmpd restart

先のブログには再起動と書いていますがリスタートのみでOKです

再度、監視サーバからコマンドを打ちます

$ snmpwalk -c public -v 2c IPADDRESS 1.3.6.1.4.1.2021.13.15.1.1.2
UCD-DISKIO-MIB::diskIODevice.1 = STRING: xvda
UCD-DISKIO-MIB::diskIODevice.2 = STRING: xvda1
UCD-DISKIO-MIB::diskIODevice.3 = STRING: loop0
UCD-DISKIO-MIB::diskIODevice.4 = STRING: loop1
UCD-DISKIO-MIB::diskIODevice.5 = STRING: loop2
UCD-DISKIO-MIB::diskIODevice.6 = STRING: loop3
UCD-DISKIO-MIB::diskIODevice.7 = STRING: loop4
UCD-DISKIO-MIB::diskIODevice.8 = STRING: loop5
UCD-DISKIO-MIB::diskIODevice.9 = STRING: loop6
UCD-DISKIO-MIB::diskIODevice.10 = STRING: loop7
UCD-DISKIO-MIB::diskIODevice.11 = STRING: dm-0
UCD-DISKIO-MIB::diskIODevice.12 = STRING: dm-1

無事コマンドが通りました。
しばらくするとエラーになっていたHinemosの監視項目にも値が入るようになります

ElasticSearchでClusterBlockException[blocked by: [FORBIDDEN/8/index write (api)]エラーの対応

ElasticSearchをAWSで運用しているとたまにエラーが出ます。

ClusterBlockException[blocked by: [FORBIDDEN/8/index write (api)]

このメッセージが出る場合の対処方法ですが、こちらの記事にある通りにディスクフルの場合には容量アップで対応できるみたいですが、そうでない場合もあります。

http://qiita.com/dorachan1029/items/f3b47f4d9859450d9b90

その場合には再度テーブルを作成しなおすとなおったりします。こんな感じで。

$ curl -XDELETE "http://hostname/table"
$ -XPUT http://hostname/table -d '{
  "mappings": {
    "data":{
      "properties": {
        "id": { "type": "string" },
...
      }
    }
  }
}'

http://qiita.com/shouta-dev/items/c2d2eb6cf61bb1fa8e1b

自分の場合には、データを毎日入れ替えていたのですが、全入れ替えするとその間検索ができなくなってしまうので、要、不要を判断し、1レコードづつ入れ替えていたのが悪かったのかもしれません。半年ほど運用していたらこのエラーになりました。

EC2インスタンスでTensorflow

GoogleのTensorflow、GPUマシンでないとなかなか性能がでないので

EC2で作成してみます。

TensorflowはCUDA3.5以降対応だとかで、AWSのEC2インスタンスで使用可能なg2.2xlargeではCUDA3.0。ということでそのままでは使えないそうです

というわけで、いろいろ調べたところ、偉い方々が手順を示してくれています。

https://www.tecnos-dsm.co.jp/archives/info/technical_info_04

2016/3/17現在、これらの手順ですとTensorflowをコンパイルする際にエラーになります

$ bazel build -c opt --config=cuda //tensorflow/tools/pip_package:build_pip_package
.......
ERROR: /home/ubuntu/tensorflow/WORKSPACE:16:6: First argument of load() is a path, not a label. It should start with a single slash if it is an absolute path..
ERROR: WORKSPACE file could not be parsed.
ERROR: no such package 'external': Package 'external' contains errors.
INFO: Elapsed time: 0.444s

これの回避策がこちらに

http://stackoverflow.com/questions/34941620/unable-to-build-tensorflow-from-source-with-bazel-22nd-january-2016

単純な話でbazelのバージョンのせいだとか。

bazelをコンパイルしなおします

git clone https://github.com/bazelbuild/bazel.git
cd bazel
git checkout tags/0.1.4
./compile.sh
sudo cp output/bazel /usr/bin

その後、Tensorflowのコンパイル

$ bazel build -c opt --config=cuda //tensorflow/tools/pip_package:build_pip_package
Extracting Bazel installation...
Sending SIGTERM to previous Bazel server (pid=11695)... done.
.......
INFO: Found 1 target...
INFO: From Executing genrule @png_archive//:configure [for host]:
/home/ubuntu/.cache/bazel/_bazel_ubuntu/ad1e09741bb4109fbc70ef8216b59ee2/tensorflow/external/png_archive/libpng-1.2.53 /home/ubuntu/.cache/bazel/_bazel_ubuntu/ad1e09741bb4109fbc70ef8216b59ee2/tensorflow
...

うまくいきました

EMRのSparkでWordCount

BODY:

EMRではSparkでファイルを開く際には*が使えるみたいだ 

package sample
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
object WordCount {
			def main(args: Array[String]) {
				println("wordcount,args="+args(0)+","+args(1))
			val conf = new SparkConf().setAppName("wordcount").setMaster("yarn-cluster")
			val sc = new SparkContext(conf)
			
				val textFile = sc.textFile(args(0))	// s3n://bucket/*gz
			val counts = textFile.flatMap(line => line.split(" "))
										 .map(word => (word, 1))
										 .reduceByKey(_ + _)
				println("counts="+counts.id+","+counts.name)
				println(counts.toDebugString)
			counts.saveAsTextFile(args(1))
			}
}

こんな感じのBOWを数えるスクリプトを作成

s3にはgzで固められたファイルがたくさんある場合には

spark-submit --deploy-mode cluster --class sample.WordCount s3://bucket/dir/wordcount.jar s3n://bucket/log/*.gz s3n://bucket/output

このような指定で起動すると全ファイルを解凍しながら計算し、outputへ結果を保存します

EMRでSparkSQLサンプル

SparkからHiveが使いづらいというか使えない?のでSparkSQLを使ってみました。

そこそこ試行錯誤する必要があったのでメモです。

データファイル

銘柄コード,日付,始値,高値,安値,終値,出来高

のフォーマットのファイルを用意しておきます。こんな感じ。

1301,2004-04-01,198,198,195,196,651000
1301,2004-04-02,194,196,194,196,490000
1301,2004-04-05,196,200,195,197,1478000
1301,2004-04-06,202,208,200,207,4324000

これをS3へアップしておきます

build.sbt

こんな感じで記述します。build assemblyでエラーが出るのでこんな記述にしています。

name := "spark_sample"

version := "1.0-SNAPSHOT"

scalaVersion := "2.11.7"

// additional libraries
libraryDependencies ++= Seq(
	"org.apache.spark" %% "spark-core" % "1.5.2" % "provided",
	"org.apache.spark" %% "spark-sql" % "1.5.2",
	"org.apache.spark" %% "spark-hive" % "1.5.2",
	"org.apache.spark" %% "spark-streaming" % "1.5.2",
	"org.apache.spark" %% "spark-streaming-kafka" % "1.5.2",
	"org.apache.spark" %% "spark-streaming-flume" % "1.5.2",
	"org.apache.spark" %% "spark-mllib" % "1.5.2",
	"org.apache.commons" % "commons-lang3" % "3.0",
	"org.eclipse.jetty"	% "jetty-client" % "8.1.14.v20131031",
	"com.typesafe.play" %% "play-json" % "2.3.10",
	"com.fasterxml.jackson.core" % "jackson-databind" % "2.6.4",
	"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.6.3",
	"org.elasticsearch" % "elasticsearch-hadoop-mr" % "2.0.0.RC1",
	"net.sf.opencsv" % "opencsv" % "2.0",
	"com.twitter.elephantbird" % "elephant-bird" % "4.5",
	"com.twitter.elephantbird" % "elephant-bird-core" % "4.5",
	"com.hadoop.gplcompression" % "hadoop-lzo" % "0.4.17",
	"mysql" % "mysql-connector-java" % "5.1.31",
	"com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M3",
	"com.datastax.spark" %% "spark-cassandra-connector-java" % "1.5.0-M3",
	"com.github.scopt" %% "scopt" % "3.2.0",
	"org.scalatest" %% "scalatest" % "2.2.1" % "test",
	"com.holdenkarau" %% "spark-testing-base" %	"1.5.1_0.2.1",
	"org.apache.hive" % "hive-jdbc" % "1.2.1"
)

resolvers ++= Seq(
	"JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
	"Spray Repository" at "http://repo.spray.cc/",
	"Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
	"Akka Repository" at "http://repo.akka.io/releases/",
	"Twitter4J Repository" at "http://twitter4j.org/maven2/",
	"Apache HBase" at "https://repository.apache.org/content/repositories/releases",
	"Twitter Maven Repo" at "http://maven.twttr.com/",
	"scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools",
	"Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
	"Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/",
	"Mesosphere Public Repository" at "http://downloads.mesosphere.io/maven",
	Resolver.sonatypeRepo("public")
)

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
	{
		case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
		case m if m.startsWith("META-INF") => MergeStrategy.discard
		case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
		case PathList("org", "apache", xs @ _*) => MergeStrategy.first
		case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
		case "about.html"	=> MergeStrategy.rename
		case "reference.conf" => MergeStrategy.concat
		case _ => MergeStrategy.first
	}
}

ちなみにproject/assembly.sbtはこれ

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")

SqlSample.scala

http://spark.apache.org/docs/latest/sql-programming-guide.html#upgrading-from-spark-sql-15-to-16

この辺りを参考に

package sample
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark._
import org.apache.spark.api.java._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object SqlSample {
	def main(args: Array[String]) {
		val conf = new SparkConf().setAppName("SparkSQL").setMaster("yarn-cluster")
		val sc = new SparkContext(conf)	

		val sqlContext = new org.apache.spark.sql.SQLContext(sc)
		// Import Row.
		import org.apache.spark.sql.Row;

		// Import Spark SQL data types
		import org.apache.spark.sql.types.{StructType,StructField,StringType};

		val histRDD = sc.textFile(args(0)).map(_.split(",")).
			map(p => Row(p(0), p(1),p(2),p(3),p(4),p(5),p(6)))
		val schemaString = "code date open high low close volume"
		val schema =
				StructType(
				schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))	
				
		// Apply the schema to the RDD.
		val histDataFrame = sqlContext.createDataFrame(histRDD, schema)
		// Register the DataFrames as a table.
		histDataFrame.registerTempTable("priceHist")
		
		// SQL statements can be run by using the sql methods provided by sqlContext.
		val results = sqlContext.sql("SELECT code,date,open FROM priceHist where code='6758'")

		val ary=results.map(_.getValuesMap[Any](List("code", "date","open"))).collect()

		val outputLocation = args(1) // s3n://bucket/
		val data=sc.makeRDD(ary)
		data.saveAsTextFile(outputLocation)

		sc.stop()
	}
}

build

$ sbt package

これで作成したJarを同じくS3へアップします

EMR

今までと同様にEMRを作成し、AddStepでSparkApplicationを追加します。Jarは先ほどアップしたものを指定します

Spark-submit options
--class sample.SqlSample
Arguments
s3n://bucket/output

ここには出力ファイルが入ります

じっこすればOutputにMapで表現されたデータが保存されます

EMRでSparkサンプル

emr-4.2.0をベースにAdvancedOptionでSpark1.5.2を追加しクラスターを作成しておきます

今回はPiをモンテカルロシミュレーションで計算するSpark付属のサンプルプログラムをちょっと改造して使用します

build.sbt

build.sbtはこんな感じ

name := "spark_sample"

version := "1.0-SNAPSHOT"

scalaVersion := "2.11.7"

// additional libraries
libraryDependencies ++= Seq(
	"org.apache.spark" %% "spark-core" % "1.5.2",
	"org.apache.spark" %% "spark-sql" % "1.5.2",
	"org.apache.spark" %% "spark-mllib" % "1.5.2"
)

SparkPi

SparkConfを作成する際のここがポイントです

		val conf = new SparkConf().setAppName("SparkPi").setMaster("yarn-cluster") // ここがポイント
package sample

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.	See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.	You may obtain a copy of the License at
 *
 *		http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import scala.math.random

import org.apache.spark.mllib.util.MLUtils
import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
	def main(args: Array[String]) {
		val conf = new SparkConf().setAppName("SparkPi").setMaster("yarn-cluster") // ここがポイント
		val spark = new SparkContext(conf)
		val slices = 2
		val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
		val count = spark.parallelize(1 until n, slices).map { i =>
			val x = random * 2 - 1
			val y = random * 2 - 1
			if (x * x + y * y < 1) 1 else 0
		}.reduce(_ + _)
		println("Pi is roughly " + 4.0 * count / n)

		val outputLocation = args(0) // s3n://bucket/

		val pi = 4.0 * count / n
		val data = spark.makeRDD(Seq(pi))

		println(pi)
		data.saveAsTextFile(outputLocation)
		spark.stop()
	}
}

ビルド

$ sbt packge

Jarファイルが作成されたらS3にアップしておきます

EMRでの実行

AWSコンソールからEMRで作成したクラスターを選択し、AddStepで先ほどアップしたJarファイルを指定し追加します

step typeにはSpark applicationを選択、

Spark-submit optionsに

--class sample.SparkPi --verbose

Argumentsに出力を保存するS3のロケーションを入れておきます。すでにフォルダがあるとエラーになるので注意

s3n://bucketname/output

実行後、出力先にファイルが作成されます