development

기류 : DAG를 삭제하는 방법?

big-blog 2020. 12. 8. 18:57
반응형

기류 : DAG를 삭제하는 방법?


Airflow 웹 서버를 시작하고 일부 dags를 예약했습니다. 웹 GUI에서 dags를 볼 수 있습니다.

특정 DAG가 실행되고 웹 GUI에 표시되지 않도록 삭제하려면 어떻게해야합니까? 이를 수행하는 Airflow CLI 명령이 있습니까?

주위를 둘러 보았지만 DAG가로드되고 예약 된 후에는 DAG를 삭제하는 간단한 방법에 대한 답을 찾을 수 없었습니다.


8/27/18 편집-Airflow 1.10이 이제 PyPI에서 출시되었습니다!

https://pypi.org/project/apache-airflow/1.10.0/


DAG를 완전히 삭제하는 방법

이 기능은 이제 Airflow ≥ 1.10에 있습니다!

이제 Airflow에 DAG 제거를 추가 하는 PR # 2199 (Jira : AIRFLOW-1002 )가 병합되어 모든 관련 테이블에서 DAG의 항목을 완전히 삭제할 수 있습니다.

핵심 delete_dag (...) 코드는 이제 실험적 API의 일부이며 CLIREST API를 통해 사용할 수있는 진입 점이 있습니다 .

CLI :

airflow delete_dag my_dag_id

REST API (로컬로 웹 서버 실행) :

curl -X "DELETE" http://127.0.0.1:8080/api/experimental/dags/my_dag_id

REST API 관련 경고 : Airflow 클러스터 프로덕션에서 인증사용 하는지 확인하세요 .

Airflow 1.10 (현재) 설치 / 업그레이드

업그레이드하려면 다음 중 하나를 실행하십시오.

export SLUGIFY_USES_TEXT_UNIDECODE=yes

또는:

export AIRFLOW_GPL_UNIDECODE=yes

그때:

pip install -U apache-airflow

자세한 내용은 먼저 UPDATING.md 를 확인 하십시오!


이것은 기본 connection_id와 함께 PostgresHook을 사용하는 내 적응 코드입니다.

import sys
from airflow.hooks.postgres_hook import PostgresHook

dag_input = sys.argv[1]
hook=PostgresHook( postgres_conn_id= "airflow_db")

for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]:
    sql="delete from {} where dag_id='{}'".format(t, dag_input)
    hook.run(sql, True)

Apache Airflow에 DAG를 삭제하는 명확하고 쉬운 방법이없는 이유를 잘 모르겠습니다.

https://issues.apache.org/jira/browse/AIRFLOW-1002에 제출 됨


특정 dag와 관련된 모든 것을 삭제하는 스크립트를 작성했지만 이것은 MySQL에만 해당됩니다. PostgreSQL을 사용하는 경우 다른 커넥터 방법을 작성할 수 있습니다. 원래 Lance가 https://groups.google.com/forum/#!topic/airbnb_airflow/GVsNsUxPRC0 에 게시 한 명령 은 스크립트에 넣었습니다. 도움이 되었기를 바랍니다. 형식 : python script.py dag_id

import sys
import MySQLdb

dag_input = sys.argv[1]

query = {'delete from xcom where dag_id = "' + dag_input + '"',
        'delete from task_instance where dag_id = "' + dag_input + '"',
        'delete from sla_miss where dag_id = "' + dag_input + '"',
        'delete from log where dag_id = "' + dag_input + '"',
        'delete from job where dag_id = "' + dag_input + '"',
        'delete from dag_run where dag_id = "' + dag_input + '"',
        'delete from dag where dag_id = "' + dag_input + '"' }

def connect(query):
        db = MySQLdb.connect(host="hostname", user="username", passwd="password", db="database")
        cur = db.cursor()
        cur.execute(query)
        db.commit()
        db.close()
        return

for value in query:
        print value
        connect(value)

DAG-s는 Airflow 1.10에서 삭제할 수 있지만 프로세스 및 작업 순서는 정확해야합니다. "달걀과 닭 문제"가 있습니다. 파일이 아직있는 동안 프런트 엔드에서 DAG를 삭제하면 DAG가 다시로드됩니다 (파일이 삭제되지 않았기 때문). 먼저 파일을 삭제하고 페이지를 새로 고치면 웹 GUI에서 더 이상 DAG를 삭제할 수 없습니다. 따라서 프런트 엔드에서 DAG를 삭제할 수있는 일련의 작업은 다음과 같습니다.

  1. DAG 파일 삭제 (제 경우에는 파이프 라인 저장소에서 삭제하고 Airflow 서버, 특히 스케줄러에 배포)
  2. 웹 GUI를 새로 고치지 마십시오.
  3. DAG보기 (일반 프론트 페이지)의 웹 GUI에서 "Delete dag"-> 여기에 이미지 설명 입력맨 오른쪽의 빨간색 아이콘을 클릭하십시오 .
  4. 데이터베이스에서이 DAG의 모든 나머지를 정리합니다.

기본 SQLite DB의 특정 dag와 관련된 모든 메타 데이터를 삭제하는 스크립트를 작성했습니다. 이것은 위의 예수님의 대답을 기반으로하지만 Postgres에서 SQLite로 조정되었습니다. 사용자는 ../airflow.db기본 airflow.db 파일 (일반적으로 ~/airflow)을 기준으로 script.py가 저장된 위치로 설정해야합니다 . 실행하려면 python script.py dag_id.

import sqlite3
import sys

conn = sqlite3.connect('../airflow.db')
c = conn.cursor()

dag_input = sys.argv[1]

for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]:
    query = "delete from {} where dag_id='{}'".format(t, dag_input)
    c.execute(query)

conn.commit()
conn.close()

Airflow 1.10.1이 출시되었습니다. 이 릴리스에는 파일 시스템에서 해당 DAG를 삭제 한 후 웹 UI에서 DAG를 삭제하는 기능이 추가되었습니다.

See this ticket for more details:

[AIRFLOW-2657] Add ability to delete DAG from web ui

삭제 아이콘이있는 Airflow Links 메뉴

Please note that this doesn't actually delete the DAG from the file system, you will need to do this manually first otherwise the DAG will get reloaded.


There is nothing inbuilt in Airflow that does that for you. In order to delete the DAG, delete it from the repository and delete the database entries in the Airflow metastore table - dag.


You can clear a set of task instance, as if they never ran with:

airflow clear dag_id -s 2017-1-23 -e 2017-8-31

And then remove dag file from dags folder


Based on the answer of @OlegYamin, I'm doing the following to delete a dag backed by postgres, where airflow uses the public schema.

delete from public.dag_pickle where id = (
    select pickle_id from public.dag where dag_id = 'my_dag_id'
);
delete from public.dag_run where dag_id = 'my_dag_id';
delete from public.dag_stats where dag_id = 'my_dag_id';
delete from public.log where dag_id = 'my_dag_id';
delete from public.sla_miss where dag_id = 'my_dag_id';
delete from public.task_fail where dag_id = 'my_dag_id';
delete from public.task_instance where dag_id = 'my_dag_id';
delete from public.xcom where dag_id = 'my_dag_id';
delete from public.dag where dag_id = 'my_dag_id';

WARNING: The effect/correctness of the first delete query is unknown to me. It is just an assumption that it is needed.


just delete it from mysql, works fine for me. delete them from below tables:

  • dag

  • dag_constructor

  • dag_group_ship
  • dag_pickle
  • dag_run
  • dag_stats

(might be more tables in future release) then restart webserver and worker.


versions >= 1.10.0:

I have airflow version 1.10.2 and I tried executing airflow delete_dag command but the command throws following error:

bash-4.2# airflow delete_dag dag_id

[2019-03-16 15:37:20,804] {settings.py:174} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=28224 /usr/lib64/python2.7/site-packages/psycopg2/init.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: http://initd.org/psycopg/docs/install.html#binary-install-from-pypi. """) This will drop all existing records related to the specified DAG. Proceed? (y/n)y Traceback (most recent call last): File "/usr/bin/airflow", line 32, in args.func(args) File "/usr/lib/python2.7/site-packages/airflow/utils/cli.py", line 74, in wrapper return f(*args, **kwargs) File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 258, in delete_dag raise AirflowException(err) airflow.exceptions.AirflowException: Server error

Though I am able to delete through Curl command. Please let me know if anyone have idea about this command's execution, is this known or I am doing something wrong.

versions <= 1.9.0:

There is not a command to delete a dag, so you need to first delete the dag file, and then delete all the references to the dag_id from the airflow metadata database.

WARNING

You can reset the airflow meta database, you will erase everything, including the dags, but remember that you will also erase the history, pools, variables, etc.

airflow resetdb and then airflow initdb


Remove the dag(you want to delete) from the dags folder and run airflow resetdb.

Alternatively, you can go into the airflow_db and manually delete those entries from the dag tables(task_fail, xcom, task_instance, sla_miss, log, job, dag_run, dag, dag_stats).


For those who are still finding answers. On Airflow version 1.8, its very difficult to delete a DAG, you can refer to answers above. But since 1.9 has been released, you just have to

remove the dag on the dags folder and restart webserver

참고URL : https://stackoverflow.com/questions/40651783/airflow-how-to-delete-a-dag

반응형