Please create a request on https://ezq.quantiphi.com/gitlab/user-access if you want to create a new group or project.

bq_to_pubsub.py 1.13 KB
Newer Older
Pavan Kattamuri's avatar
Pavan Kattamuri committed
1 2
"""Reads transactions from BigQuery and ingests
into Pub/Sub for simulating realtime scenarios."""
Pavan Kattamuri's avatar
WIP  
Pavan Kattamuri committed
3 4 5 6 7 8
import json
import sys
import time
from google.cloud import bigquery
from google.cloud import pubsub_v1

Pavan Kattamuri's avatar
Pavan Kattamuri committed
9 10 11 12 13
# Command-line arguments: <project_id> <topic_name>.
# Validate them before creating any client so that a missing argument yields
# a usage message instead of a bare IndexError.
if len(sys.argv) < 3:
    sys.exit('Usage: {} <project_id> <topic_name>'.format(sys.argv[0]))
project_id = sys.argv[1]
topic_name = sys.argv[2]

publisher = pubsub_v1.PublisherClient()
# Topic path built by the client from the project id and topic name.
topic_path = publisher.topic_path(project_id, topic_name)

Pavan Kattamuri's avatar
WIP  
Pavan Kattamuri committed
14 15
# Construct a BigQuery client object. No project or credentials are passed
# explicitly here, so the client library's defaults apply.
client = bigquery.Client()
Pavan Kattamuri's avatar
Pavan Kattamuri committed
16
# Joins every simulated transaction (t1) with the demographics row (t2) that
# shares its cc_num; EXCEPT(cc_num) drops t2's copy of the join key.
# Because this is a LEFT JOIN, transactions with no demographics match are
# still returned, with all t2 columns NULL. Only the 1000 earliest
# transactions (by trans_date_trans_time) are fetched.
query = """SELECT t1.*,t2.* EXCEPT(cc_num)
Pavan Kattamuri's avatar
Pavan Kattamuri committed
17 18
FROM `qp-fraud-detection.cc_data.simulation_data` t1 
LEFT JOIN `qp-fraud-detection.cc_data.demographics` t2
Pavan Kattamuri's avatar
Pavan Kattamuri committed
19
ON t1.cc_num =t2.cc_num 
Pavan Kattamuri's avatar
Pavan Kattamuri committed
20
order by trans_date_trans_time asc limit 1000"""
Pavan Kattamuri's avatar
WIP  
Pavan Kattamuri committed
21

Pavan Kattamuri's avatar
Pavan Kattamuri committed
22
print('Fetching simulated transactions from BigQuery')
Pavan Kattamuri's avatar
WIP  
Pavan Kattamuri committed
23
# Submit the query; the returned job is iterated further down to read the
# result rows one at a time.
query_job = client.query(query)
Pavan Kattamuri's avatar
Pavan Kattamuri committed
24
print('Publishing messages to Pub/Sub topic')
Pavan Kattamuri's avatar
Pavan Kattamuri committed
25
# NOTE(review): count is initialised here but is never read or updated in the
# visible code — confirm whether it is still needed.
count=0
Pavan Kattamuri's avatar
WIP  
Pavan Kattamuri committed
26 27 28 29 30 31 32 33
# publish() is asynchronous and only hands back a future. Keep every future so
# the script can wait for delivery (and surface failures) before it exits.
publish_futures = []
for row in query_job:
    row = dict(row)
    # Render the two temporal columns as strings before JSON encoding.
    # The LEFT JOIN leaves demographics columns such as dob NULL for cards
    # without a match; those arrive as None and are passed through unchanged
    # (JSON null) instead of crashing on None.strftime.
    for column, fmt in (('trans_date_trans_time', '%Y-%m-%d %H:%M:%S'),
                        ('dob', '%Y-%m-%d')):
        if row[column] is not None:
            row[column] = row[column].strftime(fmt)
    # Data must be a bytestring
    data = json.dumps(row).encode("utf-8")
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data)
    publish_futures.append(future)

# Block until every message has been handed to Pub/Sub; result() re-raises
# any publish error so failures are not silently dropped.
for future in publish_futures:
    future.result()