合肥 网站运营,湘潭做网站推荐磐石网络,企业官网门户网站管理系统,泰安网站建设最好文章目录1. 环境安装2. 使用SQLAlchemy与SQL数据库通信2.1 创建表2.2 连接数据库2.3 insert、select2.4 update、delete2.5 relationships2.6 用Alembic进行数据库迁移learn from 《Building Data Science Applications with FastAPI》1. 环境安装
docker 安装 MongoDB 服务 d…
文章目录
1. 环境安装
2. 使用SQLAlchemy与SQL数据库通信
2.1 创建表
2.2 连接数据库
2.3 insert、select
2.4 update、delete
2.5 relationships
2.6 用Alembic进行数据库迁移

learn from 《Building Data Science Applications with FastAPI》

1. 环境安装
docker 安装 MongoDB 服务:
docker run -d --name fastapi-mongo -p 27017:27017 mongo:4.4

2. 使用SQLAlchemy与SQL数据库通信
安装 pip install databases[sqlite]
2.1 创建表
# models.py
import sqlalchemy
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field

# Metadata object that collects every table declared in this module.
metadata = sqlalchemy.MetaData()

# Table object for blog posts: table name, metadata object, then the columns
# (column name, type, extra options).
posts = sqlalchemy.Table(
    "posts",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True),
    sqlalchemy.Column("publication_date", sqlalchemy.DateTime(), nullable=False),
    sqlalchemy.Column("title", sqlalchemy.String(255), nullable=False),
    sqlalchemy.Column("text", sqlalchemy.Text(), nullable=False),
)


class PostBase(BaseModel):
    """Fields shared by every post schema; names match the ``posts`` columns."""

    title: str
    text: str
    # default_factory calls datetime.now each time a model is built without an
    # explicit publication date.
    publication_date: datetime = Field(default_factory=datetime.now)


class PostPartialUpdate(BaseModel):
    """Body of a partial update: every field is optional.

    The field names must match ``posts`` columns because the dumped dict is
    passed straight to ``posts.update().values(...)``.
    """

    title: Optional[str] = None
    text: Optional[str] = None


class PostCreate(PostBase):
    """Body accepted when creating a post."""


class PostDB(PostBase):
    """A post as stored in the database, including its primary key."""

    id: int


# Next section of the article: "2.2 连接数据库" (connecting to the database).
# _*_ coding: utf-8 _*_
# Time : 2022/3/8 9:28
# Author : Michael
# File : database.py
# desc : database connection objects shared by the application
import sqlalchemy
from databases import Database

# SQLite database file used by both the async connection and the engine.
DB_URL = "sqlite:///cp6_sqlalchemy.db"
database = Database(DB_URL)
sqlalchemy_engine = sqlalchemy.create_engine(DB_URL)


def get_database() -> Database:
    """Return the module-level ``Database`` instance."""
    return database


# Next section of the article: "2.3 insert、select" (insert and select).
# _*_ coding: utf-8 _*_
# Time : 2022/3/8 9:40
# Author : Michael
# File : app.py
# desc : FastAPI application exposing create/read endpoints for posts
from typing import List, Tuple
import uvicorn
from databases import Database
from fastapi import Depends, FastAPI, HTTPException, Query, status

from database import get_database, sqlalchemy_engine
from models import metadata, posts, PostDB, PostCreate, PostPartialUpdate

app = FastAPI()


@app.on_event("startup")
async def startup():
    """Connect to the database and create the declared tables on startup."""
    await get_database().connect()
    metadata.create_all(sqlalchemy_engine)


@app.on_event("shutdown")
async def shutdown():
    """Disconnect from the database on shutdown."""
    await get_database().disconnect()


async def pagination(
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=0),
) -> Tuple[int, int]:
    """Return ``(skip, limit)`` from the query string, with limit capped at 100."""
    capped_limit = min(100, limit)
    return (skip, capped_limit)


async def get_post_or_404(
    id: int, database: Database = Depends(get_database)
) -> PostDB:
    """Fetch the post with the given id.

    Raises:
        HTTPException: 404 when no row has that id.
    """
    select_query = posts.select().where(posts.c.id == id)
    raw_post = await database.fetch_one(select_query)
    if raw_post is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
    return PostDB(**raw_post)


# Insert data.
@app.post("/posts/", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(
    post: PostCreate, db: Database = Depends(get_database)
) -> PostDB:
    """Insert a new post and return the stored row."""
    # Build the INSERT statement from the table object instead of writing SQL.
    insert_query = posts.insert().values(post.dict())
    # The value returned by execute() is used as the new post's id.
    post_id = await db.execute(insert_query)
    post_db = await get_post_or_404(post_id, db)
    return post_db


@app.get("/posts/{id}", response_model=PostDB)
async def get_post(post: PostDB = Depends(get_post_or_404)) -> PostDB:
    """Return one post; the lookup and the 404 happen in the dependency."""
    return post


@app.get("/posts")
async def list_posts(
    pagination: Tuple[int, int] = Depends(pagination),
    database: Database = Depends(get_database),
) -> List[PostDB]:
    """Return one page of posts selected with OFFSET ``skip`` / LIMIT ``limit``."""
    skip, limit = pagination
    select_query = posts.select().offset(skip).limit(limit)
    rows = await database.fetch_all(select_query)
    results = [PostDB(**row) for row in rows]
    return results


if __name__ == "__main__":
    uvicorn.run(app="app:app", host="127.0.0.1", port=8001, reload=True, debug=True)


# Next section of the article: "2.4 update、delete" (update and delete).
# update
@app.patch("/posts/{id}", response_model=PostDB)
async def update_post(
    post_update: PostPartialUpdate,
    post: PostDB = Depends(get_post_or_404),
    database: Database = Depends(get_database),
) -> PostDB:
    """Apply a partial update to an existing post and return the fresh row.

    The ``get_post_or_404`` dependency resolves the target post first, so an
    unknown id is rejected with 404 before any UPDATE is issued.
    """
    update_query = (
        posts.update()
        .where(posts.c.id == post.id)
        # exclude_unset=True leaves out fields the request did not set.
        .values(post_update.dict(exclude_unset=True))
    )
    await database.execute(update_query)
    # Re-read the row so the response reflects what is stored.
    post_db = await get_post_or_404(post.id, database)
    return post_db


# delete
@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
    post: PostDB = Depends(get_post_or_404),
    database: Database = Depends(get_database),
) -> None:
    """Delete an existing post; responds with 204 and no body."""
    delete_query = posts.delete().where(posts.c.id == post.id)
    await database.execute(delete_query)


# Next section of the article: "2.5 relationships".
models.py 编写新的表
# Table object for comments; each row points at one post.
comments = sqlalchemy.Table(
    "comments",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True),
    # Foreign key linking the comment to posts.id, declared with
    # ON DELETE CASCADE.
    sqlalchemy.Column(
        "post_id",
        sqlalchemy.ForeignKey("posts.id", ondelete="CASCADE"),
        nullable=False,
    ),
    sqlalchemy.Column("publication_date", sqlalchemy.DateTime(), nullable=False),
    sqlalchemy.Column("content", sqlalchemy.Text(), nullable=False),
)


class CommentBase(BaseModel):
    """Fields shared by every comment schema; names match the ``comments`` columns."""

    post_id: int
    # datetime.now is called whenever no publication date is supplied.
    publication_date: datetime = Field(default_factory=datetime.now)
    content: str


class CommentCreate(CommentBase):
    """Body accepted when creating a comment."""


class CommentDB(CommentBase):
    """A comment as stored in the database, including its primary key."""

    id: int


# Next in the article: "app.py 添加内容" (additions to app.py).
from typing import List, Mapping, Tuple, cast
from models import metadata, posts, PostDB, PostCreate, PostPartialUpdate, comments, CommentCreate, CommentDB


@app.post("/comments", response_model=CommentDB, status_code=status.HTTP_201_CREATED)
async def create_comment(
    comment: CommentCreate, database: Database = Depends(get_database)
) -> CommentDB:
    """Insert a comment for an existing post and return the stored row.

    Raises:
        HTTPException: 400 when ``comment.post_id`` matches no post.
    """
    # Look up the post the comment refers to.
    select_post_query = posts.select().where(posts.c.id == comment.post_id)
    post = await database.fetch_one(select_post_query)
    if post is None:
        # Report the requested post id; a bare ``id`` here would be the builtin.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Post {comment.post_id} does not exist",
        )

    # Insert the comment.
    insert_query = comments.insert().values(comment.dict())
    comment_id = await database.execute(insert_query)

    # Read the inserted comment back.
    select_query = comments.select().where(comments.c.id == comment_id)
    raw_comment = cast(Mapping, await database.fetch_one(select_query))
    return CommentDB(**raw_comment)


# Next in the article: "获取一个post的全部comments" (fetching all comments of a post).
models.py
class PostPublic(PostDB):
    """A stored post together with its comments (``None`` when not loaded)."""

    comments: Optional[List[CommentDB]] = None


# Next in the article: "app.py" (the complete application file).
# _*_ coding: utf-8 _*_
# Time : 2022/3/8 9:40
# Author : Michael
# File : app.py
# desc : FastAPI application exposing posts and their comments
from typing import List, Mapping, Tuple, cast
import uvicorn
from databases import Database
from fastapi import Depends, FastAPI, HTTPException, Query, status

from database import get_database, sqlalchemy_engine
from models import (
    metadata,
    posts,
    PostDB,
    PostCreate,
    PostPartialUpdate,
    comments,
    CommentCreate,
    CommentDB,
    PostPublic,
)

app = FastAPI()


@app.on_event("startup")
async def startup():
    """Connect to the database and create the declared tables on startup."""
    await get_database().connect()
    metadata.create_all(sqlalchemy_engine)


@app.on_event("shutdown")
async def shutdown():
    """Disconnect from the database on shutdown."""
    await get_database().disconnect()


async def pagination(
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=0),
) -> Tuple[int, int]:
    """Return ``(skip, limit)`` from the query string, with limit capped at 100."""
    capped_limit = min(100, limit)
    return (skip, capped_limit)


async def get_post_or_404(
    id: int, database: Database = Depends(get_database)
) -> PostPublic:
    """Fetch the post with the given id together with all of its comments.

    Raises:
        HTTPException: 404 when no post has that id.
    """
    select_query = posts.select().where(posts.c.id == id)
    raw_post = await database.fetch_one(select_query)
    if raw_post is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

    # All comments whose post_id is this post's id.
    select_post_comment_query = comments.select().where(comments.c.post_id == id)
    raw_comments = await database.fetch_all(select_post_comment_query)
    comments_list = [CommentDB(**row) for row in raw_comments]
    return PostPublic(**raw_post, comments=comments_list)


# Insert data.
@app.post("/posts", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(
    post: PostCreate, db: Database = Depends(get_database)
) -> PostPublic:
    """Insert a new post and return the stored row."""
    # Build the INSERT statement from the table object instead of writing SQL.
    insert_query = posts.insert().values(post.dict())
    # The value returned by execute() is used as the new post's id.
    post_id = await db.execute(insert_query)
    post_db = await get_post_or_404(post_id, db)
    return post_db


# NOTE: the original file defined and registered list_posts twice on
# GET /posts; a single definition is kept.
@app.get("/posts")
async def list_posts(
    pagination: Tuple[int, int] = Depends(pagination),
    database: Database = Depends(get_database),
) -> List[PostDB]:
    """Return one page of posts selected with OFFSET ``skip`` / LIMIT ``limit``."""
    skip, limit = pagination
    select_query = posts.select().offset(skip).limit(limit)
    rows = await database.fetch_all(select_query)
    results = [PostDB(**row) for row in rows]
    return results


@app.get("/posts/{id}", response_model=PostPublic)
async def get_post(post: PostPublic = Depends(get_post_or_404)) -> PostPublic:
    """Return one post with its comments; the dependency does the lookup."""
    return post


# update
@app.patch("/posts/{id}", response_model=PostPublic)
async def update_post(
    post_update: PostPartialUpdate,
    post: PostPublic = Depends(get_post_or_404),
    database: Database = Depends(get_database),
) -> PostPublic:
    """Apply a partial update to an existing post and return the fresh row."""
    update_query = (
        posts.update()
        .where(posts.c.id == post.id)
        # exclude_unset=True leaves out fields the request did not set.
        .values(post_update.dict(exclude_unset=True))
    )
    await database.execute(update_query)
    # Re-read the row so the response reflects what is stored.
    post_db = await get_post_or_404(post.id, database)
    return post_db


# delete
@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
    post: PostPublic = Depends(get_post_or_404),
    database: Database = Depends(get_database),
) -> None:
    """Delete an existing post; responds with 204 and no body."""
    delete_query = posts.delete().where(posts.c.id == post.id)
    await database.execute(delete_query)


@app.post("/comments", response_model=CommentDB, status_code=status.HTTP_201_CREATED)
async def create_comment(
    comment: CommentCreate, database: Database = Depends(get_database)
) -> CommentDB:
    """Insert a comment for an existing post and return the stored row.

    Raises:
        HTTPException: 400 when ``comment.post_id`` matches no post.
    """
    select_post_query = posts.select().where(posts.c.id == comment.post_id)
    post = await database.fetch_one(select_post_query)
    if post is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Post {comment.post_id} does not exist",
        )

    insert_query = comments.insert().values(comment.dict())
    comment_id = await database.execute(insert_query)

    # Read the inserted comment back.
    select_query = comments.select().where(comments.c.id == comment_id)
    raw_comment = cast(Mapping, await database.fetch_one(select_query))
    return CommentDB(**raw_comment)


if __name__ == "__main__":
    uvicorn.run(app="app:app", host="127.0.0.1", port=8001, reload=True, debug=True)


# Next section of the article: "2.6 用Alembic进行数据库迁移" (database migrations with Alembic).
pip install alembic

终端输入:
alembic init alembic

初始化迁移环境,其中包括一组文件和目录,Alembic 将在其中存储其配置和迁移文件,这些文件需要一起提交到 git。
在 env.py 中导入元数据
from web_python_dev.sqlalchemy1.models import metadata
target_metadata = metadata

编辑 ini 配置
开始迁移
alembic revision --autogenerate -m "Initial migration"

之后会生成一个 py 文件,该代码内有两个函数 upgrade、downgrade,分别用于数据迁移和回滚。
# 升级
alembic upgrade head

数据的迁移和升级之前,请做好备份和测试,防止数据丢失或损坏。
参考文档:https://alembic.sqlalchemy.org/en/latest/index.html