Skip to content

Commit 5653571

Browse files
committed
working
1 parent dfcaeda commit 5653571

4 files changed

Lines changed: 866 additions & 34 deletions

File tree

aws_log_parser/cli/count_hosts_concurrent.py

Lines changed: 103 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,27 @@
11
import asyncio
2+
import logging
23
import time
34
from collections import Counter
5+
from io import BytesIO
6+
from pathlib import Path
7+
8+
import aioboto3
49

510
from rich.console import Console
611
from rich.table import Table
712
from rich import progress as rich_progress
813

914
from ..interface import AwsLogParser
15+
from ..io import FileIterator
1016

1117
console = Console()
1218

1319
counter = Counter()
1420

1521
counter_lock = asyncio.Lock()
1622

23+
logger = logging.getLogger(__name__)
24+
1725

1826
def print_results(counter):
1927
table = Table(show_header=True)
@@ -38,47 +46,96 @@ def print_results(counter):
3846
console.print(table)
3947

4048

49+
def key_display(key):
50+
path = Path(key)
51+
split = path.name.split(".")
52+
split.pop(0)
53+
return ".".join(split)
54+
55+
4156
async def download_worker(
57+
name,
58+
session,
4259
aws_log_parser,
4360
bucket,
4461
progress,
45-
task_progress,
62+
progress_task,
63+
total_progress_task,
4664
queue,
4765
):
66+
def update_progress(size):
67+
progress.update(progress_task, advance=size)
68+
progress.update(total_progress_task, advance=size)
69+
70+
progress.console.log(f"Started {name}")
71+
4872
while True:
4973
s3_object = await queue.get()
5074

75+
key = s3_object["Key"]
76+
5177
progress.update(
52-
task_progress,
53-
filename=s3_object["Key"],
78+
progress_task,
5479
total=s3_object["Size"],
80+
filename=f"Downloading {key_display(key)}",
5581
)
5682

57-
progress.console.log(f"Downloading {s3_object['Key']}")
83+
progress.console.log(f"Downloading {key_display(key)} {s3_object['Size']}")
84+
85+
contents = BytesIO()
86+
87+
async with session.client("s3") as s3_client:
88+
await s3_client.download_fileobj(
89+
bucket,
90+
key,
91+
contents,
92+
Callback=update_progress,
93+
)
94+
95+
contents.seek(0)
96+
97+
lines = list(
98+
line
99+
for line in FileIterator(
100+
fileobj=contents,
101+
gzipped=key.endswith(".gz"),
102+
)
103+
)
104+
105+
progress.reset(
106+
progress_task,
107+
filename=f"Parsing {key_display(key)}",
108+
completed=s3_object["Size"],
109+
total=len(lines),
110+
refresh=True,
111+
)
112+
113+
progress.console.log(f"Parsing {key_display(key)} {len(lines)}")
58114

59115
entries = []
60-
for entry in aws_log_parser.parse(
61-
aws_log_parser.s3_client.read_key(bucket, s3_object["Key"])
62-
):
116+
for entry in aws_log_parser.parse(lines):
63117
entries.append(entry)
64-
progress.update(task_progress, advance=1)
118+
# progress.update(progress_task, advance=1)
65119

66120
async with counter_lock:
67121
counter.update([entry.client_ip for entry in entries])
68122

123+
# progress.reset(
124+
# progress_task,
125+
# filename=f"Parsing {key_display(key)}",
126+
# refresh=True,
127+
# )
128+
69129
queue.task_done()
70130

71131

72-
async def download_objects(aws_log_parser, bucket, s3_objects):
132+
async def download_objects(aws_log_parser, bucket, s3_objects, num_workers):
73133
queue = asyncio.Queue()
74134

75-
for s3_object in s3_objects:
76-
queue.put_nowait(s3_object)
135+
session = aioboto3.Session()
77136

78137
progress = rich_progress.Progress(
79-
rich_progress.TextColumn(
80-
"[bold blue]{taws_log_parser.parseask.fields[filename]}", justify="right"
81-
),
138+
rich_progress.TextColumn("[bold blue]{task.fields[filename]}", justify="right"),
82139
rich_progress.BarColumn(bar_width=None),
83140
"[progress.percentage]{task.percentage:>3.1f}%",
84141
"•",
@@ -87,15 +144,43 @@ async def download_objects(aws_log_parser, bucket, s3_objects):
87144
rich_progress.TransferSpeedColumn(),
88145
"•",
89146
rich_progress.TimeRemainingColumn(),
147+
auto_refresh=False,
148+
)
149+
150+
progress.start()
151+
152+
total_size = 0
153+
for s3_object in s3_objects:
154+
queue.put_nowait(s3_object)
155+
total_size += s3_object["Size"]
156+
157+
total_progress_task_id = progress.add_task(
158+
"Total Progress",
159+
filename="Total Progress",
160+
total=total_size,
90161
)
91162

92163
tasks = []
93-
for i in range(3):
94-
progress_task = progress.add_task(f"download-{i}", start=False)
164+
for i in range(num_workers):
165+
progress_task_id = progress.add_task(
166+
f"downloader-{i}",
167+
filename=f"downloader-{i}",
168+
start=False,
169+
)
95170

96171
task = asyncio.create_task(
97-
download_worker(aws_log_parser, bucket, progress, progress_task, queue)
172+
download_worker(
173+
f"downloader-{i}",
174+
session,
175+
aws_log_parser,
176+
bucket,
177+
progress,
178+
progress_task_id,
179+
total_progress_task_id,
180+
queue,
181+
)
98182
)
183+
99184
tasks.append(task)
100185

101186
started_at = time.monotonic()
@@ -136,4 +221,4 @@ async def count_hosts(args):
136221

137222
console.log(f"Found {len(s3_objects)} S3 objects")
138223

139-
await download_objects(aws_log_parser, bucket, s3_objects)
224+
await download_objects(aws_log_parser, bucket, s3_objects, args.concurrency)

aws_log_parser/cli/main.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,13 @@ def main():
9393
help="The sort the S3 objects with this key.",
9494
)
9595

96+
parser.add_argument(
97+
"--concurrency",
98+
help="Number of concurrent downloads.",
99+
default=3,
100+
type=int,
101+
)
102+
96103
args = parser.parse_args()
97104

98105
if args.run_async:

0 commit comments

Comments
 (0)