Source code for snow.resource

from urllib.parse import urljoin, urlencode
from typing import Iterable, Type, Union

from snow.exceptions import (
    SnowException,
    NoSchemaFields,
    TooManyItems,
    NoItems,
    SchemaError,
    SelectError
)

from snow.consts import Joined
from snow.request import Reader, Creator, Updater, Deleter
from snow.config import ConfigSchema

from .schema import Schema
from .query import QueryBuilder, Condition, select

from . import fields


[docs]class Resource: """ServiceNow API Resource Model Args: schema_cls (Schema): Schema class app (Application): Application instance Attributes: config (ConfigSchema): Application config url: API URL fields: Schema fields """ def __init__(self, schema_cls: Union[Type[Schema], Schema], app): self.app = app # Read Resource schema self.schema_cls = schema_cls self.fields = self._get_fields() self.primary_key = self._get_primary_key() # Configure self self.config = self.app.config self.url = urljoin(self.config.address, str(schema_cls.__location__)) self._resolve = any([f for f in self.fields.values() if f.joined != Joined.VALUE]) # Create helpers self.reader = Reader(self) self.updater = Updater(self) self.creator = Creator(self) self.deleter = Deleter(self) async def __aenter__(self): self.session = self.app.get_session() return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.session.close() def _get_primary_key(self): primary_keys = [n for n, f in self.fields.items() if f.is_primary is True] if len(primary_keys) > 1: raise SchemaError( f"Multiple primary keys (is_primary) supplied " f"in {self.name}. Maximum allowed is 1." ) elif len(primary_keys) == 0: return None return primary_keys[0] def _get_fields(self): schema_fields = getattr(self.schema_cls, "_declared_fields") if not schema_fields: raise NoSchemaFields(f"Schema {self.schema_cls} lacks fields definitions") return schema_fields @property def name(self): return self.schema_cls.__name__ def get_url(self, fragments=None): if fragments and not isinstance(fragments, list): raise SnowException(f"Expected a list of path fragments, got: {fragments}") params = dict( sysparm_fields=",".join(self.fields), sysparm_display_value="all" if self._resolve else "false" ) url = self.url if fragments: url += "/" + "/".join(fragments) return f"{url}{'?' + urlencode(params) if params else ''}"
[docs] def stream(self, selection=None, **kwargs) -> Iterable: """Stream-like async generator Fetches data in chunks using the ServiceNow pagination system. Chunk size determines the number of records to fetch in one go, and can be tweaked to Keyword Args: selection: Snow compatible query limit (int): Maximum number of records to return offset (int): Starting record index chunk_size (int): Number of records to fetch in one go Yields: list: Chunk of records """ return self.reader.stream( select(selection).sysparms, **kwargs )
[docs] async def get(self, selection=None, **kwargs) -> dict: """Buffered many Fetches data and stores in buffer. Note: It's recommended to use the stream method when dealing with large number of records. Keyword Args: selection: Snow compatible query limit (int): Maximum number of records to return offset (int): Starting record index Returns: list: Records """ return await self.reader.collect( select(selection).sysparms, **kwargs )
[docs] async def get_one(self, selection=None): """Get one record Args: selection: Snow compatible query Returns: dict: Record """ if not self.primary_key: raise SchemaError( f'The selected resource "{self.name}" cannot ' f'be queried: this schema lacks a field with "is_primary" set' ) items = await self.get(selection, limit=2) if len(items) > 1: raise TooManyItems("Too many results: expected one, got at least 2") elif len(items) == 0: raise NoItems("Expected a single object in response, got none") return items[0]
[docs] async def get_pk_value(self, selection): """Given a selection, return the resulting record's PK field's value Args: selection: Snow compatible query Returns: str: PK field's value """ record = await self.get_one(selection) return record[self.primary_key]
[docs] async def get_object_id(self, value): """Get object id by str or Condition Immediately return if value is str. Args: value: Condition or str Returns: str: Object id """ if isinstance(value, str): return value elif isinstance(value, Condition): return await self.get_pk_value(value) else: raise SelectError(f"Selection must be of type {Condition} or {str}")
[docs] async def update(self, selection, payload) -> dict: """Update matching record Args: selection: Condition or ID payload (dict): Update payload Returns: dict: Updated record """ object_id = await self.get_object_id(selection) return await self.updater.patch(object_id, payload)
[docs] async def create(self, payload): """Create a new record Args: payload (dict): New record payload Returns: dict: Created record """ return await self.creator.write(payload)
[docs] async def delete(self, selection): """Delete matching record Args: selection: Condition or ID Returns: dict: {"result": <status>} """ object_id = await self.get_object_id(selection) return await self.deleter.delete(object_id)