These two little guys might seem like they’re just there for show, but trust me when I say that they pack a punch. Let’s start with Task.cancelled().
Task.cancelled() is essentially the “is cancelled” method of asyncio’s Task class. It returns True if the task has been requested to be cancelled and False otherwise. This can come in handy when you want to check whether a particular task should continue running or not. For example:
# Import necessary libraries
import time
import asyncio
# Define a coroutine function called my_task
async def my_task():
while True:
print("I'm still alive!")
await asyncio.sleep(1) # Use await to pause the execution of the coroutine and allow other tasks to run
# This line will never be reached if the task is cancelled
print("This won't happen.")
# Check if the task has been cancelled
if __name__ == '__main__':
loop = asyncio.get_event_loop() # Get the event loop
task = loop.create_task(my_task()) # Create a task from the coroutine function
time.sleep(3) # Wait for 3 seconds before cancelling the task
if task.cancelled(): # Check if the task has been cancelled
print("Task has been cancelled!")
else:
task.cancel() # Cancel the task
print("Cancelling task...")
loop.run_until_complete(task) # Wait for the task to finish or be cancelled
In this example, we create a simple task that prints “I’m still alive!” every second and waits indefinitely using an infinite while loop. We then start the task and wait for 3 seconds before checking whether it has been cancelled using Task.cancelled(). If it hasn’t been cancelled yet, we cancel it manually and print a message to indicate that cancellation is happening. Finally, we run_until_complete() on our event loop to ensure that all tasks are finished or have been cancelled.
Now uncancel(). This method decrements the count of cancellation requests for a task. It returns the remaining number of cancellation requests after it has been called. Here’s an example:
# Import necessary libraries
import time
import asyncio
# Define an asynchronous function called my_task
async def my_task():
while True:
print("I'm still alive!")
await asyncio.sleep(1)
# This line will never be reached if the task is cancelled
print("This won't happen.")
if __name__ == '__main__':
# Get the event loop
loop = asyncio.get_event_loop()
# Create a task using the my_task function
task = loop.create_task(my_task())
# Wait for 3 seconds before cancelling the task
time.sleep(3)
# Check if the task has been cancelled
if task.cancelled():
print("Task has been cancelled!")
else:
# Cancel the task
task.cancel()
print("Cancelling task...")
# Uncancel the task to allow it to continue running
task.uncancel()
# Run the event loop until the task is completed or cancelled
loop.run_until_complete(task)
In this example, we’re doing almost exactly what we did in our previous one, but after cancelling the task and printing a message indicating that cancellation is happening, we call uncancel() on the task. This allows us to continue running the task as if nothing happened!
Now you might be wondering why we would ever want to do this. Well, sometimes it’s useful to have tasks that are cancellable but can also be “uncancelled” for certain reasons. For example, let’s say you’re writing a web server and you want to gracefully shut down the server by cancelling all outstanding requests using asyncio.TaskGroup or asyncio.timeout(). However, if there is an error during the shutdown process (e.g., a database connection fails), it might be useful to “uncancel” those tasks so that they can continue running and retrying their operations.
I hope this guide has helped you understand Task.cancelled() and uncancel(). Remember, these methods are powerful tools for managing the lifecycle of your asyncio tasks!