# Source code for atomdb.nosql
"""
Copyright (c) 2018-2022, CodeLV.
Distributed under the terms of the MIT License.
The full license is in the file LICENSE.text, distributed with this software.
Created on Jun 12, 2018
"""
import weakref
import bson
from atom.api import Atom, Dict, Instance, Typed, Value
from .base import JSONSerializer, Model, ModelManager, ModelSerializer, find_subclasses
class NoSQLModelSerializer(ModelSerializer):
    """Serializer for Model subclasses stored in a NoSQL database.

    Objects whose state carries an ``_id`` are flattened to a small
    reference dict and are looked up again (cache first) when restored.
    """

    async def get_or_create(self, cls, state, scope):
        """Return an ``(obj, created)`` pair for the given state.

        The cache exposed by ``cls.objects`` is consulted using the
        ``_id`` found in ``state``. On a miss, a bare instance is made with
        ``cls.__new__`` and, if an ``_id`` is present, stored in that cache.
        """
        pk = state.get("_id")
        cache = cls.objects.cache

        # Only states carrying an id can possibly be in the cache
        cached = cache.get(pk) if pk is not None else None
        if cached is not None:
            return (cached, False)

        instance = cls.__new__(cls)
        if pk is not None:
            cache[pk] = instance
        # This ideally should only be done if created
        return (instance, True)

    async def get_object_state(self, obj, state, scope):
        """Look up the stored document matching ``state["_id"]`` through
        the manager of the object's class.
        """
        manager = obj.__class__.objects
        return await manager.find_one({"_id": state["_id"]})

    def flatten_object(self, obj, scope):
        """Flatten ``obj`` into a dict.

        An object already present in ``scope`` becomes a bare reference.
        Otherwise it is recorded in ``scope`` and its state is taken: the
        full state is returned when it has no ``_id``, and a reference
        dict carrying the ``_id`` is returned when it does.
        """
        ref = obj.__ref__
        if ref not in scope:
            scope[ref] = obj
            state = obj.__getstate__(scope)
            pk = state.get("_id")
            if pk is None:
                return state
            return {"_id": pk, "__ref__": ref, "__model__": obj.__model__}
        return {"__ref__": ref, "__model__": obj.__model__}

    def _default_registry(self):
        """Build the registry from a copy of the JSONSerializer registry
        plus every NoSQLModel subclass, keyed by ``__model__``.
        """
        registry = JSONSerializer.instance().registry.copy()
        for model in find_subclasses(NoSQLModel):
            registry[model.__model__] = model
        return registry
class NoSQLDatabaseProxy(Atom):
    """Pairs a collection handle with a cache of model objects.

    Attribute lookups that are not resolved on the proxy itself are
    forwarded to ``table``.
    """

    #: Cache of model objects, held in a WeakValueDictionary
    cache = Typed(weakref.WeakValueDictionary, ())

    #: Database handle
    table = Value()

    def __getattr__(self, name):
        """Forward an otherwise unresolved attribute to the table handle."""
        handle = self.table
        return getattr(handle, name)
class NoSQLModelManager(ModelManager):
    """A descriptor so you can use this somewhat like Django's models,
    assuming you're using motor or txmongo.

    Examples
    --------
    MyModel.objects.find_one({'_id': 'someid'})

    """

    #: Table proxy cache, keyed by model class
    proxies = Dict()

    def __get__(self, obj, cls=None):
        """Return the table proxy for the owning Model class.

        When accessed from something that is not a Model subclass, the
        manager itself is returned. Proxies are created on first access
        from ``self.database[<class>.__model__]`` and reused afterwards.
        """
        owner = cls or obj.__class__
        if not issubclass(owner, Model):
            return self  # Only return the collection when used from a Model

        known = self.proxies
        proxy = known.get(owner)
        if proxy is not None:
            return proxy

        # First access from this class: wrap its table and remember it
        proxy = NoSQLDatabaseProxy(table=self.database[owner.__model__])
        known[owner] = proxy
        return proxy

    def _default_database(self):
        """Raise, since a database must be assigned explicitly."""
        raise EnvironmentError(
            "No database has been set. Use "
            "NoSQLModelManager.instance().database = <db>"
        )
class NoSQLModel(Model):
    """An atom model that can be serialized and deserialized to and from
    MongoDB.
    """

    #: ID of this object in the database
    _id = Instance(bson.ObjectId)  # type: ignore

    #: Handles encoding and decoding
    serializer = NoSQLModelSerializer.instance()

    #: Handles database access
    objects = NoSQLModelManager.instance()

    @classmethod
    async def restore(cls, state, force=False):
        """Restore an object from the database state. If the object is
        cached, use that instead.

        Parameters
        ----------
        state: dict
            The stored state. Its ``_id`` entry, when present and truthy,
            is used as the cache key.
        force: bool
            Re-apply ``state`` even when a cached object was found.

        Returns
        -------
        obj: NoSQLModel
            The cached or newly created instance.
        """
        # A state without an id is treated like one with a null id, which
        # matches how the serializer's get_or_create reads the key.
        pk = state.get("_id")
        if pk:
            # Check if this is in the cache
            cache = cls.objects.cache
            obj = cache.get(pk)
        else:
            obj = None

        # Restore
        if obj is None:
            # Create and cache it
            obj = cls.__new__(cls)
            if pk:
                cache[pk] = obj
            restore = True
        else:
            restore = force

        if restore:
            await obj.__restorestate__(state)
        return obj

    async def load(self):
        """Load this object's state from the database.

        Does nothing when the object is already restored or has no id,
        and leaves the object untouched when no document is found.
        """
        pk = self._id
        if self.__restored__ or pk is None:
            return  # Already loaded or nothing to load
        state = await self.objects.find_one({"_id": pk})
        if state is not None:
            await self.__restorestate__(state)

    async def save(self):
        """Save this object to the database.

        An object without an id is inserted, takes the inserted id and is
        added to the cache. An object with an id replaces the stored
        document, which is created if missing (``upsert=True``).

        Returns
        -------
        result
            Whatever ``insert_one`` or ``replace_one`` returned.
        """
        db = self.objects
        state = self.__getstate__()
        if self._id is None:
            r = await db.insert_one(state)
            self._id = r.inserted_id
            db.cache[self._id] = self
        else:
            r = await db.replace_one({"_id": self._id}, state, upsert=True)
        self.__restored__ = True
        return r

    async def delete(self):
        """Delete this object from the database.

        Returns
        -------
        result
            Whatever ``delete_one`` returned, or None when the object has
            no id and there is nothing to delete.
        """
        db = self.objects
        pk = self._id
        if not pk:
            return None
        r = await db.delete_one({"_id": pk})
        # The instance may never have been cached (e.g. it was built with
        # an explicit id or saved through the upsert branch), so a missing
        # cache entry must not fail after the document is already removed.
        db.cache.pop(pk, None)
        del self._id
        return r