instant-segment/data/grab.py

94 lines
2.9 KiB
Python
Raw Permalink Normal View History

import urllib.request
import gzip
import shutil
import os.path
BLOCK_SIZE = 4 * 1024 * 1024
UNIGRAM_PARTS = 24
BIGRAM_PARTS = 589
NGRAM_URL = 'http://storage.googleapis.com/books/ngrams/books/20200217/eng/{n}-{part:05}-of-{parts:05}.gz'
WORD_LIST_URL = 'http://app.aspell.net/create?max_size=60&spelling=US&spelling=GBs&spelling=GBz&spelling=CA&spelling=AU&max_variant=2&diacritic=strip&special=hacker&download=wordlist&encoding=utf-8&format=inline'
def download(url, fn):
'''Download the given url and write it to the given fn
Download in blocks of size BLOCK_SIZE and report progress after every 5% of the file.
'''
try:
u = urllib.request.urlopen(url)
except urllib.error.HTTPError as err:
print(f'error for {url}: {err}')
return 0
with open(fn + '.tmp', 'wb') as f:
length = u.info().get('Content-Length')
print('downloading: %s (%s bytes)' % (fn, length))
downloaded, complete, notified = 0, 0.0, 0.0
while True:
buf = u.read(BLOCK_SIZE)
if not buf:
break
downloaded += len(buf)
f.write(buf)
if length is None:
continue
complete = downloaded / int(length)
if complete < notified + 0.05:
continue
notified = complete
status = '%10d [%3.2f%%]' % (downloaded, complete * 100)
status = status + chr(8) * (len(status) + 1)
print(status)
os.rename(fn + '.tmp', fn)
return complete
def cache(n, part, parts):
'''Downloads and decompresses ngram data files as necessary
First downloads, then decompresses the given ngram part file. Will do nothing
if the decompressed file already exist and will only decompress if the compressed
file for this part already exists in the proper location.'''
compressed = f'cache/eng-{n}-{part:05}-{parts:05}.gz'
plain = compressed[:-3] + '.txt'
if os.path.isfile(plain):
return
elif not os.path.isfile(compressed):
url = NGRAM_URL.format(n=n, part=part, parts=parts)
complete = download(url, compressed)
print(f'downloaded {compressed} ({complete * 100:3.2f}% complete)')
if os.path.isfile(compressed):
with open(plain + '.tmp', 'wb') as output, gzip.open(compressed, 'rb') as input:
output.write(input.read())
os.rename(plain + '.tmp', plain)
print(f'decompressed {compressed}')
else:
print(f'{compressed} not found')
def main():
if not os.path.exists('cache'):
os.mkdir('cache')
wl_fn = 'cache/eng-wordlist.txt'
if not os.path.isfile(wl_fn):
download(WORD_LIST_URL, wl_fn)
for part in range(UNIGRAM_PARTS):
cache(1, part, UNIGRAM_PARTS)
for part in range(BIGRAM_PARTS):
cache(2, part, BIGRAM_PARTS)
if __name__ == '__main__':
main()