Prevent update handling task pointers from being garbage collected, backport of #1328 (#1331)

* Preserve update handling task pointers, backport of #1328

* Changelog

* Typing improvements
This commit is contained in:
Sergey Akentev 2023-10-08 18:13:06 +03:00 committed by GitHub
parent f681afb879
commit cad42580dd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 11 additions and 4 deletions

1
CHANGES/1331.misc Normal file
View file

@ -0,0 +1 @@
Prevent update handling task pointers from being garbage collected, backport from 2.x

View file

@ -6,7 +6,7 @@ import signal
import warnings import warnings
from asyncio import CancelledError, Event, Future, Lock from asyncio import CancelledError, Event, Future, Lock
from contextlib import suppress from contextlib import suppress
from typing import Any, AsyncGenerator, Dict, List, Optional, Union from typing import Any, AsyncGenerator, Dict, List, Optional, Set, Union
from .. import loggers from .. import loggers
from ..client.bot import Bot from ..client.bot import Bot
@ -95,6 +95,7 @@ class Dispatcher(Router):
self._running_lock = Lock() self._running_lock = Lock()
self._stop_signal: Optional[Event] = None self._stop_signal: Optional[Event] = None
self._stopped_signal: Optional[Event] = None self._stopped_signal: Optional[Event] = None
self._handle_update_tasks: Set[asyncio.Task[Any]] = set()
def __getitem__(self, item: str) -> Any: def __getitem__(self, item: str) -> Any:
return self.workflow_data[item] return self.workflow_data[item]
@ -349,7 +350,9 @@ class Dispatcher(Router):
): ):
handle_update = self._process_update(bot=bot, update=update, **kwargs) handle_update = self._process_update(bot=bot, update=update, **kwargs)
if handle_as_tasks: if handle_as_tasks:
asyncio.create_task(handle_update) handle_update_task = asyncio.create_task(handle_update)
self._handle_update_tasks.add(handle_update_task)
handle_update_task.add_done_callback(self._handle_update_tasks.discard)
else: else:
await handle_update await handle_update
finally: finally:

View file

@ -2,7 +2,7 @@ import asyncio
import secrets import secrets
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from asyncio import Transport from asyncio import Transport
from typing import Any, Awaitable, Callable, Dict, Optional, Tuple, cast from typing import Any, Awaitable, Callable, Dict, Optional, Set, Tuple, cast
from aiohttp import MultipartWriter, web from aiohttp import MultipartWriter, web
from aiohttp.abc import Application from aiohttp.abc import Application
@ -98,6 +98,7 @@ class BaseRequestHandler(ABC):
self.dispatcher = dispatcher self.dispatcher = dispatcher
self.handle_in_background = handle_in_background self.handle_in_background = handle_in_background
self.data = data self.data = data
self._background_feed_update_tasks: Set[asyncio.Task[Any]] = set()
def register(self, app: Application, /, path: str, **kwargs: Any) -> None: def register(self, app: Application, /, path: str, **kwargs: Any) -> None:
""" """
@ -139,11 +140,13 @@ class BaseRequestHandler(ABC):
await self.dispatcher.silent_call_request(bot=bot, result=result) await self.dispatcher.silent_call_request(bot=bot, result=result)
async def _handle_request_background(self, bot: Bot, request: web.Request) -> web.Response: async def _handle_request_background(self, bot: Bot, request: web.Request) -> web.Response:
asyncio.create_task( feed_update_task = asyncio.create_task(
self._background_feed_update( self._background_feed_update(
bot=bot, update=await request.json(loads=bot.session.json_loads) bot=bot, update=await request.json(loads=bot.session.json_loads)
) )
) )
self._background_feed_update_tasks.add(feed_update_task)
feed_update_task.add_done_callback(self._background_feed_update_tasks.discard)
return web.json_response({}, dumps=bot.session.json_dumps) return web.json_response({}, dumps=bot.session.json_dumps)
def _build_response_writer( def _build_response_writer(