# -*- coding: utf-8 -*-
#
# Copyright (C) 2019-2023 CERN.
# Copyright (C) 2019-2020 Northwestern University.
#
# Invenio-Records-Permissions is free software; you can redistribute it
# and/or modify it under the terms of the MIT License; see LICENSE file for
# more details.
"""Invenio Records Permissions Generators."""
import operator
from abc import abstractmethod
from functools import reduce
from itertools import chain
from flask import current_app
from flask_principal import ActionNeed, UserNeed
from invenio_access import ActionRoles, ActionUsers
from invenio_access.permissions import (
any_user,
authenticated_user,
superuser_access,
system_process,
)
from invenio_search.engine import dsl
[docs]class Generator(object):
"""Parent class mapping the context when an action is allowed or denied.
It does so by *generating* "needed" and "excluded" Needs. At the search
level it implements the *query filters* to restrict the search.
Any context inherits from this class.
"""
[docs] def needs(self, **kwargs):
"""Enabling Needs."""
return []
[docs] def excludes(self, **kwargs):
"""Preventing Needs."""
return []
[docs] def query_filter(self, **kwargs):
"""Search filters."""
return []
[docs]class AnyUser(Generator):
"""Allows any user."""
[docs] def needs(self, **kwargs):
"""Enabling Needs."""
return [any_user]
[docs] def query_filter(self, **kwargs):
"""Match all in search."""
# TODO: Implement with new permissions metadata
return dsl.Q("match_all")
[docs]class SystemProcess(Generator):
"""Allows system_process role."""
[docs] def needs(self, **kwargs):
"""Enabling Needs."""
return [system_process]
[docs] def query_filter(self, identity=None, **kwargs):
"""Filters for current identity as system process."""
if system_process in identity.provides:
return dsl.Q("match_all")
else:
return []
[docs]class SystemProcessWithoutSuperUser(SystemProcess):
"""Allows system_process role, excluding superuser-access needs."""
@staticmethod
def _expand_superuser_access_action():
"""Fetch users and roles allowed for the superuser-access action."""
roles = (
ActionRoles.query_by_action(superuser_access).join(ActionRoles.role).all()
)
users = ActionUsers.query_by_action(superuser_access).all()
return chain(roles, users)
[docs] def excludes(self, **kwargs):
"""Preventing Needs."""
return [role.need for role in self._expand_superuser_access_action()]
[docs]class Disable(Generator):
"""Denies ALL users including users and roles allowed to superuser-access action."""
[docs] def excludes(self, **kwargs):
"""Preventing Needs."""
return [any_user]
[docs] def query_filter(self, **kwargs):
"""Match None in search."""
return ~dsl.Q("match_all")
[docs]class RecordOwners(Generator):
"""Allows record owners."""
[docs] def needs(self, record=None, **kwargs):
"""Enabling Needs."""
return [UserNeed(owner) for owner in record.get("owners", [])]
[docs] def query_filter(self, identity=None, **kwargs):
"""Filters for current identity as owner."""
for need in identity.provides:
if need.method == "id":
return dsl.Q("term", owners=need.value)
return []
[docs]class AnyUserIfPublic(Generator):
"""Allows any user if record is public.
TODO: Revisit when dealing with files.
"""
[docs] def needs(self, record=None, **kwargs):
"""Enabling Needs."""
is_restricted = record and record.get("_access", {}).get(
"metadata_restricted", False
)
return [any_user] if not is_restricted else []
[docs] def excludes(self, record=None, **kwargs):
"""Preventing Needs."""
return []
[docs] def query_filter(self, **kwargs):
"""Filters for non-restricted records."""
# TODO: Implement with new permissions metadata
return dsl.Q("term", **{"_access.metadata_restricted": False})
[docs]class AuthenticatedUser(Generator):
"""Allows authenticated users."""
[docs] def needs(self, **kwargs):
"""Enabling Needs."""
return [authenticated_user]
[docs] def query_filter(self, **kwargs):
"""Filters for current identity as super user."""
# TODO: Implement with new permissions metadata
return dsl.Q("match_all")
[docs]class AllowedByAccessLevel(Generator):
"""Allows users/roles/groups that have an appropriate access level."""
# TODO: Implement other access levels:
# 'metadata_reader'
# 'files_reader'
# 'files_curator'
# 'superuser'
ACTION_TO_ACCESS_LEVELS = {
"create": [],
"read": ["metadata_curator"],
"update": ["metadata_curator"],
"delete": [],
}
def __init__(self, action="read"):
"""Constructor."""
self.action = action
[docs] def needs(self, record=None, **kwargs):
"""Enabling UserNeeds for each person."""
if not record:
return []
access_levels = AllowedByAccessLevel.ACTION_TO_ACCESS_LEVELS.get(
self.action, []
)
# Name "identity" is used bc it correlates with flask-principal
# identity while not being one.
allowed_identities = chain.from_iterable(
[
record.get("internal", {})
.get("access_levels", {})
.get(access_level, [])
for access_level in access_levels
]
)
return [
UserNeed(identity.get("id"))
for identity in allowed_identities
if identity.get("scheme") == "person" and identity.get("id")
# TODO: Implement other schemes
]
[docs] def query_filter(self, identity=None, **kwargs):
"""Search filter for the current user with this generator."""
id_need = next(
(need for need in identity.provides if need.method == "id"), None
)
if not id_need:
return []
# To get the record in the search results, the access level must
# have been put in the 'read' array
read_levels = AllowedByAccessLevel.ACTION_TO_ACCESS_LEVELS.get("read", [])
queries = [
dsl.Q(
"term",
**{
"internal.access_levels.{}".format(access_level): {
"scheme": "person",
"id": id_need.value
# TODO: Implement other schemes
}
}
)
for access_level in read_levels
]
return reduce(operator.or_, queries)
[docs]class AdminAction(Generator):
"""Generator for admin needs.
This generator's purpose is to be used in cases where administration needs are required.
The query filter of this generator is quite broad (match_all). Therefore, it must be used with care.
"""
def __init__(self, action):
"""Constructor."""
self.action = action
super().__init__()
[docs] def needs(self, **kwargs):
"""Enabling Needs."""
return [self.action]
[docs] def query_filter(self, identity, **kwargs):
"""Not implemented at this level."""
for need in identity.provides:
if need.value == self.action.value:
return dsl.Q("match_all")
return []
[docs]class ConditionalGenerator(Generator):
"""Generator that depends on whether a condition is true or not.
.. code-block::python
If...(
then_=[...],
else_=[...],
)
"""
def __init__(self, then_, else_):
"""Constructor."""
self.then_ = then_
self.else_ = else_
@abstractmethod
def _condition(self, **kwargs):
"""Condition to choose generators set."""
raise NotImplementedError()
def _generators(self, record, **kwargs):
"""Get the "then" or "else" generators."""
return self.then_ if self._condition(record=record, **kwargs) else self.else_
[docs] def needs(self, record=None, **kwargs):
"""Set of Needs granting permission."""
needs = [
g.needs(record=record, **kwargs) for g in self._generators(record, **kwargs)
]
return set(chain.from_iterable(needs))
[docs] def excludes(self, record=None, **kwargs):
"""Set of Needs denying permission."""
excludes = [
g.excludes(record=record, **kwargs)
for g in self._generators(record, **kwargs)
]
return set(chain.from_iterable(excludes))
@staticmethod
def _make_query(generators, **kwargs):
"""Make a query for one set of generators."""
queries = [g.query_filter(**kwargs) for g in generators]
queries = [q for q in queries if q]
return reduce(operator.or_, queries) if queries else None
[docs]class IfConfig(ConditionalGenerator):
"""Config-based conditional generator."""
def __init__(self, config_key, accept_values=None, **kwargs):
"""Initialize generator."""
self.accept_values = accept_values or [True]
self.config_key = config_key
super().__init__(**kwargs)
def _condition(self, **_):
"""Check if the config value is truthy."""
return current_app.config.get(self.config_key) in self.accept_values
#
# | Meta Restricted | Files Restricted | Access Right | Result |
# |-----------------|------------------|--------------|--------|
# | True | True | Not Open | False |
# |-----------------|------------------|--------------|--------|
# | True | True | Open | False | # Inconsistent
# |-----------------|------------------|--------------|--------|
# | True | False | Not Open | False | # Inconsistent
# |-----------------|------------------|--------------|--------|
# | True | False | Open | False | # Inconsistent
# |-----------------|------------------|--------------|--------|
# | False | True | Not Open | False | ??Inconsistent
# |-----------------|------------------|--------------|--------|
# | False | True | Open | False |
# |-----------------|------------------|--------------|--------|
# | False | False | Not Open | False | # Inconsistent
# |-----------------|------------------|--------------|--------|
# | False | False | Open | True |
# |-----------------|------------------|--------------|--------|
#