DAG: engines ROOT: Daily-Reports

schedule: @daily


engines

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
"""
Code that goes along with the Airflow tutorial located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.docker_operator import DockerOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta


# Default task arguments; passed to the DAG constructor as `default_args`.
default_args = dict(
    owner="airflow",
    start_date=datetime(2020, 12, 17),
    email=["airflow@airflow.com"],
    email_on_failure=False,
    email_on_retry=False,
    retries=2,
    depends_on_past=False,
    retry_delay=timedelta(minutes=5),
    pool="engines",
)

# The configured start date rendered as YYYY-MM-DD.
start_date_string = "{:%Y-%m-%d}".format(default_args["start_date"])

# -- DAG definition: id "engines", scheduled "@daily", catchup disabled.
engine_dag = DAG(
    dag_id="engines",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
)

# -- Placeholder (no-op) tasks attached to the DAG.
start_task = DummyOperator(dag=engine_dag, task_id="Start")
end_task = DummyOperator(dag=engine_dag, task_id="End")
wait_task = DummyOperator(dag=engine_dag, task_id="Wait")
def _engine_task(task_id, command):
    """Return a DockerOperator that runs *command* in the engines image.

    Every engine container uses the same image, Docker socket, environment
    and network mode; building them here keeps those settings in one place
    instead of repeating them for each task.

    Args:
        task_id: Airflow task id under which the operator is registered.
        command: Command line executed inside the container.

    Returns:
        The new DockerOperator, attached to ``engine_dag``.
    """
    return DockerOperator(
        task_id=task_id,
        image='engines:latest',
        command=command,
        docker_url='unix://var/run/docker.sock',
        # A fresh dict per operator so tasks never share a mutable mapping.
        environment={'RUN_MODE': 'PROD'},
        network_mode='host',
        dag=engine_dag,
    )


# Engine modules launched as `python -m engines.<name>`; the task id is the
# module name. 'ddg_company_engine' is currently not scheduled.
ALL_ENGINES = ['email_engine']
engines = [
    _engine_task(task_id=_name, command='python -m engines.{}'.format(_name))
    for _name in ALL_ENGINES
]

command_es = 'python -m engines.es_engine'
es_task = _engine_task(task_id="ES-Engine", command=command_es)

command_loc = 'python -m engines.location_engine'
loc_task = _engine_task(task_id="Location-Engine", command=command_loc)

# Unlike the other engines, the reports engine is started as a script path
# rather than with `python -m`.
reports = 'python3 engines/reportsEngine.py'
reports_task = _engine_task(task_id="Daily-Reports", command=reports)
# Execution order:
#   Start -> every task in `engines` -> Location-Engine -> ES-Engine
#         -> Daily-Reports -> End
# NOTE(review): a previous wiring (start_task >> engines >> wait_task >> end_task)
# was disabled; wait_task is not part of this chain — confirm that is intended.
start_task.set_downstream(engines)
loc_task.set_upstream(engines)
loc_task.set_downstream(es_task)
es_task.set_downstream(reports_task)
reports_task.set_downstream(end_task)