LSTM으로 네트워크 트래픽 패턴 분석 및 이상 탐지

23 조회 2025-11-17 튜토리얼

LSTM으로 네트워크 트래픽 패턴 분석 및 이상 탐지

개요

LSTM(Long Short-Term Memory) 신경망을 사용하여 네트워크 트래픽의 시계열 패턴을 학습하고, 정상 패턴에서 벗어나는 이상 트래픽을 자동으로 탐지하는 시스템을 만듭니다.

필요한 라이브러리

pip install tensorflow keras pandas numpy matplotlib seaborn netmiko scapy

1단계: 네트워크 트래픽 데이터 수집

SNMP로 인터페이스 트래픽 수집

from pysnmp.hlapi import *
import pandas as pd
from datetime import datetime
import time

class SNMPTrafficCollector:
    def __init__(self, target, community='public'):
        self.target = target
        self.community = community

        # OID 정의
        self.ifInOctets_OID = '1.3.6.1.2.1.2.2.1.10'
        self.ifOutOctets_OID = '1.3.6.1.2.1.2.2.1.16'

    def get_interface_traffic(self, interface_index):
        """특정 인터페이스의 트래픽 조회"""

        # InOctets
        in_iterator = getCmd(
            SnmpEngine(),
            CommunityData(self.community),
            UdpTransportTarget((self.target, 161)),
            ContextData(),
            ObjectType(ObjectIdentity(f'{self.ifInOctets_OID}.{interface_index}'))
        )

        errorIndication, errorStatus, errorIndex, varBinds = next(in_iterator)
        in_octets = int(varBinds[0][1])

        # OutOctets
        out_iterator = getCmd(
            SnmpEngine(),
            CommunityData(self.community),
            UdpTransportTarget((self.target, 161)),
            ContextData(),
            ObjectType(ObjectIdentity(f'{self.ifOutOctets_OID}.{interface_index}'))
        )

        errorIndication, errorStatus, errorIndex, varBinds = next(out_iterator)
        out_octets = int(varBinds[0][1])

        return {
            'timestamp': datetime.now(),
            'in_octets': in_octets,
            'out_octets': out_octets,
            'total_octets': in_octets + out_octets
        }

    def collect_timeseries(self, interface_index, duration_minutes=60, interval_seconds=10):
        """시계열 데이터 수집"""

        data = []
        iterations = (duration_minutes * 60) // interval_seconds

        prev_in = None
        prev_out = None

        for i in range(iterations):
            traffic = self.get_interface_traffic(interface_index)

            # 초당 전송량 계산 (bps)
            if prev_in is not None:
                in_bps = ((traffic['in_octets'] - prev_in) * 8) / interval_seconds
                out_bps = ((traffic['out_octets'] - prev_out) * 8) / interval_seconds

                data.append({
                    'timestamp': traffic['timestamp'],
                    'in_bps': in_bps,
                    'out_bps': out_bps,
                    'total_bps': in_bps + out_bps
                })

            prev_in = traffic['in_octets']
            prev_out = traffic['out_octets']

            print(f"수집 진행: {i+1}/{iterations}")
            time.sleep(interval_seconds)

        df = pd.DataFrame(data)
        df.to_csv('traffic_data.csv', index=False)
        return df

# 사용 예
collector = SNMPTrafficCollector('192.168.1.1')
df = collector.collect_timeseries(interface_index=1, duration_minutes=1440)  # 24시간

NetFlow/sFlow 데이터 수집 (선택)

from scapy.all import *

def capture_netflow(interface='eth0', count=1000):
    """NetFlow 패킷 캡처"""

    packets = sniff(
        iface=interface,
        filter='udp port 2055',  # NetFlow port
        count=count
    )

    flows = []
    for pkt in packets:
        if pkt.haslayer(UDP):
            # NetFlow 파싱 (간단한 예제)
            flows.append({
                'timestamp': datetime.now(),
                'src_ip': pkt[IP].src if IP in pkt else None,
                'dst_ip': pkt[IP].dst if IP in pkt else None,
                'protocol': pkt[IP].proto if IP in pkt else None,
                'bytes': len(pkt)
            })

    return pd.DataFrame(flows)

2단계: 데이터 전처리 및 시퀀스 생성

import numpy as np
from sklearn.preprocessing import MinMaxScaler

def preprocess_traffic_data(df, sequence_length=50):
    """트래픽 데이터 전처리"""

    # 타임스탬프 인덱스 설정
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df = df.set_index('timestamp')

    # 결측치 처리
    df = df.fillna(method='ffill')

    # 시간 기반 특징 추가
    df['hour'] = df.index.hour
    df['day_of_week'] = df.index.dayofweek
    df['is_weekend'] = (df['day_of_week'] >= 5).astype(int)

    # 이동 평균 (노이즈 제거)
    df['in_bps_ma'] = df['in_bps'].rolling(window=6).mean()
    df['out_bps_ma'] = df['out_bps'].rolling(window=6).mean()

    # 표준편차 (변동성)
    df['in_bps_std'] = df['in_bps'].rolling(window=6).std()
    df['out_bps_std'] = df['out_bps'].rolling(window=6).std()

    # 결측치 제거
    df = df.dropna()

    return df

def create_sequences(data, sequence_length=50):
    """LSTM 입력용 시퀀스 생성"""

    sequences = []
    targets = []

    for i in range(len(data) - sequence_length):
        seq = data[i:i + sequence_length]
        target = data[i + sequence_length]

        sequences.append(seq)
        targets.append(target)

    return np.array(sequences), np.array(targets)

# 데이터 로드
df = pd.read_csv('traffic_data.csv')
df = preprocess_traffic_data(df)

# 사용할 특징 선택
feature_columns = ['in_bps', 'out_bps', 'hour', 'day_of_week',
                   'in_bps_ma', 'out_bps_ma', 'in_bps_std', 'out_bps_std']

data = df[feature_columns].values

# 정규화 (0-1 범위)
scaler = MinMaxScaler()
data_scaled = scaler.fit_transform(data)

# 시퀀스 생성
sequence_length = 50  # 과거 50개 시점 데이터 사용
X, y = create_sequences(data_scaled, sequence_length)

print(f"시퀀스 개수: {len(X)}")
print(f"시퀀스 shape: {X.shape}")  # (samples, sequence_length, features)
print(f"타겟 shape: {y.shape}")

# 학습/검증 데이터 분리
split_ratio = 0.8
split_index = int(len(X) * split_ratio)

X_train = X[:split_index]
y_train = y[:split_index]
X_val = X[split_index:]
y_val = y[split_index:]

3단계: LSTM 모델 구축

from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

def build_lstm_model(sequence_length, n_features):
    """LSTM 오토인코더 모델 구축"""

    model = keras.Sequential([
        # 인코더
        layers.LSTM(128, activation='relu', return_sequences=True,
                   input_shape=(sequence_length, n_features)),
        layers.Dropout(0.2),

        layers.LSTM(64, activation='relu', return_sequences=True),
        layers.Dropout(0.2),

        layers.LSTM(32, activation='relu', return_sequences=False),
        layers.Dropout(0.2),

        # 디코더
        layers.RepeatVector(sequence_length),

        layers.LSTM(32, activation='relu', return_sequences=True),
        layers.Dropout(0.2),

        layers.LSTM(64, activation='relu', return_sequences=True),
        layers.Dropout(0.2),

        layers.LSTM(128, activation='relu', return_sequences=True),
        layers.Dropout(0.2),

        # 출력
        layers.TimeDistributed(layers.Dense(n_features))
    ])

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        loss='mse',
        metrics=['mae']
    )

    return model

# 모델 생성
n_features = X_train.shape[2]
model = build_lstm_model(sequence_length, n_features)

model.summary()

4단계: 모델 학습

# 콜백 설정
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True
)

model_checkpoint = ModelCheckpoint(
    'best_lstm_model.h5',
    monitor='val_loss',
    save_best_only=True
)

# 학습
history = model.fit(
    X_train, X_train,  # 오토인코더: 입력 = 출력
    validation_data=(X_val, X_val),
    epochs=100,
    batch_size=32,
    callbacks=[early_stopping, model_checkpoint],
    verbose=1
)

# 학습 곡선 시각화
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history['mae'], label='Training MAE')
plt.plot(history.history['val_mae'], label='Validation MAE')
plt.title('Model MAE')
plt.xlabel('Epoch')
plt.ylabel('MAE')
plt.legend()

plt.tight_layout()
plt.savefig('training_history.png')
plt.show()

5단계: 이상 탐지 시스템 구축

class LSTMAnomalyDetector:
    def __init__(self, model, scaler, sequence_length, threshold_percentile=95):
        self.model = model
        self.scaler = scaler
        self.sequence_length = sequence_length
        self.threshold = None
        self.threshold_percentile = threshold_percentile

    def calculate_reconstruction_error(self, X):
        """재구성 오차 계산"""
        predictions = self.model.predict(X, verbose=0)
        mse = np.mean(np.power(X - predictions, 2), axis=(1, 2))
        return mse

    def set_threshold(self, X_normal):
        """정상 데이터로 임계값 설정"""
        errors = self.calculate_reconstruction_error(X_normal)
        self.threshold = np.percentile(errors, self.threshold_percentile)
        print(f"임계값 설정: {self.threshold:.6f}")
        return self.threshold

    def detect_anomalies(self, X):
        """이상 탐지"""
        errors = self.calculate_reconstruction_error(X)
        anomalies = errors > self.threshold
        return anomalies, errors

    def real_time_monitor(self, collector, interface_index, interval=10):
        """실시간 모니터링"""

        buffer = []

        while True:
            try:
                # 트래픽 수집
                traffic = collector.get_interface_traffic(interface_index)

                # 특징 추출
                features = self._extract_features(traffic)
                buffer.append(features)

                # 시퀀스가 충분히 모이면 예측
                if len(buffer) >= self.sequence_length:
                    # 최근 시퀀스 추출
                    sequence = np.array(buffer[-self.sequence_length:])
                    sequence_scaled = self.scaler.transform(sequence)
                    X = sequence_scaled.reshape(1, self.sequence_length, -1)

                    # 이상 탐지
                    is_anomaly, error = self.detect_anomalies(X)

                    print(f"
{'='*60}")
                    print(f"시간: {traffic['timestamp']}")
                    print(f"재구성 오차: {error[0]:.6f}")
                    print(f"임계값: {self.threshold:.6f}")

                    if is_anomaly[0]:
                        self._alert_anomaly(traffic, error[0])
                    else:
                        print("✅ 정상 트래픽")

                time.sleep(interval)

            except KeyboardInterrupt:
                print("
모니터링 종료")
                break
            except Exception as e:
                print(f"오류: {e}")
                time.sleep(interval)

    def _extract_features(self, traffic):
        """트래픽에서 특징 추출"""
        now = datetime.now()
        return [
            traffic.get('in_bps', 0),
            traffic.get('out_bps', 0),
            now.hour,
            now.weekday(),
            0, 0, 0, 0  # 이동평균, 표준편차는 버퍼에서 계산
        ]

    def _alert_anomaly(self, traffic, error):
        """이상 알림"""
        print(f"
🚨 {'='*60}")
        print(f"이상 트래픽 탐지!")
        print(f"재구성 오차: {error:.6f}")
        print(f"정상 임계값의 {error/self.threshold:.1f}배")
        print(f"입력: {traffic.get('in_bps', 0):.0f} bps")
        print(f"출력: {traffic.get('out_bps', 0):.0f} bps")
        print(f"{'='*60}")

        # 실제 구현: Slack, Email 등
        # send_slack_alert(traffic, error)

# 이상 탐지 시스템 초기화
detector = LSTMAnomalyDetector(
    model=model,
    scaler=scaler,
    sequence_length=sequence_length,
    threshold_percentile=95  # 상위 5%를 이상으로 간주
)

# 정상 데이터로 임계값 설정
detector.set_threshold(X_train)

# 검증 데이터로 테스트
anomalies, errors = detector.detect_anomalies(X_val)
print(f"
검증 데이터 이상 비율: {anomalies.sum() / len(anomalies) * 100:.2f}%")

# 실시간 모니터링 (선택)
# collector = SNMPTrafficCollector('192.168.1.1')
# detector.real_time_monitor(collector, interface_index=1, interval=10)

6단계: 시각화 및 분석

def visualize_anomalies(df, anomalies, errors, threshold):
    """이상 탐지 결과 시각화"""

    plt.figure(figsize=(15, 10))

    # 서브플롯 1: 트래픽 패턴
    plt.subplot(3, 1, 1)
    plt.plot(df.index, df['in_bps'], label='Input Traffic', alpha=0.7)
    plt.plot(df.index, df['out_bps'], label='Output Traffic', alpha=0.7)

    # 이상 구간 표시
    anomaly_indices = np.where(anomalies)[0]
    for idx in anomaly_indices:
        plt.axvspan(df.index[idx], df.index[idx+1],
                   color='red', alpha=0.3)

    plt.title('Network Traffic with Anomalies')
    plt.ylabel('bps')
    plt.legend()
    plt.grid(True, alpha=0.3)

    # 서브플롯 2: 재구성 오차
    plt.subplot(3, 1, 2)
    plt.plot(df.index, errors, label='Reconstruction Error', color='blue')
    plt.axhline(y=threshold, color='r', linestyle='--', label='Threshold')
    plt.fill_between(df.index, 0, errors,
                     where=(errors > threshold),
                     color='red', alpha=0.3, label='Anomaly')
    plt.title('Reconstruction Error')
    plt.ylabel('Error')
    plt.legend()
    plt.grid(True, alpha=0.3)

    # 서브플롯 3: 에러 분포
    plt.subplot(3, 1, 3)
    plt.hist(errors, bins=50, alpha=0.7, edgecolor='black')
    plt.axvline(x=threshold, color='r', linestyle='--', linewidth=2, label='Threshold')
    plt.title('Error Distribution')
    plt.xlabel('Reconstruction Error')
    plt.ylabel('Frequency')
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig('anomaly_detection_results.png', dpi=300)
    plt.show()

# 시각화
visualize_anomalies(
    df=df.iloc[split_index:split_index+len(errors)],
    anomalies=anomalies,
    errors=errors,
    threshold=detector.threshold
)

7단계: Flask API로 배포

from flask import Flask, request, jsonify
import numpy as np

app = Flask(__name__)

# 모델 로드
model = keras.models.load_model('best_lstm_model.h5')
detector = LSTMAnomalyDetector(model, scaler, sequence_length)
detector.threshold = 0.05  # 사전 계산된 임계값

@app.route('/detect', methods=['POST'])
def detect():
    """트래픽 이상 탐지 API"""
    try:
        # 시퀀스 데이터 받기
        sequence_data = request.json['sequence']

        # numpy 배열로 변환
        X = np.array(sequence_data).reshape(1, sequence_length, -1)

        # 정규화
        X_scaled = scaler.transform(X.reshape(-1, X.shape[2])).reshape(X.shape)

        # 이상 탐지
        is_anomaly, error = detector.detect_anomalies(X_scaled)

        return jsonify({
            'is_anomaly': bool(is_anomaly[0]),
            'error': float(error[0]),
            'threshold': float(detector.threshold),
            'severity': 'high' if error[0] > detector.threshold * 2 else 'medium' if is_anomaly[0] else 'low'
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 400

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5001)

성능 향상 기법

1. Attention 메커니즘 추가

from tensorflow.keras.layers import Attention, Concatenate

def build_lstm_with_attention(sequence_length, n_features):
    """Attention이 추가된 LSTM"""

    # 인코더
    encoder_inputs = layers.Input(shape=(sequence_length, n_features))
    encoder = layers.LSTM(128, return_sequences=True)(encoder_inputs)
    encoder = layers.Dropout(0.2)(encoder)

    # Attention
    attention = layers.Attention()([encoder, encoder])
    context = Concatenate()([encoder, attention])

    encoder_output = layers.LSTM(64)(context)

    # 디코더
    decoder = layers.RepeatVector(sequence_length)(encoder_output)
    decoder = layers.LSTM(64, return_sequences=True)(decoder)
    decoder = layers.LSTM(128, return_sequences=True)(decoder)
    decoder_output = layers.TimeDistributed(layers.Dense(n_features))(decoder)

    model = keras.Model(encoder_inputs, decoder_output)
    model.compile(optimizer='adam', loss='mse')

    return model

2. 앙상블 모델

def ensemble_anomaly_detection(models, X, threshold):
    """여러 모델의 예측을 평균"""

    errors_list = []
    for model in models:
        predictions = model.predict(X, verbose=0)
        mse = np.mean(np.power(X - predictions, 2), axis=(1, 2))
        errors_list.append(mse)

    # 평균 에러
    avg_errors = np.mean(errors_list, axis=0)
    anomalies = avg_errors > threshold

    return anomalies, avg_errors

요약

이 튜토리얼에서 다룬 내용: 1. ✅ SNMP/NetFlow로 네트워크 트래픽 수집 2. ✅ 시계열 데이터 전처리 및 시퀀스 생성 3. ✅ LSTM 오토인코더 모델 구축 및 학습 4. ✅ 재구성 오차 기반 이상 탐지 5. ✅ 실시간 모니터링 시스템 6. ✅ Flask API로 배포

이 시스템을 통해 DDoS 공격, 트래픽 급증, 비정상 패턴 등을 자동으로 탐지할 수 있습니다!