-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlambda.py
More file actions
77 lines (64 loc) · 2.29 KB
/
lambda.py
File metadata and controls
77 lines (64 loc) · 2.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import time
import os
import serviceConstants as const
from io import BytesIO
from urllib.request import Request, urlopen
from zipfile import ZipFile
from zipfile import ZipInfo
import file_processing_worker as fpw
import connection_utils as utils
from queue import Queue
# Size of the worker-thread pool used to drain the file queue in
# __divide_processing_workload; each thread is a file_processing_worker.
MAX_NUMBER_OF_THREADS = 10
def __download_and_process_data(number_of_chunks: int, chunk_to_process: int) -> None:
    """Download the EDGAR facts zip archive and process one chunk of it.

    Builds a request with the headers SEC expects (User-Agent pulled from the
    environment), reads the whole archive into memory, and delegates the
    per-chunk work to __divide_processing_workload.
    """
    print("Downloading facts data from EDGAR...")
    request = Request(
        url=const.EDGAR_URL + const.DATA_ZIP_PATH,
        headers={
            'User-Agent': os.environ[const.USER_AGENT_VALUE_KEY],
            'Accept-Encoding': 'gzip, deflate',
            'Host': 'www.sec.gov'
        }
    )
    # The response body is buffered in memory before being opened as a zip;
    # ZipFile needs a seekable file-like object, which the socket is not.
    with urlopen(request) as response:
        payload = BytesIO(response.read())
        with ZipFile(payload) as archive:
            __divide_processing_workload(number_of_chunks, chunk_to_process, archive)
def __divide_processing_workload(number_of_chunks: int, chunk_to_process: int, zfile: ZipFile) -> None:
    """Process one chunk of the archive's entries with a pool of worker threads.

    Splits the archive's file list into number_of_chunks pieces, enqueues the
    entries belonging to chunk_to_process, then drains the queue with
    MAX_NUMBER_OF_THREADS file_processing_worker threads that share a single
    S3 client and the open ZipFile. Blocks until every worker has finished.
    """
    print("Dividing processing workload to workers...")
    files: list[ZipInfo] = zfile.filelist
    queue = Queue()
    chunks = __separate_into_chunks(files, number_of_chunks)
    for file in chunks[chunk_to_process]:
        queue.put(file)
    # One S3 client is created up front and shared by all workers.
    s3Client = utils.initialize_S3()
    threads = []
    for i in range(MAX_NUMBER_OF_THREADS):
        worker = fpw.file_processing_worker(
            threadID=i,
            name='file_processing_worker_%s' % i,
            queue=queue,
            zip=zfile,
            s3=s3Client
        )
        threads.append(worker)
        print("Starting worker %s..." % i)
        # Start via the local reference; the original indexed the list with a
        # redundant int(i) cast (i is already an int from range()).
        worker.start()
    # Wait for all workers to drain the queue before returning; the caller's
    # ZipFile context must stay open while workers read from it.
    for t in threads:
        t.join()
def __separate_into_chunks(files, num):
    """Split *files* into exactly *num* contiguous chunks of near-equal size.

    The first len(files) % num chunks receive one extra element, so chunk
    sizes differ by at most one and every element appears in exactly one
    chunk. The original float-accumulation version (last += len/num) could
    drift under rounding and emit a stray extra chunk past index num-1,
    whose files no Lambda invocation would ever process; integer arithmetic
    guarantees exactly num chunks.

    Raises:
        ZeroDivisionError: if num is 0 (same as the original behavior).
    """
    base, extra = divmod(len(files), num)
    out = []
    start = 0
    for i in range(num):
        # The first `extra` chunks absorb the remainder, one element each.
        size = base + (1 if i < extra else 0)
        out.append(files[start:start + size])
        start += size
    return out
def start(event, context):
    """AWS Lambda entry point: download EDGAR facts data and process one chunk.

    Args:
        event: Lambda invocation payload; must carry the total chunk count
            under 'NUMBER_OF_CHUNKS_KEY' and this invocation's chunk index
            under 'CHUNK_TO_PROCESS_KEY'.
            NOTE(review): these look like they may have been intended as
            serviceConstants lookups rather than literal keys — confirm
            against the invoker's payload.
        context: Lambda context object (unused).
    """
    print("Starting facts update...")
    startTime = time.time()
    number_of_chunks = int(event['NUMBER_OF_CHUNKS_KEY'])
    chunk_to_process = int(event['CHUNK_TO_PROCESS_KEY'])
    __download_and_process_data(number_of_chunks, chunk_to_process)
    end = time.time()
    # divmod on the truncated elapsed seconds yields the same minutes/seconds
    # the old code produced implicitly via %d truncation of float operands.
    minutes, seconds = divmod(int(end - startTime), 60)
    print("""Sticker Price Database Population Complete\n
Elapsed time: %2d minutes, %2d seconds\n"""
          % (minutes, seconds))