"""
Adapters are used to convert the data model described by the SCIM 2.0
specification to a data model that fits the data provided by the application
implementing a SCIM api.
For example, in a Django app, there are User and Group models that do
not have the same attributes/fields that are defined by the SCIM 2.0
specification. The Django User model has both ``first_name`` and ``last_name``
attributes but the SCIM specification requires this same data be sent under
the names ``givenName`` and ``familyName`` respectively.
An adapter is instantiated with a model instance. Eg::

    user = get_user_model().objects.get(id=1)
    scim_user = SCIMUser(user)
    ...
"""
from six.moves.urllib.parse import urljoin
from django.contrib.auth import get_user_model
from django.urls import reverse
from django import core
from . import constants
from .exceptions import BadRequestError
from .utils import get_base_scim_location_getter
from .utils import get_group_adapter
from .utils import get_user_adapter
class SCIMMixin(object):
    """
    Behaviour shared by the SCIM adapters in this module.

    An adapter wraps a model instance (``obj``) and, optionally, the request
    currently being served. Subclasses are expected to define ``url_name``
    (the URL name passed to ``reverse``) and one ``handle_<op>`` method for
    every PATCH operation they support.
    """

    def __init__(self, obj, request=None):
        """
        :param obj: the model instance being adapted.
        :param request: the current request, if any. It is only required by
            attributes that build absolute URLs (eg ``location``).
        """
        self.obj = obj
        self._request = request

    @property
    def request(self):
        """
        Return the request associated with this adapter.

        :raises RuntimeError: if no request has been set.
        """
        if self._request:
            return self._request

        raise RuntimeError('Adapter is not associated with a request object. '
                           'Set object.request to avoid this error.')

    @request.setter
    def request(self, value):
        self._request = value

    @property
    def id(self):
        """
        Return the id of the wrapped object as a string.
        """
        return str(self.obj.id)

    @property
    def path(self):
        """
        Return the URL path of the wrapped object, reversed from ``url_name``.
        """
        return reverse(self.url_name, kwargs={'uuid': self.obj.id})

    @property
    def location(self):
        """
        Return the location of the wrapped object: ``path`` joined onto the
        base SCIM location computed for the current request.
        """
        return urljoin(get_base_scim_location_getter()(self.request), self.path)

    def save(self):
        """
        Save the wrapped object.
        """
        self.obj.save()

    def delete(self):
        """
        Delete the wrapped object via a queryset filtered on its id.
        """
        self.obj.__class__.objects.filter(id=self.obj.id).delete()

    def handle_operations(self, operations):
        """
        The SCIM specification allows for making changes to specific attributes
        of a model. These changes are sent in PATCH requests and are batched
        into operations to be performed on an object. Operations could be
        'add', 'remove', 'replace', etc. This method iterates through all of
        the operations in ``operations`` and calls the appropriate handler
        (``handle_<op>``, defined on the appropriate adapter) for each.

        Every handler is looked up before any of them is called, so a request
        containing an unsupported operation is rejected before the operations
        preceding it have been applied.

        :param operations: iterable of operation dicts, each with an ``op`` key.
        :raises BadRequestError: if an operation has no usable ``op`` value.
        :raises NotImplementedError: if the adapter has no handler for an
            operation.
        """
        resolved = []
        for operation in operations:
            try:
                op_code = operation.get('op').lower()
            except AttributeError:
                # Either the operation is not a dict or 'op' is missing / not
                # a string; both are malformed requests.
                raise BadRequestError('Invalid or missing "op" in operation')

            handler = getattr(self, 'handle_' + op_code, None)
            if handler is None:
                raise NotImplementedError(
                    'Operation "{0}" is not implemented'.format(op_code)
                )
            resolved.append((handler, operation))

        for handler, operation in resolved:
            handler(operation)
class SCIMUser(SCIMMixin):
    """
    Adapter for adding SCIM functionality to a Django User object.

    This adapter can be overriden; see the ``USER_ADAPTER`` setting
    for details.
    """
    # NOTE: the URL name is hard-coded here, which couples the adapter to the
    # URL configuration. Not great, could be more decoupled.
    url_name = 'scim:users'
    resource_type = 'User'

    @property
    def display_name(self):
        """
        Return the displayName of the user per the SCIM spec.

        This is "<first_name> <last_name>" when both are set, otherwise the
        username.
        """
        if self.obj.first_name and self.obj.last_name:
            return u'{0.first_name} {0.last_name}'.format(self.obj)
        return self.obj.username

    @property
    def name_formatted(self):
        """
        Return the formatted name of the user (same value as ``display_name``).
        """
        return self.display_name

    @property
    def emails(self):
        """
        Return the email of the user per the SCIM spec.

        The user's single email is reported as a one-element list and is
        flagged as primary.
        """
        return [{'value': self.obj.email, 'primary': True}]

    @property
    def groups(self):
        """
        Return the groups of the user per the SCIM spec.

        :rtype: list
        """
        adapter_cls = get_group_adapter()
        scim_groups = [
            adapter_cls(group, self.request) for group in self.obj.groups.all()
        ]
        return [
            {
                'value': group.id,
                '$ref': group.location,
                'display': group.display_name,
            }
            for group in scim_groups
        ]

    @property
    def meta(self):
        """
        Return the meta object of the user per the SCIM spec.

        ``lastModified`` is reported with the same ``date_joined`` value as
        ``created``.
        """
        d = {
            'resourceType': self.resource_type,
            'created': self.obj.date_joined.isoformat(),
            'lastModified': self.obj.date_joined.isoformat(),
            'location': self.location,
        }
        return d

    def to_dict(self):
        """
        Return a ``dict`` conforming to the SCIM User Schema,
        ready for conversion to a JSON object.
        """
        d = {
            'schemas': [constants.SchemaURI.USER],
            'id': self.id,
            'userName': self.obj.username,
            'name': {
                'givenName': self.obj.first_name,
                'familyName': self.obj.last_name,
                'formatted': self.name_formatted,
            },
            'displayName': self.display_name,
            'emails': self.emails,
            'active': self.obj.is_active,
            'groups': self.groups,
            'meta': self.meta,
        }
        return d

    def from_dict(self, d):
        """
        Consume a ``dict`` conforming to the SCIM User Schema, updating the
        internal user object with data from the ``dict``.

        The email stored on the user is the ``value`` of the first email
        flagged as primary or, when none is flagged, of the first email in the
        list. When no emails are supplied the email is set to ``None``.

        Please note, the user object is not saved within this method. To
        persist the changes made by this method, please call ``.save()`` on the
        adapter. Eg::

            scim_user.from_dict(d)
            scim_user.save()
        """
        self.obj.username = d.get('userName') or ''

        # ``or {}`` also covers a payload that sends an explicit null name.
        name = d.get('name') or {}
        self.obj.first_name = name.get('givenName') or ''
        self.obj.last_name = name.get('familyName') or ''

        # Always store the address string (never the whole email dict),
        # preferring an entry flagged as primary.
        emails = d.get('emails') or []
        primary_emails = [e for e in emails if e.get('primary')]
        candidates = primary_emails or emails
        self.obj.email = candidates[0].get('value') if candidates else None

        cleartext_password = d.get('password')
        if cleartext_password:
            self.obj.set_password(cleartext_password)

        active = d.get('active')
        if active is not None:
            self.obj.is_active = active

    @classmethod
    def resource_type_dict(cls, request=None):
        """
        Return a ``dict`` containing ResourceType metadata for the user object.
        """
        id_ = cls.resource_type
        path = reverse('scim:resource-types', kwargs={'uuid': id_})
        location = urljoin(get_base_scim_location_getter()(request), path)
        return {
            'schemas': [constants.SchemaURI.RESOURCE_TYPE],
            'id': id_,
            'name': 'User',
            'endpoint': reverse('scim:users'),
            'description': 'User Account',
            'schema': constants.SchemaURI.USER,
            'meta': {
                'location': location,
                'resourceType': 'ResourceType'
            }
        }

    def handle_replace(self, operation):
        """
        Handle the replace operations.

        Each key of the operation's ``value`` dict is either copied onto the
        mapped user attribute or, for ``emails``, validated and stored as the
        user's email. The user is saved once all attributes are processed.

        :raises BadRequestError: if ``emails`` is empty or the selected
            address is not a valid email.
        :raises NotImplementedError: if an attribute is not supported.
        """
        attr_map = {
            'familyName': 'last_name',
            'givenName': 'first_name',
            'userName': 'username',
            'active': 'is_active',
        }

        attrs = operation.get('value', {})

        for attr, attr_value in attrs.items():
            if attr in attr_map:
                setattr(self.obj, attr_map.get(attr), attr_value)

            elif attr == 'emails':
                # Prefer the primary address, fall back to the first one.
                primary_emails = [e for e in attr_value if e.get('primary')]
                if primary_emails:
                    email = primary_emails[0].get('value')
                elif attr_value:
                    email = attr_value[0].get('value')
                else:
                    raise BadRequestError('Invalid email value')

                try:
                    validator = core.validators.EmailValidator()
                    validator(email)
                except core.exceptions.ValidationError:
                    raise BadRequestError('Invalid email value')

                self.obj.email = email

            else:
                raise NotImplementedError('Not Implemented')

        self.obj.save()
class SCIMGroup(SCIMMixin):
    """
    Adapter for adding SCIM functionality to a Django Group object.

    This adapter can be overriden; see the ``GROUP_ADAPTER``
    setting for details.
    """
    # NOTE: the URL name is hard-coded here, which couples the adapter to the
    # URL configuration. Not great, could be more decoupled.
    url_name = 'scim:groups'
    resource_type = 'Group'

    @property
    def display_name(self):
        """
        Return the displayName of the group per the SCIM spec.
        """
        return self.obj.name

    @property
    def members(self):
        """
        Return a list of user dicts (ready for serialization) for the members
        of the group.

        :rtype: list
        """
        adapter_cls = get_user_adapter()
        scim_users = [
            adapter_cls(user, self.request) for user in self.obj.user_set.all()
        ]
        return [
            {
                'value': user.id,
                '$ref': user.location,
                'display': user.display_name,
            }
            for user in scim_users
        ]

    @property
    def meta(self):
        """
        Return the meta object of the group per the SCIM spec.
        """
        d = {
            'resourceType': self.resource_type,
            'location': self.location,
        }
        return d

    def to_dict(self):
        """
        Return a ``dict`` conforming to the SCIM Group Schema,
        ready for conversion to a JSON object.
        """
        return {
            'schemas': [constants.SchemaURI.GROUP],
            'id': self.id,
            'displayName': self.display_name,
            'members': self.members,
            'meta': self.meta,
        }

    def from_dict(self, d):
        """
        Consume a ``dict`` conforming to the SCIM Group Schema, updating the
        internal group object with data from the ``dict``.

        Please note, the group object is not saved within this method. To
        persist the changes made by this method, please call ``.save()`` on the
        adapter. Eg::

            scim_group.from_dict(d)
            scim_group.save()
        """
        name = d.get('displayName')
        self.obj.name = name or ''

    @classmethod
    def resource_type_dict(cls, request=None):
        """
        Return a ``dict`` containing ResourceType metadata for the group object.
        """
        id_ = cls.resource_type
        path = reverse('scim:resource-types', kwargs={'uuid': id_})
        location = urljoin(get_base_scim_location_getter()(request), path)
        return {
            'schemas': [constants.SchemaURI.RESOURCE_TYPE],
            'id': id_,
            'name': 'Group',
            'endpoint': reverse('scim:groups'),
            'description': 'Group',
            'schema': constants.SchemaURI.GROUP,
            'meta': {
                'location': location,
                'resourceType': 'ResourceType'
            }
        }

    def _get_member_users(self, operation, missing_user_message):
        """
        Return the users referenced by the ``value`` list of ``operation``.

        :param operation: operation dict whose ``value`` is a list of member
            dicts, each carrying a user id under ``value``.
        :param missing_user_message: error message used when at least one
            referenced user does not exist.
        :raises BadRequestError: if a member id is missing or not an integer,
            or if a referenced user does not exist.
        """
        members = operation.get('value') or []
        try:
            # A set, so that an id repeated in the payload is not mistaken
            # for a non-existent user by the count comparison below.
            ids = {int(member.get('value')) for member in members}
        except (AttributeError, TypeError, ValueError):
            raise BadRequestError('Invalid member value')

        users = get_user_model().objects.filter(id__in=ids)
        if len(ids) != users.count():
            raise BadRequestError(missing_user_message)
        return users

    def handle_add(self, operation):
        """
        Handle add operations.

        Only the ``members`` path is supported.

        :raises BadRequestError: if a member is invalid or does not exist.
        :raises NotImplementedError: for any other path.
        """
        if operation.get('path') != 'members':
            raise NotImplementedError('Not Implemented')

        users = self._get_member_users(
            operation, 'Can not add a non-existent user to group'
        )
        self.obj.user_set.add(*users)

    def handle_remove(self, operation):
        """
        Handle remove operations.

        Only the ``members`` path is supported.

        :raises BadRequestError: if a member is invalid or does not exist.
        :raises NotImplementedError: for any other path.
        """
        if operation.get('path') != 'members':
            raise NotImplementedError('Not Implemented')

        users = self._get_member_users(
            operation, 'Can not remove a non-existent user from group'
        )
        self.obj.user_set.remove(*users)

    def handle_replace(self, operation):
        """
        Handle the replace operations.

        Only the ``name`` path is supported; the new name is read from the
        first entry of the operation's ``value`` list and the group is saved.

        :raises NotImplementedError: for any other path.
        """
        if operation.get('path') != 'name':
            raise NotImplementedError('Not Implemented')

        name = operation.get('value')[0].get('value')
        self.obj.name = name
        self.obj.save()