我现在正在使用flusk,我决定使用JWT进行授权。授权本身没有问题 - 输入您的登录名和密码并成功登录。 100%创建了access_token和refresh_token,而我将后者存储在数据库中并在离开配置文件时删除它们。通过开发者工具您可以在Cookie中找到token。
@app.route('/user/create', methods=['GET', 'POST'])
@jwt_required()
def create_user():
form = CreateUserForm()
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
password = generate_password_hash(password)
telegram_user = request.form['user_id']
new_user_app = UserApp(username=username, password=password)
user = User.query.filter_by(user_id=telegram_user).first()
db.session.add(new_user_app)
db.session.commit()
user.user_app_id = new_user_app.id
db.session.add(user)
db.session.commit()
return jsonify({'success': 'Пользователь успешно создан'})
return render_template('create_user.html', form=form, title='Добавить пользователя')
我想进入/user/create带有装饰器的页面,并响应{"msg":"Missing Authorization Header"}
以防万一,授权本身:
@app.route('/login', methods=['GET', 'POST'])
def login():
if current_user.is_authenticated:
return redirect(url_for('index'))
form = LoginForm()
if form.validate_on_submit():
user = db.session.scalar(
sa.select(UserApp).where(UserApp.username == form.username.data))
if user is None or not user.check_password(form.password.data):
flash('Invalid username or password')
return redirect(url_for('login'))
login_user(user)
access_token, refresh_token = create_tokens(user.id)
next_page = request.args.get('next')
response = make_response(redirect(next_page) if next_page else redirect(url_for('index')))
response.set_cookie('access_token', access_token, httponly=True, secure=False)
response.set_cookie('refresh_token', refresh_token, httponly=True, secure=False)
return response
return render_template('login.html', title='Авторизация', form=form)
并创建令牌:
def create_tokens(user_id):
access_token = jwt.encode(
{'user_id': user_id,
'exp': datetime.datetime.now(ZoneInfo('UTC')) + datetime.timedelta(minutes=15)}, app.config['SECRET_KEY'], algorithm='HS256')
refresh_token = jwt.encode(
{'user_id': user_id,
'exp': datetime.datetime.now(ZoneInfo('UTC')) + datetime.timedelta(days=30)}, app.config['SECRET_KEY'], algorithm='HS256')
new_refresh_token = RefreshToken(user_id=user_id, token=refresh_token, expires_at=datetime.datetime.now(ZoneInfo('UTC')) + datetime.timedelta(days=30))
db.session.add(new_refresh_token)
db.session.commit()
return access_token, refresh_token