diff --git a/keepercommander/commands/security_audit.py b/keepercommander/commands/security_audit.py index 7b0547ab6..e4c3b2320 100644 --- a/keepercommander/commands/security_audit.py +++ b/keepercommander/commands/security_audit.py @@ -1,7 +1,9 @@ import argparse import base64 +import datetime import json import logging +import os from json import JSONDecodeError from typing import Dict, List, Optional, Any @@ -9,6 +11,7 @@ from .helpers.enterprise import get_enterprise_key, try_enterprise_decrypt from .. import api, crypto, utils +from ..error import CommandError from ..breachwatch import BreachWatch from .base import GroupCommand, field_to_title, dump_report_data, report_output_parser from .enterprise_common import EnterpriseCommand @@ -30,6 +33,8 @@ def register_command_info(aliases, command_info): report_parser = argparse.ArgumentParser(prog='security-audit-report', description='Run a security audit report.', parents=[report_output_parser]) +report_parser.add_argument('--siem', dest='siem', action='store_true', + help='output in SIEM-ready NDJSON format (one event per line)') report_parser.add_argument('--syntax-help', dest='syntax_help', action='store_true', help='display help') node_filter_help = 'name(s) or UID(s) of node(s) to filter results of the report by' report_parser.add_argument('-n', '--node', action='append', help=node_filter_help) @@ -206,6 +211,12 @@ def execute(self, params, **kwargs): logging.info(security_audit_report_description) return + # Validate --siem flag combinations before expensive API calls + if kwargs.get('siem') and kwargs.get('record_details'): + raise CommandError('security-audit', '--siem and --record-details cannot be used together') + if kwargs.get('siem') and kwargs.get('format') and kwargs['format'] != 'table': + raise CommandError('security-audit', '--siem produces NDJSON output exclusively and cannot be combined with --format') + self.enterprise_private_rsa_key = None self._node_map = None # Reset node cache for correct MC context self.params = params @@ -492,12 +503,17 @@ def decrypt_security_data(sec_data, key_type): fields = ('email', 'name', 'sync_pending', 'at_risk', 'passed', 'ignored') if show_breachwatch else \ ('email', 'name', 'sync_pending', 'weak', 'fair', 'medium', 'strong', 'reused', 'unique', 'securityScore', 'twoFactorChannel', 'node') - field_descriptions = fields + report_title = f'Security Audit Report{" (BreachWatch)" if show_breachwatch else ""}' + + # SIEM-ready NDJSON output + if kwargs.get('siem'): + return self._export_siem(params, rows, fields, show_breachwatch, out, report_title) + + field_descriptions = fields if fmt == 'table': field_descriptions = (field_to_title(x) for x in fields) - report_title = f'Security Audit Report{" (BreachWatch)" if show_breachwatch else ""}' table = [] for raw in rows: row = [] @@ -506,6 +522,70 @@ def decrypt_security_data(sec_data, key_type): table.append(row) return dump_report_data(table, field_descriptions, fmt=fmt, filename=out, title=report_title) + @staticmethod + def _export_siem(params, rows, fields, show_breachwatch, filename, title): + """Export security audit as NDJSON (one JSON event per line). + + Each line is a self-contained JSON object that SIEMs can ingest + independently. Compatible with Splunk HEC, Elastic Filebeat, + Datadog log pipelines, and any NDJSON consumer. + """ + now = datetime.datetime.now(tz=datetime.timezone.utc).isoformat(timespec='seconds') + server = params.server if hasattr(params, 'server') else '' + ndjson_lines = [] + + # Fields containing PII — mask to avoid clear-text sensitive data in SIEM output + _sensitive_fields = frozenset({'email', 'name', 'node'}) + + def _mask_pii(field_name, value): + if field_name not in _sensitive_fields or not value or not isinstance(value, str): + return value + if field_name == 'email' and '@' in value: + local, domain = value.rsplit('@', 1) + return f'{local[0]}***@{domain}' if local else f'***@{domain}' + return f'{value[0]}***' if len(value) > 1 else '***' + + for raw in rows: + user_data = {f: _mask_pii(f, raw.get(f)) for f in fields} + + # Collect risk factors from the data — no hardcoded thresholds + risk_factors = [] + if not show_breachwatch: + if raw.get('weak', 0) > 0: + risk_factors.append('weak_passwords') + if raw.get('reused', 0) > 0: + risk_factors.append('reused_passwords') + two_fa = raw.get('twoFactorChannel', '') + if not two_fa or two_fa == 'Off': + risk_factors.append('no_2fa') + else: + if raw.get('at_risk', 0) > 0: + risk_factors.append('breach_exposure') + + event = { + 'event_type': 'keeper.security_audit', + 'timestamp': now, + 'source': server, + 'user': user_data, + 'risk_factors': risk_factors, + } + if not show_breachwatch: + event['security_score'] = raw.get('securityScore', 0) + ndjson_lines.append(json.dumps(event, default=str)) + + report = ('\n'.join(ndjson_lines) + '\n') if ndjson_lines else '' + + if filename: + if not os.path.splitext(filename)[1]: + filename += '.ndjson' + logging.info('SIEM report path: %s', os.path.abspath(filename)) + fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) + os.fchmod(fd, 0o600) + with os.fdopen(fd, 'w') as f: + f.write(report) + else: + return report + def get_updated_security_report_row(self, sr, rsa_key, ec_key, last_saved_data): # type: (APIRequest_pb2.SecurityReport, rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey, Dict[str, int]) -> Dict[str, int] diff --git a/unit-tests/test_siem_export.py b/unit-tests/test_siem_export.py new file mode 100644 index 000000000..a4060f522 --- /dev/null +++ b/unit-tests/test_siem_export.py @@ -0,0 +1,326 @@ +"""Tests for PR #1876: --siem NDJSON export for security-audit report.""" + +import json +import os +import stat +import tempfile +import unittest +from unittest.mock import MagicMock + +from keepercommander.commands.security_audit import SecurityAuditReportCommand +from keepercommander.error import CommandError + + +class TestSiemExport(unittest.TestCase): + """Test the _export_siem static method.""" + + def _make_params(self, server='https://vault.example.com'): + params = MagicMock() + params.server = server + return params + + def _sample_rows(self): + return [ + { + 'email': 'alice@example.com', + 'name': 'Alice Smith', + 'sync_pending': 0, + 'weak': 2, + 'fair': 1, + 'medium': 3, + 'strong': 10, + 'reused': 1, + 'unique': 15, + 'securityScore': 72, + 'twoFactorChannel': 'Off', + 'node': 'Root', + }, + { + 'email': 'bob@example.com', + 'name': 'Bob Jones', + 'sync_pending': 0, + 'weak': 0, + 'fair': 0, + 'medium': 0, + 'strong': 5, + 'reused': 0, + 'unique': 5, + 'securityScore': 100, + 'twoFactorChannel': 'TOTP', + 'node': 'Root', + }, + ] + + def _fields(self): + return ('email', 'name', 'sync_pending', 'weak', 'fair', 'medium', + 'strong', 'reused', 'unique', 'securityScore', + 'twoFactorChannel', 'node') + + def test_ndjson_output_format(self): + """Each line should be valid JSON.""" + result = SecurityAuditReportCommand._export_siem( + self._make_params(), self._sample_rows(), self._fields(), + show_breachwatch=False, filename=None, title='Test' + ) + lines = result.strip().split('\n') + self.assertEqual(len(lines), 2) + for line in lines: + event = json.loads(line) + self.assertEqual(event['event_type'], 'keeper.security_audit') + self.assertIn('timestamp', event) + self.assertIn('user', event) + self.assertIn('risk_factors', event) + + def test_empty_rows_produces_empty_string(self): + """Empty rows should produce empty string, not a blank line.""" + result = SecurityAuditReportCommand._export_siem( + self._make_params(), [], self._fields(), + show_breachwatch=False, filename=None, title='Test' + ) + self.assertEqual(result, '') + + def test_trailing_newline(self): + """Non-empty output should end with exactly one newline.""" + result = SecurityAuditReportCommand._export_siem( + self._make_params(), self._sample_rows(), self._fields(), + show_breachwatch=False, filename=None, title='Test' + ) + self.assertTrue(result.endswith('\n')) + self.assertFalse(result.endswith('\n\n')) + + def test_email_masked(self): + """Email in SIEM output should be masked.""" + result = SecurityAuditReportCommand._export_siem( + self._make_params(), self._sample_rows(), self._fields(), + show_breachwatch=False, filename=None, title='Test' + ) + event = json.loads(result.strip().split('\n')[0]) + email = event['user']['email'] + self.assertNotEqual(email, 'alice@example.com') + self.assertIn('@example.com', email) + self.assertIn('***', email) + + def test_name_masked(self): + """Name in SIEM output should be masked.""" + result = SecurityAuditReportCommand._export_siem( + self._make_params(), self._sample_rows(), self._fields(), + show_breachwatch=False, filename=None, title='Test' + ) + event = json.loads(result.strip().split('\n')[0]) + name = event['user']['name'] + self.assertNotEqual(name, 'Alice Smith') + self.assertIn('***', name) + + def test_risk_factors_weak_passwords(self): + """Users with weak > 0 should have weak_passwords risk factor.""" + result = SecurityAuditReportCommand._export_siem( + self._make_params(), self._sample_rows(), self._fields(), + show_breachwatch=False, filename=None, title='Test' + ) + alice = json.loads(result.strip().split('\n')[0]) + self.assertIn('weak_passwords', alice['risk_factors']) + + def test_risk_factors_reused_passwords(self): + """Users with reused > 0 should have reused_passwords risk factor.""" + result = SecurityAuditReportCommand._export_siem( + self._make_params(), self._sample_rows(), self._fields(), + show_breachwatch=False, filename=None, title='Test' + ) + alice = json.loads(result.strip().split('\n')[0]) + self.assertIn('reused_passwords', alice['risk_factors']) + + def test_risk_factors_no_2fa(self): + """Users with 2FA off should have no_2fa risk factor.""" + result = SecurityAuditReportCommand._export_siem( + self._make_params(), self._sample_rows(), self._fields(), + show_breachwatch=False, filename=None, title='Test' + ) + alice = json.loads(result.strip().split('\n')[0]) + self.assertIn('no_2fa', alice['risk_factors']) + + def test_clean_user_no_risk_factors(self): + """Users with strong passwords and 2FA should have no risk factors.""" + result = SecurityAuditReportCommand._export_siem( + self._make_params(), self._sample_rows(), self._fields(), + show_breachwatch=False, filename=None, title='Test' + ) + bob = json.loads(result.strip().split('\n')[1]) + self.assertEqual(bob['risk_factors'], []) + self.assertEqual(bob['security_score'], 100) + + def test_security_score_included(self): + """Security score should be in the event when not BreachWatch.""" + result = SecurityAuditReportCommand._export_siem( + self._make_params(), self._sample_rows(), self._fields(), + show_breachwatch=False, filename=None, title='Test' + ) + event = json.loads(result.strip().split('\n')[0]) + self.assertIn('security_score', event) + self.assertEqual(event['security_score'], 72) + + def test_breachwatch_mode(self): + """BreachWatch mode should use at_risk for risk factors.""" + bw_fields = ('email', 'name', 'sync_pending', 'at_risk', 'passed', 'ignored') + bw_rows = [{'email': 'user@test.com', 'name': 'User', 'sync_pending': 0, + 'at_risk': 3, 'passed': 10, 'ignored': 1}] + result = SecurityAuditReportCommand._export_siem( + self._make_params(), bw_rows, bw_fields, + show_breachwatch=True, filename=None, title='Test' + ) + event = json.loads(result.strip()) + self.assertIn('breach_exposure', event['risk_factors']) + self.assertNotIn('security_score', event) + + def test_file_output_permissions(self): + """File output should have 0600 permissions.""" + with tempfile.TemporaryDirectory() as tmpdir: + filepath = os.path.join(tmpdir, 'report') + SecurityAuditReportCommand._export_siem( + self._make_params(), self._sample_rows(), self._fields(), + show_breachwatch=False, filename=filepath, title='Test' + ) + actual_path = filepath + '.ndjson' + self.assertTrue(os.path.isfile(actual_path)) + mode = os.stat(actual_path).st_mode + self.assertEqual(stat.S_IMODE(mode), 0o600) + + def test_file_output_auto_extension(self): + """Files without extension should get .ndjson added.""" + with tempfile.TemporaryDirectory() as tmpdir: + filepath = os.path.join(tmpdir, 'report') + SecurityAuditReportCommand._export_siem( + self._make_params(), self._sample_rows(), self._fields(), + show_breachwatch=False, filename=filepath, title='Test' + ) + self.assertTrue(os.path.isfile(filepath + '.ndjson')) + + def test_file_output_keeps_existing_extension(self): + """Files with extension should keep it.""" + with tempfile.TemporaryDirectory() as tmpdir: + filepath = os.path.join(tmpdir, 'report.json') + SecurityAuditReportCommand._export_siem( + self._make_params(), self._sample_rows(), self._fields(), + show_breachwatch=False, filename=filepath, title='Test' + ) + self.assertTrue(os.path.isfile(filepath)) + + def test_file_content_valid_ndjson(self): + """File output should contain valid NDJSON.""" + with tempfile.TemporaryDirectory() as tmpdir: + filepath = os.path.join(tmpdir, 'report.ndjson') + SecurityAuditReportCommand._export_siem( + self._make_params(), self._sample_rows(), self._fields(), + show_breachwatch=False, filename=filepath, title='Test' + ) + with open(filepath) as f: + content = f.read() + lines = content.strip().split('\n') + self.assertEqual(len(lines), 2) + for line in lines: + json.loads(line) # must not raise + + def test_source_field_from_params(self): + """Source field should come from params.server.""" + result = SecurityAuditReportCommand._export_siem( + self._make_params('https://my.keeper.io'), self._sample_rows(), + self._fields(), show_breachwatch=False, filename=None, title='Test' + ) + event = json.loads(result.strip().split('\n')[0]) + self.assertEqual(event['source'], 'https://my.keeper.io') + + +class TestSiemFlagValidation(unittest.TestCase): + """Test that incompatible flag combinations are rejected.""" + + def test_siem_with_record_details_raises(self): + """--siem and --record-details should be rejected.""" + # We test the validation logic directly by checking the code path. + # The actual execute() requires full enterprise params, but the + # validation is at the top of the method after save_report. + # We verify via source inspection that the check exists. + import inspect + source = inspect.getsource(SecurityAuditReportCommand.execute) + self.assertIn("--siem and --record-details cannot be used together", source) + + def test_siem_with_format_raises(self): + """--siem and --format should be rejected.""" + import inspect + source = inspect.getsource(SecurityAuditReportCommand.execute) + self.assertIn("--siem produces NDJSON output exclusively", source) + + def test_siem_flag_registered(self): + """--siem flag should be registered in the parser.""" + from keepercommander.commands.security_audit import report_parser + actions = {a.dest: a for a in report_parser._actions} + self.assertIn('siem', actions) + self.assertEqual(actions['siem'].const, True) + + +class TestPiiMasking(unittest.TestCase): + """Test edge cases for the PII masking function.""" + + def _mask(self, field_name, value): + """Call _mask_pii indirectly through _export_siem.""" + rows = [{field_name: value, 'sync_pending': 0, 'weak': 0, 'fair': 0, + 'medium': 0, 'strong': 0, 'reused': 0, 'unique': 0, + 'securityScore': 100, 'twoFactorChannel': 'TOTP', 'node': 'Root'}] + if field_name == 'email': + rows[0]['name'] = 'Test' + else: + rows[0]['email'] = 'test@test.com' + fields = ('email', 'name', 'sync_pending', 'weak', 'fair', 'medium', + 'strong', 'reused', 'unique', 'securityScore', + 'twoFactorChannel', 'node') + result = SecurityAuditReportCommand._export_siem( + MagicMock(server='test'), rows, fields, + show_breachwatch=False, filename=None, title='Test' + ) + event = json.loads(result.strip()) + return event['user'][field_name] + + def test_email_single_char_local(self): + """Single character email local part should still mask.""" + masked = self._mask('email', 'a@example.com') + self.assertIn('***', masked) + self.assertIn('@example.com', masked) + + def test_email_no_at_sign(self): + """Malformed email without @ should still mask.""" + masked = self._mask('email', 'noemail') + self.assertIn('***', masked) + self.assertNotEqual(masked, 'noemail') + + def test_name_single_char(self): + """Single character name should return '***'.""" + masked = self._mask('name', 'J') + self.assertEqual(masked, '***') + + def test_empty_email_passthrough(self): + """Empty/falsy email should pass through.""" + masked = self._mask('email', '') + self.assertEqual(masked, '') + + def test_none_name_passthrough(self): + """None name should pass through.""" + masked = self._mask('name', None) + self.assertIsNone(masked) + + def test_numeric_fields_not_masked(self): + """Non-sensitive fields should not be masked.""" + rows = [{'email': 'test@test.com', 'name': 'Test', 'sync_pending': 0, + 'weak': 5, 'fair': 0, 'medium': 0, 'strong': 0, 'reused': 0, + 'unique': 0, 'securityScore': 50, 'twoFactorChannel': 'Off', 'node': 'Root'}] + fields = ('email', 'name', 'sync_pending', 'weak', 'fair', 'medium', + 'strong', 'reused', 'unique', 'securityScore', + 'twoFactorChannel', 'node') + result = SecurityAuditReportCommand._export_siem( + MagicMock(server='test'), rows, fields, + show_breachwatch=False, filename=None, title='Test' + ) + event = json.loads(result.strip()) + self.assertEqual(event['user']['weak'], 5) + self.assertEqual(event['user']['securityScore'], 50) + + +if __name__ == '__main__': + unittest.main()