Question
Python 3.x

Files: HuffmanHeap.py (needs to be completed), Encoder.py (provided)
Question 1 (17 points): Purpose: To implement an ADT to support efficient execution of a HuffmanTree encoder. Degree of Difficulty: Easy to Moderate.

Phoenix Wright, ace attorney, loves being lazy - erm, efficient. Phoenix is also short on money. He has heard of text compression, and thinks it might be a good idea to compress all his text files to save on valuable disk space (disk space is expensive for him!). Larry Butz, his useless IT friend, wrote most of the code for text compression using Huffman trees, but did not finish the job (surprising no one).

On the Assignment 10 Moodle page, you'll find a program named Encoder.py. The program follows a simple design and is well documented. All the pieces should be familiar to you, as they are similar to discussion in class. The Encoder.py program imports two supporting ADTs:

- HuffmanTree.py: Defines the HuffmanTree class.
- HuffmanHeap.py: Defines the HuffmanHeap class.

The HuffmanTree class was discussed in lecture, and its definition should be familiar. There is one change that you should note: the function build_codec that builds the codec from a HuffmanTree object is now a method in the HuffmanTree class.

The ADT HuffmanHeap was also discussed in lecture, though the code was not given. A heap is a kind of queue that guarantees that the dequeue operation always removes and returns the smallest value in the queue (this is not the FIFO protocol). The HuffmanHeap stores HuffmanTree objects, and has the following methods:

- __init__(self, leafs): Creates a new HuffmanHeap object.
- enqueue(self, tree): Adds the given HuffmanTree to the heap.
- dequeue(self): Removes and returns the smallest HuffmanTree in the heap.
- __len__(self): Returns the number of HuffmanTrees in the heap. Defining this method allows programmers to call len() on HuffmanHeap objects.

In lecture we saw that we could implement these operations so that they all have O(n) worst-case time complexity, where n is the number of HuffmanTree objects in the HuffmanHeap. However, we also described in lecture how these operations could be implemented so that they all have worst-case time complexity of O(1). Hint: use 2 lists.

The HuffmanHeap class is incomplete. Your job is to complete the 4 methods described above, so that the Encoder.py program correctly encodes text files. (When you've completed Questions 1 and 2, you should be able to encode and decode any text files. If you want to test your encoder, you can try decoding with the program given for Question 2.)

NOTE: some symbols do not work for our encoding solution. Apostrophes, semi-colons and non-standard symbols may have errors.

Explanation / Answer
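Before the answer listing, here is a minimal sketch of the two-list idea from the hint. It is not the assignment's HuffmanHeap.py: the class name and the assumed tree.weight attribute are illustrative, since the real HuffmanTree interface is not shown here. The key observation is that the Huffman algorithm always enqueues merged trees in nondecreasing weight order, so a sorted queue of leaves plus a queue of merged trees keeps the overall minimum at one of the two fronts:

from collections import deque

class TwoListHuffmanHeap:
    def __init__(self, leafs):
        # Assumes the leaf trees arrive already sorted by weight.
        self._leafs = deque(leafs)    # smallest leaf at the front
        self._merged = deque()        # merged trees, in weight order

    def enqueue(self, tree):
        # Each merged tree weighs at least as much as any earlier one,
        # so appending preserves sorted order.
        self._merged.append(tree)

    def dequeue(self):
        # The minimum sits at the front of one of the two queues.
        if not self._merged:
            return self._leafs.popleft()
        if not self._leafs:
            return self._merged.popleft()
        if self._leafs[0].weight <= self._merged[0].weight:
            return self._leafs.popleft()
        return self._merged.popleft()

    def __len__(self):
        return len(self._leafs) + len(self._merged)

deque.append() and deque.popleft() are both O(1), which is where the constant-time bound comes from; two plain lists also work if you track a front index instead of popping from index 0. One possible answer follows.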
import json
from collections import Counter, namedtuple
from multiprocessing import Pool, cpu_count
from os.path import getsize
from time import perf_counter as clock  # time.clock was removed in Python 3.8
MULTIPROCESSING_THRESHOLD = 200000
class HuffmanHeap(list):
Element = namedtuple('Element', ('weight', 'encodings'))
def push(self, weight, encodings):
element = HuffmanHeap.Element(weight, encodings)
if not self or weight >= self[-1].weight:
self.append(element)
return
        # Insert before the first element with a larger weight so the
        # list stays sorted; pop(0) then always yields the minimum.
        for i in range(len(self)):
            if weight < self[i].weight:
                self.insert(i, element)
                return
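# Example (illustrative): pushing weights 3, then 1, then 2 onto an empty
# HuffmanHeap leaves it ordered by weight [1, 2, 3], so pop(0) always
# removes the smallest-weight element first.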
def create_encoding_table(bytes_):
heap = HuffmanHeap()
for byte, count in Counter(bytes_).items():
heap.push(count, {byte: ''})
    # Edge case: a file containing only one distinct byte would otherwise
    # get an empty code; give it the one-bit code '0' instead.
    if len(heap) == 1:
        return {byte: '0' for byte in heap[0].encodings}
    # Repeatedly merge the two lightest subtrees until only one remains.
    while len(heap) > 1:
        lo, hi = heap.pop(0), heap.pop(0)
for byte in lo.encodings:
lo.encodings[byte] = '0' + lo.encodings[byte]
for byte in hi.encodings:
hi.encodings[byte] = '1' + hi.encodings[byte]
heap.push(lo.weight + hi.weight, {**lo.encodings, **hi.encodings})
return heap[0].encodings
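# Example (illustrative): create_encoding_table(b'aabbbbc') gives the most
# frequent byte (b'b') the shortest code, e.g. {98: '1', 97: '01', 99: '00'}
# (the exact codes depend on how ties are broken).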
def segment(array, parts):
avg = len(array) / parts
last = 0.0
while last < len(array):
yield array[int(last):int(last + avg)]
last += avg
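# Example (illustrative): list(segment([1, 2, 3, 4, 5], 2)) yields the
# chunks [1, 2] and [3, 4, 5].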
def encoding_worker(data, table):
return ''.join(table[byte] for byte in data)
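# Example (illustrative): encoding_worker(b'ab', {97: '0', 98: '10'})
# returns '010'.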
def encode(data, table, pool):
tasks = [pool.apply_async(encoding_worker, args=(part, table)) for part in segment(data, cpu_count())]
del data, table
return ''.join(task.get() for task in tasks)
def segment_bytewise(string, parts):
    # Round each chunk size up to a multiple of 8 so every worker receives
    # a whole number of bytes' worth of bits.
    avg = len(string) / parts
    avg = avg + (8 - avg % 8)
    last = 0
    while last < len(string):
        yield string[int(last):int(last + avg)]
        last += avg
def pack_bytes_worker(string):
return bytearray(int(string[i:i + 8], 2) for i in range(0, len(string), 8))
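# Example (illustrative): pack_bytes_worker('0100000101000010') returns
# bytearray(b'AB'), since 01000001 is 65 ('A') and 01000010 is 66 ('B').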
def pack_bytes(string, pool):
    # Pad with 0-7 zero bits so the length is a multiple of 8; the decoder
    # stops after 'size' symbols, so padding bits are never interpreted.
    tasks = [pool.apply_async(pack_bytes_worker, args=(part,))
             for part in segment_bytewise(string + '0' * (-len(string) % 8), cpu_count())]
del string
return bytearray(b''.join(task.get() for task in tasks))
def encode_file(filename, table_filename, compressed_filename, encoding='UTF-8', silent=False):
start_time = clock()
silent or print('Reading... ', end='')
with open(filename, 'rb') as file:
uncompressed_bytes = file.read()
silent or print('Done')
if not uncompressed_bytes or len(uncompressed_bytes) == 1:
raise ValueError("Not enough data to compress")
silent or print('Creating tables... ', end='')
encoding_table = create_encoding_table(uncompressed_bytes)
decoding_table = {value: key for key, value in encoding_table.items()}
decoding_table['size'] = len(uncompressed_bytes)
silent or print('Done')
silent or print('Writing table... ', end='')
with open(table_filename, 'w+', encoding=encoding) as file:
json.dump(decoding_table, file)
silent or print('Done')
del decoding_table
pool = Pool(cpu_count())
silent or print('Compressing... ', end='')
if len(uncompressed_bytes) < MULTIPROCESSING_THRESHOLD:
compressed_string = encoding_worker(uncompressed_bytes, encoding_table)
else:
silent or print('using multiprocessing... ', end='')
compressed_string = encode(uncompressed_bytes, encoding_table, pool)
silent or print('Done')
del uncompressed_bytes, encoding_table
silent or print('Preparing data using multiprocessing... ', end='')
compressed_bytes = pack_bytes(compressed_string, pool)
silent or print('Done')
del compressed_string
    pool.close()
    pool.join()  # wait for the worker processes to exit cleanly
silent or print('Writing... ', end='')
with open(compressed_filename, 'wb+') as file:
file.write(compressed_bytes)
silent or print('Done')
silent or print('Encoded in {:.4f} seconds, {:.2%} Space savings'.format(
clock() - start_time,
1 - (getsize(compressed_filename) + getsize(table_filename)) / getsize(filename))
)
def unpack_bytes_worker(bytes_):
return ''.join('{0:08b}'.format(byte) for byte in bytes_)
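# Example (illustrative): unpack_bytes_worker(b'AB') returns
# '0100000101000010', the 8-bit binary expansions of 65 and 66.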
def unpack_bytes(bytes_, pool):
tasks = [pool.apply_async(unpack_bytes_worker, args=(part,)) for part in segment(bytes_, cpu_count())]
del bytes_
return ''.join(task.get() for task in tasks)
def decode(string, table, size):
    # Accumulate bits until they match a code in the table, then emit the
    # corresponding byte; stop after 'size' bytes so padding bits are ignored.
    uncompressed_bytes = bytearray()
    string = iter(string)
    buffer = ''
    while len(uncompressed_bytes) < size:
        buffer += next(string)
        if buffer in table:
            uncompressed_bytes.append(table[buffer])
            buffer = ''
    return bytes(uncompressed_bytes)
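# Example (illustrative): decode('10001', {'1': 65, '00': 66, '01': 67}, 3)
# consumes '1' -> A, '00' -> B, '01' -> C and returns b'ABC'.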
def decode_file(filename, table_filename, uncompressed_filename, encoding='UTF-8', silent=False):
start_time = clock()
silent or print('Reading file...', end='')
with open(filename, 'rb') as file:
compressed_bytes = file.read()
silent or print('Done')
pool = Pool(cpu_count())
silent or print('Preparing data using multiprocessing... ', end='')
compressed_string = unpack_bytes(compressed_bytes, pool)
silent or print('Done')
    pool.close()
    pool.join()  # wait for the worker processes to exit cleanly
silent or print('Reading table... ', end='')
with open(table_filename, encoding=encoding) as file:
decoding_table = json.load(file)
size = decoding_table.pop('size')
silent or print('Done')
silent or print('Uncompressing...', end='')
uncompressed_bytes = decode(compressed_string, decoding_table, size)
silent or print('Done')
silent or print('Writing... ', end='')
with open(uncompressed_filename, 'wb+') as file:
file.write(uncompressed_bytes)
silent or print('Done')
silent or print('Decoded in {:.4f} seconds'.format(clock() - start_time))
def main():
in_name = 'in.bmp'
table_name = 'table.json'
compressed_name = 'compressed.bin'
out_name = 'out.bmp'
encode_file(in_name, table_name, compressed_name)
decode_file(compressed_name, table_name, out_name)
with open(in_name, 'rb') as in_, open(out_name, 'rb') as out:
assert in_.read() == out.read()
if __name__ == '__main__':
main()
Encoder.py (listing above). A second, self-contained Huffman implementation using heapq follows:
import heapq
class Node:
    def __init__(self, prob, symbol=None):
"""Create node for given symbol and probability."""
self.left = None
self.right = None
self.symbol = symbol
self.prob = prob
# Need comparator method at a minimum to work with heapq
def __lt__(self, other):
return self.prob < other.prob
    def encode(self, encoding):
        """Yield (symbol, bit-string) pairs from a preorder traversal."""
        if self.left is None and self.right is None:
            yield (self.symbol, encoding)
        else:
            yield from self.left.encode(encoding + '0')
            yield from self.right.encode(encoding + '1')
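# Example (illustrative): for a root whose left child is the leaf 'a' and
# whose right child is the leaf 'b', list(root.encode('')) returns
# [('a', '0'), ('b', '1')].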
class Huffman:
def __init__(self, initial):
"""Construct encoding given initial corpus."""
self.initial = initial
        # Count symbol frequencies
        freq = {}
        for ch in initial:
            freq[ch] = freq.get(ch, 0) + 1
# Construct priority queue
pq = []
for symbol in freq:
pq.append(Node(freq[symbol], symbol))
heapq.heapify(pq)
        # Special case: a single distinct symbol still needs a 1-bit code.
        if len(pq) == 1:
            leaf = pq[0]
            self.root = Node(leaf.prob)
            self.root.left = leaf
            self.encoding = {leaf.symbol: '0'}
            return
# Huffman Encoding Algorithm
while len(pq) > 1:
n1 = heapq.heappop(pq)
n2 = heapq.heappop(pq)
n3 = Node(n1.prob + n2.prob)
n3.left = n1
n3.right = n2
heapq.heappush(pq, n3)
# Record
self.root = pq[0]
self.encoding = {}
        for sym, code in pq[0].encode(''):
            self.encoding[sym] = code
def __repr__(self):
"""Show encoding"""
return 'huffman:' + str(self.encoding)
    def encode(self, s):
        """Return the bit string encoding s."""
        bits = ''
        for ch in s:
            if ch not in self.encoding:
                raise ValueError("'" + ch + "' is not an encoded character")
            bits += self.encoding[ch]
        return bits
def decode(self, bits):
"""Decode ASCII bit string for simplicity."""
node = self.root
s = ''
        for bit in bits:
            if bit == '0':
                node = node.left
            else:
                node = node.right
            if node.symbol is not None:
s += node.symbol
node = self.root
return s
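A quick round-trip check of the class above (the corpus string is made up for illustration):

h = Huffman('to be or not to be')
bits = h.encode('to be')
assert h.decode(bits) == 'to be'
print(h)  # huffman:{...} shows the symbol-to-bit-string table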