프로젝트를 진행하는 중에 시계열 상태 데이터 스트림에서 작업단위를 식별해야 했다.
단일 데이터에는 Job을 나타내는 정보가 없기 때문에, 데이터의 맥락을 보고 Job을 생성해야 한다.
이 과정을 통해서 데이터로부터 유의미한 정보들을 추출할 수 있었다.
예시는 실제 데이터를 최대한 단순화하여 적용한 방법을 소개해보려 한다.
목표를 달성하기 위한 핵심은 LogStash의 Aggregate 필터 플러그인을 활용하는 것이다.
그런데 우리는 LogStash를 일부러 도입한 것은 아니고,
기존에 이미 아래의 조건을 갖추고 있었다.
RDB에 Raw데이터가 누적되고 있음.
LogStash가 RDB를 쿼리해서 특정 주기마다 ElasticSearch로 동기화 해주고 있음.
이런 상태에서 공식문서에서 큰 힌트를 얻었고, 시행착오 끝에 원하는 바를 이루었다.
그럼, 차근차근 살펴보겠다.
애매한 상황이다.
충전기B와 양상이 비슷하지만, 중간에 충전오류 데이터가 끼어있어, 작업을 식별하기가 난해하다.
이때는 충전작업을 #[10-12]로 처리해야 할지 #[10-13]으로 처리해야 할지 결정해야 한다.
여러가지 선택이 있겠지만, 분석하고자 하는 목적이나 의도에 따라 정책을 세울수 있다.
위의 케이스에서 식별한 충전단위는
시작 : 충전중이 아닌 상태 에서 충전중인 상태로 변경되는 지점
끝 : 충전중 상태에서 충전중이 아닌 상태 로 변경되는 지점
구분 해놓고 보니 별게 없다. 단, 중간에 충전오류나 다른 것이 끼어있다면, 이것을 하나의 작업으로 볼지 아닐지로 결정해야 하는데, 여기서는 올바르지않은 충전작업으로 따로 분류해보려 한다.
ElasticSearch에서 태그컬럼 활용하면 Kibana에서 따로 이런 오류들을 모아놓고 볼 수 있을 것이다. 이벤트를 아예 버리기보다는 분석에 용이하게 쓰일 수도 있을 것이다.
Aggregate 필터 플러그인을 적용하면 이벤트롤 특정값으로 그룹화하고 작업을 생성/업데이트 할 수 있다.
이 필터는 stateful 방식이기 때문에, 이전 이벤트와 현재 이벤트 다음 이벤트 사이에 유지되는 변수를 가지고 관계를 추적할 수 있다.
여기에서 중요한 변수는 map과 event이다.
map 은 Ruby Hash 자료형 변수이다. 여기에 개발자가 다양한 값을 저장하고 추후에 읽을 수 있으므로 활용하면, 상태를 유지할 수 있다.
event 변수는 한줄 한줄 로그이벤트가 읽혀졌을때의 해당 Row가 event변수에 담긴다. 여기에 접근해서 원하는 값을 get, set 할 수 있다.
위에서 살펴본 것 처럼 다른상태에서 충전중으로 바뀌었을 때 새 충전작업이 시작된다.
기본적으로 충전중을 만났을때, new event 로 새겨주면 좋겠지만, 직전상태를 고려해야한다.
만약에 기존진행중이던 충전작업이 있으면 기존것을 이어나가고 (충전중 - 충전중)
없으면 새 작업으로 만든다.
status_code == 1 : 현재 이벤트가 충전대기
충전중인 상태에서 충전대기로 이어진다면 기존충전작업은 여기서 종결짓는다.
직전상태가 충전중인 상태가 아니라 충전대기 또는 다른 상태였다면? 계속 대기중이 상태이므로 여기에서는 집중하지 않고 버린다.
충전시작 시간을 알고 있으므로, 현재 이벤트 변경시간의 차이를 구하면 전체 충전시간도 유추 할 수 있다.
else : 다른 나머지 상태
충전대기 상태와 마찬가지로 기존충전작업이 진행되고 있었다면 종결짓기로 하였다.
그러나 정상적인 충전종료가 아니므로, tag에 특별한 표시를 하거나 정책에 따라 작업을 버릴 수도 있다.
만약에 이벤트가 충전이 끝났는데 어떤 이유로 충전끝을 알리는 충전대기 신호가 누락된다면, 여러개의 충전이벤트가 이어져 충전시간이 비이상적으로 길어질 수 있다. 이런 이벤트는 peak 이벤트로 잘못된 분석의 근거가 될 수 있으므로, 특정시간이상 충전시간의 경우 이상충전 태그를 달아줄 수 도 있다.
if map['charging_time'] > 86400 #24h
event.tag("possible_incorrect_charging")
end
if map['charging_time'] > 86400 #24h
event.tag("possible_incorrect_charging")
end
위 코드를 충전시간을 계산해 넣을 때 처리 해준다면,
하루 이상 충전시간이 누적되는 peak 데이터를 필터링해줄 수 있을 것이다.
그런데 이런 휴리스틱 방식이 의미있게 동작할지는 모르겠다.
이렇게도 가능하다는 것이다.
그런데 위 가정들은 모두 이벤트가 순서대로 입력되었을 때를 가정한다. 뒤죽박죽 입력된다면, 이런 순차분석은 아무의미가 없다. 올바로 동작하지 않는다.
그렇기 때문에 반드시 이벤트 발생 순서대로 이벤트가 입력되어야 한다.
아래 사항을 반드시 지켜야 한다.
input에서 이벤트 순서대로 정렬하여 전달.
내 경우에는 RDB에서 ORDER BY 로 시간순서대로 정렬된 값을 전달한다.
반드시 이벤트 발생 순서대로 정렬한 뒤 전달되어야 한다.
(중요) 파이프라인이 동시에 1개만 실행되어야 함.
이 필터의 가장 큰 단점이 아닐까 싶다.
파이프라인이 동시에 여러곳에서 실행되고 있다면, 원하는 결과를 얻을 수 없다!
원하는 결과를 얻으려면 logstash를 -w 1 플래그와 함께 실행하거나, pipeline.yml or logstash.yml 에서 pipeline.workers: 1 로 설정해주어야 한다.
또는 여러개의 파이프라인이 실행되어야 하는 상황이라면 반드시 작업집계가 필요한 이벤트들을 중앙 노드로 직접 라우트 해주어야 한다.