Building a Real-Time Event Correlator: Detecting Suspicious Patterns in Log Streams

In cybersecurity, the difference between a minor incident and a catastrophic breach often lies in the connection between seemingly unrelated events. A single failed login might be a typo; a single firewall drop might be a misconfiguration. But a failed login followed by a firewall drop and a database connection failure from the same IP within minutes? That is the signature of an attack.

Traditional security tools often analyze events in isolation, treating each log entry as an independent incident. This "snapshot" approach misses the narrative of an attack. Modern adversaries operate in campaigns, weaving their actions through the noise of benign activity. To counter them, we need a system that doesn't just count events but understands their sequence and context. This is the domain of Event Correlation.

This chapter guides you through building a stateful event correlator in Python. We will move beyond static analysis to create a dynamic engine that tracks attack patterns across a sliding time window, transforming raw log data into high-fidelity security intelligence.

From Discrete Logs to Coherent Narratives

Imagine a library of millions of security logs. If we read each log as an isolated word, we miss the story told by the sentences and paragraphs. An event correlator acts as both a literary critic and a forensic detective, synthesizing disparate data points into a coherent narrative.

Event Correlation is the automated process of analyzing multiple security events—often from different sources like firewalls, servers, and endpoints—to identify relationships and patterns that indicate a security incident. It transforms high-volume data into actionable intelligence.

The Power of Statefulness

The most critical concept in correlation is statefulness. A stateless system sees events in isolation. A stateful system possesses memory.

Consider these three events from the same IP address, 10.1.1.5:

  1. Firewall Log: Connection attempt to Port 22 (SSH) denied.
  2. Web Server Log: Access to /index.html successful.
  3. Active Directory Log: Failed login for user ‘admin’.

A stateless system treats these as three unrelated, low-priority events. A stateful correlator, however, maintains a contextual record for 10.1.1.5. It remembers the initial suspicious network behavior and the subsequent authentication failure. If the failed login repeats nine more times within a minute, the correlator increments a counter and triggers a high-priority alert for a brute-force attack. This memory allows the defender to track an attacker’s progress through the stages of the Cyber Kill Chain.
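
The memory described above can be sketched in a few lines. This is only an illustration, not part of the engine built later: the event field names, the LOGIN_FAILED type, and the threshold of ten are assumptions chosen to mirror the example, and the one-minute time bound is left out here for brevity.

```python
from collections import defaultdict

# Illustrative threshold: the first failed login plus nine repeats.
FAILED_LOGIN_THRESHOLD = 10

# Hypothetical events; the 'src_ip' and 'type' field names are assumptions.
events = [
    {"src_ip": "10.1.1.5", "type": "FW_DENY"},
    {"src_ip": "10.1.1.5", "type": "HTTP_OK"},
] + [{"src_ip": "10.1.1.5", "type": "LOGIN_FAILED"} for _ in range(10)]


def stateful_check(event, memory):
    """Count failed logins per IP and report True once the threshold is reached."""
    if event["type"] != "LOGIN_FAILED":
        return False
    memory[event["src_ip"]] += 1
    return memory[event["src_ip"]] >= FAILED_LOGIN_THRESHOLD


memory = defaultdict(int)  # the correlator's per-IP memory
alerts = [stateful_check(event, memory) for event in events]
```

Only the final event produces an alert. A stateless check, which would see each failed login without the counter, has no way to tell the tenth failure from the first.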

The Sliding Time Window: Temporal Relevance

Statefulness requires a temporal boundary. Without one, the correlator would link events that have nothing to do with each other: a failed login today and a port scan six months later are far more likely to be coincidence than campaign.

The Sliding Time Window provides this constraint. Imagine a conveyor belt of log events moving past a fixed viewing port. Only events within this port—the current time window—are relevant for correlation. As new events enter the right side of the window, old events exit the left side and are discarded.

This mechanism manages computational load and ensures relevance. The window size is a critical design parameter: too short, and slow, low-rate attacks slip through; too long, and memory consumption grows while unrelated events are increasingly correlated by accident, raising the false-positive rate.
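
One common way to realize the conveyor belt is a double-ended queue of timestamps. The sketch below is a minimal illustration under two assumptions: the SlidingWindow class name is hypothetical, and events are assumed to arrive in timestamp order.

```python
from collections import deque


class SlidingWindow:
    """Keeps only the timestamps that fall inside the last `size_seconds`."""

    def __init__(self, size_seconds: float):
        self.size_seconds = size_seconds
        self._timestamps = deque()  # oldest on the left, newest on the right

    def add(self, ts: float) -> int:
        """Record an event at time `ts` and return how many events are in view."""
        self._timestamps.append(ts)
        # Evict events that have slid out of the left side of the window.
        while self._timestamps and ts - self._timestamps[0] > self.size_seconds:
            self._timestamps.popleft()
        return len(self._timestamps)


window = SlidingWindow(size_seconds=10)
counts = [window.add(ts) for ts in (0, 4, 9, 12, 25)]
# counts == [1, 2, 3, 3, 1]
```

The event at T=12 pushes the event at T=0 out of view, and the event at T=25 arrives alone because everything before it is more than ten seconds old.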

The Rule Engine: Defining Suspicion

The correlation engine needs a standardized way to define suspicious patterns. Our Python implementation will focus on two primary correlation logics:

  1. Threshold-Based Correlation (Aggregation): Counting events associated with a key (e.g., source IP) within the window.

    • Logic: IF (Count of Event X for Key Y) > Z within Time Window T, THEN Alert.
    • Example: More than 50 database connection failures from a service account in 5 minutes may indicate a brute-force attempt against that account (or a misconfigured credential worth investigating).
  2. Sequential Chain Correlation (Causality): Detecting a flow of events where one action triggers the expectation of the next.

    • Logic: IF Event A occurs, initiate a state. IF Event B occurs from the same source within the time window, escalate the state. IF Event C follows, trigger an alert.
    • Example: A successful login (Event A), followed by failed file access attempts (Event B), followed by a remote PowerShell execution (Event C) is consistent with lateral movement.
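
The threshold logic can be sketched as a small rule object. The ThresholdRule class, the DB_CONN_FAILED event type, and the "svc-account" key are hypothetical names used for illustration; timestamps are assumed to be in seconds and to arrive in order.

```python
from collections import defaultdict, deque


class ThresholdRule:
    """Alert when more than `limit` matching events share a key within the window."""

    def __init__(self, event_type: str, limit: int, window_seconds: float):
        self.event_type = event_type
        self.limit = limit
        self.window_seconds = window_seconds
        self._seen = defaultdict(deque)  # key -> timestamps of matching events

    def observe(self, key: str, event_type: str, ts: float) -> bool:
        """Return True if this event pushes the key's count above the limit."""
        if event_type != self.event_type:
            return False
        timestamps = self._seen[key]
        timestamps.append(ts)
        # Drop matches that are older than the time window T.
        while timestamps and ts - timestamps[0] > self.window_seconds:
            timestamps.popleft()
        return len(timestamps) > self.limit


# IF (count of DB_CONN_FAILED for a key) > 50 within 300 seconds, THEN alert.
rule = ThresholdRule("DB_CONN_FAILED", limit=50, window_seconds=300)
results = [rule.observe("svc-account", "DB_CONN_FAILED", ts) for ts in range(51)]
```

The first 50 failures stay at or below the limit; the 51st is the first to exceed it, so only the last entry in results is True.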

Building a Python Event Correlator

To demonstrate these principles, we’ll build a simple yet powerful stateful correlator. This engine will detect a specific attack pattern: a Reconnaissance Scan followed shortly by an Access Attempt from the same source IP.

The example is synchronous for clarity, but its core concepts, state management and time windows, carry over directly to an asyncio-based implementation.

The Code: SimpleEventCorrelator

import time
from datetime import datetime, timedelta
from typing import Dict, Any

# --- Configuration & State Definitions ---
TIME_WINDOW_SECONDS = 10 

# Define explicit states for our finite state machine
STATE_START = "INITIAL"
STATE_RECON_DETECTED = "RECON_DETECTED"
STATE_ACCESS_ATTEMPT_DETECTED = "ALERT_TRIGGERED"

class SimpleEventCorrelator:
    """
    Manages stateful detection of sequential events within a sliding time window.
    Rule: Reconnaissance (Type A) followed by Access Attempt (Type B) within the
    configured window (10 seconds by default).
    """

    def __init__(self, window_seconds: int = TIME_WINDOW_SECONDS):
        # active_correlations stores state for every IP being tracked.
        # Key: Source IP (str) -> Value: State data dictionary
        self.active_correlations: Dict[str, Dict[str, Any]] = {}
        self.window = timedelta(seconds=window_seconds)

    def _cleanup_expired_states(self):
        """Removes states that have exceeded the time window to prevent memory bloat."""
        now = time.time()
        window_seconds = self.window.total_seconds()

        # Collect keys first: a dict must not change size while it is being iterated.
        expired_ips = [
            ip for ip, data in self.active_correlations.items()
            if now - data['last_ts'] > window_seconds
        ]

        for ip in expired_ips:
            print(f"[CLEANUP] State expired for IP: {ip}. Removing state.")
            self.active_correlations.pop(ip)

    def process_event(self, event: Dict[str, str]) -> bool:
        """Processes a single log event and applies correlation logic."""
        self._cleanup_expired_states()

        source_ip = event.get('src_ip')
        event_type = event.get('type')
        current_ts = time.time()

        if not source_ip or not event_type:
            return False 

        # Retrieve current state or initialize if new IP
        current_state_data = self.active_correlations.get(source_ip, 
                                                          {'state': STATE_START, 'last_ts': current_ts})

        current_state = current_state_data['state']
        last_ts = current_state_data['last_ts']

        # --- State Transition Logic ---

        # A. Handling the INITIAL state
        if current_state == STATE_START:
            if event_type == "RECON_SCAN":
                print(f"[{source_ip}] State Change: INITIAL -> RECON_DETECTED.")
                self.active_correlations[source_ip] = {
                    'state': STATE_RECON_DETECTED,
                    'last_ts': current_ts
                }
            return False

        # B. Handling the RECON_DETECTED state
        elif current_state == STATE_RECON_DETECTED:
            time_diff = current_ts - last_ts

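            # Check Time Window Enforcement. _cleanup_expired_states() normally removes
            # stale entries before this point, so this branch is a defensive fallback.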
            if time_diff > self.window.total_seconds():
                print(f"[{source_ip}] Time window expired ({time_diff:.2f}s). Resetting state.")
                self.active_correlations[source_ip] = {
                    'state': STATE_START, 
                    'last_ts': current_ts
                }
                return False

            # Check Sequence Completion
            if event_type == "ACCESS_ATTEMPT":
                print(f"\n!!! ALERT TRIGGERED !!! [{source_ip}] Detected: Recon followed by Access Attempt in {time_diff:.2f}s.")
                self.active_correlations[source_ip]['state'] = STATE_ACCESS_ATTEMPT_DETECTED
                return True

            # Check State Extension (Sliding Window)
            elif event_type == "RECON_SCAN":
                print(f"[{source_ip}] Extending Recon window (Sliding Window concept).")
                self.active_correlations[source_ip]['last_ts'] = current_ts
                return False

            return False

        # C. Handling ALERT_TRIGGERED state: suppress duplicate alerts for this IP
        #    until cleanup expires its state.
        elif current_state == STATE_ACCESS_ATTEMPT_DETECTED:
            return False

        return False

# --- Simulation Setup ---
correlator = SimpleEventCorrelator(window_seconds=8)

print(f"--- Starting Simulation with {correlator.window.total_seconds()} second window ---")

# 1. Start the sequence
event1 = {'src_ip': '192.168.1.10', 'type': 'RECON_SCAN', 'payload': 'Nmap scan'}
print("\n[T=0s] Processing Event 1 (Reconnaissance)")
correlator.process_event(event1)

# 2. Benign noise event
event2 = {'src_ip': '10.0.0.5', 'type': 'HTTP_GET', 'payload': 'Normal request'}
print("\n[T=0s] Processing Event 2 (Noise)")
correlator.process_event(event2)

# 3. Time passes (within the window)
time.sleep(3) 

# 4. Trigger event (completes the sequence)
event3 = {'src_ip': '192.168.1.10', 'type': 'ACCESS_ATTEMPT', 'payload': 'Failed login attempt'}
print("\n[T=3s] Processing Event 3 (Access Attempt)")
alert_status = correlator.process_event(event3)
print(f"Alert Status: {alert_status}")

# 5. Show final state after alert
print("\n--- Current Active Correlations After Alert ---")
print(correlator.active_correlations)

# --- Testing Time Window Expiration ---
event4 = {'src_ip': '172.16.0.2', 'type': 'RECON_SCAN', 'payload': 'New IP scan'}
print("\n[T=3s] Processing Event 4 (New Recon)")
correlator.process_event(event4)

print("\n[T=3s -> T=12s] Waiting 9 seconds to expire the window for 172.16.0.2...")
time.sleep(9) 

event5 = {'src_ip': '172.16.0.2', 'type': 'ACCESS_ATTEMPT', 'payload': 'Delayed login attempt'}
print("\n[T=12s] Processing Event 5 (Delayed Access Attempt)")
alert_status_expired = correlator.process_event(event5)
print(f"Alert Status: {alert_status_expired}")
print("\n--- Final State After Expiration ---")
print(correlator.active_correlations)

Dissecting the Correlation Engine

Let’s break down the key components that make this engine effective.

1. State Management with Dictionaries

The active_correlations dictionary is the engine's memory. The key is the identifier (source IP), and the value is a nested dictionary containing the current state and the timestamp of the last relevant event. This structure allows for O(1) average-time complexity for lookups, which is crucial for high-throughput environments.
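
As a possible variation, the nested dictionary could be replaced with a small typed record, which keeps the same O(1) lookup while making the fields explicit. The CorrelationState name and the sample timestamp are assumptions for illustration; the engine above uses plain dictionaries.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class CorrelationState:
    """One tracked source: its FSM state and the time of its last relevant event."""
    state: str = "INITIAL"
    last_ts: float = 0.0


# Key: source IP -> Value: state record (same shape as active_correlations).
active: Dict[str, CorrelationState] = {}
active["192.168.1.10"] = CorrelationState(state="RECON_DETECTED", last_ts=1700000000.0)

# Lookup and update are both average-case O(1) dictionary operations.
record = active["192.168.1.10"]
record.last_ts += 3.0
```

A typo such as record.last_tz would raise AttributeError on read, whereas a misspelled dictionary key can go unnoticed until a lookup fails.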

2. Safe Initialization with dict.get()

The call self.active_correlations.get(source_ip, {'state': STATE_START, ...}) handles new and known IPs with one expression. If the IP is already tracked, it returns the stored state; if not, it returns the default dictionary instead of raising a KeyError. Note that get() does not store the default: a new IP enters active_correlations only when it produces a RECON_SCAN, so benign traffic never consumes memory.
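
The difference between reading a default and storing one is easy to verify. The short sketch below contrasts dict.get() with dict.setdefault(), using a placeholder IP and timestamp.

```python
STATE_START = "INITIAL"
active_correlations = {}

# dict.get() returns the default but leaves the dictionary untouched.
data = active_correlations.get("10.0.0.5", {"state": STATE_START, "last_ts": 0.0})
stored_after_get = "10.0.0.5" in active_correlations  # False

# dict.setdefault() returns the default and also inserts it under the key.
data = active_correlations.setdefault("10.0.0.5", {"state": STATE_START, "last_ts": 0.0})
stored_after_setdefault = "10.0.0.5" in active_correlations  # True
```

For a correlator, get() is the safer choice here: with setdefault(), every IP that sends a single benign request would occupy a slot until cleanup runs.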

3. Sliding Window Enforcement

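The _cleanup_expired_states method keeps memory bounded. On every call to process_event, it scans the active states and removes any whose last relevant event is older than the window. Strictly speaking this is a per-IP expiry timer rather than a buffer of events, but it has the same effect as the sliding window: a later RECON_SCAN refreshes the timestamp, and anything older than the window is forgotten. The scan is linear in the number of tracked IPs, which is fine for a demonstration but worth optimizing at high event rates.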
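
One way to avoid scanning every tracked IP is to keep the states ordered by their last update, so that expired entries always sit at the front. The sketch below uses collections.OrderedDict for this; the ExpiringStates class is a hypothetical alternative, not the engine's own method, and it assumes timestamps never go backwards.

```python
from collections import OrderedDict


class ExpiringStates:
    """Per-IP states ordered by last update, so expiry only inspects the oldest."""

    def __init__(self, window_seconds: float):
        self.window_seconds = window_seconds
        self._states = OrderedDict()  # ip -> (state, last_ts), oldest first

    def touch(self, ip: str, state: str, now: float) -> None:
        """Store or refresh a state and move it to the 'newest' end."""
        self._states[ip] = (state, now)
        self._states.move_to_end(ip)

    def expire(self, now: float) -> list:
        """Remove and return the IPs whose state is older than the window."""
        expired = []
        while self._states:
            ip, (_, last_ts) = next(iter(self._states.items()))
            if now - last_ts <= self.window_seconds:
                break  # everything behind this entry is newer still
            self._states.popitem(last=False)
            expired.append(ip)
        return expired

    def __contains__(self, ip: str) -> bool:
        return ip in self._states


store = ExpiringStates(window_seconds=8)
store.touch("192.168.1.10", "RECON_DETECTED", now=0)
store.touch("172.16.0.2", "RECON_DETECTED", now=5)
store.touch("192.168.1.10", "RECON_DETECTED", now=6)  # refreshed, now the newest
expired = store.expire(now=14)
# expired == ["172.16.0.2"]
```

Each expiry pass costs time proportional to the number of entries actually removed rather than to the total number of tracked IPs.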

4. Finite State Machine (FSM) Logic

The process_event method acts as a state machine. It evaluates the current state of an IP and the incoming event type to determine the next state. This sequential logic is what enables the detection of multi-stage attacks. The engine doesn't just look for isolated events; it looks for sequences.
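
The same transitions can also be written as a lookup table, which makes the state machine easier to read and extend. This is a simplified sketch: the TRANSITIONS table and helper names are illustrative, and the time-window check is deliberately omitted.

```python
# (current_state, event_type) -> next_state.
# Any pair not listed leaves the state unchanged.
TRANSITIONS = {
    ("INITIAL", "RECON_SCAN"): "RECON_DETECTED",
    ("RECON_DETECTED", "RECON_SCAN"): "RECON_DETECTED",
    ("RECON_DETECTED", "ACCESS_ATTEMPT"): "ALERT_TRIGGERED",
}


def next_state(current: str, event_type: str) -> str:
    """Look up the transition; unrelated events do not move the machine."""
    return TRANSITIONS.get((current, event_type), current)


def run(event_types) -> str:
    """Feed a sequence of event types through the machine for a single IP."""
    state = "INITIAL"
    for event_type in event_types:
        state = next_state(state, event_type)
    return state


attack = run(["HTTP_GET", "RECON_SCAN", "HTTP_GET", "ACCESS_ATTEMPT"])
benign = run(["HTTP_GET", "ACCESS_ATTEMPT"])
# attack == "ALERT_TRIGGERED", benign == "INITIAL"
```

An access attempt with no preceding scan leaves the machine in INITIAL, which is exactly the distinction between sequence detection and isolated event matching. Adding a third stage would mean adding rows to the table rather than another elif branch.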

5. Asynchronous Readiness

While this example is synchronous, the same structure maps naturally onto asyncio. An event loop can await log ingestion (network or file I/O) and hand each event to the state machine as it arrives, and window expiration can run as a periodic asynchronous task. Because the state machine itself performs only in-memory dictionary operations, a single-threaded Python process can monitor thousands of concurrent attack chains this way.
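
A rough sketch of that arrangement, using an asyncio.Queue between an ingestion coroutine and a processing coroutine, might look like the following. The ingest, correlate, and handle names are hypothetical, and handle is only a stand-in for a method such as process_event.

```python
import asyncio


async def ingest(queue: asyncio.Queue, events) -> None:
    """Stand-in for log ingestion; a real source would await network or file I/O."""
    for event in events:
        await queue.put(event)
    await queue.put(None)  # sentinel: no more events


async def correlate(queue: asyncio.Queue, handle) -> list:
    """Pull events off the queue and pass each one to the state machine."""
    results = []
    while True:
        event = await queue.get()
        if event is None:
            break
        results.append(handle(event))
    return results


async def main(events, handle) -> list:
    queue = asyncio.Queue()
    # Run ingestion and correlation concurrently on one event loop.
    _, results = await asyncio.gather(ingest(queue, events), correlate(queue, handle))
    return results


def handle(event) -> bool:
    """Hypothetical stand-in for correlator.process_event."""
    return event["type"] == "ACCESS_ATTEMPT"


sample = [
    {"src_ip": "192.168.1.10", "type": "RECON_SCAN"},
    {"src_ip": "192.168.1.10", "type": "ACCESS_ATTEMPT"},
]
results = asyncio.run(main(sample, handle))
# results == [False, True]
```

The queue decouples the two sides: ingestion can wait on slow sources without stalling the state machine, and the state machine never needs to know where events come from.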

Conclusion: From Theory to Defense

Building an event correlator shifts your defensive posture from reactive to proactive. By implementing statefulness, temporal awareness, and sequential rule matching, you create a system that understands the narrative of an attack.

This foundational engine can be extended in many ways: integrating machine learning for anomaly detection, adding more complex correlation rules, or feeding it from an event streaming platform such as Apache Kafka. The principles remain the same: remember the past, respect time, and look for the patterns that matter.

Let's Discuss

  1. Beyond IPs: What other identifiers would be valuable for correlating events in a modern cloud environment (e.g., user IDs, container IDs, session tokens), and what new challenges would they introduce?
  2. Performance vs. Complexity: How would you scale this correlator to handle 100,000 events per second? Would you prioritize a distributed in-memory database, a stream processing engine, or a hybrid approach?

The concepts and code demonstrated here are drawn from the book Python Defensive Cybersecurity, part of the Python Programming Series, available on Amazon and on Leanpub.com.



Code License: All code examples are released under the MIT License. Github repo.

Content Copyright: Copyright © 2026 Edgar Milvus. All rights reserved.

All textual explanations, original diagrams, and illustrations are the intellectual property of the author. To support the maintenance of this site via AdSense, please read this content exclusively online. Copying, redistribution, or reproduction is strictly prohibited.