本日より令和4年分の所得税/復興特別所得税の確定申告の受付が始まりました。さっそく,Webからマイナンバーカードを使って申告・納税しました。マイナンバーカードのメリットは,確定申告のために税務署に行かずにすむことくらいのように思います。

本日より令和4年分の所得税/復興特別所得税の確定申告の受付が始まりました。さっそく,Webからマイナンバーカードを使って申告・納税しました。マイナンバーカードのメリットは,確定申告のために税務署に行かずにすむことくらいのように思います。
CentOSがサポート終了になってしまったので,乗換先候補の筆頭ともいえるRocky LinuxをHyper-V上にインストールしてみました。いきなりRocky Linuxリポジトリのサーバ証明書が検証できないというエラーでひっかかりましたが,どうにかパッチもインストールして使える状態になりました。
SQL Serverを2019から2022へアップグレードしました。アップグレードにあたってデータをエクスポートしてインポートするといった面倒な作業は一切不要というのがありがたいです。OSS製品の場合は互換性で苦労することもよくありますが,さすが商用製品です。
https://www.microsoft.com/ja-jp/sql-server/sql-server-2022
趣味でやっていたペン習字でようやく昇段が認められて,初等師範の免許状を頂きました。
令和4年分の消費税の確定申告を行いました。中間申告での納付分もあるので,納税額はこんな金額ではありません。所得税は2月16日以降に申告しないと,過誤納として還付されてしまうと聞いたことがあるので,2月16日に申告作業を行います。
Google FunctionsでYoutube APIを呼び出してのコメントをダウンロードするツールを作ってみました。Python 3.10で書いています。Googleによる審査を受けていないので,エンドポイントを一般公開することはできません。主要部分のソースコードは示します。Google APIの設定等は省略しています。
from flask import Flask, Response, make_response
import functions_framework
import json
import os
import requests
import tempfile
RECORD_FORMAT = '"{}","{}","{}"\n'
class APIException(Exception):
"""
API呼出のエラー
"""
def __init__(self, code, text):
self.code = code
self.text = text
super().__init__("APIの呼出でエラーが発生しました。status:{code}, message{text}".format(code=code, text=text))
@functions_framework.http
def get_comments(request):
"""HTTP Cloud Function.
Args:
request (flask.Request): The request object.
<https://flask.palletsprojects.com/en/1.1.x/api/#incoming-request-data>
Returns:
The response text, or any set of values that can be turned into a
Response object using `make_response`
<https://flask.palletsprojects.com/en/1.1.x/api/#flask.make_response>.
"""
if request.method == 'GET':
response = do_get(request)
else:
response = do_post(request)
return response
def do_get(request):
"""
GETリクエストを処理する
Args:
request : Request
Returns:
Response
"""
if len(request.query_string.decode('utf-8')) == 0:
return handle_init_access()
else:
return handle_login(request)
def do_post(request):
"""
POSTリクエストを処理する
Args:
request : Request
Returns:
Response
"""
if 'channel_id' in request.form:
response = make_response()
try:
response.data = execute_job(request)
response.headers['Content-Type'] = 'application/octet-stream'
response.headers['Content-Disposition'] = "attachment; filename=comment_{}.csv".format(request.form['channel_id'])
except Exception as e:
with open("./form.html", 'r') as f:
template = f.read()
response.data = template.format(access_token=request.form['access_token'], message=str(e))
response.headers['Content-Type'] = 'text/html'
finally:
return response
else:
return handle_token(request)
def handle_init_access():
"""
初期画面用のhtmlを返す
Returns:
String
初期画面用のhtml
"""
template = ''
with open("./init.html", 'r') as f:
template = f.read()
return template.format(client_id=os.environ.get('CLIENT_ID'), limit=os.environ.get('LIMIT'))
def handle_login(request):
"""
認証処理を行う
Args:
request : Request
Returns:
認証用html
"""
template = ''
with open("./login.html", 'r') as f:
template = f.read()
client_id = os.environ.get('CLIENT_ID')
client_secret = os.environ.get('CLIENT_SECRET')
return template.format(client_id=client_id, client_secret=client_secret, code=request.args['code'])
def get_tokens(request):
"""
アクセストークンを取得する
Args:
request : Request
Returns:
Dictionary
succes: 成功か否か,status_code : API応答のHTTPステータスコード,access_token : アクセストークン,refresh_token : リフレッシュトークン,message : メッセージ
"""
params = { "code" : request.form['code'], "grant_type" : "authorization_code", "client_secret" : os.environ.get('CLIENT_SECRET'), \
"client_id" : os.environ.get('CLIENT_ID'), os.environ.get('REDIRECT_URI') }
header = { "Content-Type" : "application/json" }
response = requests.post("https://oauth2.googleapis.com/token", params=params, headers=header)
if response.status_code == 200:
result = json.loads(response.text)
tokens = { "success" : True, "status_code" : 200, "access_token" : result['access_token'], "refresh_token" : result['refresh_token'], "message" : "" }
else:
tokens = { "success" : False, "status_code" : response.status_code, "access_token" : None, "refresh_token" : None, "message" : response.text }
return tokens
def handle_token(request):
"""
トークンエンドポイントからのレスポンスからアクセストークンを読込み,処理実行画面のhtmlを返す
Args:
request : Request
Returns:
String
処理実行画面のhtml
"""
tokens = get_tokens(request)
with open("./form.html", 'r') as f:
template = f.read()
return template.format(access_token=tokens['access_token'], message=tokens['message'])
def execute_job(request):
"""
Youtubeのコメント取得処理を実行する
Args:
request : Request
Returns:
String
取得したコメントをCSVにした文字列
"""
access_token = request.form['access_token']
channel_id = request.form['channel_id']
limit = int(os.environ.get('LIMIT'))
total_count = 0
with tempfile.TemporaryFile(mode='r+', encoding='utf-8') as fp:
token = None
while True:
next_token = list_comments(fp, token, channel_id, access_token)
total_count += next_token['count']
token = next_token['token'] if total_count <= limit else None
if token == None:
fp.seek(0)
return fp.read()
def list_comments(f, token, channel_id, access_token):
"""
Youtubeコメントを取得して一時ファイルに保存し,次のトークンと件数を含むDictionaryを返す
Args:
f : Tempfile
取得結果を書き込む一時ファイル
token : String
次のページを示すトークン
channel_id : String
YoutubeのチャンネルID
access_token : String
Youtube APIのアクセストークン
Returns:
Dictionary
token : 次のページを示すトークン,count : 件数
"""
url = "https://www.googleapis.com/youtube/v3/commentThreads"
header = { "authorization" : "Bearer {}".format(access_token) }
param = { "part" : "snippet", "allThreadsRelatedToChannelId" : channel_id, "textFormat" : "plainText" }
if not token == None:
param['pageToken'] = token
response = requests.get(url, params = param, headers = header)
count = 0
if response.status_code == 200:
result = json.loads(response.text)
next_page_token = result['nextPageToken'] if 'nextPageToken' in result else None
for item in result['items']:
snippet = item['snippet']['topLevelComment']['snippet']
text = snippet["textOriginal"].replace("\r", "").replace("\n", "\\n")
f.write(RECORD_FORMAT.format(snippet['publishedAt'], snippet["authorDisplayName"], text))
count += 1
f.flush()
return { 'token' : next_page_token, 'count' : count }
else:
raise APIException(response.status_code, response.text)
サブのマシンとして使用していたノートPC(MacBook Air 2019)1台を都内の児童養護施設へ寄贈しました。開発で使うことは難しいのですが,事務用PCあるいは学習用PCとしては十分に使用可能ですし,購入後3年未満でApple Care+も有効な状態ですから,役に立てて頂ければ幸いです。
WebAPI試験用のモックを作るついでに,Socketを使った低レベルのWebサーバをPythonで作りました。http://localhost:8080/*.htmlでアクセスするとindex.htmlファイルを返して,それ以外はNot Found(404)を返します。またhttp://localhost:8080/quitをリクエストするとサーバが停止します。
※セキュリティに関しては一切考慮していないので,インターネット公開するサーバでは実行しないでください。ディレクトリトラバーサルは簡単に実行できるでしょう。脆弱サーバの実験用です。
import re
import time
import socket
HOST = '127.0.0.1'
PORT = 8080
BUFFER_SIZE = 4096
def main():
print("Server Listening")
while True:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind((HOST, PORT))
sock.listen(2)
sock.settimeout(10)
connection = None
try:
connection, address = sock.accept()
print("connection from {} has been established!".format(address))
recv = connection.recv(BUFFER_SIZE)
print(recv)
data = parse(recv.decode('UTF-8'))
if data['url'] == '/quit':
break
response = create_response(data)
print(response)
connection.send(response.encode('UTF-8'))
except TimeoutError as e:
continue
except Exception as e:
print(str(e))
response = format_response(500, "Internal Server Error", "text/plain", str(e))
if connection:
connection.send(response.encode('UTF-8'))
finally:
if connection:
connection.shutdown(socket.SHUT_RDWR)
connection.close()
time.sleep(1)
def parse(recv):
array = recv.splitlines()
data = {}
if len(array) > 0:
m = re.match(r'(GET|POST|PATCH|PUT|DELETE|HEAD)\s+(.*)\s+(.*)', array[0])
if m:
data['method'] = m.group(1)
data['url'] = m.group(2)
data['payload'] = array[len(array) - 1]
return data
def create_response(data):
if re.match(r'.*\.html', data['url']):
with open('index.html', 'r', encoding='utf-8') as f:
body = f.read()
return format_response(200, "OK", "text/html; charset=utf-8", body)
return format_response(404, "Not Found", "text/plain", "{} Not Found".format(data['url']))
def format_response(code, status, type, body):
response = "HTTP/1.0 {} {}\nContent-Type: {}\n\n{}\n"
return response.format(code, status, type, body);
if __name__ == '__main__':
main()
簡単なhtmlファイルを置いて実行してみます。
<!DOCTYPE html>
<html lang="ja">
<head>
<title>Hello</title>
</head>
<body>
<h1>Hello World</h1>
</body>
</html>
Server Listening
connection from (‘127.0.0.1’, 55843) has been established!
b’GET /aaa.html HTTP/1.1\r\nHost: localhost:8080\r\nConnection: keep-alive\r\nsec-ch-ua: ” Not A;Brand”;v=”99″, “Chromium”;v=”101″, “Opera”;v=”87″\r\nsec-ch-ua-mobile: ?0\r\nsec-ch-ua-platform: “Windows”\r\nUpgrade-Insecure-Requests: 1\r\nUser-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.67 Safari/537.36 OPR/87.0.4390.45\r\nAccept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9\r\nSec-Fetch-Site: none\r\nSec-Fetch-Mode: navigate\r\nSec-Fetch-User: ?1\r\nSec-Fetch-Dest: document\r\nAccept-Encoding: gzip, deflate, br\r\nAccept-Language: ja,en-US;q=0.9,en;q=0.8\r\n\r\n’
HTTP/1.0 200 OK
Content-Type: text/html; charset=utf-8
<!DOCTYPE html>
<html lang=”ja”>
<head>
<title>Hello</title>
</head>
<body>
<h1>Hello World</h1>
</body>
</html>
connection from (‘127.0.0.1’, 55848) has been established!
b’GET /favicon.ico HTTP/1.1\r\nHost: localhost:8080\r\nConnection: keep-alive\r\nsec-ch-ua: ” Not A;Brand”;v=”99″, “Chromium”;v=”101″, “Opera”;v=”87″\r\nsec-ch-ua-mobile: ?0\r\nUser-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.67 Safari/537.36 OPR/87.0.4390.45\r\nsec-ch-ua-platform: “Windows”\r\nAccept: image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8\r\nSec-Fetch-Site: same-origin\r\nSec-Fetch-Mode: no-cors\r\nSec-Fetch-Dest: image\r\nReferer: http://localhost:8080/aaa.html\r\nAccept-Encoding: gzip, deflate, br\r\nAccept-Language: ja,en-US;q=0.9,en;q=0.8\r\n\r\n’
HTTP/1.0 404 Not Found
Content-Type: text/plain
/favicon.ico Not Found
Pythonプログラミングの資格試験を受けてきました。60分あったのですが,30分位で終わらせて「試験終了」ボタンを押しました。プログラミング経験者にはそれほど難しい試験ではないように思いました。