# Source code for abenc_tbpre_lww14

'''
Qin Liu, Guojun Wang, Jie Wu
 
| From: Time-based proxy re-encryption scheme for secure data sharing in a cloud environment
| Published in: Information Sciences (Volume: 258, year: 2014)
| Available From: http://www.sciencedirect.com/science/article/pii/S0020025512006275
| Notes: 

* type:      ciphertext-policy attribute-based encryption (public key)
* setting:   Pairing

:Author:	artjomb
:Date:		07/2014
'''

from charm.toolbox.pairinggroup import PairingGroup,ZR,G1,GT,pair
from charm.toolbox.secretutil import SecretUtil

from functools import reduce

# taken from https://gist.github.com/endolith/114336
def gcd(*numbers):
    """Return the greatest common divisor of the given integers.

    Accepts any number of positional integers. With no arguments the result
    is 0 (the identity element for gcd); with a single argument the result is
    that argument's absolute value.
    """
    # fractions.gcd was deprecated in Python 3.5 and removed in 3.9, so prefer
    # math.gcd and only fall back to the old location on interpreters that
    # lack it.
    try:
        from math import gcd as _gcd
    except ImportError:
        from fractions import gcd as _gcd
    # Seeding the fold with 0 keeps the empty call well-defined: gcd(0, x) == x.
    return reduce(_gcd, numbers, 0)
# taken from https://gist.github.com/endolith/114336
def lcm(numbers):
    """Return the lowest common multiple of an iterable of integers.

    ``numbers`` may be any iterable (a list, a ``map`` object, a generator).
    An empty iterable yields 1; if any element is 0 the result is 0.
    """
    # Resolve the pairwise gcd locally so this function does not depend on
    # fractions.gcd, which no longer exists on Python 3.9+.
    try:
        from math import gcd as _gcd
    except ImportError:
        from fractions import gcd as _gcd

    def _lcm_pair(a, b):
        # gcd(0, 0) == 0 would make the division below fail, and the lcm of
        # anything with 0 is 0 anyway.
        if a == 0 or b == 0:
            return 0
        return abs(a * b) // _gcd(a, b)

    return reduce(_lcm_pair, numbers, 1)
class TBPRE(object):
    '''Time-based proxy re-encryption over a pairing group.

    Every method is stateless apart from ``self.group``: keys, public
    parameters and ciphertexts are passed in and returned as plain dicts.

    Time values are dicts with the string-valued fields ``'year'``, optionally
    ``'month'`` and optionally ``'day'`` (``'day'`` requires ``'month'``).
    '''

    # Names of the re-encrypted ciphertext components in CT['Ut'], ordered
    # from the coarsest to the finest time granularity.
    _GRANULARITIES = ('year', 'month', 'day')

    def __init__(self, groupObj):
        self.util = SecretUtil(groupObj, verbose=False)  # Create Secret Sharing Scheme
        self.group = groupObj  #: Prime order group

    def setup(self, attributes):
        '''Global Setup (executed by CA)

        Creates the system parameters and one key pair per attribute.

        :param attributes: iterable of attribute names (strings).
        :return: tuple ``(MK, PK, s, H)`` where ``MK`` is the private master
            key, ``PK`` the public key, ``s`` the secret that is sent to the
            cloud service provider and ``H`` a dict of hash functions keyed by
            ``'user'``, ``'attr'``, ``'sy'``, ``'sym'`` and ``'symd'``.
        '''
        P0 = self.group.random(G1)  # generator
        P1 = self.group.random(G1)  # random element
        s = self.group.random()
        mk0 = self.group.random()
        mk1 = self.group.random()
        Q0 = P0 ** mk0
        SK1 = P1 ** mk0

        def hashWithSuffix(value, suffix):
            # The suffix separates the hash domains (attribute / year / ...).
            return self.group.hash(value + suffix, ZR)

        H = {
            'user': lambda x: self.group.hash(str(x), ZR),  # first convert G1 to str, then hash
            'attr': lambda x: hashWithSuffix(x, "_attribute"),
            'sy': lambda x: hashWithSuffix(x, "_year"),
            'sym': lambda x: hashWithSuffix(x, "_year_month"),
            'symd': lambda x: hashWithSuffix(x, "_year_month_day"),
        }

        PK = {'A': {}, 'Q0': Q0, 'P0': P0, 'P1': P1}
        MK = {'A': {}, 'mk0': mk0, 'mk1': mk1, 'SK1': SK1}
        for attribute in attributes:
            ska = self.group.random()
            PK['A'][attribute] = P0 ** ska
            MK['A'][attribute] = ska

        # MK is private, s goes to the cloud service provider, PK is public.
        return (MK, PK, s, H)

    def registerUser(self, PK, H):
        '''Creates a fresh user key pair (executed by user)

        :return: tuple ``(sku, public)`` where ``sku`` is the private key and
            ``public`` is ``{'PKu': ..., 'mku': ...}``; ``mku`` is the
            ``H['user']`` hash of ``PKu``.
        '''
        sku = self.group.random()
        PKu = PK['P0'] ** sku
        mku = H['user'](PKu)
        return (sku, {'PKu': PKu, 'mku': mku})  # (private, public)

    def hashDate(self, H, time, s):
        '''Chains the time hashes of ``time`` on top of the secret ``s``.

        Starting from ``s``, each present field replaces the running value by
        ``H[level](field) ** value``: year first, then month, then day.

        :return: tuple ``(value, key)`` with ``key`` one of ``'y'``, ``'ym'``,
            ``'ymd'``; ``(None, None)`` after printing an error when ``time``
            has no ``'year'`` or has a ``'day'`` without a ``'month'``.
        '''
        if "year" not in time:
            print("Error: time has to contain at least 'year'")
            return None, None
        if "day" in time and "month" not in time:
            # The missing field is 'month' and the field that requires it is 'day'.
            print("Error: time has to contain 'month' if it contains 'day'")
            return None, None

        value = H['sy'](time['year']) ** s
        key = 'y'
        if "month" in time:
            value = H['sym'](time['month']) ** value
            key = 'ym'
            if "day" in time:
                value = H['symd'](time['day']) ** value
                key = 'ymd'
        return value, key

    def timeSuffices(self, timeRange, needle):
        '''Checks whether the point in time ``needle`` lies inside ``timeRange``.

        Both arguments are assumed to be valid time dicts.

        :return: ``True`` / ``False``, or ``None`` when ``needle`` is coarser
            than ``timeRange`` and the comparison therefore cannot be made.
        '''
        if timeRange['year'] != needle['year']:
            return False
        for field in ('month', 'day'):
            if field not in timeRange:
                return True  # the range covers everything below this level
            if field not in needle:
                return None  # Error
            if timeRange[field] != needle[field]:
                return False
        return True

    def policyTerm(self, user, policy):
        '''Finds the first policy term whose attributes the user all holds.

        :param user: user dict; attribute keys are read from ``user['A']``
            (a user without any keys satisfies no non-empty term).
        :param policy: list of terms, each a list of attribute names.
        :return: index of the first satisfied term, or ``False`` if there is
            none. Index 0 is a valid result, so test the outcome with
            ``is False`` rather than by truthiness.
        '''
        userAttributes = user.get('A', {})
        for index, term in enumerate(policy):
            if all(attribute in userAttributes for attribute in term):
                return index
        return False

    def keygen(self, MK, PK, H, s, user, pubuser, attribute, time):
        '''Generate user keys for a specific attribute (executed by CA)

        Appends ``(time, SKua)`` to ``user['A'][attribute]`` and creates
        ``user['SKu']`` on first use. Returns ``None``; when ``time`` is
        invalid nothing is modified (``hashDate`` has printed the reason).
        '''
        timeHash, _ = self.hashDate(H, time, s)
        if timeHash is None:
            return None

        userExponent = MK['mk1'] * pubuser['mku']
        if 'SKu' not in user:
            user['SKu'] = PK['P0'] ** userExponent

        # Attribute public key bound to the time period.
        PKat = PK['A'][attribute] * (PK['P0'] ** timeHash)
        SKua = MK['SK1'] * (PKat ** userExponent)

        user.setdefault('A', {}).setdefault(attribute, []).append((time, SKua))

    def encrypt(self, PK, policy, F):
        '''Generate the cipher-text from the content(-key) and a policy (executed by the content owner)

        :param policy: list of terms (DNF); each term is a list of attribute
            names that must all be held.
        :param F: the message, multiplied with a pairing result.
        :return: dict with keys ``'A'`` (the policy), ``'U0'``, ``'U'`` (one
            element per term), ``'V'`` and ``'nA'`` (lcm of the term lengths).
        '''
        r = self.group.random()
        nA = lcm(len(term) for term in policy)
        U0 = PK['P0'] ** r

        U = []
        for term in policy:
            Ui = 1
            for attribute in term:
                Ui *= PK['A'][attribute]
            U.append(Ui ** r)

        V = F * pair(PK['Q0'], PK['P1'] ** (r * nA))
        return {'A': policy, 'U0': U0, 'U': U, 'V': V, 'nA': nA}

    @staticmethod
    def _granularity(time):
        '''Returns 'day', 'month' or 'year' depending on the finest field in ``time``.'''
        if 'day' in time:
            return 'day'
        if 'month' in time:
            return 'month'
        return 'year'

    def _combineKeys(self, userKeys, attributes, currentTime, granularity):
        '''Multiplies one matching key per attribute, or returns ``None``.

        A key matches when it was issued at exactly ``granularity`` and its
        time range covers ``currentTime``.
        '''
        product = 1
        for attribute in attributes:
            for timeRange, SKua in userKeys.get(attribute, ()):
                if (self._granularity(timeRange) == granularity
                        and self.timeSuffices(timeRange, currentTime)):
                    product *= SKua
                    break
            else:
                return None  # no usable key for this attribute
        return product

    def decrypt(self, CT, user, term = None):
        '''Decrypts the content(-key) from the cipher-text (executed by user/content consumer)

        :param CT: re-encrypted cipher-text as returned by ``reencrypt``.
        :param user: user dict holding ``'SKu'`` and the attribute keys ``'A'``.
        :param term: index of the policy term to use; determined through
            ``policyTerm`` when omitted.
        :return: the decrypted value, or ``None`` after printing an error when
            the policy is not satisfied or no key covers ``CT['t']``.

        The keys issued by ``keygen`` embed the time hash of their own
        granularity, so they only cancel against the ``CT['Ut']`` component
        that ``reencrypt`` built with the same hash. All attribute keys of the
        term must therefore share one granularity; the coarsest granularity
        for which every attribute has a valid key is used.
        '''
        if term is None:
            term = self.policyTerm(user, CT['A'])
            if term is False:
                print("Error: user attributes don't satisfy the policy")
                return None

        attributes = CT['A'][term]
        userKeys = user.get('A', {})
        for granularity in self._GRANULARITIES:
            sumSK = self._combineKeys(userKeys, attributes, CT['t'], granularity)
            if sumSK is not None:
                break
        else:
            print("Error: could not find time slot in user attribute keys")
            return None

        n = CT['nA'] // len(attributes)
        blinding = pair(CT['U0t'], sumSK ** n) / pair(user['SKu'], CT['Ut'][granularity][term] ** n)
        return CT['Vt'] / blinding

    def reencrypt(self, PK, H, s, CT, currentTime):
        '''Re-encrypts the cipher-text using the current time (executed by cloud service provider)

        :param CT: cipher-text as returned by ``encrypt``.
        :param currentTime: time dict containing ``'year'``, ``'month'`` and
            ``'day'``.
        :return: dict with keys ``'A'``, ``'U0t'``, ``'Ut'`` (per-term lists
            under ``'year'``, ``'month'`` and ``'day'``), ``'Vt'``, ``'nA'``
            and ``'t'`` (the given ``currentTime``); ``None`` after printing
            an error when ``currentTime`` is incomplete.
        '''
        if any(field not in currentTime for field in ('year', 'month', 'day')):
            print("Error: pass proper current time containing 'year', 'month' and 'day'")
            return None

        # Derive the coarser views of the current time by dropping fields.
        monthTime = {k: v for k, v in currentTime.items() if k != 'day'}
        yearTime = {k: v for k, v in monthTime.items() if k != 'month'}

        dayHash, _ = self.hashDate(H, currentTime, s)
        monthHash, _ = self.hashDate(H, monthTime, s)
        yearHash, _ = self.hashDate(H, yearTime, s)

        rs = self.group.random()
        U0t = CT['U0'] * (PK['P0'] ** rs)

        # Time-dependent factors; they do not depend on the attribute.
        yearFactor = U0t ** yearHash
        monthFactor = U0t ** monthHash
        dayFactor = U0t ** dayHash

        Ut = {'year': [], 'month': [], 'day': []}
        for term, Ui in zip(CT['A'], CT['U']):
            Uit_year = Ui
            Uit_month = Ui
            Uit_day = Ui
            for attribute in term:
                rerandomised = PK['A'][attribute] ** rs
                Uit_year *= rerandomised * yearFactor
                Uit_month *= rerandomised * monthFactor
                Uit_day *= rerandomised * dayFactor
            Ut['year'].append(Uit_year)
            Ut['month'].append(Uit_month)
            Ut['day'].append(Uit_day)

        Vt = CT['V'] * pair(PK['Q0'], PK['P1'] ** (rs * CT['nA']))
        return {'A': CT['A'], 'U0t': U0t, 'Ut': Ut, 'Vt': Vt, 'nA': CT['nA'], 't': currentTime}
def basicTest():
    '''Year-based attribute keys: a current-year key decrypts, a past-year key does not.'''
    print("RUN basicTest")
    group = PairingGroup('SS512')
    scheme = TBPRE(group)
    attributes = ["ONE", "TWO", "THREE", "FOUR"]
    MK, PK, s, H = scheme.setup(attributes)

    users = {}  # public

    alice = {'id': 'alice'}
    aliceSecret, alicePublic = scheme.registerUser(PK, H)
    alice['sku'] = aliceSecret
    users[alice['id']] = alicePublic

    alice2 = {'id': 'alice2'}
    alice2Secret, alice2Public = scheme.registerUser(PK, H)
    alice2['sku'] = alice2Secret
    users[alice2['id']] = alice2Public

    thisYear = {'year': "2014"}
    lastYear = {'year': "2013"}

    # Both users get keys for every attribute except the last one.
    for name in attributes[:-1]:
        scheme.keygen(MK, PK, H, s, alice, users[alice['id']], name, thisYear)
        scheme.keygen(MK, PK, H, s, alice2, users[alice2['id']], name, lastYear)

    k = group.random(GT)

    policy = [['ONE', 'THREE'], ['TWO', 'FOUR']]  # [['ONE' and 'THREE'] or ['TWO' and 'FOUR']]
    today = {'year': "2014", 'month': "2", 'day': "15"}

    ciphertext = scheme.encrypt(PK, policy, k)
    reencrypted = scheme.reencrypt(PK, H, s, ciphertext, today)

    recovered = scheme.decrypt(reencrypted, alice)
    assert k == recovered, 'FAILED DECRYPTION! 1'
    print('SUCCESSFUL DECRYPTION 1')

    recovered2 = scheme.decrypt(reencrypted, alice2)
    assert k != recovered2, 'SUCCESSFUL DECRYPTION! 2'
    print('DECRYPTION correctly failed')
def basicTest2():
    '''Month-based attributes are used'''
    print("RUN basicTest2")
    group = PairingGroup('SS512')
    scheme = TBPRE(group)
    attributes = ["ONE", "TWO", "THREE", "FOUR"]
    MK, PK, s, H = scheme.setup(attributes)

    users = {}  # public

    alice = {'id': 'alice'}
    aliceSecret, alicePublic = scheme.registerUser(PK, H)
    alice['sku'] = aliceSecret
    users[alice['id']] = alicePublic

    # Keys are issued for a single month rather than for the whole year.
    validity = {'year': "2014", 'month': '2'}
    for name in attributes[:-1]:
        scheme.keygen(MK, PK, H, s, alice, users[alice['id']], name, validity)

    k = group.random(GT)

    policy = [['ONE', 'THREE'], ['TWO', 'FOUR']]  # [['ONE' and 'THREE'] or ['TWO' and 'FOUR']]
    today = {'year': "2014", 'month': "2", 'day': "15"}

    ciphertext = scheme.encrypt(PK, policy, k)
    reencrypted = scheme.reencrypt(PK, H, s, ciphertext, today)

    recovered = scheme.decrypt(reencrypted, alice)
    assert k == recovered, 'FAILED DECRYPTION!'
    print('SUCCESSFUL DECRYPTION')
def test():
    '''Print the literal 2 followed by the result of lcm([1, 2]).'''
    result = lcm([1, 2])
    print(2, result)
if __name__ == "__main__":
    # Only basicTest runs when the module is executed as a script; the other
    # entry points are left disabled.
    basicTest()
    # basicTest2()
    # test()