Backend
📝

채용 공고 트렌드 분석을 위한 데이터 파이프라인 구축 프로젝트

프로젝트 목적

프로젝트 목적은 아래와 같습니다.
빠르게 변해가는 개발 직군 채용 트렌드를 직무별로 분석해보는 취지에서 시작하였습니다.
온라인 채용 공고에서 명시된 업무 내용, 자격 요건, 우대 사항, 기술 스택, 주소 등의 데이터를 활용하여 키워드 추출을 수행하였습니다.
이를 통해 현재 개발 분야의 채용 트렌드를 분석 및 시각화하였습니다.

프로젝트 아키텍처

전체적인 데이터 인프라 아키텍처
8월 10일 ~ 29일까지 총 157,095개의 공고 데이터를 스크래핑해왔습니다.
자연어가 포함된 데이터라 데이터량에 비해 용량이 있는 편(3GB)입니다.
아래는 중복 제거 전 각 플랫폼 별 공고 개수 분포입니다.
날짜 별 중복을 제거한 순 공고 개수는 아래와 같습니다.
날짜 별로 가지고 오는 공고의 개수는 아래와 같습니다.
평균적으로 매일 4490 개의 공고 데이터를 가져옵니다.

웹 스크래핑 (ETL)

데이터 소스

채용 플랫폼인 원티드, 랠릿, 점핏, 잡플래닛 에서 데이터를 수집했습니다.
개발자 도구의 네트워크 탭 분석을 통해 내부에서 사용하는 api 서버 주소를 확인한 뒤 GET 요청을 보내서 공고를 수집했습니다.

데이터 수집 및 분류

4가지 채용 플랫폼에서 공통으로 가져올 수 있는 정보를 선정한 뒤 json 형식으로 정보를 저장했습니다.
수집한 정보 : 공고 id, 회사명, 주소, 자격 요건, 우대 사항, 주요 업무, 복지, 기술 스택 …
플랫폼마다 직군을 분류하는 카테고리가 다르기 때문에 대분류, 중분류, 소분류로 세분화 해서 공고를 분류했습니다.
아래는 분류 사항의 일부입니다.

스크래퍼 서버

스크래퍼는 airflow 서버와 별개의 ec2 인스턴스에서 FastAPI 서버를 통해 동작합니다.
이는 스크래핑에 사용되는 리소스를 분리함으로써, airflow 서버는 오직 DAG를 실행시키는 것에만 집중하게 하기 위해서 입니다.
각 스크래퍼 별 url에 airflow 서버가 httpOperator를 통해 GET 요청을 날려서 트리거하는 방식입니다.

비동기 작업과 FastAPI

플랫폼의 api 서버를 호출해 몇천~몇만건의 정보를 가져오는 작업은 네트워크 연결에 대부분의 시간이 소요되는 I/O intensive 한 작업입니다.
이 작업을 동기적으로 처리했을 때는 많은 시간이 걸렸습니다.
더 빠르게 작업을 수행하기 위해 스크래핑에 async/await 키워드를 사용해 비동기적으로 정보를 가져오도록 코드를 수정했습니다.
아래는 스크래퍼 코드의 일부입니다.
async def scrape_category(self) -> List[Dict]: """ 카테고리별 공고를 크롤링 """ async with aiohttp.ClientSession() as session: position_ids = await self.get_position_ids(session) tasks = [] for i, position_id in enumerate(position_ids): task = asyncio.create_task( self.get_position_detail(position_id, session) ) tasks.append(task) await asyncio.gather(*tasks) return self.jobs --------------------------------------------------------------------------- @router.get("/scrape-jumpit") async def jumpit_scrape_jobs() -> Dict[str, str]: ... tasks = [] for category_id, category_name in JumpitScraper.job_category_dict.items(): scraper = JumpitScraper(category_id, category_name) task = scraper.scrape_category() tasks.append(task) data_list = await asyncio.gather(*tasks) ...
Python
복사
async/await 키워드를 사용해 비동기적으로 처리하게 한 결과, 스크래핑 속도는 몇 시간 단위에서 몇 분 단위로 급격하게 줄어드는 것을 확인할 수 있었습니다.
이러한 코드를 실행시키기 위한 웹 프레임워크로 FastAPI를 선택했습니다.
FastAPI는 flask와 비슷한 웹 프레임워크입니다. 하지만 빠른 속도와 비동기 처리 등의 modern python을 더 잘 지원한다는 장점이 있어서 FastAPI를 선택했습니다.

secret manager 사용한 보안 관리

def get_secret(): """ AWS Secrets Manager를 이용해 환경변수를 불러옵니다. """ ... session = boto3.session.Session() client = session.client( service_name='secretsmanager', region_name=REGION_NAME ) try: get_secret_value_response = client.get_secret_value( SecretId=secret_name ) except ClientError as e: raise e ...
Python
복사
기존에는 환경변수나 시크릿 키 등을 담는 .env 파일을 각자 만들어서 관리하는 방식이었습니다.
이러한 방식은 보안상 취약하며 관리하기도 번거로운 단점이 있습니다.
프로젝트 대부분이 aws 인프라 위에서 동작하므로 aws secret manager 서비스를 사용해 시크릿을 관리하기로 결정했습니다.
boto3의 secretmanager 클라이언트를 사용함으로써 간단하게 보안 수준을 높일 수 있었습니다.

glue crawler 호환성을 고려한 s3 file path 설정

@staticmethod def upload_to_s3(file_path: str, bucket_name: str, access_key: str, secret_key: str, region_name: str) -> None: '''Uploads the specified file to an AWS S3 bucket.''' today = date.today() year = str(today.year) month = str(today.month).zfill(2) day = str(today.day).zfill(2) file_name = f"jumpit/year={year}/month={month}/day={day}/jumpit.json" s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region_name) s3.upload_file(file_path, bucket_name, file_name) path_name = os.path.join(bucket_name, file_name) print(f"End Upload to s3://{path_name}")
Python
복사
수집한 데이터를 S3에 …/year=/month=/day=/~.json 형식으로 저장합니다.
이렇게 하는 이유는 glue crawler를 사용할 때 partition을 잡아주기 위함입니다.
S3에 날짜 포맷을 맞춰서 저장하는 것 만으로도 자동으로 파티션이 생성되므로, 이후 glue job, athena 등의 서비스를 통해 손쉽게 데이터를 분석할 수 있었습니다.

CI/CD

CI/CD는 github actions와 aws code deploy를 같이 사용했습니다.
배포 방식은 다음과 같습니다.
1.
main 브랜치에 pr이 merge 되면 action을 트리거합니다.
2.
action을 사용해 전체 레포를 tar.gz 파일로 압축한 뒤 s3에 전송합니다.
3.
s3에 전송된 압축 파일을 code deploy가 타겟 그룹에 전송합니다.
4.
서버에 ssh action을 사용해 접속합니다.
5.
.env 파일을 github secret을 참조해 생성해 환경변수를 세팅합니다.
6.
서버에서 동작하고 있는 docker container들을 모두 내린 뒤, docker compose build & up 합니다.
배포 대상인 인스턴스는 하나뿐이므로 AllAtOnce 전략을 사용했습니다.

Airflow

Custom Operator

저희는 크게 Http, Athena, Redshift 관련해서 Airflow 커스텀 오퍼레이터를 만들었습니다.

HTTP Custom Operator 개발

HTTP 커스텀 오퍼레이터를 개발한 이유는 다음과 같습니다.
1.
Scraper 서버 API 요청은 response 타임이 짧을 때는 몇 분이 걸리지만, 길 때는 2시간까지 걸릴 수 있습니다.
2.
기존의 SimpleHttpOperator는 timeout이 존재합니다. Timeout으로 인해 response를 받아오지 못하면 DAG가 Fail 처리 됩니다.
만약 HTTP 요청의 응답이 올 때까지 대기하게 하려면 HttpHookrun 메서드에서 요청 시 timeout을 무한대로 설정했습니다.
이를 위해 2가지를 수정해야 했습니다. HttpHookSimpleHttpOperator 입니다.
1.
이를 위해 먼저, HttpHookrun 메서드에서 사용되는 session.request() 함수의 timeout 매개변수를 설정하지 않거나 큰 값을 주면 timeout을 무한대로 설정할 수 있습니다.
a.
HttpHookrun 메서드를 찾아서 timeout을 무한대로 설정합니다.
b.
아래 hook의 run 메서드를 간소화시키는 방향으로 처리합니다.
2.
SimpleHttpOperatorexecute 메서드에서 예외 처리를 추가하여 response_check가 False를 반환하더라도 예외가 발생하지 않도록 합니다.
아래는 해당 변경 사항을 반영한 코드 일부입니다
plugins/long_http_operator.py
httphook + simplehttpoperator을 같은 모듈 파일에 넣어 플러그인으로 생성하였습니다.
... from airflow.providers.http.hooks.http import HttpHook from airflow.providers.http.operators.http import SimpleHttpOperator ... import requests ## HttpHook을 오버라이딩하여 Timeout을 None으로 처리 class InfiniteTimeoutHttpHook(HttpHook): def run(self, endpoint, data=None, headers=None, extra_options=None): with self.get_conn() as session: extra_options = extra_options or {} extra_options.setdefault('timeout', None) # Timout 수정 url = self.base_url + endpoint self.log.info("Sending %s to %s with timeout %s", self.method, url, extra_options.get("timeout", "not set")) response = session.request(self.method, url, **extra_options) try: response.raise_for_status() except requests.exceptions.HTTPError as err: raise AirflowException( f"HTTP error: {err.response.reason}, error code: {err.response.status_code}" ) return response class CustomSimpleHttpOperator(SimpleHttpOperator): def execute(self, context): http = InfiniteTimeoutHttpHook(self.method, http_conn_id=self.http_conn_id) self.log.info("Calling HTTP method") response = http.run(self.endpoint, self.data, self.headers, self.extra_options) if self.response_check: check_result = self.response_check(response) self.log.info(f"Response check result: {check_result}") if self.xcom_push_flag: return response.text if self.log_response: self.log.info(response.text)
Python
복사
향후, 실제 개발에서는 timeout을 적절하게 설정하고, 재시도 로직을 추가하는 것이 바람직하기 때문에 개선이 필요합니다.

AWS Athena, Redshift Custom operator

Athena
Redshift
__init__ 메서드
O
O
execute 메서드
O
O
on_kill 메서드
O
X
Athena, Redshift의 custom operator를 만든 이유는 아래와 같습니다
aws provider가 airflow에서 공식으로 지원하는 라이브러리가 아닙니다.
operator 동작 원리 이해하고, 프로젝트에 필요한 최소한의 기능을 갖춘 관리 가능한 operator를 만들기 위함입니다. 이를 BaseOperator를 사용하여 구현하였습니다.
또한 아래 2개의 DB 커넥션을 excute 메서드 안에서 구현하였습니다. 생성자 내에서 DAG 파싱 시 호출로 인해, 빈번한 연결을 지양하게 하기 위해서입니다.
저희 프로젝트에서는 AWS 서비스에 접근하기 위해 boto3를 이용하였습니다.
boto3를 사용하여 AWS의 클라이언트에 접근하여 Athena, Redshift에 쿼리를 날리고 모니터링하는 작업을 구현하는 것에 초점을 맞췄습니다.
코드가 길기 때문에 아래에서는 각 커스텀 오퍼레이터 내부 주요 메서드에 대한 설명을 하도록 하겠습니다
AthenaCustomOperator
해당 오퍼레이터를 이용하여 Athena를 이용한 데이터 마트 생성에 필요한 베이스 테이블들을 생성합니다. 테이블 생성에 필요한 쿼리를 날리는 목적으로 오퍼레이터가 만들어졌습니다.
__init__ 메서드 : 이 생성자 함수에서는 인스턴스가 초기화될 때 필요한 여러 매개변수들을 설정하였습니다
execute 메서드 : AWS Athena에서의 주요 동작을 정의합니다.
Airflow에서 제공되는 AwsHook() 메소드를 통해 생성한 연결 정보를 통해 boto3를 이용하여 start_query_execution 메소드를 통해 query를 요청하는 방식입니다.
이를 쿼리 요청부터, 상태 모니터링, 상태 반환까지 진행합니다.
기존의 코드와 달리 특정 시간 이상 query 실행이 지속될 경우 동작을 종료하는 구문을 추가했습니다.
on_kill 메서드 : Airflow에서 DAG 혹은 task 작업이 종료될 시, kill 시그널을 보낼 수 있도록 설정했습니다.
RedshiftCustomOperator
기존의 provider 내부 RedshiftDataOperator와 유사하게 작성하되, 프로젝트에서 필요한 기능인 데이터 마트 테이블 생성에 초점을 두었습니다. 즉, redshift와 연결하여 쿼리를 실행하는 프로세스가 메인입니다.
__init__ 메서드
기본 속성을 선언해주는 것 외에 아래의 변수들을 추가했고, 이 변수들은 아래에서 나올 execute()와 hook()에서 활용될 db 연결 및 redshift query 요청 시 필요한 변수들로 확인됩니다.
execute 메서드
AwsHook 클래스를 통해 aws 연결을 시도하고 있으며, boto3를 이용하여 'redshift-data' 클라이언트에 접근하여 execute_statement 메소드를 통해 query를 요청하는 방식이며, Redshift Serverless를 사용하기 때문에 생성자에서 선언한 workgroup_name 등이 이때 활용됩니다.
기존의 코드와 달리 특정 시간 이상 query 실행이 지속될 경우 동작을 종료하는 구문을 추가했습니다.

Airflow의 병렬성 옵션 사용

저희 프로젝트에서 진행되는 태스크들은 동시에 병렬적으로 진행되는 DAG들이 꽤나 존재합니다. 하지만 저희는 EC2 내부에 Airflow를 배포했기 때문에 한정된 자원을 이용해야합니다.
t3.xlarge 인스턴스는 일반적으로 4 vCPU와 16 GiB의 메모리를 가집니다
AIRFLOW__CORE__EXECUTOR: CeleryExecutor AIRFLOW__CORE__PARALLELISM: 3 AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG: 4
Python
복사
한정된 자원 속에서 task의 병렬성을 유지하기 위해 위와 같이 airflow의 병렬 옵션을 수정하였습니다.
또한 celery 큐를 사용하기 때문에 워커의 스케일 아웃을 하여 워커의 idle time 최소화를 이끌어냈습니다. 이를 위해 워커를 최대 2개까지 사용하도록 설정했습니다.
flower를 통해 워커 간 처리가 어떻게 진행되는지 모니터링을 했습니다.

DAG List

아래 DAG는 번호 순서대로 수행이 됩니다.
크게 데이터 스크래핑, Glue 관련 ETL 잡, Redshift 관련 ELT 프로세스로 나뉩니다.
1.
{platform_name}_job_api_scraper_dag
예시로 jobplanet_job_api_scraper_dag 그래프를 가져왔습니다. jobplanet 자리에 각 플랫폼명이 들어갑니다. 각 플랫폼의 API Scraper를 trigger 합니다. HTTP sensor를 통해 API health check를 진행한 이후, HTTP custom operator를 작동시킵니다. 이를 통해 응답까지 오래 걸리는 Scraper API 호출을 Airflow에서 처리할 수 있습니다.
자정에 플랫폼 별로 수행되도록 스케쥴링을 했습니다.
2.
glue_crawler_dag
: Glue (Crawler, ETL job : pandas, spark) 관련된 내용을 트리거링하는 DAG입니다. 성립 조건이 어려운 Sensor보다는 DagTrigger를 선택하였습니다.
a.
빨강색 (1)번 : 각 플랫폼별 스크래핑 데이터를 AWS Glue crawler를 trigger 합니다.
b.
glue_etl_job_dag : 1차 전처리 Glue job을 trigger 합니다.
c.
빨강색 (2)번 : 1차 전처리 결과에 대해 Glue crawler를 trigger 합니다.
d.
glue_nlp_job_dag : 2차 전처리 Glue job을 trigger 합니다.
e.
glue_crawler_dag (3) : 2차 전처리 결과에 대해 Glue crawler를 trigger 합니다.
3.
athena_query_dag
Amazon Athena 쿼리들을 trigger 합니다. 여기서 Array의 경우 unnesting을 진행합니다. 이를 통해 베이스 스키마 테이블을 만듭니다. (Star Schema)
Drop ~ Create 의 Full refresh 형태
여기서 만들어진 테이블들을 이용하여 8번 마트 DAG를 수행합니다.
4.
redshift_elt_query_dag
마지막으로 Amazon Redshift Spectrum ELT 쿼리를 trigger합니다. 데이터 마트 테이블을 생성합니다. 이 때, 스키마가 생긴 후, 근간이 되는 베이스 테이블을 먼저 만들고, 해당 테이블을 조인해서 생기는 마트 테이블들을 생성하게끔 DAG를 작성했습니다.

데이터 전처리 (ETL, 정규화 테이블)

Glue 사용을 위하여 Custom Glue 정책을 생성합니다.
1.
Custom Glue 정책
2.
Glue Role 생성
우선 위와 같이 IAM role을 설정해주었습니다.

Glue Crawler

S3 bucket에 있는 데이터를 Glue Data Catalog tables로 옮기는 역할을 합니다.
Partition(날짜)를 기반으로 하여 테이블 스키마를 자동으로 생성하도록 했습니다.
크롤러는 크게 3가지로 나뉘는데, 1) 플랫폼별 크롤러, 2) 1차 전처리 크롤러, 3) 2차 전처리 크롤러입니다.
Classifiers는 크롤러 내부 세부 조건을 지정할 수 있습니다. json(de-1-1-json-classifier), parquet(DE1_1_classifier) 2가지로 나뉩니다.
플랫폼별 크롤러의 경우, $.results[*] 를 json path로 설정하였습니다.
1차 및 2차 전처리 크롤러의 경우, part-%{INT:part_number}-%{UUID:uuid}-c%{INT:partition}.snappy.parquet 를 Grok patterns로 설정하였습니다.
크롤러를 통해 생기는 데이터 카탈로그 (year/month/day partition key 칼럼)
scraped data
1st preprocessed
2nd preprocessed

Glue ETL jobs

Glue ETL을 사용한 이유

AWS Glue는 서버리스 서비스이며, Spark를 지원합니다. Amazon EMR과 다르게 프로비저닝 등을 고려하지 않습니다.
또한 Glue Crawler를 통해 Partition key 기반으로 생성된 Catalog를 읽어와 작업이 가능합니다.

Glue ETL jobs에서 수행한 작업

Python Shell script editor를 사용하여 1차 전처리와 2차 전처리 작업 코드를 작성하였습니다. 각 처리 과정의 개요는 아래 표와 같습니다.
1차 전처리 (glue spark)
2차 전처리 (glue pandas)
- 일부 플랫폼은 coordinate가 null → 카카오맵 API를 통해 채워넣기 - 다른 포맷의 채용 정보들을 통합할 수 있는 기준을 마련하기 위해, 대/중/소분류를 연결
텍스트 덩어리에서 명사만을 추출해서 의미있는 데이터를 뽑아내기 위해, 외부 라이브러리인 kiwipiepy (한국어 형태소 분석기)를 활용
worker : 10 (DPU)
pandas보다 spark에서 시간이 더 오래걸림 (DPU 5) → Glue 비용 축소를 위해 DPU 조정 → Glue를 읽어올 때와, parquet 저장할 때 외에는 pandas dataframe을 사용 → 즉, master node에서 작업이 수행됨
JSON이었던 파일을 위와 같은 전처리 수행 후 s3 버킷에 parquet로 저장
1차 전처리 이후 데이터를 위와 같은 방법으로 NLP 전처리하여 최종적으로 s3 버킷에 parquet로 한 번 더 저장
s3에 저장 시, 파티션을 하나로 축소한 후, year, month, day를 파티션 키로 잡아 버킷 내부 파티셔닝 수행
s3에 저장 시, 파티션을 하나로 축소한 후, year, month, day를 파티션 키로 잡아 버킷 내부 파티셔닝 수행
위 파티셔닝을 통해 Incremental update가 되도록 수행

1차 전처리 상세 설명

coordinate(위도, 경도) 컬럼 값이 null인 플랫폼(잡플래닛, 점핏)을 대상으로 spark udf와 카카오맵 API를 통해 값을 채워넣습니다.
# ... @udf(returnType=ArrayType(FloatType())) def get_coordinate_from_location(location, KAKAO_API_TOKEN=KAKAO_API_TOKEN): headers = {'Authorization': 'KakaoAK ' + KAKAO_API_TOKEN} """ 카카오 API를 통해 location으로부터 coordinate(lat, lon) 리스트를 반환합니다. """ if location is None: return None url = f'https://dapi.kakao.com/v2/local/search/address.json?query={location}' try: response = requests.get(url, headers=headers, timeout=5) result = json.loads(response.text) match_first = result['documents'][0]['address'] return [float(match_first['y']), float(match_first['x'])] except (requests.exceptions.RequestException, TypeError, ValueError, KeyError, IndexError) as e: print(f'Error occurred: {e} while fetching address: {location}') # ...
Python
복사
플랫폼별로 모두 사전에 수작업으로 정의한 대분류와 중분류를 소분류에 연결합니다.
# ... categories = [ ("WEB", "서버/백엔드 개발자", ["서버 개발자", "자바 개발자", "Node.js 개발자", "PHP 개발자", "웹 개발자", "루비온레일즈 개발자", ".NET 개발자", "백엔드 개발", "웹개발", "BACKEND_DEVELOPER", "서버/백엔드 개발자", "웹 풀스택 개발자"]), ("WEB", "프론트엔드 개발자", ["프론트엔드 개발자","프론트엔드 개발","FRONTEND_DEVELOPER"]), ("WEB", "웹 퍼블리셔", ["웹 퍼블리셔","웹퍼블리셔"]), ("GAME", "게임 개발자", ["게임개발", "게임 클라이언트 개발자", "게임 서버 개발자"]), ("GAME", "VR/AR/3D", ["VR 엔지니어", "그래픽스 엔지니어", "VR/AR/3D,게임 클라이언트 개발자"]), ("DATA", "데이터 사이언티스트", ["데이터 사이언티스트", "DATA_SCIENTIST"]), ("DATA", "데이터 엔지니어", ["데이터 엔지니어", "빅데이터 엔지니어", "DATA_ENGINEER"]), ("DATA", "데이터 분석가", ["BI 엔지니어", "데이터 분석가", "DATA_ANALYST"]), ("DATA", "AI 엔지니어", ["머신러닝 엔지니어", "영상,음성 엔지니어", "MACHINE_LEARNING", "인공지능/머신러닝"]), ("DATA", "DBA", ["DBA", "빅데이터 엔지니어,DBA"]), ("MOBILE", "안드로이드 개발자", ["안드로이드 개발자", "안드로이드 개발", "ANDROID_DEVELOPER"]), ("MOBILE", "iOS 개발자", ["iOS 개발자", "iOS", "IOS_DEVELOPER", "IOS 개발자"]), ("MOBILE", "크로스 플랫폼 모바일 개발자", ["크로스플랫폼 앱 개발자", "크로스플랫폼 앱개발자", "CROSS_PLATFORM_DEVELOPER"]), ("SUPPORT", "PM", ["개발 매니저", "프로덕트 매니저", "AGILE_SCRUM_MASTER", "인공지능/머신러닝,개발 PM"]), ("SUPPORT", "QA 엔지니어", ["QA,테스트 엔지니어", "QA", "QA_ENGINEER", "QA 엔지니어"]), ("SUPPORT", "기술지원", ["기술지원", "SUPPORT_ENGINEER"]), ("DEVSECOPS", "데브옵스/인프라 엔지니어", ["DevOps / 시스템 관리자", "시스템,네트워크 관리자", "네트워크/보안/운영", "클라우드 개발", "DEV_OPS", "INFRA_ENGINEER", "devops/시스템 엔지니어"]), ("DEVSECOPS", "정보보안 담당자", ["보안 엔지니어", "CIO,Chief Information Officer", "SECURITY_ENGINEER", "정보보안 담당자"]), ("SW/HW/IOT", "HW/임베디드 개발자", ["임베디드 개발자", "하드웨어 엔지니어", "하드웨어 개발", "HARDWARE_EMBEDDED_ENGINEER", "HW/임베디드"]), ("SW/HW/IOT", "소프트웨어 개발자", ["소프트웨어 엔지니어", "파이썬 개발자", "C,C++ 개발자", "소프트웨어 개발", "소프트웨어아키텍트", "SOFTWARE_ENGINEER", "SW/솔루션"]), ("ETC", "블록체인 엔지니어", ["블록체인 플랫폼 엔지니어", "BLOCKCHAIN_ENGINEER", "프론트엔드 개발자,블록체인"]), ("ETC", "기타", ["ERP전문가", "CTO,Chief Technology Officer", "CTO", "ERP", "etc"]) ] data_list = [] for major_category, middle_category, job_list in categories: for sub_category in job_list: data_list.append((major_category, middle_category, sub_category)) schema = StructType([ StructField("major_category", StringType(), True), StructField("middle_category", StringType(), True), StructField("sub_category", StringType(), True) ]) mapping_df = spark.createDataFrame(data_list, schema=schema) df_with_mapped_categories = df_final.join(mapping_df, df_final.category == mapping_df.sub_category, "left") # ...
Python
복사

2차 전처리 상세 설명

외부 라이브러리인 kiwipiepy 한국어 형태소 분석기를 활용하여, 문장에서 한글 명사 및 영어 명사를 분리하는 작업을 아래 코드와 같이 수행합니다.
kiwipiepy 라이브러리를 사용한 이유
비슷한 라이브러리로는 Konlpy 등이 있는데, 이들은 의존성 문제가 복잡하므로 상대적으로 이러한 단점이 덜한 kiwipiepy를 사용했습니다.
# ... from kiwipiepy import Kiwi import pandas as pd # ... def extract_korean_noun(kiwi, text): if text is None or text.strip() == "": return [] result = kiwi.tokenize(text) return [token.form for token in result if token.tag in {'NNG', 'NNP'}] def extract_english_noun(kiwi, text): if text is None or text.strip() == "": return [] result = kiwi.tokenize(text) return [token.form for token in result if token.tag == 'SL'] # ... kiwi = Kiwi() result_df['preferred_korean_nouns'] = result_df.apply(lambda x: extract_korean_noun(kiwi, x['preferred']), axis=1) result_df['required_korean_nouns'] = result_df.apply(lambda x: extract_korean_noun(kiwi, x['required']), axis=1) result_df['primary_responsibility_korean_nouns'] = result_df.apply(lambda x: extract_korean_noun(kiwi, x['primary_responsibility']), axis=1) result_df['welfare_korean_nouns'] = result_df.apply(lambda x: extract_korean_noun(kiwi, x['welfare']), axis=1) result_df['preferred_english_nouns'] = result_df.apply(lambda x: extract_english_noun(kiwi, x['preferred']), axis=1) result_df['required_english_nouns'] = result_df.apply(lambda x: extract_english_noun(kiwi, x['required']), axis=1) result_df['primary_responsibility_english_nouns'] = result_df.apply(lambda x: extract_english_noun(kiwi, x['primary_responsibility']), axis=1) result_df['welfare_english_nouns'] = result_df.apply(lambda x: extract_english_noun(kiwi, x['welfare']), axis=1) # ...
Python
복사
2차 전처리에서 Spark를 사용하지 않고 Pandas로 처리한 이유
Spark UDF(User Defined Function) 사용 시, 속도가 굉장히 느려지는 현상을 발견했습니다.
Spark의 모든 워커 노드에서 kiwi 객체를 생성하고 처리하는 과정에서 시간이 오래 걸립니다.
kiwipiepy 라이브러리의 Kiwi 객체는 직렬화할 수 없습니다.
Spark의 장점인 병렬처리를 활용하기 어렵습니다.
→ 위와 같은 이유로, pandas를 사용해 하나의 노드에서만 처리하도록 수정하였습니다. 사용하는 워커 노드의 수가 줄어들어 금전적 비용도 줄어들고, 소요 시간도 줄어드는 효과를 얻었습니다.

정규화 테이블 (Amazon Athena with Star Schema)

Athena Full Access 정책을 사용합니다.
참고로 Athena를 사용해 쿼리하기 전, Athena를 이용할 데이터 양식은 Parquet 같은 칼럼 지향 데이터 파일을 사용하는 것이 쿼리 비용을 최적화하는 데 좋습니다. 따라서 저희는 여기서 Parquet 양식을 도입했습니다. 더 나아가 날짜 별 분산된 데이터 파일을 파티셔닝 키를 통해 빠르게 읽을 수 있는 장점이 존재합니다.
raw_data_external 스키마의 ERD는 아래 그림과 같습니다. 쿼리 모음은 깃허브에서 확인하실 수 있습니다.
해당 프로젝트에서는 parquet 내부에 토큰화된 단어들의 집합을 Array 형태로 칼럼 내부에 관리했습니다.
기존에 Array 타입이었던 column에 대해 Unnesting 작업을 아테나로 수행합니다. 이후 nested 칼럼은 drop합니다.
daily_jd_table 을 기준으로 하여, 각 테이블은 job_idplatform 으로 연결되어있습니다. company_deatail 테이블의 경우 company 로 연결되어있습니다. 이 모든 테이블들은 베이스 스키마로써 활용됩니다.
Amazon Athena를 사용하여 정규화를 한 이유는 아래와 같습니다.
기존의 중복된 칼럼, 행을 정규화하여 마트 구성 시, read disk IO를 줄일 수 있기 때문입니다.
가끔 Athena 쿼리 대시보드가 작동 안할 때가 존재하기 때문에 아래 토글 정책을 추가하셔도 됩니다.
DE-1-1-SQL-Workbench

데이터 마트와 대시보드

Redshift를 사용할 때, 사용되는 Role은 아래와 같고, DE-1-1 사용자에 sts:AssumeRole 을 부여했습니다.

Redshift와 Redshift Spectrum을 활용한 데이터 마트 구축

Redshift Spectrum을 활용하여 데이터 카탈로그의 Athena 테이블을 외부 테이블 구축하고, 해당 테이블 간의 비정규화 과정을 거쳐 Redshift에 데이터 마트를 구축했습니다.

Redshift Spectrum을 사용한 이유

외부 테이블 활용의 편의성
glue와 Athena로 구축한 데이터 카탈로그와 S3 데이터를 직접 로드하지 않고, 쿼리를 요청할 수 있습니다.
Redshift 외부테이블에서 Athena에서 정규화 과정을 통해 구축한 테이블의 변화를 자동으로 감지하여, 별도의 조작없이 지속적으로 업데이트 된 데이터에 접근할 수 있습니다.
확장성
Athena 테이블을 데이터 마트로 재구성하여 대시보드와 연동할 수 있지만, 차후 데이터 증가에 따른 스케일 아웃 등의 확장성을 고려하여 Redshift로 구축했습니다.

데이터 마트 구축 과정

외부 스키마 생성
Redshift 내의 데이터 마트 테이블을 위한 analytics 스키마를 생성합니다. 아래 쿼리문을 통해 데이터 카탈로그에 있는 Athena 테이블을 Redshift Spectrum이 바라볼 수 있도록 설정했습니다.
CREATE EXTERNAL SCHEMA IF NOT EXISTS raw_data_external FROM data catalog DATABASE 'de1_1_database' CREATE EXTERNAL DATABASE IF NOT EXISTS;
SQL
복사
데이터에서 도출 가능한 인사이트
좀더 정교한 데이터 마트 테이블 스키마를 설계하기 위해서 수집한 데이터에서 추출 가능한 인사이트는 어떤 것이 있을지 논의했습니다. 프로젝트를 진행한 목적에 맞는 인사이트를 추출한 결과는 다음과 같습니다.
전체 채용공고 중 직무별로 차지하는 비중은 어떻게 될 것인가?
희망하는 직무의 회사는 주로 어느 지역에 위치하는가?
지금 채용 중인 공고는 어떤게 있을까?
직무별로 많이 사용하는 기술은 어떤게 있을까?
채용공고의 자격 요건, 우대 사항, 기술 스택 등에서 자주 등장하는 키워드는 무엇인가?
데이터 마트 테이블 구축
추출한 인사이트를 바탕으로 데이터 마트 테이블 스키마를 설계했으며, 데이터 마트 테이블은 Redshift Spectrum가 참조하고 있는 외부 테이블의 비정규화 과정을 거친 후 구축했습니다. 비정규화 과정에서는 테이블 간의 join을 통해 중복된 값을 데이터 마트에 허용하고, 연산을 수행한 값을 적재함으로써 쿼리 성능을 향상시켰습니다. 또한, 데이터 마트 테이블 생성은 데이터의 볼륨 크기를 고려하고, 업데이트 로직의 간소화를 위해 Full-refresh 방식으로 진행했습니다.
데이터 마트의 스키마는 다음과 같습니다.
ELT 파이프라인
앞서 설명한 외부 테이블 생성과 데이터 마트 테이블 구축 과정은 Airflow의 DAG(redshift_elt_query_dag)를 통해 ELT 파이프라인으로 구축했습니다. Full-refresh을 위한 DROP, CTS 쿼리 구문을 sql 파일로 작성했으며, 제작한 Redshift Operator를 통해 해당 쿼리를 실행하는 방식으로 테이블들을 삭제, 생성합니다.
일부 테이블에는 테이블 간 종속성 문제가 존재하여, Airflow DAG를 통해 task 순서를 명시적으로 지정하여 이를 해결했습니다. 아래는 활용한 CTS 쿼리 중 일부이며, 이외 다른 쿼리는 깃허브에서 자세히 확인할 수 있습니다.
CREATE TABLE "analytics"."unique_jds" AS ( SELECT DISTINCT platform , job_id , title , company , major_category , middle_category , sub_category FROM "raw_data_external"."daily_jd_table" ); CREATE TABLE "analytics"."unique_jd_skills" AS ( SELECT u.major_category, u.middle_category, u.platform, u.job_id, js.unnested_skill as skill FROM "analytics"."unique_jds" AS u JOIN "raw_data_external"."jd_skills" AS js ON u.platform = js.platform AND u.job_id = js.job_id );
Python
복사

데이터 대시보드 구축

대시보드 설계 과정

본격적인 대시보드 제작에 앞서, 논의한 인사이트와 구축한 데이터 마트 테이블을 바탕으로 대시보드 설계 테이블을 작성했습니다. 대시보드 설계 테이블은 시각화 주제와 차트 그리고 시각화에 필요한 테이블과 컬럼 등 대시보드 구성 내용을 담고 있습니다.
주제
활용 차트
테이블명
컬럼명
직무 카테고리별 공고수 (대분류)
Pie Chart
unique_category_count_major
major_category count
직무 카테고리별 공고수 (대분류/중분류/소분류)
Tree Map
unique_category_count_major_middle
major_category middle_category sub_category count
일자별 수집한 공고 수
Trendline
unique_daily_job_posting_count
day count
기술별로 사용되는 직무
Bar Chart
unique_jd_skills
major_category middle_category skill
전체 회사 위치도
Map
unique_company_coordinates
lat lon
직무 카테고리별 회사 위치
Map
unique_company_coordinates
lat lon major_category middle_category sub_category
직무 카테고리별 마감 임박 공고리스트
table
unique_upcoming_deadline_jobs_7days
company title url end_at primary_responsibility
직무 카테고리별 채용공고 사용기술에 포함된 기술스택 키워드 언급량
Bar Chart
unique_preferred_english
preferred
직무 카테고리별 채용공고 자격요건에 포함된 기술스택 키워드 언급량
Bar Chart
unique_required_english
required
직무 카테고리별 채용공고 주요업무에 포함된 기술스택 키워드 언급량
Bar Chart
unique_primary_responsibility_english
primary_responsibility

대시보드 플랫폼 선정

대시보드 플랫폼은 데이터 마트 플랫폼인 Redshift 연동과 대시보드 설계 테이블의 차트 종류, 계산식 지원 등의 기준을 두고 검토했으며, 비교적 러닝커브가 적고 무료 프로젝트 기능이 지원되는 슈퍼셋으로 선정했습니다.

대시보드 구축 결과

프로젝트 대시보드는 크게 모든 직무의 통합적인 정보를 제공하는 파트와 직무 카테고리별로 필터를 적용할 수 있는 상세 파트로 구성되어 있습니다.
직무 통합 대시보드
직무 통합 대시보드는 수집한 다양한 채용 플랫폼의 데이터를 종합하여 직무 별로 채용 중인 공고 수, 회사 위치, 기술스택 등을 종합적으로 확인할 수 있습니다.
간략히 분석 내용을 정리하자면 직무 대분류 기준으로 WEB 채용공고가 48%로 가장 많았으며, SW/HW/IOT 채용공고가 약 21%로 두번째 많았으며 나머지 직무에서는 DATA > DEVSECOPS > MOBILE > SUPPORT > ETC > GAME 순으로 확인되었습니다. 또한, 채용 회사의 97% 이상이 서울, 경기 등 수도권 지역에 위치했습니다. 채용공고에서 언급되는 기술스택 키워드로 Javascript가 1위였으며, 그 다음으로 AWS > JAVA > REACT > PYTHON 등이 자주 언급되었습니다.
직무 카테고리별 상세 정보 대시보드
직무 카테고리별 상세 정보 대시보드에서는 대분류, 중분류, 소분류에 따라 공고 수, 회사 위치, 마감이 임박한 채용공고를 확인할 수 있으며, 채용공고의 자격요건, 주요업무, 사용기술에 포함되어 있는 기술 스택을 언급량 순으로 보여주고 있습니다. 보다 자세한 구성과 내용은 <대시보드 사용 영상> 링크를 통해 확인할 수 있습니다.

이벤트 모니터링

AWS 인프라에서 발생하는 이벤트중 원하는 이벤트만 필터링해, 작업이 잘 진행되고 있는지 확인할 수 있도록 슬랙에 알림을 보내주도록 구성했습니다.

추적하고자 하는 이벤트

프로젝트에서 이벤트를 추적해 확인하고자 하는 사항은 아래와 같습니다.
1.
웹 스크래핑의 결과인 json 파일이 s3에 저장되었는지
2.
s3에 저장된 데이터가 갑자기 삭제되지는 않았는지
3.
실행시킨 glue job이 어떤 상태로 종료되었는지 (SUCCEEDED, FAILED, TIMEOUT …)

사전 설정

aws의 거의 모든 이벤트는 계정에 하나씩 할당되어있는 default 이벤트 버스로 전송됩니다.
하지만 s3와 lambda 등의 특정 서비스의 이벤트는 데이터 이벤트에 속하는데, 데이터 이벤트는 자동으로 전송되지 않아 별개의 설정을 해줘야 합니다.
S3의 버킷의 속성 탭에 들어가서 EventBridge로 알림 전송을 활성화 해주면, 해당 버킷에서 발생하는 이벤트들이 default 이벤트 버스로 전송됩니다.

Event 처리 방식

default 이벤트 버스로 전송된 이벤트들은 이벤트 규칙에 필터링된 후에 이벤트 타겟으로 전송됩니다.
프로젝트에서 설정한 이벤트 타겟은 람다 함수입니다. 람다는 이벤트를 받아서 이벤트 메타 데이터를 가져와 메시지를 생성한 뒤 슬랙 채널로 전송합니다.

Event Rules

아래는 Glue Job이 생성하는 샘플 이벤트입니다.
{ "version": "0", "id": "abcdef00-1234-5678-9abc-def012345678", "detail-type": "Glue Job State Change", "source": "aws.glue", "account": "123456789012", "time": "2017-09-07T18:57:21Z", "region": "us-east-1", "resources": [], "detail": { "jobName": "MyJob", "severity": "INFO", "state": "SUCCEEDED", "jobRunId": "jr_abcdef0123456789abcdef0123456789", "message": "Job run succeeded" } }
JSON
복사
Event Rule에 해당하는 패턴을 가진 이벤트들만 타겟으로 전달됩니다.
{ "source": ["aws.glue"], "detail-type": ["Glue Job State Change"], "detail": { "state": ["SUCCEEDED", "FAILED", "TIMEOUT", "STOPPED"], "jobName": ["de1_1_1st_preprocessing_script", "de1_1_2nd_preprocessing_script"] } }
JSON
복사
detail.jobName이 "de1_1_1st_preprocessing_script", "de1_1_2nd_preprocessing_script" 둘 중 하나인 이벤트만 위의 이벤트 룰을 통과할 수 있습니다.
이 이벤트들은 람다 함수로 전달됩니다.

Lambda

람다 함수는 EventBridge에 의해 트리거됩니다.
실행되는 코드는 아래와 같습니다.
from datetime import datetime import json import os import requests import boto3 def lambda_handler(event, context): webhook_url = os.environ.get('SLACK_WEBHOOK_URL') if not webhook_url: print('Slack Webhook URL not found') return { 'statusCode': 400, 'body': json.dumps('Slack Webhook URL not found') } glue_job_name = event['detail']['jobName'] glue_job_run_id = event['detail']['jobRunId'] glue_job_state = event['detail']['state'] slack_message = {} message_text = '' attachments = '' if glue_job_state == 'SUCCEEDED': # message_text = f'Glue Job {glue_job_name} has succeeded!' glue = boto3.client('glue') response = glue.get_job_run(JobName=glue_job_name, RunId=glue_job_run_id) execution_time = response['JobRun']['ExecutionTime'] hours, remainder = divmod(execution_time, 3600) minutes, seconds = divmod(remainder, 60) message_text = f"Glue Job {glue_job_name} has succeeded!" attachments = f"Execution Time: {hours}h {minutes}m {seconds}s." elif glue_job_state == 'FAILED': message_text = f'Glue Job {glue_job_name} has failed.' elif glue_job_state == 'TIMEOUT': message_text = f'Glue Job {glue_job_name} has timed out.' elif glue_job_state == 'STOPPED': message_text = f'Glue Job {glue_job_name} has been stopped.' slack_message['text'] = message_text if attachments: slack_message["attachments"] = [ { "text": attachments } ] response = requests.post(webhook_url, json=slack_message) if response.status_code != 200: print(f'Failed to send Slack message. Status Code: {response.status_code}, Reason: {response.text}') return { 'statusCode': 200, 'body': json.dumps('Slack message sent') }
JSON
복사
코드중 boto3에서 glue 클라이언트를 받아와 glue job 메타데이터인 execution_time을 받아오는 부분이 있는데, 이는 람다로 넘어오는 이벤트에는 execution_time 값이 없기 때문입니다.
이를 위해서 람다 role에 계정의 모든 glue job에 대한 접근을 허용하는 인라인 정책을 추가해줬습니다.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "glue:GetJobRun" ], "Resource": "arn:aws:glue:ap-northeast-2:862327261051:job/*" } ] }
JSON
복사

모니터링 결과

파일이 s3에 저장될 때
파일이 삭제될 때
glue job이 성공했을 때
glue job이 실패했을 때