Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/hooks/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import anyio
import argparse
import sys
import time


from typing import List, Optional
Expand Down Expand Up @@ -75,6 +76,8 @@ def parse_args(argv):


async def main_async(argv: Optional[List[str]] = None):
hook_run_time = time.time()

args = parse_args(argv)

init_logger(args.verbose)
Expand All @@ -100,6 +103,9 @@ async def main_async(argv: Optional[List[str]] = None):
run_result = await hook.run()
logger.info("%s", run_result.run_summary())

hook_run_time = time.time() - hook_run_time
logger.debug("Hook took %s seconds", hook_run_time)

if not run_result.run_success():
logger.info("Hook '%s' did not successfully run.", hook)
return 1
Expand Down
48 changes: 15 additions & 33 deletions src/hooks/presidio/path_filter.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,28 @@
import git
import re

from anyio import open_file, Path

from enum import Enum
from typing import List


from src.hooks.config import (
DEFAULT_FILE_TYPES,
LOGGER,
PRESIDIO_EXCLUSIONS_FILE_PATH,
)

logger = LOGGER


class PathScanStatus(Enum):
    """Outcome recorded for one path considered by the personal data scan."""

    # Not scanned: the path does not exist, is not a file, or its extension
    # is not in DEFAULT_FILE_TYPES.
    SKIPPED = 1
    # Not scanned: the path is covered by the exclusions.
    EXCLUDED = 2
    # Scanned, and no personal data detections were produced.
    PASSED = 3
    # Scanned, and at least one personal data detection was produced.
    FAILED = 4


class PathFilter:
LINE_BY_LINE_FILE_EXTENSIONS = [".csv"]

def __init__(
self,
verbose: bool = False,
) -> None:
self.verbose = verbose

def _is_path_excluded(self, path: str, exclusions: List[re.Pattern[str]]):
for exclusion in exclusions:
match = exclusion.search(path)
Expand All @@ -34,17 +33,17 @@ def _is_path_excluded(self, path: str, exclusions: List[re.Pattern[str]]):
logger.debug("The path %s was not found in any exclusion regexes", path)
return False

async def _should_scan_path(self, path: str, exclusions: List[re.Pattern[str]]):
async def _check_is_path_invalid(self, path: str, exclusions: List[re.Pattern[str]]):
if self._is_path_excluded(path, exclusions):
return False
return PathScanStatus.EXCLUDED

if not await Path(path).exists():
logger.debug("Path %s does not exist", path)
return False
return PathScanStatus.SKIPPED

if not await Path(path).is_file():
logger.debug("Path %s is a directory, presidio can only scan files", path)
return False
return PathScanStatus.SKIPPED

file_extension = Path(path).suffix
if file_extension not in DEFAULT_FILE_TYPES:
Expand All @@ -53,15 +52,15 @@ async def _should_scan_path(self, path: str, exclusions: List[re.Pattern[str]]):
path,
DEFAULT_FILE_TYPES,
)
return False
return PathScanStatus.SKIPPED

logger.debug(
"Path %s is valid and should be scanned",
path,
)
return True
return None

async def _get_exclusions(self, exclusions_file: str):
async def _get_exclusions(self, exclusions_file: str) -> List[re.Pattern[str]]:
exclusions = []

if not await Path(exclusions_file).exists():
Expand All @@ -80,20 +79,3 @@ async def _get_exclusions(self, exclusions_file: str):
)
raise
return exclusions

async def get_paths_to_scan(
self,
paths: List[str],
github_action: bool = False,
):
if github_action:
repo = git.Repo(paths[0])
logger.debug("Scanning files in git repository %s", repo)
paths = [entry.abspath for entry in repo.tree().traverse()]

exclusions = await self._get_exclusions(exclusions_file=PRESIDIO_EXCLUSIONS_FILE_PATH)
logger.debug("Exclusions file loaded with exclusions %s", exclusions)

for path in paths:
if await self._should_scan_path(path, exclusions):
yield path
95 changes: 68 additions & 27 deletions src/hooks/presidio/scanner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import asyncio
import json
import re

from io import StringIO
from anyio import open_file
import json
from pathlib import Path
from typing import List

Expand All @@ -12,10 +15,11 @@
DEFAULT_LANGUAGE_CODE,
LOGGER,
NLP_CONFIG_FILE,
PRESIDIO_EXCLUSIONS_FILE_PATH,
RECOGNIZER_CONFIG_FILE,
)
from src.hooks.presidio.spacy_post_processing_recognizer import SpacyPostProcessingRecognizer
from src.hooks.presidio.path_filter import PathFilter
from src.hooks.presidio.path_filter import PathFilter, PathScanStatus

logger = LOGGER

Expand All @@ -30,38 +34,65 @@ def __repr__(self) -> str:


class PathScanResult:
    """Result of considering one path for the personal data scan.

    Attributes:
        path: The path that was considered.
        status: The PathScanStatus recorded for the path.
        results: Personal data detections for the path; an empty list when
            none were supplied.
    """

    def __init__(
        self,
        path: str,
        status: PathScanStatus,
        results: List[PersonalDataDetection] | None = None,
    ) -> None:
        """Store the path, its status and its detections.

        Args:
            path: The path that was considered.
            status: Outcome recorded for the path.
            results: Detections found in the path. ``None`` (the default)
                means "no detections" and yields a fresh empty list.
        """
        self.path = path
        self.status = status
        # A ``[]`` default is created once and would be shared (and mutated)
        # across every instance built without explicit detections, so a new
        # list is created per instance instead.
        self.results = [] if results is None else results


class PresidioScanResult:
def __init__(
self,
) -> None:
self.valid_path_scans: List[PathScanResult] = []
self.invalid_path_scans: List[PathScanResult] = []
def __init__(self, results: List[PathScanResult] = []) -> None:
self.paths_without_personal_data: List[PathScanResult] = []
self.paths_containing_personal_data: List[PathScanResult] = []
self.paths_skipped: List[PathScanResult] = []
self.paths_excluded: List[PathScanResult] = []
self.add_path_scan_results(results)

def add_path_scan_results(self, scan_results: List[PathScanResult]):
for scan_result in scan_results:
self.add_path_scan_result(scan_result)

def add_path_scan_result(self, scan_result: PathScanResult):
    """Append one path result to the list that matches its status.

    Args:
        scan_result: The result to record; its ``status`` selects the list.

    A result whose status equals none of the four ``PathScanStatus``
    members is not recorded in any list.
    """
    status = scan_result.status
    # The statuses are mutually exclusive, so a single chain is enough and
    # stops comparing after the first match.
    if status == PathScanStatus.EXCLUDED:
        self.paths_excluded.append(scan_result)
    elif status == PathScanStatus.FAILED:
        self.paths_containing_personal_data.append(scan_result)
    elif status == PathScanStatus.PASSED:
        self.paths_without_personal_data.append(scan_result)
    elif status == PathScanStatus.SKIPPED:
        self.paths_skipped.append(scan_result)

def __str__(self) -> str:
with StringIO() as output_buffer:
output_buffer.write("--------PERSONAL DATA SCAN SUMMARY--------")
if self.valid_path_scans:
if self.paths_excluded:
output_buffer.write("\n\nFILES EXCLUDED\n")
excluded_paths_table = PrettyTable(["Path"])
for excluded_path in self.paths_excluded:
excluded_paths_table.add_row([excluded_path.path])
output_buffer.write(str(excluded_paths_table))

if self.paths_skipped:
output_buffer.write("\n\nFILES SKIPPED\n")
skipped_paths_table = PrettyTable(["Path"])
for skipped_path in self.paths_skipped:
skipped_paths_table.add_row([skipped_path.path])
output_buffer.write(str(skipped_paths_table))

if self.paths_without_personal_data:
output_buffer.write("\n\nFILES WITHOUT PERSONAL DATA\n")
paths_without_issues_table = PrettyTable(["Path"])
for valid_path in self.valid_path_scans:
for valid_path in self.paths_without_personal_data:
paths_without_issues_table.add_row([valid_path.path])
output_buffer.write(str(paths_without_issues_table))

if self.invalid_path_scans:
if self.paths_containing_personal_data:
output_buffer.write("\n\nFILES CONTAINING PERSONAL DATA\n")

for invalid_path_scan in self.invalid_path_scans:
for invalid_path_scan in self.paths_containing_personal_data:
output_buffer.write(f"\n{invalid_path_scan.path}\n")
table = PrettyTable(["Type", "Value", "Score"])
for invalid_path in invalid_path_scan.results:
Expand Down Expand Up @@ -117,11 +148,14 @@ def _scan_content(self, analyzer: AnalyzerEngine, entities: List[str], content:
return [PersonalDataDetection(result, content[result.start : result.end]) for result in results]

async def _scan_path(
self,
analyzer: AnalyzerEngine,
entities: List[str],
file_path: str,
self, analyzer: AnalyzerEngine, entities: List[str], file_path: str, exclusions: List[re.Pattern[str]]
) -> PathScanResult:
sources = PathFilter()

invalid_check_result = await sources._check_is_path_invalid(file_path, exclusions)
if invalid_check_result is not None:
return PathScanResult(file_path, invalid_check_result)

file_extension = Path(file_path).suffix.lower()
async with await open_file(file_path, "r", encoding="utf-8") as fs:
results: List[PersonalDataDetection] = []
Expand All @@ -133,23 +167,30 @@ async def _scan_path(
contents = await fs.read()
logger.debug("Scanning file %s by reading all contents", file_path)
results.extend(self._scan_content(analyzer, entities, contents))

return PathScanResult(
file_path,
status=PathScanStatus.PASSED if len(results) == 0 else PathScanStatus.FAILED,
results=results,
)

async def scan(
    self,
    github_action: bool = False,
) -> PresidioScanResult:
    """Scan every path in ``self.paths`` for personal data.

    One ``_scan_path`` task is created per path inside an
    ``asyncio.TaskGroup``; the exclusions are loaded once and shared by all
    tasks.

    Args:
        github_action: Not read by this method; kept so callers that still
            pass it continue to work.

    Returns:
        A PresidioScanResult built from one PathScanResult per path.

    Raises:
        ExceptionGroup: Raised by the task group if any per-path task fails.
    """
    sources = PathFilter()

    analyzer = self._get_analyzer()
    entities = analyzer.get_supported_entities()

    exclusions = await sources._get_exclusions(exclusions_file=PRESIDIO_EXCLUSIONS_FILE_PATH)
    logger.debug("Personal data exclusions file loaded with exclusions %s", exclusions)

    async with asyncio.TaskGroup() as tg:
        tasks: list[asyncio.Task] = [
            tg.create_task(self._scan_path(analyzer, entities, path, exclusions))
            for path in self.paths
        ]

    # Task results are only available once the task group has exited, i.e.
    # after every task has finished, so they are collected out here.
    return PresidioScanResult(results=[task.result() for task in tasks])
26 changes: 18 additions & 8 deletions src/hooks/run_security_scan.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import asyncio
import aiohttp
import git

from pathlib import Path
from typing import List


from src.hooks.config import (
LOGGER,
PERSONAL_DATA_SCAN,
Expand Down Expand Up @@ -34,7 +36,10 @@ def run_success(self) -> bool:
if self.trufflehog_scan_result.detected_keys is not None:
is_success = False
if self.presidio_scan_result:
if self.presidio_scan_result.invalid_path_scans and len(self.presidio_scan_result.invalid_path_scans) > 0:
if (
self.presidio_scan_result.paths_containing_personal_data
and len(self.presidio_scan_result.paths_containing_personal_data) > 0
):
is_success = False
return is_success

Expand Down Expand Up @@ -135,26 +140,30 @@ async def run_security_scan(self) -> TrufflehogScanResult:
)

async def run_personal_scan(self) -> PresidioScanResult:
    """Run the personal data scan and return its result.

    When ``self.github_action`` is set, ``self.paths[0]`` is opened as a git
    repository and the absolute path of every entry in its tree is scanned.
    Otherwise ``self.paths`` is scanned as given.

    Returns:
        The PresidioScanResult produced by ``PresidioScanner.scan``.
    """
    paths_to_scan = self.paths
    if self.github_action:
        repo = git.Repo(self.paths[0])
        logger.debug("Scanning files in git repository %s", repo)
        paths_to_scan = [entry.abspath for entry in repo.tree().traverse()]

    return await PresidioScanner(
        self.verbose,
        paths_to_scan,
    ).scan()

async def run(self) -> RunSecurityScanResult:
security_scan_task = None
personal_data_scan_task = None

async with asyncio.TaskGroup() as tg:
if SECURITY_SCAN not in self.excluded_scans:
logger.debug("Running security scan")
security_scan_task = tg.create_task(self.run_security_scan())
else:
logger.debug("Security scan is excluded")

if PERSONAL_DATA_SCAN not in self.excluded_scans:
logger.debug("Running personal data scan")
personal_data_scan_task = tg.create_task(self.run_personal_scan())
else:
logger.debug("Personal data scan is excluded")
Expand All @@ -163,5 +172,6 @@ async def run(self) -> RunSecurityScanResult:
personal_data_scan_result = personal_data_scan_task.result() if personal_data_scan_task else None

return RunSecurityScanResult(
trufflehog_scan_result=security_scan_result, presidio_scan_result=personal_data_scan_result
trufflehog_scan_result=security_scan_result,
presidio_scan_result=personal_data_scan_result,
)
2 changes: 1 addition & 1 deletion src/hooks/trufflehog/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async def _get_args(
trufflehog_cmd_args.append("--since-commit=main")

if await Path(TRUFFLEHOG_EXCLUSIONS_FILE_PATH).exists():
logger.debug("This repo has an exclusions file, adding this file to the trufflehog runner")
logger.debug("Security scanner exclusions file loaded")
trufflehog_cmd_args.append(f"--exclude-paths={TRUFFLEHOG_EXCLUSIONS_FILE_PATH}")

trufflehog_detectors = ",".join(allowed_vendor_codes)
Expand Down
Loading
Loading