Airflow XCom: Exclusive
Manual Pushing and Pulling
Pushing (xcom_push): Stores a value in the Airflow metadata database. Many operators (and any @task function) automatically push their return value under a special key called return_value.
With other classic operators, such as the BashOperator, setting do_xcom_push=True makes the operator push the last line of standard output (stdout) to the metadata database.
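Because only the final line is kept, anything a command prints after the value you care about replaces it. The sketch below mimics that behavior with the standard library: it runs a child process and keeps only the last line of its stdout. It is an illustration, not BashOperator's actual code, and the helper name last_stdout_line is hypothetical.

```python
import subprocess
import sys


def last_stdout_line(cmd: list[str]) -> str:
    """Run cmd and return only the final line of its stdout (hypothetical helper)."""
    completed = subprocess.run(cmd, capture_output=True, text=True, check=True)
    lines = completed.stdout.splitlines()
    # Earlier lines (logs, progress messages) are discarded; only the last survives
    return lines[-1] if lines else ""


if __name__ == "__main__":
    # The child prints a log line first and the value we actually want last
    script = "print('downloading...'); print('/tmp/output.csv')"
    print(last_stdout_line([sys.executable, "-c", script]))
```

In practice this means a command should print the value it wants to hand downstream as its very last line of output.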
If you must store larger data in XComs, consider implementing a custom XCom backend (e.g., one backed by S3 or GCS) to avoid overwhelming your PostgreSQL/MySQL metadata database.
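Exclusive consumption: when each payload should be processed by exactly one consumer, store XCom-like payloads in a dedicated database table with a status column (available, claimed, consumed). A worker claims a row with an atomic UPDATE ... WHERE status = 'available' RETURNING * (or SELECT ... FOR UPDATE inside a transaction), so only one worker can claim a given row.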
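The claim step can be sketched with Python's built-in sqlite3 module standing in for PostgreSQL/MySQL. Since RETURNING and SELECT ... FOR UPDATE support varies by engine and version, this sketch swaps in an equivalent compare-and-set: a conditional UPDATE ... WHERE status = 'available' whose rowcount tells the caller whether it won the row. The table name xcom_payloads and the helper functions are hypothetical.

```python
import json
import sqlite3


def make_store() -> sqlite3.Connection:
    # isolation_level=None -> autocommit, so each UPDATE is its own atomic statement
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute(
        "CREATE TABLE xcom_payloads ("
        " id INTEGER PRIMARY KEY,"
        " payload TEXT NOT NULL,"
        " status TEXT NOT NULL DEFAULT 'available')"
    )
    return conn


def publish(conn: sqlite3.Connection, value) -> int:
    cur = conn.execute(
        "INSERT INTO xcom_payloads (payload) VALUES (?)", (json.dumps(value),)
    )
    return cur.lastrowid


def claim_next(conn: sqlite3.Connection):
    """Claim the oldest available row; return (id, value) or None if none left."""
    while True:
        row = conn.execute(
            "SELECT id, payload FROM xcom_payloads"
            " WHERE status = 'available' ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            return None
        row_id, payload = row
        # Compare-and-set: succeeds only if the row is still 'available'
        cur = conn.execute(
            "UPDATE xcom_payloads SET status = 'claimed'"
            " WHERE id = ? AND status = 'available'",
            (row_id,),
        )
        if cur.rowcount == 1:
            return row_id, json.loads(payload)
        # Another worker won the race for this row; try the next one


def mark_consumed(conn: sqlite3.Connection, row_id: int) -> None:
    conn.execute(
        "UPDATE xcom_payloads SET status = 'consumed'"
        " WHERE id = ? AND status = 'claimed'",
        (row_id,),
    )
```

With several workers you would point separate connections at a shared database file (or a real server) rather than one in-memory connection; the guard in the WHERE clause is what keeps two workers from claiming the same row.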
The following backend, written against the Airflow 2.x BaseXCom API (whose override points are serialize_value and deserialize_value), gives each payload its own S3 object and keeps only the object key in the metadata database:
from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
import json
import uuid


class S3XComBackend(BaseXCom):
    PREFIX = "xcoms/"
    BUCKET_NAME = "my-company-exclusive-xcom-bucket"

    @staticmethod
    def serialize_value(value, **kwargs):
        # Generate an exclusive, unique key for this specific task execution output
        key = f"{S3XComBackend.PREFIX}{uuid.uuid4()}.json"
        s3_hook = S3Hook(aws_conn_id="aws_default")
        # Upload the payload to its own isolated S3 path
        s3_hook.load_string(
            string_data=json.dumps(value),
            key=key,
            bucket_name=S3XComBackend.BUCKET_NAME,
            replace=True,
        )
        # Only the reference key is stored in the Airflow metadata DB
        return BaseXCom.serialize_value(key)

    @staticmethod
    def deserialize_value(result):
        # Retrieve the exclusive key from the DB record
        key = BaseXCom.deserialize_value(result)
        s3_hook = S3Hook(aws_conn_id="aws_default")
        # Download the actual data from S3
        file_content = s3_hook.read_key(key=key, bucket_name=S3XComBackend.BUCKET_NAME)
        return json.loads(file_content)
To activate this backend globally, set the AIRFLOW__CORE__XCOM_BACKEND environment variable in your Airflow deployment (equivalently, xcom_backend in the [core] section of airflow.cfg) to the full import path of the S3XComBackend class.
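The offloading logic can be exercised without Airflow or AWS credentials. This sketch keeps the shape of the S3 backend above (a unique key per payload, with only that key stored as the DB value) but swaps the bucket for a plain dict; FakeObjectStore and the two module-level functions are hypothetical stand-ins, not Airflow APIs.

```python
import json
import uuid

PREFIX = "xcoms/"


class FakeObjectStore:
    """In-memory stand-in for an S3 bucket (hypothetical, for testing only)."""

    def __init__(self) -> None:
        self.objects: dict[str, str] = {}

    def load_string(self, string_data: str, key: str) -> None:
        self.objects[key] = string_data

    def read_key(self, key: str) -> str:
        return self.objects[key]


def serialize_value(store: FakeObjectStore, value) -> bytes:
    # Unique key per payload, mirroring the S3 backend's naming scheme
    key = f"{PREFIX}{uuid.uuid4()}.json"
    store.load_string(json.dumps(value), key)
    # Only this small JSON-encoded reference would land in the metadata DB row
    return json.dumps(key).encode("utf-8")


def deserialize_value(store: FakeObjectStore, db_value: bytes):
    # Resolve the stored reference back into the full payload
    key = json.loads(db_value.decode("utf-8"))
    return json.loads(store.read_key(key))


if __name__ == "__main__":
    store = FakeObjectStore()
    payload = {"ids": list(range(10_000))}
    ref = serialize_value(store, payload)
    print(len(ref), "bytes stored in the DB row")
    assert deserialize_value(store, ref) == payload
```

However large the payload grows, the DB row holds a fixed-size reference, which is the whole point of offloading.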
Pulling (xcom_pull): Tasks retrieve data using xcom_pull(), which can be filtered by task_ids, dag_id, or a specific key.
Advanced "Exclusive" Strategies
Exclusive pulls: pass both task_ids and key so a task fetches exactly one upstream value and ignores all other XComs:
def process_data_func(ti):
    # Exclusive pull: fetch only the return_value XCom of 'extract_task', ignoring all other XComs
    raw_data = ti.xcom_pull(task_ids="extract_task", key="return_value")
    return f"Processed: {len(raw_data)} items"
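The narrowing that task_ids, dag_id, and key perform can be pictured with a toy in-memory model. This is not Airflow's implementation: rows are plain records, later rows win over earlier ones, and the return shape (a single value for one task ID, a list for several) is a simplifying assumption of the sketch. XComRow and toy_xcom_pull are hypothetical names.

```python
from dataclasses import dataclass
from typing import Any, Union


@dataclass(frozen=True)
class XComRow:
    dag_id: str
    task_id: str
    key: str
    value: Any


def toy_xcom_pull(
    rows: list[XComRow],
    dag_id: str,
    task_ids: Union[str, list[str]],
    key: str = "return_value",
):
    """Toy filter: keep rows matching dag_id, task_ids, and key (not Airflow's code)."""
    wanted = [task_ids] if isinstance(task_ids, str) else list(task_ids)
    # Later rows overwrite earlier ones for the same task in this toy model
    by_task = {
        row.task_id: row.value
        for row in rows
        if row.dag_id == dag_id and row.key == key and row.task_id in wanted
    }
    if isinstance(task_ids, str):
        return by_task.get(task_ids)  # None when nothing matches
    return [by_task.get(t) for t in wanted]
```

Other tasks' return values, other keys from the same task, and rows from other DAGs all fall away, which is what makes a fully specified pull "exclusive".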