import json
import logging
import deform
import colander
import peppercorn
import sqlalchemy
import transaction
from pyramid.view import view_config, view_defaults
from sacrud.common import pk_list_to_dict
from pyramid.compat import escape, text_type
from pyramid_sacrud import PYRAMID_SACRUD_VIEW
from pyramid.renderers import render_to_response
from sqlalchemy.orm.exc import NoResultFound
from paginate_sqlalchemy import SqlalchemyOrmPage
from pyramid.httpexceptions import HTTPFound, HTTPNotFound
from pyramid_sacrud.exceptions import SacrudException
from pyramid_sacrud.localization import _ps
from .paginator import get_paginator
from .resources import (
ListResource,
CreateResource,
DeleteResource,
MassActionResource
)
def preprocessing_value(key, value, form):
    """Normalise one deserialized form value before it is stored.

    Every column of every group in ``form.children`` whose ``name`` equals
    *key* is inspected:

    * for a ``colander.Number`` column, a value whose text form cannot be
      parsed by ``float`` is replaced with ``sqlalchemy.sql.null()``;
    * for any other column, ``colander.null`` is replaced with ``""``.

    :param key: field name to look up among the form's columns.
    :param value: deserialized value submitted for that field.
    :param form: object whose ``children`` iterates over groups of columns;
        each column exposes ``name`` and ``typ``.
    :returns: the (possibly substituted) value.
    """
    # NOTE(review): the value is returned after all groups were scanned, so
    # a key that matches no column passes through unchanged -- confirm this
    # is the intended placement of the final return.
    every_column = (col for group in form.children for col in group)
    for col in every_column:
        if col.name != key:
            continue
        if not isinstance(col.typ, colander.Number):
            if value is colander.null:
                value = ""
            continue
        try:
            # Only probing whether the text parses; the result is discarded.
            float(text_type(value))
        except ValueError:
            value = sqlalchemy.sql.null()
    return value
@view_defaults(permission=PYRAMID_SACRUD_VIEW)
class CRUD(object):
    """Base class for the views: keeps context/request and shared helpers."""

    def __init__(self, context, request):
        """Remember the traversal *context* and the current *request*."""
        self.request = request
        self.context = context

    def commit(self):
        """Commit through the context's session.

        If the session raises ``AssertionError``, ``transaction.commit()``
        is used instead (presumably the session is then owned by a
        transaction manager -- TODO confirm).
        """
        try:
            session = self.context.dbsession
            session.commit()
        except AssertionError:
            transaction.commit()

    def abort(self):
        """Roll back through the context's session.

        If the session raises ``AssertionError``, ``transaction.abort()``
        is used instead.
        """
        try:
            session = self.context.dbsession
            session.rollback()
        except AssertionError:
            transaction.abort()

    def list_view_response(self):
        """Return an ``HTTPFound`` redirect to the list page of the context.

        The location is ``/<sacrud_prefix>/`` followed by the resource path
        of the list resource obtained from the context.
        """
        base = '/' + self.request.sacrud_prefix + '/'
        list_resource = self.context.get_list_resource(self.context)
        return HTTPFound(
            location=base + self.request.resource_path(list_resource)
        )

    def flash_message(self, message, status="success"):
        """Queue ``[message, status]`` in the session flash storage.

        Does nothing when the request has no ``session`` attribute.
        """
        if not hasattr(self.request, 'session'):
            return
        self.request.session.flash([message, status])
class Read(CRUD):
    """View that renders the paginated listing of a resource."""

    @view_config(
        context=ListResource,
        request_method='GET',
        route_name=PYRAMID_SACRUD_VIEW,
    )
    def list_view(self):
        """Render the context's template with a ``paginator`` variable.

        :raises HTTPNotFound: when ``get_paginator`` raises ``ValueError``.
        """
        context, request = self.context, self.request
        query = context.sacrud.read()
        try:
            page_options = get_paginator(request, context.items_per_page)
        except ValueError:
            raise HTTPNotFound
        page = SqlalchemyOrmPage(query, **page_options)
        return render_to_response(
            context.renderer, {'paginator': page}, request=request
        )
class Create(CRUD):
    """View that shows the create/edit form and processes its submission."""

    @view_config(
        context=CreateResource,
        request_method='GET',
        route_name=PYRAMID_SACRUD_VIEW
    )
    @view_config(
        context=CreateResource,
        request_method='POST',
        route_name=PYRAMID_SACRUD_VIEW
    )
    def edit_form_post_view(self):
        """Render the form, or validate and persist it when submitted.

        Without ``form.submitted`` in the request parameters the rendered
        form is returned. Otherwise the POST data is validated; on a
        validation failure the form is re-rendered with its errors. Valid
        data either updates ``self.context.obj`` (when it is truthy) or
        creates a new object, after which the session is flushed and
        committed, a flash message is queued and the client is redirected
        to the list view.

        A ``SacrudException`` is reported as a flash message and the form is
        shown again; any other exception triggers ``self.abort()``, is
        logged and re-raised.
        """
        form = self.context.form(self.request)
        template_vars = {'form': form.render()}

        def respond(rendered_form=None):
            # Optionally swap in a re-rendered form (e.g. with errors).
            if rendered_form:
                template_vars['form'] = rendered_form
            return render_to_response(
                self.context.renderer, template_vars, request=self.request
            )

        if 'form.submitted' not in self.request.params:
            return respond()

        pstruct = peppercorn.parse(self.request.POST.items())

        # Validate form
        try:
            groups = form.validate_pstruct(pstruct).values()
        except deform.ValidationFailure as failure:
            return respond(failure.render())

        # Flatten the validated groups into a single field -> value mapping.
        data = {}
        for group in groups:
            for field, raw in group.items():
                # TODO: optimize it
                data[field] = preprocessing_value(field, raw, form)

        # Update object
        try:
            if not self.context.obj:
                obj = self.context.sacrud.create(data)
                flash_action = 'created'
            else:
                obj = self.context.sacrud._add(self.context.obj, data)
                flash_action = 'updated'
            # The name is captured before the flush.
            name = obj.__repr__()
            self.context.dbsession.flush()
        except SacrudException as err:
            self.flash_message(err.message, status=err.status)
            return respond()
        except Exception as err:
            self.abort()
            logging.exception("Something awful happened!")
            raise err
        self.commit()

        # Make response
        message = _ps(
            u"You ${action} object of ${name}",
            mapping={'action': flash_action, 'name': escape(name or '')}
        )
        self.flash_message(message)
        return self.list_view_response()
class Delete(CRUD):
    """Views that delete one object or a selection of objects."""

    @view_config(
        request_method='POST',
        context=DeleteResource,
        route_name=PYRAMID_SACRUD_VIEW
    )
    def delete_view(self):
        """Delete ``self.context.obj`` and redirect to the list view.

        :raises HTTPNotFound: when the context has no ``obj`` attribute.
        """
        if not hasattr(self.context, 'obj'):
            raise HTTPNotFound
        obj = self.context.obj
        # Build the display name while the object is still live.
        # NOTE(review): reading a deleted instance after the commit may fail
        # if the session expires it on commit -- confirm session settings.
        name = escape(text_type(obj) or '')
        self.context.dbsession.delete(obj)
        self.commit()
        self.flash_message(
            _ps("You have removed object of ${name}",
                mapping={'name': name}))
        return self.list_view_response()

    @view_config(
        request_method='POST',
        context=MassActionResource,
        request_param='mass_action=delete',
        route_name=PYRAMID_SACRUD_VIEW
    )
    def mass_delete_view(self):
        """Delete every object listed in the ``selected_item`` POST values.

        Each ``selected_item`` is a JSON document that ``pk_list_to_dict``
        turns into a primary-key mapping. After a successful delete the
        names of the removed objects are flashed and the client is
        redirected to the list view.

        An empty selection deletes nothing. A ``SacrudException`` is flashed
        as an error and nothing is committed or reported as deleted.

        :raises HTTPNotFound: on ``NoResultFound`` or ``KeyError`` while
            deleting.
        """
        items_list = self.request.POST.getall('selected_item')
        if not items_list:
            # ``sacrud.read()`` without keys is the call the list view uses
            # to fetch the whole listing, so an empty selection must never
            # reach the delete below.
            return self.list_view_response()

        primary_keys = [pk_list_to_dict(json.loads(item))
                        for item in items_list]
        objects = self.context.sacrud.read(*primary_keys)
        try:
            if hasattr(objects, 'delete'):
                # Query-like result: collect names first, then bulk delete.
                object_names = [escape(x.__repr__() or '') for x in objects]
                objects.delete()
            else:
                object_names = [escape(objects.__repr__() or '')]
                # NOTE(review): this uses request.dbsession while the other
                # views use context.dbsession -- confirm they are the same.
                self.request.dbsession.delete(objects)
        except (NoResultFound, KeyError):
            raise HTTPNotFound
        except SacrudException as e:
            # Report the failure only; do not fall through to the commit and
            # the success messages (``object_names`` may not even be bound).
            self.flash_message(e.message, status=e.status)
            return self.list_view_response()
        except Exception:
            self.abort()
            logging.exception("Something awful happened!")
            raise
        # Same commit helper as the single-object delete view.
        self.commit()
        self.flash_message(_ps("You delete the following objects:"))
        for obj_name in object_names:
            self.flash_message(obj_name)
        return self.list_view_response()