Исходный код vk_maria.dispatcher.filters.filters

import re
import typing
from abc import ABC, abstractmethod

from ..fsm import FSMContext
from ...types import Event, MessageEvent, EventType, CallbackQueryEvent


[документация]class AbstractFilter(ABC):
[документация] @abstractmethod def check(self, event: Event): return NotImplemented
[документация]class BoundFilterMeta(type): registered_filters = [] def __new__(mcs, name, bases, namespace): cls = super().__new__(mcs, name, bases, namespace) if cls.__name__ != 'BoundFilter': mcs.registered_filters.append(cls) return cls
[документация]class BoundFilter(metaclass=BoundFilterMeta): key: str
[документация] @abstractmethod def check(self, event: Event): return NotImplemented
[документация]class EventTypeFilter(BoundFilter): key = 'event_type' def __init__(self, event_type: EventType): self.event_type = event_type
[документация] def check(self, event: Event): if event.type == self.event_type: return True return False
[документация]class TextFilter(BoundFilter): key = 'text' def __init__(self, equals: str = None, contains: str = None, startswith: str = None, endswith: str = None, ignore_case=False): """ Можно использовать что-то одно из equals, contains, startswith и endswith """ check_count = sum(map(lambda x: x is not None, (equals, contains, startswith, endswith))) if check_count > 1: args = '` и `'.join([el[0] for el in (('equals', equals), ('contains', contains), ('startswith', startswith), ('endswith', endswith)) if el[1] is not None]) raise ValueError(f"Аргументы `{args}` не могут использоваться вместе!") elif check_count == 0: raise ValueError("Ни один из требуемых аргументов не передан!") self.equals = equals self.contains = contains self.startswith = startswith self.endswith = endswith self.ignore_case = ignore_case def _pre_process_text(self, s): return str(s).lower() if self.ignore_case else str(s)
[документация] def check(self, event: MessageEvent): text = self._pre_process_text(event.message.text) if self.equals: return text == self._pre_process_text(self.equals) elif self.contains: return self._pre_process_text(self.contains) in text elif self.startswith: return text.startswith(self._pre_process_text(self.startswith)) elif self.endswith: return text.endswith(self._pre_process_text(self.endswith))
[документация]class FunctionFilter(BoundFilter): key = 'f' def __init__(self, f): self.f = f
[документация] def check(self, event: MessageEvent): return self.f(event)
[документация]class CommandsFilter(BoundFilter): key = 'commands' def __init__(self, commands: typing.List[str], prefixes: str = '', ignore_case: bool = False): self.commands = tuple(map(str.lower, commands)) if ignore_case else commands self.prefixes = prefixes self.ignore_case = ignore_case
[документация] def check(self, event: MessageEvent): message_text = event.message.text.lower() if self.ignore_case else event.message.text if not message_text: return False full_command = message_text.split()[0] prefix, command = ((full_command[0], full_command[1:]) if self.prefixes else (None, full_command)) if prefix and prefix not in self.prefixes: return False if command in self.commands: return True return False
[документация]class TypeFromFilter(BoundFilter): key = 'frm' def __init__(self, frm: str): self.frm = frm
[документация] def check(self, event: MessageEvent): if getattr(event, f'from_{self.frm}'): return True return False
[документация]class RegexpFilter(BoundFilter): key = 'regexp' def __init__(self, regexp: str): self.regexp = regexp
[документация] def check(self, event: MessageEvent): if re.search(self.regexp, event.message.text, re.IGNORECASE): return True return False
[документация]class FSMStateFilter(BoundFilter): key = 'state' def __init__(self, state: str): self.state = state
[документация] def check(self, event: MessageEvent): if self.state == FSMContext.get_current().get_state() or self.state == '*': return True return False
[документация]class PayloadFilter(BoundFilter): key = 'payload' def __init__(self, payload): self.payload = payload
[документация] def check(self, event: CallbackQueryEvent): return self.payload == event.payload
[документация]class Filters: def __init__(self, *filters: AbstractFilter): self.filters: typing.Tuple[AbstractFilter] = filters
[документация] def check_all(self, event: Event) -> bool: if all(filter_.check(event) for filter_ in self.filters): return True return False
[документация]class FiltersFactory:
[документация] class UnknownFilterException(Exception): pass
[документация] @classmethod def get_filters(cls, *custom_filters, **bound_filters): filters = (*(cls.get_filter_by_key(key, filter_value) for key, filter_value in bound_filters.items()), *(custom_filter if isinstance(custom_filter, (AbstractFilter, BoundFilter)) else custom_filter() for custom_filter in custom_filters)) return filters
[документация] @classmethod def get_filter_by_key(cls, key: str, filter_value): for FILTER_CLASS in BoundFilter.registered_filters: if FILTER_CLASS.key == key: return FILTER_CLASS(filter_value) raise cls.UnknownFilterException(f'Нет подходящего фильтра для `{key}`')