---
name: senior-secops
description: Senior SecOps engineer skill for application security, vulnerability management, compliance verification, and secure development practices. Runs SAST/DAST scans, generates CVE remediation plans, checks dependency vulnerabilities, creates security policies, enforces secure coding patterns, and automates compliance checks against SOC2, PCI-DSS, HIPAA, and GDPR. Use when conducting a security review or audit, responding to a CVE or security incident, hardening infrastructure, implementing authentication or secrets management, running penetration test prep, checking OWASP Top 10 exposure, or enforcing security controls in CI/CD pipelines.
---
Senior SecOps Engineer
Complete toolkit for Security Operations including vulnerability management, compliance verification, secure coding practices, and security automation.
Core Capabilities
1. Security Scanner
Scan source code for security vulnerabilities including hardcoded secrets, SQL injection, XSS, command injection, and path traversal.
# Scan project for security issues
python scripts/security_scanner.py /path/to/project
# Filter by severity
python scripts/security_scanner.py /path/to/project --severity high
# JSON output for CI/CD
python scripts/security_scanner.py /path/to/project --json --output report.json
Detects:
- Hardcoded secrets (API keys, passwords, AWS credentials, GitHub tokens, private keys)
- SQL injection patterns (string concatenation, f-strings, template literals)
- XSS vulnerabilities (innerHTML assignment, unsafe DOM manipulation, React unsafe patterns)
- Command injection (shell=True, exec, eval with user input)
- Path traversal (file operations with user input)
2. Vulnerability Assessor
Scan dependencies for known CVEs across npm, Python, and Go ecosystems.
# Assess project dependencies
python scripts/vulnerability_assessor.py /path/to/project
# Critical/high only
python scripts/vulnerability_assessor.py /path/to/project --severity high
# Export vulnerability report
python scripts/vulnerability_assessor.py /path/to/project --json --output vulns.json
Scans:
- package.json and package-lock.json (npm)
- requirements.txt and pyproject.toml (Python)
- go.mod (Go)
Output:
- CVE IDs with CVSS scores
- Affected package versions
- Fixed versions for remediation
- Overall risk score (0-100)
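The JSON report is designed for automation; as a minimal sketch, assuming the report keeps findings under a top-level `vulnerabilities` list with `severity` and `cve_id` fields (the field names are an assumption, check the actual report schema):

```python
import json

def critical_cves(report_path: str) -> list:
    """Extract critical findings from a vulnerability_assessor.py JSON report.

    Assumes a top-level "vulnerabilities" list with "severity" and "cve_id"
    keys; adjust to the tool's actual output schema.
    """
    with open(report_path) as f:
        report = json.load(f)
    return [
        v for v in report.get("vulnerabilities", [])
        if v.get("severity") == "critical"
    ]
```

Useful as a post-processing step after `--json --output vulns.json`, for example to open tickets only for critical CVEs.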
3. Compliance Checker
Verify security compliance against SOC 2, PCI-DSS, HIPAA, and GDPR frameworks.
# Check all frameworks
python scripts/compliance_checker.py /path/to/project
# Specific framework
python scripts/compliance_checker.py /path/to/project --framework soc2
python scripts/compliance_checker.py /path/to/project --framework pci-dss
python scripts/compliance_checker.py /path/to/project --framework hipaa
python scripts/compliance_checker.py /path/to/project --framework gdpr
# Export compliance report
python scripts/compliance_checker.py /path/to/project --json --output compliance.json
Verifies:
- Access control implementation
- Encryption at rest and in transit
- Audit logging
- Authentication strength (MFA, password hashing)
- Security documentation
- CI/CD security controls
Workflows
Workflow 1: Security Audit
Complete security assessment of a codebase.
# Step 1: Scan for code vulnerabilities
python scripts/security_scanner.py . --severity medium
# STOP if exit code 2 — resolve critical findings before continuing
# Step 2: Check dependency vulnerabilities
python scripts/vulnerability_assessor.py . --severity high
# STOP if exit code 2 — patch critical CVEs before continuing
# Step 3: Verify compliance controls
python scripts/compliance_checker.py . --framework all
# STOP if exit code 2 — address critical gaps before proceeding
# Step 4: Generate combined reports
python scripts/security_scanner.py . --json --output security.json
python scripts/vulnerability_assessor.py . --json --output vulns.json
python scripts/compliance_checker.py . --json --output compliance.json
Workflow 2: CI/CD Security Gate
Integrate security checks into deployment pipeline.
# .github/workflows/security.yml
name: "security-scan"
on:
pull_request:
branches: [main, develop]
jobs:
security-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: "set-up-python"
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: "security-scanner"
run: python scripts/security_scanner.py . --severity high
- name: "vulnerability-assessment"
run: python scripts/vulnerability_assessor.py . --severity critical
- name: "compliance-check"
run: python scripts/compliance_checker.py . --framework soc2
Each step fails the pipeline on its respective exit code — no deployment proceeds past a critical finding.
Workflow 3: CVE Triage
Respond to a new CVE affecting your application.
1. ASSESS (0-2 hours)
- Identify affected systems using vulnerability_assessor.py
- Check if CVE is being actively exploited
- Determine CVSS environmental score for your context
- STOP if CVSS 9.0+ on internet-facing system — escalate immediately
2. PRIORITIZE
- Critical (CVSS 9.0+, internet-facing): 24 hours
- High (CVSS 7.0-8.9): 7 days
- Medium (CVSS 4.0-6.9): 30 days
- Low (CVSS < 4.0): 90 days
3. REMEDIATE
- Update affected dependency to fixed version
- Run security_scanner.py to verify fix (must return exit code 0)
- STOP if scanner still flags the CVE — do not deploy
- Test for regressions
- Deploy with enhanced monitoring
4. VERIFY
- Re-run vulnerability_assessor.py
- Confirm CVE no longer reported
- Document remediation actions
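The SLA matrix in step 2 can be encoded so triage tooling stamps each finding with a remediation deadline; a minimal sketch (the function name and the choice to treat non-internet-facing critical CVEs under the High window are assumptions):

```python
from datetime import datetime, timedelta
from typing import Optional

def remediation_deadline(cvss: float, internet_facing: bool,
                         now: Optional[datetime] = None) -> datetime:
    """Map a CVSS score to a remediation deadline per the triage SLAs."""
    now = now or datetime.utcnow()
    if cvss >= 9.0 and internet_facing:
        return now + timedelta(hours=24)  # Critical: 24 hours
    if cvss >= 7.0:
        return now + timedelta(days=7)    # High: 7 days
    if cvss >= 4.0:
        return now + timedelta(days=30)   # Medium: 30 days
    return now + timedelta(days=90)       # Low: 90 days
```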
Workflow 4: Incident Response
Security incident handling procedure.
PHASE 1: DETECT & IDENTIFY (0-15 min)
- Alert received and acknowledged
- Initial severity assessment (SEV-1 to SEV-4)
- Incident commander assigned
- Communication channel established
PHASE 2: CONTAIN (15-60 min)
- Affected systems identified
- Network isolation if needed
- Credentials rotated if compromised
- Preserve evidence (logs, memory dumps)
PHASE 3: ERADICATE (1-4 hours)
- Root cause identified
- Malware/backdoors removed
- Vulnerabilities patched (run security_scanner.py; must return exit code 0)
- Systems hardened
PHASE 4: RECOVER (4-24 hours)
- Systems restored from clean backup
- Services brought back online
- Enhanced monitoring enabled
- User access restored
PHASE 5: POST-INCIDENT (24-72 hours)
- Incident timeline documented
- Root cause analysis complete
- Lessons learned documented
- Preventive measures implemented
- Stakeholder report delivered
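Phase 5 calls for a documented incident timeline and a stakeholder report; a minimal sketch of a timeline recorder (the class and field names are illustrative, not part of the bundled scripts):

```python
from datetime import datetime
from typing import Optional

class IncidentTimeline:
    """Append-only timeline for post-incident documentation."""

    def __init__(self, incident_id: str, severity: str):
        self.incident_id = incident_id
        self.severity = severity  # SEV-1 .. SEV-4
        self.entries = []

    def record(self, phase: str, event: str,
               at: Optional[datetime] = None) -> None:
        """Record a timestamped event under a response phase."""
        self.entries.append({
            "phase": phase,  # detect, contain, eradicate, recover, post-incident
            "event": event,
            "timestamp": (at or datetime.utcnow()).isoformat(),
        })

    def report(self) -> dict:
        """Skeleton for the Phase 5 stakeholder report."""
        return {
            "incident_id": self.incident_id,
            "severity": self.severity,
            "timeline": self.entries,
        }
```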
Tool Reference
security_scanner.py
| Option | Description |
|---|---|
| target | Directory or file to scan |
| --severity, -s | Minimum severity: critical, high, medium, low |
| --verbose, -v | Show files as they’re scanned |
| --json | Output results as JSON |
| --output, -o | Write results to file |
Exit Codes: 0 = no critical/high findings · 1 = high severity findings · 2 = critical severity findings
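A wrapper can turn these exit codes into a deployment gate; a minimal sketch using `subprocess` (the blocking policy shown is an example, tune it to your pipeline):

```python
import subprocess
import sys

def run_gate(cmd: list) -> None:
    """Run a scanner command and block the pipeline on critical findings."""
    result = subprocess.run(cmd)
    if result.returncode == 2:  # critical severity findings
        print("Critical findings -- blocking deployment", file=sys.stderr)
        sys.exit(1)
    if result.returncode == 1:  # high severity findings
        print("High severity findings -- review required", file=sys.stderr)
```

For example: `run_gate(["python", "scripts/security_scanner.py", ".", "--severity", "high"])`.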
vulnerability_assessor.py
| Option | Description |
|---|---|
| target | Directory containing dependency files |
| --severity, -s | Minimum severity: critical, high, medium, low |
| --verbose, -v | Show files as they’re scanned |
| --json | Output results as JSON |
| --output, -o | Write results to file |
Exit Codes: 0 = no critical/high vulnerabilities · 1 = high severity vulnerabilities · 2 = critical severity vulnerabilities
compliance_checker.py
| Option | Description |
|---|---|
| target | Directory to check |
| --framework, -f | Framework: soc2, pci-dss, hipaa, gdpr, all |
| --verbose, -v | Show checks as they run |
| --json | Output results as JSON |
| --output, -o | Write results to file |
Exit Codes: 0 = compliant (90%+ score) · 1 = non-compliant (50-89% score) · 2 = critical gaps (<50% score)
Security Standards
See references/security_standards.md for OWASP Top 10 full guidance, secure coding standards, authentication requirements, and API security controls.
Secure Coding Checklist
## Input Validation
- [ ] Validate all input on server side
- [ ] Use allowlists over denylists
- [ ] Sanitize for specific context (HTML, SQL, shell)
## Output Encoding
- [ ] HTML encode for browser output
- [ ] URL encode for URLs
- [ ] JavaScript encode for script contexts
## Authentication
- [ ] Use bcrypt/argon2 for passwords
- [ ] Implement MFA for sensitive operations
- [ ] Enforce strong password policy
## Session Management
- [ ] Generate secure random session IDs
- [ ] Set HttpOnly, Secure, SameSite flags
- [ ] Implement session timeout (15 min idle)
## Error Handling
- [ ] Log errors with context (no secrets)
- [ ] Return generic messages to users
- [ ] Never expose stack traces in production
## Secrets Management
- [ ] Use environment variables or secrets manager
- [ ] Never commit secrets to version control
- [ ] Rotate credentials regularly
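The session-management items above can be illustrated with the standard library alone; a minimal, framework-agnostic sketch (real applications should prefer their framework's session support):

```python
import secrets
from http.cookies import SimpleCookie

def build_session_cookie(max_age_seconds: int = 900) -> SimpleCookie:
    """Create a session cookie with a secure random ID and hardened flags."""
    cookie = SimpleCookie()
    cookie["session"] = secrets.token_urlsafe(32)  # cryptographically secure ID
    cookie["session"]["httponly"] = True           # not readable by JavaScript
    cookie["session"]["secure"] = True             # sent over HTTPS only
    cookie["session"]["samesite"] = "Strict"       # CSRF mitigation
    cookie["session"]["max-age"] = max_age_seconds # 15 min idle timeout
    return cookie
```

`cookie.output()` yields a `Set-Cookie` header carrying the HttpOnly, Secure, and SameSite=Strict flags plus the 15-minute Max-Age.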
Compliance Frameworks
See references/compliance_requirements.md for full control mappings. Run compliance_checker.py to verify the controls below:
SOC 2 Type II
- CC6 Logical Access: authentication, authorization, MFA
- CC7 System Operations: monitoring, logging, incident response
- CC8 Change Management: CI/CD, code review, deployment controls
PCI-DSS v4.0
- Req 3/4: Encryption at rest and in transit (TLS 1.2+)
- Req 6: Secure development (input validation, secure coding)
- Req 8: Strong authentication (MFA, password policy)
- Req 10/11: Audit logging, SAST/DAST/penetration testing
HIPAA Security Rule
- Unique user IDs and audit trails for PHI access (164.312(a)(1), 164.312(b))
- MFA for person/entity authentication (164.312(d))
- Transmission encryption via TLS (164.312(e)(1))
GDPR
- Art 25/32: Privacy by design, encryption, pseudonymization
- Art 33: Breach notification within 72 hours
- Art 17/20: Right to erasure and data portability
Best Practices
Secrets Management
# BAD: Hardcoded secret
API_KEY = "sk-1234567890abcdef"
# GOOD: Environment variable
import os
API_KEY = os.environ.get("API_KEY")
# BETTER: Secrets manager
from your_vault_client import get_secret
API_KEY = get_secret("api/key")
SQL Injection Prevention
# BAD: String concatenation
query = f"SELECT * FROM users WHERE id = {user_id}"
# GOOD: Parameterized query
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
XSS Prevention
// BAD: Direct innerHTML assignment is vulnerable
// GOOD: Use textContent (auto-escaped)
element.textContent = userInput;
// GOOD: Use sanitization library for HTML
import DOMPurify from 'dompurify';
const safeHTML = DOMPurify.sanitize(userInput);
Authentication
// Password hashing
const bcrypt = require('bcrypt');
const SALT_ROUNDS = 12;
// Hash password
const hash = await bcrypt.hash(password, SALT_ROUNDS);
// Verify password
const match = await bcrypt.compare(password, hash);
Security Headers
// Express.js security headers
const helmet = require('helmet');
app.use(helmet());
// Or manually set headers:
app.use((req, res, next) => {
res.setHeader('X-Content-Type-Options', 'nosniff');
res.setHeader('X-Frame-Options', 'DENY');
res.setHeader('X-XSS-Protection', '0'); // legacy header; current OWASP guidance disables it in favor of CSP
res.setHeader('Strict-Transport-Security', 'max-age=31536000; includeSubDomains');
res.setHeader('Content-Security-Policy', "default-src 'self'");
next();
});
Reference Documentation
| Document | Description |
|---|---|
| references/security_standards.md | OWASP Top 10, secure coding, authentication, API security |
| references/vulnerability_management_guide.md | CVE triage, CVSS scoring, remediation workflows |
| references/compliance_requirements.md | SOC 2, PCI-DSS, HIPAA, GDPR full control mappings |
Compliance Requirements Reference
Comprehensive guide for SOC 2, PCI-DSS, HIPAA, and GDPR compliance requirements.
SOC 2 Type II
Trust Service Criteria
| Criteria | Description | Key Controls |
|---|---|---|
| Security | Protection against unauthorized access | Access controls, encryption, monitoring |
| Availability | System uptime and performance | SLAs, redundancy, disaster recovery |
| Processing Integrity | Accurate and complete processing | Data validation, error handling |
| Confidentiality | Protection of confidential information | Encryption, access controls |
| Privacy | Personal information handling | Consent, data minimization |
Security Controls Checklist
## SOC 2 Security Controls
### CC1: Control Environment
- [ ] Security policies documented and approved
- [ ] Organizational structure defined
- [ ] Security roles and responsibilities assigned
- [ ] Background checks performed on employees
- [ ] Security awareness training completed annually
### CC2: Communication and Information
- [ ] Security policies communicated to employees
- [ ] Security incidents reported and tracked
- [ ] External communications about security controls
- [ ] Service level agreements documented
### CC3: Risk Assessment
- [ ] Annual risk assessment performed
- [ ] Risk register maintained
- [ ] Risk treatment plans documented
- [ ] Vendor risk assessments completed
- [ ] Business impact analysis current
### CC4: Monitoring Activities
- [ ] Security monitoring implemented
- [ ] Log aggregation and analysis
- [ ] Vulnerability scanning (weekly)
- [ ] Penetration testing (annual)
- [ ] Security metrics reviewed monthly
### CC5: Control Activities
- [ ] Access control policies enforced
- [ ] MFA enabled for all users
- [ ] Password policy enforced (12+ chars)
- [ ] Access reviews (quarterly)
- [ ] Least privilege principle applied
### CC6: Logical and Physical Access
- [ ] Identity management system
- [ ] Role-based access control
- [ ] Physical access controls
- [ ] Network segmentation
- [ ] Data center security
### CC7: System Operations
- [ ] Change management process
- [ ] Incident management process
- [ ] Problem management process
- [ ] Capacity management
- [ ] Backup and recovery tested
### CC8: Change Management
- [ ] Change control board
- [ ] Change approval workflow
- [ ] Testing requirements documented
- [ ] Rollback procedures
- [ ] Emergency change process
### CC9: Risk Mitigation
- [ ] Insurance coverage
- [ ] Business continuity plan
- [ ] Disaster recovery plan tested
- [ ] Vendor management program
Evidence Collection
def collect_soc2_evidence(period_start: str, period_end: str) -> dict:
"""
Collect evidence for SOC 2 audit period.
Returns dictionary organized by Trust Service Criteria.
"""
evidence = {
'period': {'start': period_start, 'end': period_end},
'security': {
'access_reviews': get_access_reviews(period_start, period_end),
'vulnerability_scans': get_vulnerability_reports(period_start, period_end),
'penetration_tests': get_pentest_reports(period_start, period_end),
'security_incidents': get_incident_reports(period_start, period_end),
'training_records': get_training_completion(period_start, period_end),
},
'availability': {
'uptime_reports': get_uptime_metrics(period_start, period_end),
'incident_reports': get_availability_incidents(period_start, period_end),
'dr_tests': get_dr_test_results(period_start, period_end),
'backup_tests': get_backup_test_results(period_start, period_end),
},
'processing_integrity': {
'data_validation_logs': get_validation_logs(period_start, period_end),
'error_reports': get_error_reports(period_start, period_end),
'reconciliation_reports': get_reconciliation_reports(period_start, period_end),
},
'confidentiality': {
'encryption_status': get_encryption_audit(period_start, period_end),
'data_classification': get_data_inventory(),
'access_logs': get_sensitive_data_access_logs(period_start, period_end),
}
}
return evidence
PCI-DSS
PCI-DSS v4.0 Requirements
| Requirement | Description |
|---|---|
| 1 | Install and maintain network security controls |
| 2 | Apply secure configurations |
| 3 | Protect stored account data |
| 4 | Protect cardholder data with cryptography during transmission |
| 5 | Protect all systems from malware |
| 6 | Develop and maintain secure systems and software |
| 7 | Restrict access to cardholder data by business need-to-know |
| 8 | Identify users and authenticate access |
| 9 | Restrict physical access to cardholder data |
| 10 | Log and monitor all access to network resources |
| 11 | Test security of systems and networks regularly |
| 12 | Support information security with organizational policies |
Cardholder Data Protection
# PCI-DSS compliant card data handling
import re
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os
class PCIDataHandler:
"""Handle cardholder data per PCI-DSS requirements."""
# PAN patterns (masked for display)
PAN_PATTERN = re.compile(r'\b(?:\d{4}[-\s]?){3}\d{4}\b')
def __init__(self, encryption_key: bytes):
self.cipher = Fernet(encryption_key)
@staticmethod
def mask_pan(pan: str) -> str:
"""
Mask PAN per PCI-DSS (show first 6, last 4 only).
Requirement 3.4: Render PAN unreadable.
"""
digits = re.sub(r'\D', '', pan)
if len(digits) < 13:
return '*' * len(digits)
return f"{digits[:6]}{'*' * (len(digits) - 10)}{digits[-4:]}"
def encrypt_pan(self, pan: str) -> str:
"""
Encrypt PAN for storage.
Requirement 3.5: Protect keys used to protect stored account data.
"""
return self.cipher.encrypt(pan.encode()).decode()
def decrypt_pan(self, encrypted_pan: str) -> str:
"""Decrypt PAN (requires authorization logging)."""
return self.cipher.decrypt(encrypted_pan.encode()).decode()
@staticmethod
def validate_pan(pan: str) -> bool:
"""Validate PAN using Luhn algorithm."""
digits = re.sub(r'\D', '', pan)
if len(digits) < 13 or len(digits) > 19:
return False
# Luhn algorithm
total = 0
for i, digit in enumerate(reversed(digits)):
d = int(digit)
if i % 2 == 1:
d *= 2
if d > 9:
d -= 9
total += d
return total % 10 == 0
def sanitize_logs(self, log_message: str) -> str:
"""
Remove PAN from log messages.
Requirement 3.3: Mask PAN when displayed.
"""
def replace_pan(match):
return self.mask_pan(match.group())
return self.PAN_PATTERN.sub(replace_pan, log_message)
Network Segmentation
# PCI-DSS network segmentation example
# Cardholder Data Environment (CDE) firewall rules
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: cde-isolation
namespace: payment-processing
spec:
podSelector:
matchLabels:
pci-zone: cde
policyTypes:
- Ingress
- Egress
ingress:
# Only allow from payment gateway
- from:
- namespaceSelector:
matchLabels:
pci-zone: dmz
- podSelector:
matchLabels:
app: payment-gateway
ports:
- protocol: TCP
port: 443
egress:
# Only allow to payment processor
- to:
- ipBlock:
cidr: 10.0.100.0/24 # Payment processor network
ports:
- protocol: TCP
port: 443
# Allow DNS
- to:
- namespaceSelector: {}
podSelector:
matchLabels:
k8s-app: kube-dns
ports:
- protocol: UDP
port: 53
HIPAA
HIPAA Security Rule Requirements
| Safeguard | Standard | Implementation |
|---|---|---|
| Administrative | Security Management | Risk analysis, sanctions, activity review |
| Administrative | Workforce Security | Authorization, clearance, termination |
| Administrative | Information Access | Access authorization, workstation use |
| Administrative | Security Awareness | Training, login monitoring, password management |
| Administrative | Security Incident | Response and reporting procedures |
| Administrative | Contingency Plan | Backup, disaster recovery, emergency mode |
| Physical | Facility Access | Access controls, maintenance records |
| Physical | Workstation | Use policies, security |
| Physical | Device and Media | Disposal, media re-use, accountability |
| Technical | Access Control | Unique user ID, emergency access, encryption |
| Technical | Audit Controls | Hardware, software, procedural mechanisms |
| Technical | Integrity | Mechanisms to ensure PHI not altered |
| Technical | Transmission | Encryption of PHI in transit |
PHI Handling
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import hashlib
import logging
# Configure PHI audit logging
phi_logger = logging.getLogger('phi_access')
phi_logger.setLevel(logging.INFO)
@dataclass
class PHIAccessLog:
"""HIPAA-compliant PHI access logging."""
timestamp: datetime
user_id: str
patient_id: str
action: str # view, create, update, delete, export
reason: str
data_elements: list
source_ip: str
success: bool
def log_phi_access(access: PHIAccessLog):
"""
Log PHI access per HIPAA requirements.
164.312(b): Audit controls.
"""
phi_logger.info(
f"PHI_ACCESS|"
f"timestamp={access.timestamp.isoformat()}|"
f"user={access.user_id}|"
f"patient={access.patient_id}|"
f"action={access.action}|"
f"reason={access.reason}|"
f"elements={','.join(access.data_elements)}|"
f"ip={access.source_ip}|"
f"success={access.success}"
)
class HIPAACompliantStorage:
"""HIPAA-compliant PHI storage handler."""
# Minimum Necessary Standard - only access needed data
PHI_ELEMENTS = {
'patient_name': 'high',
'ssn': 'high',
'medical_record_number': 'high',
'diagnosis': 'medium',
'treatment_plan': 'medium',
'appointment_date': 'low',
'provider_name': 'low'
}
def __init__(self, encryption_service, user_context):
self.encryption = encryption_service
self.user = user_context
def access_phi(
self,
patient_id: str,
elements: list,
reason: str
) -> Optional[dict]:
"""
Access PHI with HIPAA controls.
Args:
patient_id: Patient identifier
elements: List of PHI elements to access
reason: Business reason for access
Returns:
Requested PHI elements if authorized
"""
# Verify minimum necessary - user only gets needed elements
authorized_elements = self._check_authorization(elements)
if not authorized_elements:
log_phi_access(PHIAccessLog(
timestamp=datetime.utcnow(),
user_id=self.user.id,
patient_id=patient_id,
action='view',
reason=reason,
data_elements=elements,
source_ip=self.user.ip_address,
success=False
))
raise PermissionError("Not authorized for requested PHI elements")
# Retrieve and decrypt PHI
phi_data = self._retrieve_phi(patient_id, authorized_elements)
# Log successful access
log_phi_access(PHIAccessLog(
timestamp=datetime.utcnow(),
user_id=self.user.id,
patient_id=patient_id,
action='view',
reason=reason,
data_elements=authorized_elements,
source_ip=self.user.ip_address,
success=True
))
return phi_data
def _check_authorization(self, requested_elements: list) -> list:
"""Check user authorization for PHI elements."""
user_clearance = self.user.hipaa_clearance_level
authorized = []
for element in requested_elements:
element_level = self.PHI_ELEMENTS.get(element, 'high')
if self._clearance_allows(user_clearance, element_level):
authorized.append(element)
return authorized
GDPR
GDPR Principles
| Principle | Description | Implementation |
|---|---|---|
| Lawfulness | Legal basis for processing | Consent management, contract basis |
| Purpose Limitation | Specific, explicit purposes | Data use policies, access controls |
| Data Minimization | Adequate, relevant, limited | Collection limits, retention policies |
| Accuracy | Keep data accurate | Update procedures, validation |
| Storage Limitation | Time-limited retention | Retention schedules, deletion |
| Integrity & Confidentiality | Secure processing | Encryption, access controls |
| Accountability | Demonstrate compliance | Documentation, DPO, DPIA |
Data Subject Rights Implementation
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional, List
import json
class DSRType(Enum):
ACCESS = "access" # Article 15
RECTIFICATION = "rectification" # Article 16
ERASURE = "erasure" # Article 17 (Right to be forgotten)
RESTRICTION = "restriction" # Article 18
PORTABILITY = "portability" # Article 20
OBJECTION = "objection" # Article 21
class DataSubjectRequest:
"""Handle GDPR Data Subject Requests."""
# GDPR requires response within 30 days
RESPONSE_DEADLINE_DAYS = 30
def __init__(self, db, notification_service):
self.db = db
self.notifications = notification_service
def submit_request(
self,
subject_email: str,
request_type: DSRType,
details: str
) -> dict:
"""
Submit a Data Subject Request.
Args:
subject_email: Email of the data subject
request_type: Type of GDPR request
details: Additional request details
Returns:
Request tracking information
"""
# Verify identity before processing
verification_token = self._send_verification(subject_email)
request = {
'id': self._generate_request_id(),
'subject_email': subject_email,
'type': request_type.value,
'details': details,
'status': 'pending_verification',
'submitted_at': datetime.utcnow().isoformat(),
'deadline': (datetime.utcnow() + timedelta(days=self.RESPONSE_DEADLINE_DAYS)).isoformat(),
'verification_token': verification_token
}
self.db.dsr_requests.insert(request)
# Notify DPO
self.notifications.notify_dpo(
f"New DSR ({request_type.value}) received",
request
)
return {
'request_id': request['id'],
'deadline': request['deadline'],
'status': 'verification_sent'
}
def process_erasure_request(self, request_id: str) -> dict:
"""
Process Article 17 erasure request (Right to be Forgotten).
Returns:
Erasure completion report
"""
request = self.db.dsr_requests.find_one({'id': request_id})
subject_email = request['subject_email']
erasure_report = {
'request_id': request_id,
'subject': subject_email,
'systems_processed': [],
'data_deleted': [],
'data_retained': [], # With legal basis
'completed_at': None
}
# Find all data for this subject
data_inventory = self._find_subject_data(subject_email)
for data_item in data_inventory:
if self._can_delete(data_item):
self._delete_data(data_item)
erasure_report['data_deleted'].append({
'system': data_item['system'],
'data_type': data_item['type'],
'deleted_at': datetime.utcnow().isoformat()
})
else:
erasure_report['data_retained'].append({
'system': data_item['system'],
'data_type': data_item['type'],
'retention_reason': data_item['legal_basis']
})
erasure_report['completed_at'] = datetime.utcnow().isoformat()
# Update request status
self.db.dsr_requests.update(
{'id': request_id},
{'status': 'completed', 'completion_report': erasure_report}
)
return erasure_report
def generate_portability_export(self, request_id: str) -> dict:
"""
Generate Article 20 data portability export.
Returns machine-readable export in JSON format.
"""
request = self.db.dsr_requests.find_one({'id': request_id})
subject_email = request['subject_email']
export_data = {
'export_date': datetime.utcnow().isoformat(),
'data_subject': subject_email,
'format': 'JSON',
'data': {}
}
# Collect data from all systems
systems = ['user_accounts', 'orders', 'preferences', 'communications']
for system in systems:
system_data = self._extract_portable_data(system, subject_email)
if system_data:
export_data['data'][system] = system_data
return export_data
Consent Management
class ConsentManager:
"""GDPR-compliant consent management."""
def __init__(self, db):
self.db = db
def record_consent(
self,
user_id: str,
purpose: str,
consent_given: bool,
consent_text: str
) -> dict:
"""
Record consent per GDPR Article 7 requirements.
Consent must be:
- Freely given
- Specific
- Informed
- Unambiguous
"""
consent_record = {
'user_id': user_id,
'purpose': purpose,
'consent_given': consent_given,
'consent_text': consent_text,
'timestamp': datetime.utcnow().isoformat(),
'method': 'explicit_checkbox', # Not pre-ticked
'ip_address': self._get_user_ip(),
'user_agent': self._get_user_agent(),
'version': '1.0' # Track consent version
}
self.db.consents.insert(consent_record)
return consent_record
def check_consent(self, user_id: str, purpose: str) -> bool:
"""Check if user has given consent for specific purpose."""
latest_consent = self.db.consents.find_one(
{'user_id': user_id, 'purpose': purpose},
sort=[('timestamp', -1)]
)
return latest_consent and latest_consent.get('consent_given', False)
def withdraw_consent(self, user_id: str, purpose: str) -> dict:
"""
Process consent withdrawal.
GDPR Article 7(3): Withdrawal must be as easy as giving consent.
"""
withdrawal_record = {
'user_id': user_id,
'purpose': purpose,
'consent_given': False,
'timestamp': datetime.utcnow().isoformat(),
'action': 'withdrawal'
}
self.db.consents.insert(withdrawal_record)
# Trigger data processing stop for this purpose
self._stop_processing(user_id, purpose)
return withdrawal_record
Compliance Automation
Automated Compliance Checks
# compliance-checks.yml - GitHub Actions
name: Compliance Checks
on:
push:
branches: [main]
pull_request:
schedule:
- cron: '0 0 * * *' # Daily
jobs:
soc2-checks:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Check for secrets in code
run: |
gitleaks detect --source . --report-format json --report-path gitleaks-report.json
if [ -s gitleaks-report.json ]; then
echo "Secrets detected in code!"
exit 1
fi
- name: Verify encryption at rest
run: |
# Check database encryption configuration
python scripts/compliance_checker.py --check encryption
- name: Verify access controls
run: |
# Check RBAC configuration
python scripts/compliance_checker.py --check access-control
- name: Check logging configuration
run: |
# Verify audit logging enabled
python scripts/compliance_checker.py --check audit-logging
pci-checks:
runs-on: ubuntu-latest
if: contains(github.event.head_commit.message, '[pci]')
steps:
- uses: actions/checkout@v4
- name: Scan for PAN in code
run: |
# Check for unencrypted card numbers
python scripts/compliance_checker.py --check pci-pan-exposure
- name: Verify TLS configuration
run: |
python scripts/compliance_checker.py --check tls-config
gdpr-checks:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Check data retention policies
run: |
python scripts/compliance_checker.py --check data-retention
- name: Verify consent mechanisms
run: |
python scripts/compliance_checker.py --check consent-management
Audit Preparation
Audit Readiness Checklist
## Pre-Audit Checklist
### 60 Days Before Audit
- [ ] Confirm audit scope and timeline
- [ ] Identify control owners
- [ ] Begin evidence collection
- [ ] Review previous audit findings
- [ ] Update policies and procedures
### 30 Days Before Audit
- [ ] Complete evidence collection
- [ ] Perform internal control testing
- [ ] Remediate any gaps identified
- [ ] Prepare executive summary
- [ ] Brief stakeholders
### 7 Days Before Audit
- [ ] Finalize evidence package
- [ ] Prepare interview schedules
- [ ] Set up secure evidence sharing
- [ ] Confirm auditor logistics
- [ ] Final gap assessment
### During Audit
- [ ] Daily status meetings
- [ ] Timely evidence delivery
- [ ] Document all requests
- [ ] Escalate issues promptly
- [ ] Maintain communication log
Evidence Repository Structure
evidence/
├── period_YYYY-MM/
│ ├── security/
│ │ ├── access_reviews/
│ │ ├── vulnerability_scans/
│ │ ├── penetration_tests/
│ │ └── security_training/
│ ├── availability/
│ │ ├── uptime_reports/
│ │ ├── incident_reports/
│ │ └── dr_tests/
│ ├── change_management/
│ │ ├── change_requests/
│ │ ├── approval_records/
│ │ └── deployment_logs/
│ ├── policies/
│ │ ├── current_policies/
│ │ └── acknowledgments/
│ └── index.json
Security Standards Reference
Comprehensive security standards and secure coding practices for application security.
Table of Contents
- OWASP Top 10
- Secure Coding Practices
- Authentication Standards
- API Security
- Secrets Management
- Security Headers
OWASP Top 10
A01:2021 - Broken Access Control
Description: Broken access control occurs when authorization checks are missing, inconsistent, or bypassable, letting users act outside their intended permissions.
Prevention:
# BAD - No authorization check
@app.route('/admin/users/<user_id>')
def get_user(user_id):
return User.query.get(user_id).to_dict()
# GOOD - Authorization enforced
@app.route('/admin/users/<user_id>')
@requires_role('admin')
def get_user(user_id):
user = User.query.get_or_404(user_id)
if not current_user.can_access(user):
abort(403)
return user.to_dict()
Checklist:
- Deny access by default (allowlist approach)
- Implement RBAC or ABAC consistently
- Validate object-level authorization (IDOR prevention)
- Disable directory listing
- Log access control failures and alert on repeated failures
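The `@requires_role` decorator used in the example above is left undefined; a minimal framework-agnostic sketch (the explicit `current_user` argument and `PermissionDenied` exception are assumptions for illustration, not part of any specific framework):

```python
from functools import wraps

class PermissionDenied(Exception):
    """Raised when the current user lacks a required role."""

def requires_role(role):
    """Deny-by-default decorator: the wrapped view runs only if the
    current user (passed explicitly here) carries the required role."""
    def decorator(view):
        @wraps(view)
        def wrapper(current_user, *args, **kwargs):
            if role not in getattr(current_user, 'roles', ()):
                raise PermissionDenied(f"role '{role}' required")
            return view(current_user, *args, **kwargs)
        return wrapper
    return decorator

# Hypothetical usage
class User:
    def __init__(self, roles):
        self.roles = roles

@requires_role('admin')
def delete_account(current_user, account_id):
    return f"deleted {account_id}"
```

In a real Flask app the user would come from the session rather than an argument, but the deny-by-default shape is the same.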
A02:2021 - Cryptographic Failures
Description: Failures related to cryptography which often lead to exposure of sensitive data.
Prevention:
# BAD - Weak hashing
import hashlib
password_hash = hashlib.md5(password.encode()).hexdigest()
# GOOD - Strong password hashing (argon2-cffi)
import argon2
from argon2 import PasswordHasher
ph = PasswordHasher(
time_cost=3,
memory_cost=65536,
parallelism=4
)
password_hash = ph.hash(password)
# Verify password
try:
ph.verify(stored_hash, password)
except argon2.exceptions.VerifyMismatchError:
    raise InvalidCredentials()
Checklist:
- Use TLS 1.2+ for all data in transit
- Use AES-256-GCM for encryption at rest
- Use Argon2id, bcrypt, or scrypt for passwords
- Never use MD5, SHA1 for security purposes
- Rotate encryption keys regularly
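The "TLS 1.2+" item can also be enforced in application code rather than left to defaults; a minimal sketch using Python's stdlib `ssl` module:

```python
import ssl

def make_tls_context() -> ssl.SSLContext:
    """Client-side TLS context that refuses anything below TLS 1.2
    and keeps certificate and hostname verification on (the defaults)."""
    ctx = ssl.create_default_context()
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    return ctx

ctx = make_tls_context()
```

Pass the returned context to `http.client`, `urllib`, or `smtplib` connections to get the same floor everywhere in the service.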
A03:2021 - Injection
Description: Untrusted data sent to an interpreter as part of a command or query.
SQL Injection Prevention:
# BAD - String concatenation (VULNERABLE)
query = f"SELECT * FROM users WHERE id = {user_id}"
cursor.execute(query)
# GOOD - Parameterized queries
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
# GOOD - ORM with parameter binding
user = User.query.filter_by(id=user_id).first()
Command Injection Prevention:
# BAD - Shell execution with user input (VULNERABLE)
# NEVER use: os.system(f"ping {user_input}")
# GOOD - Use subprocess with shell=False and validated input
import subprocess
def safe_ping(hostname: str) -> str:
# Validate hostname format first
if not is_valid_hostname(hostname):
raise ValueError("Invalid hostname")
result = subprocess.run(
["ping", "-c", "4", hostname],
shell=False,
capture_output=True,
text=True
)
    return result.stdout
XSS Prevention:
# BAD - Direct HTML insertion (VULNERABLE)
return f"<div>Welcome, {username}</div>"
# GOOD - HTML escaping
from markupsafe import escape
return f"<div>Welcome, {escape(username)}</div>"
# GOOD - Template auto-escaping (Jinja2)
# {{ username }} is auto-escaped by default
A04:2021 - Insecure Design
Description: Risks related to design and architectural flaws.
Prevention Patterns:
# Threat modeling categories (STRIDE)
THREATS = {
'Spoofing': 'Authentication controls',
'Tampering': 'Integrity controls',
'Repudiation': 'Audit logging',
'Information Disclosure': 'Encryption, access control',
'Denial of Service': 'Rate limiting, resource limits',
'Elevation of Privilege': 'Authorization controls'
}
# Defense in depth - multiple layers
class SecurePaymentFlow:
def process_payment(self, payment_data):
# Layer 1: Input validation
self.validate_input(payment_data)
# Layer 2: Authentication check
self.verify_user_authenticated()
# Layer 3: Authorization check
self.verify_user_can_pay(payment_data.amount)
# Layer 4: Rate limiting
self.check_rate_limit()
# Layer 5: Fraud detection
self.check_fraud_signals(payment_data)
# Layer 6: Secure processing
        return self.execute_payment(payment_data)
A05:2021 - Security Misconfiguration
Description: Missing or incorrect security hardening.
Prevention:
# Kubernetes pod security
apiVersion: v1
kind: Pod
spec:
securityContext:
runAsNonRoot: true
runAsUser: 1000
fsGroup: 1000
containers:
- name: app
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
capabilities:
drop:
        - ALL
# Flask security configuration
app.config.update(
SESSION_COOKIE_SECURE=True,
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE='Lax',
PERMANENT_SESSION_LIFETIME=timedelta(hours=1),
)
Secure Coding Practices
Input Validation
# Note: pydantic v1 API shown (constr regex=, @validator); v2 renamed these to pattern= and @field_validator
from pydantic import BaseModel, validator, constr
from typing import Optional
import re
class UserInput(BaseModel):
username: constr(min_length=3, max_length=50, regex=r'^[a-zA-Z0-9_]+$')
email: str
age: Optional[int] = None
@validator('email')
def validate_email(cls, v):
# Use proper email validation
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(pattern, v):
raise ValueError('Invalid email format')
return v.lower()
@validator('age')
def validate_age(cls, v):
if v is not None and (v < 0 or v > 150):
raise ValueError('Age must be between 0 and 150')
        return v
Output Encoding
import html
import json
from urllib.parse import quote
def encode_for_html(data: str) -> str:
"""Encode data for safe HTML output."""
return html.escape(data)
def encode_for_javascript(data: str) -> str:
"""Encode data for safe JavaScript string."""
return json.dumps(data)
def encode_for_url(data: str) -> str:
"""Encode data for safe URL parameter."""
return quote(data, safe='')
def encode_for_css(data: str) -> str:
"""Encode data for safe CSS value."""
return ''.join(
c if c.isalnum() else f'\\{ord(c):06x}'
for c in data
    )
Error Handling
import logging
from typing import Optional
logger = logging.getLogger(__name__)
class SecurityException(Exception):
    """Base exception for security-related errors."""
    def __init__(self, message: str, internal_details: Optional[str] = None):
# User-facing message (safe to display)
self.message = message
# Internal details (for logging only)
self.internal_details = internal_details
super().__init__(message)
def handle_request():
try:
process_sensitive_data()
except DatabaseError as e:
# Log full details internally
logger.error(f"Database error: {e}", exc_info=True)
# Return generic message to user
raise SecurityException(
"An error occurred processing your request",
internal_details=str(e)
)
except Exception as e:
logger.error(f"Unexpected error: {e}", exc_info=True)
        raise SecurityException("An unexpected error occurred")
Authentication Standards
Password Requirements
import re
from typing import Tuple
def validate_password(password: str) -> Tuple[bool, str]:
"""
Validate password against security requirements.
Requirements:
- Minimum 12 characters
- At least one uppercase letter
- At least one lowercase letter
- At least one digit
- At least one special character
- Not in common password list
"""
if len(password) < 12:
return False, "Password must be at least 12 characters"
if not re.search(r'[A-Z]', password):
return False, "Password must contain uppercase letter"
if not re.search(r'[a-z]', password):
return False, "Password must contain lowercase letter"
if not re.search(r'\d', password):
return False, "Password must contain a digit"
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
return False, "Password must contain special character"
# Check against common passwords (use haveibeenpwned API in production)
common_passwords = {'password123', 'qwerty123456', 'admin123456'}
if password.lower() in common_passwords:
return False, "Password is too common"
    return True, "Password meets requirements"
JWT Best Practices
import jwt
from datetime import datetime, timedelta
from typing import Dict, Optional
class JWTManager:
def __init__(self, secret_key: str, algorithm: str = 'HS256'):
self.secret_key = secret_key
self.algorithm = algorithm
self.access_token_expiry = timedelta(minutes=15)
self.refresh_token_expiry = timedelta(days=7)
def create_access_token(self, user_id: str, roles: list) -> str:
payload = {
'sub': user_id,
'roles': roles,
'type': 'access',
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + self.access_token_expiry,
'jti': self._generate_jti() # Unique token ID for revocation
}
return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
def verify_token(self, token: str) -> Optional[Dict]:
try:
payload = jwt.decode(
token,
self.secret_key,
algorithms=[self.algorithm],
options={
'require': ['exp', 'iat', 'sub', 'jti'],
'verify_exp': True
}
)
# Check if token is revoked
if self._is_token_revoked(payload['jti']):
return None
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
            return None
MFA Implementation
import pyotp
import qrcode
from io import BytesIO
import base64
class TOTPManager:
def __init__(self, issuer: str = "MyApp"):
self.issuer = issuer
def generate_secret(self) -> str:
"""Generate a new TOTP secret for a user."""
return pyotp.random_base32()
def get_provisioning_uri(self, secret: str, email: str) -> str:
"""Generate URI for QR code."""
totp = pyotp.TOTP(secret)
return totp.provisioning_uri(name=email, issuer_name=self.issuer)
def generate_qr_code(self, provisioning_uri: str) -> str:
"""Generate base64-encoded QR code image."""
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(provisioning_uri)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buffer = BytesIO()
img.save(buffer, format='PNG')
return base64.b64encode(buffer.getvalue()).decode()
def verify_totp(self, secret: str, code: str) -> bool:
"""Verify TOTP code with time window tolerance."""
totp = pyotp.TOTP(secret)
# Allow 1 period before/after for clock skew
        return totp.verify(code, valid_window=1)
API Security
Rate Limiting
from functools import wraps
from flask import request, jsonify
import time
from collections import defaultdict
import threading
class RateLimiter:
def __init__(self, requests_per_minute: int = 60):
self.requests_per_minute = requests_per_minute
self.requests = defaultdict(list)
self.lock = threading.Lock()
def is_rate_limited(self, identifier: str) -> bool:
with self.lock:
now = time.time()
minute_ago = now - 60
# Clean old requests
self.requests[identifier] = [
req_time for req_time in self.requests[identifier]
if req_time > minute_ago
]
if len(self.requests[identifier]) >= self.requests_per_minute:
return True
self.requests[identifier].append(now)
return False
rate_limiter = RateLimiter(requests_per_minute=100)
def rate_limit(f):
@wraps(f)
def decorated_function(*args, **kwargs):
        # Note: behind a proxy, use the trusted X-Forwarded-For value instead
        identifier = request.remote_addr
if rate_limiter.is_rate_limited(identifier):
return jsonify({
'error': 'Rate limit exceeded',
'retry_after': 60
}), 429
return f(*args, **kwargs)
    return decorated_function
API Key Validation
import hashlib
import secrets
from datetime import datetime
from typing import Optional, Dict
class APIKeyManager:
def __init__(self, db):
self.db = db
def generate_api_key(self, user_id: str, name: str, scopes: list) -> Dict:
"""Generate a new API key."""
# Generate key with prefix for identification
raw_key = f"sk_live_{secrets.token_urlsafe(32)}"
# Store hash only
key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
api_key_record = {
'id': secrets.token_urlsafe(16),
'user_id': user_id,
'name': name,
'key_hash': key_hash,
'key_prefix': raw_key[:12], # Store prefix for identification
'scopes': scopes,
'created_at': datetime.utcnow(),
'last_used_at': None
}
self.db.api_keys.insert(api_key_record)
# Return raw key only once
return {
'key': raw_key,
'id': api_key_record['id'],
'scopes': scopes
}
def validate_api_key(self, raw_key: str) -> Optional[Dict]:
"""Validate an API key and return associated data."""
key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
api_key = self.db.api_keys.find_one({'key_hash': key_hash})
if not api_key:
return None
# Update last used timestamp
self.db.api_keys.update(
{'id': api_key['id']},
{'last_used_at': datetime.utcnow()}
)
return {
'user_id': api_key['user_id'],
'scopes': api_key['scopes']
        }
Secrets Management
Environment Variables
import os
from typing import Optional
from dataclasses import dataclass
@dataclass
class AppSecrets:
database_url: str
jwt_secret: str
api_key: str
encryption_key: str
def load_secrets() -> AppSecrets:
"""Load secrets from environment with validation."""
def get_required(name: str) -> str:
value = os.environ.get(name)
if not value:
raise ValueError(f"Required environment variable {name} is not set")
return value
return AppSecrets(
database_url=get_required('DATABASE_URL'),
jwt_secret=get_required('JWT_SECRET'),
api_key=get_required('API_KEY'),
encryption_key=get_required('ENCRYPTION_KEY')
)
# Never log secrets
import logging
class SecretFilter(logging.Filter):
"""Filter to redact secrets from logs."""
def __init__(self, secrets: list):
super().__init__()
self.secrets = secrets
    def filter(self, record):
        # Redact secrets from the fully formatted message (covers %-style args too)
        message = record.getMessage()
        for secret in self.secrets:
            if secret and secret in message:
                message = message.replace(secret, '[REDACTED]')
        record.msg = message
        record.args = None
        return True
HashiCorp Vault Integration
import hvac
from typing import Dict, Optional
class VaultClient:
def __init__(self, url: str, token: str = None, role_id: str = None, secret_id: str = None):
self.client = hvac.Client(url=url)
if token:
self.client.token = token
elif role_id and secret_id:
# AppRole authentication
self.client.auth.approle.login(
role_id=role_id,
secret_id=secret_id
)
def get_secret(self, path: str, key: str) -> Optional[str]:
"""Retrieve a secret from Vault."""
try:
response = self.client.secrets.kv.v2.read_secret_version(path=path)
return response['data']['data'].get(key)
except hvac.exceptions.InvalidPath:
return None
def get_database_credentials(self, role: str) -> Dict[str, str]:
"""Get dynamic database credentials."""
response = self.client.secrets.database.generate_credentials(name=role)
return {
'username': response['data']['username'],
'password': response['data']['password'],
'lease_id': response['lease_id'],
'lease_duration': response['lease_duration']
        }
Security Headers
HTTP Security Headers
from flask import Flask, Response
def add_security_headers(response: Response) -> Response:
"""Add security headers to HTTP response."""
# Prevent clickjacking
response.headers['X-Frame-Options'] = 'DENY'
    # X-XSS-Protection is deprecated; '0' disables the buggy legacy filter
    response.headers['X-XSS-Protection'] = '0'
# Prevent MIME type sniffing
response.headers['X-Content-Type-Options'] = 'nosniff'
# Referrer policy
response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
# Content Security Policy
response.headers['Content-Security-Policy'] = (
"default-src 'self'; "
"script-src 'self' 'unsafe-inline'; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data: https:; "
"font-src 'self'; "
"frame-ancestors 'none'; "
"form-action 'self'"
)
# HSTS (enable only with valid HTTPS)
response.headers['Strict-Transport-Security'] = (
'max-age=31536000; includeSubDomains; preload'
)
# Permissions Policy
response.headers['Permissions-Policy'] = (
'geolocation=(), microphone=(), camera=()'
)
return response
app = Flask(__name__)
app.after_request(add_security_headers)
Quick Reference
Security Checklist
| Category | Check | Priority |
|---|---|---|
| Authentication | MFA enabled | Critical |
| Authentication | Password policy enforced | Critical |
| Authorization | RBAC implemented | Critical |
| Input | All inputs validated | Critical |
| Injection | Parameterized queries | Critical |
| Crypto | TLS 1.2+ enforced | Critical |
| Secrets | No hardcoded secrets | Critical |
| Headers | Security headers set | High |
| Logging | Security events logged | High |
| Dependencies | No known vulnerabilities | High |
Tool Recommendations
| Purpose | Tool | Usage |
|---|---|---|
| SAST | Semgrep | semgrep --config auto . |
| SAST | Bandit (Python) | bandit -r src/ |
| Secrets | Gitleaks | gitleaks detect --source . |
| Dependencies | Snyk | snyk test |
| Container | Trivy | trivy image myapp:latest |
| DAST | OWASP ZAP | Dynamic scanning |
Vulnerability Management Guide
Complete workflow for vulnerability identification, assessment, prioritization, and remediation.
Table of Contents
- Vulnerability Lifecycle
- CVE Triage Process
- CVSS Scoring
- Remediation Workflows
- Dependency Scanning
- Security Incident Response
Vulnerability Lifecycle
Overview
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ DISCOVER │ → │ ASSESS │ → │ PRIORITIZE │ → │ REMEDIATE │
│ │ │ │ │ │ │ │
│ - Scanning │ │ - CVSS │ │ - Risk │ │ - Patch │
│ - Reports │ │ - Context │ │ - Business │ │ - Mitigate │
│ - Audits │ │ - Impact │ │ - SLA │ │ - Accept │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
│
▼
┌─────────────┐
│ VERIFY │
│ │
│ - Retest │
│ - Close │
└─────────────┘State Definitions
| State | Description | Owner |
|---|---|---|
| New | Vulnerability discovered, not yet triaged | Security Team |
| Triaging | Under assessment for severity and impact | Security Team |
| Assigned | Assigned to development team for fix | Dev Team |
| In Progress | Fix being developed | Dev Team |
| In Review | Fix in code review | Dev Team |
| Testing | Fix being tested | QA Team |
| Deployed | Fix deployed to production | DevOps Team |
| Verified | Fix confirmed effective | Security Team |
| Closed | Vulnerability resolved | Security Team |
| Accepted Risk | Risk accepted with justification | CISO |
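The states above can be enforced in a tracker with a simple transition map; a sketch (the specific allowed transitions are an illustrative assumption drawn from the table's ordering, not mandated by it):

```python
# Allowed vulnerability-ticket transitions (illustrative)
TRANSITIONS = {
    'New': {'Triaging'},
    'Triaging': {'Assigned', 'Accepted Risk', 'Closed'},
    'Assigned': {'In Progress'},
    'In Progress': {'In Review'},
    'In Review': {'Testing', 'In Progress'},
    'Testing': {'Deployed', 'In Progress'},
    'Deployed': {'Verified'},
    'Verified': {'Closed'},
    'Accepted Risk': {'Triaging'},  # re-opened at the next scheduled review
    'Closed': set(),
}

def advance(current: str, new: str) -> str:
    """Move a vulnerability record to a new state, or raise on an illegal jump."""
    if new not in TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition {current!r} -> {new!r}")
    return new
```

Rejecting illegal jumps (e.g. `New` straight to `Closed`) keeps the audit trail honest.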
CVE Triage Process
Step 1: Initial Assessment
def triage_cve(cve_id: str, affected_systems: list) -> dict:
"""
Perform initial triage of a CVE.
Returns triage assessment with severity and recommended actions.
"""
# Fetch CVE details from NVD
cve_data = fetch_nvd_data(cve_id)
assessment = {
'cve_id': cve_id,
'published': cve_data['published'],
'base_cvss': cve_data['cvss_v3']['base_score'],
'vector': cve_data['cvss_v3']['vector_string'],
'description': cve_data['description'],
'affected_systems': [],
'exploitability': check_exploitability(cve_id),
'recommendation': None
}
# Check which systems are actually affected
for system in affected_systems:
if is_system_vulnerable(system, cve_data):
assessment['affected_systems'].append({
'name': system.name,
'version': system.version,
'exposure': assess_exposure(system)
})
# Determine recommendation
assessment['recommendation'] = determine_action(assessment)
    return assessment
Step 2: Severity Classification
| CVSS Score | Severity | Response SLA |
|---|---|---|
| 9.0 - 10.0 | Critical | 24 hours |
| 7.0 - 8.9 | High | 7 days |
| 4.0 - 6.9 | Medium | 30 days |
| 0.1 - 3.9 | Low | 90 days |
| 0.0 | None | Informational |
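A small helper that mirrors the table above, mapping a CVSS v3 base score to its severity band and response SLA:

```python
def classify_cvss(score: float) -> tuple:
    """Map a CVSS v3 base score to (severity, response SLA) per the table."""
    if not 0.0 <= score <= 10.0:
        raise ValueError("CVSS score must be between 0.0 and 10.0")
    if score >= 9.0:
        return ('Critical', '24 hours')
    if score >= 7.0:
        return ('High', '7 days')
    if score >= 4.0:
        return ('Medium', '30 days')
    if score > 0.0:
        return ('Low', '90 days')
    return ('None', 'Informational')
```

Useful for tagging findings automatically as they arrive from scanners.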
Step 3: Context Analysis
## CVE Context Checklist
### Exposure Assessment
- [ ] Is the vulnerable component internet-facing?
- [ ] Is the vulnerable component in a DMZ?
- [ ] Does the component process sensitive data?
- [ ] Are there compensating controls in place?
### Exploitability Assessment
- [ ] Is there a public exploit available?
- [ ] Is exploitation being observed in the wild?
- [ ] What privileges are required to exploit?
- [ ] Does exploit require user interaction?
### Business Impact
- [ ] What business processes depend on affected systems?
- [ ] What is the potential data exposure?
- [ ] What are regulatory implications?
- [ ] What is the reputational risk?
Step 4: Triage Decision Matrix
| Exposure | Exploitability | Business Impact | Priority |
|---|---|---|---|
| Internet | Active Exploit | High | P0 - Immediate |
| Internet | PoC Available | High | P1 - Critical |
| Internet | Theoretical | Medium | P2 - High |
| Internal | Active Exploit | High | P1 - Critical |
| Internal | PoC Available | Medium | P2 - High |
| Internal | Theoretical | Low | P3 - Medium |
| Isolated | Any | Low | P4 - Low |
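The matrix rows above can be encoded directly; a lookup sketch (the fallback priority for combinations the matrix doesn't list, and the simplification that any isolated system is P4, are assumptions):

```python
# (exposure, exploitability, impact) -> priority, per the matrix above
PRIORITY_MATRIX = {
    ('internet', 'active_exploit', 'high'): 'P0 - Immediate',
    ('internet', 'poc_available', 'high'): 'P1 - Critical',
    ('internet', 'theoretical', 'medium'): 'P2 - High',
    ('internal', 'active_exploit', 'high'): 'P1 - Critical',
    ('internal', 'poc_available', 'medium'): 'P2 - High',
    ('internal', 'theoretical', 'low'): 'P3 - Medium',
}

def triage_priority(exposure: str, exploitability: str, impact: str) -> str:
    """Return the matrix priority; unlisted combinations default to P3 (assumption)."""
    if exposure == 'isolated':
        return 'P4 - Low'
    return PRIORITY_MATRIX.get((exposure, exploitability, impact), 'P3 - Medium')
```

Keeping the matrix as data rather than branching logic makes policy changes a one-line diff.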
CVSS Scoring
CVSS v3.1 Vector Components
CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H
│ │ │ │ │ │ │ │
│ │ │ │ │ │ │ └── Availability Impact (H/L/N)
│ │ │ │ │ │ └────── Integrity Impact (H/L/N)
│ │ │ │ │ └────────── Confidentiality Impact (H/L/N)
│ │ │ │ └────────────── Scope (C/U)
│ │ │ └─────────────────── User Interaction (R/N)
│ │ └──────────────────────── Privileges Required (H/L/N)
│ └───────────────────────────── Attack Complexity (H/L)
└─────────────────────────────────── Attack Vector (N/A/L/P)
Environmental Score Adjustments
def calculate_environmental_score(base_cvss: float, environment: dict) -> float:
"""
Adjust CVSS base score based on environmental factors.
Args:
base_cvss: Base CVSS score from NVD
environment: Dictionary with environmental modifiers
Returns:
Adjusted CVSS score for this environment
"""
# Confidentiality Requirement (CR)
cr_modifier = {
'high': 1.5,
'medium': 1.0,
'low': 0.5
}.get(environment.get('confidentiality_requirement', 'medium'))
# Integrity Requirement (IR)
ir_modifier = {
'high': 1.5,
'medium': 1.0,
'low': 0.5
}.get(environment.get('integrity_requirement', 'medium'))
# Availability Requirement (AR)
ar_modifier = {
'high': 1.5,
'medium': 1.0,
'low': 0.5
}.get(environment.get('availability_requirement', 'medium'))
# Modified Attack Vector (reduce if not internet-facing)
if not environment.get('internet_facing', True):
base_cvss = max(0, base_cvss - 1.5)
# Compensating controls reduce score
if environment.get('waf_protected', False):
base_cvss = max(0, base_cvss - 0.5)
if environment.get('network_segmented', False):
base_cvss = max(0, base_cvss - 0.5)
    return round(min(10.0, base_cvss), 1)
Remediation Workflows
Workflow 1: Emergency Patch (P0/Critical)
Timeline: 24 hours
Stakeholders: Security, DevOps, Engineering Lead, CISO
Hour 0-2: ASSESS
├── Confirm vulnerability affects production
├── Identify all affected systems
├── Assess active exploitation
└── Notify stakeholders
Hour 2-8: MITIGATE
├── Apply temporary mitigations (WAF rules, network blocks)
├── Enable enhanced monitoring
├── Prepare rollback plan
└── Begin patch development/testing
Hour 8-20: REMEDIATE
├── Test patch in staging
├── Security team validates fix
├── Change approval (emergency CAB)
└── Deploy to production (rolling)
Hour 20-24: VERIFY
├── Confirm vulnerability resolved
├── Monitor for issues
├── Update vulnerability tracker
└── Post-incident review scheduled
Workflow 2: Standard Patch (P1-P2)
# Remediation ticket template
REMEDIATION_TICKET = """
## Vulnerability Remediation
**CVE:** {cve_id}
**Severity:** {severity}
**CVSS:** {cvss_score}
**SLA:** {sla_date}
### Affected Components
{affected_components}
### Root Cause
{root_cause}
### Remediation Steps
1. Update {package} from {current_version} to {fixed_version}
2. Run security regression tests
3. Deploy to staging for validation
4. Security team approval required before production
### Testing Requirements
- [ ] Unit tests pass
- [ ] Integration tests pass
- [ ] Security scan shows vulnerability resolved
- [ ] No new vulnerabilities introduced
### Rollback Plan
{rollback_steps}
### Acceptance Criteria
- Vulnerability scan shows CVE resolved
- No functional regression
- Performance baseline maintained
"""
Workflow 3: Risk Acceptance
## Risk Acceptance Request
**Vulnerability:** CVE-XXXX-XXXXX
**Affected System:** [System Name]
**Requested By:** [Name]
**Date:** [Date]
### Business Justification
[Explain why the vulnerability cannot be remediated]
### Compensating Controls
- [ ] Control 1: [Description]
- [ ] Control 2: [Description]
- [ ] Control 3: [Description]
### Residual Risk Assessment
- **Likelihood:** [High/Medium/Low]
- **Impact:** [High/Medium/Low]
- **Residual Risk:** [Critical/High/Medium/Low]
### Review Schedule
- Next review date: [Date]
- Review frequency: [Monthly/Quarterly]
### Approvals
- [ ] Security Team Lead
- [ ] Engineering Manager
- [ ] CISO
- [ ] Business Owner
Dependency Scanning
Automated Scanning Pipeline
# .github/workflows/security-scan.yml
name: Security Scan
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
schedule:
- cron: '0 6 * * *' # Daily at 6 AM
jobs:
dependency-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Run Snyk vulnerability scan
uses: snyk/actions/node@master
env:
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
with:
args: --severity-threshold=high
- name: Run npm audit
run: npm audit --audit-level=high
- name: Run Trivy filesystem scan
uses: aquasecurity/trivy-action@master
with:
scan-type: 'fs'
scan-ref: '.'
severity: 'CRITICAL,HIGH'
exit-code: '1'
sast-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Run Semgrep
uses: returntocorp/semgrep-action@v1
with:
config: >-
p/security-audit
p/secrets
            p/owasp-top-ten
Manual Dependency Review
# Node.js - Check for vulnerabilities
npm audit
npm audit --json > audit-report.json
# Python - Check for vulnerabilities
pip-audit
safety check -r requirements.txt
# Go - Check for vulnerabilities
govulncheck ./...
# Container images
trivy image myapp:latest
grype myapp:latest
Dependency Update Strategy
| Update Type | Automation | Review Required |
|---|---|---|
| Security patch (same minor) | Auto-merge | No |
| Minor version | Auto-PR | Yes |
| Major version | Manual PR | Yes + Testing |
| Breaking change | Manual | Yes + Migration plan |
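The strategy above maps naturally onto automated update tooling; a minimal Dependabot sketch (auto-merge of security patches itself requires a separate branch-protection/workflow setup, not shown here):

```yaml
# .github/dependabot.yml
version: 2
updates:
  - package-ecosystem: "npm"
    directory: "/"
    schedule:
      interval: "daily"
    open-pull-requests-limit: 10
    # Keep major bumps out of the automated stream; handle them
    # via manual PRs with a migration plan, per the table above.
    ignore:
      - dependency-name: "*"
        update-types: ["version-update:semver-major"]
```

Minor versions then arrive as reviewable auto-PRs while majors stay a deliberate, tested change.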
Security Incident Response
Incident Severity Levels
| Level | Description | Response Time | Escalation |
|---|---|---|---|
| SEV-1 | Active breach, data exfiltration | Immediate | CISO, Legal, Exec |
| SEV-2 | Confirmed intrusion, no data loss | 1 hour | Security Lead, Engineering |
| SEV-3 | Suspicious activity, potential breach | 4 hours | Security Team |
| SEV-4 | Policy violation, no immediate risk | 24 hours | Security Team |
Incident Response Checklist
## Incident Response Checklist
### 1. DETECT & IDENTIFY (0-15 min)
- [ ] Alert received and acknowledged
- [ ] Initial severity assessment
- [ ] Incident commander assigned
- [ ] Communication channel established
### 2. CONTAIN (15-60 min)
- [ ] Affected systems identified
- [ ] Network isolation if needed
- [ ] Credentials rotated if compromised
- [ ] Preserve evidence (logs, memory dumps)
### 3. ERADICATE (1-4 hours)
- [ ] Root cause identified
- [ ] Malware/backdoors removed
- [ ] Vulnerabilities patched
- [ ] Systems hardened
### 4. RECOVER (4-24 hours)
- [ ] Systems restored from clean backup
- [ ] Services brought back online
- [ ] Enhanced monitoring enabled
- [ ] User access restored
### 5. POST-INCIDENT (24-72 hours)
- [ ] Incident timeline documented
- [ ] Root cause analysis complete
- [ ] Lessons learned documented
- [ ] Preventive measures implemented
- [ ] Report to stakeholders
Quick Reference
Vulnerability Response SLAs
| Severity | Detection to Triage | Triage to Remediation |
|---|---|---|
| Critical | 4 hours | 24 hours |
| High | 24 hours | 7 days |
| Medium | 3 days | 30 days |
| Low | 7 days | 90 days |
Common Vulnerability Databases
| Database | URL | Use Case |
|---|---|---|
| NVD | nvd.nist.gov | CVE details, CVSS |
| MITRE CVE | cve.mitre.org | CVE registry |
| OSV | osv.dev | Open source vulns |
| GitHub Advisory | github.com/advisories | Package vulns |
| Snyk DB | snyk.io/vuln | Package vulns |
Remediation Priority Formula
Priority Score = (CVSS × Exposure × Business_Impact) / Compensating_Controls
Where:
- CVSS: 0-10 (from NVD)
- Exposure: 1.0 (internal) to 2.0 (internet-facing)
- Business_Impact: 1.0 (low) to 2.0 (critical)
- Compensating_Controls: 1.0 (none) to 0.5 (multiple controls)
#!/usr/bin/env python3
"""
Compliance Checker - Verify security compliance against SOC 2, PCI-DSS, HIPAA, GDPR.
Table of Contents:
ComplianceChecker - Main class for compliance verification
__init__ - Initialize with target path and framework
check() - Run compliance checks for selected framework
check_soc2() - Check SOC 2 Type II controls
check_pci_dss() - Check PCI-DSS v4.0 requirements
check_hipaa() - Check HIPAA security rule requirements
check_gdpr() - Check GDPR data protection requirements
_check_encryption_at_rest() - Verify data encryption
_check_access_controls() - Verify access control implementation
_check_logging() - Verify audit logging
_check_secrets_management() - Verify secrets handling
_calculate_compliance_score() - Calculate overall compliance score
main() - CLI entry point
Usage:
python compliance_checker.py /path/to/project
python compliance_checker.py /path/to/project --framework soc2
python compliance_checker.py /path/to/project --framework pci-dss --output report.json
"""
import os
import sys
import json
import re
import argparse
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime
@dataclass
class ComplianceControl:
"""Represents a compliance control check result."""
control_id: str
framework: str
category: str
title: str
description: str
status: str # passed, failed, warning, not_applicable
evidence: List[str]
recommendation: str
severity: str # critical, high, medium, low
class ComplianceChecker:
"""Verify security compliance against industry frameworks."""
FRAMEWORKS = ['soc2', 'pci-dss', 'hipaa', 'gdpr', 'all']
def __init__(
self,
target_path: str,
framework: str = "all",
verbose: bool = False
):
"""
Initialize the compliance checker.
Args:
target_path: Directory to scan
framework: Compliance framework to check (soc2, pci-dss, hipaa, gdpr, all)
verbose: Enable verbose output
"""
self.target_path = Path(target_path)
self.framework = framework.lower()
self.verbose = verbose
self.controls: List[ComplianceControl] = []
self.files_scanned = 0
def check(self) -> Dict:
"""
Run compliance checks for selected framework.
Returns:
Dict with compliance results
"""
print(f"Compliance Checker - Scanning: {self.target_path}")
print(f"Framework: {self.framework.upper()}")
print()
if not self.target_path.exists():
return {"status": "error", "message": f"Path not found: {self.target_path}"}
start_time = datetime.now()
# Run framework-specific checks
if self.framework in ('soc2', 'all'):
self.check_soc2()
if self.framework in ('pci-dss', 'all'):
self.check_pci_dss()
if self.framework in ('hipaa', 'all'):
self.check_hipaa()
if self.framework in ('gdpr', 'all'):
self.check_gdpr()
end_time = datetime.now()
scan_duration = (end_time - start_time).total_seconds()
# Calculate statistics
passed = len([c for c in self.controls if c.status == 'passed'])
failed = len([c for c in self.controls if c.status == 'failed'])
warnings = len([c for c in self.controls if c.status == 'warning'])
na = len([c for c in self.controls if c.status == 'not_applicable'])
compliance_score = self._calculate_compliance_score()
result = {
"status": "completed",
"target": str(self.target_path),
"framework": self.framework,
"scan_duration_seconds": round(scan_duration, 2),
"compliance_score": compliance_score,
"compliance_level": self._get_compliance_level(compliance_score),
"summary": {
"passed": passed,
"failed": failed,
"warnings": warnings,
"not_applicable": na,
"total": len(self.controls)
},
"controls": [asdict(c) for c in self.controls]
}
self._print_summary(result)
return result
def check_soc2(self):
"""Check SOC 2 Type II controls."""
if self.verbose:
print(" Checking SOC 2 Type II controls...")
# CC1: Control Environment - Access Controls
self._check_access_controls_soc2()
# CC2: Communication and Information
self._check_documentation()
# CC3: Risk Assessment
self._check_risk_assessment()
# CC6: Logical and Physical Access Controls
self._check_authentication()
# CC7: System Operations
self._check_logging()
# CC8: Change Management
self._check_change_management()
def check_pci_dss(self):
"""Check PCI-DSS v4.0 requirements."""
if self.verbose:
print(" Checking PCI-DSS v4.0 requirements...")
# Requirement 3: Protect stored cardholder data
self._check_data_encryption()
# Requirement 4: Encrypt transmission of cardholder data
self._check_transmission_encryption()
# Requirement 6: Develop and maintain secure systems
self._check_secure_development()
# Requirement 8: Identify users and authenticate access
self._check_strong_authentication()
# Requirement 10: Log and monitor all access
self._check_audit_logging()
# Requirement 11: Test security of systems regularly
self._check_security_testing()
def check_hipaa(self):
"""Check HIPAA security rule requirements."""
if self.verbose:
print(" Checking HIPAA Security Rule requirements...")
# 164.312(a)(1): Access Control
self._check_hipaa_access_control()
# 164.312(b): Audit Controls
self._check_hipaa_audit()
# 164.312(c)(1): Integrity Controls
self._check_hipaa_integrity()
# 164.312(d): Person or Entity Authentication
self._check_hipaa_authentication()
# 164.312(e)(1): Transmission Security
self._check_hipaa_transmission()
def check_gdpr(self):
"""Check GDPR data protection requirements."""
if self.verbose:
print(" Checking GDPR requirements...")
# Article 25: Data protection by design
self._check_privacy_by_design()
# Article 32: Security of processing
self._check_gdpr_security()
# Article 33/34: Breach notification
self._check_breach_notification()
# Article 17: Right to erasure
self._check_data_deletion()
# Article 20: Data portability
self._check_data_export()
def _check_access_controls_soc2(self):
"""SOC 2 CC1/CC6: Check access control implementation."""
evidence = []
status = 'failed'
# Look for authentication middleware
auth_patterns = [
r'authMiddleware',
r'requireAuth',
r'isAuthenticated',
r'@login_required',
r'@authenticated',
r'passport\.authenticate',
r'jwt\.verify',
r'verifyToken'
]
for pattern in auth_patterns:
files = self._search_files(pattern)
if files:
evidence.extend(files[:3])
status = 'passed'
break
# Check for RBAC implementation
rbac_patterns = [r'role', r'permission', r'authorize', r'can\(', r'hasRole']
for pattern in rbac_patterns:
files = self._search_files(pattern)
if files:
evidence.extend(files[:2])
if status == 'failed':
status = 'warning'
break
self.controls.append(ComplianceControl(
control_id='SOC2-CC6.1',
framework='SOC 2',
category='Logical Access Controls',
title='Access Control Implementation',
description='Verify authentication and authorization controls are implemented',
status=status,
evidence=evidence[:5],
recommendation='Implement authentication middleware and role-based access control (RBAC)',
severity='high' if status == 'failed' else 'low'
))
def _check_documentation(self):
"""SOC 2 CC2: Check security documentation."""
evidence = []
status = 'failed'
doc_files = [
'SECURITY.md',
'docs/security.md',
'CONTRIBUTING.md',
'docs/security-policy.md',
'.github/SECURITY.md'
]
for doc in doc_files:
doc_path = self.target_path / doc
if doc_path.exists():
evidence.append(str(doc))
status = 'passed' if 'security' in doc.lower() else 'warning'
break
self.controls.append(ComplianceControl(
control_id='SOC2-CC2.1',
framework='SOC 2',
category='Communication and Information',
title='Security Documentation',
description='Verify security policies and procedures are documented',
status=status,
evidence=evidence,
recommendation='Create SECURITY.md documenting security policies, incident response, and vulnerability reporting',
severity='medium' if status == 'failed' else 'low'
))
def _check_risk_assessment(self):
"""SOC 2 CC3: Check risk assessment artifacts."""
evidence = []
status = 'failed'
# Look for security scanning configuration
scan_configs = [
'.snyk',
'.github/workflows/security.yml',
'.github/workflows/codeql.yml',
'trivy.yaml',
'.semgrep.yml',
'sonar-project.properties'
]
for config in scan_configs:
config_path = self.target_path / config
if config_path.exists():
evidence.append(str(config))
status = 'passed'
break
# Check for dependabot/renovate
dep_configs = [
'.github/dependabot.yml',
'renovate.json',
'.github/renovate.json'
]
for config in dep_configs:
config_path = self.target_path / config
if config_path.exists():
evidence.append(str(config))
if status == 'failed':
status = 'warning'
break
self.controls.append(ComplianceControl(
control_id='SOC2-CC3.1',
framework='SOC 2',
category='Risk Assessment',
title='Automated Security Scanning',
description='Verify automated vulnerability scanning is configured',
status=status,
evidence=evidence,
recommendation='Configure automated security scanning (Snyk, CodeQL, Trivy) and dependency updates (Dependabot)',
severity='high' if status == 'failed' else 'low'
))
def _check_authentication(self):
"""SOC 2 CC6: Check authentication strength."""
evidence = []
status = 'failed'
# Check for MFA/2FA
mfa_patterns = [r'mfa', r'2fa', r'totp', r'authenticator', r'twoFactor']
for pattern in mfa_patterns:
files = self._search_files(pattern, case_sensitive=False)
if files:
evidence.extend(files[:2])
status = 'passed'
break
# Check for password hashing
hash_patterns = [r'bcrypt', r'argon2', r'scrypt', r'pbkdf2']
for pattern in hash_patterns:
files = self._search_files(pattern, case_sensitive=False)
if files:
evidence.extend(files[:2])
if status == 'failed':
status = 'warning'
break
self.controls.append(ComplianceControl(
control_id='SOC2-CC6.2',
framework='SOC 2',
category='Authentication',
title='Strong Authentication',
description='Verify multi-factor authentication and secure password storage',
status=status,
evidence=evidence[:5],
recommendation='Implement MFA/2FA and use bcrypt/argon2 for password hashing',
severity='critical' if status == 'failed' else 'low'
))
def _check_logging(self):
"""SOC 2 CC7: Check audit logging implementation."""
evidence = []
status = 'failed'
# Check for logging configuration
log_patterns = [
r'winston',
r'pino',
r'bunyan',
r'logging\.getLogger',
r'log\.info',
r'logger\.',
r'audit.*log'
]
for pattern in log_patterns:
files = self._search_files(pattern)
if files:
evidence.extend(files[:3])
status = 'passed'
break
# Check for structured logging
struct_patterns = [r'json.*log', r'structured.*log', r'log.*format']
for pattern in struct_patterns:
files = self._search_files(pattern, case_sensitive=False)
if files:
evidence.extend(files[:2])
break
self.controls.append(ComplianceControl(
control_id='SOC2-CC7.1',
framework='SOC 2',
category='System Operations',
title='Audit Logging',
description='Verify comprehensive audit logging is implemented',
status=status,
evidence=evidence[:5],
recommendation='Implement structured audit logging with security events (auth, access, changes)',
severity='high' if status == 'failed' else 'low'
))
def _check_change_management(self):
"""SOC 2 CC8: Check change management controls."""
evidence = []
status = 'failed'
# Check for CI/CD configuration
ci_configs = [
'.github/workflows',
'.gitlab-ci.yml',
'Jenkinsfile',
'.circleci/config.yml',
'azure-pipelines.yml'
]
for config in ci_configs:
config_path = self.target_path / config
if config_path.exists():
evidence.append(str(config))
status = 'passed'
break
# Check for branch protection indicators
branch_patterns = [r'protected.*branch', r'require.*review', r'pull.*request']
for pattern in branch_patterns:
files = self._search_files(pattern, case_sensitive=False)
if files:
evidence.extend(files[:2])
break
self.controls.append(ComplianceControl(
control_id='SOC2-CC8.1',
framework='SOC 2',
category='Change Management',
title='CI/CD and Code Review',
description='Verify automated deployment pipeline and code review process',
status=status,
evidence=evidence[:5],
recommendation='Implement CI/CD pipeline with required code reviews and branch protection',
severity='medium' if status == 'failed' else 'low'
))
def _check_data_encryption(self):
"""PCI-DSS Req 3: Check encryption at rest."""
evidence = []
status = 'failed'
encryption_patterns = [
r'AES',
r'encrypt',
r'crypto\.createCipher',
r'Fernet',
r'KMS',
r'encryptedField'
]
for pattern in encryption_patterns:
files = self._search_files(pattern)
if files:
evidence.extend(files[:3])
status = 'passed'
break
self.controls.append(ComplianceControl(
control_id='PCI-DSS-3.5',
framework='PCI-DSS',
category='Protect Stored Data',
title='Encryption at Rest',
description='Verify sensitive data is encrypted at rest',
status=status,
evidence=evidence[:5],
recommendation='Implement AES-256 encryption for sensitive data storage using approved libraries',
severity='critical' if status == 'failed' else 'low'
))
def _check_transmission_encryption(self):
"""PCI-DSS Req 4: Check encryption in transit."""
evidence = []
status = 'failed'
tls_patterns = [
r'https://',
r'TLS',
r'SSL',
r'secure.*cookie',
r'HSTS'
]
for pattern in tls_patterns:
files = self._search_files(pattern, case_sensitive=False)
if files:
evidence.extend(files[:3])
status = 'passed'
break
self.controls.append(ComplianceControl(
control_id='PCI-DSS-4.1',
framework='PCI-DSS',
category='Encrypt Transmissions',
title='TLS/HTTPS Enforcement',
description='Verify TLS 1.2+ is enforced for all transmissions',
status=status,
evidence=evidence[:5],
recommendation='Enforce HTTPS with TLS 1.2+, enable HSTS, use secure cookies',
severity='critical' if status == 'failed' else 'low'
))
def _check_secure_development(self):
"""PCI-DSS Req 6: Check secure development practices."""
evidence = []
status = 'failed'
# Check for input validation
validation_patterns = [
r'validator',
r'sanitize',
r'escape',
r'zod',
r'yup',
r'joi'
]
for pattern in validation_patterns:
files = self._search_files(pattern, case_sensitive=False)
if files:
evidence.extend(files[:3])
status = 'passed'
break
self.controls.append(ComplianceControl(
control_id='PCI-DSS-6.5',
framework='PCI-DSS',
category='Secure Development',
title='Input Validation',
description='Verify input validation and sanitization is implemented',
status=status,
evidence=evidence[:5],
recommendation='Use validation libraries (Joi, Zod, validator.js) for all user input',
severity='high' if status == 'failed' else 'low'
))
def _check_strong_authentication(self):
"""PCI-DSS Req 8: Check authentication requirements."""
evidence = []
status = 'failed'
# Check for session management
session_patterns = [
r'session.*timeout',
r'maxAge',
r'expiresIn',
r'session.*expire'
]
for pattern in session_patterns:
files = self._search_files(pattern, case_sensitive=False)
if files:
evidence.extend(files[:3])
status = 'passed'
break
self.controls.append(ComplianceControl(
control_id='PCI-DSS-8.6',
framework='PCI-DSS',
category='Authentication',
title='Session Management',
description='Verify session timeout and management controls',
status=status,
evidence=evidence[:5],
recommendation='Enforce a 15-minute idle session timeout, secure session tokens, and session invalidation on logout',
severity='high' if status == 'failed' else 'low'
))
def _check_audit_logging(self):
"""PCI-DSS Req 10: Check audit logging."""
# Narrower, audit-focused patterns than the SOC 2 logging check
evidence = []
status = 'failed'
log_patterns = [r'audit', r'log.*event', r'security.*log']
for pattern in log_patterns:
files = self._search_files(pattern, case_sensitive=False)
if files:
evidence.extend(files[:3])
status = 'passed'
break
self.controls.append(ComplianceControl(
control_id='PCI-DSS-10.2',
framework='PCI-DSS',
category='Logging and Monitoring',
title='Security Event Logging',
description='Verify security events are logged with sufficient detail',
status=status,
evidence=evidence[:5],
recommendation='Log all authentication events, access to cardholder data, and administrative actions',
severity='high' if status == 'failed' else 'low'
))
def _check_security_testing(self):
"""PCI-DSS Req 11: Check security testing."""
evidence = []
status = 'failed'
# Check for test configuration
test_patterns = [
r'security.*test',
r'penetration.*test',
r'vulnerability.*scan'
]
for pattern in test_patterns:
files = self._search_files(pattern, case_sensitive=False)
if files:
evidence.extend(files[:3])
status = 'passed'
break
# Check for SAST/DAST configuration
sast_configs = ['.snyk', '.semgrep.yml', 'sonar-project.properties']
for config in sast_configs:
if (self.target_path / config).exists():
evidence.append(config)
if status == 'failed':
status = 'warning'
break
self.controls.append(ComplianceControl(
control_id='PCI-DSS-11.3',
framework='PCI-DSS',
category='Security Testing',
title='Vulnerability Assessment',
description='Verify regular security testing is performed',
status=status,
evidence=evidence[:5],
recommendation='Configure SAST/DAST scanning and schedule quarterly penetration tests',
severity='high' if status == 'failed' else 'low'
))
def _check_hipaa_access_control(self):
"""HIPAA 164.312(a)(1): Access Control."""
evidence = []
status = 'failed'
# Check for user identification
auth_patterns = [r'user.*id', r'authentication', r'identity']
for pattern in auth_patterns:
files = self._search_files(pattern, case_sensitive=False)
if files:
evidence.extend(files[:3])
status = 'passed'
break
self.controls.append(ComplianceControl(
control_id='HIPAA-164.312(a)(1)',
framework='HIPAA',
category='Access Control',
title='Unique User Identification',
description='Verify unique user identification for accessing PHI',
status=status,
evidence=evidence[:5],
recommendation='Implement unique user accounts with individual credentials for all PHI access',
severity='critical' if status == 'failed' else 'low'
))
def _check_hipaa_audit(self):
"""HIPAA 164.312(b): Audit Controls."""
evidence = []
status = 'failed'
audit_patterns = [r'audit.*trail', r'access.*log', r'phi.*log']
for pattern in audit_patterns:
files = self._search_files(pattern, case_sensitive=False)
if files:
evidence.extend(files[:3])
status = 'passed'
break
self.controls.append(ComplianceControl(
control_id='HIPAA-164.312(b)',
framework='HIPAA',
category='Audit Controls',
title='PHI Access Audit Trail',
description='Verify audit trails for PHI access are maintained',
status=status,
evidence=evidence[:5],
recommendation='Implement comprehensive audit logging for all PHI access with who/what/when/where',
severity='critical' if status == 'failed' else 'low'
))
def _check_hipaa_integrity(self):
"""HIPAA 164.312(c)(1): Integrity Controls."""
evidence = []
status = 'failed'
integrity_patterns = [r'checksum', r'hash', r'signature', r'integrity']
for pattern in integrity_patterns:
files = self._search_files(pattern, case_sensitive=False)
if files:
evidence.extend(files[:3])
status = 'passed'
break
self.controls.append(ComplianceControl(
control_id='HIPAA-164.312(c)(1)',
framework='HIPAA',
category='Integrity',
title='Data Integrity Controls',
description='Verify mechanisms to protect PHI from improper alteration',
status=status,
evidence=evidence[:5],
recommendation='Implement checksums, digital signatures, or hashing for PHI integrity verification',
severity='high' if status == 'failed' else 'low'
))
def _check_hipaa_authentication(self):
"""HIPAA 164.312(d): Authentication."""
evidence = []
status = 'failed'
auth_patterns = [r'mfa', r'two.*factor', r'biometric', r'token.*auth']
for pattern in auth_patterns:
files = self._search_files(pattern, case_sensitive=False)
if files:
evidence.extend(files[:3])
status = 'passed'
break
self.controls.append(ComplianceControl(
control_id='HIPAA-164.312(d)',
framework='HIPAA',
category='Authentication',
title='Person Authentication',
description='Verify mechanisms to authenticate person or entity accessing PHI',
status=status,
evidence=evidence[:5],
recommendation='Implement multi-factor authentication for all PHI access',
severity='critical' if status == 'failed' else 'low'
))
def _check_hipaa_transmission(self):
"""HIPAA 164.312(e)(1): Transmission Security."""
evidence = []
status = 'failed'
transmission_patterns = [r'tls', r'ssl', r'https', r'encrypt.*transit']
for pattern in transmission_patterns:
files = self._search_files(pattern, case_sensitive=False)
if files:
evidence.extend(files[:3])
status = 'passed'
break
self.controls.append(ComplianceControl(
control_id='HIPAA-164.312(e)(1)',
framework='HIPAA',
category='Transmission Security',
title='PHI Transmission Encryption',
description='Verify PHI is encrypted during transmission',
status=status,
evidence=evidence[:5],
recommendation='Enforce TLS 1.2+ for all PHI transmissions, implement end-to-end encryption',
severity='critical' if status == 'failed' else 'low'
))
def _check_privacy_by_design(self):
"""GDPR Article 25: Privacy by design."""
evidence = []
status = 'failed'
privacy_patterns = [
r'data.*minimization',
r'privacy.*config',
r'consent',
r'gdpr'
]
for pattern in privacy_patterns:
files = self._search_files(pattern, case_sensitive=False)
if files:
evidence.extend(files[:3])
status = 'passed'
break
self.controls.append(ComplianceControl(
control_id='GDPR-25',
framework='GDPR',
category='Privacy by Design',
title='Data Minimization',
description='Verify data collection is limited to necessary purposes',
status=status,
evidence=evidence[:5],
recommendation='Implement data minimization, purpose limitation, and privacy-by-default configurations',
severity='high' if status == 'failed' else 'low'
))
def _check_gdpr_security(self):
"""GDPR Article 32: Security of processing."""
evidence = []
status = 'failed'
security_patterns = [r'encrypt', r'pseudonymization', r'anonymization']
for pattern in security_patterns:
files = self._search_files(pattern, case_sensitive=False)
if files:
evidence.extend(files[:3])
status = 'passed'
break
self.controls.append(ComplianceControl(
control_id='GDPR-32',
framework='GDPR',
category='Security',
title='Pseudonymization and Encryption',
description='Verify appropriate security measures for personal data',
status=status,
evidence=evidence[:5],
recommendation='Implement encryption and pseudonymization for personal data processing',
severity='high' if status == 'failed' else 'low'
))
def _check_breach_notification(self):
"""GDPR Article 33/34: Breach notification."""
evidence = []
status = 'failed'
breach_patterns = [
r'breach.*notification',
r'incident.*response',
r'security.*incident'
]
for pattern in breach_patterns:
files = self._search_files(pattern, case_sensitive=False)
if files:
evidence.extend(files[:3])
status = 'passed'
break
# Check for incident response documentation
incident_docs = ['SECURITY.md', 'docs/incident-response.md', '.github/SECURITY.md']
for doc in incident_docs:
if (self.target_path / doc).exists():
evidence.append(doc)
if status == 'failed':
status = 'warning'
break
self.controls.append(ComplianceControl(
control_id='GDPR-33',
framework='GDPR',
category='Breach Notification',
title='Incident Response Procedure',
description='Verify breach notification procedures are documented',
status=status,
evidence=evidence[:5],
recommendation='Document incident response procedures with 72-hour notification capability',
severity='high' if status == 'failed' else 'low'
))
def _check_data_deletion(self):
"""GDPR Article 17: Right to erasure."""
evidence = []
status = 'failed'
deletion_patterns = [
r'delete.*user',
r'erasure',
r'right.*forgotten',
r'data.*deletion',
r'gdpr.*delete'
]
for pattern in deletion_patterns:
files = self._search_files(pattern, case_sensitive=False)
if files:
evidence.extend(files[:3])
status = 'passed'
break
self.controls.append(ComplianceControl(
control_id='GDPR-17',
framework='GDPR',
category='Data Subject Rights',
title='Right to Erasure',
description='Verify data deletion capability is implemented',
status=status,
evidence=evidence[:5],
recommendation='Implement complete user data deletion including all backups and third-party systems',
severity='high' if status == 'failed' else 'low'
))
def _check_data_export(self):
"""GDPR Article 20: Data portability."""
evidence = []
status = 'failed'
export_patterns = [
r'export.*data',
r'data.*portability',
r'download.*data',
r'gdpr.*export'
]
for pattern in export_patterns:
files = self._search_files(pattern, case_sensitive=False)
if files:
evidence.extend(files[:3])
status = 'passed'
break
self.controls.append(ComplianceControl(
control_id='GDPR-20',
framework='GDPR',
category='Data Subject Rights',
title='Data Portability',
description='Verify data export capability is implemented',
status=status,
evidence=evidence[:5],
recommendation='Implement data export in machine-readable format (JSON, CSV)',
severity='medium' if status == 'failed' else 'low'
))
def _search_files(self, pattern: str, case_sensitive: bool = True) -> List[str]:
"""Search files for pattern matches."""
matches = []
flags = 0 if case_sensitive else re.IGNORECASE
try:
for root, dirs, files in os.walk(self.target_path):
# Skip common non-relevant directories
dirs[:] = [d for d in dirs if d not in {
'node_modules', '.git', '__pycache__', 'venv', '.venv',
'dist', 'build', 'coverage', '.next'
}]
for filename in files:
if filename.endswith(('.js', '.ts', '.py', '.go', '.java', '.md', '.yml', '.yaml', '.json')):
file_path = Path(root) / filename
try:
content = file_path.read_text(encoding='utf-8', errors='ignore')
# Count every file read, not just files that match the pattern
self.files_scanned += 1
if re.search(pattern, content, flags):
rel_path = str(file_path.relative_to(self.target_path))
matches.append(rel_path)
except Exception:
pass
except Exception:
pass
return matches[:10] # Limit results
def _calculate_compliance_score(self) -> float:
"""Calculate overall compliance score (0-100)."""
if not self.controls:
return 0.0
# Weight by severity
severity_weights = {'critical': 4.0, 'high': 3.0, 'medium': 2.0, 'low': 1.0}
status_scores = {'passed': 1.0, 'warning': 0.5, 'failed': 0.0, 'not_applicable': None}
total_weight = 0.0
total_score = 0.0
for control in self.controls:
score = status_scores.get(control.status)
if score is not None: # Skip N/A
weight = severity_weights.get(control.severity, 1.0)
total_weight += weight
total_score += score * weight
return round((total_score / total_weight) * 100, 1) if total_weight > 0 else 0.0
def _get_compliance_level(self, score: float) -> str:
"""Get compliance level from score."""
if score >= 90:
return "COMPLIANT"
elif score >= 70:
return "PARTIALLY_COMPLIANT"
elif score >= 50:
return "NON_COMPLIANT"
return "CRITICAL_GAPS"
def _print_summary(self, result: Dict):
"""Print compliance summary."""
print("\n" + "=" * 60)
print("COMPLIANCE CHECK SUMMARY")
print("=" * 60)
print(f"Target: {result['target']}")
print(f"Framework: {result['framework'].upper()}")
print(f"Scan duration: {result['scan_duration_seconds']}s")
print(f"Compliance score: {result['compliance_score']}% ({result['compliance_level']})")
print()
summary = result['summary']
print(f"Controls checked: {summary['total']}")
print(f" Passed: {summary['passed']}")
print(f" Failed: {summary['failed']}")
print(f" Warning: {summary['warnings']}")
print(f" N/A: {summary['not_applicable']}")
print("=" * 60)
# Show failed controls
failed = [c for c in result['controls'] if c['status'] == 'failed']
if failed:
print("\nFailed controls requiring remediation:")
for control in failed[:5]:
print(f"\n [{control['severity'].upper()}] {control['control_id']}")
print(f" {control['title']}")
print(f" Recommendation: {control['recommendation']}")
def main():
"""Main entry point for CLI."""
parser = argparse.ArgumentParser(
description="Check compliance against SOC 2, PCI-DSS, HIPAA, GDPR",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s /path/to/project
%(prog)s /path/to/project --framework soc2
%(prog)s /path/to/project --framework pci-dss --output report.json
%(prog)s . --framework all --verbose
"""
)
parser.add_argument(
"target",
help="Directory to check for compliance"
)
parser.add_argument(
"--framework", "-f",
choices=["soc2", "pci-dss", "hipaa", "gdpr", "all"],
default="all",
help="Compliance framework to check (default: all)"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable verbose output"
)
parser.add_argument(
"--json",
action="store_true",
help="Output results as JSON"
)
parser.add_argument(
"--output", "-o",
help="Output file path"
)
args = parser.parse_args()
checker = ComplianceChecker(
target_path=args.target,
framework=args.framework,
verbose=args.verbose
)
result = checker.check()
if args.json:
output = json.dumps(result, indent=2)
if args.output:
with open(args.output, 'w') as f:
f.write(output)
print(f"\nResults written to {args.output}")
else:
print(output)
elif args.output:
with open(args.output, 'w') as f:
json.dump(result, f, indent=2)
print(f"\nResults written to {args.output}")
# Exit with error code based on compliance level
if result.get('compliance_level') == 'CRITICAL_GAPS':
sys.exit(2)
if result.get('compliance_level') == 'NON_COMPLIANT':
sys.exit(1)
if __name__ == "__main__":
main()
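The checker rolls individual control results into a single 0-100 score. A minimal standalone sketch of that severity-weighted calculation (mirroring `_calculate_compliance_score` above; the `compliance_score` helper name is illustrative, not part of the script):

```python
# Sketch of the scoring formula in _calculate_compliance_score: each control
# contributes its status score (passed=1.0, warning=0.5, failed=0.0) times its
# severity weight (critical=4, high=3, medium=2, low=1); N/A controls are skipped.

SEVERITY_WEIGHTS = {'critical': 4.0, 'high': 3.0, 'medium': 2.0, 'low': 1.0}
STATUS_SCORES = {'passed': 1.0, 'warning': 0.5, 'failed': 0.0, 'not_applicable': None}

def compliance_score(controls):
    """controls: list of (status, severity) pairs."""
    total_weight = total_score = 0.0
    for status, severity in controls:
        score = STATUS_SCORES.get(status)
        if score is None:  # skip not-applicable controls
            continue
        weight = SEVERITY_WEIGHTS.get(severity, 1.0)
        total_weight += weight
        total_score += score * weight
    return round((total_score / total_weight) * 100, 1) if total_weight > 0 else 0.0

# One failed critical control outweighs three passed low-severity ones:
print(compliance_score([('failed', 'critical'),
                        ('passed', 'low'),
                        ('passed', 'low'),
                        ('passed', 'low')]))  # 42.9 -> PARTIALLY below 70, i.e. NON_COMPLIANT
```

The weighting means a single failed critical control (e.g. missing MFA) drags the score far more than several failed low-severity ones, which matches how the exit codes gate CI.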
#!/usr/bin/env python3
"""
Security Scanner - Scan source code for security vulnerabilities.
Table of Contents:
SecurityScanner - Main class for security scanning
__init__ - Initialize with target path and options
scan() - Run all security scans
scan_secrets() - Detect hardcoded secrets
scan_sql_injection() - Detect SQL injection patterns
scan_xss() - Detect XSS vulnerabilities
scan_command_injection() - Detect command injection
scan_path_traversal() - Detect path traversal
_scan_file() - Scan individual file for patterns
_calculate_severity() - Calculate finding severity
main() - CLI entry point
Usage:
python security_scanner.py /path/to/project
python security_scanner.py /path/to/project --severity high
python security_scanner.py /path/to/project --output report.json --json
"""
import os
import sys
import json
import re
import argparse
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime
@dataclass
class SecurityFinding:
"""Represents a security finding."""
rule_id: str
severity: str # critical, high, medium, low, info
category: str
title: str
description: str
file_path: str
line_number: int
code_snippet: str
recommendation: str
class SecurityScanner:
"""Scan source code for security vulnerabilities."""
# File extensions to scan
SCAN_EXTENSIONS = {
'.py', '.js', '.ts', '.jsx', '.tsx', '.java', '.go',
'.rb', '.php', '.cs', '.rs', '.swift', '.kt',
'.yml', '.yaml', '.json', '.xml', '.env', '.conf', '.config'
}
# Directories to skip
SKIP_DIRS = {
'node_modules', '.git', '__pycache__', '.venv', 'venv',
'vendor', 'dist', 'build', '.next', 'coverage'
}
# Secret patterns
SECRET_PATTERNS = [
(r'(?i)(api[_-]?key|apikey)\s*[:=]\s*["\']?([a-zA-Z0-9_\-]{20,})["\']?',
'API Key', 'Hardcoded API key detected'),
(r'(?i)(secret[_-]?key|secretkey)\s*[:=]\s*["\']?([a-zA-Z0-9_\-]{16,})["\']?',
'Secret Key', 'Hardcoded secret key detected'),
(r'(?i)(password|passwd|pwd)\s*[:=]\s*["\']([^"\']{4,})["\']',
'Password', 'Hardcoded password detected'),
(r'(?i)(aws[_-]?access[_-]?key[_-]?id)\s*[:=]\s*["\']?(AKIA[A-Z0-9]{16})["\']?',
'AWS Access Key', 'Hardcoded AWS access key detected'),
(r'(?i)(aws[_-]?secret[_-]?access[_-]?key)\s*[:=]\s*["\']?([a-zA-Z0-9/+=]{40})["\']?',
'AWS Secret Key', 'Hardcoded AWS secret access key detected'),
(r'ghp_[a-zA-Z0-9]{36}',
'GitHub Token', 'GitHub personal access token detected'),
(r'sk-[a-zA-Z0-9]{48}',
'OpenAI API Key', 'OpenAI API key detected'),
(r'-----BEGIN\s+(RSA|DSA|EC|OPENSSH)?\s*PRIVATE KEY-----',
'Private Key', 'Private key detected in source code'),
]
# SQL injection patterns
SQL_INJECTION_PATTERNS = [
(r'execute\s*\(\s*["\']?\s*SELECT.*\+.*\+',
'Dynamic SQL query with string concatenation'),
(r'execute\s*\(\s*f["\']SELECT',
'F-string SQL query (Python)'),
(r'cursor\.execute\s*\(\s*["\'].*%s.*%\s*\(',
'Unsafe string formatting in SQL'),
(r'query\s*\(\s*[`"\']SELECT.*\$\{',
'Template literal SQL injection (JavaScript)'),
(r'\.query\s*\(\s*["\'].*\+.*\+',
'String concatenation in SQL query'),
]
# XSS patterns
XSS_PATTERNS = [
(r'innerHTML\s*=\s*[^;]+(?:user|input|param|query)',
'User input assigned to innerHTML'),
(r'document\.write\s*\([^;]*(?:user|input|param|query)',
'User input in document.write'),
(r'\.html\s*\(\s*[^)]*(?:user|input|param|query)',
'User input in jQuery .html()'),
(r'dangerouslySetInnerHTML',
'React dangerouslySetInnerHTML usage'),
(r'\|safe\s*}}',
'Django/Jinja2 |safe filter disables autoescaping'),
]
# Command injection patterns (detection rules for finding unsafe patterns)
COMMAND_INJECTION_PATTERNS = [
(r'subprocess\.(?:call|run|Popen)\s*\([^)]*shell\s*=\s*True',
'Subprocess with shell=True'),
(r'exec\s*\(\s*[^)]*(?:user|input|param|request)',
'exec() with potential user input'),
(r'eval\s*\(\s*[^)]*(?:user|input|param|request)',
'eval() with potential user input'),
]
# Path traversal patterns
PATH_TRAVERSAL_PATTERNS = [
(r'open\s*\(\s*[^)]*(?:user|input|param|request)',
'File open with potential user input'),
(r'readFile\s*\(\s*[^)]*(?:user|input|param|req\.|query)',
'File read with potential user input'),
(r'path\.join\s*\([^)]*(?:user|input|param|req\.|query)',
'Path.join with user input without validation'),
]
def __init__(
self,
target_path: str,
severity_threshold: str = "low",
verbose: bool = False
):
"""
Initialize the security scanner.
Args:
target_path: Directory or file to scan
severity_threshold: Minimum severity to report (critical, high, medium, low)
verbose: Enable verbose output
"""
self.target_path = Path(target_path)
self.severity_threshold = severity_threshold
self.verbose = verbose
self.findings: List[SecurityFinding] = []
self.files_scanned = 0
self.severity_order = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3, 'info': 4}
def scan(self) -> Dict:
"""
Run all security scans.
Returns:
Dict with scan results and findings
"""
print(f"Security Scanner - Scanning: {self.target_path}")
print(f"Severity threshold: {self.severity_threshold}")
print()
if not self.target_path.exists():
return {"status": "error", "message": f"Path not found: {self.target_path}"}
start_time = datetime.now()
# Collect files to scan
files_to_scan = self._collect_files()
print(f"Files to scan: {len(files_to_scan)}")
# Run scans
for file_path in files_to_scan:
self._scan_file(file_path)
self.files_scanned += 1
# Filter by severity threshold
threshold_level = self.severity_order.get(self.severity_threshold, 3)
filtered_findings = [
f for f in self.findings
if self.severity_order.get(f.severity, 3) <= threshold_level
]
end_time = datetime.now()
scan_duration = (end_time - start_time).total_seconds()
# Group findings by severity
severity_counts = {}
for finding in filtered_findings:
severity_counts[finding.severity] = severity_counts.get(finding.severity, 0) + 1
result = {
"status": "completed",
"target": str(self.target_path),
"files_scanned": self.files_scanned,
"scan_duration_seconds": round(scan_duration, 2),
"total_findings": len(filtered_findings),
"severity_counts": severity_counts,
"findings": [asdict(f) for f in filtered_findings]
}
self._print_summary(result)
return result
def _collect_files(self) -> List[Path]:
"""Collect files to scan."""
files = []
if self.target_path.is_file():
return [self.target_path]
for root, dirs, filenames in os.walk(self.target_path):
# Skip directories
dirs[:] = [d for d in dirs if d not in self.SKIP_DIRS]
for filename in filenames:
file_path = Path(root) / filename
if file_path.suffix.lower() in self.SCAN_EXTENSIONS:
files.append(file_path)
return files
def _scan_file(self, file_path: Path):
"""Scan a single file for security issues."""
try:
content = file_path.read_text(encoding='utf-8', errors='ignore')
lines = content.split('\n')
relative_path = str(file_path.relative_to(self.target_path) if self.target_path.is_dir() else file_path.name)
# Scan for secrets
self._scan_patterns(
lines, relative_path,
self.SECRET_PATTERNS,
'secrets',
'Hardcoded Secret',
'critical'
)
# Scan for SQL injection
self._scan_patterns(
lines, relative_path,
self.SQL_INJECTION_PATTERNS,
'injection',
'SQL Injection',
'high'
)
# Scan for XSS
self._scan_patterns(
lines, relative_path,
self.XSS_PATTERNS,
'xss',
'Cross-Site Scripting (XSS)',
'high'
)
# Scan for command injection
self._scan_patterns(
lines, relative_path,
self.COMMAND_INJECTION_PATTERNS,
'injection',
'Command Injection',
'critical'
)
# Scan for path traversal
self._scan_patterns(
lines, relative_path,
self.PATH_TRAVERSAL_PATTERNS,
'path-traversal',
'Path Traversal',
'medium'
)
if self.verbose:
print(f" Scanned: {relative_path}")
except Exception as e:
if self.verbose:
print(f" Error scanning {file_path}: {e}")
def _scan_patterns(
self,
lines: List[str],
file_path: str,
patterns: List[Tuple],
category: str,
title: str,
default_severity: str
):
"""Scan lines for patterns."""
for line_num, line in enumerate(lines, 1):
for pattern_tuple in patterns:
pattern = pattern_tuple[0]
# Use the last element as the description: secret patterns are 3-tuples
# (regex, name, description); the other pattern sets are 2-tuples
description = pattern_tuple[-1] if len(pattern_tuple) > 1 else title
match = re.search(pattern, line, re.IGNORECASE)
if match:
# Check for false positives (comments, test files)
if self._is_false_positive(line, file_path):
continue
# Determine severity based on context
severity = self._calculate_severity(
default_severity,
file_path,
category
)
finding = SecurityFinding(
rule_id=f"{category}-{len(self.findings) + 1:04d}",
severity=severity,
category=category,
title=title,
description=description,
file_path=file_path,
line_number=line_num,
code_snippet=line.strip()[:100],
recommendation=self._get_recommendation(category)
)
self.findings.append(finding)
def _is_false_positive(self, line: str, file_path: str) -> bool:
"""Check if finding is likely a false positive."""
# Skip comments
stripped = line.strip()
if stripped.startswith('#') or stripped.startswith('//') or stripped.startswith('*'):
return True
# Skip test/spec files entirely (fixtures routinely contain dummy secrets and unsafe-looking snippets)
if 'test' in file_path.lower() or 'spec' in file_path.lower():
return True
# Skip example/sample values
lower_line = line.lower()
if any(skip in lower_line for skip in ['example', 'sample', 'placeholder', 'xxx', 'your_']):
return True
return False
def _calculate_severity(self, default: str, file_path: str, category: str) -> str:
"""Calculate severity based on context."""
# Increase severity for production-related files
if any(prod in file_path.lower() for prod in ['prod', 'production', 'deploy']):
if default == 'high':
return 'critical'
if default == 'medium':
return 'high'
# Decrease severity for config examples
if 'example' in file_path.lower() or 'sample' in file_path.lower():
if default == 'critical':
return 'high'
if default == 'high':
return 'medium'
return default
def _get_recommendation(self, category: str) -> str:
"""Get remediation recommendation for category."""
recommendations = {
'secrets': 'Remove hardcoded secrets. Use environment variables or a secrets manager (HashiCorp Vault, AWS Secrets Manager).',
'injection': 'Use parameterized queries or prepared statements. Never concatenate user input into queries.',
'xss': 'Always escape or sanitize user input before rendering. Use framework-provided escaping functions.',
'path-traversal': 'Validate and sanitize file paths. Use allowlists for permitted directories.',
}
return recommendations.get(category, 'Review and remediate the security issue.')

    def _print_summary(self, result: Dict):
        """Print scan summary."""
        print("\n" + "=" * 60)
        print("SECURITY SCAN SUMMARY")
        print("=" * 60)
        print(f"Target: {result['target']}")
        print(f"Files scanned: {result['files_scanned']}")
        print(f"Scan duration: {result['scan_duration_seconds']}s")
        print(f"Total findings: {result['total_findings']}")
        print()
        if result['severity_counts']:
            print("Findings by severity:")
            for severity in ['critical', 'high', 'medium', 'low', 'info']:
                count = result['severity_counts'].get(severity, 0)
                if count > 0:
                    print(f" {severity.upper()}: {count}")
        print("=" * 60)
        if result['total_findings'] > 0:
            print("\nTop findings:")
            for finding in result['findings'][:5]:
                print(f"\n [{finding['severity'].upper()}] {finding['title']}")
                print(f" File: {finding['file_path']}:{finding['line_number']}")
                print(f" {finding['description']}")

def main():
    """Main entry point for CLI."""
    parser = argparse.ArgumentParser(
        description="Scan source code for security vulnerabilities",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s /path/to/project
  %(prog)s /path/to/project --severity high
  %(prog)s /path/to/project --output report.json --json
  %(prog)s /path/to/file.py --verbose
"""
    )
    parser.add_argument(
        "target",
        help="Directory or file to scan"
    )
    parser.add_argument(
        "--severity", "-s",
        choices=["critical", "high", "medium", "low", "info"],
        default="low",
        help="Minimum severity to report (default: low)"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose output"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output results as JSON"
    )
    parser.add_argument(
        "--output", "-o",
        help="Output file path"
    )
    args = parser.parse_args()

    scanner = SecurityScanner(
        target_path=args.target,
        severity_threshold=args.severity,
        verbose=args.verbose
    )
    result = scanner.scan()

    if args.json:
        output = json.dumps(result, indent=2)
        if args.output:
            with open(args.output, 'w') as f:
                f.write(output)
            print(f"\nResults written to {args.output}")
        else:
            print(output)
    elif args.output:
        with open(args.output, 'w') as f:
            json.dump(result, f, indent=2)
        print(f"\nResults written to {args.output}")

    # Exit with error code if critical/high findings
    if result.get('severity_counts', {}).get('critical', 0) > 0:
        sys.exit(2)
    if result.get('severity_counts', {}).get('high', 0) > 0:
        sys.exit(1)


if __name__ == "__main__":
    main()
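Both scripts exit non-zero when critical or high findings remain, which lets a CI pipeline gate on the JSON report. A minimal sketch of that gate as a standalone helper — `gate` is a hypothetical name, not part of the script; it only mirrors the exit-code convention above (2 for critical, 1 for high, 0 otherwise):

```python
# Hypothetical CI-gate helper mirroring security_scanner.py's exit codes.
def gate(report: dict) -> int:
    """Return 2 if any critical findings, 1 if any high findings, else 0."""
    counts = report.get("severity_counts", {})
    if counts.get("critical", 0) > 0:
        return 2
    if counts.get("high", 0) > 0:
        return 1
    return 0
```

In a pipeline this would load the artifact produced by `--json --output report.json` and call `sys.exit(gate(report))`.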
#!/usr/bin/env python3
"""
Vulnerability Assessor - Scan dependencies for known CVEs and security issues.
Table of Contents:
VulnerabilityAssessor - Main class for dependency vulnerability assessment
__init__ - Initialize with target path and options
assess() - Run complete vulnerability assessment
scan_npm() - Scan package.json for npm vulnerabilities
scan_python() - Scan requirements.txt for Python vulnerabilities
scan_go() - Scan go.mod for Go vulnerabilities
_parse_package_json() - Parse npm package.json
_parse_requirements() - Parse Python requirements.txt
_parse_go_mod() - Parse Go go.mod
_check_vulnerability() - Check package against CVE database
_calculate_risk_score() - Calculate overall risk score
main() - CLI entry point
Usage:
python vulnerability_assessor.py /path/to/project
python vulnerability_assessor.py /path/to/project --severity high
python vulnerability_assessor.py /path/to/project --output report.json --json
"""
import os
import sys
import json
import re
import argparse
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime
@dataclass
class Vulnerability:
    """Represents a dependency vulnerability."""
    cve_id: str
    package: str
    installed_version: str
    fixed_version: str
    severity: str  # critical, high, medium, low
    cvss_score: float
    description: str
    ecosystem: str  # npm, pypi, go
    recommendation: str

class VulnerabilityAssessor:
    """Assess project dependencies for known vulnerabilities."""

    # Known CVE database (simplified - real implementation would query NVD/OSV)
    KNOWN_CVES = {
        # npm packages
        'lodash': [
            {'version_lt': '4.17.21', 'cve': 'CVE-2021-23337', 'cvss': 7.2,
             'severity': 'high', 'desc': 'Command injection in lodash',
             'fixed': '4.17.21'},
            {'version_lt': '4.17.19', 'cve': 'CVE-2020-8203', 'cvss': 7.4,
             'severity': 'high', 'desc': 'Prototype pollution in lodash',
             'fixed': '4.17.19'},
        ],
        'axios': [
            {'version_lt': '1.6.0', 'cve': 'CVE-2023-45857', 'cvss': 6.5,
             'severity': 'medium', 'desc': 'CSRF token exposure in axios',
             'fixed': '1.6.0'},
        ],
        'express': [
            {'version_lt': '4.17.3', 'cve': 'CVE-2022-24999', 'cvss': 7.5,
             'severity': 'high', 'desc': 'Open redirect in express',
             'fixed': '4.17.3'},
        ],
        'jsonwebtoken': [
            {'version_lt': '9.0.0', 'cve': 'CVE-2022-23529', 'cvss': 9.8,
             'severity': 'critical', 'desc': 'JWT algorithm confusion attack',
             'fixed': '9.0.0'},
        ],
        'minimist': [
            {'version_lt': '1.2.6', 'cve': 'CVE-2021-44906', 'cvss': 9.8,
             'severity': 'critical', 'desc': 'Prototype pollution in minimist',
             'fixed': '1.2.6'},
        ],
        'node-fetch': [
            {'version_lt': '2.6.7', 'cve': 'CVE-2022-0235', 'cvss': 8.8,
             'severity': 'high', 'desc': 'Information exposure in node-fetch',
             'fixed': '2.6.7'},
        ],
        # Python packages
        'django': [
            {'version_lt': '4.2.8', 'cve': 'CVE-2023-46695', 'cvss': 7.5,
             'severity': 'high', 'desc': 'DoS via file uploads in Django',
             'fixed': '4.2.8'},
        ],
        'requests': [
            {'version_lt': '2.31.0', 'cve': 'CVE-2023-32681', 'cvss': 6.1,
             'severity': 'medium', 'desc': 'Proxy-Auth header leak in requests',
             'fixed': '2.31.0'},
        ],
        'pillow': [
            {'version_lt': '10.0.1', 'cve': 'CVE-2023-44271', 'cvss': 7.5,
             'severity': 'high', 'desc': 'DoS via crafted image in Pillow',
             'fixed': '10.0.1'},
        ],
        'cryptography': [
            {'version_lt': '41.0.4', 'cve': 'CVE-2023-38325', 'cvss': 7.5,
             'severity': 'high', 'desc': 'NULL pointer dereference in cryptography',
             'fixed': '41.0.4'},
        ],
        'pyyaml': [
            {'version_lt': '6.0.1', 'cve': 'CVE-2020-14343', 'cvss': 9.8,
             'severity': 'critical', 'desc': 'Arbitrary code execution in PyYAML',
             'fixed': '6.0.1'},
        ],
        'urllib3': [
            {'version_lt': '2.0.6', 'cve': 'CVE-2023-43804', 'cvss': 8.1,
             'severity': 'high', 'desc': 'Cookie header leak in urllib3',
             'fixed': '2.0.6'},
        ],
        # Go packages
        'golang.org/x/crypto': [
            {'version_lt': 'v0.17.0', 'cve': 'CVE-2023-48795', 'cvss': 5.9,
             'severity': 'medium', 'desc': 'SSH prefix truncation attack',
             'fixed': 'v0.17.0'},
        ],
        'golang.org/x/net': [
            {'version_lt': 'v0.17.0', 'cve': 'CVE-2023-44487', 'cvss': 7.5,
             'severity': 'high', 'desc': 'HTTP/2 rapid reset attack',
             'fixed': 'v0.17.0'},
        ],
    }

    SEVERITY_ORDER = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3}

    def __init__(
        self,
        target_path: str,
        severity_threshold: str = "low",
        verbose: bool = False
    ):
        """
        Initialize the vulnerability assessor.

        Args:
            target_path: Directory to scan for dependency files
            severity_threshold: Minimum severity to report
            verbose: Enable verbose output
        """
        self.target_path = Path(target_path)
        self.severity_threshold = severity_threshold
        self.verbose = verbose
        self.vulnerabilities: List[Vulnerability] = []
        self.packages_scanned = 0
        self.files_scanned = 0

    def assess(self) -> Dict:
        """
        Run complete vulnerability assessment.

        Returns:
            Dict with assessment results
        """
        print(f"Vulnerability Assessor - Scanning: {self.target_path}")
        print(f"Severity threshold: {self.severity_threshold}")
        print()
        if not self.target_path.exists():
            return {"status": "error", "message": f"Path not found: {self.target_path}"}

        start_time = datetime.now()

        # Scan npm dependencies
        package_json = self.target_path / "package.json"
        if package_json.exists():
            self.scan_npm(package_json)
            self.files_scanned += 1

        # Scan Python dependencies
        requirements_files = [
            "requirements.txt",
            "requirements-dev.txt",
            "requirements-prod.txt",
            "pyproject.toml"
        ]
        for req_file in requirements_files:
            req_path = self.target_path / req_file
            if req_path.exists():
                self.scan_python(req_path)
                self.files_scanned += 1

        # Scan Go dependencies
        go_mod = self.target_path / "go.mod"
        if go_mod.exists():
            self.scan_go(go_mod)
            self.files_scanned += 1

        # Scan package-lock.json for transitive dependencies
        package_lock = self.target_path / "package-lock.json"
        if package_lock.exists():
            self.scan_npm_lock(package_lock)
            self.files_scanned += 1

        # Filter by severity
        threshold_level = self.SEVERITY_ORDER.get(self.severity_threshold, 3)
        filtered_vulns = [
            v for v in self.vulnerabilities
            if self.SEVERITY_ORDER.get(v.severity, 3) <= threshold_level
        ]

        end_time = datetime.now()
        scan_duration = (end_time - start_time).total_seconds()

        # Group by severity
        severity_counts = {}
        for vuln in filtered_vulns:
            severity_counts[vuln.severity] = severity_counts.get(vuln.severity, 0) + 1

        # Calculate risk score
        risk_score = self._calculate_risk_score(filtered_vulns)

        result = {
            "status": "completed",
            "target": str(self.target_path),
            "files_scanned": self.files_scanned,
            "packages_scanned": self.packages_scanned,
            "scan_duration_seconds": round(scan_duration, 2),
            "total_vulnerabilities": len(filtered_vulns),
            "risk_score": risk_score,
            "risk_level": self._get_risk_level(risk_score),
            "severity_counts": severity_counts,
            "vulnerabilities": [asdict(v) for v in filtered_vulns]
        }
        self._print_summary(result)
        return result

    def scan_npm(self, package_json_path: Path):
        """Scan package.json for npm vulnerabilities."""
        if self.verbose:
            print(f" Scanning: {package_json_path}")
        try:
            with open(package_json_path, 'r') as f:
                data = json.load(f)
            deps = {}
            deps.update(data.get('dependencies', {}))
            deps.update(data.get('devDependencies', {}))
            for package, version_spec in deps.items():
                self.packages_scanned += 1
                version = self._normalize_version(version_spec)
                self._check_vulnerability(package.lower(), version, 'npm')
        except Exception as e:
            if self.verbose:
                print(f" Error scanning {package_json_path}: {e}")

    def scan_npm_lock(self, package_lock_path: Path):
        """Scan package-lock.json for transitive dependencies."""
        if self.verbose:
            print(f" Scanning: {package_lock_path}")
        try:
            with open(package_lock_path, 'r') as f:
                data = json.load(f)
            # Handle npm v2/v3 lockfile format
            packages = data.get('packages', {})
            if not packages:
                # npm v1 format
                packages = data.get('dependencies', {})
            for pkg_path, pkg_info in packages.items():
                if not pkg_path:  # Skip root
                    continue
                # Extract package name from path
                package = pkg_path.split('node_modules/')[-1]
                version = pkg_info.get('version', '')
                if package and version:
                    self.packages_scanned += 1
                    self._check_vulnerability(package.lower(), version, 'npm')
        except Exception as e:
            if self.verbose:
                print(f" Error scanning {package_lock_path}: {e}")

    def scan_python(self, requirements_path: Path):
        """Scan requirements.txt for Python vulnerabilities."""
        if self.verbose:
            print(f" Scanning: {requirements_path}")
        try:
            content = requirements_path.read_text()
            # Handle pyproject.toml
            if requirements_path.name == 'pyproject.toml':
                self._scan_pyproject(content)
                return
            # Parse requirements.txt
            for line in content.split('\n'):
                line = line.strip()
                if not line or line.startswith('#') or line.startswith('-'):
                    continue
                # Parse package==version or package>=version
                match = re.match(r'^([a-zA-Z0-9_-]+)\s*([=<>!~]+)\s*([0-9.]+)', line)
                if match:
                    package = match.group(1).lower()
                    version = match.group(3)
                    self.packages_scanned += 1
                    self._check_vulnerability(package, version, 'pypi')
        except Exception as e:
            if self.verbose:
                print(f" Error scanning {requirements_path}: {e}")

    def _scan_pyproject(self, content: str):
        """Parse pyproject.toml for dependencies."""
        # Simple parsing - real implementation would use toml library
        in_deps = False
        for line in content.split('\n'):
            line = line.strip()
            if '[project.dependencies]' in line or '[tool.poetry.dependencies]' in line:
                in_deps = True
                continue
            if line.startswith('[') and in_deps:
                in_deps = False
                continue
            if in_deps and '=' in line:
                match = re.match(r'"?([a-zA-Z0-9_-]+)"?\s*[=:]\s*"?([^"]+)"?', line)
                if match:
                    package = match.group(1).lower()
                    version_spec = match.group(2)
                    version = self._normalize_version(version_spec)
                    self.packages_scanned += 1
                    self._check_vulnerability(package, version, 'pypi')

    def scan_go(self, go_mod_path: Path):
        """Scan go.mod for Go vulnerabilities."""
        if self.verbose:
            print(f" Scanning: {go_mod_path}")
        try:
            content = go_mod_path.read_text()
            # Parse require blocks
            in_require = False
            for line in content.split('\n'):
                line = line.strip()
                if line.startswith('require ('):
                    in_require = True
                    continue
                if in_require and line == ')':
                    in_require = False
                    continue
                # Parse single require or block require
                if line.startswith('require ') or in_require:
                    parts = line.replace('require ', '').split()
                    if len(parts) >= 2:
                        package = parts[0]
                        version = parts[1]
                        self.packages_scanned += 1
                        self._check_vulnerability(package, version, 'go')
        except Exception as e:
            if self.verbose:
                print(f" Error scanning {go_mod_path}: {e}")

    def _normalize_version(self, version_spec: str) -> str:
        """Extract version number from version specification."""
        # Remove prefixes like ^, ~, >=, etc.
        version = re.sub(r'^[\^~>=<]+', '', version_spec)
        # Remove suffixes like -alpha, -beta, etc.
        version = re.split(r'[-+]', version)[0]
        return version.strip()

    def _check_vulnerability(self, package: str, version: str, ecosystem: str):
        """Check if package version has known vulnerabilities."""
        cves = self.KNOWN_CVES.get(package, [])
        for cve_info in cves:
            if self._version_lt(version, cve_info['version_lt']):
                vuln = Vulnerability(
                    cve_id=cve_info['cve'],
                    package=package,
                    installed_version=version,
                    fixed_version=cve_info['fixed'],
                    severity=cve_info['severity'],
                    cvss_score=cve_info['cvss'],
                    description=cve_info['desc'],
                    ecosystem=ecosystem,
                    recommendation=f"Upgrade {package} to {cve_info['fixed']} or later"
                )
                # Avoid duplicates
                if not any(v.cve_id == vuln.cve_id and v.package == vuln.package
                           for v in self.vulnerabilities):
                    self.vulnerabilities.append(vuln)

    def _version_lt(self, version: str, threshold: str) -> bool:
        """Compare version strings (simplified)."""
        try:
            # Remove 'v' prefix for Go versions
            v1 = version.lstrip('v')
            v2 = threshold.lstrip('v')
            parts1 = [int(x) for x in re.split(r'[.\-]', v1) if x.isdigit()]
            parts2 = [int(x) for x in re.split(r'[.\-]', v2) if x.isdigit()]
            # Pad shorter version
            while len(parts1) < len(parts2):
                parts1.append(0)
            while len(parts2) < len(parts1):
                parts2.append(0)
            return parts1 < parts2
        except (ValueError, AttributeError):
            return False

    def _calculate_risk_score(self, vulnerabilities: List[Vulnerability]) -> float:
        """Calculate overall risk score (0-100)."""
        if not vulnerabilities:
            return 0.0
        # Weight by severity and CVSS
        severity_weights = {'critical': 4.0, 'high': 3.0, 'medium': 2.0, 'low': 1.0}
        total_weight = 0.0
        for vuln in vulnerabilities:
            weight = severity_weights.get(vuln.severity, 1.0)
            total_weight += (vuln.cvss_score * weight)
        # Normalize to 0-100
        max_possible = len(vulnerabilities) * 10.0 * 4.0
        score = (total_weight / max_possible) * 100 if max_possible > 0 else 0
        return min(100.0, round(score, 1))

    def _get_risk_level(self, score: float) -> str:
        """Get risk level from score."""
        if score >= 70:
            return "CRITICAL"
        elif score >= 50:
            return "HIGH"
        elif score >= 25:
            return "MEDIUM"
        elif score > 0:
            return "LOW"
        return "NONE"

    def _print_summary(self, result: Dict):
        """Print assessment summary."""
        print("\n" + "=" * 60)
        print("VULNERABILITY ASSESSMENT SUMMARY")
        print("=" * 60)
        print(f"Target: {result['target']}")
        print(f"Files scanned: {result['files_scanned']}")
        print(f"Packages scanned: {result['packages_scanned']}")
        print(f"Scan duration: {result['scan_duration_seconds']}s")
        print(f"Total vulnerabilities: {result['total_vulnerabilities']}")
        print(f"Risk score: {result['risk_score']}/100 ({result['risk_level']})")
        print()
        if result['severity_counts']:
            print("Vulnerabilities by severity:")
            for severity in ['critical', 'high', 'medium', 'low']:
                count = result['severity_counts'].get(severity, 0)
                if count > 0:
                    print(f" {severity.upper()}: {count}")
        print("=" * 60)
        if result['total_vulnerabilities'] > 0:
            print("\nTop vulnerabilities:")
            # Sort by CVSS score
            sorted_vulns = sorted(
                result['vulnerabilities'],
                key=lambda x: x['cvss_score'],
                reverse=True
            )
            for vuln in sorted_vulns[:5]:
                print(f"\n [{vuln['severity'].upper()}] {vuln['cve_id']}")
                print(f" Package: {vuln['package']}@{vuln['installed_version']}")
                print(f" CVSS: {vuln['cvss_score']}")
                print(f" Fix: Upgrade to {vuln['fixed_version']}")

def main():
    """Main entry point for CLI."""
    parser = argparse.ArgumentParser(
        description="Scan dependencies for known vulnerabilities",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s /path/to/project
  %(prog)s /path/to/project --severity high
  %(prog)s /path/to/project --output report.json --json
  %(prog)s . --verbose
"""
    )
    parser.add_argument(
        "target",
        help="Directory containing dependency files"
    )
    parser.add_argument(
        "--severity", "-s",
        choices=["critical", "high", "medium", "low"],
        default="low",
        help="Minimum severity to report (default: low)"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose output"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output results as JSON"
    )
    parser.add_argument(
        "--output", "-o",
        help="Output file path"
    )
    args = parser.parse_args()

    assessor = VulnerabilityAssessor(
        target_path=args.target,
        severity_threshold=args.severity,
        verbose=args.verbose
    )
    result = assessor.assess()

    if args.json:
        output = json.dumps(result, indent=2)
        if args.output:
            with open(args.output, 'w') as f:
                f.write(output)
            print(f"\nResults written to {args.output}")
        else:
            print(output)
    elif args.output:
        with open(args.output, 'w') as f:
            json.dump(result, f, indent=2)
        print(f"\nResults written to {args.output}")

    # Exit with error code if critical/high vulnerabilities
    if result.get('severity_counts', {}).get('critical', 0) > 0:
        sys.exit(2)
    if result.get('severity_counts', {}).get('high', 0) > 0:
        sys.exit(1)


if __name__ == "__main__":
    main()
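`_version_lt` above is deliberately simplified: it keeps only the numeric dot-separated parts, so pre-release tags are silently dropped and `1.2.0-rc1` compares equal to `1.2.0`. The standalone sketch below reproduces the same semantics to make that caveat concrete; for real triage a proper version parser (e.g. the third-party `packaging` library) would be safer.

```python
import re

def version_lt(version: str, threshold: str) -> bool:
    """Numeric-tuple comparison mirroring the assessor's simplified check.

    Strips a leading 'v' (Go module versions) and keeps only numeric parts,
    so pre-release tags like '-rc1' are ignored.
    """
    def parts(v: str):
        return [int(x) for x in re.split(r"[.\-]", v.lstrip("v")) if x.isdigit()]
    a, b = parts(version), parts(threshold)
    width = max(len(a), len(b))  # pad so '1.2' compares as '1.2.0'
    a += [0] * (width - len(a))
    b += [0] * (width - len(b))
    return a < b
```

Note that `version_lt("1.2.0-rc1", "1.2.0")` is False here even though the release candidate predates the release — one reason real scanners rely on ecosystem-aware version parsing.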
Install this Skill
npx skills add alirezarezvani/claude-skills --skill engineering-team/senior-secops

Community skill by @alirezarezvani.
Details
- Category: Development
- License: MIT
- Author: @alirezarezvani
- Source: GitHub
- Source file: engineering-team/senior-secops/SKILL.md