main.py
import uvicorn
from fastapi import FastAPI, File, UploadFile
import shutil
import os
app = FastAPI()
@app.post("/files/")
async def file(file: bytes = File(...)):
content = file.decode('utf-8')
formatfile = content.split('\n')
return {'filedetail': formatfile}
@app.post("/uploadfile/")
async def upload_file(file: UploadFile = File(...)):
if file:
filename = file.filename
fileobj = file.file
#UPLOAD_DIR = os.getcwd()
#print(UPLOAD_DIR)
#upload_dir = open(os.path.join(UPLOAD_DIR, filename),'wb+')
upload_dir = open(os.path.join("C:\\Users\\ebata\\fastapi6\\", filename),'wb+')
shutil.copyfileobj(fileobj, upload_dir)
upload_dir.close()
return {"アップロードファイル名": filename}
return {"Error": "アップロードファイルが見つかりません。"}
'''
@app.post("/uploadfile/")
async def upload_file(file: UploadFile = File(...)):
return {'filename': file.filename}
'''
で、
> uvicorn main:app --reload
でAPIサーバを起動する。
request.py
import requests
import json
# curl -X POST http://127.0.0.1:8000/uploadfile/ -H 'accept: application/json' -H 'Content-Type: multipart/form-data' -F file=@dummy.txt;type=text/plain と同じ機能を実現
# C:\Users\ebata>curl -X POST http://127.0.0.1:8000/uploadfile/ -F file=@dummy.txt;type=text/plain とすれば、エラーメッセージは出てこない
url = 'http://localhost:8000/uploadfile/'
# headers = {'Accept': 'application/json', 'Content-Type': 'application/json'} //失敗
# headers = {'Accept': 'application/json', 'Content-Type': 'text/html'} # 失敗
headers = {'Accept': 'application/json'} # 成功
# file_payload = {'file=@dummy.txt;type=text/plain'} # 失敗
file_payload = {'file':open('data/dummy1.csv','rb')} # 成功 ファイルはdataというフォルダを掘って、dummy1.csv という名前のファイルを放り込んでおく
print("headers:",headers)
print("payload:",file_payload)
# r = requests.post('http://localhost:8000/uploadfile/', headers=headers,files=file_payload) # 成功
r = requests.post(url, headers=headers,files=file_payload) # 成功
print("--->1:", r.url)
print("--->2:",r.status_code)
print("--->3:",r.text)
print("--->4:",r.json())
>python requests.py
でクライアント起動。
以上