aiogram/tests/test_fsm/storage/test_storages.py
Andrew 4caf56814e
Some checks failed
Tests / tests (macos-latest, 3.10) (push) Has been cancelled
Tests / tests (macos-latest, 3.11) (push) Has been cancelled
Tests / tests (macos-latest, 3.12) (push) Has been cancelled
Tests / tests (macos-latest, 3.13) (push) Has been cancelled
Tests / tests (macos-latest, 3.14) (push) Has been cancelled
Tests / tests (ubuntu-latest, 3.10) (push) Has been cancelled
Tests / tests (ubuntu-latest, 3.11) (push) Has been cancelled
Tests / tests (ubuntu-latest, 3.12) (push) Has been cancelled
Tests / tests (ubuntu-latest, 3.13) (push) Has been cancelled
Tests / tests (ubuntu-latest, 3.14) (push) Has been cancelled
Tests / tests (windows-latest, 3.10) (push) Has been cancelled
Tests / tests (windows-latest, 3.11) (push) Has been cancelled
Tests / tests (windows-latest, 3.12) (push) Has been cancelled
Tests / tests (windows-latest, 3.13) (push) Has been cancelled
Tests / tests (windows-latest, 3.14) (push) Has been cancelled
Tests / pypy-tests (macos-latest, pypy3.10) (push) Has been cancelled
Tests / pypy-tests (macos-latest, pypy3.11) (push) Has been cancelled
Tests / pypy-tests (ubuntu-latest, pypy3.10) (push) Has been cancelled
Tests / pypy-tests (ubuntu-latest, pypy3.11) (push) Has been cancelled
Support of Py3.14 (#1730)
* Py3.14 support
Bump .pre-commit-config.yaml
Bump `mongo` feature deps
Bump `proxy` feature dep
Bump `test` feature deps
Bump `dev` feature deps

Set `aiohttp` max version `<3.14`

Fix `test_isolation.py` tests
Fix `test_storages.py` tests

Add Py version limit `<3.15` (breaking changes possible)
Add new `uvloop` starter to `Dispatcher.run_polling`

Remove old `uvloop` `set_event_loop_policy`
Remove `pytest-lazy-fixture`

* Make `test` and `dev` features deps strong fixed

* Add 1730.feature.rst

* Remove unneeded `None` from `Dispatcher.run_polling`

* Fix `macos-latest-pypy3.11` test `test_aiohtt_server.py`

* Update `tests.yml`

* Update `tests.yml`
2025-10-11 00:16:50 +03:00

82 lines
3.3 KiB
Python

from typing import TypedDict
import pytest
from aiogram.exceptions import DataNotDictLikeError
from aiogram.fsm.storage.base import BaseStorage, StorageKey
@pytest.mark.parametrize(
"storage",
["memory_storage", "redis_storage", "mongo_storage", "pymongo_storage"],
indirect=True,
)
class TestStorages:
async def test_set_state(self, storage: BaseStorage, storage_key: StorageKey):
assert await storage.get_state(key=storage_key) is None
await storage.set_state(key=storage_key, state="state")
assert await storage.get_state(key=storage_key) == "state"
await storage.set_state(key=storage_key, state=None)
assert await storage.get_state(key=storage_key) is None
async def test_set_data(self, storage: BaseStorage, storage_key: StorageKey):
assert await storage.get_data(key=storage_key) == {}
assert await storage.get_value(storage_key=storage_key, dict_key="foo") is None
assert (
await storage.get_value(storage_key=storage_key, dict_key="foo", default="baz")
== "baz"
)
await storage.set_data(key=storage_key, data={"foo": "bar"})
assert await storage.get_data(key=storage_key) == {"foo": "bar"}
assert await storage.get_value(storage_key=storage_key, dict_key="foo") == "bar"
assert (
await storage.get_value(storage_key=storage_key, dict_key="foo", default="baz")
== "bar"
)
await storage.set_data(key=storage_key, data={})
assert await storage.get_data(key=storage_key) == {}
assert await storage.get_value(storage_key=storage_key, dict_key="foo") is None
assert (
await storage.get_value(storage_key=storage_key, dict_key="foo", default="baz")
== "baz"
)
class CustomTypedDict(TypedDict, total=False):
foo: str
bar: str
await storage.set_data(key=storage_key, data=CustomTypedDict(foo="bar", bar="baz"))
assert await storage.get_data(key=storage_key) == {"foo": "bar", "bar": "baz"}
assert await storage.get_value(storage_key=storage_key, dict_key="foo") == "bar"
assert (
await storage.get_value(storage_key=storage_key, dict_key="foo", default="baz")
== "bar"
)
with pytest.raises(DataNotDictLikeError):
await storage.set_data(key=storage_key, data=())
async def test_update_data(self, storage: BaseStorage, storage_key: StorageKey):
assert await storage.get_data(key=storage_key) == {}
assert await storage.update_data(key=storage_key, data={"foo": "bar"}) == {"foo": "bar"}
assert await storage.update_data(key=storage_key, data={}) == {"foo": "bar"}
assert await storage.get_data(key=storage_key) == {"foo": "bar"}
assert await storage.update_data(key=storage_key, data={"baz": "spam"}) == {
"foo": "bar",
"baz": "spam",
}
assert await storage.get_data(key=storage_key) == {
"foo": "bar",
"baz": "spam",
}
assert await storage.update_data(key=storage_key, data={"baz": "test"}) == {
"foo": "bar",
"baz": "test",
}
assert await storage.get_data(key=storage_key) == {
"foo": "bar",
"baz": "test",
}