Scikit-learn으로 네트워크 장애 예측 모델 만들기

25 조회 2025-11-17 튜토리얼

Scikit-learn으로 네트워크 장애 예측 모델 만들기

개요

네트워크 장비의 과거 데이터(CPU, 메모리, 인터페이스 에러 등)를 학습하여 장애를 미리 예측하는 머신러닝 모델을 직접 만들어봅니다.

필요한 라이브러리 설치

pip install scikit-learn pandas numpy netmiko matplotlib seaborn

1단계: 데이터 수집

네트워크 장비에서 메트릭 수집

from netmiko import ConnectHandler
import pandas as pd
from datetime import datetime
import re

class NetworkMetricsCollector:
    def __init__(self, device_config):
        self.device = device_config
        self.connection = None

    def connect(self):
        """장비 연결"""
        self.connection = ConnectHandler(**self.device)

    def collect_cpu_memory(self):
        """CPU 및 메모리 수집"""
        output = self.connection.send_command("show processes cpu")

        # CPU 파싱
        cpu_match = re.search(r'CPU utilization.*five minutes: (\d+)%', output)
        cpu_5min = int(cpu_match.group(1)) if cpu_match else 0

        # 메모리 수집
        mem_output = self.connection.send_command("show processes memory")
        mem_match = re.search(r'Processor Pool Total:\s+(\d+)\s+Used:\s+(\d+)', mem_output)

        if mem_match:
            total_mem = int(mem_match.group(1))
            used_mem = int(mem_match.group(2))
            mem_percent = (used_mem / total_mem) * 100
        else:
            mem_percent = 0

        return {
            'timestamp': datetime.now(),
            'cpu_5min': cpu_5min,
            'memory_percent': mem_percent
        }

    def collect_interface_errors(self):
        """인터페이스 에러 수집"""
        output = self.connection.send_command("show interfaces summary")

        total_errors = 0
        total_drops = 0

        # 에러 파싱 (간단한 예제)
        for line in output.split('
'):
            if 'error' in line.lower():
                error_match = re.findall(r'(\d+)', line)
                if error_match:
                    total_errors += int(error_match[0])

        return {
            'interface_errors': total_errors,
            'interface_drops': total_drops
        }

    def collect_all_metrics(self):
        """모든 메트릭 수집"""
        self.connect()

        cpu_mem = self.collect_cpu_memory()
        interface = self.collect_interface_errors()

        metrics = {**cpu_mem, **interface}

        self.connection.disconnect()
        return metrics

# 사용 예
device = {
    'device_type': 'cisco_ios',
    'host': '192.168.1.1',
    'username': 'admin',
    'password': 'password'
}

collector = NetworkMetricsCollector(device)
metrics = collector.collect_all_metrics()
print(metrics)

주기적 데이터 수집 및 저장

import time
import csv

def collect_training_data(devices, duration_days=30, interval_minutes=5):
    """학습 데이터 수집 (30일간 5분마다)"""

    data = []

    total_iterations = (duration_days * 24 * 60) // interval_minutes

    for i in range(total_iterations):
        for device_config in devices:
            try:
                collector = NetworkMetricsCollector(device_config)
                metrics = collector.collect_all_metrics()

                # 장애 발생 여부 (실제로는 이벤트 로그에서 가져옴)
                metrics['failure_occurred'] = check_failure_in_next_hour(device_config)

                data.append(metrics)

            except Exception as e:
                print(f"수집 실패: {e}")

        print(f"진행률: {i+1}/{total_iterations}")
        time.sleep(interval_minutes * 60)

    # CSV로 저장
    df = pd.DataFrame(data)
    df.to_csv('network_metrics.csv', index=False)
    print(f"데이터 수집 완료: {len(data)}개 샘플")

def check_failure_in_next_hour(device_config):
    """다음 1시간 이내 장애 발생 여부 확인 (실제로는 로그 분석)"""
    # 실제 구현에서는 syslog나 이벤트 로그를 확인
    # 여기서는 예제를 위한 더미 데이터
    return False  # or True

2단계: 데이터 전처리

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

def load_and_preprocess_data(csv_path='network_metrics.csv'):
    """데이터 로드 및 전처리"""

    # 데이터 로드
    df = pd.read_csv(csv_path)

    # 타임스탬프를 datetime으로 변환
    df['timestamp'] = pd.to_datetime(df['timestamp'])

    # 시간 기반 특징 추가
    df['hour'] = df['timestamp'].dt.hour
    df['day_of_week'] = df['timestamp'].dt.dayofweek
    df['is_business_hour'] = ((df['hour'] >= 9) & (df['hour'] <= 18)).astype(int)

    # 이동 평균 (추세 파악)
    df['cpu_ma_1h'] = df['cpu_5min'].rolling(window=12).mean()  # 1시간 이동평균
    df['memory_ma_1h'] = df['memory_percent'].rolling(window=12).mean()

    # 변화율 (급격한 변화 감지)
    df['cpu_change'] = df['cpu_5min'].diff()
    df['memory_change'] = df['memory_percent'].diff()

    # 결측치 제거
    df = df.dropna()

    print(f"전처리 완료: {len(df)}개 샘플")
    print(f"장애 발생: {df['failure_occurred'].sum()}건")
    print(f"정상: {(~df['failure_occurred']).sum()}건")

    return df

# 데이터 로드
df = load_and_preprocess_data()

# 피처와 타겟 분리
feature_columns = [
    'cpu_5min', 'memory_percent', 'interface_errors', 'interface_drops',
    'hour', 'day_of_week', 'is_business_hour',
    'cpu_ma_1h', 'memory_ma_1h', 'cpu_change', 'memory_change'
]

X = df[feature_columns]
y = df['failure_occurred']

# 학습/테스트 데이터 분리 (80:20)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# 데이터 정규화
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

print(f"
학습 데이터: {len(X_train)}개")
print(f"테스트 데이터: {len(X_test)}개")

3단계: 머신러닝 모델 학습

from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
import matplotlib.pyplot as plt
import seaborn as sns

def train_models(X_train, y_train, X_test, y_test):
    """여러 모델 학습 및 비교"""

    models = {
        'Random Forest': RandomForestClassifier(
            n_estimators=100,
            max_depth=10,
            random_state=42
        ),
        'Gradient Boosting': GradientBoostingClassifier(
            n_estimators=100,
            learning_rate=0.1,
            random_state=42
        ),
        'Logistic Regression': LogisticRegression(
            max_iter=1000,
            random_state=42
        )
    }

    results = {}

    for name, model in models.items():
        print(f"
{'='*60}")
        print(f"모델: {name}")
        print(f"{'='*60}")

        # 학습
        model.fit(X_train, y_train)

        # 예측
        y_pred = model.predict(X_test)
        y_pred_proba = model.predict_proba(X_test)[:, 1]

        # 평가
        print("
분류 리포트:")
        print(classification_report(y_test, y_pred))

        # ROC-AUC 스코어
        roc_score = roc_auc_score(y_test, y_pred_proba)
        print(f"ROC-AUC Score: {roc_score:.4f}")

        # Confusion Matrix
        cm = confusion_matrix(y_test, y_pred)
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
        plt.title(f'{name} - Confusion Matrix')
        plt.ylabel('실제')
        plt.xlabel('예측')
        plt.savefig(f'{name.lower().replace(" ", "_")}_confusion_matrix.png')

        results[name] = {
            'model': model,
            'roc_auc': roc_score,
            'confusion_matrix': cm
        }

    return results

# 모델 학습
results = train_models(X_train_scaled, y_train, X_test_scaled, y_test)

# 최고 성능 모델 선택
best_model_name = max(results, key=lambda x: results[x]['roc_auc'])
best_model = results[best_model_name]['model']

print(f"
최고 성능 모델: {best_model_name}")
print(f"ROC-AUC: {results[best_model_name]['roc_auc']:.4f}")

4단계: 특성 중요도 분석

def analyze_feature_importance(model, feature_names):
    """특성 중요도 분석"""

    if hasattr(model, 'feature_importances_'):
        importances = model.feature_importances_

        # 중요도 순으로 정렬
        indices = np.argsort(importances)[::-1]

        print("
특성 중요도:")
        for i in range(len(feature_names)):
            print(f"{i+1}. {feature_names[indices[i]]}: {importances[indices[i]]:.4f}")

        # 시각화
        plt.figure(figsize=(10, 6))
        plt.title('Feature Importance')
        plt.bar(range(len(importances)), importances[indices])
        plt.xticks(range(len(importances)),
                   [feature_names[i] for i in indices],
                   rotation=45, ha='right')
        plt.tight_layout()
        plt.savefig('feature_importance.png')
        plt.show()

# 특성 중요도 분석
analyze_feature_importance(best_model, feature_columns)

5단계: 실시간 장애 예측 시스템

import pickle
from datetime import datetime, timedelta

class FailurePredictionSystem:
    def __init__(self, model, scaler, feature_columns):
        self.model = model
        self.scaler = scaler
        self.feature_columns = feature_columns
        self.history = []

    def predict_failure(self, current_metrics):
        """장애 발생 확률 예측"""

        # 피처 준비
        features = {}
        for col in self.feature_columns:
            if col in current_metrics:
                features[col] = current_metrics[col]
            else:
                # 계산 필요한 피처 (이동평균 등)
                features[col] = self._calculate_feature(col, current_metrics)

        # DataFrame으로 변환
        X = pd.DataFrame([features])[self.feature_columns]

        # 정규화
        X_scaled = self.scaler.transform(X)

        # 예측
        failure_prob = self.model.predict_proba(X_scaled)[0][1]

        return {
            'failure_probability': failure_prob,
            'risk_level': self._get_risk_level(failure_prob),
            'timestamp': datetime.now(),
            'metrics': current_metrics
        }

    def _calculate_feature(self, feature_name, current_metrics):
        """동적 피처 계산"""

        if 'ma_' in feature_name:  # 이동평균
            # 최근 히스토리 사용
            base_metric = feature_name.split('_')[0]
            if len(self.history) >= 12:
                values = [h[base_metric] for h in self.history[-12:]]
                return np.mean(values)
            return current_metrics.get(base_metric.replace('_5min', ''), 0)

        elif 'change' in feature_name:  # 변화율
            base_metric = feature_name.replace('_change', '')
            if len(self.history) >= 2:
                prev_value = self.history[-1].get(base_metric, 0)
                curr_value = current_metrics.get(base_metric.replace('_5min', ''), 0)
                return curr_value - prev_value
            return 0

        return 0

    def _get_risk_level(self, probability):
        """위험도 레벨 계산"""
        if probability >= 0.8:
            return 'CRITICAL'
        elif probability >= 0.6:
            return 'HIGH'
        elif probability >= 0.4:
            return 'MEDIUM'
        else:
            return 'LOW'

    def monitor_device(self, device_config, alert_threshold=0.7):
        """장비 모니터링"""

        collector = NetworkMetricsCollector(device_config)

        while True:
            try:
                # 메트릭 수집
                metrics = collector.collect_all_metrics()
                self.history.append(metrics)

                # 장애 예측
                prediction = self.predict_failure(metrics)

                print(f"
{'='*60}")
                print(f"시간: {prediction['timestamp']}")
                print(f"장애 확률: {prediction['failure_probability']:.2%}")
                print(f"위험도: {prediction['risk_level']}")
                print(f"CPU: {metrics['cpu_5min']}%")
                print(f"메모리: {metrics['memory_percent']:.1f}%")

                # 알림 발송
                if prediction['failure_probability'] >= alert_threshold:
                    self.send_alert(prediction)

                # 히스토리 정리 (최근 100개만 유지)
                if len(self.history) > 100:
                    self.history = self.history[-100:]

                # 5분 대기
                time.sleep(300)

            except KeyboardInterrupt:
                print("
모니터링 종료")
                break
            except Exception as e:
                print(f"오류 발생: {e}")
                time.sleep(60)

    def send_alert(self, prediction):
        """알림 발송"""
        print(f"
🚨 {'='*60}")
        print(f"장애 예측 경보!")
        print(f"확률: {prediction['failure_probability']:.2%}")
        print(f"위험도: {prediction['risk_level']}")
        print(f"{'='*60}")

        # 실제 구현: Slack, Email, SMS 등
        # send_slack_alert(prediction)
        # send_email_alert(prediction)

# 모델 저장
with open('failure_prediction_model.pkl', 'wb') as f:
    pickle.dump({
        'model': best_model,
        'scaler': scaler,
        'feature_columns': feature_columns
    }, f)

print("모델 저장 완료: failure_prediction_model.pkl")

# 실시간 모니터링 시작
prediction_system = FailurePredictionSystem(best_model, scaler, feature_columns)

device = {
    'device_type': 'cisco_ios',
    'host': '192.168.1.1',
    'username': 'admin',
    'password': 'password'
}

# prediction_system.monitor_device(device, alert_threshold=0.7)

6단계: 대시보드 시각화

import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.graph_objs as go

app = dash.Dash(__name__)

# 전역 변수
prediction_history = []

app.layout = html.Div([
    html.H1('네트워크 장애 예측 대시보드'),

    dcc.Graph(id='live-graph'),
    dcc.Graph(id='probability-gauge'),

    dcc.Interval(
        id='interval-component',
        interval=60*1000,  # 1분마다 업데이트
        n_intervals=0
    )
])

@app.callback(
    Output('live-graph', 'figure'),
    Input('interval-component', 'n_intervals')
)
def update_graph(n):
    # 실시간 데이터 수집
    # metrics = collector.collect_all_metrics()
    # prediction = prediction_system.predict_failure(metrics)
    # prediction_history.append(prediction)

    # 그래프 생성
    fig = go.Figure()

    if prediction_history:
        times = [p['timestamp'] for p in prediction_history[-50:]]
        probs = [p['failure_probability'] for p in prediction_history[-50:]]

        fig.add_trace(go.Scatter(
            x=times,
            y=probs,
            mode='lines+markers',
            name='장애 확률'
        ))

    fig.update_layout(
        title='장애 발생 확률 추이',
        xaxis_title='시간',
        yaxis_title='확률',
        yaxis=dict(range=[0, 1])
    )

    return fig

@app.callback(
    Output('probability-gauge', 'figure'),
    Input('interval-component', 'n_intervals')
)
def update_gauge(n):
    current_prob = prediction_history[-1]['failure_probability'] if prediction_history else 0

    fig = go.Figure(go.Indicator(
        mode="gauge+number",
        value=current_prob * 100,
        domain={'x': [0, 1], 'y': [0, 1]},
        title={'text': "현재 장애 위험도 (%)"},
        gauge={
            'axis': {'range': [None, 100]},
            'bar': {'color': "darkblue"},
            'steps': [
                {'range': [0, 40], 'color': "lightgreen"},
                {'range': [40, 60], 'color': "yellow"},
                {'range': [60, 80], 'color': "orange"},
                {'range': [80, 100], 'color': "red"}
            ],
            'threshold': {
                'line': {'color': "red", 'width': 4},
                'thickness': 0.75,
                'value': 70
            }
        }
    ))

    return fig

# 대시보드 실행
# app.run_server(debug=True, host='0.0.0.0', port=8050)

성능 개선 팁

1. 데이터 불균형 처리

from imblearn.over_sampling import SMOTE

# SMOTE로 소수 클래스 오버샘플링
smote = SMOTE(random_state=42)
X_train_balanced, y_train_balanced = smote.fit_resample(X_train_scaled, y_train)

print(f"오버샘플링 전: {y_train.value_counts()}")
print(f"오버샘플링 후: {pd.Series(y_train_balanced).value_counts()}")

2. 하이퍼파라미터 튜닝

from sklearn.model_selection import GridSearchCV

param_grid = {
    'n_estimators': [50, 100, 200],
    'max_depth': [5, 10, 15],
    'min_samples_split': [2, 5, 10]
}

grid_search = GridSearchCV(
    RandomForestClassifier(random_state=42),
    param_grid,
    cv=5,
    scoring='roc_auc',
    n_jobs=-1
)

grid_search.fit(X_train_scaled, y_train)

print(f"최적 파라미터: {grid_search.best_params_}")
print(f"최고 ROC-AUC: {grid_search.best_score_:.4f}")

best_model = grid_search.best_estimator_

3. 앙상블 모델

from sklearn.ensemble import VotingClassifier

# 여러 모델 앙상블
ensemble = VotingClassifier(
    estimators=[
        ('rf', RandomForestClassifier(n_estimators=100, random_state=42)),
        ('gb', GradientBoostingClassifier(n_estimators=100, random_state=42)),
        ('lr', LogisticRegression(max_iter=1000, random_state=42))
    ],
    voting='soft'  # 확률 평균
)

ensemble.fit(X_train_scaled, y_train)
y_pred = ensemble.predict(X_test_scaled)

print("앙상블 모델 성능:")
print(classification_report(y_test, y_pred))

실전 배포 예제

# Flask API로 배포
from flask import Flask, request, jsonify

app = Flask(__name__)

# 모델 로드
with open('failure_prediction_model.pkl', 'rb') as f:
    model_data = pickle.load(f)
    model = model_data['model']
    scaler = model_data['scaler']
    features = model_data['feature_columns']

@app.route('/predict', methods=['POST'])
def predict():
    """장애 예측 API"""
    try:
        metrics = request.json

        # 피처 준비
        X = pd.DataFrame([metrics])[features]
        X_scaled = scaler.transform(X)

        # 예측
        prob = model.predict_proba(X_scaled)[0][1]

        return jsonify({
            'failure_probability': float(prob),
            'risk_level': 'CRITICAL' if prob >= 0.8 else 'HIGH' if prob >= 0.6 else 'MEDIUM' if prob >= 0.4 else 'LOW',
            'timestamp': datetime.now().isoformat()
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 400

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

요약

이 튜토리얼에서 배운 내용: 1. ✅ Netmiko로 네트워크 메트릭 수집 2. ✅ Pandas로 데이터 전처리 3. ✅ Scikit-learn으로 머신러닝 모델 학습 4. ✅ 실시간 장애 예측 시스템 구축 5. ✅ Dash로 대시보드 시각화 6. ✅ Flask API로 배포

이 시스템을 통해 네트워크 장애를 사전에 감지하고 대응할 수 있습니다!