mirror of
https://github.com/aiogram/aiogram.git
synced 2026-04-08 16:37:47 +00:00
💩 First iteration
This commit is contained in:
parent
0bd7fc2c7e
commit
fac69e52b7
25 changed files with 427 additions and 273 deletions
|
|
@ -1,9 +1,17 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Any, Optional, TypeVar
|
||||
from typing import (
|
||||
Any,
|
||||
AsyncIterator,
|
||||
Optional,
|
||||
TypeVar,
|
||||
)
|
||||
|
||||
from ...utils.mixins import ContextInstanceMixin, DataMixin
|
||||
from ...utils.mixins import (
|
||||
ContextInstance,
|
||||
ContextInstanceMixin,
|
||||
)
|
||||
from ...utils.token import extract_bot_id, validate_token
|
||||
from ..methods import TelegramMethod
|
||||
from .session.aiohttp import AiohttpSession
|
||||
|
|
@ -12,13 +20,13 @@ from .session.base import BaseSession
|
|||
T = TypeVar("T")
|
||||
|
||||
|
||||
class BaseBot(ContextInstanceMixin, DataMixin):
|
||||
class BaseBot(ContextInstanceMixin[ContextInstance]):
|
||||
"""
|
||||
Base class for bots
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, token: str, session: BaseSession = None, parse_mode: Optional[str] = None
|
||||
self, token: str, session: Optional[BaseSession] = None, parse_mode: Optional[str] = None
|
||||
) -> None:
|
||||
validate_token(token)
|
||||
|
||||
|
|
@ -54,14 +62,15 @@ class BaseBot(ContextInstanceMixin, DataMixin):
|
|||
await self.session.close()
|
||||
|
||||
@asynccontextmanager
|
||||
async def context(self, auto_close: bool = True):
|
||||
async def context(self, auto_close: bool = True) -> AsyncIterator["BaseBot[ContextInstance]"]:
|
||||
"""
|
||||
Generate bot context
|
||||
|
||||
:param auto_close:
|
||||
:return:
|
||||
"""
|
||||
token = self.set_current(self)
|
||||
# TODO: because set_current expects Bot, not BaseBot — this check fails
|
||||
token = self.set_current(self) # type: ignore
|
||||
try:
|
||||
yield self
|
||||
finally:
|
||||
|
|
|
|||
|
|
@ -101,12 +101,12 @@ from ..types import (
|
|||
from .base import BaseBot
|
||||
|
||||
|
||||
class Bot(BaseBot):
|
||||
class Bot(BaseBot["Bot"]):
|
||||
"""
|
||||
Class where located all API methods
|
||||
"""
|
||||
|
||||
@alru_cache()
|
||||
@alru_cache() # type: ignore
|
||||
async def me(self) -> User:
|
||||
return await self.get_me()
|
||||
|
||||
|
|
|
|||
|
|
@ -15,8 +15,8 @@ class AiohttpSession(BaseSession):
|
|||
def __init__(
|
||||
self,
|
||||
api: TelegramAPIServer = PRODUCTION,
|
||||
json_loads: Optional[Callable] = None,
|
||||
json_dumps: Optional[Callable] = None,
|
||||
json_loads: Optional[Callable[..., str]] = None,
|
||||
json_dumps: Optional[Callable[..., str]] = None,
|
||||
):
|
||||
super(AiohttpSession, self).__init__(api=api, json_loads=json_loads, json_dumps=json_dumps)
|
||||
self._session: Optional[ClientSession] = None
|
||||
|
|
@ -27,11 +27,11 @@ class AiohttpSession(BaseSession):
|
|||
|
||||
return self._session
|
||||
|
||||
async def close(self):
|
||||
async def close(self) -> None:
|
||||
if self._session is not None and not self._session.closed:
|
||||
await self._session.close()
|
||||
|
||||
def build_form_data(self, request: Request):
|
||||
def build_form_data(self, request: Request) -> FormData:
|
||||
form = FormData(quote_fields=False)
|
||||
for key, value in request.data.items():
|
||||
if value is None:
|
||||
|
|
|
|||
|
|
@ -3,7 +3,16 @@ from __future__ import annotations
|
|||
import abc
|
||||
import datetime
|
||||
import json
|
||||
from typing import Any, AsyncGenerator, Callable, Optional, TypeVar, Union
|
||||
from types import TracebackType
|
||||
from typing import (
|
||||
Any,
|
||||
AsyncGenerator,
|
||||
Callable,
|
||||
Optional,
|
||||
Type,
|
||||
TypeVar,
|
||||
Union,
|
||||
)
|
||||
|
||||
from aiogram.utils.exceptions import TelegramAPIError
|
||||
|
||||
|
|
@ -17,8 +26,8 @@ class BaseSession(abc.ABC):
|
|||
def __init__(
|
||||
self,
|
||||
api: Optional[TelegramAPIServer] = None,
|
||||
json_loads: Optional[Callable[[Any], Any]] = None,
|
||||
json_dumps: Optional[Callable[[Any], Any]] = None,
|
||||
json_loads: Optional[Callable[..., str]] = None,
|
||||
json_dumps: Optional[Callable[..., str]] = None,
|
||||
) -> None:
|
||||
if api is None:
|
||||
api = PRODUCTION
|
||||
|
|
@ -37,7 +46,7 @@ class BaseSession(abc.ABC):
|
|||
raise TelegramAPIError(response.description)
|
||||
|
||||
@abc.abstractmethod
|
||||
async def close(self): # pragma: no cover
|
||||
async def close(self) -> None: # pragma: no cover
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
|
|
@ -73,5 +82,10 @@ class BaseSession(abc.ABC):
|
|||
async def __aenter__(self) -> BaseSession:
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
async def __aexit__(
|
||||
self,
|
||||
exc_type: Optional[Type[BaseException]],
|
||||
exc_value: Optional[BaseException],
|
||||
traceback: Optional[TracebackType],
|
||||
) -> None:
|
||||
await self.close()
|
||||
|
|
|
|||
|
|
@ -2,7 +2,16 @@ from __future__ import annotations
|
|||
|
||||
import abc
|
||||
import secrets
|
||||
from typing import TYPE_CHECKING, Any, Dict, Generic, Optional, Type, TypeVar, Union
|
||||
from typing import (
|
||||
Generator,
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Dict,
|
||||
Generic,
|
||||
Optional,
|
||||
TypeVar,
|
||||
Union,
|
||||
)
|
||||
|
||||
from pydantic import BaseConfig, BaseModel, Extra
|
||||
from pydantic.generics import GenericModel
|
||||
|
|
@ -24,7 +33,7 @@ class Request(BaseModel):
|
|||
class Config(BaseConfig):
|
||||
arbitrary_types_allowed = True
|
||||
|
||||
def render_webhook_request(self):
|
||||
def render_webhook_request(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"method": self.method,
|
||||
**{key: value for key, value in self.data.items() if value is not None},
|
||||
|
|
@ -48,7 +57,7 @@ class TelegramMethod(abc.ABC, BaseModel, Generic[T]):
|
|||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def __returning__(self) -> Type: # pragma: no cover
|
||||
def __returning__(self) -> type: # pragma: no cover
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
|
|
@ -62,14 +71,14 @@ class TelegramMethod(abc.ABC, BaseModel, Generic[T]):
|
|||
async def emit(self, bot: Bot) -> T:
|
||||
return await bot(self)
|
||||
|
||||
def __await__(self):
|
||||
def __await__(self) -> Generator[Any, None, T]:
|
||||
from aiogram.api.client.bot import Bot
|
||||
|
||||
bot = Bot.get_current(no_error=False)
|
||||
return self.emit(bot).__await__()
|
||||
|
||||
|
||||
def prepare_file(name: str, value: Any, data: Dict[str, Any], files: Dict[str, Any]):
|
||||
def prepare_file(name: str, value: Any, data: Dict[str, Any], files: Dict[str, Any]) -> None:
|
||||
if not value:
|
||||
return
|
||||
if name == "thumb":
|
||||
|
|
@ -101,7 +110,7 @@ def prepare_media_file(data: Dict[str, Any], files: Dict[str, InputFile]) -> Non
|
|||
and isinstance(data["media"]["media"], InputFile)
|
||||
):
|
||||
tag = secrets.token_urlsafe(10)
|
||||
files[tag] = data["media"].pop("media") # type: ignore
|
||||
files[tag] = data["media"].pop("media")
|
||||
data["media"]["media"] = f"attach://{tag}"
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ from pydantic import BaseModel, Extra
|
|||
from aiogram.utils.mixins import ContextInstanceMixin
|
||||
|
||||
|
||||
class TelegramObject(ContextInstanceMixin, BaseModel):
|
||||
class TelegramObject(ContextInstanceMixin["TelegramObject"], BaseModel):
|
||||
class Config:
|
||||
use_enum_values = True
|
||||
orm_mode = True
|
||||
|
|
|
|||
|
|
@ -4,7 +4,13 @@ import io
|
|||
import os
|
||||
from abc import ABC, abstractmethod
|
||||
from pathlib import Path
|
||||
from typing import AsyncGenerator, Optional, Union
|
||||
from typing import (
|
||||
AsyncGenerator,
|
||||
AsyncIterator,
|
||||
Iterator,
|
||||
Optional,
|
||||
Union,
|
||||
)
|
||||
|
||||
import aiofiles as aiofiles
|
||||
|
||||
|
|
@ -24,14 +30,14 @@ class InputFile(ABC):
|
|||
self.chunk_size = chunk_size
|
||||
|
||||
@classmethod
|
||||
def __get_validators__(cls):
|
||||
yield
|
||||
def __get_validators__(cls) -> Iterator[None]:
|
||||
yield None
|
||||
|
||||
@abstractmethod
|
||||
async def read(self, chunk_size: int) -> AsyncGenerator[bytes, None]: # pragma: no cover
|
||||
yield b""
|
||||
|
||||
async def __aiter__(self):
|
||||
async def __aiter__(self) -> AsyncIterator[bytes]:
|
||||
async for chunk in self.read(self.chunk_size):
|
||||
yield chunk
|
||||
|
||||
|
|
|
|||
|
|
@ -181,7 +181,7 @@ class Message(TelegramObject):
|
|||
buttons."""
|
||||
|
||||
@property
|
||||
def content_type(self):
|
||||
def content_type(self) -> str:
|
||||
if self.text:
|
||||
return ContentType.TEXT
|
||||
if self.audio:
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ class User(TelegramObject):
|
|||
"""True, if the bot supports inline queries. Returned only in getMe."""
|
||||
|
||||
@property
|
||||
def full_name(self):
|
||||
def full_name(self) -> str:
|
||||
if self.last_name:
|
||||
return f"{self.first_name} {self.last_name}"
|
||||
return self.first_name
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import contextvars
|
||||
import warnings
|
||||
|
|
@ -96,7 +98,7 @@ class Dispatcher(Router):
|
|||
update_id = update.update_id + 1
|
||||
|
||||
@classmethod
|
||||
async def _silent_call_request(cls, bot: Bot, result: TelegramMethod) -> None:
|
||||
async def _silent_call_request(cls, bot: Bot, result: TelegramMethod[Any]) -> None:
|
||||
"""
|
||||
Simulate answer into WebHook
|
||||
|
||||
|
|
@ -172,7 +174,7 @@ class Dispatcher(Router):
|
|||
raise
|
||||
|
||||
async def feed_webhook_update(
|
||||
self, bot: Bot, update: Union[Update, Dict[str, Any]], _timeout: int = 55, **kwargs
|
||||
self, bot: Bot, update: Union[Update, Dict[str, Any]], _timeout: int = 55, **kwargs: Any
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
if not isinstance(update, Update): # Allow to use raw updates
|
||||
update = Update(**update)
|
||||
|
|
@ -181,18 +183,18 @@ class Dispatcher(Router):
|
|||
loop = asyncio.get_running_loop()
|
||||
waiter = loop.create_future()
|
||||
|
||||
def release_waiter(*args: Any):
|
||||
def release_waiter(*args: Any) -> None:
|
||||
if not waiter.done():
|
||||
waiter.set_result(None)
|
||||
|
||||
timeout_handle = loop.call_later(_timeout, release_waiter)
|
||||
|
||||
process_updates: Future = asyncio.ensure_future(
|
||||
process_updates: Future[Any] = asyncio.ensure_future(
|
||||
self._feed_webhook_update(bot=bot, update=update, **kwargs)
|
||||
)
|
||||
process_updates.add_done_callback(release_waiter, context=ctx)
|
||||
|
||||
def process_response(task: Future):
|
||||
def process_response(task: Future[Any]) -> None:
|
||||
warnings.warn(
|
||||
f"Detected slow response into webhook.\n"
|
||||
f"Telegram is waiting for response only first 60 seconds and then re-send update.\n"
|
||||
|
|
|
|||
|
|
@ -1,7 +1,16 @@
|
|||
import inspect
|
||||
from dataclasses import dataclass, field
|
||||
from functools import partial
|
||||
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple, Union
|
||||
from typing import (
|
||||
Any,
|
||||
Awaitable,
|
||||
Callable,
|
||||
Dict,
|
||||
List,
|
||||
Optional,
|
||||
Tuple,
|
||||
Union,
|
||||
)
|
||||
|
||||
from aiogram.dispatcher.filters.base import BaseFilter
|
||||
from aiogram.dispatcher.handler.base import BaseHandler
|
||||
|
|
@ -19,21 +28,22 @@ class CallableMixin:
|
|||
awaitable: bool = field(init=False)
|
||||
spec: inspect.FullArgSpec = field(init=False)
|
||||
|
||||
def __post_init__(self):
|
||||
def __post_init__(self) -> None:
|
||||
callback = self.callback
|
||||
self.awaitable = inspect.isawaitable(callback) or inspect.iscoroutinefunction(callback)
|
||||
while hasattr(callback, "__wrapped__"): # Try to resolve decorated callbacks
|
||||
callback = callback.__wrapped__
|
||||
callback = callback.__wrapped__ # type: ignore
|
||||
self.spec = inspect.getfullargspec(callback)
|
||||
|
||||
def _prepare_kwargs(self, kwargs):
|
||||
def _prepare_kwargs(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
|
||||
if self.spec.varkw:
|
||||
return kwargs
|
||||
|
||||
return {k: v for k, v in kwargs.items() if k in self.spec.args}
|
||||
|
||||
async def call(self, *args, **kwargs):
|
||||
wrapped = partial(self.callback, *args, **self._prepare_kwargs(kwargs))
|
||||
async def call(self, *args: Any, **kwargs: Any) -> Any:
|
||||
# TODO: what we should do if callback is BaseHandler?
|
||||
wrapped = partial(self.callback, *args, **self._prepare_kwargs(kwargs)) # type: ignore
|
||||
if self.awaitable:
|
||||
return await wrapped()
|
||||
return wrapped()
|
||||
|
|
@ -49,10 +59,10 @@ class HandlerObject(CallableMixin):
|
|||
callback: HandlerType
|
||||
filters: Optional[List[FilterObject]] = None
|
||||
|
||||
def __post_init__(self):
|
||||
def __post_init__(self) -> None:
|
||||
super(HandlerObject, self).__post_init__()
|
||||
|
||||
if inspect.isclass(self.callback) and issubclass(self.callback, BaseHandler):
|
||||
# TODO: by types callback must be Callable or BaseHandler, not Type[BaseHandler]
|
||||
if inspect.isclass(self.callback) and issubclass(self.callback, BaseHandler): # type: ignore
|
||||
self.awaitable = True
|
||||
|
||||
async def check(self, *args: Any, **kwargs: Any) -> Tuple[bool, Dict[str, Any]]:
|
||||
|
|
|
|||
|
|
@ -1,4 +1,8 @@
|
|||
from typing import Dict, Tuple, Union
|
||||
from typing import (
|
||||
Dict,
|
||||
Tuple,
|
||||
Type,
|
||||
)
|
||||
|
||||
from .base import BaseFilter
|
||||
from .command import Command, CommandObject
|
||||
|
|
@ -14,7 +18,7 @@ __all__ = (
|
|||
"ContentTypesFilter",
|
||||
)
|
||||
|
||||
BUILTIN_FILTERS: Dict[str, Union[Tuple[BaseFilter], Tuple]] = {
|
||||
BUILTIN_FILTERS: Dict[str, Tuple[Type[BaseFilter], ...]] = {
|
||||
"update": (),
|
||||
"message": (Text, Command, ContentTypesFilter),
|
||||
"edited_message": (Text, Command, ContentTypesFilter),
|
||||
|
|
|
|||
|
|
@ -1,22 +1,26 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from typing import TYPE_CHECKING, Any, Dict, Union
|
||||
from typing import (
|
||||
Awaitable,
|
||||
Callable,
|
||||
Any,
|
||||
Dict,
|
||||
Union,
|
||||
)
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
async def _call_for_override(*args: Any, **kwargs: Any) -> Union[bool, Dict[str, Any]]:
|
||||
pass
|
||||
|
||||
|
||||
class BaseFilter(ABC, BaseModel):
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
# This checking type-hint is needed because mypy checks validity of overrides and raises:
|
||||
# error: Signature of "__call__" incompatible with supertype "BaseFilter" [override]
|
||||
# https://mypy.readthedocs.io/en/latest/error_code_list.html#check-validity-of-overrides-override
|
||||
# This little hack with typehint is needed because mypy checks validity of overrides and raises:
|
||||
# error: Signature of "__call__" incompatible with supertype "BaseFilter" [override]
|
||||
# https://mypy.readthedocs.io/en/latest/error_code_list.html#check-validity-of-overrides-override
|
||||
__call__: Callable[..., Awaitable[Union[bool, Dict[str, Any]]]] = _call_for_override
|
||||
abstractmethod(__call__)
|
||||
|
||||
pass
|
||||
else: # pragma: no cover
|
||||
|
||||
@abstractmethod
|
||||
async def __call__(self, *args: Any, **kwargs: Any) -> Union[bool, Dict[str, Any]]:
|
||||
pass
|
||||
|
||||
def __await__(self): # pragma: no cover
|
||||
def __await__(self): # type: ignore # pragma: no cover
|
||||
# Is needed only for inspection and this method is never be called
|
||||
return self.__call__
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ from aiogram import Bot
|
|||
from aiogram.api.types import Message
|
||||
from aiogram.dispatcher.filters import BaseFilter
|
||||
|
||||
CommandPatterType = Union[str, re.Pattern] # type: ignore
|
||||
CommandPatterType = Union[str, re.Pattern]
|
||||
|
||||
|
||||
class Command(BaseFilter):
|
||||
|
|
|
|||
|
|
@ -80,7 +80,7 @@ class Text(BaseFilter):
|
|||
# Impossible because the validator prevents this situation
|
||||
return False # pragma: no cover
|
||||
|
||||
def prepare_text(self, text: str):
|
||||
def prepare_text(self, text: str) -> str:
|
||||
if self.text_ignore_case:
|
||||
return str(text).lower()
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -1,5 +1,13 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from typing import TYPE_CHECKING, Any, Dict, Generic, TypeVar
|
||||
from typing import (
|
||||
Optional,
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Dict,
|
||||
Generic,
|
||||
TypeVar,
|
||||
cast,
|
||||
)
|
||||
|
||||
from aiogram import Bot
|
||||
from aiogram.api.types import Update
|
||||
|
|
@ -23,18 +31,20 @@ class BaseHandler(BaseHandlerMixin[T], ABC):
|
|||
self.data: Dict[str, Any] = kwargs
|
||||
|
||||
@property
|
||||
def bot(self) -> Bot:
|
||||
def bot(self) -> Optional[Bot]:
|
||||
if "bot" in self.data:
|
||||
return self.data["bot"]
|
||||
# TODO: remove cast
|
||||
return cast(Bot, self.data["bot"])
|
||||
return Bot.get_current()
|
||||
|
||||
@property
|
||||
def update(self) -> Update:
|
||||
return self.data["update"]
|
||||
# TODO: remove cast
|
||||
return cast(Update, self.data["update"])
|
||||
|
||||
@abstractmethod
|
||||
async def handle(self) -> Any: # pragma: no cover
|
||||
pass
|
||||
|
||||
def __await__(self):
|
||||
def __await__(self) -> Any:
|
||||
return self.handle().__await__()
|
||||
|
|
|
|||
|
|
@ -1,5 +1,8 @@
|
|||
from abc import ABC
|
||||
from typing import Optional
|
||||
from typing import (
|
||||
Optional,
|
||||
cast,
|
||||
)
|
||||
|
||||
from aiogram.api.types import Chat, Message, User
|
||||
from aiogram.dispatcher.filters import CommandObject
|
||||
|
|
@ -24,5 +27,6 @@ class MessageHandlerCommandMixin(BaseHandlerMixin[Message]):
|
|||
@property
|
||||
def command(self) -> Optional[CommandObject]:
|
||||
if "command" in self.data:
|
||||
return self.data["command"]
|
||||
# TODO: remove cast
|
||||
return cast(CommandObject, self.data["command"])
|
||||
return None
|
||||
|
|
|
|||
|
|
@ -3,7 +3,12 @@ from __future__ import annotations
|
|||
import warnings
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
from ..api.types import Chat, Update, User
|
||||
from ..api.types import (
|
||||
Chat,
|
||||
TelegramObject,
|
||||
Update,
|
||||
User,
|
||||
)
|
||||
from ..utils.imports import import_module
|
||||
from ..utils.warnings import CodeHasNoEffect
|
||||
from .event.observer import EventObserver, SkipHandler, TelegramEventObserver
|
||||
|
|
@ -151,6 +156,7 @@ class Router:
|
|||
chat: Optional[Chat] = None
|
||||
from_user: Optional[User] = None
|
||||
|
||||
event: TelegramObject
|
||||
if update.message:
|
||||
update_type = "message"
|
||||
from_user = update.message.from_user
|
||||
|
|
@ -211,7 +217,7 @@ class Router:
|
|||
|
||||
raise SkipHandler
|
||||
|
||||
async def emit_startup(self, *args, **kwargs) -> None:
|
||||
async def emit_startup(self, *args: Any, **kwargs: Any) -> None:
|
||||
"""
|
||||
Recursively call startup callbacks
|
||||
|
||||
|
|
@ -225,7 +231,7 @@ class Router:
|
|||
for router in self.sub_routers:
|
||||
await router.emit_startup(*args, **kwargs)
|
||||
|
||||
async def emit_shutdown(self, *args, **kwargs) -> None:
|
||||
async def emit_shutdown(self, *args: Any, **kwargs: Any) -> None:
|
||||
"""
|
||||
Recursively call shutdown callbacks to graceful shutdown
|
||||
|
||||
|
|
|
|||
|
|
@ -13,7 +13,15 @@ Example:
|
|||
>>> print(MyHelper.all())
|
||||
<<< ['barItem', 'bazItem', 'fooItem', 'lorem']
|
||||
"""
|
||||
from typing import List
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
Iterable,
|
||||
List,
|
||||
Optional,
|
||||
Union,
|
||||
cast,
|
||||
)
|
||||
|
||||
PROPS_KEYS_ATTR_NAME = "_props_keys"
|
||||
|
||||
|
|
@ -22,12 +30,12 @@ class Helper:
|
|||
mode = ""
|
||||
|
||||
@classmethod
|
||||
def all(cls):
|
||||
def all(cls) -> List[Any]:
|
||||
"""
|
||||
Get all consts
|
||||
:return: list
|
||||
"""
|
||||
result = []
|
||||
result: List[Any] = []
|
||||
for name in dir(cls):
|
||||
if not name.isupper():
|
||||
continue
|
||||
|
|
@ -49,7 +57,7 @@ class HelperMode(Helper):
|
|||
lowercase = "lowercase"
|
||||
|
||||
@classmethod
|
||||
def all(cls):
|
||||
def all(cls) -> List[str]:
|
||||
return [
|
||||
cls.SCREAMING_SNAKE_CASE,
|
||||
cls.lowerCamelCase,
|
||||
|
|
@ -59,7 +67,7 @@ class HelperMode(Helper):
|
|||
]
|
||||
|
||||
@classmethod
|
||||
def _screaming_snake_case(cls, text):
|
||||
def _screaming_snake_case(cls, text: str) -> str:
|
||||
"""
|
||||
Transform text to SCREAMING_SNAKE_CASE
|
||||
|
||||
|
|
@ -77,7 +85,7 @@ class HelperMode(Helper):
|
|||
return result
|
||||
|
||||
@classmethod
|
||||
def _snake_case(cls, text):
|
||||
def _snake_case(cls, text: str) -> str:
|
||||
"""
|
||||
Transform text to snake case (Based on SCREAMING_SNAKE_CASE)
|
||||
|
||||
|
|
@ -89,7 +97,7 @@ class HelperMode(Helper):
|
|||
return cls._screaming_snake_case(text).lower()
|
||||
|
||||
@classmethod
|
||||
def _camel_case(cls, text, first_upper=False):
|
||||
def _camel_case(cls, text: str, first_upper: bool = False) -> str:
|
||||
"""
|
||||
Transform text to camelCase or CamelCase
|
||||
|
||||
|
|
@ -113,7 +121,7 @@ class HelperMode(Helper):
|
|||
return result
|
||||
|
||||
@classmethod
|
||||
def apply(cls, text, mode):
|
||||
def apply(cls, text: str, mode: Union[str, Callable[[str], str]]) -> str:
|
||||
"""
|
||||
Apply mode for text
|
||||
|
||||
|
|
@ -136,7 +144,20 @@ class HelperMode(Helper):
|
|||
return text
|
||||
|
||||
|
||||
class Item:
|
||||
class _BaseItem:
|
||||
def __init__(self, value: Optional[str] = None):
|
||||
self._value = cast(str, value)
|
||||
|
||||
def __set_name__(self, owner: Any, name: str) -> None:
|
||||
if not name.isupper():
|
||||
raise NameError("Name for helper item must be in uppercase!")
|
||||
if not self._value:
|
||||
# TODO: а если не имеет?
|
||||
if hasattr(owner, "mode"):
|
||||
self._value = HelperMode.apply(name, getattr(owner, "mode"))
|
||||
|
||||
|
||||
class Item(_BaseItem):
|
||||
"""
|
||||
Helper item
|
||||
|
||||
|
|
@ -144,34 +165,24 @@ class Item:
|
|||
it will be automatically generated based on a variable's name
|
||||
"""
|
||||
|
||||
def __init__(self, value=None):
|
||||
self._value = value
|
||||
|
||||
def __get__(self, instance, owner):
|
||||
def __get__(self, instance: Any, owner: Any) -> str:
|
||||
return self._value
|
||||
|
||||
def __set_name__(self, owner, name):
|
||||
if not name.isupper():
|
||||
raise NameError("Name for helper item must be in uppercase!")
|
||||
if not self._value:
|
||||
if hasattr(owner, "mode"):
|
||||
self._value = HelperMode.apply(name, getattr(owner, "mode"))
|
||||
|
||||
|
||||
class ListItem(Item):
|
||||
class ListItem(_BaseItem):
|
||||
"""
|
||||
This item is always a list
|
||||
|
||||
You can use &, | and + operators for that.
|
||||
"""
|
||||
|
||||
def add(self, other): # pragma: no cover
|
||||
def add(self, other: "ListItem") -> "ListItem": # pragma: no cover
|
||||
return self + other
|
||||
|
||||
def __get__(self, instance, owner):
|
||||
def __get__(self, instance: Any, owner: Any) -> "ItemsList":
|
||||
return ItemsList(self._value)
|
||||
|
||||
def __getitem__(self, item): # pragma: no cover
|
||||
def __getitem__(self, item: Any) -> Any: # pragma: no cover
|
||||
# Only for IDE. This method is never be called.
|
||||
return self._value
|
||||
|
||||
|
|
@ -179,17 +190,17 @@ class ListItem(Item):
|
|||
__iadd__ = __add__ = __rand__ = __and__ = __ror__ = __or__ = add
|
||||
|
||||
|
||||
class ItemsList(list):
|
||||
class ItemsList(List[str]):
|
||||
"""
|
||||
Patch for default list
|
||||
|
||||
This class provides +, &, |, +=, &=, |= operators for extending the list
|
||||
"""
|
||||
|
||||
def __init__(self, *seq):
|
||||
def __init__(self, *seq: Any):
|
||||
super(ItemsList, self).__init__(map(str, seq))
|
||||
|
||||
def add(self, other):
|
||||
def add(self, other: Iterable[str]) -> "ItemsList":
|
||||
self.extend(other)
|
||||
return self
|
||||
|
||||
|
|
@ -197,7 +208,7 @@ class ItemsList(list):
|
|||
|
||||
|
||||
class OrderedHelperMeta(type):
|
||||
def __new__(mcs, name, bases, namespace, **kwargs):
|
||||
def __new__(mcs, name: Any, bases: Any, namespace: Any, **kwargs: Any) -> "OrderedHelperMeta":
|
||||
cls = super().__new__(mcs, name, bases, namespace)
|
||||
|
||||
props_keys = []
|
||||
|
|
@ -209,7 +220,8 @@ class OrderedHelperMeta(type):
|
|||
|
||||
setattr(cls, PROPS_KEYS_ATTR_NAME, props_keys)
|
||||
|
||||
return cls
|
||||
# ref: https://gitter.im/python/typing?at=5da98cc5fa637359fc9cbfe1
|
||||
return cast(OrderedHelperMeta, cls)
|
||||
|
||||
|
||||
class OrderedHelper(metaclass=OrderedHelperMeta):
|
||||
|
|
|
|||
|
|
@ -1,51 +1,64 @@
|
|||
from __future__ import annotations
|
||||
import contextvars
|
||||
from typing import Type, TypeVar
|
||||
from typing import (
|
||||
Any,
|
||||
ClassVar,
|
||||
Generic,
|
||||
Optional,
|
||||
TypeVar,
|
||||
cast,
|
||||
overload,
|
||||
)
|
||||
|
||||
__all__ = ("DataMixin", "ContextInstanceMixin")
|
||||
__all__ = ("ContextInstanceMixin",)
|
||||
|
||||
from typing_extensions import Literal
|
||||
|
||||
|
||||
class DataMixin:
|
||||
@property
|
||||
def data(self):
|
||||
data = getattr(self, "_data", None)
|
||||
if data is None:
|
||||
data = {}
|
||||
setattr(self, "_data", data)
|
||||
return data
|
||||
|
||||
def __getitem__(self, item):
|
||||
return self.data[item]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self.data[key] = value
|
||||
|
||||
def __delitem__(self, key):
|
||||
del self.data[key]
|
||||
|
||||
def __contains__(self, item):
|
||||
return item in self.data
|
||||
|
||||
def get(self, key, default=None):
|
||||
return self.data.get(key, default)
|
||||
ContextInstance = TypeVar("ContextInstance")
|
||||
|
||||
|
||||
T = TypeVar("T")
|
||||
class ContextInstanceMixin(Generic[ContextInstance]):
|
||||
__context_instance: ClassVar[contextvars.ContextVar[ContextInstance]]
|
||||
|
||||
|
||||
class ContextInstanceMixin:
|
||||
def __init_subclass__(cls, **kwargs):
|
||||
super().__init_subclass__(**kwargs)
|
||||
def __init_subclass__(cls, **kwargs: Any) -> None:
|
||||
super().__init_subclass__()
|
||||
cls.__context_instance = contextvars.ContextVar(f"instance_{cls.__name__}")
|
||||
return cls
|
||||
|
||||
@overload
|
||||
@classmethod
|
||||
def get_current(cls) -> Optional[ContextInstance]:
|
||||
...
|
||||
|
||||
@overload # noqa: F811, it's overload, not redefinition
|
||||
@classmethod
|
||||
def get_current(cls, no_error: Literal[True]) -> Optional[ContextInstance]:
|
||||
...
|
||||
|
||||
@overload # noqa: F811, it's overload, not redefinition
|
||||
@classmethod
|
||||
def get_current(cls, no_error: Literal[False]) -> ContextInstance:
|
||||
...
|
||||
|
||||
@classmethod # noqa: F811, it's overload, not redefinition
|
||||
def get_current(cls, no_error: bool = True) -> Optional[ContextInstance]:
|
||||
# on mypy 0.770 I catch that contextvars.ContextVar always contextvars.ContextVar[Any]
|
||||
cls.__context_instance = cast(
|
||||
contextvars.ContextVar[ContextInstance], cls.__context_instance
|
||||
)
|
||||
|
||||
try:
|
||||
current: Optional[ContextInstance] = cls.__context_instance.get()
|
||||
except LookupError:
|
||||
if no_error:
|
||||
current = None
|
||||
else:
|
||||
raise
|
||||
|
||||
return current
|
||||
|
||||
@classmethod
|
||||
def get_current(cls: Type[T], no_error=True) -> T:
|
||||
if no_error:
|
||||
return cls.__context_instance.get(None)
|
||||
return cls.__context_instance.get()
|
||||
|
||||
@classmethod
|
||||
def set_current(cls: Type[T], value: T) -> contextvars.Token:
|
||||
def set_current(cls, value: ContextInstance) -> contextvars.Token[ContextInstance]:
|
||||
if not isinstance(value, cls):
|
||||
raise TypeError(
|
||||
f"Value should be instance of {cls.__name__!r} not {type(value).__name__!r}"
|
||||
|
|
@ -53,5 +66,5 @@ class ContextInstanceMixin:
|
|||
return cls.__context_instance.set(value)
|
||||
|
||||
@classmethod
|
||||
def reset_current(cls: Type[T], token: contextvars.Token):
|
||||
def reset_current(cls, token: contextvars.Token[ContextInstance]) -> None:
|
||||
cls.__context_instance.reset(token)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue