Google FunctionsでYoutube APIを呼び出してのコメントをダウンロードするツールを作ってみました。Python 3.10で書いています。Googleによる審査を受けていないので,エンドポイントを一般公開することはできません。主要部分のソースコードは示します。Google APIの設定等は省略しています。





from flask import Flask, Response, make_response
import functions_framework
import json
import os
import requests
import tempfile
RECORD_FORMAT = '"{}","{}","{}"\n'
class APIException(Exception):
"""
API呼出のエラー
"""
def __init__(self, code, text):
self.code = code
self.text = text
super().__init__("APIの呼出でエラーが発生しました。status:{code}, message{text}".format(code=code, text=text))
@functions_framework.http
def get_comments(request):
"""HTTP Cloud Function.
Args:
request (flask.Request): The request object.
<https://flask.palletsprojects.com/en/1.1.x/api/#incoming-request-data>
Returns:
The response text, or any set of values that can be turned into a
Response object using `make_response`
<https://flask.palletsprojects.com/en/1.1.x/api/#flask.make_response>.
"""
if request.method == 'GET':
response = do_get(request)
else:
response = do_post(request)
return response
def do_get(request):
"""
GETリクエストを処理する
Args:
request : Request
Returns:
Response
"""
if len(request.query_string.decode('utf-8')) == 0:
return handle_init_access()
else:
return handle_login(request)
def do_post(request):
"""
POSTリクエストを処理する
Args:
request : Request
Returns:
Response
"""
if 'channel_id' in request.form:
response = make_response()
try:
response.data = execute_job(request)
response.headers['Content-Type'] = 'application/octet-stream'
response.headers['Content-Disposition'] = "attachment; filename=comment_{}.csv".format(request.form['channel_id'])
except Exception as e:
with open("./form.html", 'r') as f:
template = f.read()
response.data = template.format(access_token=request.form['access_token'], message=str(e))
response.headers['Content-Type'] = 'text/html'
finally:
return response
else:
return handle_token(request)
def handle_init_access():
"""
初期画面用のhtmlを返す
Returns:
String
初期画面用のhtml
"""
template = ''
with open("./init.html", 'r') as f:
template = f.read()
return template.format(client_id=os.environ.get('CLIENT_ID'), limit=os.environ.get('LIMIT'))
def handle_login(request):
"""
認証処理を行う
Args:
request : Request
Returns:
認証用html
"""
template = ''
with open("./login.html", 'r') as f:
template = f.read()
client_id = os.environ.get('CLIENT_ID')
client_secret = os.environ.get('CLIENT_SECRET')
return template.format(client_id=client_id, client_secret=client_secret, code=request.args['code'])
def get_tokens(request):
"""
アクセストークンを取得する
Args:
request : Request
Returns:
Dictionary
succes: 成功か否か,status_code : API応答のHTTPステータスコード,access_token : アクセストークン,refresh_token : リフレッシュトークン,message : メッセージ
"""
params = { "code" : request.form['code'], "grant_type" : "authorization_code", "client_secret" : os.environ.get('CLIENT_SECRET'), \
"client_id" : os.environ.get('CLIENT_ID'), os.environ.get('REDIRECT_URI') }
header = { "Content-Type" : "application/json" }
response = requests.post("https://oauth2.googleapis.com/token", params=params, headers=header)
if response.status_code == 200:
result = json.loads(response.text)
tokens = { "success" : True, "status_code" : 200, "access_token" : result['access_token'], "refresh_token" : result['refresh_token'], "message" : "" }
else:
tokens = { "success" : False, "status_code" : response.status_code, "access_token" : None, "refresh_token" : None, "message" : response.text }
return tokens
def handle_token(request):
"""
トークンエンドポイントからのレスポンスからアクセストークンを読込み,処理実行画面のhtmlを返す
Args:
request : Request
Returns:
String
処理実行画面のhtml
"""
tokens = get_tokens(request)
with open("./form.html", 'r') as f:
template = f.read()
return template.format(access_token=tokens['access_token'], message=tokens['message'])
def execute_job(request):
"""
Youtubeのコメント取得処理を実行する
Args:
request : Request
Returns:
String
取得したコメントをCSVにした文字列
"""
access_token = request.form['access_token']
channel_id = request.form['channel_id']
limit = int(os.environ.get('LIMIT'))
total_count = 0
with tempfile.TemporaryFile(mode='r+', encoding='utf-8') as fp:
token = None
while True:
next_token = list_comments(fp, token, channel_id, access_token)
total_count += next_token['count']
token = next_token['token'] if total_count <= limit else None
if token == None:
fp.seek(0)
return fp.read()
def list_comments(f, token, channel_id, access_token):
"""
Youtubeコメントを取得して一時ファイルに保存し,次のトークンと件数を含むDictionaryを返す
Args:
f : Tempfile
取得結果を書き込む一時ファイル
token : String
次のページを示すトークン
channel_id : String
YoutubeのチャンネルID
access_token : String
Youtube APIのアクセストークン
Returns:
Dictionary
token : 次のページを示すトークン,count : 件数
"""
url = "https://www.googleapis.com/youtube/v3/commentThreads"
header = { "authorization" : "Bearer {}".format(access_token) }
param = { "part" : "snippet", "allThreadsRelatedToChannelId" : channel_id, "textFormat" : "plainText" }
if not token == None:
param['pageToken'] = token
response = requests.get(url, params = param, headers = header)
count = 0
if response.status_code == 200:
result = json.loads(response.text)
next_page_token = result['nextPageToken'] if 'nextPageToken' in result else None
for item in result['items']:
snippet = item['snippet']['topLevelComment']['snippet']
text = snippet["textOriginal"].replace("\r", "").replace("\n", "\\n")
f.write(RECORD_FORMAT.format(snippet['publishedAt'], snippet["authorDisplayName"], text))
count += 1
f.flush()
return { 'token' : next_page_token, 'count' : count }
else:
raise APIException(response.status_code, response.text)