# TM1 and Apache Airflow
> **Note:** I no longer work with TM1, nor is it likely I will again. I've left these posts here for posterity though.
## Why use Airflow?
Inspired by the Airflow Summit 2020, I wanted to explore whether an orchestrator like Apache Airflow could be used to help manage tasks and workloads on a TM1 server.
For example, it's common to want to extract data from TM1 and load it into a data warehouse. Most approaches end up involving multiple tasks triggered and managed in different tools, possibly owned by different teams. None of the moving parts is complicated, but the end-to-end journey tends to be flaky and hard to reuse.

The introduction of the REST API and the development of TM1py have vastly increased the possibilities for interacting with a TM1 server programmatically. In the example above, a tool like Airflow can manage a single workflow that extracts the data and then loads it into the destination.

And that's just one use case. Any TI process or snippet can be triggered as part of an Airflow DAG, and operations can be run centrally across multiple servers as well as other databases. This means all of these pipelines can be monitored in one place by one team, instead of having to debug multiple points of potential failure.
## So what is Airflow exactly?
In their words, "Airflow is a platform created by the community to programmatically author, schedule and monitor workflows". Most commonly, it's used to orchestrate data pipelines. It's written in Python but can be used to schedule tasks written in other languages. It also provides built-in "hooks" for connecting to a wide range of third-party systems, particularly in the cloud/big data space. This allows you to manage and monitor all your ETL pipelines in a single place.
The jobs themselves are written in code, which means they can be version-controlled and tested. The Airflow docs list the principles the project tries to follow. One thing it doesn't do out of the box is connect to TM1, but it's easy to extend with Python, which lets you leverage the power of TM1py.
## Did it work?
Yes! At least in the basic use case I identified:
- Extract the data from a cube view and write it as a CSV to an S3 bucket
- Run a TI process
- Detect whether a value in a cell meets a certain condition
## Putting it to the test
Extracting the data from a cube and writing it somewhere was most interesting to me initially. I chose S3 because of its widespread use, but the same concept could easily be applied to writing to any other system. I was able to create an Airflow DAG that did this pretty easily. To simplify the management of the connection to TM1, I wrote a library that extends Airflow's base functionality. I've released the resulting code as airflow-tm1 on GitHub and published it to PyPI.
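Assuming the PyPI package name matches the repo, installing it into the environment Airflow runs in should be as simple as:

```bash
pip install airflow-tm1
```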
**Note:** this depends on having the following:
- a working Airflow environment with support for S3 and airflow_tm1. Read more in the docs if you want to get started.
- connections created for TM1 and S3. Read more about managing Airflow connections for details.
- The `TM1Hook` requires at least the following to be specified:
  - Host
  - Login
  - Port
  - Extras
    - ssl

The trick here is that `ssl` needs to be defined in the JSON string in the Extras field:

```json
{ "ssl": false }
```
## Creating a DAG
Airflow uses DAGs to manage ETL jobs and the project team are much better at explaining what they are than I am. In short, a DAG can be thought of as a list of tasks defined together in a Python script. The fundamental components of a DAG are hooks, which manage connections to other systems; operators, which complete an independent task; and sensors, which check whether a condition is true.
Here is the simple DAG I created to pull data from a cube and write it to S3. It uses the TM1Hook from airflow-tm1 to manage the connection to TM1 and transfers the data using a PythonOperator that leverages TM1py.
### Import the necessary libraries
Import any other modules you want to use in your DAGs just as you would in any other Python context.
```python
from airflow import DAG
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

from airflow_tm1.hooks.tm1 import TM1Hook
```
### Set defaults
These can be overwritten on a task-by-task basis.
```python
# set defaults that the DAG will pass to each task
default_args = {
    "owner": "Airflow",
    "start_date": days_ago(2),
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "you@somewhere.com",
    "retries": 1,
}
```
### Define the Python function the task will use
This gets run by our task in the DAG and manages the end-to-end transfer of the data from TM1 to S3.
```python
def cube_view_to_s3(cube, view, bucket, key, **kwargs):
    # instantiate our TM1Hook using the "tm1_default" connection
    tm1_hook = TM1Hook(tm1_conn_id="tm1_default")
    # return an instance of the TM1Service from TM1py
    tm1 = tm1_hook.get_conn()
    # pull data in CSV format from the specified cube view
    view_data = tm1.cubes.cells.execute_view_csv(
        cube_name=cube, view_name=view, private=False)
    # instantiate an S3Hook using the "s3_default" connection
    s3_hook = S3Hook(aws_conn_id="s3_default")
    # write the data to the specified key in the bucket
    s3_hook.load_string(string_data=view_data, key=key,
                        bucket_name=bucket, replace=True)
```
### Create the DAG and a single task
This DAG only has a single task `t1` but will usually contain several more and manage how they depend on one another.
```python
with DAG(dag_id="example_tm1_to_s3", schedule_interval="@daily", default_args=default_args) as dag:

    t1 = PythonOperator(
        task_id="view_to_S3",
        # specifies the function we want to call
        python_callable=cube_view_to_s3,
        # and the arguments to pass it
        op_kwargs={"cube": "Income Statement Reporting", "view": "Income Statement - Management",
                   "key": "airflow-test/{{ ds_nodash }}.csv", "bucket": "scrambldbucket"},
        dag=dag,
    )

    t1
```
## Running the DAG
DAGs are saved in Airflow's DAG folder as `*.py` files. Airflow has a built-in web UI to manage DAGs, and they can also be managed from the CLI, which can be useful for testing purposes:

```bash
airflow trigger_dag example_tm1_to_s3
```
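You can also run a single task in isolation without involving the scheduler, which is handy while developing (using the 1.10-era `test` subcommand with an arbitrary execution date):

```bash
airflow test example_tm1_to_s3 view_to_S3 2020-07-01
```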
## Results
I tested on small views, running TM1 and Airflow locally. I was able to transfer small datasets (less than 10 MB) relatively quickly, but I've not done any serious testing and, obviously, scalability would be a concern. Pulling large volumes of data would require a good understanding of TM1, and of the model itself, to develop a sensible strategy. That said, it should work in some cases, such as where summary data is required for a Tableau dashboard. I'd also expect that exporting cube data via TI first would prove faster in many cases.
## Custom Operators
The DAG above uses the `PythonOperator` to call a custom function. For tasks that are likely to be used repeatedly, it's possible to create custom operators that provide a useful abstraction. There's no reason one couldn't create a custom operator replicating the function I used above, but I thought I'd start with something simpler: I created custom operators to trigger TI processes and Chores.
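To give a flavour of what that abstraction might look like, here's a minimal sketch of an operator wrapping the same hook calls as the `cube_view_to_s3` function above. The class name `TM1CubeViewToS3Operator` is hypothetical rather than part of airflow-tm1, and the imports assume the 1.10-era module layout used elsewhere in this post:

```python
from airflow.hooks.S3_hook import S3Hook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

from airflow_tm1.hooks.tm1 import TM1Hook


class TM1CubeViewToS3Operator(BaseOperator):
    """Extract a cube view as CSV and write it to a key in S3."""

    # let the S3 key contain Jinja templates such as {{ ds_nodash }}
    template_fields = ("key",)

    @apply_defaults
    def __init__(self, cube, view, bucket, key,
                 tm1_conn_id="tm1_default", aws_conn_id="s3_default",
                 *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.cube = cube
        self.view = view
        self.bucket = bucket
        self.key = key
        self.tm1_conn_id = tm1_conn_id
        self.aws_conn_id = aws_conn_id

    def execute(self, context):
        # same steps as cube_view_to_s3 above, just packaged as an operator
        tm1 = TM1Hook(tm1_conn_id=self.tm1_conn_id).get_conn()
        view_data = tm1.cubes.cells.execute_view_csv(
            cube_name=self.cube, view_name=self.view, private=False)
        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
        s3_hook.load_string(string_data=view_data, key=self.key,
                            bucket_name=self.bucket, replace=True)
```

A task would then reduce to a single `TM1CubeViewToS3Operator(...)` call with the cube, view, bucket and key arguments.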
### A DAG using a Custom Operator
Using the `TM1RunTIOperator`, the task can be written in just a few lines of code specifying the process to run and its parameters. This specific task triggers a feeders refresh in a particular cube but could be used to run anything best handled by TI.
```python
from airflow import DAG
from airflow.utils.dates import days_ago

from airflow_tm1.operators.tm1_run_ti import TM1RunTIOperator

default_args = {
    "owner": "Airflow",
    "start_date": days_ago(2),
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "you@somewhere.com",
    "retries": 1,
}

with DAG(dag_id="example_run_ti", default_args=default_args, schedule_interval="@daily") as dag:

    t1 = TM1RunTIOperator(
        task_id="run_ti",
        process_name="Refresh Feeders",
        parameters={"pCube": "Capital"},
        dag=dag,
    )

    t1
```
## Custom Sensors
Sensors are used to control flow in a DAG. In the DAG below, an instance of the TM1CellValueSensor checks that a value in a control cube indicates there is at least a draft submission available for OPEX for the entire company. Here it just triggers a dummy task, but it could be used to trigger an export of the OPEX data, for example.
### A DAG using a Custom Sensor
```python
from datetime import timedelta
from operator import eq, ge, gt, le, lt, ne

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

from airflow_tm1.sensors.tm1_cell_value import TM1CellValueSensor

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": days_ago(2),
    "retries": 0,
    "retry_delay": timedelta(minutes=5),
}

with DAG(dag_id="example_value_sensor", default_args=default_args) as dag:

    t1 = TM1CellValueSensor(
        task_id="check_value",
        # check every 15 minutes
        poke_interval=60 * 15,
        # time out after 12 hours
        timeout=60 * 60 * 12,
        tm1_conn_id="tm1_default",
        cube="Task Workflow",
        value=1,
        elements="OPEX,Total Company,Complete",
        # apply the greater than operator
        op=gt,
    )

    t2 = DummyOperator(task_id="do_nothing")

    t1 >> t2
```
The `t1 >> t2` specifies that task `t1` will run before `t2`. That is, the second task won't be triggered until the condition on the sensor task has returned `True`.
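Under the hood, a sensor comes down to a `poke` method that Airflow calls repeatedly until it returns `True`. The real `TM1CellValueSensor` in airflow-tm1 may be implemented differently, but a rough sketch might look like this (using TM1py's single-cell `get_value` read and 1.10-era imports):

```python
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

from airflow_tm1.hooks.tm1 import TM1Hook


class CellValueSensor(BaseSensorOperator):
    """Succeed once op(current_value, value) is true."""

    @apply_defaults
    def __init__(self, cube, elements, value, op,
                 tm1_conn_id="tm1_default", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.cube = cube
        self.elements = elements
        self.value = value
        self.op = op
        self.tm1_conn_id = tm1_conn_id

    def poke(self, context):
        tm1 = TM1Hook(tm1_conn_id=self.tm1_conn_id).get_conn()
        # read the single cell at the given intersection,
        # e.g. "OPEX,Total Company,Complete"
        current = tm1.cubes.cells.get_value(self.cube, self.elements)
        # e.g. with op=gt, keep poking until current > value
        return self.op(current, self.value)
```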
## Going further
This obviously just scratches the surface but hopefully gives you an idea of the potential. I've started working on some example DAGs and will try to come up with a more realistic example.