Source code for certica.cert_manager

"""
Certificate Manager - Handles certificate signing operations
"""

import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional


class CertManager:
    """Manage certificates signed by a CA through the ``openssl`` CLI.

    Every certificate lives in its own directory,
    ``<base_dir>/certs/<ca_name>/<cert_name>/``, holding ``key.pem`` and
    ``cert.pem``.
    """

    def __init__(self, base_dir: str = "output"):
        """Resolve *base_dir* and make sure its ``certs`` directory exists.

        Args:
            base_dir: Root output directory; ``certs`` is created beneath it.
        """
        self.base_dir = Path(base_dir).resolve()
        self.certs_dir = self.base_dir / "certs"
        self.certs_dir.mkdir(parents=True, exist_ok=True)

    def sign_certificate(
        self,
        ca_key: str,
        ca_cert: str,
        ca_name: str,
        cert_name: str,
        cert_type: str = "server",  # "server" or "client"
        common_name: str = "",
        dns_names: Optional[List[str]] = None,
        ip_addresses: Optional[List[str]] = None,
        organization: str = "",
        country: str = "CN",
        state: str = "Beijing",
        city: str = "Beijing",
        validity_days: int = 365,
        key_size: int = 2048,
    ) -> Dict[str, Any]:
        """Generate an RSA key and a certificate signed by the given CA.

        Runs ``openssl genrsa``, ``openssl req -new`` and
        ``openssl x509 -req`` in sequence, using a temporary OpenSSL config
        file that is removed afterwards.

        Args:
            ca_key: Path to CA private key
            ca_cert: Path to CA certificate
            ca_name: Name of the CA (for directory organization)
            cert_name: Name for the certificate
            cert_type: "server" or "client"
            common_name: Common name for the certificate. When empty, the
                first DNS name, then the first IP address, then *cert_name*
                is used.
            dns_names: List of DNS names
            ip_addresses: List of IP addresses
            organization: Organization name
            country: Country code
            state: State/Province
            city: City
            validity_days: Certificate validity in days
            key_size: Key size in bits

        Returns:
            Dict with keys ``cert_name``, ``key``, ``cert``, ``type`` and
            ``validity_days``; ``key`` and ``cert`` are file paths.

        Raises:
            subprocess.CalledProcessError: If an ``openssl`` command exits
                with a non-zero status.
            OSError: If ``openssl`` cannot be executed or a file operation
                fails.
        """
        dns_names = list(dns_names or [])
        ip_addresses = list(ip_addresses or [])

        if not common_name:
            common_name = next(iter(dns_names + ip_addresses), cert_name)

        # Create output directory organized by CA: certs/{ca_name}/{cert_name}/
        ca_certs_dir = self.certs_dir / ca_name
        cert_output_dir = ca_certs_dir / cert_name
        cert_output_dir.mkdir(parents=True, exist_ok=True)

        key_path = cert_output_dir / "key.pem"
        cert_path = cert_output_dir / "cert.pem"
        # Bound before the try block so the failure handler can always
        # reference it, even when key generation itself fails.
        csr_path = cert_output_dir / "csr.pem"
        config_path = None

        try:
            # delete=False: openssl has to reopen the file by name after this
            # handle is closed; the finally clause removes it.
            with tempfile.NamedTemporaryFile(
                mode="w", suffix=".cnf", delete=False
            ) as f:
                config_path = f.name
                self._write_cert_config(
                    f,
                    cert_type,
                    common_name,
                    dns_names,
                    ip_addresses,
                    organization,
                    country,
                    state,
                    city,
                )

            # Generate private key
            subprocess.run(
                ["openssl", "genrsa", "-out", str(key_path), str(key_size)],
                check=True,
                capture_output=True,
            )
            # Restrict the key right away instead of after signing.
            os.chmod(key_path, 0o600)

            # Generate certificate signing request
            subprocess.run(
                [
                    "openssl",
                    "req",
                    "-new",
                    "-key",
                    str(key_path),
                    "-out",
                    str(csr_path),
                    "-config",
                    config_path,
                ],
                check=True,
                capture_output=True,
            )

            # Sign certificate
            subprocess.run(
                [
                    "openssl",
                    "x509",
                    "-req",
                    "-in",
                    str(csr_path),
                    "-CA",
                    ca_cert,
                    "-CAkey",
                    ca_key,
                    "-CAcreateserial",
                    "-out",
                    str(cert_path),
                    "-days",
                    str(validity_days),
                    "-extensions",
                    "v3_req",
                    "-extfile",
                    config_path,
                ],
                check=True,
                capture_output=True,
            )
            os.chmod(cert_path, 0o644)

            # The CSR is only an intermediate artifact.
            csr_path.unlink()

            return {
                "cert_name": cert_name,
                "key": str(key_path),
                "cert": str(cert_path),
                "type": cert_type,
                "validity_days": validity_days,
            }
        except (KeyboardInterrupt, Exception):
            # Clean up partial files if creation was interrupted or failed
            self._remove_partial_output(
                files=(key_path, cert_path, csr_path),
                directories=(cert_output_dir, ca_certs_dir),
            )
            raise
        finally:
            if config_path is not None:
                os.unlink(config_path)

    @staticmethod
    def _remove_partial_output(files, directories) -> None:
        """Delete existing *files*, then remove each directory left empty.

        Directories are processed in the given order, so a child must be
        listed before its parent.
        """
        for path in files:
            if path.exists():
                path.unlink()
        for directory in directories:
            if directory.exists() and not any(directory.iterdir()):
                directory.rmdir()

    def _write_cert_config(
        self,
        f,
        cert_type: str,
        common_name: str,
        dns_names: List[str],
        ip_addresses: List[str],
        organization: str,
        country: str,
        state: str,
        city: str,
    ):
        """Write the OpenSSL configuration for a certificate request to *f*.

        Args:
            f: Writable text file object receiving the configuration.
            cert_type: "server" adds serverAuth and clientAuth extended key
                usage; any other value adds clientAuth only.
            common_name: Subject CN.
            dns_names: DNS entries for subjectAltName.
            ip_addresses: IP entries for subjectAltName.
            organization: Subject O.
            country: Subject C.
            state: Subject ST.
            city: Subject L.
        """
        subject_fields = [
            ("C", country),
            ("ST", state),
            ("L", city),
            ("O", organization),
            ("CN", common_name),
        ]

        lines = [
            "[req]",
            "req_extensions = v3_req",
            "distinguished_name = req_distinguished_name",
            "prompt = no",
            "",
            "[req_distinguished_name]",
        ]
        # Leave out empty subject fields: a line such as "O = " would hand
        # OpenSSL a zero-length attribute value, which it may reject.
        lines.extend(f"{key} = {value}" for key, value in subject_fields if value)

        lines += [
            "",
            "[v3_req]",
            "basicConstraints = CA:FALSE",
            "keyUsage = digitalSignature, keyEncipherment",
        ]
        if cert_type == "server":
            lines.append("extendedKeyUsage = serverAuth, clientAuth")
        else:
            lines.append("extendedKeyUsage = clientAuth")

        # Add Subject Alternative Names
        if dns_names or ip_addresses:
            lines += ["subjectAltName = @alt_names", "", "[alt_names]"]
            lines.extend(
                f"DNS.{index} = {dns}" for index, dns in enumerate(dns_names, start=1)
            )
            lines.extend(
                f"IP.{index} = {ip}" for index, ip in enumerate(ip_addresses, start=1)
            )

        f.write("\n".join(lines) + "\n")

    def list_certificates(self) -> List[Dict[str, str]]:
        """List all signed certificates.

        Scans ``certs/{ca_name}/{cert_name}/`` and reports every directory
        that contains both ``key.pem`` and ``cert.pem``.

        Returns:
            A list of dicts with keys ``name``, ``ca_name``, ``key`` and
            ``cert``, sorted by CA directory and then certificate directory.
            Empty when the certs directory does not exist.
        """
        certs: List[Dict[str, str]] = []
        if not self.certs_dir.is_dir():
            return certs

        # Sorted so the result does not depend on filesystem listing order.
        for ca_dir in sorted(self.certs_dir.iterdir()):
            if not ca_dir.is_dir():
                continue
            for cert_dir in sorted(ca_dir.iterdir()):
                if not cert_dir.is_dir():
                    continue
                key_path = cert_dir / "key.pem"
                cert_path = cert_dir / "cert.pem"
                if key_path.exists() and cert_path.exists():
                    certs.append(
                        {
                            "name": cert_dir.name,
                            "ca_name": ca_dir.name,
                            "key": str(key_path),
                            "cert": str(cert_path),
                        }
                    )
        return certs

    def get_certificate_info(self, cert_path: str) -> Dict[str, str]:
        """Get information about a certificate.

        Args:
            cert_path: Path of the certificate file to inspect.

        Returns:
            ``{"info": <text>}`` where the text is the output of
            ``openssl x509 -text -noout``, or the string
            ``"Failed to read certificate"`` when openssl exits with an error.
        """
        try:
            result = subprocess.run(
                ["openssl", "x509", "-in", cert_path, "-text", "-noout"],
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError:
            return {"info": "Failed to read certificate"}
        return {"info": result.stdout}

    def delete_certificate(self, ca_name: str, cert_name: str) -> bool:
        """Delete a certificate directory and everything in it.

        Args:
            ca_name: Name of the CA directory the certificate belongs to.
            cert_name: Name of the certificate directory to remove.

        Returns:
            True if the directory was removed. False if it does not exist,
            could not be removed, or the names do not point to a location
            strictly inside the CA directory under ``certs``.
        """
        root = self.certs_dir.resolve()
        ca_dir = (self.certs_dir / ca_name).resolve()
        cert_dir = (self.certs_dir / ca_name / cert_name).resolve()

        # Refuse empty names, "..", absolute paths and similar inputs that
        # would make rmtree remove the certs root, a whole CA directory, or
        # something outside the certificate tree.
        if root not in ca_dir.parents or ca_dir not in cert_dir.parents:
            return False

        if not cert_dir.exists():
            return False

        try:
            shutil.rmtree(cert_dir)
        except OSError:
            return False
        return True