建网站石家庄,h5怎么设计网页,今天的重要新闻,如何做类似千图网的网站在Flask框架中#xff0c;实现Token认证机制并不是一件复杂的事情。除了使用官方提供的flask_httpauth模块或者第三方模块flask-jwt#xff0c;我们还可以考虑自己实现一个简易版的Token认证工具。自定义Token认证机制的本质是生成一个令牌#xff08;Token#xff09;实现Token认证机制并不是一件复杂的事情。除了使用官方提供的flask_httpauth模块或者第三方模块flask-jwt我们还可以考虑自己实现一个简易版的Token认证工具。自定义Token认证机制的本质是生成一个令牌Token并在用户每次请求时验证这个令牌的有效性。
整个过程可以分为以下几个步骤
用户登录时生成Token并将Token与用户关联存储在服务器端。用户在请求时携带Token。服务器在收到请求后验证Token的有效性。如果Token有效允许用户访问相应资源否则拒绝访问。
这种自定义的Token认证机制相对简单适用于一些小型应用或者对于Token认证机制有特殊需求的场景。搭建这样一个简易的认证系统有助于理解Token认证的基本原理并可以根据实际需求进行灵活的定制。
创建表结构
通过表结构的创建建立用户认证和会话管理表。UserAuthDB表存储了用户的账号密码信息而SessionAuthDB表则存储了用户登录后生成的Token信息包括用户名、Token本身以及Token的过期时间。这为后续实现用户注册、登录以及Token认证等功能提供了数据库支持。
UserAuthDB表
用途存储用户账号密码信息。字段 id: 主键自增唯一标识每个用户。username: 用户名非空唯一用于登录时识别用户。password: 密码非空用于验证用户身份。
SessionAuthDB表
用途存储登录成功后用户的Token信息。字段 id: 主键自增唯一标识每个登录会话。username: 用户名非空唯一关联到UserAuthDB表的用户名。token: 用户登录后生成的Token非空唯一用于身份验证。invalid_date: Token的过期时间用于判断Token是否过期。
代码通过Flask路由/create实现了数据库表结构的创建主要包括两张表分别是UserAuthDB和SessionAuthDB。
app.route(/create,methods[GET])
def create():conn sqlite3.connect(./database.db)cursor conn.cursor()create_auth create table UserAuthDB( \id INTEGER primary key AUTOINCREMENT not null unique, \username varchar(64) not null unique, \password varchar(64) not null \)cursor.execute(create_auth)create_session create table SessionAuthDB( \id INTEGER primary key AUTOINCREMENT not null unique, \username varchar(64) not null unique, \token varchar(128) not null unique, \invalid_date int not null \)cursor.execute(create_session)conn.commit()cursor.close()conn.close()return create success验证函数
该验证函数用于保证传入的用户名和密码满足一定的安全性和格式要求。通过对长度和字符内容的检查确保了传入的参数不会导致潜在的安全问题。这样的验证机制在用户注册、登录等场景中可以有效地防止一些常见的安全漏洞。
参数验证
接受不定数量的参数*kwargs可传入多个参数。对于每个传入的参数首先验证其长度是否在合法范围内小于128个字符且不为空。
字符串处理
将参数转换为小写形式然后去除两侧空格并移除所有空格。
字符内容验证
遍历处理后的字符串检查其中的字符是否仅包含大写字母、小写字母和数字。如果出现其他字符则认为非法。
返回结果
如果所有参数验证通过即长度合法且字符内容符合要求则返回True表示参数合法。如果有任何一个参数不合法则返回False表示参数存在非法字符或超出长度限制。
代码定义了一个名为CheckParameters的验证函数该函数用于验证传入的参数是否合法。主要验证的对象是用户名和密码具体概述如下
def CheckParameters(*kwargs):for item in range(len(kwargs)):# 先验证长度if len(kwargs[item]) 128 or len(kwargs[item]) 0:return False# 先小写,然后去掉两侧空格,去掉所有空格local_string kwargs[item].lower().strip().replace( ,)# 判断是否只包含 大写 小写 数字for kw in local_string:if kw.isupper() ! True and kw.islower() ! True and kw.isdigit() ! True:return Falsereturn True登录认证函数
该函数实现了用户登录认证的核心逻辑。首先对输入的用户名和密码进行验证然后检查用户是否存在以及是否已经有生成的Token。如果用户存在但Token不存在生成一个新的Token并存入数据库最终返回生成的Token。
路由定义
使用app.route(/login, methods[POST])定义了一个POST请求的路由用于处理用户登录请求。
参数获取
通过request.form.to_dict()获取POST请求中的参数包括用户名username和密码password。
参数验证
调用之前定义的CheckParameters函数对获取的用户名和密码进行合法性验证确保其符合安全性和格式要求。
用户存在性验证
调用RunSqlite函数查询UserAuthDB表验证用户名和密码是否匹配。如果存在匹配的用户则继续执行下一步。
生成Token
查询SessionAuthDB表检查是否存在该用户的Token记录。如果存在则直接返回该Token。如果不存在Token记录则生成一个32位的随机Token并设置过期时间为当前时间戳加上360秒6分钟。
Token写入数据库
将生成的Token和过期时间写入SessionAuthDB表。
返回结果
返回生成的Token作为登录成功的标识。
app.route(/login,methods[POST])
def login():if request.method POST:# 获取参数信息obtain_dict request.form.to_dict()if len(obtain_dict) ! 0 and len(obtain_dict) 2:username obtain_dict[username]password obtain_dict[password]# 验证是否合法is_true CheckParameters(username,password)if is_true True:# 查询是否存在该用户select RunSqlite(./database.db, UserAuthDB, select, username,password, fusername{username})if select[0][0] username and select[0][1] password:# 查询Session列表是否存在select_session RunSqlite(./database.db,SessionAuthDB,select,token,fusername{username})if select_session ! []:ref {message: }ref[message] select_session[0][0]return json.dumps(ref, ensure_asciiFalse)# Session不存在则需要重新生成else:# 生成并写入token和过期时间戳token .join(random.sample(string.ascii_letters string.digits, 32))# 设置360秒周期,过期时间time_stamp int(time.time()) 360insert RunSqlite(./database.db, SessionAuthDB, insert, username,token,invalid_date, f{username},{token},{time_stamp})if insert True:ref {message: }ref[message] tokenreturn json.dumps(ref, ensure_asciiFalse)else:return json.dumps({message: 用户名或密码错误}, ensure_asciiFalse)else:return json.dumps({message: 输入参数不可用}, ensure_asciiFalse)return json.dumps({message: 未知错误}, ensure_asciiFalse)登录认证装饰器
检查用户登录状态Token是否过期的装饰器装饰器用于装饰某一些函数当主调函数被调用时会优先执行装饰器内的代码执行后根据装饰器执行结果返回或退出装饰器分为两种模式一种是FBV模式另一种是CBV模式。
FBVFunction-Based Views和CBVClass-Based Views是两种不同的视图设计模式用于处理Web框架中的请求和生成响应。这两种模式在Django框架中被广泛使用。
FBVFunction-Based Views
定义 FBV是指使用普通的Python函数来处理请求和生成响应的视图设计模式。特点 每个视图对应一个函数函数接收请求作为参数返回响应。简单易于理解和使用。视图的逻辑和处理集中在一个函数中。
示例
def my_view(request):# 处理逻辑return HttpResponse(Hello, World!)CBVClass-Based Views
定义 CBV是指使用基于类的Python类来处理请求和生成响应的视图设计模式。特点 视图是类每个类中可以包含多个方法来处理不同HTTP方法GET、POST等的请求。提供了更多的代码组织和复用的可能性可以使用类的继承、Mixin等方式。更灵活适用于复杂的业务逻辑和共享逻辑。
示例
class MyView(View):def get(self, request):# 处理 GET 请求的逻辑return HttpResponse(Hello, World!)def post(self, request):# 处理 POST 请求的逻辑return HttpResponse(Received a POST request)FBV与CBV区别
结构差异 FBV使用函数逻辑较为集中CBV使用类允许通过类的继承和Mixin等方式更好地组织代码。代码复用 CBV更容易实现代码复用可以通过继承和Mixin在不同的类之间共享逻辑而FBV需要显式地将共享逻辑提取为函数。装饰器 在FBV中使用装饰器来添加额外的功能而在CBV中通过类的继承和Mixin来实现相似的功能。可读性 对于简单的视图逻辑FBV可能更直观易懂对于较为复杂的业务逻辑CBV提供了更好的组织和扩展性。
在Flask中两种设计模式都可以使用开发者可以根据项目的需求和个人喜好选择使用FBV或CBV。
基于FBV的装饰器设置使用时需要注意装饰器嵌入的位置装饰器需要在请求进入路由之前即在请求未走原逻辑代码的时候介入对原业务逻辑进行业务拓展。
from flask import Flask, request,render_template
from functools import wrapsapp Flask(__name__)def login(func):wraps(func)def wrapper(*args, **kwargs):print(登录请求: {}.format(request.url))value request.form.get(value)if value lyshark:# 调用原函数,并返回function_ptr func(*args, **kwargs)return function_ptrelse:return 登录失败return wrapperapp.route(/, methods[GET, POST])
login
def index():if request.method POST:value request.form.get(value)return indexif __name__ __main__:app.run()而基于CBV的装饰器设置使用就显得更加细分化可以定制管理专属功能在外部定义装饰器可以全局使用内部定义可以针对特定路由函数特殊处理。
from flask import Flask, request,render_template,views
from functools import wrapsapp Flask(__name__)# 装饰器
def login(func):wraps(func)def wrapper(*args, **kwargs):print(登录请求: {}.format(request.url))value request.form.get(value)if value lyshark:# 调用原函数,并返回function_ptr func(*args, **kwargs)return function_ptrelse:return 登录失败return wrapper# 类视图
class index(views.MethodView):logindef get(self):return request.argslogindef post(self):return success# 增加路由
app.add_url_rule(rule/, view_funcindex.as_view(index))if __name__ __main__:app.run()此处为了实现起来更简单一些此处直接使用FBV模式我们实现的login_check装饰器通过FVB模式构建代码中取得用户的Token以及用户名对用户身份进行验证。
def login_check(func):wraps(func)def wrapper(*args, **kwargs):print(处理登录逻辑部分: {}.format(request.url))# 得到token 验证是否登陆了,且token没有过期local_timestamp int(time.time())get_token request.headers.get(token)# 验证传入参数是否合法if CheckParameters(get_token) True:select RunSqlite(database.db,SessionAuthDB,select,token,invalid_date,ftoken{get_token})print(select)# 判断是否存在记录,如果存在,在判断时间戳是否合理if select ! []:# 如果当前时间与数据库比对,大于说明过期了需要删除原来的,让用户重新登录if local_timestamp int(select[0][1]):print(时间戳过期了)# 删除原来的Tokendelete RunSqlite(database.db,SessionAuthDB,delete,ftoken{get_token},none)if delete True:return json.dumps({token: Token 已过期,请重新登录获取}, ensure_asciiFalse)else:return json.dumps({token: 数据库删除异常,请联系开发者}, ensure_asciiFalse)else:# 验证Token是否一致if select[0][0] get_token:print(Token验证正常,继续执行function_ptr指向代码.)# 返回到原函数return func(*args, **kwargs)else:print(Token验证错误 {}.format(select))return json.dumps({token: Token 传入错误}, ensure_asciiFalse)# 装饰器调用原函数# function_ptr func(*args, **kwargs)return json.dumps({token: Token 验证失败}, ensure_asciiFalse)return wrapper调用演示
主调用函数则是具体的功能实现可以自定义扩展当用户访问该路由时会优先调用login_check装饰器来验证用户携带Token的合法性如果合法则会通过return func(*args, **kwargs)返回执行主调函数否则直接返回验证失败的消息。
# 获取参数函数
app.route(/GetPage, methods[POST])
login_check
def GetPage():if request.method POST:# 获取参数信息obtain_dict request.form.to_dict()if len(obtain_dict) ! 0 and len(obtain_dict) 1:pagename obtain_dict[pagename]print(查询名称: {}.format(obtain_dict[pagename]))# 相应头的完整写法req Response(responseok, status200, mimetypeapplication/json)req.headers[Content-Type] text/json; charsetutf-8req.headers[Server] LyShark Server 1.0req.data json.dumps({message: hello world})return reqelse:return json.dumps({message: 传入参数错误,请携带正确参数请求}, ensure_asciiFalse)return json.dumps({token: 未知错误}, ensure_asciiFalse)# 用户注册函数
app.route(/register, methods[POST])
def Register():if request.method POST:obtain_dict request.form.to_dict()if len(obtain_dict) ! 0 and len(obtain_dict) 2:print(用户名: {} 密码: {}.format(obtain_dict[username], obtain_dict[password]))reg_username obtain_dict[username]reg_password obtain_dict[password]# 验证是否合法if CheckParameters(reg_username, reg_password) False:return json.dumps({message: 传入用户名密码不合法}, ensure_asciiFalse)# 查询用户是否存在select RunSqlite(database.db,UserAuthDB,select,id,fusername{reg_username})if select ! []:return json.dumps({message: 用户名已被注册}, ensure_asciiFalse)else:insert RunSqlite(database.db,UserAuthDB,insert,username,password,f{reg_username},{reg_password})if insert True:return json.dumps({message: 注册成功}, ensure_asciiFalse)else:return json.dumps({message: 注册失败}, ensure_asciiFalse)else:return json.dumps({message: 传入参数个数不正确}, ensure_asciiFalse)return json.dumps({message: 未知错误}, ensure_asciiFalse)# 密码修改函数
app.route(/modify, methods[POST])
login_check
def modify():if request.method POST:obtain_dict request.form.to_dict()if len(obtain_dict) ! 0 and len(obtain_dict) 1:mdf_password obtain_dict[password]get_token request.headers.get(token)print(获取token: {} 修改后密码: {}.format(get_token,mdf_password))# 验证是否合法if CheckParameters(get_token, mdf_password) False:return json.dumps({message: 传入密码不合法}, ensure_asciiFalse)# 先得到token对应用户名select RunSqlite(database.db,SessionAuthDB,select,username,ftoken{get_token})if select ! []:# 接着直接修改密码即可modify_username str(select[0][0])print(得到的用户名: {}.format(modify_username))update RunSqlite(database.db,UserAuthDB,update,fusername{modify_username},fpassword{mdf_password})if update True:# 删除原来的token,让用户重新获取delete RunSqlite(database.db,SessionAuthDB,delete,fusername{modify_username},none)print(删除token状态: {}.format(delete))return json.dumps({message: 修改成功,请重新登录获取Token}, ensure_asciiFalse)else:return json.dumps({message: 修改失败}, ensure_asciiFalse)else:return json.dumps({message: 不存在该Token,无法修改密码}, ensure_asciiFalse)else:return json.dumps({message: 传入参数个数不正确}, ensure_asciiFalse)return json.dumps({message: 未知错误}, ensure_asciiFalse)FBV模式下的完整代码以下是对代码的概述
主要功能
数据库操作 封装了对 SQLite 数据库的基本增删改查操作RunSqlite 函数。用户认证 提供了用户登录、注册和密码修改的功能。使用了 Token 机制进行登录认证并通过装饰器 login_check 来验证 Token 的有效性。创建数据库表 提供了一个用于初始化数据库表结构的接口 /create。获取页面信息 通过 /GetPage 接口使用了 login_check 装饰器来验证用户登录状态仅对已登录用户提供页面信息。
主要路由
/create创建数据库表结构。/login用户登录接口返回用户的 Token。/GetPage获取页面信息需要用户登录并携带有效 Token。/register用户注册接口。/modify修改用户密码接口需要用户登录并携带有效 Token。
代码结构
数据库操作 提供了对 SQLite 数据库的基本操作包括插入、更新、查询和删除。 用户认证 使用了装饰器 login_check 对需要登录的路由进行认证。提供了用户登录、注册和密码修改的路由。 创建数据库表 提供了一个用于初始化数据库表结构的路由。 获取页面信息 提供了一个用于获取页面信息的路由需要用户登录并携带有效 Token。
from flask import Flask,render_template,request,Response,redirect,jsonify
from functools import wraps
import json,sqlite3,random,string,timeapp Flask(__name__)# 增删改查简单封装
def RunSqlite(db,table,action,field,value):connect sqlite3.connect(db)cursor connect.cursor()# 执行插入动作if action insert:insert finsert into {table}({field}) values({value});if insert None or len(insert) 0:return Falsetry:cursor.execute(insert)except Exception:return False# 执行更新操作elif action update:update fupdate {table} set {value} where {field};if update None or len(update) 0:return Falsetry:cursor.execute(update)except Exception:return False# 执行查询操作elif action select:# 查询条件是否为空if value none:select fselect {field} from {table};else:select fselect {field} from {table} where {value};try:ref cursor.execute(select)ref_data ref.fetchall()connect.commit()connect.close()return ref_dataexcept Exception:return False# 执行删除操作elif action delete:delete fdelete from {table} where {field};if delete None or len(delete) 0:return Falsetry:cursor.execute(delete)except Exception:return Falsetry:connect.commit()connect.close()return Trueexcept Exception:return Falseapp.route(/create,methods[GET])
def create():conn sqlite3.connect(./database.db)cursor conn.cursor()create_auth create table UserAuthDB( \id INTEGER primary key AUTOINCREMENT not null unique, \username varchar(64) not null unique, \password varchar(64) not null \)cursor.execute(create_auth)create_session create table SessionAuthDB( \id INTEGER primary key AUTOINCREMENT not null unique, \username varchar(64) not null unique, \token varchar(128) not null unique, \invalid_date int not null \)cursor.execute(create_session)conn.commit()cursor.close()conn.close()return create success# 验证用户名密码是否合法
def CheckParameters(*kwargs):for item in range(len(kwargs)):# 先验证长度if len(kwargs[item]) 256 or len(kwargs[item]) 0:return False# 先小写,然后去掉两侧空格,去掉所有空格local_string kwargs[item].lower().strip().replace( ,)# 判断是否只包含 大写 小写 数字for kw in local_string:if kw.isupper() ! True and kw.islower() ! True and kw.isdigit() ! True:return Falsereturn True# 登录认证模块
app.route(/login,methods[POST])
def login():if request.method POST:# 获取参数信息obtain_dict request.form.to_dict()if len(obtain_dict) ! 0 and len(obtain_dict) 2:username obtain_dict[username]password obtain_dict[password]# 验证是否合法is_true CheckParameters(username,password)if is_true True:# 查询是否存在该用户select RunSqlite(./database.db, UserAuthDB, select, username,password, fusername{username})if select[0][0] username and select[0][1] password:# 查询Session列表是否存在select_session RunSqlite(./database.db,SessionAuthDB,select,token,fusername{username})if select_session ! []:ref {message: }ref[message] select_session[0][0]return json.dumps(ref, ensure_asciiFalse)# Session不存在则需要重新生成else:# 生成并写入token和过期时间戳token .join(random.sample(string.ascii_letters string.digits, 32))# 设置360秒周期,过期时间time_stamp int(time.time()) 360insert RunSqlite(./database.db, SessionAuthDB, insert, username,token,invalid_date, f{username},{token},{time_stamp})if insert True:ref {message: }ref[message] tokenreturn json.dumps(ref, ensure_asciiFalse)else:return json.dumps({message: 用户名或密码错误}, ensure_asciiFalse)else:return json.dumps({message: 输入参数不可用}, ensure_asciiFalse)return json.dumps({message: 未知错误}, ensure_asciiFalse)# 检查登录状态 token是否过期的装饰器
def login_check(func):wraps(func)def wrapper(*args, **kwargs):print(处理登录逻辑部分: {}.format(request.url))# 得到token 验证是否登陆了,且token没有过期local_timestamp int(time.time())get_token request.headers.get(token)# 验证传入参数是否合法if CheckParameters(get_token) True:select RunSqlite(./database.db,SessionAuthDB,select,token,invalid_date,ftoken{get_token})print(select)# 判断是否存在记录,如果存在,在判断时间戳是否合理if select ! []:# 如果当前时间与数据库比对,大于说明过期了需要删除原来的,让用户重新登录if local_timestamp int(select[0][1]):print(时间戳过期了)# 删除原来的Tokendelete RunSqlite(./database.db,SessionAuthDB,delete,ftoken{get_token},none)if delete True:return json.dumps({token: Token 已过期,请重新登录获取}, ensure_asciiFalse)else:return json.dumps({token: 数据库删除异常,请联系开发者}, ensure_asciiFalse)else:# 验证Token是否一致if select[0][0] get_token:print(Token验证正常,继续执行function_ptr指向代码.)# 返回到原函数return func(*args, **kwargs)else:print(Token验证错误 {}.format(select))return json.dumps({token: Token 传入错误}, ensure_asciiFalse)# 装饰器调用原函数# function_ptr func(*args, **kwargs)return json.dumps({token: Token 验证失败}, ensure_asciiFalse)return wrapper# 获取参数函数
app.route(/GetPage, methods[POST])
login_check
def GetPage():if request.method POST:# 获取参数信息obtain_dict request.form.to_dict()if len(obtain_dict) ! 0 and len(obtain_dict) 1:pagename obtain_dict[pagename]print(查询名称: {}.format(obtain_dict[pagename]))# 相应头的完整写法req Response(responseok, status200, mimetypeapplication/json)req.headers[Content-Type] text/json; charsetutf-8req.headers[Server] LyShark Server 1.0req.data json.dumps({message: hello world})return reqelse:return json.dumps({message: 传入参数错误,请携带正确参数请求}, ensure_asciiFalse)return json.dumps({token: 未知错误}, ensure_asciiFalse)# 用户注册函数
app.route(/register, methods[POST])
def Register():if request.method POST:obtain_dict request.form.to_dict()if len(obtain_dict) ! 0 and len(obtain_dict) 2:print(用户名: {} 密码: {}.format(obtain_dict[username], obtain_dict[password]))reg_username obtain_dict[username]reg_password obtain_dict[password]# 验证是否合法if CheckParameters(reg_username, reg_password) False:return json.dumps({message: 传入用户名密码不合法}, ensure_asciiFalse)# 查询用户是否存在select RunSqlite(database.db,UserAuthDB,select,id,fusername{reg_username})if select ! []:return json.dumps({message: 用户名已被注册}, ensure_asciiFalse)else:insert RunSqlite(database.db,UserAuthDB,insert,username,password,f{reg_username},{reg_password})if insert True:return json.dumps({message: 注册成功}, ensure_asciiFalse)else:return json.dumps({message: 注册失败}, ensure_asciiFalse)else:return json.dumps({message: 传入参数个数不正确}, ensure_asciiFalse)return json.dumps({message: 未知错误}, ensure_asciiFalse)# 密码修改函数
app.route(/modify, methods[POST])
login_check
def modify():if request.method POST:obtain_dict request.form.to_dict()if len(obtain_dict) ! 0 and len(obtain_dict) 1:mdf_password obtain_dict[password]get_token request.headers.get(token)print(获取token: {} 修改后密码: {}.format(get_token,mdf_password))# 验证是否合法if CheckParameters(get_token, mdf_password) False:return json.dumps({message: 传入密码不合法}, ensure_asciiFalse)# 先得到token对应用户名select RunSqlite(./database.db,SessionAuthDB,select,username,ftoken{get_token})if select ! []:# 接着直接修改密码即可modify_username str(select[0][0])print(得到的用户名: {}.format(modify_username))update RunSqlite(database.db,UserAuthDB,update,fusername{modify_username},fpassword{mdf_password})if update True:# 删除原来的token,让用户重新获取delete RunSqlite(./database.db,SessionAuthDB,delete,fusername{modify_username},none)print(删除token状态: {}.format(delete))return json.dumps({message: 修改成功,请重新登录获取Token}, ensure_asciiFalse)else:return json.dumps({message: 修改失败}, ensure_asciiFalse)else:return json.dumps({message: 不存在该Token,无法修改密码}, ensure_asciiFalse)else:return json.dumps({message: 传入参数个数不正确}, ensure_asciiFalse)return json.dumps({message: 未知错误}, ensure_asciiFalse)if __name__ __main__:app.run(debugTrue)首先需要在Web页面访问http://127.0.0.1/create路径实现对数据库的初始化并打开Postman工具通过传入参数来使用这个案例。