• 기술
  • 문제해결

타임시리즈 상태로그에서 작업 식별하기

작성자 프로필이미지문정민
··5분 읽기

TimeSeries 상태로그에서 Job 식별하기🔗

프로젝트를 진행하는 중에 시계열 상태 데이터 스트림에서 작업단위를 식별해야 했다. 단일 데이터에는 Job을 나타내는 정보가 없기 때문에, 데이터의 맥락을 보고 Job을 생성해야 한다. 이 과정을 통해서 데이터로부터 유의미한 정보들을 추출할 수 있었다. 예시는 실제 데이터를 최대한 단순화하여 적용한 방법을 소개해보려 한다. 목표를 달성하기 위한 핵심은 LogStash의 Aggregate 필터 플러그인을 활용하는 것이다. 그런데 우리는 LogStash를 일부러 도입한 것은 아니고, 기존에 이미 아래의 조건을 갖추고 있었다.

  • RDB에 Raw데이터가 누적되고 있음.
  • LogStash가 RDB를 쿼리해서 특정 주기마다 ElasticSearch로 동기화 해주고 있음.

이런 상태에서 공식문서에서 큰 힌트를 얻었고, 시행착오 끝에 원하는 바를 이루었다. 그럼, 차근차근 살펴보겠다.

프롬프트는 하단참조
프롬프트는 하단참조

EV충전기 상태데이터에서 충전작업을 식별하기🔗

우리의 경우 전국의 충전기 데이터가 RDB 테이블에 실시간으로 저장되고 있다.

CREATE TABLE charger_status (
id INT AUTO_INCREMENT PRIMARY KEY,
uid CHAR(10) NOT NULL, #충전기ID
updated_at DATETIME NOT NULL, #상태가_변경된_시각
charger_status_code INT NOT NULL #충전상태
);
CREATE TABLE charger_status (
id INT AUTO_INCREMENT PRIMARY KEY,
uid CHAR(10) NOT NULL, #충전기ID
updated_at DATETIME NOT NULL, #상태가_변경된_시각
charger_status_code INT NOT NULL #충전상태
);

하나의 충전작업단위를 어떻게 정의하면 좋을까? 충전기가 쉬고있다가, 누군가가 충전를 시작하면 충전중 상태로 변경될 것이다. 충전이 완료되면 다시 충전대기 상태로 변경된다. 이 부분을 각각 충전작업의 시작과 끝으로 볼 수 있다.

논리 충전단위 식별🔗

충전기 A,B,C 세가지 충전기는 서로 다른 제조사의 특정 충전기이다.

# 충전기A
INSERT INTO charger_status (uid, updated_at, charger_status_code) VALUES
('충전기A', '2023-04-10 00:00:00', 1), #1 충전대기
('충전기A', '2023-04-10 00:01:17', 2), #2 충전중
('충전기A', '2023-04-10 01:21:15', 1); #3 충전대기

# 충전기B
INSERT INTO charger_status (uid, updated_at, charger_status_code) VALUES
('충전기B', '2023-04-10 00:00:00', 1), #4 충전대기
('충전기B', '2023-04-10 00:10:00', 2), #5 충전중
('충전기B', '2023-04-10 00:20:00', 2), #6 충전중
('충전기B', '2023-04-10 00:30:00', 2), #7 충전중
('충전기B', '2023-04-10 00:33:10', 1); #8 충전대기

# 충전기C
INSERT INTO charger_status (uid, updated_at, charger_status_code) VALUES
('충전기C', '2023-04-10 00:00:00', 1), # 9 충전대기
('충전기C', '2023-04-10 00:08:15', 2), #10 충전중
('충전기C', '2023-04-10 00:20:00', 2), #11 충전중
('충전기C', '2023-04-10 00:25:00', 3), #12 충전오류
('충전기C', '2023-04-10 00:27:30', 1); #13 충전대기
# 충전기A
INSERT INTO charger_status (uid, updated_at, charger_status_code) VALUES
('충전기A', '2023-04-10 00:00:00', 1), #1 충전대기
('충전기A', '2023-04-10 00:01:17', 2), #2 충전중
('충전기A', '2023-04-10 01:21:15', 1); #3 충전대기

# 충전기B
INSERT INTO charger_status (uid, updated_at, charger_status_code) VALUES
('충전기B', '2023-04-10 00:00:00', 1), #4 충전대기
('충전기B', '2023-04-10 00:10:00', 2), #5 충전중
('충전기B', '2023-04-10 00:20:00', 2), #6 충전중
('충전기B', '2023-04-10 00:30:00', 2), #7 충전중
('충전기B', '2023-04-10 00:33:10', 1); #8 충전대기

# 충전기C
INSERT INTO charger_status (uid, updated_at, charger_status_code) VALUES
('충전기C', '2023-04-10 00:00:00', 1), # 9 충전대기
('충전기C', '2023-04-10 00:08:15', 2), #10 충전중
('충전기C', '2023-04-10 00:20:00', 2), #11 충전중
('충전기C', '2023-04-10 00:25:00', 3), #12 충전오류
('충전기C', '2023-04-10 00:27:30', 1); #13 충전대기
  1. 충전기A: #1 대기#2 충전중#3 대기
    • 식별할 수 있는 충전작업은 #[2-3] 이다. 이 충전기는 기특하게도 충전상태가 변경되었을 때만 통지하므로 고민의 여지가 없다.
  2. 충전기B: #4 대기#5 충전중#6 충전중#7 충전중#8 충전대기
    • 충전중일때 주기적으로 충전중 상태를 기록하고 있다. 중간에 나타난 연속된 충전중 상태는 무시하고, 작업을 추출할 수 있다. 식별된 작업은 #[5-8]
  3. 충전기C: #9 대기#10 충전중#11 충전중#12 충전오류#13 충전대기
    • 애매한 상황이다. 충전기B와 양상이 비슷하지만, 중간에 충전오류 데이터가 끼어있어, 작업을 식별하기가 난해하다. 이때는 충전작업을 #[10-12]로 처리해야 할지 #[10-13]으로 처리해야 할지 결정해야 한다. 여러가지 선택이 있겠지만, 분석하고자 하는 목적이나 의도에 따라 정책을 세울수 있다.

위의 케이스에서 식별한 충전단위는 시작 : 충전중이 아닌 상태 에서 충전중인 상태로 변경되는 지점 끝 : 충전중 상태에서 충전중이 아닌 상태 로 변경되는 지점

구분 해놓고 보니 별게 없다. 단, 중간에 충전오류나 다른 것이 끼어있다면, 이것을 하나의 작업으로 볼지 아닐지로 결정해야 하는데, 여기서는 올바르지않은 충전작업으로 따로 분류해보려 한다. ElasticSearch에서 태그컬럼 활용하면 Kibana에서 따로 이런 오류들을 모아놓고 볼 수 있을 것이다. 이벤트를 아예 버리기보다는 분석에 용이하게 쓰일 수도 있을 것이다.

Aggregate 필터로 자세히 보기🔗

Aggregate 필터 플러그인을 적용하면 이벤트롤 특정값으로 그룹화하고 작업을 생성/업데이트 할 수 있다. 이 필터는 stateful 방식이기 때문에, 이전 이벤트와 현재 이벤트 다음 이벤트 사이에 유지되는 변수를 가지고 관계를 추적할 수 있다. 여기에서 중요한 변수는 map과 event이다.

  • map 은 Ruby Hash 자료형 변수이다. 여기에 개발자가 다양한 값을 저장하고 추후에 읽을 수 있으므로 활용하면, 상태를 유지할 수 있다.
  • event 변수는 한줄 한줄 로그이벤트가 읽혀졌을때의 해당 Row가 event변수에 담긴다. 여기에 접근해서 원하는 값을 get, set 할 수 있다.

구현전략🔗

  • status_code == 2 : 현재 이벤트가 충전중일 때

    위에서 살펴본 것 처럼 다른상태에서 충전중으로 바뀌었을 때 새 충전작업이 시작된다. 기본적으로 충전중을 만났을때, new event 로 새겨주면 좋겠지만, 직전상태를 고려해야한다. 만약에 기존진행중이던 충전작업이 있으면 기존것을 이어나가고 (충전중 - 충전중) 없으면 새 작업으로 만든다.

  • status_code == 1 : 현재 이벤트가 충전대기

    충전중인 상태에서 충전대기로 이어진다면 기존충전작업은 여기서 종결짓는다. 직전상태가 충전중인 상태가 아니라 충전대기 또는 다른 상태였다면? 계속 대기중이 상태이므로 여기에서는 집중하지 않고 버린다. 충전시작 시간을 알고 있으므로, 현재 이벤트 변경시간의 차이를 구하면 전체 충전시간도 유추 할 수 있다.

  • else : 다른 나머지 상태

    충전대기 상태와 마찬가지로 기존충전작업이 진행되고 있었다면 종결짓기로 하였다. 그러나 정상적인 충전종료가 아니므로, tag에 특별한 표시를 하거나 정책에 따라 작업을 버릴 수도 있다.

구현코드🔗

구현하면 이렇게 된다. 상태가 충전중이 아닐 때는 구현코드가 비슷하고 정상 충전으로 처리 할 것인지 아닌지 정도가 달라지기 때문에 공통된 코드는 하단에 함수로 따로 정의하였다.

input {
# JDBC 플러그인으로 데이터소스 입력
}
filter {
if [charger][status_code] == 2 {
#충전중
aggregate {
task_id => "%{uid}"
code => "
if map['task_id']
event.set('tags', 'charging_in_progress')
else
map['task_id'] = SecureRandom.uuid
map['started_at'] = event.get('updated_at')
event.set('tags', 'charging_started')
end
event.set('[task][id]', map['task_id'])
"
map_action => "create_or_update"
}
} else if [charger][status_code] == 1 {
#충전대기
aggregate {
task_id => "%{uid}"
code => "
calculate_charging_task(event, map)
"
map_action => "update"
end_of_task => true
add_tag => [ "charging_completed" ]
}
} else {
aggregate {
task_id => "%{uid}"
code => "
calculate_charging_task(event, map)
"
map_action => "update"
end_of_task => true
add_tag => [ "charging_incompleted" ]
}
}
}
output {
# filter에서 처리된 데이터를 다른 데이터소스로 출력
# 여기서는 ElasticSearch 인덱스로 내보내기
}

def calculate_charging_task(event, map)
if map['task_id']
event.set('[task][id]', map['task_id'])
event.set('[task][time]', event.get('updated_at') - map['started_at']
event.set('[task][range][gte]', map['started_at'])
event.set('[task][range][lte]', event.get('updated_at'))
else
event.cancel()
end
end
input {
# JDBC 플러그인으로 데이터소스 입력
}
filter {
if [charger][status_code] == 2 {
#충전중
aggregate {
task_id => "%{uid}"
code => "
if map['task_id']
event.set('tags', 'charging_in_progress')
else
map['task_id'] = SecureRandom.uuid
map['started_at'] = event.get('updated_at')
event.set('tags', 'charging_started')
end
event.set('[task][id]', map['task_id'])
"
map_action => "create_or_update"
}
} else if [charger][status_code] == 1 {
#충전대기
aggregate {
task_id => "%{uid}"
code => "
calculate_charging_task(event, map)
"
map_action => "update"
end_of_task => true
add_tag => [ "charging_completed" ]
}
} else {
aggregate {
task_id => "%{uid}"
code => "
calculate_charging_task(event, map)
"
map_action => "update"
end_of_task => true
add_tag => [ "charging_incompleted" ]
}
}
}
output {
# filter에서 처리된 데이터를 다른 데이터소스로 출력
# 여기서는 ElasticSearch 인덱스로 내보내기
}

def calculate_charging_task(event, map)
if map['task_id']
event.set('[task][id]', map['task_id'])
event.set('[task][time]', event.get('updated_at') - map['started_at']
event.set('[task][range][gte]', map['started_at'])
event.set('[task][range][lte]', event.get('updated_at'))
else
event.cancel()
end
end
# 참고를 위해 표현한 ElasticSearch 인덱스 구조 예시이다.

PUT /charging-task
{
"mappings": {
"properties": {
"uid": { "type": "keyword" },
"task": {
"properties": {
"id": { "type": "keyword" },
"time": { "type": "long" },
"range": {
"properties": {
"gte": { "type": "date" },
"lte": { "type": "date" }
}
}
}
},
"tags": { "type": "keyword" }
}
}
}
# 참고를 위해 표현한 ElasticSearch 인덱스 구조 예시이다.

PUT /charging-task
{
"mappings": {
"properties": {
"uid": { "type": "keyword" },
"task": {
"properties": {
"id": { "type": "keyword" },
"time": { "type": "long" },
"range": {
"properties": {
"gte": { "type": "date" },
"lte": { "type": "date" }
}
}
}
},
"tags": { "type": "keyword" }
}
}
}

Timeout 처리🔗

만약에 이벤트가 충전이 끝났는데 어떤 이유로 충전끝을 알리는 충전대기 신호가 누락된다면, 여러개의 충전이벤트가 이어져 충전시간이 비이상적으로 길어질 수 있다. 이런 이벤트는 peak 이벤트로 잘못된 분석의 근거가 될 수 있으므로, 특정시간이상 충전시간의 경우 이상충전 태그를 달아줄 수 도 있다.

if map['charging_time'] > 86400 #24h
event.tag("possible_incorrect_charging")
end
if map['charging_time'] > 86400 #24h
event.tag("possible_incorrect_charging")
end

위 코드를 충전시간을 계산해 넣을 때 처리 해준다면, 하루 이상 충전시간이 누적되는 peak 데이터를 필터링해줄 수 있을 것이다. 그런데 이런 휴리스틱 방식이 의미있게 동작할지는 모르겠다. 이렇게도 가능하다는 것이다.

⚠️ Aggregate 필터의 중요한 전제🔗

그런데 위 가정들은 모두 이벤트가 순서대로 입력되었을 때를 가정한다. 뒤죽박죽 입력된다면, 이런 순차분석은 아무의미가 없다. 올바로 동작하지 않는다. 그렇기 때문에 반드시 이벤트 발생 순서대로 이벤트가 입력되어야 한다. 아래 사항을 반드시 지켜야 한다.

  • input에서 이벤트 순서대로 정렬하여 전달.

    내 경우에는 RDB에서 ORDER BY 로 시간순서대로 정렬된 값을 전달한다. 반드시 이벤트 발생 순서대로 정렬한 뒤 전달되어야 한다.

  • (중요) 파이프라인이 동시에 1개만 실행되어야 함.

    이 필터의 가장 큰 단점이 아닐까 싶다. 파이프라인이 동시에 여러곳에서 실행되고 있다면, 원하는 결과를 얻을 수 없다! 원하는 결과를 얻으려면 logstash를 -w 1 플래그와 함께 실행하거나, pipeline.yml or logstash.yml 에서 pipeline.workers: 1 로 설정해주어야 한다. 또는 여러개의 파이프라인이 실행되어야 하는 상황이라면 반드시 작업집계가 필요한 이벤트들을 중앙 노드로 직접 라우트 해주어야 한다.

한계와 개선점🔗

몇 가지 한계가 존재하는데

  1. 현재 input에 대해서만 Aggregate 필터가 적용되므로, 이미 ElasticSearch에 입력된 이벤트는 무시된다. 예를들어, 연결되어있어야 할 충전작업 이벤트의 시작과 끝이 다른 파이프라인에 각각 나뉘어 있다면, 해당 이벤트는 버려지게 된다.

    이런부분을 모두 챙기려면 조금 더 신경써서 필터를 작성해야 한다. ElasticSearch 플러그인 활용하면 필터에서도 이벤트를 검색할 수 있다. 이것을 활용하면 미처 처리되지 못한 이벤트들도 빠짐없이 처리 할 수 있을 것이다.

  2. 분산처리가 불가능하다는 한계 Kafka나 MQ 같은 다른 메시지시스템을 활용하면 가능할 것 같은데, 이 부분은 더 많은 고민이 필요할것 같다.

마치며🔗

단일상태가 저장된 시계열 로그데이터에서 충전작업을 식별하는 파이프라인을 구성 및 구현해 보았다. 여기에서는 LogStash Aggregate 필터를 집중적으로 살펴보며 이벤트를 그룹화하고 작업을 식별하는 방법으로 구현했다.

시계열 데이터의 특징과 그에 따른 Logstash 파이프라인을 잘 구성하면 다양한 도메인에서 통찰력을 얻고 의사 결정을 개선할 수 있을것이라고 생각한다.

ELK 스택을 사용하거나 고려하는 중이라면 이 공유가 유용하게 사용되었으면 좋겠다.


1) 블로그 메인이미지 생성 프롬프트 참조