mirror of
https://github.com/instant-labs/instant-segment.git
synced 2025-01-19 15:29:05 +00:00
94 lines
2.9 KiB
Python
94 lines
2.9 KiB
Python
import urllib.request
|
|
import gzip
|
|
import shutil
|
|
import os.path
|
|
|
|
# Number of bytes requested from the server per read while downloading (4 MiB).
BLOCK_SIZE = 4 * 1024 ** 2

# Number of part files the unigram and bigram data sets are split into; used
# both as the iteration bound and as the `parts` field of the part file name.
UNIGRAM_PARTS = 24
BIGRAM_PARTS = 589

# URL template for one gzip-compressed ngram part file. `n` is the ngram order,
# `part` the index of the part, and `parts` the total number of parts; the
# latter two are zero-padded to five digits.
NGRAM_URL = 'http://storage.googleapis.com/books/ngrams/books/20200217/eng/{n}-{part:05}-of-{parts:05}.gz'

# URL the English word list is downloaded from.
WORD_LIST_URL = 'http://app.aspell.net/create?max_size=60&spelling=US&spelling=GBs&spelling=GBz&spelling=CA&spelling=AU&max_variant=2&diacritic=strip&special=hacker&download=wordlist&encoding=utf-8&format=inline'
|
|
|
|
|
|
def download(url, fn, block_size=None):
    '''Download the given url and write it to the given fn

    The body is read in blocks of `block_size` bytes (BLOCK_SIZE when omitted)
    and written to `fn + '.tmp'`. The temporary file is only moved to `fn` once
    the transfer has finished, so `fn` is never left holding a partial download.
    When the server sends a Content-Length, progress is reported after every 5%
    of the file.

    Returns the fraction of the file that was received: 1.0 for a finished
    download (including one whose size the server did not announce), a value
    below 1.0 when the stream ended before Content-Length bytes arrived, and 0
    when the server answered with an HTTP error. In the last two cases `fn` is
    not written. Any other exception propagates to the caller after the
    temporary file has been removed.
    '''
    if block_size is None:
        block_size = BLOCK_SIZE

    try:
        response = urllib.request.urlopen(url)
    except urllib.error.HTTPError as err:
        print(f'error for {url}: {err}')
        return 0

    tmp_fn = fn + '.tmp'
    finished = False
    try:
        # `with response` closes the connection even if opening or writing the
        # temporary file fails.
        with response, open(tmp_fn, 'wb') as f:
            length = response.info().get('Content-Length')
            total = int(length) if length is not None else None
            print('downloading: %s (%s bytes)' % (fn, length))

            downloaded, notified = 0, 0.0
            while True:
                buf = response.read(block_size)
                if not buf:
                    break

                downloaded += len(buf)
                f.write(buf)

                # Progress can only be computed for a known, non-zero size.
                if not total:
                    continue

                complete = downloaded / total
                if complete < notified + 0.05:
                    continue

                notified = complete
                print('%10d [%3.2f%%]' % (downloaded, complete * 100))

        if total is not None and downloaded < total:
            # The stream ended early: do not publish a truncated file.
            print(f'error for {url}: received {downloaded} of {total} bytes')
            return downloaded / total

        # os.replace overwrites an existing target on every platform.
        os.replace(tmp_fn, fn)
        finished = True
        return 1.0
    finally:
        # Covers both the truncated case and any exception raised above.
        if not finished and os.path.exists(tmp_fn):
            os.remove(tmp_fn)
|
|
|
|
|
|
def cache(n, part, parts):
    '''Downloads and decompresses ngram data files as necessary

    First downloads, then decompresses the given ngram part file. Does nothing
    if the decompressed file already exists, and skips the download if the
    compressed file for this part already exists in the proper location.

    `n` is the ngram order, `part` the index of the part file and `parts` the
    total number of part files; together they determine both the download URL
    and the file names used below the `cache/` directory.
    '''
    compressed = f'cache/eng-{n}-{part:05}-{parts:05}.gz'
    plain = compressed[:-3] + '.txt'

    if os.path.isfile(plain):
        return

    if not os.path.isfile(compressed):
        url = NGRAM_URL.format(n=n, part=part, parts=parts)
        complete = download(url, compressed)
        print(f'downloaded {compressed} ({complete * 100:3.2f}% complete)')

    # The download may have failed, so check for the compressed file again.
    if not os.path.isfile(compressed):
        print(f'{compressed} not found')
        return

    tmp = plain + '.tmp'
    try:
        with open(tmp, 'wb') as dst, gzip.open(compressed, 'rb') as src:
            # Stream the data in chunks instead of holding the whole
            # decompressed file in memory at once.
            shutil.copyfileobj(src, dst)
    except BaseException:
        # Do not leave a half-written temporary file behind; the error itself
        # is still reported to the caller.
        if os.path.exists(tmp):
            os.remove(tmp)
        raise

    # Only a fully decompressed file ever gets the final name.
    os.replace(tmp, plain)
    print(f'decompressed {compressed}')
|
|
|
|
|
|
def main():
    '''Populate the cache directory with the word list and all ngram parts

    Creates `cache` if it does not exist, downloads the word list unless it is
    already present, then caches every unigram part followed by every bigram
    part.
    '''
    cache_dir_present = os.path.exists('cache')
    if not cache_dir_present:
        os.mkdir('cache')

    word_list_path = 'cache/eng-wordlist.txt'
    if not os.path.isfile(word_list_path):
        download(WORD_LIST_URL, word_list_path)

    # (ngram order, number of part files), processed in this order.
    for order, total_parts in ((1, UNIGRAM_PARTS), (2, BIGRAM_PARTS)):
        for index in range(total_parts):
            cache(order, index, total_parts)
|
|
|
|
# Only start downloading when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()
|