Source code for mongo_thingy.cursor

import functools


class _Proxy:
    def __init__(self, name):
        self.name = name

    def __call__(self, cursor):
        return getattr(cursor.delegate, self.name)


class _ChainingProxy(_Proxy):
    def __call__(self, cursor):
        method = getattr(cursor.delegate, self.name)

        @functools.wraps(method)
        def wrapper(*args, **kwargs):
            method(*args, **kwargs)
            return cursor

        return wrapper


class _BindingProxy(_Proxy):
    def __call__(self, cursor):
        method = getattr(cursor.delegate, self.name)

        @functools.wraps(method)
        def wrapper(*args, **kwargs):
            result = method(*args, **kwargs)
            return cursor.bind(result)

        return wrapper


class _AsyncBindingProxy(_Proxy):
    def __call__(self, cursor):
        method = getattr(cursor.delegate, self.name)

        @functools.wraps(method)
        async def wrapper(*args, **kwargs):
            result = await method(*args, **kwargs)
            if isinstance(result, list):
                return cursor.result_cls(cursor.bind(r) for r in result)
            return cursor.bind(result)

        return wrapper


class BaseCursor:
    distinct = _Proxy("distinct")
    explain = _Proxy("explain")
    limit = _ChainingProxy("limit")
    skip = _ChainingProxy("skip")
    sort = _ChainingProxy("sort")

    def __init__(self, delegate, thingy_cls=None, view=None):
        self.delegate = delegate
        self.thingy_cls = thingy_cls
        self.result_cls = getattr(thingy_cls, "_result_cls", list)

        if isinstance(view, str):
            view = self.get_view(view)

        self.thingy_view = view

    def __getattribute__(self, name):
        attribute = object.__getattribute__(self, name)
        if isinstance(attribute, _Proxy):
            return attribute(self)
        return attribute

    def bind(self, document):
        if not self.thingy_cls:
            return document
        thingy = self.thingy_cls(document)
        if self.thingy_view is not None:
            return self.thingy_view(thingy)
        return thingy

    def clone(self):
        delegate = self.delegate.clone()
        return self.__class__(
            delegate, thingy_cls=self.thingy_cls, view=self.thingy_view
        )

    def get_view(self, name):
        return self.thingy_cls._views[name]

    def view(self, name="defaults"):
        self.thingy_view = self.get_view(name)
        return self


[docs]class Cursor(BaseCursor): next = __next__ = _BindingProxy("__next__") def __getitem__(self, index): document = self.delegate.__getitem__(index) return self.bind(document)
[docs] def to_list(self, length): if length is not None: self.limit(length) return self.result_cls(self)
[docs] def delete(self): ids = self.distinct("_id") return self.thingy_cls.collection.delete_many({"_id": {"$in": ids}})
[docs] def first(self): try: document = self.delegate.clone().limit(-1).__next__() except StopIteration: return None return self.bind(document)
[docs]class AsyncCursor(BaseCursor): to_list = _AsyncBindingProxy("to_list") next = __anext__ = _AsyncBindingProxy("__anext__") async def __aiter__(self): async for document in self.delegate: yield self.bind(document)
[docs] async def delete(self): ids = await self.distinct("_id") return await self.thingy_cls.collection.delete_many({"_id": {"$in": ids}})
[docs] async def first(self): try: document = await self.delegate.clone().limit(-1).__anext__() except StopAsyncIteration: return None return self.bind(document)
__all__ = ["AsyncCursor", "Cursor"]