Source code for abenc_ca_cpabe_ar17

'''
**User Collusion Avoidance CP-ABE (AR17)**

*Authors:* Jiguo Li, Wei Yao, Jinguang Han, Yichen Zhang, Jian Shen

| **Title:** "User Collusion Avoidance CP-ABE With Efficient Attribute Revocation for Cloud Storage"
| **Published in:** IEEE Systems Journal, 2017
| **Available from:** https://ieeexplore.ieee.org/abstract/document/7867082
| **Notes:** Supports user collusion avoidance with efficient attribute revocation

.. rubric:: Scheme Properties

* **Type:** ciphertext-policy attribute-based encryption (public key)
* **Setting:** Pairing groups
* **Assumption:** Decisional Bilinear Diffie-Hellman

.. rubric:: Implementation

:Authors: Ahmed Bakr
:Date: 07/2023
'''
from charm.toolbox.pairinggroup import PairingGroup,ZR,G1,G2,GT,pair
from charm.toolbox.secretutil import SecretUtil
from charm.toolbox.ABEnc import ABEnc, Input, Output

from typing import Dict, List, Tuple
import queue

# type annotations
mk_t = {'beta':ZR, 'g_alpha':G1 }
pp_t = { 'g':G1, 'g_beta':G1, 'g_1_over_beta':G1, 'e_gg_alpha':GT }



[docs] class TreeNode: def __init__(self, sequence_number, value, parent=None): self.parent = parent self.sequence_number = sequence_number self.value = value self.left = None self.right = None def __str__(self): return str(self.sequence_number) def __repr__(self): return self.__str__()
[docs] class UsersBinaryTree: """ A binary tree that is used to assign users to leafs in a deterministic way. The tree is created and maintained by the AM. """ def __init__(self, group_obj): self.group = group_obj self.leafs_queue = queue.Queue() self.sequence_number = 0 self.root = self.create_node() self.leafs_queue.put(self.root) self.__curr_node = self.leafs_queue.get()
[docs] def create_node(self) -> TreeNode: self.sequence_number += 1 return TreeNode(self.sequence_number, self.group.random(ZR))
[docs] def add_node_to_tree(self, tree_node: TreeNode): """ Add a node to the tree. Inputs: - tree_node: a node to be added to the tree """ if self.__curr_node.left and self.__curr_node.right: assert not self.leafs_queue.empty(), "Leafs queue is empty and pull attempts was made" self.__curr_node = self.leafs_queue.get() if not self.__curr_node.left: self.__curr_node.left = tree_node elif not self.__curr_node.right: self.__curr_node.right = tree_node else: assert True, "This statement should not be reached" tree_node.parent = self.__curr_node self.leafs_queue.put(tree_node)
[docs] def print_tree(self): print("{", end='') self.__print_tree_rec(self.root) print("}")
def __print_tree_rec(self, node: TreeNode): print(node, end=', ') if node.left: self.__print_tree_rec(node.left) if node.right: self.__print_tree_rec(node.right)
[docs] class AM: """Attribute Manager (AM)""" def __init__(self, group_obj): self.users_to_attrs_dict: Dict[str, list] = {} self.attrs_to_users_dict: Dict[str, list] = {} self.users_binary_tree = UsersBinaryTree(group_obj)
[docs] def add_attr_to_user(self, attr_str: str, user_name: str): if user_name not in self.users_to_attrs_dict: self.users_to_attrs_dict[user_name] = [] self.__create_node_in_binary_tree_for_new_user() if attr_str not in self.attrs_to_users_dict: self.attrs_to_users_dict[attr_str] = [] self.users_to_attrs_dict[user_name].append(attr_str) # AB: It is assumed that this attribute does not already # exist in the list self.attrs_to_users_dict[attr_str].append(user_name) # AB: It is assumed that the username does not already
# exist in the list def __create_node_in_binary_tree_for_new_user(self): current_number_of_users = len(list(self.users_to_attrs_dict.keys())) # AB: make sure to add the new user to the # dict first before calling this function while not current_number_of_users == self.users_binary_tree.leafs_queue.qsize(): new_node = self.users_binary_tree.create_node() self.users_binary_tree.add_node_to_tree(new_node)
[docs] def remove_attr_from_user(self, attr_str: str, user_name: str): index = self.attrs_to_users_dict[attr_str].index(user_name) self.attrs_to_users_dict[attr_str].pop(index) index = self.users_to_attrs_dict[user_name].index(attr_str) self.users_to_attrs_dict[user_name].pop(index)
[docs] def get_user_assignation_to_leafs_dict(self) -> Dict[str, TreeNode]: user_names_list = list(self.users_to_attrs_dict.keys()) assert len(user_names_list) == self.users_binary_tree.leafs_queue.qsize(), "The number of usernames list ({})" \ " has to match the number of leaf" \ " elements ({}) in the binary tree".format( len(user_names_list), self.users_binary_tree.leafs_queue.qsize()) ret_dict: Dict[str, TreeNode] = {} for user_name, leaf in zip(user_names_list, self.users_binary_tree.leafs_queue.queue): ret_dict[user_name] = leaf return ret_dict
[docs] def get_minimum_nodes_list_that_represent_users_list(self, user_names_list: List[str]) -> List[TreeNode]: """ This is represented in the paper as calculating node(Gi) """ visited_arr = [False] * (self.users_binary_tree.sequence_number + 1) list_of_leaves_to_traverse = [] user_assignation_to_leafs_dict = self.get_user_assignation_to_leafs_dict() for user_name in user_names_list: user_leaf_node = user_assignation_to_leafs_dict[user_name] visited_arr[user_leaf_node.sequence_number] = True list_of_leaves_to_traverse.append(user_leaf_node) self.__traverse_to_mark_all_children_visited_arr(self.users_binary_tree.root, visited_arr) return self.__traverse_bfs_to_get_minimum_number_nodes_to_cover_users_list(visited_arr)
def __traverse_to_mark_all_children_visited_arr(self, node: TreeNode, visited_arr: List[bool]): is_leaf = not node.left and not node.right if is_leaf: return if node.left: self.__traverse_to_mark_all_children_visited_arr(node.left, visited_arr) if node.right: self.__traverse_to_mark_all_children_visited_arr(node.right, visited_arr) visited_arr[node.sequence_number] = visited_arr[node.left.sequence_number] and visited_arr[ node.right.sequence_number] def __traverse_bfs_to_get_minimum_number_nodes_to_cover_users_list(self, visited_arr) -> List[TreeNode]: ret_list = [] q = queue.Queue() q.put(self.users_binary_tree.root) while not q.empty(): node: TreeNode = q.get() if visited_arr[node.sequence_number]: ret_list.append(node) else: if node.left: q.put(node.left) if node.right: q.put(node.right) return ret_list
[docs] def get_user_path(self, user_name) -> List[TreeNode]: ret_list = [] user_assignation_to_leafs_dict = self.get_user_assignation_to_leafs_dict() assert user_name in user_assignation_to_leafs_dict, \ "Username ({}) must be inside user_assignation_to_leafs_dict ({})".format(user_name, user_assignation_to_leafs_dict) user_leaf_node = user_assignation_to_leafs_dict[user_name] curr_node: TreeNode = user_leaf_node while curr_node: ret_list.append(curr_node) curr_node = curr_node.parent return ret_list
[docs] @staticmethod def get_user_path_intersection_with_node_gi(user_path: List[TreeNode], node_gi: List[TreeNode]) -> List[TreeNode]: ret_intersection_list = [] for user_node in user_path: if user_node in node_gi: ret_intersection_list.append(user_node) return ret_intersection_list
[docs] class CaCpabeAr(ABEnc): def __init__(self, group_obj): ABEnc.__init__(self) self.util = SecretUtil(group_obj, verbose=False) self.group = group_obj
[docs] def system_setup(self) -> (mk_t, pp_t): """ System Setup algorithm. This algorithm is performed by TA Inputs: - None Outputs: - MK: TA's master secret key. - PP: Public Parameters. """ alpha, beta = self.group.random(ZR), self.group.random(ZR) g = self.group.random(G1) MK = {'beta': beta, 'g_alpha': g ** alpha} e_gg_alpha = pair(g, g) ** alpha PP = {'g': g, 'g_beta': g ** beta, 'g_1_over_beta': g ** ~beta, 'e_gg_alpha': e_gg_alpha} return MK, PP
[docs] def manager_setup(self, attribute_names: List[str], PP: pp_t): """ Manager Setup algorithm performed by AM. Inputs: - attribute_names: The name of attributes that AM is responsible for. - PP: Public Parameters from the system setup algorithm. Outputs: - MMK: Manager master key represented as a dictionary. - MPK: Manager public key represented as a dictionary. """ MMK = {} MPK = {} for attr in attribute_names: t_i = self.group.random(ZR) g = PP['g'] T_i = g ** t_i MMK[attr] = t_i MPK[attr] = T_i return MMK, MPK
[docs] def key_generation(self, PP, MK, MPK, user_attribute_names_list: List[str], user_name: str, attributes_manager: AM, UMK, users_TA_KEK): """ This function is responsible for generating the decryption keys used by the user according to his list of attributes. Inputs: - PP: Public Parameters from the system setup algorithm. - MK: TA's master secret key. - MPK: Manager public key represented as a dictionary. - user_attribute_names_list: Attribute names hold by the user. - user_name: User name. - attributes_manager: AM. Inputs/outputs: - UMK: User Master Key. A value stored privately by TA for each user. Represented as a dictionary, where the user_name is the key and a group element is the value. Outputs: - DSK: Attributes decryption keys as in the original CP-ABE paper (abenc_bsw07.py). (shared with the user) - KEK: Key Encryption Keys generated for each attribute hold by the user using the users binary tree generated by AM. (shared with the user) - users_TA_KEK: A dictionary that holds the TA KEK for each user. (stored privately by AM) """ # Attribute key generation. Executed by TA. DSK, TA_KEK = self.user_attributes_key_gen(MK, MPK, PP, user_attribute_names_list, user_name, UMK) users_TA_KEK[user_name] = TA_KEK # KEK generation by AM. KEK = self.user_attributes_kek_generation(TA_KEK, attributes_manager, user_attribute_names_list, user_name) return DSK, KEK
[docs] def user_attributes_key_gen(self, MK, MPK, PP, user_attribute_names_list, user_name, UMK): """ This function is executed by TA and considered as part of key generation procedure. Inputs: - MK: TA's master secret key. - MPK: Manager public key represented as a dictionary. - PP: Public Parameters from the system setup algorithm. - user_attribute_names_list: Attribute names hold by the user. - user_name: User name. Inputs/outputs: - UMK: User Master Key. A value stored privately by TA for each user. Represented as a dictionary, where the user_name is the key and a group element is the value. Outputs: - DSK: Attributes decryption keys as in the original CP-ABE paper (abenc_bsw07.py). (shared with the user) - KEK: Key Encryption Keys generated for each attribute hold by the user using the users binary tree generated by AM. It is a preliminary one that will be changed by AM in the next algorithm. """ r = self.group.random(ZR) g = PP['g'] g_r = g ** r D = (MK['g_alpha'] * g_r) ** (1 / MK['beta']) D_i = {} D_i_dash = {} KEK = {} for attr in user_attribute_names_list: r_i = self.group.random(ZR) D_i[attr] = g_r * (self.group.hash(attr, G1) ** r_i) D_i_dash[attr] = g ** r_i kek_i = MPK[attr] ** r_i KEK[attr] = kek_i DSK = {'D': D, 'D_i': D_i, 'D_i_dash': D_i_dash, 'attrs': user_attribute_names_list} UMK[user_name] = g_r return DSK, KEK
[docs] def user_attributes_kek_generation(self, TA_KEK, attributes_manager, user_attribute_names_list, user_name): """ This function is executed by AM and considered as part of key generation procedure. Inputs: - TA_KEK: Preliminary KEK list generated by TA. - attributes_manager: AM. - user_attribute_names_list: Attribute names hold by the user. - user_name: User name. Outputs: - KEK: Key Encryption Keys generated for each attribute hold by the user using the users binary tree generated by AM. """ KEK = {} for attr in user_attribute_names_list: KEK_attr = self.generate_kek_for_user_with_attr(TA_KEK, attr, attributes_manager, user_name) KEK[attr] = KEK_attr return KEK
[docs] def generate_kek_for_user_with_attr(self, TA_KEK, attr, attributes_manager, user_name): """ This function is executed by AM and considered as part of key generation procedure. Inputs: - TA_KEK: Preliminary KEK list generated by TA. - attributes_manager: AM. - user_attribute_names_list: Attribute names hold by the user. - user_name: User name. Outputs: - KEK: Key Encryption Key generated for a specific attribute hold by the user using the users binary tree generated by AM. """ list_of_users_hold_attr = attributes_manager.attrs_to_users_dict[attr] node_G_i = attributes_manager.get_minimum_nodes_list_that_represent_users_list(list_of_users_hold_attr) user_path = attributes_manager.get_user_path(user_name) intersection = attributes_manager.get_user_path_intersection_with_node_gi(user_path, node_G_i) if len(intersection) == 0: # AB: Do nothing, as mentioned in the paper. return None else: assert len(intersection) == 1, "The intersection list should have only one element." vj_node: TreeNode = intersection[0] kek_i = TA_KEK[attr] # TODO: AB: The attribute has to be added before to any other user in the system setup. # Consider fixing it later if this functionality is needed. KEK_i = kek_i ** (1 / vj_node.value) KEK_attr = {'seq(vj)': vj_node.sequence_number, 'kek_i': kek_i, 'KEK_i': KEK_i} return KEK_attr
[docs] def encrypt(self, PP, MMK, M, A: str, attributes_manager: AM): """ This function is executed by anyone who wants to encrypt a message with an access policy, then by AM to perform the re-encryption. Inputs: - PP: Public Parameters from the system setup algorithm. - MMK: Manager master key represented as a dictionary. - M: Message to by encrypted. - A: Access policy represented as a boolean expression string. Outputs: - CT_dash: Ciphertext. - Hdr: Header message. """ # Local Encryption CT = self.local_encryption(A, M, PP) CT, Hdr = self.reencryption(CT, MMK, PP, attributes_manager) return CT, Hdr
[docs] def reencryption(self, CT, MMK, PP, attributes_manager): """ This function is performed by AM and it is the second part of the encryption procedure. """ Hdr = {} # AB: TODO: g = PP['g'] kys_dict = {} for attr_name_with_idx in CT['Cy_tilde']: # Index is appended only if the attribute is repeated more than one time to the access policy k_y = self.group.random(ZR) kys_dict[attr_name_with_idx] = k_y g_k_y = g ** k_y CT['Cy_tilde'][attr_name_with_idx] = CT['Cy_tilde'][attr_name_with_idx] * g_k_y attr_name_without_idx = self.__get_attr_name_without_idx(attr_name_with_idx) if not attr_name_without_idx in attributes_manager.attrs_to_users_dict: # Attribute manager is not responsible for this attribute # AB: TODO: Attention here. You might need to revisit this part. continue Gi = attributes_manager.attrs_to_users_dict[attr_name_without_idx] node_Gi = attributes_manager.get_minimum_nodes_list_that_represent_users_list(Gi) if not attr_name_with_idx in Hdr: Hdr[attr_name_with_idx] = [] for a_node_Gi in node_Gi: a_node_Gi: TreeNode = a_node_Gi E_k_y = g_k_y ** (a_node_Gi.value / MMK[attr_name_without_idx]) Hdr[attr_name_with_idx].append({'seq': a_node_Gi.sequence_number, 'E(k_y)': E_k_y}) return CT, Hdr
def __get_attr_name_without_idx(self, attr_name: str): if attr_name.find('_') == -1: return attr_name val = attr_name.split('_') return val[0]
[docs] def local_encryption(self, A, M, PP): """ This function is executed by anyone who wants to encrypt a message with an access policy. Inputs: - A: Access policy represented as a boolean expression string. - M: Message to by encrypted. - PP: Public Parameters from the system setup algorithm. Outputs: - CT: Ciphertext. """ s = self.group.random(ZR) e_gg_alpha_s = PP['e_gg_alpha'] ** s g = PP['g'] policy = self.util.createPolicy(A) a_list = self.util.getAttributeList(policy) shares = self.util.calculateSharesDict(s, policy) C0 = e_gg_alpha_s * M C1 = PP['g_beta'] ** s C_y, C_y_pr = {}, {} for i in shares.keys(): j = self.util.strip_index(i) C_y[i] = g ** shares[i] C_y_pr[i] = self.group.hash(j, G1) ** shares[i] CT = {'C0': C0, 'C1': C1, 'Cy': C_y, 'Cy_tilde': C_y_pr, 'A': A, 'attributes': a_list} return CT
[docs] def decrypt(self, PP, CT_tilde, Hdr, DSK, KEK, user_name: str, attributes_manager: AM): """ This function is used by any user who has sufficient, non revoked attributes to decrypted a message under a specific access policy. Inputs: - PP: Public Parameters from the system setup algorithm. - CT_tilde: Ciphertext after re-encryption by the AM. - Hdr: Header message. - DSK: Attributes decryption keys as in the original CP-ABE paper (abenc_bsw07.py). (shared with the user). - KEK: Key Encryption Keys generated for each attribute hold by the user using the users binary tree - user_name: Username who is decrypting the ciphertext. - attributes_manager: AM. Outputs: - M: Recovered message, if the user has the decryption keys of the attributes that satisfy the policy. """ ct = CT_tilde policy_str = ct['A'] policy = self.util.createPolicy(policy_str) pruned_list = self.util.prune(policy, DSK['attrs']) if not pruned_list: return False z = self.util.getCoefficients(policy) A = 1 for i in pruned_list: j = i.getAttributeAndIndex() k = i.getAttribute() KEK_i = KEK[k]['KEK_i'] Hdr_for_attr: list = Hdr[j] chosen_Hdr_element = None user_path = attributes_manager.get_user_path(user_name) for hdr_elem in Hdr_for_attr: # If hdr_ele intersect with the user path, then it is the chosen element found = False for user_node in user_path: if user_node.sequence_number == hdr_elem['seq']: found = True if found: chosen_Hdr_element = hdr_elem E_k_y = chosen_Hdr_element['E(k_y)'] A *= ( (pair(ct['Cy'][j], DSK['D_i'][k]) * pair(KEK_i, E_k_y) )/ pair(DSK['D_i_dash'][k], ct['Cy_tilde'][j]) ) ** z[j] return ct['C0'] / (pair(ct['C1'], DSK['D']) / A)
[docs] def revoke_attribute(self, revoked_user_name, attribute_name, attributes_manager: AM, PP, users_kek_i, MMK, MPK): """ This function is executed by AM when an attribute is revoked from a user. Inputs: - revoked_user_name: The name of the revoked user. - attribute_name: revoked attribute name. - attributes_manager: AM. - PP: Public Parameters from the system setup algorithm. - users_kek_i: A list privately acquired by AM from TA as part of key_generation function. Inputs/Outputs: - MMK: Manager master key represented as a dictionary. - MPK: Manager public key represented as a dictionary. Outputs: - updated_KEK_dict: The key is the user-name of the user whose KEK key is updated and the value is the updated KEK key value. """ attributes_manager.remove_attr_from_user(attribute_name, revoked_user_name) # Key Updating g = PP['g'] t_i = self.group.random(ZR) old_t_i = MMK[attribute_name] t_i = t_i * old_t_i T_i = g ** t_i MMK[attribute_name] = t_i MPK[attribute_name] = T_i # Get List of the users affects. (The users who hold this attribute) affected_users_names = attributes_manager.attrs_to_users_dict[attribute_name] updated_KEK_dict = {} for a_user_name in affected_users_names: users_kek_i[a_user_name][attribute_name] = users_kek_i[a_user_name][attribute_name] ** t_i user_attribute_names_list = attributes_manager.users_to_attrs_dict[a_user_name] # KEK generation by AM new_user_KEK = self.user_attributes_kek_generation(users_kek_i[a_user_name], attributes_manager, user_attribute_names_list, a_user_name) updated_KEK_dict[a_user_name] = new_user_KEK return updated_KEK_dict
[docs] def add_attribute(self, user_name, attribute_name, attributes_manager: AM, PP, UMK, users_kek_i, MMK, MPK): """ This function is executed by AM when an attribute is added to a user. Inputs: - user_name: The name of the user who has an attribute to be added. - attribute_name: To be added attribute name. - attributes_manager: AM. - PP: Public Parameters from the system setup algorithm. - UMK: User Master Key. A value stored privately by TA for each user. Represented as a dictionary, where the user_name is the key and a group element is the value. - users_kek_i: A list privately acquired by AM from TA as part of key_generation function. Inputs/Outputs: - MMK: Manager master key represented as a dictionary. - MPK: Manager public key represented as a dictionary. Outputs: - updated_KEK_dict: The key is the user-name of the user whose KEK key is updated and the value is the updated KEK key value. """ # TA updates D_i, D_i_tilde and send it to the user for him to append it to his DSK g = PP['g'] r_i = self.group.random(ZR) g_r = UMK[user_name] D_i = g_r * self.group.hash(attribute_name, G1) ** r_i D_i_tilde = g ** r_i kek_i = MPK[attribute_name] ** r_i users_kek_i[user_name][attribute_name] = kek_i # AM updates the users tree and returns to each affected user its updated KEK for this attribute. attributes_manager.add_attr_to_user(attribute_name, user_name) list_of_users_hold_attr = attributes_manager.attrs_to_users_dict[attribute_name] node_G_i = attributes_manager.get_minimum_nodes_list_that_represent_users_list(list_of_users_hold_attr) KEK_user_names_dict_for_attr = {} # Each user gets an entry from this dict that is associated to him and # adds/updates it in his KEK. for a_user in list_of_users_hold_attr: KEK_attr = self.generate_kek_for_user_with_attr(users_kek_i[a_user], attribute_name, attributes_manager, a_user) KEK_user_names_dict_for_attr[a_user] = KEK_attr return D_i, D_i_tilde, KEK_user_names_dict_for_attr
[docs] def main(): group_obj = PairingGroup('SS512') attributes_manager = AM(group_obj) user_names_list = ['U1', 'U2', 'U3', 'U4', 'U5', 'U6', 'U7', 'U8'] attributes_manager.add_attr_to_user('ONE', 'U1') attributes_manager.add_attr_to_user('FOUR', 'U1') attributes_manager.add_attr_to_user('TWO', 'U1') attributes_manager.add_attr_to_user('ONE', 'U2') attributes_manager.add_attr_to_user('THREE', 'U2') attributes_manager.add_attr_to_user('ONE', 'U3') attributes_manager.add_attr_to_user('THREE', 'U4') attributes_manager.add_attr_to_user('ONE', 'U5') attributes_manager.add_attr_to_user('TWO', 'U6') attributes_manager.add_attr_to_user('ONE', 'U7') attributes_manager.add_attr_to_user('THREE', 'U8') print("Users attributes list: ", attributes_manager.users_to_attrs_dict) ca_cpabe_ar = CaCpabeAr(group_obj) MK, PP = ca_cpabe_ar.system_setup() print("MK: ", MK) print("PP: ", PP) attributes_names = ['ONE', 'TWO', 'THREE', 'FOUR'] MMK, MPK = ca_cpabe_ar.manager_setup(attributes_names, PP) print("MMK: ", MMK) print("MPK: ", MPK) UMK = {} # A value stored privately by TA for each user. users_private_keys_dict = {} users_kek_i = {} # Held privately by AM for user_name in user_names_list: # Attribute key generation. Executed by TA. user_attribute_names_list = attributes_manager.users_to_attrs_dict[user_name] # KEK generation by AM. DSK, KEK = ca_cpabe_ar.key_generation(PP, MK, MPK, user_attribute_names_list, user_name, attributes_manager, UMK, users_kek_i) users_private_keys_dict[user_name] = {'DSK': DSK, 'KEK': KEK} print("KEK for {}: {}".format(user_name, users_private_keys_dict[user_name])) rand_msg = group_obj.random(GT) print("Message: ", rand_msg) policy_str = '((four or three) and (three or one))' CT_tilde, Hdr = ca_cpabe_ar.encrypt(PP, MMK, rand_msg, policy_str, attributes_manager) print("CT: ", CT_tilde) user_private_keys_dict = users_private_keys_dict['U2'] DSK = user_private_keys_dict['DSK'] KEK = user_private_keys_dict['KEK'] recovered_M = ca_cpabe_ar.decrypt(PP, CT_tilde, Hdr, DSK, KEK, 'U2', attributes_manager) print('Recovered Message: ', recovered_M) assert rand_msg == recovered_M, "FAILED Decryption: message is incorrect" # Revoke the attribute `THREE` from user `U2` updated_users_kek_values = ca_cpabe_ar.revoke_attribute('U2', 'THREE', attributes_manager, PP, users_kek_i, MMK, MPK) for a_user_name in updated_users_kek_values: # The updated users KEK keys need to be distributed to the users users_private_keys_dict[user_name]['KEK'] = updated_users_kek_values[a_user_name] # Now `U7` does not have the ability to decrypt the message because his attributes ['ONE'] does not match the policy user_private_keys_dict = users_private_keys_dict['U7'] DSK = user_private_keys_dict['DSK'] KEK = user_private_keys_dict['KEK'] recovered_M = ca_cpabe_ar.decrypt(PP, CT_tilde, Hdr, DSK, KEK, 'U7', attributes_manager) print("Wrong recovered M for U7: ", recovered_M) # Uncomment the following line and an error will be raised # assert rand_msg == recovered_M, "FAILED Decryption: message is incorrect" # Add attribute `FOUR` to user `U7` attr_to_be_added = 'FOUR' D_i, D_i_tilde, KEK_user_names_dict_for_attr = ca_cpabe_ar.add_attribute('U7', attr_to_be_added, attributes_manager, PP, UMK, users_kek_i, MMK, MPK) user_private_keys_dict = users_private_keys_dict['U7'] # Update DSK for the user DSK = user_private_keys_dict['DSK'] DSK['D_i'][attr_to_be_added] = D_i DSK['D_i_dash'][attr_to_be_added] = D_i_tilde DSK['attrs'].append(attr_to_be_added) # Each user receives the updated KEK for the attribute 'U7' for a_user_name in KEK_user_names_dict_for_attr: user_KEK_for_added_attr = KEK_user_names_dict_for_attr[a_user_name] # KEK for attribute 'FOUR' for a specific # user. user_private_keys_dict = users_private_keys_dict[a_user_name] KEK_for_user = user_private_keys_dict['KEK'] KEK_for_user[attr_to_be_added] = user_KEK_for_added_attr # Encrypt the same message again. CT_tilde, Hdr = ca_cpabe_ar.encrypt(PP, MMK, rand_msg, policy_str, attributes_manager) print("CT: ", CT_tilde) user_private_keys_dict = users_private_keys_dict['U2'] DSK = user_private_keys_dict['DSK'] KEK = user_private_keys_dict['KEK'] # Now `U2` does not have the ability to decrypt the message because his attributes no longer match the policy after # one of his attributes was revoked. recovered_M = ca_cpabe_ar.decrypt(PP, CT_tilde, Hdr, DSK, KEK, 'U2', attributes_manager) print("Wrong recovered M for U2: ", recovered_M) # Uncomment the following line and an error will be raised # assert rand_msg == recovered_M, "FAILED Decryption: message is incorrect" # U7 now has the ability to decrypt the message after because his attributes now match the policy [ONE, FOUR] user_private_keys_dict = users_private_keys_dict['U7'] DSK = user_private_keys_dict['DSK'] KEK = user_private_keys_dict['KEK'] # `U7` has the ability to decrypt the message because his attributes match the policy after adding attribute FOUR recovered_M = ca_cpabe_ar.decrypt(PP, CT_tilde, Hdr, DSK, KEK, 'U7', attributes_manager) print("Recovered M for U7: ", recovered_M) assert rand_msg == recovered_M, "FAILED Decryption: message is incorrect"
if __name__ == "__main__": main()