Nis2 Compliance Monitor
Verifiedby Dryade Daemon
Requires enterprise tier subscription
Description
NIS2 cybersecurity compliance monitoring — risk assessment, incident reporting (24h/72h/1m), supply chain audit, gap analysis, board reporting
Screenshots
Details
NIS2 Compliance Monitor Documentation
Overview
The nis2_compliance_monitor module provides comprehensive monitoring, assessment, and reporting tools for NIS2 (Network and Information Security Directive 2) cybersecurity compliance. It enables organizations to track compliance status, manage risk assessments, handle incident reporting, conduct supply chain audits, perform gap analysis, and generate board-level reports.
Key Features
- Risk Assessment: Evaluate and score security risks across infrastructure
- Incident Reporting: Track, categorize, and report security incidents
- Supply Chain Audit: Monitor third-party and vendor security compliance
- Gap Analysis: Identify compliance gaps and remediation needs
- Board Reporting: Generate executive-level compliance reports
- Compliance Tracking: Monitor ongoing compliance with NIS2 requirements
- Audit Trails: Maintain detailed logs of all compliance activities
Installation
Prerequisites
- Python 3.8 or higher
- pip package manager
- Database support (PostgreSQL recommended for production)
Basic Installation
pip install nis2-compliance-monitor
Installation from Source
git clone https://github.com/your-org/nis2-compliance-monitor.git
cd nis2-compliance-monitor
pip install -e .
Dependencies
Core dependencies are automatically installed:
sqlalchemy>=1.4.0- Database ORMpydantic>=1.9.0- Data validationpython-dateutil>=2.8.2- Date utilitiesrequests>=2.28.0- HTTP clientcryptography>=38.0.0- Security utilities
Optional Dependencies
For enhanced features:
pip install nis2-compliance-monitor[postgres] # PostgreSQL support
pip install nis2-compliance-monitor[reporting] # Advanced reporting
pip install nis2-compliance-monitor[api] # REST API server
Configuration
Environment Variables
NIS2_DATABASE_URL="postgresql://user:password@localhost/nis2_db"
NIS2_API_KEY="your-api-key"
NIS2_ENVIRONMENT="production" # development, staging, production
NIS2_LOG_LEVEL="INFO" # DEBUG, INFO, WARNING, ERROR
NIS2_REPORT_FORMAT="pdf" # pdf, html, json
Configuration File
Create nis2_config.yaml:
database:
url: "postgresql://user:password@localhost/nis2_db"
pool_size: 10
echo: false
compliance:
framework: "nis2"
assessment_frequency: "quarterly"
risk_threshold: 7 # 1-10 scale
incident_management:
auto_escalate_critical: true
notification_channels:
- email
- slack
- webhook
supply_chain:
vendor_assessment_interval: 180 # days
require_soc2: true
require_iso27001: false
reporting:
board_report_frequency: "monthly"
include_metrics:
- risk_score
- incident_count
- compliance_percentage
- remediation_status
Python Configuration
from nis2_compliance_monitor import ComplianceManager, Config
config = Config(
database_url="postgresql://user:password@localhost/nis2_db",
environment="production",
log_level="INFO",
risk_threshold=7,
)
manager = ComplianceManager(config)
Usage
Basic Usage
from nis2_compliance_monitor import ComplianceManager, Asset, RiskLevel
Initialize manager
manager = ComplianceManager()
Register an asset
asset = Asset(
name="Web Server 01",
asset_type="server",
criticality="high",
location="primary_datacenter"
)
manager.register_asset(asset)
Create risk assessment
assessment = manager.create_risk_assessment(
asset_id=asset.id,
vulnerability_count=3,
exposure_level="medium"
)
Get compliance status
status = manager.get_compliance_status()
print(f"Compliance Score: {status.score}%")
print(f"Risk Level: {status.risk_level}")
Risk Assessment Workflow
from nis2_compliance_monitor import RiskAssessment, Vulnerability
Create assessment
risk_assessment = manager.create_risk_assessment(
asset_id="asset-001",
assessment_date="2026-03-28",
assessor="John Doe"
)
Add vulnerabilities
vuln1 = Vulnerability(
cve_id="CVE-2026-1234",
severity="high",
cvss_score=8.5,
remediation_status="pending"
)
risk_assessment.add_vulnerability(vuln1)
Score risk
score = risk_assessment.calculate_risk_score()
manager.update_risk_assessment(risk_assessment)
Incident Management
from nis2_compliance_monitor import Incident, IncidentSeverity
Report incident
incident = Incident(
title="Unauthorized Access Attempt",
description="Multiple failed login attempts detected",
severity=IncidentSeverity.MEDIUM,
affected_asset_id="asset-001",
discovered_date="2026-03-28T10:30:00Z"
)
manager.report_incident(incident)
Update incident status
manager.update_incident_status(
incident_id=incident.id,
new_status="investigating",
notes="Escalated to security team"
)
Get incident statistics
stats = manager.get_incident_statistics(days=30)
print(f"Total Incidents (30d): {stats.total_count}")
print(f"Critical Incidents: {stats.critical_count}")
Supply Chain Audit
from nis2_compliance_monitor import Vendor, VendorAssessment
Register vendor
vendor = Vendor(
name="CloudProvider Inc",
vendor_type="infrastructure",
contract_start="2025-01-01",
contract_end="2027-12-31"
)
manager.register_vendor(vendor)
Create assessment
assessment = manager.create_vendor_assessment(
vendor_id=vendor.id,
assessment_type="security_controls",
assessor="compliance_team"
)
Evaluate vendor
assessment.add_control_evaluation(
control_id="SC-001",
control_name="Data Encryption",
compliance_status="compliant",
evidence_url="https://vendor.example.com/certifications"
)
manager.save_vendor_assessment(assessment)
Gap Analysis
from nis2_compliance_monitor import GapAnalysis
Perform gap analysis
gap_analysis = manager.perform_gap_analysis(
framework="nis2",
focus_area="incident_response"
)
Review findings
for gap in gap_analysis.gaps:
print(f"Gap: {gap.requirement}")
print(f"Current State: {gap.current_state}")
print(f"Required State: {gap.required_state}")
print(f"Priority: {gap.priority}")
print(f"Estimated Effort: {gap.estimated_effort_hours}h\n")
Create remediation plan
plan = manager.create_remediation_plan(gap_analysis)
manager.track_remediation(plan)
Board Reporting
from nis2_compliance_monitor import ReportFormat
Generate board report
report = manager.generate_board_report(
reporting_period="Q1 2026",
format=ReportFormat.PDF,
include_sections=[
"executive_summary",
"compliance_metrics",
"risk_dashboard",
"incident_summary",
"remediation_progress",
"recommendations"
]
)
Save report
report.save("board_report_q1_2026.pdf")
Get executive metrics
metrics = manager.get_executive_metrics()
print(f"Overall Compliance: {metrics.compliance_percentage}%")
print(f"Critical Risks: {metrics.critical_risk_count}")
print(f"Open Incidents: {metrics.open_incident_count}")
API Reference
ComplianceManager
Main class for managing NIS2 compliance operations.
Methods
__init__(config: Config)
- Initialize the compliance manager
- Parameters:
config(Config) - Configuration object
register_asset(asset: Asset) -> str
- Register a new asset for monitoring
- Returns: Asset ID
create_risk_assessment(asset_id: str, **kwargs) -> RiskAssessment
- Create a risk assessment for an asset
- Returns: RiskAssessment object
report_incident(incident: Incident) -> str
- Report a security incident
- Returns: Incident ID
update_incident_status(incident_id: str, new_status: str, notes: str)
- Update incident status and add notes
- Raises: IncidentNotFound
register_vendor(vendor: Vendor) -> str
- Register a vendor/third-party
- Returns: Vendor ID
create_vendor_assessment(vendor_id: str, **kwargs) -> VendorAssessment
- Create vendor security assessment
- Returns: VendorAssessment object
perform_gap_analysis(framework: str, focus_area: str = None) -> GapAnalysis
- Perform compliance gap analysis
- Returns: GapAnalysis object
create_remediation_plan(gap_analysis: GapAnalysis) -> RemediationPlan
- Create remediation plan from gap analysis
- Returns: RemediationPlan object
generate_board_report(**kwargs) -> Report
- Generate executive board report
- Returns: Report object
get_compliance_status() -> ComplianceStatus
- Get current compliance status
- Returns: ComplianceStatus object
get_executive_metrics() -> ExecutiveMetrics
- Get high-level metrics for executives
- Returns: ExecutiveMetrics object
get_incident_statistics(days: int = 30) -> IncidentStats
- Get incident statistics for period
- Returns: IncidentStats object
Asset
Represents a monitored asset.
Attributes:
id: str- Unique identifiername: str- Asset nameasset_type: str- Type (server, network, application, etc.)criticality: str- Criticality level (low, medium, high, critical)location: str- Physical or logical locationowner: str- Asset ownercreated_date: datetime- Creation date
Incident
Represents a security incident.
Attributes:
id: str- Unique identifiertitle: str- Incident titledescription: str- Detailed descriptionseverity: IncidentSeverity- Severity level (low, medium, high, critical)affected_asset_id: str- Related asset IDstatus: str- Current status (reported, investigating, resolved)discovered_date: datetime- Discovery dateresolved_date: datetime- Resolution date (if applicable)
Vendor
Represents a third-party vendor.
Attributes:
id: str- Unique identifiername: str- Vendor namevendor_type: str- Type of vendor (cloud, infrastructure, software, etc.)contract_start: date- Contract start datecontract_end: date- Contract end datecontact_person: str- Primary contactassessment_status: str- Latest assessment status
RiskAssessment
Risk evaluation for an asset.
Methods:
add_vulnerability(vulnerability: Vulnerability)- Add vulnerabilitycalculate_risk_score() -> float- Calculate overall risk score (1-10)get_vulnerabilities() -> List[Vulnerability]- Get all vulnerabilities
GapAnalysis
Compliance gap analysis results.
Attributes:
id: str- Analysis IDframework: str- Compliance frameworkgaps: List[Gap]- List of identified gapscompletion_date: datetime- Analysis completion date
Report
Generated compliance report.
Methods:
save(filepath: str, format: str = None)- Save report to fileget_html() -> str- Get HTML versionget_json() -> dict- Get JSON version
Examples
Complete Compliance Workflow
from nis2_compliance_monitor import ComplianceManager, Config
1. Initialize
config = Config(
database_url="postgresql://localhost/nis2_db",
environment="production"
)
manager = ComplianceManager(config)
2. Register assets
assets = [
{"name": "Web Server", "type": "server", "criticality": "high"},
{"name": "Database Server", "type": "server", "criticality": "critical"},
{"name": "Firewall", "type": "network", "criticality": "critical"},
]
for asset_data in assets:
asset = manager.create_asset(**asset_data)
print(f"Registered: {asset.name}")
3. Perform risk assessments
for asset in manager.list_assets():
assessment = manager.create_risk_assessment(
asset_id=asset.id,
scan_date="2026-03-28"
)
score = assessment.calculate_risk_score()
print(f"{asset.name}: Risk Score {score}/10")
4. Handle incidents
recent_incidents = manager.get_incidents(days=7, status="open")
for incident in recent_incidents:
manager.update_incident_status(
incident.id,
"investigating",
"Under investigation by SOC"
)
5. Audit vendors
vendors = manager.list_vendors()
for vendor in vendors:
assessment = manager.create_vendor_assessment(
vendor_id=vendor.id
)
manager.save_vendor_assessment(assessment)
6. Gap analysis
gap_analysis = manager.perform_gap_analysis("nis2")
remediation = manager.create_remediation_plan(gap_analysis)
print(f"Found {len(gap_analysis.gaps)} compliance gaps")
7. Generate reports
board_report = manager.generate_board_report(
reporting_period="Q1 2026",
format="pdf"
)
board_report.save("q1_2026_compliance_report.pdf")
metrics = manager.get_executive_metrics()
print(f"Compliance Score: {metrics.compliance_percentage}%")
Risk Assessment Deep Dive
from nis2_compliance_monitor import RiskAssessment, Vulnerability
Create detailed assessment
assessment = manager.create_risk_assessment(
asset_id="db-server-01",
assessment_type="vulnerability_scan"
)
Add vulnerabilities
vulnerabilities = [
Vulnerability(
cve_id="CVE-2026-1001",
title="SQL Injection",
severity="critical",
cvss_score=9.8,
affected_component="application_layer"
),
Vulnerability(
cve_id="CVE-2026-1002",
title="Unpatched OS",
severity="high",
cvss_score=8.2,
affected_component="operating_system"
),
]
for vuln in vulnerabilities:
assessment.add_vulnerability(vuln)
Calculate and update
risk_score = assessment.calculate_risk_score()
assessment.set_remediation_priority("immediate" if risk_score > 8 else "standard")
manager.update_risk_assessment(assessment)
Supply Chain Compliance Check
# Register and assess critical vendor
vendor = manager.create_vendor(
name="AWS",
vendor_type="cloud_infrastructure",
criticality="critical"
)
assessment = manager.create_vendor_assessment(
vendor_id=vendor.id,
assessment_scope="security_controls"
)
Evaluate against requirements
requirements = [
("data_encryption", "AES-256 at rest and in transit"),
("incident_response", "24/7 SOC monitoring"),
("backup_recovery", "RTO 4 hours, RPO 1 hour"),
("soc2_compliance", "SOC 2 Type II certified"),
]
for req_id, requirement in requirements:
assessment.evaluate_requirement(
req_id=req_id,
requirement=requirement,
compliant=True,
evidence="AWS Compliance Portal verification"
)
manager.save_vendor_assessment(assessment)
compliance_status = assessment.get_compliance_status()
print(f"Vendor Compliance: {compliance_status.percentage}%")
Testing
Unit Tests
import pytest
from nis2_compliance_monitor import ComplianceManager, Asset, Incident
@pytest.fixture
def manager():
config = Config(database_url="sqlite:///:memory:")
return ComplianceManager(config)
def test_register_asset(manager):
asset = Asset(name="Test Server", asset_type="server")
asset_id = manager.register_asset(asset)
assert asset_id is not None
retrieved = manager.get_asset(asset_id)
assert retrieved.name == "Test Server"
def test_report_incident(manager):
incident = Incident(
title="Test Incident",
severity="high",
affected_asset_id="asset-001"
)
incident_id = manager.report_incident(incident)
assert incident_id is not None
def test_risk_assessment(manager):
assessment = manager.create_risk_assessment(asset_id="asset-001")
score = assessment.calculate_risk_score()
assert 0 <= score <= 10
def test_gap_analysis(manager):
gap_analysis = manager.perform_gap_analysis("nis2")
assert gap_analysis is not None
assert len(gap_analysis.gaps) > 0
Integration Tests
def test_complete_workflow(manager):
# Register asset
asset = Asset(name="Production Server", asset_type="server")
asset_id = manager.register_asset(asset)
# Create risk assessment
assessment = manager.create_risk_assessment(asset_id=asset_id)
risk_score = assessment.calculate_risk_score()
# Report incident
incident = Incident(
title="High Risk Finding",
severity="high" if risk_score > 7 else "medium",
affected_asset_id=asset_id
)
incident_id = manager.report_incident(incident)
# Verify
assert incident_id is not None
assert manager.get_incident(incident_id).severity == "high"
def test_vendor_assessment_workflow(manager):
vendor = manager.create_vendor(name="Test Vendor")
assessment = manager.create_vendor_assessment(vendor_id=vendor.id)
manager.save_vendor_assessment(assessment)
retrieved = manager.get_vendor_assessment(assessment.id)
assert retrieved.vendor_id == vendor.id
Running Tests
# Run all tests
pytest
Run with coverage
pytest --cov=nis2_compliance_monitor
Run specific test file
pytest tests/test_risk_assessment.py
Run with verbose output
pytest -v
Troubleshooting
Common Issues
Database Connection Error
Error: could not connect to database
Solution: Verify DATABASE_URL is correct and PostgreSQL service is running
Permission Denied on Reports
Error: Permission denied writing report
Solution: Ensure application has write permissions to report output directory
Vendor Assessment Timeout
Error: Vendor assessment request timed out
Solution: Increase timeout in config or check vendor API availability
Support
For issues, feature requests, or documentation updates:
- GitHub Issues: https://github.com/your-org/nis2-compliance-monitor/issues
- Documentation: https://nis2-compliance-monitor.readthedocs.io
- Email: compliance-support@your-org.com
Requires enterprise tier subscription