70 changes: 44 additions & 26 deletions bbot/core/helpers/web/web.py
@@ -20,6 +20,23 @@
log = logging.getLogger("bbot.core.helpers.web")


async def iter_batch_results(stream):
"""
Yield individual ``BatchResult`` objects from a ``request_batch_stream`` iterator.

The native blasthttp 0.4.0 iterator yields lists of ``BatchResult`` (drained in
chunks of up to 1000 results or every 200 ms, to amortize the Python↔Rust
boundary). A future Python wrapper is expected to unwrap these into individual
items. This helper handles both shapes so callers can write a single
``async for`` loop.
"""
async for item in stream:
if isinstance(item, list):
for r in item:
yield r
else:
yield item
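
A quick sketch of what this normalization buys callers — ``fake_stream`` below is a
stand-in for the native iterator, not a real blasthttp API::

    import asyncio

    async def fake_stream():
        # Simulates the native 0.4.0 shape (a drained chunk of results),
        # followed by the anticipated wrapper shape (a single item).
        yield ["result-1", "result-2"]
        yield "result-3"

    async def demo():
        # Both shapes flatten into one sequence: result-1, result-2, result-3
        async for r in iter_batch_results(fake_stream()):
            print(r)

    asyncio.run(demo())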


class WebHelper:
"""
Main utility class for managing HTTP operations in BBOT. Uses blasthttp (Rust) as the
@@ -297,23 +314,26 @@ async def request(self, *args, **kwargs):
log.trace(traceback.format_exc())
raise

async def request_batch(self, urls, threads=10, **kwargs):
async def request_batch_stream(self, urls, threads=10, **kwargs):
"""
Request multiple URLs in parallel via blasthttp's native Rust batch engine.
Request multiple URLs in parallel via blasthttp's native Rust batch engine,
yielding each response as soon as it completes (completion order, not input
order).

Applies the same header/cookie/proxy/timeout logic as ``request()`` — each
entry is translated into a ``blasthttp.BatchConfig`` and sent to Rust in one
shot. Results are returned as a list (not streamed).
entry is translated into a ``blasthttp.BatchConfig`` and dispatched through
``blasthttp.request_batch_stream``. A slow request no longer blocks faster
peers behind it, and Python work overlaps with in-flight HTTP I/O.

Each entry in ``urls`` can be:
- A plain URL string (uses shared ``**kwargs`` for all requests)
- A ``(url, per_request_kwargs)`` tuple for per-request options
- A ``(url, per_request_kwargs, tracker)`` tuple to attach arbitrary
tracking data that is returned alongside the response
tracking data that is yielded alongside the response

Returns:
When entries are plain strings: ``list[(url, response)]``
When any entry includes a tracker: ``list[(url, response, tracker)]``
Yields:
When entries are plain strings: ``(url, response)``
When any entry includes a tracker: ``(url, response, tracker)``

Args:
urls: URLs to visit — strings or ``(url, kwargs[, tracker])`` tuples.
@@ -324,15 +344,13 @@ async def request_batch(self, urls, threads=10, **kwargs):
Examples:
Simple (shared kwargs)::

results = await self.helpers.request_batch(urls, headers={"X-Test": "Test"})
for url, response in results:
async for url, response in self.helpers.request_batch_stream(urls, headers={"X-Test": "Test"}):
...

Per-request kwargs with tracker::

reqs = [("http://example.com", {"method": "POST"}, "my-tracker")]
results = await self.helpers.request_batch(reqs)
for url, response, tracker in results:
async for url, response, tracker in self.helpers.request_batch_stream(reqs):
...
"""
import blasthttp
@@ -354,33 +372,33 @@ async def request_batch(self, urls, threads=10, **kwargs):
entries.append((str(entry), kwargs, None))

if not entries:
return []
return

# Build BatchConfig objects using the same logic as request().
# Map each config URL back to a queue of trackers so we can correlate
# completion-order results to original entries even when multiple entries
# share a URL.
from collections import deque

# Build BatchConfig objects using the same logic as request()
configs = []
trackers = []
trackers_by_url = {}
for url, req_kwargs, tracker in entries:
url, method, blast_kwargs = self._build_blasthttp_kwargs(url, **req_kwargs)
config = blasthttp.BatchConfig(url, **blast_kwargs)
configs.append(config)
trackers.append(tracker)

# Send to Rust — all I/O happens here
batch_results = await self.client.request_batch(configs, concurrency=threads)
trackers_by_url.setdefault(config.url, deque()).append(tracker)

# Convert to (url, response[, tracker]) tuples
# Results are returned in the same order as configs
results = []
for i, br in enumerate(batch_results):
async for br in iter_batch_results(self.client.request_batch_stream(configs, concurrency=threads)):
if br.response is not None:
response = BlasthttpResponse(br.response, request_url=br.url, method="GET")
else:
response = None
if has_tracker:
results.append((br.url, response, trackers[i]))
queue = trackers_by_url.get(br.url)
tracker = queue.popleft() if queue else None
yield br.url, response, tracker
else:
results.append((br.url, response))
return results
yield br.url, response
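
The deque-per-URL bookkeeping above is what keeps trackers correct under completion
order; a standalone sketch of the idea, with hypothetical trackers and no HTTP
involved::

    from collections import deque

    entries = [("http://example.com", "t1"), ("http://example.com", "t2")]
    trackers_by_url = {}
    for url, tracker in entries:
        trackers_by_url.setdefault(url, deque()).append(tracker)

    # Completed results pop FIFO, so duplicates of the same URL are matched
    # to their trackers in submission order ("t1" first, then "t2").
    for completed_url in ["http://example.com", "http://example.com"]:
        queue = trackers_by_url.get(completed_url)
        print(completed_url, queue.popleft() if queue else None)

Note the trade-off: when the same URL is submitted twice with different kwargs, FIFO
matching pairs trackers by submission order, not by which duplicate actually finished
first.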

async def download(self, url, **kwargs):
"""
2 changes: 1 addition & 1 deletion bbot/modules/git.py
@@ -24,7 +24,7 @@ async def handle_event(self, event):
self.helpers.urljoin(base_url, ".git/config"),
self.helpers.urljoin(f"{base_url}/", ".git/config"),
}
for url, response in await self.helpers.request_batch(urls):
async for url, response in self.helpers.request_batch_stream(urls):
text = getattr(response, "text", "")
if not text:
text = ""
223 changes: 118 additions & 105 deletions bbot/modules/http.py
@@ -4,6 +4,7 @@

import blasthttp

from bbot.core.helpers.web.web import iter_batch_results
from bbot.modules.base import BaseModule


@@ -174,10 +175,75 @@ def _response_to_json(self, url_input, response):

return j

async def _process_result(self, result, parent_event):
"""Emit URL + HTTP_RESPONSE events for one batch result. Returns True if status was usable."""
if not result.success:
self.debug(f"blasthttp error for {result.url}: {result.error}")
return False

response = result.response
status_code = response.status
if status_code == 0:
self.debug(f'No HTTP status code for "{result.url}"')
return False

url = response.url

# The "input" field represents the original scan target (host:port),
# not the full URL. Other modules and output consumers use this to
# correlate responses back to the target that produced them.
input_parsed = urlparse(result.url)
url_input = input_parsed.netloc or result.url
j = self._response_to_json(url_input, response)

# discard 404s from unverified URLs
path = j.get("path", "/")
if parent_event.type == "URL_UNVERIFIED" and status_code in (404,) and path != "/":
self.debug(f'Discarding 404 from "{url}"')
return True

tags = [f"status-{status_code}"]
url_context = "{module} visited {event.parent.data} and got status code {event.http_status}"
if parent_event.type == "OPEN_TCP_PORT":
url_context += " at {event.data}"

url_event = self.make_event(url, "URL", parent_event, tags=tags, context=url_context)
if url_event:
response_ip = j.get("host", "")
if response_ip:
url_event._resolved_hosts.add(response_ip)
title = j.get("title", "")
if title:
url_event.http_title = title
location = j.get("location", "")
if location:
url_event.redirect_location = location
if url_event != parent_event:
await self.emit_event(url_event)
content_type = j.get("header", {}).get("content_type", "unspecified").split(";")[0]
content_length = self.helpers.bytes_to_human(j.get("content_length", 0))
await self.emit_event(
j,
"HTTP_RESPONSE",
url_event,
tags=url_event.tags,
context=f"HTTP_RESPONSE was {content_length} with {content_type} content type",
)

if self.store_responses:
response_dir = self.scan.home / "http_responses"
self.helpers.mkdir(response_dir)
filename = f"{j['host']}.{urlparse(url).port or 443}{path.replace('/', '[slash]')}.txt"
response_file = response_dir / filename
response_file.write_text(j.get("raw_header", "") + j.get("body", ""))
return True
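
The netloc fallback in the "input" derivation above is easy to see in isolation
(example inputs, standard library only)::

    from urllib.parse import urlparse

    for probe in ["https://example.com:8443/path", "example.com:80"]:
        parsed = urlparse(probe)
        # Full URLs yield their host:port netloc; bare host:port inputs parse
        # with an empty netloc, so the original string passes through as-is.
        print(parsed.netloc or probe)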

async def handle_batch(self, *events):
stdin = {}
# Track dual-scheme probes from OPEN_TCP_PORT: {(host, port): {"http": url, "https": url}}
port_probes = {}
# Reverse index: each paired probe URL → its (host, port) key
paired_probe_urls = {}

for event in events:
urls, url_hash = self.make_url_metadata(event)
@@ -190,6 +256,13 @@
scheme = "https" if url.startswith("https://") else "http"
port_probes[key][scheme] = url

# Only ports with BOTH schemes are subject to suppression — single-scheme
# OPEN_TCP_PORT probes (rare, but possible) stream through normally.
for key, schemes in port_probes.items():
if "http" in schemes and "https" in schemes:
paired_probe_urls[schemes["http"]] = key
paired_probe_urls[schemes["https"]] = key

if not stdin:
return

@@ -198,7 +271,6 @@
timeout = self.scan.blasthttp_timeout
retries = self.scan.blasthttp_retries

# Build batch configs
configs = []
for url in stdin:
config = blasthttp.BatchConfig(
@@ -212,110 +284,51 @@
)
configs.append(config)

# blasthttp batch returns a native coroutine via pyo3-async-runtimes
results = await self.client.request_batch(configs, self.threads)

# For OPEN_TCP_PORT probes, suppress redundant https when http already succeeded.
# When probing an unknown port, we try both http:// and https://. If http works,
# the port definitely speaks HTTP — the https result may be a proxy artifact
# (intercepting proxies like Burp terminate TLS themselves, making any https://
# URL "succeed" regardless of whether the target actually speaks TLS).
# If http fails but https succeeds, the port genuinely speaks TLS.
# Explicit URLs (URL_UNVERIFIED/URL) are never suppressed — this only applies
# to speculative OPEN_TCP_PORT probes.
suppressed_urls = set()
if port_probes:
successful_urls = {r.url for r in results if r.success and r.response.status != 0}
for key, schemes in port_probes.items():
http_url = schemes.get("http")
https_url = schemes.get("https")
if not (http_url and https_url):
# Suppress redundant https probes when http already succeeded for the same
# (host, port). When probing an unknown port, we try both schemes; if http
# works, the port definitely speaks HTTP, and the https result is likely a
# proxy artifact (intercepting proxies like Burp terminate TLS themselves,
# making any https:// URL "succeed" regardless of whether the target really
# speaks TLS). Explicit URL/URL_UNVERIFIED events are never suppressed —
# only speculative OPEN_TCP_PORT probes.
#
# Streaming requires per-pair coordination: emit http immediately, defer
# https until http's outcome is known (or the stream ends).
http_succeeded = {} # key -> bool, set when http result arrives
deferred_https = {} # key -> result, awaiting http verdict

async def resolve_https(key, result):
if http_succeeded.get(key) and result.success and result.response.status != 0:
self.debug(f"Suppressing https probe {result.url} (http already succeeded for {key})")
return
await self._process_result(result, stdin[result.url])

async for result in iter_batch_results(self.client.request_batch_stream(configs, concurrency=self.threads)):
key = paired_probe_urls.get(result.url)
if key is None:
# Non-paired URL — emit immediately
parent_event = stdin.get(result.url)
if parent_event is None:
self.warning(f"Unable to correlate parent event for: {result.url}")
continue
if http_url in successful_urls and https_url in successful_urls:
self.debug(f"Suppressing https probe {https_url} (http already succeeded: {http_url})")
suppressed_urls.add(https_url)

for i in range(len(results)):
result = results[i]
results[i] = None # free response body memory as we go
if not result.success:
self.debug(f"blasthttp error for {result.url}: {result.error}")
continue

response = result.response
status_code = response.status
if status_code == 0:
self.debug(f'No HTTP status code for "{result.url}"')
continue

if result.url in suppressed_urls:
await self._process_result(result, parent_event)
continue

# Map back to parent event using the input URL
parent_event = stdin.get(result.url, None)

if parent_event is None:
self.warning(f"Unable to correlate parent event for: {result.url}")
continue

url = response.url

# Build JSON dict for HTTP_RESPONSE event
# The "input" field represents the original scan target (host:port),
# not the full URL. Other modules and output consumers use this to
# correlate responses back to the target that produced them.
input_parsed = urlparse(result.url)
url_input = input_parsed.netloc or result.url
j = self._response_to_json(url_input, response)

# discard 404s from unverified URLs
path = j.get("path", "/")
if parent_event.type == "URL_UNVERIFIED" and status_code in (404,) and path != "/":
self.debug(f'Discarding 404 from "{url}"')
continue

# main URL
tags = [f"status-{status_code}"]

url_context = "{module} visited {event.parent.data} and got status code {event.http_status}"
if parent_event.type == "OPEN_TCP_PORT":
url_context += " at {event.data}"

url_event = self.make_event(
url,
"URL",
parent_event,
tags=tags,
context=url_context,
)
if url_event:
response_ip = j.get("host", "")
if response_ip:
url_event._resolved_hosts.add(response_ip)
title = j.get("title", "")
if title:
url_event.http_title = title
location = j.get("location", "")
if location:
url_event.redirect_location = location
if url_event != parent_event:
await self.emit_event(url_event)
# HTTP response
content_type = j.get("header", {}).get("content_type", "unspecified").split(";")[0]
content_length = j.get("content_length", 0)
content_length = self.helpers.bytes_to_human(content_length)
await self.emit_event(
j,
"HTTP_RESPONSE",
url_event,
tags=url_event.tags,
context=f"HTTP_RESPONSE was {content_length} with {content_type} content type",
)

# Store responses if configured
if self.store_responses:
response_dir = self.scan.home / "http_responses"
self.helpers.mkdir(response_dir)
filename = f"{j['host']}.{urlparse(url).port or 443}{path.replace('/', '[slash]')}.txt"
response_file = response_dir / filename
response_file.write_text(j.get("raw_header", "") + j.get("body", ""))
# Paired OPEN_TCP_PORT probe
is_http = result.url == port_probes[key]["http"]
if is_http:
http_succeeded[key] = result.success and result.response is not None and result.response.status != 0
await self._process_result(result, stdin[result.url])
# If https for this key arrived first and was buffered, resolve it now
pending = deferred_https.pop(key, None)
if pending is not None:
await resolve_https(key, pending)
else: # is https
if key in http_succeeded:
await resolve_https(key, result)
else:
deferred_https[key] = result

# Stream ended — any leftover https had no http result, so emit unconditionally
for key, result in deferred_https.items():
await self._process_result(result, stdin[result.url])
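
The per-pair coordination is the subtle part of this streaming rewrite, so here is a
self-contained toy version of the same state machine — plain tuples stand in for
batch results, and ``print`` stands in for event emission::

    import asyncio

    async def fake_results():
        # (key, scheme, succeeded) tuples; https may complete before its http peer.
        for item in [("host:8443", "https", True), ("host:8443", "http", True)]:
            yield item

    def resolve_https(http_succeeded, key, result):
        # Suppress a successful https probe only once http is known-good.
        if http_succeeded.get(key) and result[2]:
            print("suppress", result)
        else:
            print("emit", result)

    async def demo():
        http_succeeded, deferred_https = {}, {}
        async for key, scheme, ok in fake_results():
            if scheme == "http":
                http_succeeded[key] = ok
                print("emit", (key, scheme, ok))
                pending = deferred_https.pop(key, None)
                if pending is not None:
                    resolve_https(http_succeeded, key, pending)
            elif key in http_succeeded:
                resolve_https(http_succeeded, key, (key, scheme, ok))
            else:
                deferred_https[key] = (key, scheme, ok)
        for result in deferred_https.values():
            print("emit", result)  # stream ended with no http verdict

    asyncio.run(demo())

Running it prints "emit" for the http probe and "suppress" for the deferred https
probe, mirroring the proxy-artifact case described in the comment above.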