雑記

Rocky Linux 9

CentOSがサポート終了になってしまったので,乗換先候補の筆頭ともいえるRocky LinuxをHyper-V上にインストールしてみました。いきなりRocky Linuxリポジトリのサーバ証明書が検証できないというエラーでひっかかりましたが,どうにかパッチもインストールして使える状態になりました。

https://rockylinux.org/

雑記

消費税確定申告

令和4年分の消費税の確定申告を行いました。中間申告での納付分もあるので,納税額はこんな金額ではありません。所得税は2月16日以降に申告しないと,過誤納として還付されてしまうと聞いたことがあるので,2月16日に申告作業を行います。

雑記

Youtubeのコメントをダウンロードするツール

Google FunctionsでYoutube APIを呼び出してのコメントをダウンロードするツールを作ってみました。Python 3.10で書いています。Googleによる審査を受けていないので,エンドポイントを一般公開することはできません。主要部分のソースコードは示します。Google APIの設定等は省略しています。

Google Cloudのコンソール画面
from flask import Flask, Response, make_response
import functions_framework
import json
import os
import requests
import tempfile

RECORD_FORMAT = '"{}","{}","{}"\n'

class APIException(Exception):
  """
  API呼出のエラー
  """
  def __init__(self, code, text):
    self.code = code
    self.text = text
    super().__init__("APIの呼出でエラーが発生しました。status:{code}, message{text}".format(code=code, text=text))

@functions_framework.http
def get_comments(request):
  """HTTP Cloud Function.
  Args:
    request (flask.Request): The request object.
    <https://flask.palletsprojects.com/en/1.1.x/api/#incoming-request-data>
  Returns:
    The response text, or any set of values that can be turned into a
    Response object using `make_response`
    <https://flask.palletsprojects.com/en/1.1.x/api/#flask.make_response>.
   """
  if request.method == 'GET':
    response = do_get(request)
  else:
    response = do_post(request)
  return response

def do_get(request):
  """
  GETリクエストを処理する

  Args:
    request : Request
  Returns:
    Response 
  """
  if len(request.query_string.decode('utf-8')) == 0:
    return handle_init_access()
  else:
    return handle_login(request)

def do_post(request):
  """
  POSTリクエストを処理する

  Args:
    request : Request
  Returns:
    Response 
  """
  if 'channel_id' in request.form:
    response = make_response()
    try:
      response.data = execute_job(request)
      response.headers['Content-Type'] = 'application/octet-stream'
      response.headers['Content-Disposition'] = "attachment; filename=comment_{}.csv".format(request.form['channel_id'])
    except Exception as e:
      with open("./form.html", 'r') as f:
        template = f.read()
        response.data = template.format(access_token=request.form['access_token'], message=str(e))
      response.headers['Content-Type'] = 'text/html'
    finally:
      return response
  else:
    return handle_token(request)

def handle_init_access():
  """
  初期画面用のhtmlを返す

  Returns:
    String
      初期画面用のhtml
  """
  template = ''
  with open("./init.html", 'r') as f:
    template = f.read()
  return template.format(client_id=os.environ.get('CLIENT_ID'), limit=os.environ.get('LIMIT'))

def handle_login(request):
  """
  認証処理を行う

  Args:
    request : Request
  Returns:
    認証用html
  """
  template = ''
  with open("./login.html", 'r') as f:
    template = f.read()
  client_id = os.environ.get('CLIENT_ID')
  client_secret = os.environ.get('CLIENT_SECRET')
  return template.format(client_id=client_id, client_secret=client_secret, code=request.args['code'])

def get_tokens(request):
  """
  アクセストークンを取得する

  Args:
    request : Request
  Returns:
    Dictionary
      succes: 成功か否か,status_code : API応答のHTTPステータスコード,access_token : アクセストークン,refresh_token : リフレッシュトークン,message : メッセージ
  """
  params = { "code" : request.form['code'], "grant_type" : "authorization_code", "client_secret" : os.environ.get('CLIENT_SECRET'), \
             "client_id" : os.environ.get('CLIENT_ID'), os.environ.get('REDIRECT_URI') }
  header = { "Content-Type" : "application/json" }
  response = requests.post("https://oauth2.googleapis.com/token", params=params, headers=header)
  if response.status_code == 200:
    result = json.loads(response.text)
    tokens = { "success" : True, "status_code" : 200, "access_token" : result['access_token'], "refresh_token" : result['refresh_token'], "message" : "" }
  else:
    tokens = { "success" : False, "status_code" : response.status_code, "access_token" : None, "refresh_token" : None, "message" : response.text }
  return tokens

def handle_token(request):
  """
  トークンエンドポイントからのレスポンスからアクセストークンを読込み,処理実行画面のhtmlを返す

  Args:
    request : Request
  Returns:
    String
      処理実行画面のhtml
  """
  tokens = get_tokens(request)
  with open("./form.html", 'r') as f:
    template = f.read()
    return template.format(access_token=tokens['access_token'], message=tokens['message'])

def execute_job(request):
  """
  Youtubeのコメント取得処理を実行する

  Args:
    request : Request
  Returns:
    String
      取得したコメントをCSVにした文字列
  """
  access_token = request.form['access_token']
  channel_id = request.form['channel_id']
  limit = int(os.environ.get('LIMIT'))
  total_count = 0
  with tempfile.TemporaryFile(mode='r+', encoding='utf-8') as fp:
    token = None
    while True:
      next_token = list_comments(fp, token, channel_id, access_token)
      total_count += next_token['count']
      token = next_token['token'] if total_count <= limit else None
      if token == None:
        fp.seek(0)
        return fp.read()

def list_comments(f, token, channel_id, access_token):
  """
  Youtubeコメントを取得して一時ファイルに保存し,次のトークンと件数を含むDictionaryを返す

  Args:
    f : Tempfile
      取得結果を書き込む一時ファイル
    token : String
      次のページを示すトークン
    channel_id : String
      YoutubeのチャンネルID
    access_token : String
      Youtube APIのアクセストークン
  Returns:
    Dictionary
      token : 次のページを示すトークン,count : 件数
  """
  url = "https://www.googleapis.com/youtube/v3/commentThreads"
  header = { "authorization" : "Bearer {}".format(access_token) }
  param = { "part" : "snippet", "allThreadsRelatedToChannelId" :  channel_id, "textFormat" : "plainText" }
  if not token == None:
    param['pageToken'] = token
  response = requests.get(url, params = param, headers = header)
  count = 0
  if response.status_code == 200:
    result = json.loads(response.text)
    next_page_token = result['nextPageToken'] if 'nextPageToken' in result else None
    for item in result['items']:
      snippet = item['snippet']['topLevelComment']['snippet']
      text = snippet["textOriginal"].replace("\r", "").replace("\n", "\\n")
      f.write(RECORD_FORMAT.format(snippet['publishedAt'], snippet["authorDisplayName"], text))
      count += 1
    f.flush()
    return { 'token' : next_page_token, 'count' : count }
  else:
    raise APIException(response.status_code, response.text)

雑記

中古PCを児童養護施設へ寄贈

サブのマシンとして使用していたノートPC(MacBook Air 2019)1台を都内の児童養護施設へ寄贈しました。開発で使うことは難しいのですが,事務用PCあるいは学習用PCとしては十分に使用可能ですし,購入後3年未満でApple Care+も有効な状態ですから,役に立てて頂ければ幸いです。

雑記

低レベルWebサーバ

WebAPI試験用のモックを作るついでに,Socketを使った低レベルのWebサーバをPythonで作りました。http://localhost:8080/*.htmlでアクセスするとindex.htmlファイルを返して,それ以外はNot Found(404)を返します。またhttp://localhost:8080/quitをリクエストするとサーバが停止します。

※セキュリティに関しては一切考慮していないので,インターネット公開するサーバでは実行しないでください。ディレクトリトラバーサルは簡単に実行できるでしょう。脆弱サーバの実験用です。

import re
import time
import socket

HOST = '127.0.0.1'
PORT = 8080
BUFFER_SIZE = 4096

def main():
  print("Server Listening")
  while True:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
      sock.bind((HOST, PORT))
      sock.listen(2)
      sock.settimeout(10)
      connection = None
      try:
        connection, address = sock.accept()
        print("connection from {} has been established!".format(address))
        recv = connection.recv(BUFFER_SIZE)
        print(recv)
        data = parse(recv.decode('UTF-8'))
        if data['url'] == '/quit':
          break
        response = create_response(data)
        print(response)
        connection.send(response.encode('UTF-8'))
      except TimeoutError as e:
        continue
      except Exception as e:
        print(str(e))
        response = format_response(500, "Internal Server Error", "text/plain",  str(e))
        if connection:
          connection.send(response.encode('UTF-8'))
      finally:
        if connection:
          connection.shutdown(socket.SHUT_RDWR)
          connection.close()
        time.sleep(1)

def parse(recv):
  array = recv.splitlines()
  data = {}
  if len(array) > 0:
    m = re.match(r'(GET|POST|PATCH|PUT|DELETE|HEAD)\s+(.*)\s+(.*)', array[0])
    if m:
      data['method'] = m.group(1)
      data['url'] = m.group(2)
      data['payload'] = array[len(array) - 1]
  return data


def create_response(data):
  if re.match(r'.*\.html', data['url']):
    with open('index.html', 'r', encoding='utf-8') as f:
      body = f.read()
      return format_response(200, "OK", "text/html; charset=utf-8", body)
  return format_response(404, "Not Found", "text/plain", "{} Not Found".format(data['url']))

def format_response(code, status, type, body):
  response = "HTTP/1.0 {} {}\nContent-Type: {}\n\n{}\n"
  return response.format(code, status, type, body);

if __name__ == '__main__':
  main()

簡単なhtmlファイルを置いて実行してみます。

<!DOCTYPE html>
<html lang="ja">
<head>
	<title>Hello</title>
</head>
<body>
	<h1>Hello World</h1>
</body>
</html>

Server Listening
connection from (‘127.0.0.1’, 55843) has been established!
b’GET /aaa.html HTTP/1.1\r\nHost: localhost:8080\r\nConnection: keep-alive\r\nsec-ch-ua: ” Not A;Brand”;v=”99″, “Chromium”;v=”101″, “Opera”;v=”87″\r\nsec-ch-ua-mobile: ?0\r\nsec-ch-ua-platform: “Windows”\r\nUpgrade-Insecure-Requests: 1\r\nUser-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.67 Safari/537.36 OPR/87.0.4390.45\r\nAccept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9\r\nSec-Fetch-Site: none\r\nSec-Fetch-Mode: navigate\r\nSec-Fetch-User: ?1\r\nSec-Fetch-Dest: document\r\nAccept-Encoding: gzip, deflate, br\r\nAccept-Language: ja,en-US;q=0.9,en;q=0.8\r\n\r\n’
HTTP/1.0 200 OK
Content-Type: text/html; charset=utf-8

<!DOCTYPE html>
<html lang=”ja”>
<head>
<title>Hello</title>
</head>
<body>
<h1>Hello World</h1>
</body>
</html>


connection from (‘127.0.0.1’, 55848) has been established!
b’GET /favicon.ico HTTP/1.1\r\nHost: localhost:8080\r\nConnection: keep-alive\r\nsec-ch-ua: ” Not A;Brand”;v=”99″, “Chromium”;v=”101″, “Opera”;v=”87″\r\nsec-ch-ua-mobile: ?0\r\nUser-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.67 Safari/537.36 OPR/87.0.4390.45\r\nsec-ch-ua-platform: “Windows”\r\nAccept: image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8\r\nSec-Fetch-Site: same-origin\r\nSec-Fetch-Mode: no-cors\r\nSec-Fetch-Dest: image\r\nReferer: http://localhost:8080/aaa.html\r\nAccept-Encoding: gzip, deflate, br\r\nAccept-Language: ja,en-US;q=0.9,en;q=0.8\r\n\r\n’
HTTP/1.0 404 Not Found
Content-Type: text/plain

/favicon.ico Not Found

情報セキュリティ

CSPヘッダとFacebook SDK

近年はXSS対策などのためサーバ上でCSPヘッダを付加することが増えている。詳しいことはリンク先に書かれているが,簡単に言うとHTTPヘッダでjavascriptやStylesheetを読み込んで実行する対象を制限することができる。これによって,不正なスクリプトが実行されることを防止しようというものだ。

ところが,Facebookと連携させようとすると,CSPが邪魔をしてしまう。Facebookのsdkはjavascriptの中で更に別のjavascriptを読込み,動的にscriptタグをDOM内に追加するのである。CSPヘッダのscript-srcに’unsafe-inline’を指定することで実行できるのであるが,これではXSSに対して無防備となる。インラインのjavascriptには毎回nonce属性にランダムな文字列を生成してセットし,CSPヘッダにも’nonce-{設定したnonce値}’という文字列を設定した方がよい。ただし,Facebookのようにjavascript内で動的に生成されるscriptタグにnonce属性は設定されていない。そのためにブラウザで読み込まなくなってしまうのである。

CSPによりJavascriptがブロックされている

動かすためには,動的に追加されるscriptタグにnonceを設定する必要がある。facebookのjavascriptを書き換えられればよいのであるが,それはできない。どうするか迷った末に,scriptタグを追加する関数を書き換えてみることにした。

htmlのDOMからheadノードを取得して,そのappendChild()メソッドを書き換えて,子要素を追加する際に追加される子要素にnonce属性を追加してしまおうという作戦である。次のようにhead要素を取得(1行目)して,そのappendChildメソッドの参照を取得ておく(2行目)。そして,head要素のappendChildメソッドに新しくfunctionを定義する(3-6行目)。このfunctionの中では子要素にsetAttributeメソッドを使って”nonce”属性を設定する。その後,2行目で取得しておいた元々のappendChildメソッドの参照を使って,元の関数を呼び出す。

    head = document.getElementsByTagName('head')[0];
    _appendChild = head.appendChild;
    head.appendChild = function(child) {
      child.setAttribute('nonce', "<?php echo $nonce; ?>");
      _appendChild.apply(this, arguments);
    } 

このJavascriptを組み込んで再度Webブラウザで読み込むと,ブロックされずにJavascriptが実行された。


実験に用いたソースファイル(index.php)

<?php
function getNonce() {
  return base64_encode(openssl_random_pseudo_bytes(20)); 
}
function createCSPHeader($nonce) {
   $nonces = array( 
       'default-src' => "'self'",
       'img-src' => "'self' data:",
       'script-src' => "'unsafe-eval' 'strict-dynamic' 'nonce-".$nonce."'" ,
       'style-src' => "'nonce-".$nonce."'"
   );
   $csp = 'Content-Security-Policy: ';
   $delimiter = '';
   foreach($nonces as $key => $value) {
     $csp = $csp.$delimiter.$key.' '.$value;
     $delimiter = ';';
   }     
   return $csp;      
}
$nonce = getNonce();
$csp_header = createCSPHeader($nonce);
header($csp_header);
?>
<!DOCTYPE html>
<html lang="ja">
<head>
    <title>PHP TEST</title>
    <style type="text/css" nonce="<?php echo $nonce ?>">
    .container {
        background-color: #F3F3F3;
        width: 100%;
        height; 100%;
    }
    </style>
    <script nonce="<?php echo $nonce; ?>">
    window.fbAsyncInit = function() {
      FB.init({
        appId   :     'YOUR_APPLICATION_ID',
        cookie  :     true,
        oauth   :     true,
        version :     'v2.9'
      });
    }
    </script>    
    <script nonce="<?php echo $nonce; ?>">
    head = document.getElementsByTagName('head')[0];
    _appendChild = head.appendChild;
    head.appendChild = function(child) {
      child.setAttribute('nonce', "<?php echo $nonce; ?>");
      _appendChild.apply(this, arguments);
    } 
    </script>
    
</head>
<body>
    <div class="container">
        <h1>Hello World.</h1>
        <p>Nonce: <?php echo $nonce; ?></p>
        <p><?php echo $csp_header; ?></p>
    </div>
</body>
</html>