LSTM으로 네트워크 트래픽 패턴 분석 및 이상 탐지
LSTM으로 네트워크 트래픽 패턴 분석 및 이상 탐지
개요
LSTM(Long Short-Term Memory) 신경망을 사용하여 네트워크 트래픽의 시계열 패턴을 학습하고, 정상 패턴에서 벗어나는 이상 트래픽을 자동으로 탐지하는 시스템을 만듭니다.
필요한 라이브러리
pip install tensorflow keras pandas numpy matplotlib seaborn netmiko scapy
1단계: 네트워크 트래픽 데이터 수집
SNMP로 인터페이스 트래픽 수집
from pysnmp.hlapi import *
import pandas as pd
from datetime import datetime
import time
class SNMPTrafficCollector:
def __init__(self, target, community='public'):
self.target = target
self.community = community
# OID 정의
self.ifInOctets_OID = '1.3.6.1.2.1.2.2.1.10'
self.ifOutOctets_OID = '1.3.6.1.2.1.2.2.1.16'
def get_interface_traffic(self, interface_index):
"""특정 인터페이스의 트래픽 조회"""
# InOctets
in_iterator = getCmd(
SnmpEngine(),
CommunityData(self.community),
UdpTransportTarget((self.target, 161)),
ContextData(),
ObjectType(ObjectIdentity(f'{self.ifInOctets_OID}.{interface_index}'))
)
errorIndication, errorStatus, errorIndex, varBinds = next(in_iterator)
in_octets = int(varBinds[0][1])
# OutOctets
out_iterator = getCmd(
SnmpEngine(),
CommunityData(self.community),
UdpTransportTarget((self.target, 161)),
ContextData(),
ObjectType(ObjectIdentity(f'{self.ifOutOctets_OID}.{interface_index}'))
)
errorIndication, errorStatus, errorIndex, varBinds = next(out_iterator)
out_octets = int(varBinds[0][1])
return {
'timestamp': datetime.now(),
'in_octets': in_octets,
'out_octets': out_octets,
'total_octets': in_octets + out_octets
}
def collect_timeseries(self, interface_index, duration_minutes=60, interval_seconds=10):
"""시계열 데이터 수집"""
data = []
iterations = (duration_minutes * 60) // interval_seconds
prev_in = None
prev_out = None
for i in range(iterations):
traffic = self.get_interface_traffic(interface_index)
# 초당 전송량 계산 (bps)
if prev_in is not None:
in_bps = ((traffic['in_octets'] - prev_in) * 8) / interval_seconds
out_bps = ((traffic['out_octets'] - prev_out) * 8) / interval_seconds
data.append({
'timestamp': traffic['timestamp'],
'in_bps': in_bps,
'out_bps': out_bps,
'total_bps': in_bps + out_bps
})
prev_in = traffic['in_octets']
prev_out = traffic['out_octets']
print(f"수집 진행: {i+1}/{iterations}")
time.sleep(interval_seconds)
df = pd.DataFrame(data)
df.to_csv('traffic_data.csv', index=False)
return df
# 사용 예
collector = SNMPTrafficCollector('192.168.1.1')
df = collector.collect_timeseries(interface_index=1, duration_minutes=1440) # 24시간
NetFlow/sFlow 데이터 수집 (선택)
from scapy.all import *
def capture_netflow(interface='eth0', count=1000):
"""NetFlow 패킷 캡처"""
packets = sniff(
iface=interface,
filter='udp port 2055', # NetFlow port
count=count
)
flows = []
for pkt in packets:
if pkt.haslayer(UDP):
# NetFlow 파싱 (간단한 예제)
flows.append({
'timestamp': datetime.now(),
'src_ip': pkt[IP].src if IP in pkt else None,
'dst_ip': pkt[IP].dst if IP in pkt else None,
'protocol': pkt[IP].proto if IP in pkt else None,
'bytes': len(pkt)
})
return pd.DataFrame(flows)
2단계: 데이터 전처리 및 시퀀스 생성
import numpy as np
from sklearn.preprocessing import MinMaxScaler
def preprocess_traffic_data(df, sequence_length=50):
"""트래픽 데이터 전처리"""
# 타임스탬프 인덱스 설정
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df.set_index('timestamp')
# 결측치 처리
df = df.fillna(method='ffill')
# 시간 기반 특징 추가
df['hour'] = df.index.hour
df['day_of_week'] = df.index.dayofweek
df['is_weekend'] = (df['day_of_week'] >= 5).astype(int)
# 이동 평균 (노이즈 제거)
df['in_bps_ma'] = df['in_bps'].rolling(window=6).mean()
df['out_bps_ma'] = df['out_bps'].rolling(window=6).mean()
# 표준편차 (변동성)
df['in_bps_std'] = df['in_bps'].rolling(window=6).std()
df['out_bps_std'] = df['out_bps'].rolling(window=6).std()
# 결측치 제거
df = df.dropna()
return df
def create_sequences(data, sequence_length=50):
"""LSTM 입력용 시퀀스 생성"""
sequences = []
targets = []
for i in range(len(data) - sequence_length):
seq = data[i:i + sequence_length]
target = data[i + sequence_length]
sequences.append(seq)
targets.append(target)
return np.array(sequences), np.array(targets)
# 데이터 로드
df = pd.read_csv('traffic_data.csv')
df = preprocess_traffic_data(df)
# 사용할 특징 선택
feature_columns = ['in_bps', 'out_bps', 'hour', 'day_of_week',
'in_bps_ma', 'out_bps_ma', 'in_bps_std', 'out_bps_std']
data = df[feature_columns].values
# 정규화 (0-1 범위)
scaler = MinMaxScaler()
data_scaled = scaler.fit_transform(data)
# 시퀀스 생성
sequence_length = 50 # 과거 50개 시점 데이터 사용
X, y = create_sequences(data_scaled, sequence_length)
print(f"시퀀스 개수: {len(X)}")
print(f"시퀀스 shape: {X.shape}") # (samples, sequence_length, features)
print(f"타겟 shape: {y.shape}")
# 학습/검증 데이터 분리
split_ratio = 0.8
split_index = int(len(X) * split_ratio)
X_train = X[:split_index]
y_train = y[:split_index]
X_val = X[split_index:]
y_val = y[split_index:]
3단계: LSTM 모델 구축
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
def build_lstm_model(sequence_length, n_features):
"""LSTM 오토인코더 모델 구축"""
model = keras.Sequential([
# 인코더
layers.LSTM(128, activation='relu', return_sequences=True,
input_shape=(sequence_length, n_features)),
layers.Dropout(0.2),
layers.LSTM(64, activation='relu', return_sequences=True),
layers.Dropout(0.2),
layers.LSTM(32, activation='relu', return_sequences=False),
layers.Dropout(0.2),
# 디코더
layers.RepeatVector(sequence_length),
layers.LSTM(32, activation='relu', return_sequences=True),
layers.Dropout(0.2),
layers.LSTM(64, activation='relu', return_sequences=True),
layers.Dropout(0.2),
layers.LSTM(128, activation='relu', return_sequences=True),
layers.Dropout(0.2),
# 출력
layers.TimeDistributed(layers.Dense(n_features))
])
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss='mse',
metrics=['mae']
)
return model
# 모델 생성
n_features = X_train.shape[2]
model = build_lstm_model(sequence_length, n_features)
model.summary()
4단계: 모델 학습
# 콜백 설정
early_stopping = EarlyStopping(
monitor='val_loss',
patience=10,
restore_best_weights=True
)
model_checkpoint = ModelCheckpoint(
'best_lstm_model.h5',
monitor='val_loss',
save_best_only=True
)
# 학습
history = model.fit(
X_train, X_train, # 오토인코더: 입력 = 출력
validation_data=(X_val, X_val),
epochs=100,
batch_size=32,
callbacks=[early_stopping, model_checkpoint],
verbose=1
)
# 학습 곡선 시각화
import matplotlib.pyplot as plt
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.subplot(1, 2, 2)
plt.plot(history.history['mae'], label='Training MAE')
plt.plot(history.history['val_mae'], label='Validation MAE')
plt.title('Model MAE')
plt.xlabel('Epoch')
plt.ylabel('MAE')
plt.legend()
plt.tight_layout()
plt.savefig('training_history.png')
plt.show()
5단계: 이상 탐지 시스템 구축
class LSTMAnomalyDetector:
def __init__(self, model, scaler, sequence_length, threshold_percentile=95):
self.model = model
self.scaler = scaler
self.sequence_length = sequence_length
self.threshold = None
self.threshold_percentile = threshold_percentile
def calculate_reconstruction_error(self, X):
"""재구성 오차 계산"""
predictions = self.model.predict(X, verbose=0)
mse = np.mean(np.power(X - predictions, 2), axis=(1, 2))
return mse
def set_threshold(self, X_normal):
"""정상 데이터로 임계값 설정"""
errors = self.calculate_reconstruction_error(X_normal)
self.threshold = np.percentile(errors, self.threshold_percentile)
print(f"임계값 설정: {self.threshold:.6f}")
return self.threshold
def detect_anomalies(self, X):
"""이상 탐지"""
errors = self.calculate_reconstruction_error(X)
anomalies = errors > self.threshold
return anomalies, errors
def real_time_monitor(self, collector, interface_index, interval=10):
"""실시간 모니터링"""
buffer = []
while True:
try:
# 트래픽 수집
traffic = collector.get_interface_traffic(interface_index)
# 특징 추출
features = self._extract_features(traffic)
buffer.append(features)
# 시퀀스가 충분히 모이면 예측
if len(buffer) >= self.sequence_length:
# 최근 시퀀스 추출
sequence = np.array(buffer[-self.sequence_length:])
sequence_scaled = self.scaler.transform(sequence)
X = sequence_scaled.reshape(1, self.sequence_length, -1)
# 이상 탐지
is_anomaly, error = self.detect_anomalies(X)
print(f"
{'='*60}")
print(f"시간: {traffic['timestamp']}")
print(f"재구성 오차: {error[0]:.6f}")
print(f"임계값: {self.threshold:.6f}")
if is_anomaly[0]:
self._alert_anomaly(traffic, error[0])
else:
print("✅ 정상 트래픽")
time.sleep(interval)
except KeyboardInterrupt:
print("
모니터링 종료")
break
except Exception as e:
print(f"오류: {e}")
time.sleep(interval)
def _extract_features(self, traffic):
"""트래픽에서 특징 추출"""
now = datetime.now()
return [
traffic.get('in_bps', 0),
traffic.get('out_bps', 0),
now.hour,
now.weekday(),
0, 0, 0, 0 # 이동평균, 표준편차는 버퍼에서 계산
]
def _alert_anomaly(self, traffic, error):
"""이상 알림"""
print(f"
🚨 {'='*60}")
print(f"이상 트래픽 탐지!")
print(f"재구성 오차: {error:.6f}")
print(f"정상 임계값의 {error/self.threshold:.1f}배")
print(f"입력: {traffic.get('in_bps', 0):.0f} bps")
print(f"출력: {traffic.get('out_bps', 0):.0f} bps")
print(f"{'='*60}")
# 실제 구현: Slack, Email 등
# send_slack_alert(traffic, error)
# 이상 탐지 시스템 초기화
detector = LSTMAnomalyDetector(
model=model,
scaler=scaler,
sequence_length=sequence_length,
threshold_percentile=95 # 상위 5%를 이상으로 간주
)
# 정상 데이터로 임계값 설정
detector.set_threshold(X_train)
# 검증 데이터로 테스트
anomalies, errors = detector.detect_anomalies(X_val)
print(f"
검증 데이터 이상 비율: {anomalies.sum() / len(anomalies) * 100:.2f}%")
# 실시간 모니터링 (선택)
# collector = SNMPTrafficCollector('192.168.1.1')
# detector.real_time_monitor(collector, interface_index=1, interval=10)
6단계: 시각화 및 분석
def visualize_anomalies(df, anomalies, errors, threshold):
"""이상 탐지 결과 시각화"""
plt.figure(figsize=(15, 10))
# 서브플롯 1: 트래픽 패턴
plt.subplot(3, 1, 1)
plt.plot(df.index, df['in_bps'], label='Input Traffic', alpha=0.7)
plt.plot(df.index, df['out_bps'], label='Output Traffic', alpha=0.7)
# 이상 구간 표시
anomaly_indices = np.where(anomalies)[0]
for idx in anomaly_indices:
plt.axvspan(df.index[idx], df.index[idx+1],
color='red', alpha=0.3)
plt.title('Network Traffic with Anomalies')
plt.ylabel('bps')
plt.legend()
plt.grid(True, alpha=0.3)
# 서브플롯 2: 재구성 오차
plt.subplot(3, 1, 2)
plt.plot(df.index, errors, label='Reconstruction Error', color='blue')
plt.axhline(y=threshold, color='r', linestyle='--', label='Threshold')
plt.fill_between(df.index, 0, errors,
where=(errors > threshold),
color='red', alpha=0.3, label='Anomaly')
plt.title('Reconstruction Error')
plt.ylabel('Error')
plt.legend()
plt.grid(True, alpha=0.3)
# 서브플롯 3: 에러 분포
plt.subplot(3, 1, 3)
plt.hist(errors, bins=50, alpha=0.7, edgecolor='black')
plt.axvline(x=threshold, color='r', linestyle='--', linewidth=2, label='Threshold')
plt.title('Error Distribution')
plt.xlabel('Reconstruction Error')
plt.ylabel('Frequency')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('anomaly_detection_results.png', dpi=300)
plt.show()
# 시각화
visualize_anomalies(
df=df.iloc[split_index:split_index+len(errors)],
anomalies=anomalies,
errors=errors,
threshold=detector.threshold
)
7단계: Flask API로 배포
from flask import Flask, request, jsonify
import numpy as np
app = Flask(__name__)
# 모델 로드
model = keras.models.load_model('best_lstm_model.h5')
detector = LSTMAnomalyDetector(model, scaler, sequence_length)
detector.threshold = 0.05 # 사전 계산된 임계값
@app.route('/detect', methods=['POST'])
def detect():
"""트래픽 이상 탐지 API"""
try:
# 시퀀스 데이터 받기
sequence_data = request.json['sequence']
# numpy 배열로 변환
X = np.array(sequence_data).reshape(1, sequence_length, -1)
# 정규화
X_scaled = scaler.transform(X.reshape(-1, X.shape[2])).reshape(X.shape)
# 이상 탐지
is_anomaly, error = detector.detect_anomalies(X_scaled)
return jsonify({
'is_anomaly': bool(is_anomaly[0]),
'error': float(error[0]),
'threshold': float(detector.threshold),
'severity': 'high' if error[0] > detector.threshold * 2 else 'medium' if is_anomaly[0] else 'low'
})
except Exception as e:
return jsonify({'error': str(e)}), 400
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5001)
성능 향상 기법
1. Attention 메커니즘 추가
from tensorflow.keras.layers import Attention, Concatenate
def build_lstm_with_attention(sequence_length, n_features):
"""Attention이 추가된 LSTM"""
# 인코더
encoder_inputs = layers.Input(shape=(sequence_length, n_features))
encoder = layers.LSTM(128, return_sequences=True)(encoder_inputs)
encoder = layers.Dropout(0.2)(encoder)
# Attention
attention = layers.Attention()([encoder, encoder])
context = Concatenate()([encoder, attention])
encoder_output = layers.LSTM(64)(context)
# 디코더
decoder = layers.RepeatVector(sequence_length)(encoder_output)
decoder = layers.LSTM(64, return_sequences=True)(decoder)
decoder = layers.LSTM(128, return_sequences=True)(decoder)
decoder_output = layers.TimeDistributed(layers.Dense(n_features))(decoder)
model = keras.Model(encoder_inputs, decoder_output)
model.compile(optimizer='adam', loss='mse')
return model
2. 앙상블 모델
def ensemble_anomaly_detection(models, X, threshold):
"""여러 모델의 예측을 평균"""
errors_list = []
for model in models:
predictions = model.predict(X, verbose=0)
mse = np.mean(np.power(X - predictions, 2), axis=(1, 2))
errors_list.append(mse)
# 평균 에러
avg_errors = np.mean(errors_list, axis=0)
anomalies = avg_errors > threshold
return anomalies, avg_errors
요약
이 튜토리얼에서 다룬 내용: 1. ✅ SNMP/NetFlow로 네트워크 트래픽 수집 2. ✅ 시계열 데이터 전처리 및 시퀀스 생성 3. ✅ LSTM 오토인코더 모델 구축 및 학습 4. ✅ 재구성 오차 기반 이상 탐지 5. ✅ 실시간 모니터링 시스템 6. ✅ Flask API로 배포
이 시스템을 통해 DDoS 공격, 트래픽 급증, 비정상 패턴 등을 자동으로 탐지할 수 있습니다!