Failure detection - Python SDK
This page shows how to do the following:
- Raise and Handle Exceptions
- Deliberately Fail Workflows
- Set Workflow Timeouts
- Set Workflow Retries
- Set Activity Timeouts
- Set an Activity Retry Policy
- Heartbeat an Activity
Raise and Handle Exceptions
In each Temporal SDK, error handling is implemented idiomatically, following the conventions of the language.
Temporal uses several error classes internally. For example, the Python SDK uses CancelledError to handle a Workflow cancellation.
You should not raise or otherwise implement these manually, as they are tied to Temporal platform logic.
The one Temporal error class that you will typically raise deliberately is ApplicationError.
In fact, any other exceptions that are raised from your Python code in a Temporal Activity will be converted to an ApplicationError internally.
This way, an error's type, severity, and any additional details can be sent to the Temporal Service, indexed by the Web UI, and even serialized across language boundaries.
In other words, these two code samples do the same thing:
from temporalio import activity

class MyCustomError(Exception):
    def __init__(self, message, error_code):
        super().__init__(message)
        self.message = message  # stored so __str__ can use it
        self.error_code = error_code

    def __str__(self):
        return f"{self.message} (Error Code: {self.error_code})"

@activity.defn
async def my_activity(input: MyActivityInput):
    try:
        ...  # Your activity logic goes here
    except Exception as e:
        attempt = activity.info().attempt
        raise MyCustomError(
            f"Error encountered on attempt {attempt}",
            error_code=1,  # illustrative value
        ) from e
from temporalio import activity
from temporalio.exceptions import ApplicationError

@activity.defn
async def my_activity(input: MyActivityInput):
    try:
        ...  # Your activity logic goes here
    except Exception as e:
        attempt = activity.info().attempt
        raise ApplicationError(
            f"Error encountered on attempt {attempt}",
            type="MyCustomError",
        ) from e
Depending on your implementation, you may decide to use either method.
One reason to use the Temporal ApplicationError class is that it lets you set an additional non_retryable parameter.
This way, you can tell Temporal not to retry a particular error automatically.
This can be useful for deliberately failing a Workflow due to bad input data, rather than waiting for a timeout to elapse:
from temporalio import activity
from temporalio.exceptions import ApplicationError

@activity.defn
async def my_activity(input: MyActivityInput):
    try:
        ...  # Your activity logic goes here
    except Exception as e:
        attempt = activity.info().attempt
        raise ApplicationError(
            f"Error encountered on attempt {attempt}",
            type="MyNonRetryableError",
            non_retryable=True,
        ) from e
Alternatively, you can list the error types that are non-retryable in your Activity Retry Policy.
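The two ways of marking an error as non-retryable can be pictured with a small plain-Python model. This is only an illustrative sketch, not SDK code: `FakeFailure`, `FakePolicy`, and `should_retry` are hypothetical names, and the real decision is made by Temporal itself, which also takes timeouts into account.

```python
from dataclasses import dataclass, field


@dataclass
class FakeFailure:
    """Hypothetical stand-in for a failed Activity attempt."""
    type: str
    non_retryable: bool = False


@dataclass
class FakePolicy:
    """Hypothetical stand-in for the relevant Retry Policy fields."""
    maximum_attempts: int = 0  # 0 is treated as "no limit" in this model
    non_retryable_error_types: list[str] = field(default_factory=list)


def should_retry(failure: FakeFailure, policy: FakePolicy, attempt: int) -> bool:
    """Return True if this simplified model would schedule another attempt."""
    if failure.non_retryable:
        return False  # flagged on the error itself
    if failure.type in policy.non_retryable_error_types:
        return False  # listed in the Retry Policy
    if policy.maximum_attempts and attempt >= policy.maximum_attempts:
        return False  # attempts exhausted
    return True
```

Flagging the error in the Activity keeps the decision next to the code that detects the problem, while listing types in the policy lets the caller decide per invocation.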
Failing Workflows
One of the core design principles of Temporal is that an Activity Failure will never directly cause a Workflow Failure — a Workflow should never return as Failed unless deliberately.
The default Retry Policy for Temporal Activities places no limit on the number of attempts, so a failing Activity is retried until a timeout, such as the Schedule-To-Close Timeout, is reached.
Activities will not actually return a failure to your Workflow until this condition or another non-retryable condition is met.
At this point, you can decide how to handle an error returned by your Activity the way you would in any other program.
For example, you could implement a Saga Pattern that uses try and except blocks to "unwind" some of the steps your Workflow has performed up to the point of Activity Failure.
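A minimal sketch of that unwinding logic, written in plain asyncio so it can run on its own. The names `run_saga` and `step` are hypothetical; in a real Workflow each action and compensation would presumably be an Activity call, and the except clause would catch the error raised for the failed Activity.

```python
import asyncio


async def run_saga(steps):
    """Run (action, compensation) pairs; undo completed steps on failure."""
    compensations = []
    try:
        for action, compensation in steps:
            await action()
            # Register the undo step only once the action has succeeded.
            compensations.append(compensation)
    except Exception:
        # Unwind the completed steps in reverse order, then re-raise.
        for compensation in reversed(compensations):
            await compensation()
        raise


def step(log, name, fail=False):
    """Build a hypothetical (action, compensation) pair that records to log."""
    async def action():
        if fail:
            raise RuntimeError(f"{name} failed")
        log.append(f"do {name}")

    async def compensation():
        log.append(f"undo {name}")

    return action, compensation
```

Running `run_saga` with three steps where the third fails performs the first two actions, then undoes them in reverse order before the error propagates.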
You fail a Workflow only by manually raising an ApplicationError from the Workflow code.
You could do this in response to an Activity Failure, if the failure of that Activity means that your Workflow should not continue:
from temporalio.exceptions import ActivityError, ApplicationError

try:
    # Activity reference and options omitted here
    credit_card_confirmation = await workflow.execute_activity_method()
except ActivityError as e:
    workflow.logger.error(f"Unable to process credit card: {e.message}")
    raise ApplicationError(
        "Unable to process credit card", type="CreditCardProcessingError"
    ) from e
This works differently in a Workflow than raising exceptions from Activities.
In an Activity, any Python exceptions or custom exceptions are converted to a Temporal ApplicationError.
In a Workflow, any exceptions that are raised other than an explicit Temporal ApplicationError will only fail that particular Workflow Task and be retried.
This includes any typical Python runtime errors like a NameError or a TypeError that are raised automatically.
These errors are treated as bugs that can be corrected with a fixed deployment, rather than a reason for a Temporal Workflow Execution to return unexpectedly.
Workflow timeouts
How to set Workflow timeouts using the Temporal Python SDK
Each Workflow timeout controls the maximum duration of a different aspect of a Workflow Execution.
Before we continue, note that we generally do not recommend setting Workflow Timeouts, because Workflows are designed to be long-running and resilient. Setting a Timeout can limit a Workflow's ability to handle unexpected delays or long-running processes. If you need to perform an action inside your Workflow after a specific period of time, we recommend using a Timer.
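As a rough illustration of the Timer approach, the sketch below races a wait against a timeout and takes a fallback action when the timer fires first. It is plain asyncio with hypothetical names (`wait_for_approval`, the `approved` event); it assumes, as the Python SDK documentation describes, that asyncio sleeps and timeouts inside a Workflow are backed by durable Timers.

```python
import asyncio


async def wait_for_approval(approved: asyncio.Event, timeout_seconds: float) -> str:
    """Wait for an approval flag, falling back if the timer fires first."""
    try:
        await asyncio.wait_for(approved.wait(), timeout=timeout_seconds)
        return "approved"
    except asyncio.TimeoutError:
        # The timer elapsed: react to it instead of failing the whole run.
        return "escalated"
```

Unlike a Workflow Timeout, this keeps the Workflow in control of what happens when the deadline passes.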
Workflow timeouts are set when starting the Workflow Execution.
- Workflow Execution Timeout: restricts the maximum amount of time that a single Workflow Execution can be executed.
- Workflow Run Timeout: restricts the maximum amount of time that a single Workflow Run can last.
- Workflow Task Timeout: restricts the maximum amount of time that a Worker can execute a Workflow Task.
Pass the timeout to either the start_workflow() or execute_workflow() asynchronous method.
Available timeouts are:
- execution_timeout
- run_timeout
- task_timeout
from datetime import timedelta

# ...
result = await client.execute_workflow(
    YourWorkflow.run,
    "your timeout argument",
    id="your-workflow-id",
    task_queue="your-task-queue",
    # Set Workflow Timeout duration
    execution_timeout=timedelta(seconds=2),
    # run_timeout=timedelta(seconds=2),
    # task_timeout=timedelta(seconds=2),
)
Workflow retries
How to set a Workflow Retry Policy using the Temporal Python SDK
A Retry Policy can work in cooperation with the timeouts to provide fine controls to optimize the execution experience.
Use a Retry Policy to retry a Workflow Execution in the event of a failure.
Workflow Executions do not retry by default, and Retry Policies should be used with Workflow Executions only in certain situations.
Pass the Retry Policy to either the start_workflow() or execute_workflow() asynchronous method.
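from datetime import timedelta
from temporalio.common import RetryPolicy

# ...
# execute_workflow() waits for completion and returns the Workflow result
result = await client.execute_workflow(
    YourWorkflow.run,
    "your retry policy argument",
    id="your-workflow-id",
    task_queue="your-task-queue",
    retry_policy=RetryPolicy(maximum_interval=timedelta(seconds=2)),
)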
Set Activity timeouts
How to set an Activity Execution Timeout using the Temporal Python SDK
Each Activity timeout controls the maximum duration of a different aspect of an Activity Execution.
The following timeouts are available in the Activity Options.
- Schedule-To-Close Timeout: the maximum amount of time allowed for the overall Activity Execution.
- Start-To-Close Timeout: the maximum time allowed for a single Activity Task Execution.
- Schedule-To-Start Timeout: the maximum amount of time allowed from when an Activity Task is scheduled to when a Worker starts that Activity Task.
An Activity Execution must have either the Start-To-Close or the Schedule-To-Close Timeout set.
Activity options are set as keyword arguments after the Activity arguments.
Available timeouts are:
- schedule_to_close_timeout
- schedule_to_start_timeout
- start_to_close_timeout
from datetime import timedelta

# ...
activity_timeout_result = await workflow.execute_activity(
    your_activity,
    YourParams(greeting, "Activity Timeout option"),
    # Activity Execution Timeout
    start_to_close_timeout=timedelta(seconds=10),
    # schedule_to_start_timeout=timedelta(seconds=10),
    # schedule_to_close_timeout=timedelta(seconds=10),
)
Set an Activity Retry Policy
How to set an Activity Retry Policy using the Temporal Python SDK
A Retry Policy works in cooperation with the timeouts to provide fine controls to optimize the execution experience.
Activity Executions are automatically associated with a default Retry Policy if a custom one is not provided.
To create an Activity Retry Policy in Python, pass a RetryPolicy instance as the retry_policy argument of the start_activity() or execute_activity() function.
from datetime import timedelta
from temporalio.common import RetryPolicy

# ...
activity_result = await workflow.execute_activity(
    your_activity,
    YourParams(greeting, "Retry Policy options"),
    start_to_close_timeout=timedelta(seconds=10),
    # Retry Policy
    retry_policy=RetryPolicy(
        backoff_coefficient=2.0,
        maximum_attempts=5,
        initial_interval=timedelta(seconds=1),
        maximum_interval=timedelta(seconds=2),
        # non_retryable_error_types=["ValueError"],
    ),
)
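To see how these options interact, the following plain-Python sketch computes the wait before each retry under a capped exponential backoff. The helper `retry_intervals` is hypothetical and only models the arithmetic; it does not call the SDK, and it assumes that maximum_attempts counts the first attempt.

```python
from datetime import timedelta


def retry_intervals(initial_interval, backoff_coefficient, maximum_interval, maximum_attempts):
    """Return the wait before each retry under a capped exponential backoff."""
    intervals = []
    # maximum_attempts includes the first try, so there are at most N - 1 retries.
    for retry in range(maximum_attempts - 1):
        interval = initial_interval * (backoff_coefficient ** retry)
        # The interval never grows beyond maximum_interval.
        intervals.append(min(interval, maximum_interval))
    return intervals
```

With an initial interval of 1 second, a coefficient of 2.0, a maximum interval of 2 seconds, and 5 attempts, this model gives waits of 1, 2, 2, and 2 seconds: the cap is reached on the second retry.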
Override the retry interval with next_retry_delay
To override the next retry interval set by the current policy, pass next_retry_delay when raising an ApplicationError in an Activity.
This value replaces whatever interval the Retry Policy would otherwise apply.
For example, you can set the delay interval based on an Activity's attempt count. In the following example, the retry delay starts at 3 seconds after the first attempt. It increases to 6 seconds for the second attempt, 9 seconds for the third attempt, and so forth. This creates a steadily increasing backoff, versus the exponential approach used by backoff coefficients:
from datetime import timedelta

from temporalio import activity
from temporalio.exceptions import ApplicationError

@activity.defn
async def my_activity(input: MyActivityInput):
    try:
        ...  # Your activity logic goes here
    except Exception as e:
        attempt = activity.info().attempt
        raise ApplicationError(
            f"Error encountered on attempt {attempt}",
            next_retry_delay=timedelta(seconds=3 * attempt),
        ) from e
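The difference between the two growth patterns is easy to check with a few lines of arithmetic. The helpers below are hypothetical and exist only to compare the numbers; the exponential one assumes an initial interval of 1 second, a coefficient of 2.0, and no maximum interval.

```python
from datetime import timedelta


def linear_delay(attempt: int, step: timedelta = timedelta(seconds=3)) -> timedelta:
    """Delay from the attempt-based override: the step times the attempt number."""
    return step * attempt


def exponential_delay(
    attempt: int,
    initial: timedelta = timedelta(seconds=1),
    coefficient: float = 2.0,
) -> timedelta:
    """Delay from an uncapped exponential backoff, for comparison."""
    return initial * (coefficient ** (attempt - 1))
```

Over the first four attempts the linear override gives 3, 6, 9, and 12 seconds, while the exponential model gives 1, 2, 4, and 8 seconds and overtakes it a few attempts later.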
Heartbeat an Activity
How to Heartbeat an Activity using the Temporal Python SDK
An Activity Heartbeat is a ping from the Worker Process that is executing the Activity to the Temporal Service. Each Heartbeat informs the Temporal Service that the Activity Execution is making progress and the Worker has not crashed. If the Temporal Service does not receive a Heartbeat within a Heartbeat Timeout time period, the Activity will be considered failed and another Activity Task Execution may be scheduled according to the Retry Policy.
Heartbeats may not always be sent to the Temporal Service—they may be throttled by the Worker.
Activity Cancellations are delivered to Activities from the Temporal Service when they Heartbeat. Activities that don't Heartbeat can't receive a Cancellation. Heartbeat throttling may lead to Cancellation getting delivered later than expected.
Heartbeats can contain a details field describing the Activity's current progress.
If an Activity gets retried, the Activity can access the details from the last Heartbeat that was sent to the Temporal Service.
To Heartbeat an Activity Execution in Python, use the heartbeat() API.
from temporalio import activity

@activity.defn
async def your_activity_definition() -> str:
    activity.heartbeat("heartbeat details!")
    return "done"  # placeholder result
In addition to obtaining cancellation information, Heartbeats also support detail data that persists on the server for retrieval during Activity retry.
If an Activity calls heartbeat(123, 456) and then fails and is retried, activity.info().heartbeat_details returns an iterable containing 123 and 456 on the next attempt.
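One common use of these details is to resume work where the previous attempt stopped. The sketch below shows the resume logic in plain Python so that it runs without a Worker. `process_items` is a hypothetical helper; in an Activity you would presumably pass it `activity.info().heartbeat_details` and `activity.heartbeat`.

```python
from typing import Callable, Sequence


def process_items(
    items: Sequence[str],
    heartbeat_details: Sequence[int],
    heartbeat: Callable[[int], None],
) -> list[str]:
    """Process items, skipping those a previous attempt already reported."""
    # The last Heartbeat recorded the index of the last finished item.
    start = heartbeat_details[0] + 1 if heartbeat_details else 0
    processed = []
    for index in range(start, len(items)):
        processed.append(items[index].upper())  # placeholder for real work
        heartbeat(index)  # record progress after each item
    return processed
```

On a first attempt the details are empty and every item is processed. On a retry whose last recorded index was 0, processing starts again at index 1.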
Set a Heartbeat Timeout
How to set a Heartbeat Timeout using the Temporal Python SDK
A Heartbeat Timeout works in conjunction with Activity Heartbeats.
heartbeat_timeout is a keyword argument of the start_activity() function that sets the maximum time between Activity Heartbeats.
workflow.start_activity(
    activity="your-activity",
    schedule_to_close_timeout=timedelta(seconds=5),
    heartbeat_timeout=timedelta(seconds=1),
)
execute_activity() is a shortcut for start_activity() that waits on its result.
To get just the handle to wait and cancel separately, use start_activity(). execute_activity() should be used in most cases unless advanced task capabilities are needed.
await workflow.execute_activity(
    "your-activity",  # positional arguments must come before keyword arguments
    name,
    schedule_to_close_timeout=timedelta(seconds=5),
    heartbeat_timeout=timedelta(seconds=1),
)