mirror of
https://github.com/GAM-team/GAM.git
synced 2025-07-08 21:53:36 +00:00
Add existing GAM 3.21 code
This commit is contained in:
295
gdata/Crypto/Protocol/AllOrNothing.py
Normal file
295
gdata/Crypto/Protocol/AllOrNothing.py
Normal file
@ -0,0 +1,295 @@
|
||||
"""This file implements all-or-nothing package transformations.
|
||||
|
||||
An all-or-nothing package transformation is one in which some text is
|
||||
transformed into message blocks, such that all blocks must be obtained before
|
||||
the reverse transformation can be applied. Thus, if any blocks are corrupted
|
||||
or lost, the original message cannot be reproduced.
|
||||
|
||||
An all-or-nothing package transformation is not encryption, although a block
|
||||
cipher algorithm is used. The encryption key is randomly generated and is
|
||||
extractable from the message blocks.
|
||||
|
||||
This class implements the All-Or-Nothing package transformation algorithm
|
||||
described in:
|
||||
|
||||
Ronald L. Rivest. "All-Or-Nothing Encryption and The Package Transform"
|
||||
http://theory.lcs.mit.edu/~rivest/fusion.pdf
|
||||
|
||||
"""
|
||||
|
||||
__revision__ = "$Id: AllOrNothing.py,v 1.8 2003/02/28 15:23:20 akuchling Exp $"
|
||||
|
||||
import operator
|
||||
import string
|
||||
from Crypto.Util.number import bytes_to_long, long_to_bytes
|
||||
|
||||
|
||||
|
||||
class AllOrNothing:
|
||||
"""Class implementing the All-or-Nothing package transform.
|
||||
|
||||
Methods for subclassing:
|
||||
|
||||
_inventkey(key_size):
|
||||
Returns a randomly generated key. Subclasses can use this to
|
||||
implement better random key generating algorithms. The default
|
||||
algorithm is probably not very cryptographically secure.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, ciphermodule, mode=None, IV=None):
|
||||
"""AllOrNothing(ciphermodule, mode=None, IV=None)
|
||||
|
||||
ciphermodule is a module implementing the cipher algorithm to
|
||||
use. It must provide the PEP272 interface.
|
||||
|
||||
Note that the encryption key is randomly generated
|
||||
automatically when needed. Optional arguments mode and IV are
|
||||
passed directly through to the ciphermodule.new() method; they
|
||||
are the feedback mode and initialization vector to use. All
|
||||
three arguments must be the same for the object used to create
|
||||
the digest, and to undigest'ify the message blocks.
|
||||
"""
|
||||
|
||||
self.__ciphermodule = ciphermodule
|
||||
self.__mode = mode
|
||||
self.__IV = IV
|
||||
self.__key_size = ciphermodule.key_size
|
||||
if self.__key_size == 0:
|
||||
self.__key_size = 16
|
||||
|
||||
__K0digit = chr(0x69)
|
||||
|
||||
def digest(self, text):
|
||||
"""digest(text:string) : [string]
|
||||
|
||||
Perform the All-or-Nothing package transform on the given
|
||||
string. Output is a list of message blocks describing the
|
||||
transformed text, where each block is a string of bit length equal
|
||||
to the ciphermodule's block_size.
|
||||
"""
|
||||
|
||||
# generate a random session key and K0, the key used to encrypt the
|
||||
# hash blocks. Rivest calls this a fixed, publically-known encryption
|
||||
# key, but says nothing about the security implications of this key or
|
||||
# how to choose it.
|
||||
key = self._inventkey(self.__key_size)
|
||||
K0 = self.__K0digit * self.__key_size
|
||||
|
||||
# we need two cipher objects here, one that is used to encrypt the
|
||||
# message blocks and one that is used to encrypt the hashes. The
|
||||
# former uses the randomly generated key, while the latter uses the
|
||||
# well-known key.
|
||||
mcipher = self.__newcipher(key)
|
||||
hcipher = self.__newcipher(K0)
|
||||
|
||||
# Pad the text so that its length is a multiple of the cipher's
|
||||
# block_size. Pad with trailing spaces, which will be eliminated in
|
||||
# the undigest() step.
|
||||
block_size = self.__ciphermodule.block_size
|
||||
padbytes = block_size - (len(text) % block_size)
|
||||
text = text + ' ' * padbytes
|
||||
|
||||
# Run through the algorithm:
|
||||
# s: number of message blocks (size of text / block_size)
|
||||
# input sequence: m1, m2, ... ms
|
||||
# random key K' (`key' in the code)
|
||||
# Compute output sequence: m'1, m'2, ... m's' for s' = s + 1
|
||||
# Let m'i = mi ^ E(K', i) for i = 1, 2, 3, ..., s
|
||||
# Let m's' = K' ^ h1 ^ h2 ^ ... hs
|
||||
# where hi = E(K0, m'i ^ i) for i = 1, 2, ... s
|
||||
#
|
||||
# The one complication I add is that the last message block is hard
|
||||
# coded to the number of padbytes added, so that these can be stripped
|
||||
# during the undigest() step
|
||||
s = len(text) / block_size
|
||||
blocks = []
|
||||
hashes = []
|
||||
for i in range(1, s+1):
|
||||
start = (i-1) * block_size
|
||||
end = start + block_size
|
||||
mi = text[start:end]
|
||||
assert len(mi) == block_size
|
||||
cipherblock = mcipher.encrypt(long_to_bytes(i, block_size))
|
||||
mticki = bytes_to_long(mi) ^ bytes_to_long(cipherblock)
|
||||
blocks.append(mticki)
|
||||
# calculate the hash block for this block
|
||||
hi = hcipher.encrypt(long_to_bytes(mticki ^ i, block_size))
|
||||
hashes.append(bytes_to_long(hi))
|
||||
|
||||
# Add the padbytes length as a message block
|
||||
i = i + 1
|
||||
cipherblock = mcipher.encrypt(long_to_bytes(i, block_size))
|
||||
mticki = padbytes ^ bytes_to_long(cipherblock)
|
||||
blocks.append(mticki)
|
||||
|
||||
# calculate this block's hash
|
||||
hi = hcipher.encrypt(long_to_bytes(mticki ^ i, block_size))
|
||||
hashes.append(bytes_to_long(hi))
|
||||
|
||||
# Now calculate the last message block of the sequence 1..s'. This
|
||||
# will contain the random session key XOR'd with all the hash blocks,
|
||||
# so that for undigest(), once all the hash blocks are calculated, the
|
||||
# session key can be trivially extracted. Calculating all the hash
|
||||
# blocks requires that all the message blocks be received, thus the
|
||||
# All-or-Nothing algorithm succeeds.
|
||||
mtick_stick = bytes_to_long(key) ^ reduce(operator.xor, hashes)
|
||||
blocks.append(mtick_stick)
|
||||
|
||||
# we convert the blocks to strings since in Python, byte sequences are
|
||||
# always represented as strings. This is more consistent with the
|
||||
# model that encryption and hash algorithms always operate on strings.
|
||||
return map(long_to_bytes, blocks)
|
||||
|
||||
|
||||
def undigest(self, blocks):
|
||||
"""undigest(blocks : [string]) : string
|
||||
|
||||
Perform the reverse package transformation on a list of message
|
||||
blocks. Note that the ciphermodule used for both transformations
|
||||
must be the same. blocks is a list of strings of bit length
|
||||
equal to the ciphermodule's block_size.
|
||||
"""
|
||||
|
||||
# better have at least 2 blocks, for the padbytes package and the hash
|
||||
# block accumulator
|
||||
if len(blocks) < 2:
|
||||
raise ValueError, "List must be at least length 2."
|
||||
|
||||
# blocks is a list of strings. We need to deal with them as long
|
||||
# integers
|
||||
blocks = map(bytes_to_long, blocks)
|
||||
|
||||
# Calculate the well-known key, to which the hash blocks are
|
||||
# encrypted, and create the hash cipher.
|
||||
K0 = self.__K0digit * self.__key_size
|
||||
hcipher = self.__newcipher(K0)
|
||||
|
||||
# Since we have all the blocks (or this method would have been called
|
||||
# prematurely), we can calcualte all the hash blocks.
|
||||
hashes = []
|
||||
for i in range(1, len(blocks)):
|
||||
mticki = blocks[i-1] ^ i
|
||||
hi = hcipher.encrypt(long_to_bytes(mticki))
|
||||
hashes.append(bytes_to_long(hi))
|
||||
|
||||
# now we can calculate K' (key). remember the last block contains
|
||||
# m's' which we don't include here
|
||||
key = blocks[-1] ^ reduce(operator.xor, hashes)
|
||||
|
||||
# and now we can create the cipher object
|
||||
mcipher = self.__newcipher(long_to_bytes(key))
|
||||
block_size = self.__ciphermodule.block_size
|
||||
|
||||
# And we can now decode the original message blocks
|
||||
parts = []
|
||||
for i in range(1, len(blocks)):
|
||||
cipherblock = mcipher.encrypt(long_to_bytes(i, block_size))
|
||||
mi = blocks[i-1] ^ bytes_to_long(cipherblock)
|
||||
parts.append(mi)
|
||||
|
||||
# The last message block contains the number of pad bytes appended to
|
||||
# the original text string, such that its length was an even multiple
|
||||
# of the cipher's block_size. This number should be small enough that
|
||||
# the conversion from long integer to integer should never overflow
|
||||
padbytes = int(parts[-1])
|
||||
text = string.join(map(long_to_bytes, parts[:-1]), '')
|
||||
return text[:-padbytes]
|
||||
|
||||
def _inventkey(self, key_size):
|
||||
# TBD: Not a very secure algorithm. Eventually, I'd like to use JHy's
|
||||
# kernelrand module
|
||||
import time
|
||||
from Crypto.Util import randpool
|
||||
# TBD: key_size * 2 to work around possible bug in RandomPool?
|
||||
pool = randpool.RandomPool(key_size * 2)
|
||||
while key_size > pool.entropy:
|
||||
pool.add_event()
|
||||
|
||||
# we now have enough entropy in the pool to get a key_size'd key
|
||||
return pool.get_bytes(key_size)
|
||||
|
||||
def __newcipher(self, key):
|
||||
if self.__mode is None and self.__IV is None:
|
||||
return self.__ciphermodule.new(key)
|
||||
elif self.__IV is None:
|
||||
return self.__ciphermodule.new(key, self.__mode)
|
||||
else:
|
||||
return self.__ciphermodule.new(key, self.__mode, self.__IV)
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
import getopt
|
||||
import base64
|
||||
|
||||
usagemsg = '''\
|
||||
Test module usage: %(program)s [-c cipher] [-l] [-h]
|
||||
|
||||
Where:
|
||||
--cipher module
|
||||
-c module
|
||||
Cipher module to use. Default: %(ciphermodule)s
|
||||
|
||||
--aslong
|
||||
-l
|
||||
Print the encoded message blocks as long integers instead of base64
|
||||
encoded strings
|
||||
|
||||
--help
|
||||
-h
|
||||
Print this help message
|
||||
'''
|
||||
|
||||
ciphermodule = 'AES'
|
||||
aslong = 0
|
||||
|
||||
def usage(code, msg=None):
|
||||
if msg:
|
||||
print msg
|
||||
print usagemsg % {'program': sys.argv[0],
|
||||
'ciphermodule': ciphermodule}
|
||||
sys.exit(code)
|
||||
|
||||
try:
|
||||
opts, args = getopt.getopt(sys.argv[1:],
|
||||
'c:l', ['cipher=', 'aslong'])
|
||||
except getopt.error, msg:
|
||||
usage(1, msg)
|
||||
|
||||
if args:
|
||||
usage(1, 'Too many arguments')
|
||||
|
||||
for opt, arg in opts:
|
||||
if opt in ('-h', '--help'):
|
||||
usage(0)
|
||||
elif opt in ('-c', '--cipher'):
|
||||
ciphermodule = arg
|
||||
elif opt in ('-l', '--aslong'):
|
||||
aslong = 1
|
||||
|
||||
# ugly hack to force __import__ to give us the end-path module
|
||||
module = __import__('Crypto.Cipher.'+ciphermodule, None, None, ['new'])
|
||||
|
||||
a = AllOrNothing(module)
|
||||
print 'Original text:\n=========='
|
||||
print __doc__
|
||||
print '=========='
|
||||
msgblocks = a.digest(__doc__)
|
||||
print 'message blocks:'
|
||||
for i, blk in map(None, range(len(msgblocks)), msgblocks):
|
||||
# base64 adds a trailing newline
|
||||
print ' %3d' % i,
|
||||
if aslong:
|
||||
print bytes_to_long(blk)
|
||||
else:
|
||||
print base64.encodestring(blk)[:-1]
|
||||
#
|
||||
# get a new undigest-only object so there's no leakage
|
||||
b = AllOrNothing(module)
|
||||
text = b.undigest(msgblocks)
|
||||
if text == __doc__:
|
||||
print 'They match!'
|
||||
else:
|
||||
print 'They differ!'
|
Reference in New Issue
Block a user