概要

実行異常でクラスタが存在し続けることがあります。 未発見の場合、ずっと実行がなされるので異常な料金が発生します。

そこで、定期的に実行の確認をするために、Cloud Functionsを活用して ログを抽出して存在すればslackチャンネルを通知する というシステムを構築しました。

対応方法

前提

Webhook URL?

今回このslackへの通知に使用する「Webhook URL」についての解説

・アプリケーションの更新情報を他のアプリケーションへ リアルタイム提供する仕組みや概念

・イベント(リポジトリにプッシュなど)発生時 指定した**URLにPOSTリクエストする仕組み

Webhookの説明を見ると**「通知する、Webhookを送る」などの言葉が用いられているが、 これはPOSTリクエストのことを指している**。

Webhookを利用すると何ができるのか? GitHubやSlackなどのサービスのWebhookを利用すると、ユーザーはサービス側がPOSTリクエストするURLを指定可能

手順

1: Cloud Scheduler に定期実行スケジュールなどを登録 2: Cloud Scheduler が Cloud Pub/Subトピックにメッセージを送信(←トリガーの役割) 3: Cloud Functions が起動 4: Cloud Functions内でSlackに投稿

アーキテクチャ

Cloud Scheduler > Cloud Pub/Sub > Cloud Functions > slack

Cloud Scheduler & Pub/Subを設定

◼︎[Cloud Scheduler] スケジューラーからもCloud Pub/Subの設定が可能。

設定方法は省略

実装内容

一定時間ごとに下記コードが実行

実装概要

異常起動実行を確認して一定時間以上起動していると 異常と認識してslaackへ通知

Cloud Functions

◼︎必要モジュールファイル

ファイル名:requirements.txt   必要なモジュールなどバージョンのインポートするバージョンなどを記載してファイル保存

Function dependencies, for example:

package>=version

pytz==2022.1 google-cloud-dataproc; python-dateutil==2.8.2 requests==2.23.0◼︎実行コード ファイル名:main.py

from google.cloud import proc import datetime import dateutil.parser import requests import base64 import os import json

def extract(x): return (x.cluster_name, x.status.state, x.status.state_start_time)

def me2(y, t_limit): return y[2] < t_limit

def check_cluster(event, context):

if 'data' in event:
    s = base64.b64decode(event['data']).decode('utf-8')
else:
    s = os.environ.get('max(指標)', '(数値)')
m = int(s)

client = proc.ClusterControllerClient(
client_options={"api_endpoint": "地域-dataproc.googleapis.com:443"}
)

# Initialize request argument(s)
request = proc.ListClustersRequest(
    project_id="プロジェクト名", 
    region="",
)

# Make the request
page = client.list_clusters(request=request)

t_delta = datetime.timedelta(minutes = m)
t_limit = datetime.datetime.now(datetime.timezone.utc) - t_delta

me1 = lambda x: me2(x, time_limit)

list = list(filter(me1, map(extract, page)))

if list:
    webhook_url = "https://hooks.slack.com/~"
    message = f'`{checked_list[0][2]}から{m}時間以上`'
    requests.post(webhook_url, data=json.dumps({'text': message}))
else:
    print("その他")

実行補足 functionsを作成する際に ランタイム環境変数を設定して引数を渡すことが可能 スケジューラーのメッセージ本文からも引数設定g可能

参照資料

Google cloud scheduler で Functionsを定期実行する