当前位置: 首页 > news >正文

idc网站模板网上智慧团建系统

idc网站模板,网上智慧团建系统,asp.net网站开发实例教程pdf,珠海自适应网站建设【squids.cn】 全网zui低价RDS#xff0c;免费的迁移工具DBMotion、数据库备份工具DBTwin、SQL开发工具等 当使用两个不同的异步会话来测试FastAPI应用程序与数据库的连接时#xff0c;可能会出现以下错误#xff1a; 在测试中#xff0c;在数据库中创建了一个对象#x…【squids.cn】 全网zui低价RDS免费的迁移工具DBMotion、数据库备份工具DBTwin、SQL开发工具等 当使用两个不同的异步会话来测试FastAPI应用程序与数据库的连接时可能会出现以下错误 在测试中在数据库中创建了一个对象测试会话。  在应用程序中发出一个请求在此请求中修改了这个对象应用会话。  在测试中从数据库加载对象但其中没有所需的更改测试会话。  让我们找出发生了什么。 我们通常在应用程序和测试中使用两个不同的会话。 此外在测试中我们通常将会话包装在一个准备数据库进行测试的fixture中测试完成后所有内容都会被清理。 以下是应用程序的示例。 一个带有数据库连接的文件app/database.py Database settings file from typing import AsyncGenerator ​ from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker from sqlalchemy.orm import declarative_base ​ DATABASE_URL postgresqlasyncpg://user:passwordhost:5432/dbname engine create_async_engine(DATABASE_URL, echoTrue, futureTrue) async_session async_sessionmaker(bindengine, class_AsyncSession, expire_on_commitFalse) ​ ​ async def get_session() - AsyncGenerator: Returns async session async with async_session() as session:yield session ​ Base declarative_base() 一个带有模型描述的文件app/models.py  Model file from sqlalchemy import Integer, Stringfrom sqlalchemy.orm import Mapped, mapped_column​from .database import Base​​class Lamp(Base): Lamp model __tablename__ lamps​ id: Mapped[int] mapped_column(Integer, primary_keyTrue, indexTrue)   status: Mapped[str] mapped_column(String, defaultoff) 一个带有端点描述的文件app/main.py  Main file import logging​from fastapi import FastAPI, Dependsfrom sqlalchemy import selectfrom sqlalchemy.ext.asyncio import AsyncSession​from .database import get_sessionfrom .models import Lamp​app FastAPI()​​app.post(/lamps/{lamp_id}/on)async def check_lamp( lamp_id: int, session: AsyncSession Depends(get_session)) - dict: Lamp on endpoint results await session.execute(select(Lamp).where(Lamp.id lamp_id)) lamp results.scalar_one_or_none() if lamp: logging.error(Status before update: %s, lamp.status)​ lamp.status on session.add(lamp) await session.commit() await session.refresh(lamp)​ logging.error(Status after update: %s, lamp.status)   return {} 我特意在示例中添加了日志记录和一些其他请求以使其更加清晰。 这里使用Depends创建了一个会话。 以下是带有测试示例的文件tests/test_lamp.py Test lamp import logging from typing import AsyncGenerator ​ import pytest import pytest_asyncio from httpx import AsyncClient from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker ​ from app.database import Base, engine from app.main import app, Lamp ​ ​ pytest_asyncio.fixture(scopefunction, nametest_session) async def test_session_fixture() - AsyncGenerator: Async session fixture async_session async_sessionmaker(engine, class_AsyncSession, expire_on_commitFalse) ​async with async_session() as session:async with engine.begin() as conn:await conn.run_sync(Base.metadata.create_all) ​yield session ​async with engine.begin() as conn:await conn.run_sync(Base.metadata.drop_all) ​await engine.dispose() ​ ​ pytest.mark.asyncio async def test_lamp_on(test_session): Test lamp switch on lamp Lamp()test_session.add(lamp)await test_session.commit()await test_session.refresh(lamp) ​logging.error(New client status: %s, lamp.status)assert lamp.status off ​async with AsyncClient(appapp, base_urlhttp://testserver) as async_client:response await async_client.post(f/lamps/{lamp.id}/on)assert response.status_code 200results await test_session.execute(select(Lamp).where(Lamp.id lamp.id))new_lamp results.scalar_one_or_none()logging.error(Updated status: %s, new_lamp.status) ​assert new_lamp.status on 这是一个常规的Pytest它在一个fixture中获取到数据库的会话。在返回会话之前此fixture会先创建所有的表格使用之后它们会被删除。 请再次注意在测试中我们使用来自test_session fixture的会话而在主代码中我们使用来自app/database.py文件的会话。尽管我们使用相同的引擎但是生成的会话是不同的。这一点很重要。 预期的数据库请求序列 应从数据库返回status on。 在测试中我首先在数据库中创建一个对象。这是通过来自测试的会话进行的常规INSERT。我们称其为Session 1。此时只有这个会话连接到了数据库。应用程序会话尚未连接。 在创建对象之后我执行了一个刷新操作。这是通过Session 1对新创建的对象进行的SELECT并通过实例更新。 结果我确保对象正确创建并且status字段填充了所需的值 - off。 然后我对/lamps/1/on端点执行一个POST请求。这是打开灯的操作。为了使示例更短我没有使用fixture。一旦请求开始工作将创建一个新的数据库会话。我们称其为Session 2。使用这个会话我从数据库加载所需的对象。我将状态输出到日志。它是off。之后我更新了这个状态并在数据库中保存了更新。数据库收到了一个请求 BEGIN (implicit)UPDATE lamps SET status$1::VARCHAR WHERE lamps.id $2::INTEGERparameters: (on, 1)COMMIT 请注意也存在COMMIT命令。尽管事务是隐式的但其结果在其他会话中在COMMIT之后立即可用。 接下来我使用refresh发出请求从数据库获取更新后的对象。我输出状态。现在它的值是on。 看起来一切都应该正常工作。端点停止工作关闭Session 2并将控制权转移到测试。 在测试中我从Session 1发出常规请求以获取修改过的对象。但在状态字段中我看到的值是off。 以下是代码中操作序列的方案。 代码中的操作序列 代码中的操作序列 同时根据所有日志最后一个SELECT请求被执行并返回了status on。此刻数据库中的其值肯定等于on。这是engine asyncpg响应SELECT请求时接收的值。 那么发生了什么 发生了以下情况。 结果是为获取新对象而做的请求并没有更新当前的对象而是找到并使用了现有的对象。一开始我使用ORM添加了一个灯泡对象。我在另一个会话中更改了它。当更改完成时当前会话对此更改一无所知。并且在Session 2中进行的提交并未在Session 1中请求expire_all方法。 为了修复这个问题你可以做以下操作之一 对于测试和应用程序使用共享会话。 刷新实例而不是尝试从数据库中获取它。 强制使实例过期。 关闭会话。 依赖性覆盖 为了使用相同的会话您可以简单地使用我在测试中创建的那个覆盖应用程序中的会话。这很简单。 为此我们需要在测试中添加以下代码 async def _override_get_db(): yield test_sessionapp.dependency_overrides[get_session] _override_get_db 如果你愿意可以将这部分包装成一个夹具以便在所有测试中使用。 所得到的算法将如下所示 代码中使用依赖性覆盖时的步骤 下面是带有会话替代的测试代码 pytest.mark.asyncioasync def test_lamp_on(test_session): Test lamp switch on async def _override_get_db(): yield test_session app.dependency_overrides[get_session] _override_get_db lamp Lamp() test_session.add(lamp) await test_session.commit() await test_session.refresh(lamp)​ logging.error(New client status: %s, lamp.status) assert lamp.status off​ async with AsyncClient(appapp, base_urlhttp://testserver) as async_client: response await async_client.post(f/lamps/{lamp.id}/on) assert response.status_code 200​ results await test_session.execute(select(Lamp).where(Lamp.id 1)) new_lamp results.scalar_one_or_none() logging.error(Updated status: %s, new_lamp.status)​   assert new_lamp.status on 但是如果应用程序使用多个会话这是可能的那么这可能不是最佳方法。此外如果在被测试的函数中没有调用commit或rollback那么这也将无济于事。 刷新  第二种解决方案是最简单和最有逻辑的。我们不应该创建一个新的请求来获取一个对象。为了更新处理端点请求后立即调用刷新就足够了。在内部它调用expires这导致保存的实例不用于新的请求并且数据重新填充。这种解决方案是最有逻辑的也最容易理解。 await test_session.refresh(lamp) 之后你不需要再试图加载new_lamp对象只需检查相同的lamp。 以下是使用刷新的代码方案。 使用刷新时的代码中的步骤  以下是带有更新的测试代码。 pytest.mark.asyncioasync def test_lamp_on(test_session): Test lamp switch on lamp Lamp() test_session.add(lamp) await test_session.commit() await test_session.refresh(lamp)​ logging.error(New client status: %s, lamp.status) assert lamp.status off​ async with AsyncClient(appapp, base_urlhttp://testserver) as async_client: response await async_client.post(f/lamps/{lamp.id}/on) assert response.status_code 200​ await test_session.refresh(lamp) logging.error(Updated status: %s, lamp.status)​   assert lamp.status on 过期  但是如果我们更改了很多对象最好调用expire_all。然后所有实例都将从数据库中读取一致性不会被破坏。 test_session.expire_all() 你还可以在特定实例甚至实例属性上调用expire。 test_session.expire(lamp) 在这些调用之后你将不得不手动从数据库中读取对象。 以下是使用过期时代码中的步骤序列。 使用过期时的代码中的步骤 使用过期时的代码中的步骤 以下是带有过期的测试代码。 pytest.mark.asyncioasync def test_lamp_on(test_session): Test lamp switch on lamp Lamp() test_session.add(lamp) await test_session.commit() await test_session.refresh(lamp)​ logging.error(New client status: %s, lamp.status) assert lamp.status off​ async with AsyncClient(appapp, base_urlhttp://testserver) as async_client: response await async_client.post(f/lamps/{lamp.id}/on) assert response.status_code 200​ test_session.expire_all() # OR: # test_session.expire(lamp)​ results await test_session.execute(select(Lamp).where(Lamp.id 1)) new_lamp results.scalar_one_or_none() logging.error(Updated status: %s, new_lamp.status)​   assert new_lamp.status on 关闭  实际上使用会话终止的最后一种方法也调用了expire_all但会话可以进一步使用。当读取新数据时我们将获得最新的对象。 await test_session.close() 这应该在应用程序请求完成之后并在检查开始之前立即调用。 以下是使用关闭时代码中的步骤。 使用关闭时的代码中的步骤 以下是带有会话关闭的测试代码。 pytest.mark.asyncioasync def test_lamp_on(test_session): Test lamp switch on lamp Lamp() test_session.add(lamp) await test_session.commit() await test_session.refresh(lamp)​ logging.error(New client status: %s, lamp.status) assert lamp.status off​ async with AsyncClient(appapp, base_urlhttp://testserver) as async_client: response await async_client.post(f/lamps/{lamp.id}/on) assert response.status_code 200​ await test_session.close()​ results await test_session.execute(select(Lamp).where(Lamp.id 1)) new_lamp results.scalar_one_or_none() logging.error(Updated status: %s, new_lamp.status)​   assert new_lamp.status on 调用 rollback() 也会有帮助。它也调用 expire_all但它明确地回滚了事务。如果需要执行事务commit() 也会执行 expire_all。但在这个例子中既不需要回滚也不需要提交因为测试中的事务已经完成应用程序中的事务不会影响来自测试的会话。 实际上此功能仅在SQLAlchemy ORM的异步模式中在事务中工作。然而在代码中我确实向数据库发出请求以获取新对象的行为看起来似乎不合逻辑如果它仍然返回一个缓存的对象而不是从数据库强制接收的对象。当调试代码时这有点令人困惑。但当正确使用时这就是它应有的样子。 结论  在异步模式下使用SQLAlchemy ORM您必须并行跟踪事务和线程中的会话。如果所有这些看起来太复杂那么使用SQLAlchemy ORM的同步模式。它里面的一切都简单得多。 作者Aleksei Sharypov 更多内容请关注公号【云原生数据库】 squids.cn云数据库RDS迁移工具DBMotion云备份DBTwin等数据库生态工具。
http://www.huolong8.cn/news/28455/

相关文章:

  • asp手机网站统计代码百度引流平台
  • 我的网站百度找不到了衡水住房和城乡建设局网站
  • 网站上的洗衣液瓶子做花瓶怎么材质wordpress ssl配置
  • 网站建设过程的结构图做儿童业态招商要去哪些网站
  • 软件介绍网站模板河源做网站
  • 网站建设开发人员须知wordpress 获取指定文章标题
  • 做设计不进设计公司网站购物平台有哪些比较火
  • 网站建设前景国内做网站比较好的公司有哪些
  • 有什么网站可以做家教产品全网营销推广
  • 南京网站建设公司排名网站首页界面设计
  • 网站页面改版龙岗做网站公司哪家好
  • 个人网站建设怎么样网站footer模板
  • 网站建设建网站年轻的母亲通用网站模板
  • 营销型网站建设题库做那个网站比较好
  • 大型旅行社自建网站网站推广网络营销
  • 免费网站建设排名十大免费下载软件
  • 生鲜市场型网站开发如何免费建设公司网站
  • 怎么制作弹幕网站wordpress 工作流
  • 滑县网站建设哪家专业北京哪里做网站好
  • 腾讯网站建设公司wordpress主题接口
  • 换服务器后网站首页不收录仿门户网站多功能js相册画廊源码
  • 广西网站开发软件天堂 在线地址8
  • 做外墙资料的网站网站建设公司权威机构
  • 设计非常漂亮的网站win10优化大师是官方的吗
  • 中小型网站建设效果电商网站开发的背景及意义
  • 长沙大型做网站公司企业推广文案范文
  • 网站建设终稿确认书wordpress好用插件
  • 电子商务网站业务流程分析wordpress文章 页面
  • 打开网站notfound国内电商企业有哪些
  • 网站建设指南视频教程河北省建设机械协会是正规网站吗