fast apiの設定
1 目的
すぐに環境やらコード(の場所)やらを忘れるので、自分の為にメモを残す
2 簡単なfastapiの環境とコード
私の環境では、c:\users\ebata\fastapi1,2,3,4,5あたりにある。
2.1. 最新のPIPをアップグレードする
> pip install --upgrade pip
> python -m pip install --upgrade pip
> python3 -m pip install --upgrade pip
のいずれかでできる。
2.2. uvicorn, fastapi, pydantic, pandas, requests, jsonのモジュールをインストールする
> pip3 install fastapi
> pip3 install uvicorn[standard]
は必須だが、後はプログラムを動かせば、プログラムの方から、インストールを指示されると思うので大丈夫(だろう)。
2.3. uvicornを起動する
> uvicorn index:app --reload
これで、自動的に"index.py"が動き出す。
2.4. クライアントを起動する
> python request.py
で、起動を確認する。
以上
3. ソースコード
3.1. index.py
# https://miseruit.com/2022/07/18/post-2939/
from fastapi import FastAPI
from pydantic import BaseModel
import pandas
class Item(BaseModel):
ID: str
Name: str
Class: str
app = FastAPI() #インスタンスを作成
User_list =[
{"ID":"M001","Name":"Takashi","Class":"A"},
{"ID":"M002","Name":"Hanako","Class":"A"},
{"ID":"M003","Name":"Hiroshi","Class":"B"},
{"ID":"M004","Name":"Kyoko","Class":"B"},
]
# joson_normalize関数を用いてJsonデータをDataFrame型に変換します。
df = pandas.json_normalize(User_list)
# Get /Users/ : 全件取得
# Get /Users/{ID} :特定のIDのみ取得
# POST /Users/ :ユーザの登録
# DELETE /Users/{ID} :ユーザの削除
# curl http://localhost:8000/Users/
@app.get("/Users/")
async def users():
return User_list
# curl http://localhost:8000/Users/M003
@app.get("/Users/{u_id}")
async def users(u_id:str):
return list(filter(lambda item : item['ID']==u_id, User_list))
# Windowsの場合
# {"ID":"M005","Name":"Aya","Class":"C"}]を追加する
# curl -X POST -H "accept: application/json" -H "Content-Type: application/json" -d "{\"ID\":\"M005\", \"Name\":\"Aya\", \"Class\":\"C\"}" http://localhost:8000/Users/
@app.post("/Users/")
async def users(user: Item):
User_list.append({"ID": user.ID,"Name":user.Name,"Class":user.Class})
return User_list
# curl -X DELETE -H "accept: application/json" http://localhost:8000/Users/M003
@app.delete("/Users/{u_id}")
async def users(u_id:str):
global df
df = df.drop(df.index[df["ID"]==u_id])
return df.to_json(orient='records')
3.2. request.py
import requests
import json
# テスト
#url = 'https://wp.kobore.net/'
#response = requests.get(url)
#print(response.text)
#print()
print("-----start of get(1)")
# curl http://localhost:8000/Users/ と同じ機能を実施
r = requests.get('http://localhost:8000/Users/')
print(r.url)
print(r.status_code)
print(r.text)
print(r.json())
print("-----end of get(1)")
print()
# curl http://localhost:8000/Users/M003 と同じ機能を実施
print("-----start of get(2)")
r = requests.get('http://localhost:8000/Users/M003')
print(r.url)
print(r.status_code)
print(r.text)
print(r.json())
print("-----end of get(2)")
print()
print("-----start of delete")
# curl -X DELETE -H "accept: application/json" http://localhost:8000/Users/M003と同じ機能を実施
r = requests.delete('http://localhost:8000/Users/M003')
print(r.url)
print(r.status_code)
print(r.text)
print(r.json())
print("-----end of delete")
print()
# curl -X POST -H "accept: application/json" -H "Content-Type: application/json" -d "{\"ID\":\"M005\", \"Name\":\"Aya\", \"Class\":\"C\"}" http://localhost:8000/Users/ と同じ機能を実現
# {"ID":"M005","Name":"Aya","Class":"C"}]を追加する
headers = {'Accept': 'application/json', 'Content-Type': 'application/json'}
json_payload = '{"ID":"M005","Name":"Aya","Class":"C"}' # ここで100回くらいのパターンを試したぞ
print("headers:",headers)
print("payload:",json_payload)
#r = requests.post('http://localhost:8000/Users/',headers=headers, params=payload)
#r = requests.post('http://localhost:8000/Users/', headers=headers,params=json.loads(payload))
r = requests.post('http://localhost:8000/Users/', headers=headers, data=json_payload)
print(r.url)
print(r.status_code)
print(r.text)
print(r.json())