idc网站模板,网上智慧团建系统,asp.net网站开发实例教程pdf,珠海自适应网站建设【squids.cn】 全网zui低价RDS#xff0c;免费的迁移工具DBMotion、数据库备份工具DBTwin、SQL开发工具等
当使用两个不同的异步会话来测试FastAPI应用程序与数据库的连接时#xff0c;可能会出现以下错误#xff1a; 在测试中#xff0c;在数据库中创建了一个对象#x…【squids.cn】 全网zui低价RDS免费的迁移工具DBMotion、数据库备份工具DBTwin、SQL开发工具等
当使用两个不同的异步会话来测试FastAPI应用程序与数据库的连接时可能会出现以下错误 在测试中在数据库中创建了一个对象测试会话。 在应用程序中发出一个请求在此请求中修改了这个对象应用会话。 在测试中从数据库加载对象但其中没有所需的更改测试会话。
让我们找出发生了什么。
我们通常在应用程序和测试中使用两个不同的会话。
此外在测试中我们通常将会话包装在一个准备数据库进行测试的fixture中测试完成后所有内容都会被清理。
以下是应用程序的示例。
一个带有数据库连接的文件app/database.py Database settings file
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import declarative_base
DATABASE_URL postgresqlasyncpg://user:passwordhost:5432/dbname
engine create_async_engine(DATABASE_URL, echoTrue, futureTrue)
async_session async_sessionmaker(bindengine, class_AsyncSession, expire_on_commitFalse)
async def get_session() - AsyncGenerator: Returns async session async with async_session() as session:yield session
Base declarative_base()
一个带有模型描述的文件app/models.py Model file from sqlalchemy import Integer, Stringfrom sqlalchemy.orm import Mapped, mapped_columnfrom .database import Baseclass Lamp(Base): Lamp model __tablename__ lamps id: Mapped[int] mapped_column(Integer, primary_keyTrue, indexTrue) status: Mapped[str] mapped_column(String, defaultoff)
一个带有端点描述的文件app/main.py Main file import loggingfrom fastapi import FastAPI, Dependsfrom sqlalchemy import selectfrom sqlalchemy.ext.asyncio import AsyncSessionfrom .database import get_sessionfrom .models import Lampapp FastAPI()app.post(/lamps/{lamp_id}/on)async def check_lamp( lamp_id: int, session: AsyncSession Depends(get_session)) - dict: Lamp on endpoint results await session.execute(select(Lamp).where(Lamp.id lamp_id)) lamp results.scalar_one_or_none() if lamp: logging.error(Status before update: %s, lamp.status) lamp.status on session.add(lamp) await session.commit() await session.refresh(lamp) logging.error(Status after update: %s, lamp.status) return {}
我特意在示例中添加了日志记录和一些其他请求以使其更加清晰。
这里使用Depends创建了一个会话。
以下是带有测试示例的文件tests/test_lamp.py Test lamp
import logging
from typing import AsyncGenerator
import pytest
import pytest_asyncio
from httpx import AsyncClient
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from app.database import Base, engine
from app.main import app, Lamp
pytest_asyncio.fixture(scopefunction, nametest_session)
async def test_session_fixture() - AsyncGenerator: Async session fixture async_session async_sessionmaker(engine, class_AsyncSession, expire_on_commitFalse)
async with async_session() as session:async with engine.begin() as conn:await conn.run_sync(Base.metadata.create_all)
yield session
async with engine.begin() as conn:await conn.run_sync(Base.metadata.drop_all)
await engine.dispose()
pytest.mark.asyncio
async def test_lamp_on(test_session): Test lamp switch on lamp Lamp()test_session.add(lamp)await test_session.commit()await test_session.refresh(lamp)
logging.error(New client status: %s, lamp.status)assert lamp.status off
async with AsyncClient(appapp, base_urlhttp://testserver) as async_client:response await async_client.post(f/lamps/{lamp.id}/on)assert response.status_code 200results await test_session.execute(select(Lamp).where(Lamp.id lamp.id))new_lamp results.scalar_one_or_none()logging.error(Updated status: %s, new_lamp.status)
assert new_lamp.status on 这是一个常规的Pytest它在一个fixture中获取到数据库的会话。在返回会话之前此fixture会先创建所有的表格使用之后它们会被删除。
请再次注意在测试中我们使用来自test_session fixture的会话而在主代码中我们使用来自app/database.py文件的会话。尽管我们使用相同的引擎但是生成的会话是不同的。这一点很重要。 预期的数据库请求序列
应从数据库返回status on。
在测试中我首先在数据库中创建一个对象。这是通过来自测试的会话进行的常规INSERT。我们称其为Session 1。此时只有这个会话连接到了数据库。应用程序会话尚未连接。
在创建对象之后我执行了一个刷新操作。这是通过Session 1对新创建的对象进行的SELECT并通过实例更新。
结果我确保对象正确创建并且status字段填充了所需的值 - off。
然后我对/lamps/1/on端点执行一个POST请求。这是打开灯的操作。为了使示例更短我没有使用fixture。一旦请求开始工作将创建一个新的数据库会话。我们称其为Session 2。使用这个会话我从数据库加载所需的对象。我将状态输出到日志。它是off。之后我更新了这个状态并在数据库中保存了更新。数据库收到了一个请求
BEGIN (implicit)UPDATE lamps SET status$1::VARCHAR WHERE lamps.id $2::INTEGERparameters: (on, 1)COMMIT
请注意也存在COMMIT命令。尽管事务是隐式的但其结果在其他会话中在COMMIT之后立即可用。
接下来我使用refresh发出请求从数据库获取更新后的对象。我输出状态。现在它的值是on。
看起来一切都应该正常工作。端点停止工作关闭Session 2并将控制权转移到测试。
在测试中我从Session 1发出常规请求以获取修改过的对象。但在状态字段中我看到的值是off。
以下是代码中操作序列的方案。 代码中的操作序列
代码中的操作序列 同时根据所有日志最后一个SELECT请求被执行并返回了status on。此刻数据库中的其值肯定等于on。这是engine asyncpg响应SELECT请求时接收的值。
那么发生了什么
发生了以下情况。
结果是为获取新对象而做的请求并没有更新当前的对象而是找到并使用了现有的对象。一开始我使用ORM添加了一个灯泡对象。我在另一个会话中更改了它。当更改完成时当前会话对此更改一无所知。并且在Session 2中进行的提交并未在Session 1中请求expire_all方法。
为了修复这个问题你可以做以下操作之一 对于测试和应用程序使用共享会话。 刷新实例而不是尝试从数据库中获取它。 强制使实例过期。 关闭会话。
依赖性覆盖
为了使用相同的会话您可以简单地使用我在测试中创建的那个覆盖应用程序中的会话。这很简单。
为此我们需要在测试中添加以下代码
async def _override_get_db(): yield test_sessionapp.dependency_overrides[get_session] _override_get_db
如果你愿意可以将这部分包装成一个夹具以便在所有测试中使用。
所得到的算法将如下所示 代码中使用依赖性覆盖时的步骤
下面是带有会话替代的测试代码
pytest.mark.asyncioasync def test_lamp_on(test_session): Test lamp switch on async def _override_get_db(): yield test_session app.dependency_overrides[get_session] _override_get_db lamp Lamp() test_session.add(lamp) await test_session.commit() await test_session.refresh(lamp) logging.error(New client status: %s, lamp.status) assert lamp.status off async with AsyncClient(appapp, base_urlhttp://testserver) as async_client: response await async_client.post(f/lamps/{lamp.id}/on) assert response.status_code 200 results await test_session.execute(select(Lamp).where(Lamp.id 1)) new_lamp results.scalar_one_or_none() logging.error(Updated status: %s, new_lamp.status) assert new_lamp.status on
但是如果应用程序使用多个会话这是可能的那么这可能不是最佳方法。此外如果在被测试的函数中没有调用commit或rollback那么这也将无济于事。
刷新
第二种解决方案是最简单和最有逻辑的。我们不应该创建一个新的请求来获取一个对象。为了更新处理端点请求后立即调用刷新就足够了。在内部它调用expires这导致保存的实例不用于新的请求并且数据重新填充。这种解决方案是最有逻辑的也最容易理解。
await test_session.refresh(lamp)
之后你不需要再试图加载new_lamp对象只需检查相同的lamp。
以下是使用刷新的代码方案。 使用刷新时的代码中的步骤
以下是带有更新的测试代码。
pytest.mark.asyncioasync def test_lamp_on(test_session): Test lamp switch on lamp Lamp() test_session.add(lamp) await test_session.commit() await test_session.refresh(lamp) logging.error(New client status: %s, lamp.status) assert lamp.status off async with AsyncClient(appapp, base_urlhttp://testserver) as async_client: response await async_client.post(f/lamps/{lamp.id}/on) assert response.status_code 200 await test_session.refresh(lamp) logging.error(Updated status: %s, lamp.status) assert lamp.status on 过期
但是如果我们更改了很多对象最好调用expire_all。然后所有实例都将从数据库中读取一致性不会被破坏。
test_session.expire_all()
你还可以在特定实例甚至实例属性上调用expire。
test_session.expire(lamp)
在这些调用之后你将不得不手动从数据库中读取对象。
以下是使用过期时代码中的步骤序列。 使用过期时的代码中的步骤
使用过期时的代码中的步骤 以下是带有过期的测试代码。
pytest.mark.asyncioasync def test_lamp_on(test_session): Test lamp switch on lamp Lamp() test_session.add(lamp) await test_session.commit() await test_session.refresh(lamp) logging.error(New client status: %s, lamp.status) assert lamp.status off async with AsyncClient(appapp, base_urlhttp://testserver) as async_client: response await async_client.post(f/lamps/{lamp.id}/on) assert response.status_code 200 test_session.expire_all() # OR: # test_session.expire(lamp) results await test_session.execute(select(Lamp).where(Lamp.id 1)) new_lamp results.scalar_one_or_none() logging.error(Updated status: %s, new_lamp.status) assert new_lamp.status on 关闭
实际上使用会话终止的最后一种方法也调用了expire_all但会话可以进一步使用。当读取新数据时我们将获得最新的对象。
await test_session.close()
这应该在应用程序请求完成之后并在检查开始之前立即调用。
以下是使用关闭时代码中的步骤。 使用关闭时的代码中的步骤
以下是带有会话关闭的测试代码。
pytest.mark.asyncioasync def test_lamp_on(test_session): Test lamp switch on lamp Lamp() test_session.add(lamp) await test_session.commit() await test_session.refresh(lamp) logging.error(New client status: %s, lamp.status) assert lamp.status off async with AsyncClient(appapp, base_urlhttp://testserver) as async_client: response await async_client.post(f/lamps/{lamp.id}/on) assert response.status_code 200 await test_session.close() results await test_session.execute(select(Lamp).where(Lamp.id 1)) new_lamp results.scalar_one_or_none() logging.error(Updated status: %s, new_lamp.status) assert new_lamp.status on
调用 rollback() 也会有帮助。它也调用 expire_all但它明确地回滚了事务。如果需要执行事务commit() 也会执行 expire_all。但在这个例子中既不需要回滚也不需要提交因为测试中的事务已经完成应用程序中的事务不会影响来自测试的会话。
实际上此功能仅在SQLAlchemy ORM的异步模式中在事务中工作。然而在代码中我确实向数据库发出请求以获取新对象的行为看起来似乎不合逻辑如果它仍然返回一个缓存的对象而不是从数据库强制接收的对象。当调试代码时这有点令人困惑。但当正确使用时这就是它应有的样子。
结论
在异步模式下使用SQLAlchemy ORM您必须并行跟踪事务和线程中的会话。如果所有这些看起来太复杂那么使用SQLAlchemy ORM的同步模式。它里面的一切都简单得多。
作者Aleksei Sharypov
更多内容请关注公号【云原生数据库】
squids.cn云数据库RDS迁移工具DBMotion云备份DBTwin等数据库生态工具。