참고 - T아카데미 : [토크ON세미나] GCP기반의 데이터 엔지니어링

 

 

트위터 스트리밍 데이터 수집

 

실습 준비

GCP 가입 및 결재 설정

구글 SDK 설치 : Google Cloud SDK 설치 프로그램 사용  |  Cloud SDK 문서

트위터 가입

도커 설치

 

실습


#1

트위터 API 

트위터 API 코드1

더보기
# pip install tweepy 
# 또는 
# conda install -c conda-forge tweepy

# 실행 확인 
import tweepy
tweepy.StreamListener
print("hello")
import tweepy
twitter_api_key = '<twitter_api_key>'
twitter_api_secret_key = '<twitter_api_secret_key>'
twitter_access_token = '<twitter_access_token>'
twitter_access_token_secret = '<twitter_access_token_secret>'

class SimpleStreamListener(tweepy.StreamListener):
    def on_status(self, status):
        print(status)
        
stream_listener = SimpleStreamListener()
auth = tweepy.OAuthHandler(twitter_api_key, twitter_api_secret_key)
auth.set_access_token(twitter_access_token, twitter_access_token_secret)
twitterStream = tweepy.Stream(auth, stream_listener)
twitterStream.filter(track=['data'])

 


#2

구글 콘솔 접속 ->  새 프로젝트

 

1) Pub/Sub -> 주제 만들기, 권한 부여 하기

 

 

2) IAM 관리자 -> 서비스 계정 -> 서비스 계정 만들기

키가 다운로드 되면 프로젝트에 넣어 주면 된다.

 

 

3) Pub/Sub 주제 만들기 

tweets 이라는 주제를 생성한다.

 

그 후 다음 코드 실행

트위터 API 코드 2

더보기
# pip install tweepy 
# pip install --upgrade google-api-python-client
# pip install --upgrade google-cloud-pubsub
# pip install --upgrade google-auth

import json
import tweepy
from google.cloud import pubsub_v1
from google.oauth2 import service_account
key_path = "키 파일.json"
credentials = service_account.Credentials.from_service_account_file(
 key_path,
 scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = pubsub_v1.PublisherClient(credentials=credentials)
topic_path = client.topic_path('프로젝트ID', 'tweets')

twitter_api_key = '<twitter_api_key>'
twitter_api_secret_key = '<twitter_api_secret_key>'
twitter_access_token = '<twitter_access_token>'
twitter_access_token_secret = '<twitter_access_token_secret>'

class SimpleStreamListener(tweepy.StreamListener):
    def on_status(self, status):
        print(status)
        tweet = json.dumps({'id': status.id, 'created_at': status.created_at, 'text': status.text}, default=str)
        client.publish(topic_path, data=tweet.encode('utf-8'))

    def on_error(self, status_code):
        print(status_code)
        if status_code == 420:
            return False
stream_listener = SimpleStreamListener()
auth = tweepy.OAuthHandler(twitter_api_key, twitter_api_secret_key)
auth.set_access_token(twitter_access_token, twitter_access_token_secret)
twitterStream = tweepy.Stream(auth, stream_listener)
twitterStream.filter(track=['data'])

 

잘 들어갔는지 확인하려면 구독탭에서 확인한다.

 

 

4) BigQuery 데이터 세트, 테이블 생성

네비게이션 탭 - BigQuery - 프로젝트 - 데이터 세트 만들기 - "tweet_data"

테이블 만들기 - "tweets"

데이터 세트 만들기
테이블 만들기
스키마 추가 : 순서대로!

 

5) Google Cloud Functions 설정

Pub/Sub 탭 - Cloud 함수 트리거 

 

main.py 와 requirements.txt 에 각 코드를 붙여 넣는다.

# main.py
import base64
import json
from google.cloud import bigquery

def tweets_to_bq(tweet):
    client = bigquery.Client()
    dataset_ref = client.dataset('tweet_data') # BigQuery 데이터 세트
    table_ref = dataset_ref.table('tweets') # BigQuery 테이블
    table = client.get_table(table_ref)

    tweet_dict = json.loads(tweet)
    rows_to_insert = [
    (tweet_dict['id'], tweet_dict['created_at'], tweet_dict['text'])
    ]

    error = client.insert_rows(table, rows_to_insert)
    print(error)

def hello_pubsub(event, context):
    """Triggered from a message on a Cloud Pub/Sub topic.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    pubsub_message = base64.b64decode(event['data']).decode('utf-8')
    print(pubsub_message)
    tweets_to_bq(pubsub_message)
# requirements.txt
google-cloud-bigquery

 

실행 결과

 


#3 

DataStudio 

DataStudio - 빈 보고서 - 데이터에 연결 - BigQurey 

 

차트 추가 - 시계열 차트 

 

메이저한 툴은 아니다.

나중에 다른 좋은 도구로 다시 시각화 해보면 좋을 것 같다. (Tableau, Superset 등.. )

 


#4 

Python 코드를 Docker Container로 생성하여 최종적으로 GKE에 배포하여 무인으로 동작하게 하기.

 

1) Docker Images

Dockerfile이라는 (확장자 없음, 반드시 이 이름으로!) 새 파일을 만든다.

requirements.txt  새 파일을 만든다.

다음의 코드를 붙여 넣는다.

# Base Image 설정
FROM python:3.7-slim

# Working Directory 설정
WORKDIR /app

# 실행에 필요한  파일 복사
## Twitter.py / GCP SA Key.json / requirements.txt
ADD . /app

# 패키지 설치
RUN pip install --trusted-host pypi.python.org -r requirements.txt

# 환경변수 설정
ENV GOOGLE_APPLICATION_CREDENTIALS="/app/키 이름.json"

CMD ["python", "tweet2.py"]
# requirements.txt
google-cloud-pubsub==1.6.0
tweepy==3.8.0

 

Docker Images 빌드

# 터미널 입력
$ docker build -t tweet2 .
$ docker images

 

2) GKE 배포

위에서 Cloud SDK 를 설치 했다면

Cloud SDK Shell에서 다음을 입력하여 초기화 하고 해당 프로젝트를 선택해 준다.

  • Linux 또는 macOS:
    • ./google-cloud-sdk/bin/gcloud init
  • Windows
    • .\google-cloud-sdk\bin\gcloud init

그 후 터미널에서 다음을 진행한다.

# docker tag 이미지:태그 gcr.io/프로젝트 ID/이미지:태그
$ gcloud auth configure-docker   # 인증된 Docker 구성
$ docker tag tweet2 gcr.io/프로젝트ID/tweet2
$ docker push gcr.io/프로젝트ID/tweet2

 

확인 및 배포

GCP Console - Container Registry 

확인
GKE에 배포

~끝 👍👍~

 

'PROJECT' 카테고리의 다른 글

작귀 ETL 파이프라인  (0) 2023.11.20
블로그 이미지

hjc_

୧( “̮ )୨

,