반응형

배경

GCP의 airflow에서 특정 DAG의 마지막 batch 시간 start_time을 '현재 시간'과 비교해서,

30분이상 차이가 날 경우, slack으로 알람을 보내줄 것이다. (30분이상 차이가 난다면 배치가 제대로 돌고 있지 않다는 것이므로)

 

 

구현

1. 일단 airflow에서 값을 가져오기 위한 RestAPI를 찾아보자. (버전은 1.10.3이다)

https://airflow.apache.org/docs/apache-airflow/1.10.3/api.html

원하는 API가 보인다. 각 DAG들의 latest DagRun을 리턴해주는 API. 이걸 사용해볼 예정이다.

 

 

2. API로 데이터 가져오기

일단 axios.get으로 API(http://airflow.~.com/api/experimental/latest_runs)를 쏘고, 응답을 확인해본다.

헤더에는 accountAuth 값을 포함해야 하는데, 이는 id:pw를 base64로 인코딩한 값이다.

코드

async function getDagInfoList(siteUri){
const res = await axios.get(siteUri, {
headers: {
Authorization: `Basic ${accountAuth}`
}
})
const dagInfoList = res;
console.log(dagInfoList);
return dagInfoList;
}

 

응답

저기 data:items 부분을 가져오면 될듯하다. data dagInfoList를 res.data로 수정하여 다시 가져와보자.

jane@HMH73K0H9Y jane-repo % node airflow.js
실행
{
  status: 200,
  statusText: 'OK',
  headers: {
    server: 'gunicorn/19.10.0',
    date: 'Sun, 19 Jun 2022 09:21:00 GMT',
    'content-type': 'application/json',
    'content-length': '6248',
    via: '1.1 google',
    connection: 'close'
  },
  config: {
    transitional: {
      silentJSONParsing: true,
      forcedJSONParsing: true,
      clarifyTimeoutError: false
    },
    adapter: [Function: httpAdapter],
    transformRequest: [ [Function: transformRequest] ],
    transformResponse: [ [Function: transformResponse] ],
중략
      'user-agent': [Array],
      host: [Array]
    }
  },
  data: {
    items: [
      [Object], [Object], [Object],
      [Object], [Object], [Object],
      [Object], [Object], [Object],
      [Object], [Object], [Object],
      [Object], [Object], [Object],
      [Object], [Object], [Object],
      [Object], [Object], [Object],
      [Object], [Object], [Object],
      [Object], [Object]
    ]
  }
}

 

변경 코드 부분

const dagInfoList = res.data;

 

응답

해당 site에 등록된 모든 DAG의 latestRun값의 items 값이 출력된다.

내가 사용할 값은 dag_id=b의 start_date이기 때문에 res를 더 깊은 res.data.items로 수정한다.


jane@HMH73K0H9Y jane-repo % node airflow.js
실행
{
  items: [
    {
      dag_id: 'a',
      dag_run_url: '/admin/airflow/graph?dag_id=a&execution_date=2022-01-03+00%3A00%3A00%2B00%3A00',
      execution_date: '2022-01-03T00:00:00+00:00',
      start_date: '2022-01-04T03:44:50.593295+00:00'
    },
    {
      dag_id: 'b',
      dag_run_url: '/admin/airflow/graph?dag_id=b&execution_date=2022-05-05+18%3A30%3A00%2B00%3A00',
      execution_date: '2022-05-05T18:30:00+00:00',
      start_date: '2022-06-05T18:30:02.388330+00:00'
    },
    {
      dag_id: 'c',
      dag_run_url: '/admin/airflow/graph?dag_id=c&execution_date=2022-06-19+09%3A10%3A00%2B00%3A00',
      execution_date: '2022-06-19T09:10:00+00:00',
      start_date: '2022-06-19T09:15:01.873317+00:00'
중략

 

변경 코드

const dagInfoList = res.data.items;

 

응답

jane@HMH73K0H9Y jane-repo % node airflow.js 
실행
[
  {
    dag_id: 'a',
    dag_run_url: '/admin/airflow/graph?dag_id=a&execution_date=2022-01-03+00%3A00%3A00%2B00%3A00',
    execution_date: '2022-01-03T00:00:00+00:00',
    start_date: '2022-01-04T03:44:50.593295+00:00'
  },
  {
    dag_id: 'b',
    dag_run_url: '/admin/airflow/graph?dag_id=b&execution_date=2022-05-05+18%3A30%3A00%2B00%3A00',
중략

 

3. 내가 원하는 부분의 start_data만 추출하기

위에서 저장한 dagInfoList에서 targetDagId를 찾는 함수다. targetDagId는 전역변수로 미리 설정되어 있으며,

나는 a라는 DagId의 start_date만 필요하기 때문에 이를 return 값으로 가져온다.

const targetDagId = 'a';
function findTargetDagStartTime(dagInfoList, targetDagId){
const TargetDagInfo = dagInfoList.find((daginfo) => daginfo.dag_id === targetDagId);
const startTime = TargetDagInfo.start_date;
return startTime;
}

 

4. timegap 계산 (분 단위 계산)

현재 시간과 target시간을 각각 생성하여 현재 시간에서 target시간을 빼준다.

그리고 이는 정수값이기 때문에 /1000/60을 해서 분 단위의 gap을 찾는다. 여기서 초 단위를 하려면 /60대신 /3600을 하면 된다.

function calculateTimeGap(targetDagStartTime){
const now = new Date();
const TargetTime = new Date(targetDagStartTime);
console.log(`CurrentTime(KST): ${now}`);
console.log(`Target_Time(KST): ${TargetTime}`);

const timeGap = now.getTime() - TargetTime.getTime();
const minTimeGap = timeGap / 1000 / 60;
console.log(`* minTimeGap: ${minTimeGap}\n`);
return minTimeGap;
}

 

5.  timegap이 30분보다 크면 slack으로 메세지 전송

function messageToSlack(uri, minTimeGap){
webhook.send({
text: `${uri}]\n* minTimeGap: ${minTimeGap}\n`
})
}


// maincode
async function start(uri){
const dagInfoList = await getDagInfoList(uri);
console.log('Site: ' + uri);

const targetDagStartTime = findTargetDagStartTime(dagInfoList, targetDagId);
const minTimeGap = calculateTimeGap(targetDagStartTime);

if (minTimeGap > 30) messageToSlack(uri, minTimeGap);
}

 

6.  cron으로 매 초마다 timegap 체크

/*
* test
*/

cron.schedule('* * * * *', () => {
console.log('실행');
start(airflowabcUri); // 나는 두 airflow 사이트 값이 필요해서 두 개의 uri를 전역변수로 만들어줬다.
start(airflowdefUri);
});

 

7. 전체 코드 

const cron = require('node-cron');
const axios = require('axios');
const accountAuth = 'abcdefg';   // base64로 id:pw값을 인코딩해서 넣어줘야 한다.
const targetDagId = 'a';

const { IncomingWebhook } = require('@slack/webhook');
const slackUri = 'https://hooks.slack.com/services/abcdefg';  // webhook 생성 후 나오는 uri
const webhook = new IncomingWebhook(slackUri);

/************************************
* function
*************************************/

async function getDagInfoList(siteUri){
const res = await axios.get(siteUri, {
headers: {
Authorization: `Basic ${accountAuth}`
}
})
const dagInfoList = res.data.items;
console.log(dagInfoList);
return dagInfoList;
}

function findTargetDagStartTime(dagInfoList, targetDagId){
const TargetDagInfo = dagInfoList.find((daginfo) => daginfo.dag_id === targetDagId);
const startTime = TargetDagInfo.start_date;
return startTime;
}

function calculateTimeGap(targetDagStartTime){
const now = new Date();
const TargetTime = new Date(targetDagStartTime);
console.log(`CurrentTime(KST): ${now}`);
console.log(`Target_Time(KST): ${TargetTime}`);

const timeGap = now.getTime() - TargetTime.getTime();
const minTimeGap = timeGap / 1000 / 60;
console.log(`* minTimeGap: ${minTimeGap}\n`);
return minTimeGap;
}

function messageToSlack(uri, minTimeGap){
webhook.send({
text: `${uri}]\n* minTimeGap: ${minTimeGap}\n`
})
}

async function start(uri){
const dagInfoList = await getDagInfoList(uri);
console.log('Site: ' + uri);

const targetDagStartTime = findTargetDagStartTime(dagInfoList, targetDagId);
const minTimeGap = calculateTimeGap(targetDagStartTime);

if (minTimeGap > 30) messageToSlack(uri, minTimeGap);
}

/************************************
* test
*************************************/

cron.schedule('* * * * *', () => {
console.log('실행');
start(airflowAUri);
start(airflowBUri);
});

 

그럼 실행결과는 이렇게 된다.

jane@HMH73K0H9Y jane-repo % node airflow.js
실행
Site: http://airflow.abc.com/api/experimental/latest_runs
CurrentTime(KST): Sun Jun 19 2022 18:52:01 GMT+0900 (대한민국 표준시)
Target_Time(KST): Sun Jun 19 2022 18:47:03 GMT+0900 (대한민국 표준시)
* minTimeGap: 4.967416666666667


Site: http://airflow.def.com/api/experimental/latest_runs
CurrentTime(KST): Sun Jun 19 2022 18:52:01 GMT+0900 (대한민국 표준시)
Target_Time(KST): Sun Jun 19 2022 18:47:01 GMT+0900 (대한민국 표준시)
* minTimeGap: 4.991966666666666

^C

 

slack으로 webhook은 이렇게 들어온다. (gap이 30이 넘을 시에만 메세지가 오게 만들었으나, test를 위해 찍어보았다)

 

반응형

+ Recent posts