import re
import typing
from abc import ABC, abstractmethod
from ..fsm import FSMContext
from ...types import Event, MessageEvent, EventType, CallbackQueryEvent
[документация]class BoundFilter(metaclass=BoundFilterMeta):
key: str
[документация] @abstractmethod
def check(self, event: Event):
return NotImplemented
[документация]class EventTypeFilter(BoundFilter):
key = 'event_type'
def __init__(self, event_type: EventType):
self.event_type = event_type
[документация] def check(self, event: Event):
if event.type == self.event_type:
return True
return False
[документация]class TextFilter(BoundFilter):
key = 'text'
def __init__(self,
equals: str = None,
contains: str = None,
startswith: str = None,
endswith: str = None,
ignore_case=False):
"""
Можно использовать что-то одно из equals, contains, startswith и endswith
"""
check_count = sum(map(lambda x: x is not None, (equals, contains, startswith, endswith)))
if check_count > 1:
args = '` и `'.join([el[0]
for el in (('equals', equals), ('contains', contains),
('startswith', startswith), ('endswith', endswith))
if el[1] is not None])
raise ValueError(f"Аргументы `{args}` не могут использоваться вместе!")
elif check_count == 0:
raise ValueError("Ни один из требуемых аргументов не передан!")
self.equals = equals
self.contains = contains
self.startswith = startswith
self.endswith = endswith
self.ignore_case = ignore_case
def _pre_process_text(self, s):
return str(s).lower() if self.ignore_case else str(s)
[документация] def check(self, event: MessageEvent):
text = self._pre_process_text(event.message.text)
if self.equals:
return text == self._pre_process_text(self.equals)
elif self.contains:
return self._pre_process_text(self.contains) in text
elif self.startswith:
return text.startswith(self._pre_process_text(self.startswith))
elif self.endswith:
return text.endswith(self._pre_process_text(self.endswith))
[документация]class FunctionFilter(BoundFilter):
key = 'f'
def __init__(self, f):
self.f = f
[документация] def check(self, event: MessageEvent):
return self.f(event)
[документация]class CommandsFilter(BoundFilter):
key = 'commands'
def __init__(self,
commands: typing.List[str],
prefixes: str = '',
ignore_case: bool = False):
self.commands = tuple(map(str.lower, commands)) if ignore_case else commands
self.prefixes = prefixes
self.ignore_case = ignore_case
[документация] def check(self, event: MessageEvent):
message_text = event.message.text.lower() if self.ignore_case else event.message.text
if not message_text:
return False
full_command = message_text.split()[0]
prefix, command = ((full_command[0], full_command[1:])
if self.prefixes else (None, full_command))
if prefix and prefix not in self.prefixes:
return False
if command in self.commands:
return True
return False
[документация]class TypeFromFilter(BoundFilter):
key = 'frm'
def __init__(self, frm: str):
self.frm = frm
[документация] def check(self, event: MessageEvent):
if getattr(event, f'from_{self.frm}'):
return True
return False
[документация]class RegexpFilter(BoundFilter):
key = 'regexp'
def __init__(self, regexp: str):
self.regexp = regexp
[документация] def check(self, event: MessageEvent):
if re.search(self.regexp, event.message.text, re.IGNORECASE):
return True
return False
[документация]class FSMStateFilter(BoundFilter):
key = 'state'
def __init__(self, state: str):
self.state = state
[документация] def check(self, event: MessageEvent):
if self.state == FSMContext.get_current().get_state() or self.state == '*':
return True
return False
[документация]class PayloadFilter(BoundFilter):
key = 'payload'
def __init__(self, payload):
self.payload = payload
[документация] def check(self, event: CallbackQueryEvent):
return self.payload == event.payload
[документация]class Filters:
def __init__(self, *filters: AbstractFilter):
self.filters: typing.Tuple[AbstractFilter] = filters
[документация] def check_all(self, event: Event) -> bool:
if all(filter_.check(event) for filter_ in self.filters):
return True
return False
[документация]class FiltersFactory:
[документация] @classmethod
def get_filters(cls, *custom_filters, **bound_filters):
filters = (*(cls.get_filter_by_key(key, filter_value)
for key, filter_value in bound_filters.items()),
*(custom_filter if isinstance(custom_filter, (AbstractFilter, BoundFilter)) else custom_filter()
for custom_filter in custom_filters))
return filters
[документация] @classmethod
def get_filter_by_key(cls, key: str, filter_value):
for FILTER_CLASS in BoundFilter.registered_filters:
if FILTER_CLASS.key == key:
return FILTER_CLASS(filter_value)
raise cls.UnknownFilterException(f'Нет подходящего фильтра для `{key}`')