# Source code for django_scim.filters
"""
Filter transformers are used to convert the SCIM query and filter syntax into
valid SQL queries.
"""
import dateutil.parser
import itertools
import re
from plyplus import Grammar, STransformer, PlyplusException
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group
from .grammars import USER_GRAMMAR
from .grammars import GROUP_GRAMMAR
# Matches pyformat-style named placeholders such as ``%(0)s`` in generated
# SQL. The name may contain any characters except ``)`` and ``.``.
STRING_REPLACEMENT_RE_PAT = re.compile(
    r'\%\([^).]+\)s',
    re.MULTILINE,
)
class SCIMUserFilterTransformer(STransformer):
    """Transforms a PlyPlus parse tree into a tuple containing a raw SQL query
    and a dict with query parameters to go with the query.

    Each grammar rule is handled by the method (or lambda) of the same name.
    Literal values are never interpolated into the SQL text; they are stored
    through ``_push_param`` and referenced by ``%(name)s`` placeholders.
    ``search`` is the public entry point.
    """

    # data types:
    # [1:-1] drops the first and last character of the token (presumably the
    # surrounding quotes of the literal).
    t_string = lambda self, exp: exp.tail[0][1:-1]
    t_date = lambda self, exp: dateutil.parser.parse(exp.tail[0][1:-1])
    t_bool = lambda self, exp: exp.tail[0] == u'true'

    # operators
    op_or = lambda self, exp: u'OR'
    op_and = lambda self, exp: u'AND'
    # Operator tokens are lower-cased so the expression handlers can compare
    # them against u'eq', u'sw', u'co', ...
    gt = ge = lt = le = pr = eq = co = sw = lambda self, ex: ex.tail[0].lower()

    # fully qualified column names:
    pk = lambda *args: u'u.id'
    username = lambda *args: u'u.username'
    external_id = lambda *args: u'u.external_id'
    password = lambda *args: u'u.password'
    first_name = lambda *args: u'u.first_name'
    last_name = lambda *args: u'u.last_name'
    email = lambda *args: u'u.email'
    date_joined = lambda *args: u'u.date_joined'
    is_active = lambda *args: u'u.is_active'

    @property
    def auth_user_db_table(self):
        """Database table name of the active Django user model."""
        return get_user_model()._meta.db_table

    # expressions:
    def logical_or(self, exp):
        """Return an SQL condition that is true when either operand matches.

        ``exp.tail`` holds the two already-transformed operand fragments.
        """
        # We're not doing a simple 'OR', as that doesn't scale when the two
        # conditions operate on different tables and rely on indices.
        # This is because Postgres' query planner cannot leverage indices from
        # different tables. Instead, we'll use a UNION.
        #
        # http://grokbase.com/t/postgresql/pgsql-general/034qrq6me0/left-join-not-using-index
        return u"""
            u.id IN (
                WITH users AS (
                    SELECT DISTINCT u.id
                    FROM {auth_user_db_table} u
                    {join}
                    WHERE
                        {operand1}
                    UNION
                    SELECT DISTINCT u.id
                    FROM {auth_user_db_table} u
                    {join}
                    WHERE
                        {operand2}
                )
                SELECT DISTINCT id FROM users
            )
        """.format(join=self.join(),
                   auth_user_db_table=self.auth_user_db_table,
                   operand1=exp.tail[0],
                   operand2=exp.tail[1])

    def logical_and(self, exp):
        """Return an SQL condition that is true when both operands match.

        When one operand is a ``PasswordExpression`` the condition is written
        as a sub-select so that the password check only runs on rows that
        already satisfy the other operand. If both operands are password
        expressions, the first one is treated as the password check.
        """
        op1, op2 = exp.tail
        if isinstance(op1, self.PasswordExpression):
            params = {'fragment': op2, 'password': op1}
        elif isinstance(op2, self.PasswordExpression):
            params = {'fragment': op1, 'password': op2}
        else:
            return u'(%s AND %s)' % (op1, op2)

        # When expensive CRYPT functions are involved we can't rely on
        # Postgres' query planner to chose the optimal order in which to
        # evaluate the conditions.
        # If a password condition is AND'ed against something else, we
        # explicitly write SQL that will force Postgres to compute the password
        # hashes on the *result* of the other condition, to minimize the number
        # of CRYPT calculations.
        return u"""
            u.id IN
            (
                WITH users AS (
                    SELECT DISTINCT u.id, u.password
                    FROM {auth_user_db_table} u
                    {join}
                    WHERE {fragment}
                )
                SELECT DISTINCT users.id
                FROM users
                WHERE {password}
            )""".format(join=self.join(),
                        auth_user_db_table=self.auth_user_db_table,
                        **params)

    # Rules without a dedicated handler collapse to their first child.
    __default__ = lambda self, exp: exp.tail[0]

    def __init__(self):
        # Monotonic counter used to generate unique parameter names.
        self._seq = itertools.count(0, step=1)
        # Maps generated parameter name -> literal value.
        self._params = {}

    def _push_param(self, value):
        """Store ``value`` as a query parameter and return its generated name."""
        name = str(next(self._seq))
        self._params[name] = value
        return name

    def join(self):
        """Returns join expressions. E.g.

            JOIN bb_userprofile p ON p.user_id = u.id

        The default implementation adds no joins; subclasses may override it.
        """
        return ''

    def start(self, exp):
        """Build the final query.

        Returns a ``(sql, params)`` tuple where ``sql`` still contains
        ``%(name)s`` placeholders and ``params`` is the dict filled by
        ``_push_param``.
        """
        return u"""
            SELECT DISTINCT u.*
            FROM {auth_user_db_table} u
            {join}
            WHERE {fragment}
            ORDER BY u.id ASC
        """.format(join=self.join(),
                   auth_user_db_table=self.auth_user_db_table,
                   fragment=exp.tail[0]), self._params

    def un_expr(self, exp):
        """Unary expression on a column: true when the column is not NULL."""
        return u'%s IS NOT NULL' % exp.tail[0]

    def un_string_expr(self, exp):
        """Unary expression on a string column: true when it has a value."""
        # Django uses empty strings instead of NULL in VARCHARs:
        return u"(%s IS NOT NULL AND %s != '')" % (exp.tail[0], exp.tail[0])

    def bin_string_expr(self, exp):
        """Binary comparison of a string column against a literal.

        ``eq`` is a case-insensitive equality test, ``sw`` a case-insensitive
        prefix match and ``co`` a case-insensitive substring match. Any other
        operator falls through to ``ILIKE`` with the literal unchanged.
        """
        field, op, literal = exp.tail
        if op == u'eq':
            return u'UPPER(%s) = UPPER(%%(%s)s)' % (field, self._push_param(literal))
        elif op == u'sw':
            literal += u'%'
        elif op == u'co':
            literal = u'%' + literal + u'%'
        # NOTE(review): LIKE wildcards (% and _) inside the literal itself are
        # not escaped, so they act as wildcards in the match.
        return u'%s ILIKE %%(%s)s' % (field, self._push_param(literal))

    def bin_date_expr(self, exp):
        """Binary comparison of a date column against a parsed date literal.

        Only ``gt``, ``ge``, ``lt`` and ``le`` are mapped; any other operator
        raises ``KeyError``.
        """
        field, op, literal = exp.tail
        op = {u'gt': u'>', u'ge': u'>=', u'lt': u'<', u'le': u'<='}[op]
        return u'%s %s %%(%s)s' % (field, op, self._push_param(literal))

    def bin_bool_expr(self, exp):
        """Equality test of a boolean column; the operator token is ignored."""
        return u'%s = %%(%s)s' % (exp.tail[0], self._push_param(exp.tail[2]))

    def bin_pk_expr(self, exp):
        """Equality test on the primary key; the operator token is ignored.

        The literal is converted with ``int()`` before being stored.
        """
        field, op, value = exp.tail
        return u'%s = %%(%s)s' % (field, self._push_param(int(value)))

    def bin_passwd_expr(self, exp):
        """Return a ``PasswordExpression`` that checks a plain-text password
        against the stored hash inside the database.

        The candidate password is stored once as a parameter and referenced
        three times, once per supported hash format.
        """
        pname = self._push_param(exp.tail[2])
        return self.PasswordExpression(u"""
            (
                CHAR_LENGTH(password) >= 51
                AND
                (
                    -- Check for SHA1
                    SPLIT_PART(password, '$', 3) = ENCODE(DIGEST(
                        SPLIT_PART(password, '$', 2)||%%(%s)s, 'sha1'), 'hex')
                    OR
                    -- Check for BCrypt
                    SUBSTRING(password FROM 8) = CRYPT(
                        %%(%s)s, SUBSTRING(password FROM 8))
                    OR
                    -- Check for BCryptSHA256
                    SUBSTRING(password FROM 15) = CRYPT(
                        ENCODE(DIGEST(%%(%s)s, 'sha256'), 'hex'),
                        SUBSTRING(password FROM 15))
                )
            )""" % (pname, pname, pname))

    @classmethod
    def condition_sql_and_params(cls, sql, params):
        """Convert named placeholders into positional ones.

        Every ``%(name)s`` in ``sql`` is replaced by ``%s``. The returned list
        holds the matching values from ``params`` in order of appearance, so a
        name used several times contributes its value several times. Names
        missing from ``params`` yield ``None``.

        :returns: a ``(new_sql, new_params)`` tuple.
        """
        # Strip the leading '%(' and trailing ')s' to get the bare names.
        names = [
            placeholder[2:-2]
            for placeholder in STRING_REPLACEMENT_RE_PAT.findall(sql)
        ]
        new_sql = STRING_REPLACEMENT_RE_PAT.sub('%s', sql)
        new_params = [params.get(name) for name in names]
        return new_sql, new_params

    @classmethod
    def search(cls, query, request=None):
        """Takes a SCIM 1.1 filter query and returns a Django `QuerySet` that
        contains zero or more user model instances.

        :param unicode query: a `unicode` query string.
        :param request: accepted but not used by this implementation.
        :raises ValueError: when parsing or transforming the query raises a
            ``PlyplusException``.
        """
        try:
            sql, params = cls().transform(USER_GRAMMAR.parse(query))
            sql, params = cls.condition_sql_and_params(sql, params)
        except PlyplusException as e:
            raise ValueError(e)
        else:
            return get_user_model().objects.raw(sql, params)

    class PasswordExpression(object):
        """Wrapper that marks an SQL fragment as a password check.

        ``logical_and`` uses the type to recognise password conditions; when
        formatted into a query the object renders as the wrapped SQL.
        """

        def __init__(self, sql):
            # type(u'') is ``unicode`` on Python 2 and ``str`` on Python 3,
            # so the check works without the Python 2-only ``unicode`` name.
            assert isinstance(sql, type(u''))
            self.sql = sql

        def __str__(self):
            if str is bytes:
                # Python 2: str() has to return a byte string.
                return self.sql.encode('utf-8')
            return self.sql

        def __unicode__(self):
            return self.sql
class SCIMGroupFilterTransformer(STransformer):
    """Transforms a PlyPlus parse tree into a tuple containing a raw SQL query
    and a dict with query parameters to go with the query.

    Each grammar rule is handled by the method (or lambda) of the same name.
    Literal values are never interpolated into the SQL text; they are stored
    through ``_push_param`` and referenced by ``%(name)s`` placeholders.
    ``search`` is the public entry point.
    """

    # data types:
    # [1:-1] drops the first and last character of the token (presumably the
    # surrounding quotes of the literal).
    t_string = lambda self, exp: exp.tail[0][1:-1]

    # operators
    op_or = lambda self, exp: u'OR'
    op_and = lambda self, exp: u'AND'
    # Operator tokens are lower-cased so the expression handlers can compare
    # them against u'eq', u'sw', u'co'.
    pr = eq = co = sw = lambda self, ex: ex.tail[0].lower()

    # fully qualified column names:
    pk = lambda *args: u'g.id'
    name = lambda *args: u'g.name'

    @property
    def group_table(self):
        """Database table name of Django's ``Group`` model."""
        return Group._meta.db_table

    # expressions:
    def logical_or(self, exp):
        """Return an SQL condition that is true when either operand matches.

        ``exp.tail`` holds the two already-transformed operand fragments.
        """
        # We're not doing a simple 'OR', as that doesn't scale when the two
        # conditions operate on different tables and rely on indices.
        # This is because Postgres' query planner cannot leverage indices from
        # different tables. Instead, we'll use a UNION.
        #
        # http://grokbase.com/t/postgresql/pgsql-general/034qrq6me0/left-join-not-using-index
        #
        # The CTE is named ``groups``: it must match the name used by the
        # final SELECT, and ``group`` on its own is a reserved SQL word.
        return u"""
            g.id IN (
                WITH groups AS (
                    SELECT DISTINCT g.id
                    FROM {group_table} g
                    {join}
                    WHERE
                        {operand1}
                    UNION
                    SELECT DISTINCT g.id
                    FROM {group_table} g
                    {join}
                    WHERE
                        {operand2}
                )
                SELECT DISTINCT id FROM groups
            )
        """.format(join=self.join(),
                   group_table=self.group_table,
                   operand1=exp.tail[0],
                   operand2=exp.tail[1])

    def logical_and(self, exp):
        """Return an SQL condition that is true when both operands match."""
        op1, op2 = exp.tail
        return u'(%s AND %s)' % (op1, op2)

    # Rules without a dedicated handler collapse to their first child.
    __default__ = lambda self, exp: exp.tail[0]

    def __init__(self):
        # Monotonic counter used to generate unique parameter names.
        self._seq = itertools.count(0, step=1)
        # Maps generated parameter name -> literal value.
        self._params = {}

    def _push_param(self, value):
        """Store ``value`` as a query parameter and return its generated name."""
        name = str(next(self._seq))
        self._params[name] = value
        return name

    def join(self):
        """Returns join expressions against the group alias ``g``. E.g.

            JOIN some_table t ON t.group_id = g.id

        The default implementation adds no joins; subclasses may override it.
        """
        return ''

    def start(self, exp):
        """Build the final query.

        Returns a ``(sql, params)`` tuple where ``sql`` still contains
        ``%(name)s`` placeholders and ``params`` is the dict filled by
        ``_push_param``.
        """
        return u"""
            SELECT DISTINCT g.*
            FROM {group_table} g
            {join}
            WHERE {fragment}
            ORDER BY g.id ASC
        """.format(join=self.join(),
                   group_table=self.group_table,
                   fragment=exp.tail[0]), self._params

    def un_string_expr(self, exp):
        """Unary expression on a string column: true when it has a value."""
        # Django uses empty strings instead of NULL in VARCHARs:
        return u"(%s IS NOT NULL AND %s != '')" % (exp.tail[0], exp.tail[0])

    def bin_string_expr(self, exp):
        """Binary comparison of a string column against a literal.

        ``eq`` is a case-insensitive equality test, ``sw`` a case-insensitive
        prefix match and ``co`` a case-insensitive substring match. Any other
        operator falls through to ``ILIKE`` with the literal unchanged.
        """
        field, op, literal = exp.tail
        if op == u'eq':
            return u'UPPER(%s) = UPPER(%%(%s)s)' % (field, self._push_param(literal))
        elif op == u'sw':
            literal += u'%'
        elif op == u'co':
            literal = u'%' + literal + u'%'
        # NOTE(review): LIKE wildcards (% and _) inside the literal itself are
        # not escaped, so they act as wildcards in the match.
        return u'%s ILIKE %%(%s)s' % (field, self._push_param(literal))

    def bin_pk_expr(self, exp):
        """Equality test on the primary key; the operator token is ignored.

        The literal is converted with ``int()`` before being stored.
        """
        field, op, value = exp.tail
        return u'%s = %%(%s)s' % (field, self._push_param(int(value)))

    @classmethod
    def condition_sql_and_params(cls, sql, params):
        """Convert named placeholders into positional ones.

        Every ``%(name)s`` in ``sql`` is replaced by ``%s``. The returned list
        holds the matching values from ``params`` in order of appearance, so a
        name used several times contributes its value several times. Names
        missing from ``params`` yield ``None``.

        :returns: a ``(new_sql, new_params)`` tuple.
        """
        # Strip the leading '%(' and trailing ')s' to get the bare names.
        names = [
            placeholder[2:-2]
            for placeholder in STRING_REPLACEMENT_RE_PAT.findall(sql)
        ]
        new_sql = STRING_REPLACEMENT_RE_PAT.sub('%s', sql)
        new_params = [params.get(name) for name in names]
        return new_sql, new_params

    @classmethod
    def search(cls, query, request=None):
        """Takes a SCIM 1.1 filter query and returns a Django `QuerySet` that
        contains zero or more group model instances.

        :param unicode query: a `unicode` query string.
        :param request: accepted but not used by this implementation.
        :raises ValueError: when parsing or transforming the query raises a
            ``PlyplusException``.
        """
        try:
            sql, params = cls().transform(GROUP_GRAMMAR.parse(query))
            sql, params = cls.condition_sql_and_params(sql, params)
        except PlyplusException as e:
            raise ValueError(e)
        else:
            return Group.objects.raw(sql, params)