Scikit-learn으로 네트워크 장애 예측 모델 만들기
Scikit-learn으로 네트워크 장애 예측 모델 만들기
개요
네트워크 장비의 과거 데이터(CPU, 메모리, 인터페이스 에러 등)를 학습하여 장애를 미리 예측하는 머신러닝 모델을 직접 만들어봅니다.
필요한 라이브러리 설치
pip install scikit-learn pandas numpy netmiko matplotlib seaborn
1단계: 데이터 수집
네트워크 장비에서 메트릭 수집
from netmiko import ConnectHandler
import pandas as pd
from datetime import datetime
import re
class NetworkMetricsCollector:
def __init__(self, device_config):
self.device = device_config
self.connection = None
def connect(self):
"""장비 연결"""
self.connection = ConnectHandler(**self.device)
def collect_cpu_memory(self):
"""CPU 및 메모리 수집"""
output = self.connection.send_command("show processes cpu")
# CPU 파싱
cpu_match = re.search(r'CPU utilization.*five minutes: (\d+)%', output)
cpu_5min = int(cpu_match.group(1)) if cpu_match else 0
# 메모리 수집
mem_output = self.connection.send_command("show processes memory")
mem_match = re.search(r'Processor Pool Total:\s+(\d+)\s+Used:\s+(\d+)', mem_output)
if mem_match:
total_mem = int(mem_match.group(1))
used_mem = int(mem_match.group(2))
mem_percent = (used_mem / total_mem) * 100
else:
mem_percent = 0
return {
'timestamp': datetime.now(),
'cpu_5min': cpu_5min,
'memory_percent': mem_percent
}
def collect_interface_errors(self):
"""인터페이스 에러 수집"""
output = self.connection.send_command("show interfaces summary")
total_errors = 0
total_drops = 0
# 에러 파싱 (간단한 예제)
for line in output.split('
'):
if 'error' in line.lower():
error_match = re.findall(r'(\d+)', line)
if error_match:
total_errors += int(error_match[0])
return {
'interface_errors': total_errors,
'interface_drops': total_drops
}
def collect_all_metrics(self):
"""모든 메트릭 수집"""
self.connect()
cpu_mem = self.collect_cpu_memory()
interface = self.collect_interface_errors()
metrics = {**cpu_mem, **interface}
self.connection.disconnect()
return metrics
# 사용 예
device = {
'device_type': 'cisco_ios',
'host': '192.168.1.1',
'username': 'admin',
'password': 'password'
}
collector = NetworkMetricsCollector(device)
metrics = collector.collect_all_metrics()
print(metrics)
주기적 데이터 수집 및 저장
import time
import csv
def collect_training_data(devices, duration_days=30, interval_minutes=5):
"""학습 데이터 수집 (30일간 5분마다)"""
data = []
total_iterations = (duration_days * 24 * 60) // interval_minutes
for i in range(total_iterations):
for device_config in devices:
try:
collector = NetworkMetricsCollector(device_config)
metrics = collector.collect_all_metrics()
# 장애 발생 여부 (실제로는 이벤트 로그에서 가져옴)
metrics['failure_occurred'] = check_failure_in_next_hour(device_config)
data.append(metrics)
except Exception as e:
print(f"수집 실패: {e}")
print(f"진행률: {i+1}/{total_iterations}")
time.sleep(interval_minutes * 60)
# CSV로 저장
df = pd.DataFrame(data)
df.to_csv('network_metrics.csv', index=False)
print(f"데이터 수집 완료: {len(data)}개 샘플")
def check_failure_in_next_hour(device_config):
"""다음 1시간 이내 장애 발생 여부 확인 (실제로는 로그 분석)"""
# 실제 구현에서는 syslog나 이벤트 로그를 확인
# 여기서는 예제를 위한 더미 데이터
return False # or True
2단계: 데이터 전처리
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
def load_and_preprocess_data(csv_path='network_metrics.csv'):
"""데이터 로드 및 전처리"""
# 데이터 로드
df = pd.read_csv(csv_path)
# 타임스탬프를 datetime으로 변환
df['timestamp'] = pd.to_datetime(df['timestamp'])
# 시간 기반 특징 추가
df['hour'] = df['timestamp'].dt.hour
df['day_of_week'] = df['timestamp'].dt.dayofweek
df['is_business_hour'] = ((df['hour'] >= 9) & (df['hour'] <= 18)).astype(int)
# 이동 평균 (추세 파악)
df['cpu_ma_1h'] = df['cpu_5min'].rolling(window=12).mean() # 1시간 이동평균
df['memory_ma_1h'] = df['memory_percent'].rolling(window=12).mean()
# 변화율 (급격한 변화 감지)
df['cpu_change'] = df['cpu_5min'].diff()
df['memory_change'] = df['memory_percent'].diff()
# 결측치 제거
df = df.dropna()
print(f"전처리 완료: {len(df)}개 샘플")
print(f"장애 발생: {df['failure_occurred'].sum()}건")
print(f"정상: {(~df['failure_occurred']).sum()}건")
return df
# 데이터 로드
df = load_and_preprocess_data()
# 피처와 타겟 분리
feature_columns = [
'cpu_5min', 'memory_percent', 'interface_errors', 'interface_drops',
'hour', 'day_of_week', 'is_business_hour',
'cpu_ma_1h', 'memory_ma_1h', 'cpu_change', 'memory_change'
]
X = df[feature_columns]
y = df['failure_occurred']
# 학습/테스트 데이터 분리 (80:20)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# 데이터 정규화
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
print(f"
학습 데이터: {len(X_train)}개")
print(f"테스트 데이터: {len(X_test)}개")
3단계: 머신러닝 모델 학습
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
import matplotlib.pyplot as plt
import seaborn as sns
def train_models(X_train, y_train, X_test, y_test):
"""여러 모델 학습 및 비교"""
models = {
'Random Forest': RandomForestClassifier(
n_estimators=100,
max_depth=10,
random_state=42
),
'Gradient Boosting': GradientBoostingClassifier(
n_estimators=100,
learning_rate=0.1,
random_state=42
),
'Logistic Regression': LogisticRegression(
max_iter=1000,
random_state=42
)
}
results = {}
for name, model in models.items():
print(f"
{'='*60}")
print(f"모델: {name}")
print(f"{'='*60}")
# 학습
model.fit(X_train, y_train)
# 예측
y_pred = model.predict(X_test)
y_pred_proba = model.predict_proba(X_test)[:, 1]
# 평가
print("
분류 리포트:")
print(classification_report(y_test, y_pred))
# ROC-AUC 스코어
roc_score = roc_auc_score(y_test, y_pred_proba)
print(f"ROC-AUC Score: {roc_score:.4f}")
# Confusion Matrix
cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title(f'{name} - Confusion Matrix')
plt.ylabel('실제')
plt.xlabel('예측')
plt.savefig(f'{name.lower().replace(" ", "_")}_confusion_matrix.png')
results[name] = {
'model': model,
'roc_auc': roc_score,
'confusion_matrix': cm
}
return results
# 모델 학습
results = train_models(X_train_scaled, y_train, X_test_scaled, y_test)
# 최고 성능 모델 선택
best_model_name = max(results, key=lambda x: results[x]['roc_auc'])
best_model = results[best_model_name]['model']
print(f"
최고 성능 모델: {best_model_name}")
print(f"ROC-AUC: {results[best_model_name]['roc_auc']:.4f}")
4단계: 특성 중요도 분석
def analyze_feature_importance(model, feature_names):
"""특성 중요도 분석"""
if hasattr(model, 'feature_importances_'):
importances = model.feature_importances_
# 중요도 순으로 정렬
indices = np.argsort(importances)[::-1]
print("
특성 중요도:")
for i in range(len(feature_names)):
print(f"{i+1}. {feature_names[indices[i]]}: {importances[indices[i]]:.4f}")
# 시각화
plt.figure(figsize=(10, 6))
plt.title('Feature Importance')
plt.bar(range(len(importances)), importances[indices])
plt.xticks(range(len(importances)),
[feature_names[i] for i in indices],
rotation=45, ha='right')
plt.tight_layout()
plt.savefig('feature_importance.png')
plt.show()
# 특성 중요도 분석
analyze_feature_importance(best_model, feature_columns)
5단계: 실시간 장애 예측 시스템
import pickle
from datetime import datetime, timedelta
class FailurePredictionSystem:
def __init__(self, model, scaler, feature_columns):
self.model = model
self.scaler = scaler
self.feature_columns = feature_columns
self.history = []
def predict_failure(self, current_metrics):
"""장애 발생 확률 예측"""
# 피처 준비
features = {}
for col in self.feature_columns:
if col in current_metrics:
features[col] = current_metrics[col]
else:
# 계산 필요한 피처 (이동평균 등)
features[col] = self._calculate_feature(col, current_metrics)
# DataFrame으로 변환
X = pd.DataFrame([features])[self.feature_columns]
# 정규화
X_scaled = self.scaler.transform(X)
# 예측
failure_prob = self.model.predict_proba(X_scaled)[0][1]
return {
'failure_probability': failure_prob,
'risk_level': self._get_risk_level(failure_prob),
'timestamp': datetime.now(),
'metrics': current_metrics
}
def _calculate_feature(self, feature_name, current_metrics):
"""동적 피처 계산"""
if 'ma_' in feature_name: # 이동평균
# 최근 히스토리 사용
base_metric = feature_name.split('_')[0]
if len(self.history) >= 12:
values = [h[base_metric] for h in self.history[-12:]]
return np.mean(values)
return current_metrics.get(base_metric.replace('_5min', ''), 0)
elif 'change' in feature_name: # 변화율
base_metric = feature_name.replace('_change', '')
if len(self.history) >= 2:
prev_value = self.history[-1].get(base_metric, 0)
curr_value = current_metrics.get(base_metric.replace('_5min', ''), 0)
return curr_value - prev_value
return 0
return 0
def _get_risk_level(self, probability):
"""위험도 레벨 계산"""
if probability >= 0.8:
return 'CRITICAL'
elif probability >= 0.6:
return 'HIGH'
elif probability >= 0.4:
return 'MEDIUM'
else:
return 'LOW'
def monitor_device(self, device_config, alert_threshold=0.7):
"""장비 모니터링"""
collector = NetworkMetricsCollector(device_config)
while True:
try:
# 메트릭 수집
metrics = collector.collect_all_metrics()
self.history.append(metrics)
# 장애 예측
prediction = self.predict_failure(metrics)
print(f"
{'='*60}")
print(f"시간: {prediction['timestamp']}")
print(f"장애 확률: {prediction['failure_probability']:.2%}")
print(f"위험도: {prediction['risk_level']}")
print(f"CPU: {metrics['cpu_5min']}%")
print(f"메모리: {metrics['memory_percent']:.1f}%")
# 알림 발송
if prediction['failure_probability'] >= alert_threshold:
self.send_alert(prediction)
# 히스토리 정리 (최근 100개만 유지)
if len(self.history) > 100:
self.history = self.history[-100:]
# 5분 대기
time.sleep(300)
except KeyboardInterrupt:
print("
모니터링 종료")
break
except Exception as e:
print(f"오류 발생: {e}")
time.sleep(60)
def send_alert(self, prediction):
"""알림 발송"""
print(f"
🚨 {'='*60}")
print(f"장애 예측 경보!")
print(f"확률: {prediction['failure_probability']:.2%}")
print(f"위험도: {prediction['risk_level']}")
print(f"{'='*60}")
# 실제 구현: Slack, Email, SMS 등
# send_slack_alert(prediction)
# send_email_alert(prediction)
# 모델 저장
with open('failure_prediction_model.pkl', 'wb') as f:
pickle.dump({
'model': best_model,
'scaler': scaler,
'feature_columns': feature_columns
}, f)
print("모델 저장 완료: failure_prediction_model.pkl")
# 실시간 모니터링 시작
prediction_system = FailurePredictionSystem(best_model, scaler, feature_columns)
device = {
'device_type': 'cisco_ios',
'host': '192.168.1.1',
'username': 'admin',
'password': 'password'
}
# prediction_system.monitor_device(device, alert_threshold=0.7)
6단계: 대시보드 시각화
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.graph_objs as go
app = dash.Dash(__name__)
# 전역 변수
prediction_history = []
app.layout = html.Div([
html.H1('네트워크 장애 예측 대시보드'),
dcc.Graph(id='live-graph'),
dcc.Graph(id='probability-gauge'),
dcc.Interval(
id='interval-component',
interval=60*1000, # 1분마다 업데이트
n_intervals=0
)
])
@app.callback(
Output('live-graph', 'figure'),
Input('interval-component', 'n_intervals')
)
def update_graph(n):
# 실시간 데이터 수집
# metrics = collector.collect_all_metrics()
# prediction = prediction_system.predict_failure(metrics)
# prediction_history.append(prediction)
# 그래프 생성
fig = go.Figure()
if prediction_history:
times = [p['timestamp'] for p in prediction_history[-50:]]
probs = [p['failure_probability'] for p in prediction_history[-50:]]
fig.add_trace(go.Scatter(
x=times,
y=probs,
mode='lines+markers',
name='장애 확률'
))
fig.update_layout(
title='장애 발생 확률 추이',
xaxis_title='시간',
yaxis_title='확률',
yaxis=dict(range=[0, 1])
)
return fig
@app.callback(
Output('probability-gauge', 'figure'),
Input('interval-component', 'n_intervals')
)
def update_gauge(n):
current_prob = prediction_history[-1]['failure_probability'] if prediction_history else 0
fig = go.Figure(go.Indicator(
mode="gauge+number",
value=current_prob * 100,
domain={'x': [0, 1], 'y': [0, 1]},
title={'text': "현재 장애 위험도 (%)"},
gauge={
'axis': {'range': [None, 100]},
'bar': {'color': "darkblue"},
'steps': [
{'range': [0, 40], 'color': "lightgreen"},
{'range': [40, 60], 'color': "yellow"},
{'range': [60, 80], 'color': "orange"},
{'range': [80, 100], 'color': "red"}
],
'threshold': {
'line': {'color': "red", 'width': 4},
'thickness': 0.75,
'value': 70
}
}
))
return fig
# 대시보드 실행
# app.run_server(debug=True, host='0.0.0.0', port=8050)
성능 개선 팁
1. 데이터 불균형 처리
from imblearn.over_sampling import SMOTE
# SMOTE로 소수 클래스 오버샘플링
smote = SMOTE(random_state=42)
X_train_balanced, y_train_balanced = smote.fit_resample(X_train_scaled, y_train)
print(f"오버샘플링 전: {y_train.value_counts()}")
print(f"오버샘플링 후: {pd.Series(y_train_balanced).value_counts()}")
2. 하이퍼파라미터 튜닝
from sklearn.model_selection import GridSearchCV
param_grid = {
'n_estimators': [50, 100, 200],
'max_depth': [5, 10, 15],
'min_samples_split': [2, 5, 10]
}
grid_search = GridSearchCV(
RandomForestClassifier(random_state=42),
param_grid,
cv=5,
scoring='roc_auc',
n_jobs=-1
)
grid_search.fit(X_train_scaled, y_train)
print(f"최적 파라미터: {grid_search.best_params_}")
print(f"최고 ROC-AUC: {grid_search.best_score_:.4f}")
best_model = grid_search.best_estimator_
3. 앙상블 모델
from sklearn.ensemble import VotingClassifier
# 여러 모델 앙상블
ensemble = VotingClassifier(
estimators=[
('rf', RandomForestClassifier(n_estimators=100, random_state=42)),
('gb', GradientBoostingClassifier(n_estimators=100, random_state=42)),
('lr', LogisticRegression(max_iter=1000, random_state=42))
],
voting='soft' # 확률 평균
)
ensemble.fit(X_train_scaled, y_train)
y_pred = ensemble.predict(X_test_scaled)
print("앙상블 모델 성능:")
print(classification_report(y_test, y_pred))
실전 배포 예제
# Flask API로 배포
from flask import Flask, request, jsonify
app = Flask(__name__)
# 모델 로드
with open('failure_prediction_model.pkl', 'rb') as f:
model_data = pickle.load(f)
model = model_data['model']
scaler = model_data['scaler']
features = model_data['feature_columns']
@app.route('/predict', methods=['POST'])
def predict():
"""장애 예측 API"""
try:
metrics = request.json
# 피처 준비
X = pd.DataFrame([metrics])[features]
X_scaled = scaler.transform(X)
# 예측
prob = model.predict_proba(X_scaled)[0][1]
return jsonify({
'failure_probability': float(prob),
'risk_level': 'CRITICAL' if prob >= 0.8 else 'HIGH' if prob >= 0.6 else 'MEDIUM' if prob >= 0.4 else 'LOW',
'timestamp': datetime.now().isoformat()
})
except Exception as e:
return jsonify({'error': str(e)}), 400
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
요약
이 튜토리얼에서 배운 내용: 1. ✅ Netmiko로 네트워크 메트릭 수집 2. ✅ Pandas로 데이터 전처리 3. ✅ Scikit-learn으로 머신러닝 모델 학습 4. ✅ 실시간 장애 예측 시스템 구축 5. ✅ Dash로 대시보드 시각화 6. ✅ Flask API로 배포
이 시스템을 통해 네트워크 장애를 사전에 감지하고 대응할 수 있습니다!