orangain flavor

じっくりコトコト煮込んだみかん2。知らないことを知りたい。

AWS FargateとLambdaでサーバーレスなクローラー運用

これはWebスクレイピング Advent Calendar 2017の7日目の記事です。こんな感じでAWS FargateとAWS Lambdaを使ってサーバーレス(EC2レス)なクローラーを作ります。

f:id:mi_kattun:20171206222317p:plain

この記事はFargateでのクローリング処理にフォーカスしており、クロールしたHTMLをS3に保存するところまでを主に解説します。Lambdaの方はおまけ程度の扱いで、スクレイピングしたデータの扱い(データベースへの格納など)はスコープ外です。

長くなったので目次です。

背景

クローラーを稼働させるのにサーバー管理をしたくないという思いがあります。かと言ってScrapinghubのようなSaaSは、ロケーション・実装の自由度やサービス継続性の面から採用しづらいこともあります。

サーバーレスと言えばLambdaが代表格です。Lambdaでクローラーを作ろうとする取り組みは過去にもあります。

ただ、クローラー*1の処理のうち、スクレイピング処理はいいとしても、クローリング処理はあまりLambda向きのワークロードではないと考えています。

クロール対象のサーバー負荷軽減のためにクロール間隔を空けることを考えると、Lambdaのスケーラビリティはあまりメリットになりません。Lambdaをn秒間隔で呼び出してくれるような仕組みはない*2ので、クロール間隔を空けるためにはLambdaの実行中にSleepする必要があります。

LambdaはCPUを使っていなくても関数の実行時間で料金が決まるので、Sleepしている間やWebページのダウンロードを待っている間も課金対象です。このクローラーの性質は、まさに以下の記事に書かれているLambdaに不向きなアプリケーションの性質に合致します。

  • 1: Lambda functionの実行時間のうち、ネットワークI/O時間が支配的である
  • 2: Lambda functionの実行終了を同期的に待たなければならない
  • 3: 複数のレコードをLambda functionの引数に渡すことができない*3

コスト効率の悪いLambdaアプリケーションの性質に関する考察 - ゆううきブログ

さらに、Scrapyのような1プロセス内での非同期実行を前提とした開発効率の良いフレームワークがある中、細切れのLambda関数を書いていくのはダルいという思いもあります。

AWS Fargateの登場

そんなわけで2017年11月にこのAdvent Calendarに登録した時はクローリング処理をサーバーレス(EC2レス)で実行するためにAWS Batchを使うことを考えていました。

しかし先日のAWS re:Invent 2017でAWS Fargateが颯爽と登場したので、早速試してみます。AWS FargateはDockerクラスターを構築するサービスであるAmazon ECSにおいて、EC2を管理することなくクラスターを使えるサービス(クラスターの一形態)です。

aws.amazon.com

クローラーの場合、FargateにしたところでLambdaのコストモデルの問題が解消するわけではありませんが、プロセスの実行が長時間に渡ることを前提としたScrapyのようなフレームワークを使いやすいというメリットがあります。

LambdaでもFargateでも、クロール対象サイトを複数にして同時にクロールすればネットワークI/Oを多重化できます。ですが、サイトごとのクロール間隔の調整はそれなりに面倒な処理なので、Fargateを使うことで既存のフレームワークで提供されている仕組みに乗っかれるのはメリットです。

クローラーの構成

冒頭の構成図を再掲します。

f:id:mi_kattun:20171206222317p:plain

Scrapyはクローリングしながらスクレイピングするアーキテクチャですが、以前も書いたように、これはあまり良いアーキテクチャではないように思います。そこで、クローリングとスクレイピングを分離し、Scrapyではクローリングのみを行い、HTMLを丸ごとAmazon S3に保存します。

S3を間に挟むとスクレイピングに失敗したときや追加の情報を取得したくなったときでも、相手のサーバーに負荷を与えずにやり直しできて安心です*4。実運用ではS3のライフサイクルポリシーを使って1ヶ月後にオブジェクトを消すなどすると、再実行可能性とデータ量の抑制を両立できます。

やってみる

クローリング処理はPython 3.6とScrapyで作成します。途中でPython 2系では動作しないライブラリを使います。

これから作成するScrapyのSpiderのソースコードは以下の場所に置いてあります。

github.com

以下は前提とします。

  • AWSのアカウントを持っていること
  • 適当なS3バケットを持っていること
  • ローカル開発環境において、そのS3バケットへの書き込み権限とAmazon ECRへの書き込み権限を持つユーザーでAWS CLIを設定済みであること
  • ローカル開発環境ではPython 3の仮想環境内で作業していること

1. ScrapyのプロジェクトでSpiderを作る

まずはローカル開発環境でScrapyのSpiderを作ります。この記事では、Pythonクローリング&スクレイピングサンプルファイルの中にある、6.7節の食べログのSpiderを改変して使います。

DOWNLOAD_DELAYなどの設定は適切にしておきましょう。

2. Scrapy S3 Pipelineをインストールする

Scrapyでクローリングのみを行い、HTMLを丸ごとS3に保存するのはよくあるパターンなので、Scrapy S3 Pipelineというライブラリにしてみました。これをインストールします。

(venv) $ pip install scrapy-s3pipeline

Scrapy S3 Pipelineはスクレイピングして得られたItemをJSONLines形式でS3に保存するItem Pipelineです。Scrapyの通常のFeed ExportsもS3への保存に対応していますが、S3ではファイルへの追記ができない都合上、Spiderの実行が終わってから大きな1つのファイルをS3に保存します。一方Scrapy S3 Pipelineは、アイテムをチャンクごと(デフォルトで100アイテムごと)に区切ってS3に保存します。これによって、S3のイベント通知を受けてLambdaでスクレイピングする構成を取りやすくなります。

また、Gzip圧縮をサポートしているのも特徴です。同じようなHTMLが複数含まれるファイルは圧縮率が高くなるので、スクレイピング再実行のためにHTMLを残しておきたい場合には効果的です。

3. Scrapy S3 Pipelineをプロジェクトに追加する

プロジェクトのsettings.pyに以下の設定を追加します。S3PIPELINE_URLのバケット名 (scraping-bookの部分) はご自身の所有するS3バケットに変更してください。

ITEM_PIPELINES = {
    's3pipeline.S3Pipeline': 100,
}

S3PIPELINE_URL = 's3://scraping-book/{name}/{time}/items.{chunk:07d}.jl.gz'

なおS3PIPELINE_URL内の変数は以下のように置き換えられます。

  • {name}: Spider名
  • {time}: Spiderの開始時刻
  • {chunk}: チャンクに含まれるアイテムの開始番号(0, 100, 200, ...)。上記ではFormat Stringを使ってゼロパディングしている。

Spiderのコールバックメソッドではスクレイピング処理を行わず、単にs3pipeline.Pageオブジェクトをyieldするのみとします。Pageクラスはurl, body, crawled_atの3つのフィールドを持ち、Page.from_response(response)とするだけでこれらの値が埋められます。

# ...

from s3pipeline import Page

# ...

class TabelogSpider(CrawlSpider):

    # ...

    def parse(self, response):
        yield Page.from_response(response)

ここまでできたら以下のコマンドを実行し、問題なくクローラーが動作していればOKです。

(venv) $ scrapy crawl tabelog

正常に動作していれば以下のようにS3にオブジェクトが作成されるはずです。

f:id:mi_kattun:20171206223934p:plain

4. ScrapyのプロジェクトをDockerizeする

作成したSpiderをDockerコンテナ内で実行できるようにします。

以下の内容でDockerfileを作成します。

FROM python:3.6.3

WORKDIR /app

COPY requirements.txt .

RUN pip install -r requirements.txt

COPY . .

CMD ["scrapy", "crawl", "tabelog"]

.dockerignoreファイルも作成しておきます。

venv/
.scrapy/
.git/

以下のコマンドで、Dockerイメージをビルドして実行できればOKです。

$ docker build -t serverless-crawler .
$ docker run --rm -it serverless-crawler

通常Dockerコンテナ内ではAWSのクレデンシャルを取得できないので、botocore.exceptions.NoCredentialsError: Unable to locate credentials などのエラーが発生するはずです。Fargateでの実行時にはIAM Roleで取得できるようになるので特に問題ありません。Ctrl-Cを2回押してSpiderの実行を終了しておきます。

5. Amazon ECRにリポジトリを作成する

さて、ここからはAWSのマネジメントコンソールで作業します。2017年12月時点ではAWS Fargateは北部バージニアリージョン(us-east-1)のみに対応しているので、以降の手順ではすべて北部バージニアリージョンを使います。

Amazon ECSの管理画面の「リポジトリ」から serverless-crawler という名前のリポジトリを作ります。

6. DockerイメージをAmazon ECRにpushする

リポジトリ作成時に手順が表示されるはずですが、以下のようにログインして先ほどビルドしたDockerイメージをAmazon ECRにpushします。

ログイン:

$ $(aws ecr get-login --no-include-email --region us-east-1)
Login Succeeded

タグ付けとプッシュ(<AWS ID>はご自身のIDに置き換えてください):

$ docker tag serverless-crawler:latest <AWS ID>.dkr.ecr.us-east-1.amazonaws.com/serverless-crawler:latest
$ docker push <AWS ID>.dkr.ecr.us-east-1.amazonaws.com/serverless-crawler:latest

f:id:mi_kattun:20171206224109p:plain

7. IAM Roleを作成する

ECSのタスクからS3にPUTできるよう、EC2 Container Service Task (ecs-tasks.amazonaws.com) をTrusted EntityとするIAM Roleを作成しておきます。 ecsTaskServerlessCrawler という名前のロールを作りました。

本来はポリシーを書いて必要最低限の権限のみを割り当てるべきですが、手抜きしてAWS管理ポリシーAmazonS3FullAccessを割り当てました。

8. ECSクラスターを作成する

ここからいよいよAmazon ECSの管理画面での作業です。「クラスター」→「クラスターの作成」から、Networking Only (Powered by AWS Fargate) のクラスターを作成します。defaultという名前にしました。

「Getting Started with Amazon Elastic Container Service (Amazon ECS) using Fargate」というウィザードもありますが、今回は使いません。このウィザードを使うとECSのサービスも一緒に作成されますが、バッチ処理のように実行完了したら終了したいタスクはクラスターとタスク定義だけあれば良く、サービスは不要なためです。

f:id:mi_kattun:20171206224227p:plain

9. ECSタスク定義を作成する

「タスク定義」→「新しいタスク定義の作成」から、実行するタスクの雛形であるタスク定義を作成します。

  • ステップ 1: Select launch type compatibility
    • Launch Type Conmpatibility: Fargate
  • ステップ 2: Configure task and container definitions
    • タスク定義名: serverless-crawler
    • タスクロール: ecsTaskServerlessCrawler (7で作成したもの)
    • Task execution role: ecsTaskExecutionRole (初回に自動作成されるはず)
    • Task memory (GB): 0.5GB
    • Task CPU (vCPU): 0.25 vCPU
    • コンテナ定義
      • コンテナ名: serverless-crawler
      • イメージ: <AWS ID>.dkr.ecr.us-east-1.amazonaws.com/serverless-crawler:latest (6でpushしたもの)
      • メモリ制限: ソフト制限 128MB
      • 環境変数など必要なものがあればここで追加する

10. タスクを実行する

クラスタdefaultの「タスク」→「新しいタスクの実行」か、タスク定義serverless-crawlerの「アクション」→「タスクの実行」からタスクを実行します。

Launch typeでFARGATEを選択し、適切なVPCとサブネットを選択して実行します。この時点でタスクのロールやコンテナの環境変数などを上書きすることも可能です。

f:id:mi_kattun:20171206224516p:plain

実行開始して1分ほど待つとRUNNING状態になり、ログを確認できるようになります。 ただ、ECSの管理画面に表示されるログは見づらいので、CloudWatch Logsで見たほうがわかりやすいです。

5分ほどでSpiderの実行は終了し、S3にアイテムが生成されているのが確認できます。

おまけ: Lambda関数を作成する

スクレイピング処理は、こんな感じのLambda関数(Python 3.6)を作成し、S3へのオブジェクトの作成をトリガーとして呼び出されるようにします。Lambdaに関しては世の中に多くの情報があるのでここでは詳しく解説しませんが、以下の点には気をつけてください。

  • サードパーティライブラリ(この例ではlxmlとcssselect)は一緒にパッケージングする。
  • S3からファイルを取得できるようIAM Roleを設定する。
  • 同じファイルへのイベントが複数来ることがあるので、処理がべき等になるようにする。
  • LambdaをVPC内で実行する場合はLambdaからS3にアクセスできるようVPCエンドポイントを作るなどする。
import io
import json
import gzip

import boto3
import lxml.html


def lambda_handler(event, context):
    """
    Lambdaから呼び出されるエントリーポイント
    """
    for record in event['Records']:
        bucket_name = record['s3']['bucket']['name']
        object_key = record['s3']['object']['key']
        process_object(bucket_name, object_key)


def process_object(bucket_name, object_key):
    """
    1つのS3オブジェクトを処理する。
    """
    s3 = boto3.client('s3')

    for page in read_pages(s3, bucket_name, object_key):
        scrape_from_page(page)


def read_pages(s3, bucket_name, object_key):
    """
    S3オブジェクトからページをyieldするジェネレーター
    """
    use_gzip = object_key.endswith('.gz')

    bio = io.BytesIO()
    s3.download_fileobj(bucket_name, object_key, bio)
    bio.seek(0)
    f = gzip.GzipFile(mode='rb', fileobj=bio) if use_gzip else bio

    for line in f:
        page = json.loads(line)
        yield page


def scrape_from_page(page):
    """
    1つのページからlxmlなどを使ってスクレイピングする。
    """
    root = lxml.html.fromstring(page['body'])
    restaurant = {
        'url': page['url'],
        'name': root.cssselect('.display-name').text_content().strip(),
    }
    # スクレイピングしたデータのDBへの保存などを行う

まとめ

AWS Fargateを使って、EC2の管理をすることなくDocker化されたバッチ処理クローラー)を実行できました。バッチ処理に合わせてクラスタを起動・終了しなくてもクラスタ実行時間のみが課金対象となるので楽です。Lambdaのような制限も少なく、既存の処理も移行しやすそうです。

タスクの起動に時間がかかるのはやや気になりますが、ある程度長時間実行するクローラーであれば影響は少ないでしょう。今回はDebianベースのDockerイメージを使ったので、Alpineベースのイメージにするなど、イメージサイズを減らすことで改善するかもしれません。

他にもやり残した点として、クローラーのグレースフルな停止があります。ScrapyのクローラーをDocker化するとコンテナの停止時(マネジメントコンソールからECSのタスクを停止した時)にSIGTERMが送られて一瞬で終了してしまうので、SIGINTによるグレースフルなクローラーの停止に対応させたいです。

Pythonクローリング&スクレイピングはおかげさまで1万部を超えるヒットとなりました。出版当時の2016年12月はLambdaのPython 3対応もなかった時代で、7章の運用ではオーソドックスにEC2内のCronでプロセスを実行していましたが、Fargateの登場によってEC2なしで運用できるようになりました。Fargateが東京リージョンに来るのを楽しみにしています。

scraping-book.com

*1:ここでのクローラーは数時間に渡って1つのサイトを数万ページほど巡回するものを想定しています。また、クローリング処理とスクレイピング処理を分けて実行するクローラーを前提としています。

*2:筆者の知る限り。

*3:引用者注:クロール間隔を空けることを考えるとI/O多重化して複数のリクエストを送ることはできないため。

*4:HTTPキャッシュでも似たようなことは実現できますが、スクレイピングの再実行の効率やクロール結果の世代管理などを考えると、明示的に保存しておくほうがやりやすいと考えています。