배경
GCP의 airflow에서 특정 DAG의 마지막 batch 시간 start_time을 '현재 시간'과 비교해서,
30분이상 차이가 날 경우, slack으로 알람을 보내줄 것이다. (30분이상 차이가 난다면 배치가 제대로 돌고 있지 않다는 것이므로)
구현
1. 일단 airflow에서 값을 가져오기 위한 RestAPI를 찾아보자. (버전은 1.10.3이다)
https://airflow.apache.org/docs/apache-airflow/1.10.3/api.html
원하는 API가 보인다. 각 DAG들의 latest DagRun을 리턴해주는 API. 이걸 사용해볼 예정이다.
2. API로 데이터 가져오기
일단 axios.get으로 API(http://airflow.~.com/api/experimental/latest_runs)를 쏘고, 응답을 확인해본다.
헤더에는 accountAuth 값을 포함해야 하는데, 이는 id:pw를 base64로 인코딩한 값이다.
코드
async function getDagInfoList(siteUri){
const res = await axios.get(siteUri, {
headers: {
Authorization: `Basic ${accountAuth}`
}
})
const dagInfoList = res;
console.log(dagInfoList);
return dagInfoList;
}
|
응답
저기 data:items 부분을 가져오면 될듯하다. data dagInfoList를 res.data로 수정하여 다시 가져와보자.
jane@HMH73K0H9Y jane-repo % node airflow.js 실행 { status: 200, statusText: 'OK', headers: { server: 'gunicorn/19.10.0', date: 'Sun, 19 Jun 2022 09:21:00 GMT', 'content-type': 'application/json', 'content-length': '6248', via: '1.1 google', connection: 'close' }, config: { transitional: { silentJSONParsing: true, forcedJSONParsing: true, clarifyTimeoutError: false }, adapter: [Function: httpAdapter], transformRequest: [ [Function: transformRequest] ], transformResponse: [ [Function: transformResponse] ], 중략 'user-agent': [Array], host: [Array] } }, data: { items: [ [Object], [Object], [Object], [Object], [Object], [Object], [Object], [Object], [Object], [Object], [Object], [Object], [Object], [Object], [Object], [Object], [Object], [Object], [Object], [Object], [Object], [Object], [Object], [Object], [Object], [Object] ] } } |
변경 코드 부분
const dagInfoList = res.data;
|
응답
해당 site에 등록된 모든 DAG의 latestRun값의 items 값이 출력된다.
내가 사용할 값은 dag_id=b의 start_date이기 때문에 res를 더 깊은 res.data.items로 수정한다.
jane@HMH73K0H9Y jane-repo % node airflow.js 실행 { items: [ { dag_id: 'a', dag_run_url: '/admin/airflow/graph?dag_id=a&execution_date=2022-01-03+00%3A00%3A00%2B00%3A00', execution_date: '2022-01-03T00:00:00+00:00', start_date: '2022-01-04T03:44:50.593295+00:00' }, { dag_id: 'b', dag_run_url: '/admin/airflow/graph?dag_id=b&execution_date=2022-05-05+18%3A30%3A00%2B00%3A00', execution_date: '2022-05-05T18:30:00+00:00', start_date: '2022-06-05T18:30:02.388330+00:00' }, { dag_id: 'c', dag_run_url: '/admin/airflow/graph?dag_id=c&execution_date=2022-06-19+09%3A10%3A00%2B00%3A00', execution_date: '2022-06-19T09:10:00+00:00', start_date: '2022-06-19T09:15:01.873317+00:00' 중략 |
변경 코드
const dagInfoList = res.data.items;
|
응답
jane@HMH73K0H9Y jane-repo % node airflow.js 실행 [ { dag_id: 'a', dag_run_url: '/admin/airflow/graph?dag_id=a&execution_date=2022-01-03+00%3A00%3A00%2B00%3A00', execution_date: '2022-01-03T00:00:00+00:00', start_date: '2022-01-04T03:44:50.593295+00:00' }, { dag_id: 'b', dag_run_url: '/admin/airflow/graph?dag_id=b&execution_date=2022-05-05+18%3A30%3A00%2B00%3A00', 중략 |
3. 내가 원하는 부분의 start_data만 추출하기
위에서 저장한 dagInfoList에서 targetDagId를 찾는 함수다. targetDagId는 전역변수로 미리 설정되어 있으며,
나는 a라는 DagId의 start_date만 필요하기 때문에 이를 return 값으로 가져온다.
function findTargetDagStartTime(dagInfoList, targetDagId){
const TargetDagInfo = dagInfoList.find((daginfo) => daginfo.dag_id === targetDagId);
const startTime = TargetDagInfo.start_date;
return startTime;
}
|
4. timegap 계산 (분 단위 계산)
현재 시간과 target시간을 각각 생성하여 현재 시간에서 target시간을 빼준다.
그리고 이는 정수값이기 때문에 /1000/60을 해서 분 단위의 gap을 찾는다. 여기서 초 단위를 하려면 /60대신 /3600을 하면 된다.
function calculateTimeGap(targetDagStartTime){
const now = new Date();
const TargetTime = new Date(targetDagStartTime);
console.log(`CurrentTime(KST): ${now}`);
console.log(`Target_Time(KST): ${TargetTime}`);
const timeGap = now.getTime() - TargetTime.getTime();
const minTimeGap = timeGap / 1000 / 60;
console.log(`* minTimeGap: ${minTimeGap}\n`);
return minTimeGap;
}
|
5. timegap이 30분보다 크면 slack으로 메세지 전송
function messageToSlack(uri, minTimeGap){
webhook.send({
text: `${uri}]\n* minTimeGap: ${minTimeGap}\n`
})
}
// maincode async function start(uri){
const dagInfoList = await getDagInfoList(uri);
console.log('Site: ' + uri);
const targetDagStartTime = findTargetDagStartTime(dagInfoList, targetDagId);
const minTimeGap = calculateTimeGap(targetDagStartTime);
if (minTimeGap > 30) messageToSlack(uri, minTimeGap);
}
|
6. cron으로 매 초마다 timegap 체크
/*
* test
*/
cron.schedule('* * * * *', () => {
console.log('실행');
start(airflowabcUri); // 나는 두 airflow 사이트 값이 필요해서 두 개의 uri를 전역변수로 만들어줬다.
start(airflowdefUri);
});
|
7. 전체 코드
const cron = require('node-cron');
const axios = require('axios');
const accountAuth = 'abcdefg'; // base64로 id:pw값을 인코딩해서 넣어줘야 한다.
const targetDagId = 'a';
const { IncomingWebhook } = require('@slack/webhook');
const webhook = new IncomingWebhook(slackUri);
/************************************
* function
*************************************/
async function getDagInfoList(siteUri){
const res = await axios.get(siteUri, {
headers: {
Authorization: `Basic ${accountAuth}`
}
})
const dagInfoList = res.data.items;
console.log(dagInfoList);
return dagInfoList;
}
function findTargetDagStartTime(dagInfoList, targetDagId){
const TargetDagInfo = dagInfoList.find((daginfo) => daginfo.dag_id === targetDagId);
const startTime = TargetDagInfo.start_date;
return startTime;
}
function calculateTimeGap(targetDagStartTime){
const now = new Date();
const TargetTime = new Date(targetDagStartTime);
console.log(`CurrentTime(KST): ${now}`);
console.log(`Target_Time(KST): ${TargetTime}`);
const timeGap = now.getTime() - TargetTime.getTime();
const minTimeGap = timeGap / 1000 / 60;
console.log(`* minTimeGap: ${minTimeGap}\n`);
return minTimeGap;
}
function messageToSlack(uri, minTimeGap){
webhook.send({
text: `${uri}]\n* minTimeGap: ${minTimeGap}\n`
})
}
async function start(uri){
const dagInfoList = await getDagInfoList(uri);
console.log('Site: ' + uri);
const targetDagStartTime = findTargetDagStartTime(dagInfoList, targetDagId);
const minTimeGap = calculateTimeGap(targetDagStartTime);
if (minTimeGap > 30) messageToSlack(uri, minTimeGap);
}
/************************************
* test
*************************************/
cron.schedule('* * * * *', () => {
console.log('실행');
start(airflowAUri);
start(airflowBUri);
});
|
그럼 실행결과는 이렇게 된다.
jane@HMH73K0H9Y jane-repo % node airflow.js 실행 Site: http://airflow.abc.com/api/experimental/latest_runs CurrentTime(KST): Sun Jun 19 2022 18:52:01 GMT+0900 (대한민국 표준시) Target_Time(KST): Sun Jun 19 2022 18:47:03 GMT+0900 (대한민국 표준시) * minTimeGap: 4.967416666666667 Site: http://airflow.def.com/api/experimental/latest_runs CurrentTime(KST): Sun Jun 19 2022 18:52:01 GMT+0900 (대한민국 표준시) Target_Time(KST): Sun Jun 19 2022 18:47:01 GMT+0900 (대한민국 표준시) * minTimeGap: 4.991966666666666 ^C |
slack으로 webhook은 이렇게 들어온다. (gap이 30이 넘을 시에만 메세지가 오게 만들었으나, test를 위해 찍어보았다)
끝
'직장생활 > Javascript, Nodejs' 카테고리의 다른 글
Axios 기본 및 사용법, 옵션 (0) | 2022.05.31 |
---|---|
[Nodejs] mac(ios) Nodejs, nvm 설치 (0) | 2022.05.28 |
[javascript] JSON.parse, JSON.stringify, toString() 차이 (0) | 2022.05.23 |
[javascript] '=='와 '==='의 차이 (0) | 2022.05.23 |
[javascript] [AWS] S3에서 이벤트가 발생할 때마다 lambda가 실행되게 하는 nodejs 코드 해석 (0) | 2022.05.22 |