diff --git a/sros2/sros2/_utilities.py b/sros2/sros2/_utilities.py index f35442f4..8a96c87f 100644 --- a/sros2/sros2/_utilities.py +++ b/sros2/sros2/_utilities.py @@ -29,6 +29,34 @@ _KEYSTORE_DIR_ENV = 'ROS_SECURITY_KEYSTORE' +def create_signed_cert( + keystore_ca_cert_path: pathlib.Path, + keystore_ca_key_path: pathlib.Path, + identity: str, + cert_path: pathlib.Path, + key_path: pathlib.Path, + **kwargs): + # Load the CA cert and key from disk + ca_cert = load_cert(keystore_ca_cert_path) + + with open(keystore_ca_key_path, 'rb') as f: + ca_key = serialization.load_pem_private_key(f.read(), None, cryptography_backend()) + + ca_pub_key = ca_cert.public_key() + # Calculate the key ID from the issuer's public key + key_id = x509.SubjectKeyIdentifier.from_public_key(ca_pub_key).digest + + cert, private_key = build_key_and_cert( + x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, identity)]), + issuer_name=ca_cert.subject, + ca_key=ca_key, + key_id=key_id, + **kwargs) + + write_key(private_key, key_path) + write_cert(cert, cert_path, chain_ca=[ca_cert]) # Store the full chain aswell + + def create_symlink(*, src: pathlib.Path, dst: pathlib.Path): if dst.exists(): # Don't do more work than we need to @@ -66,7 +94,8 @@ def create_smime_signed_file(cert_path, key_path, unsigned_file_path, signed_fil f.write(_sign_bytes(cert, private_key, content)) -def build_key_and_cert(subject_name, *, ca=False, ca_key=None, issuer_name=''): +def build_key_and_cert(subject_name, *, ca=False, ca_key=None, issuer_name='', + key_id=None, path_length=None, duration_days=3650): if not issuer_name: issuer_name = subject_name @@ -75,11 +104,11 @@ def build_key_and_cert(subject_name, *, ca=False, ca_key=None, issuer_name=''): if not ca_key: ca_key = private_key - if ca: - extension = x509.BasicConstraints(ca=True, path_length=1) - else: - extension = x509.BasicConstraints(ca=False, path_length=None) + # Verify path_length is >= 1 if creating a CA + if ca and not path_length: + path_length = 1 + extension = x509.BasicConstraints(ca=ca, path_length=path_length) utcnow = datetime.datetime.utcnow() builder = x509.CertificateBuilder( ).issuer_name( @@ -92,8 +121,7 @@ def build_key_and_cert(subject_name, *, ca=False, ca_key=None, issuer_name=''): # https://github.com/ros2/ci/pull/436#issuecomment-624874296 utcnow - datetime.timedelta(days=1) ).not_valid_after( - # TODO: This should not be hard-coded - utcnow + datetime.timedelta(days=3650) + utcnow + datetime.timedelta(days=duration_days) ).public_key( private_key.public_key() ).subject_name( @@ -101,6 +129,22 @@ def build_key_and_cert(subject_name, *, ca=False, ca_key=None, issuer_name=''): ).add_extension( extension, critical=ca ) + # Add extension for when it's not a self signed certificate + if issuer_name != subject_name and ca: + builder = builder.add_extension( + x509.KeyUsage( + key_agreement=False, + digital_signature=True, + key_encipherment=False, + key_cert_sign=True, + crl_sign=False, + content_commitment=False, + data_encipherment=False, + encipher_only=False, + decipher_only=False + ), + critical=True + ) cert = builder.sign(ca_key, hashes.SHA256(), cryptography_backend()) return (cert, private_key) @@ -121,9 +165,15 @@ def write_key( encryption_algorithm=encryption_algorithm)) -def write_cert(cert, cert_path: pathlib.Path, *, encoding=serialization.Encoding.PEM): +def write_cert(cert, cert_path: pathlib.Path, *, + chain_ca=None, + encoding=serialization.Encoding.PEM): with open(cert_path, 'wb') as f: f.write(cert.public_bytes(encoding=encoding)) + # Write the full chain with the certificate + if chain_ca: + for ca in chain_ca: + f.write(ca.public_bytes(encoding=encoding)) def load_cert(cert_path: pathlib.Path): diff --git a/sros2/sros2/keystore/_enclave.py b/sros2/sros2/keystore/_enclave.py index 2a57b553..ee5511ee 100644 --- a/sros2/sros2/keystore/_enclave.py +++ b/sros2/sros2/keystore/_enclave.py @@ -66,6 +66,9 @@ def create_enclave(keystore_path: pathlib.Path, identity: str) -> None: keystore_identity_ca_key_path = _keystore.get_keystore_private_dir( keystore_path).joinpath('identity_ca.key.pem') + # The root CA that signed the identity_ca.cert.pem + root_ca = _keystore.get_keystore_public_dir( + keystore_path).joinpath('ca.cert.pem') # Only create certs/keys if they don't already exist cert_path = key_dir.joinpath('cert.pem') key_path = key_dir.joinpath('key.pem') @@ -75,7 +78,8 @@ def create_enclave(keystore_path: pathlib.Path, identity: str) -> None: keystore_identity_ca_key_path, identity, cert_path, - key_path + key_path, + root_ca=root_ca ) # create a wildcard permissions file for this node which can be overridden @@ -132,7 +136,8 @@ def _create_key_and_cert( keystore_ca_key_path: pathlib.Path, identity: str, cert_path: pathlib.Path, - key_path: pathlib.Path): + key_path: pathlib.Path, + root_ca: pathlib.Path): # Load the CA cert and key from disk ca_cert = _utilities.load_cert(keystore_ca_cert_path) diff --git a/sros2/sros2/keystore/_keystore.py b/sros2/sros2/keystore/_keystore.py index c29c05d7..316f6423 100644 --- a/sros2/sros2/keystore/_keystore.py +++ b/sros2/sros2/keystore/_keystore.py @@ -30,7 +30,7 @@ _DEFAULT_COMMON_NAME = 'sros2CA' -def create_keystore(keystore_path: pathlib.Path) -> None: +def create_keystore(keystore_path: pathlib.Path, split_CA=False) -> None: if is_valid_keystore(keystore_path): raise sros2.errors.KeystoreExistsError(keystore_path) @@ -63,13 +63,34 @@ def create_keystore(keystore_path: pathlib.Path) -> None: # Create new CA if one doesn't already exist if not all(x.is_file() for x in required_files): - _create_ca_key_cert(keystore_ca_key_path, keystore_ca_cert_path) - for path in (keystore_permissions_ca_cert_path, keystore_identity_ca_cert_path): - _utilities.create_symlink(src=pathlib.Path('ca.cert.pem'), dst=path) - - for path in (keystore_permissions_ca_key_path, keystore_identity_ca_key_path): - _utilities.create_symlink(src=pathlib.Path('ca.key.pem'), dst=path) + if split_CA: + _create_ca_key_cert(keystore_ca_key_path, keystore_ca_cert_path, path_length=2) + # Create independent Permissions and Identity CA + _utilities.create_signed_cert(keystore_ca_cert_path, + keystore_ca_key_path, + 'IdentityCA', + keystore_identity_ca_cert_path, + keystore_identity_ca_key_path, + ca=True, + path_length=1, + duration_days=5) + _utilities.create_signed_cert(keystore_ca_cert_path, + keystore_ca_key_path, + 'PermissionsCA', + keystore_permissions_ca_cert_path, + keystore_permissions_ca_key_path, + ca=True, + path_length=2, + duration_days=5) + else: + _create_ca_key_cert(keystore_ca_key_path, keystore_ca_cert_path) + # Use the root CA as Permissions and Identity CA + for path in (keystore_permissions_ca_cert_path, keystore_identity_ca_cert_path): + _utilities.create_symlink(src=pathlib.Path('ca.cert.pem'), dst=path) + + for path in (keystore_permissions_ca_key_path, keystore_identity_ca_key_path): + _utilities.create_symlink(src=pathlib.Path('ca.key.pem'), dst=path) # Create governance file if it doesn't already exist gov_path = keystore_path.joinpath(_KS_ENCLAVES, 'governance.xml') @@ -108,10 +129,11 @@ def get_keystore_private_dir(keystore_path: pathlib.Path) -> pathlib.Path: return keystore_path.joinpath(_KS_PRIVATE) -def _create_ca_key_cert(ca_key_out_path, ca_cert_out_path): +def _create_ca_key_cert(ca_key_out_path, ca_cert_out_path, + name=_DEFAULT_COMMON_NAME, path_length=1): cert, private_key = _utilities.build_key_and_cert( - x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, _DEFAULT_COMMON_NAME)]), - ca=True) + x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, name)]), + ca=True, path_length=path_length) _utilities.write_key(private_key, ca_key_out_path) _utilities.write_cert(cert, ca_cert_out_path) diff --git a/sros2/sros2/verb/create_keystore.py b/sros2/sros2/verb/create_keystore.py index 82744a32..09b9bea1 100644 --- a/sros2/sros2/verb/create_keystore.py +++ b/sros2/sros2/verb/create_keystore.py @@ -27,11 +27,14 @@ class CreateKeystoreVerb(VerbExtension): def add_arguments(self, parser, cli_name) -> None: arg = parser.add_argument('ROOT', type=pathlib.Path, help='root path of keystore') + arg = parser.add_argument('--split-CA', action='store_true', default=False, + help='splits the Certificate Authority structure to \ + use multiple CAs instead of a single self-signed root CA') arg.completer = DirectoriesCompleter() def main(self, *, args) -> int: try: - sros2.keystore.create_keystore(args.ROOT) + sros2.keystore.create_keystore(args.ROOT, args.split_CA) except sros2.errors.SROS2Error as e: print(f'Unable to create keystore: {str(e)}', file=sys.stderr) return 1