
Commit 62edc628 authored by Pavan Kattamuri

changed PTransform names

parent 1eae44e6
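For context on the rename: in Apache Beam, the string label applied with `>>` names the PTransform, must be unique within a pipeline, and is what Dataflow displays for each step in the job graph. A minimal sketch (an assumed standalone example, not code from this repo):

```python
import apache_beam as beam

# Assumed example: the label before `>>` names each PTransform. Labels must
# be unique within the pipeline, and Dataflow shows them as step names, which
# is why descriptive labels like "Notifications to PubSub" help.
with beam.Pipeline() as p:
    (p
        | "Read values" >> beam.Create([1, 2, 3])
        | "Square values" >> beam.Map(lambda x: x * x)
        | "Print values" >> beam.Map(print))
```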
@@ -375,7 +375,7 @@ python3 inference_pipeline.py \
--region us-east1 \
--streaming \
--runner DataflowRunner \
- --job_name predict-fraudelent-transactions \
+ --job_name predict-fraudulent-transactions \
--requirements_file requirements.txt
```
@@ -173,6 +173,7 @@ class InvokeMLEndpoint(beam.DoFn):
"avg_spend_pm": elem['avg_spend_pw'],
}]
+ # Get prediction and store prediction and confidence scores
output = predict_json(self.model_w_agg, w_agg_instance)
if output[0]=='Success':
elem['is_fraud_model_w_aggregates'] = int(output[1][0]['predicted_is_fraud'])
@@ -258,8 +259,8 @@ def run(argv=None):
ml_output = processed_data | "Invoke ML Model" >> beam.ParDo(InvokeMLEndpoint(project_id, model_name, model_w_agg, model_wo_agg))
ml_output | "Filter fraudulent txns" >> beam.ParDo(FilterFraud()) \
- | "Send Notifications" >> beam.io.WriteToPubSub(topic = f'projects/{project_id}/topics/{fraud_notification_topic}')
- ml_output | "Write to transactions" >> beam.io.WriteToBigQuery(
+ | "Notifications to PubSub" >> beam.io.WriteToPubSub(topic = f'projects/{project_id}/topics/{fraud_notification_topic}')
+ ml_output | "Write pred to BigQuery" >> beam.io.WriteToBigQuery(
table=f'{project_id}:{dataset_id}.{table_name}',
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
@@ -270,22 +271,3 @@ def run(argv=None):
if __name__ == "__main__":
run()
- # python3 inference_pipeline.py \
- # --project qp-uk-ml-patterns-2020-11 \
- # --firestore-project qp-uk-ml-patterns-2020-11 \
- # --subscription-name sub1 \
- # --firestore-collection v5_cc_transactions \
- # --dataset-id fraud_detection_patterns \
- # --table-name output_transactions \
- # --model-name patterns \
- # --model-with-aggregates with_agg \
- # --model-without-aggregates without_agg \
- # --fraud-notification-topic fraud_notifications \
- # --staging_location gs://fraud_detection_sparkov/dataflow/staging \
- # --temp_location gs://fraud_detection_sparkov/dataflow/tmp/ \
- # --region us-east1 \
- # --streaming \
- # --runner DataflowRunner \
- # --job_name predict-fraudelent-transactions \
- # --requirements_file requirements.txt
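For reference, a condensed sketch of the fan-out that the renamed steps implement, under assumptions: a plain `beam.Filter` stands in for the repo's `FilterFraud` DoFn, elements are dicts with an `is_fraud` key, and the topic and table names are placeholders.

```python
import apache_beam as beam

def branch_writes(ml_output, project_id, topic, table_spec):
    """Sketch of the fan-out above; all names are placeholder assumptions."""
    # Branch 1: keep only fraudulent predictions and publish them as bytes,
    # since WriteToPubSub expects encoded payloads.
    (ml_output
        | "Filter fraudulent txns" >> beam.Filter(lambda e: e.get("is_fraud") == 1)
        | "Encode for PubSub" >> beam.Map(lambda e: str(e).encode("utf-8"))
        | "Notifications to PubSub" >> beam.io.WriteToPubSub(
            topic=f"projects/{project_id}/topics/{topic}"))
    # Branch 2: append every prediction to an existing BigQuery table.
    (ml_output
        | "Write pred to BigQuery" >> beam.io.WriteToBigQuery(
            table=table_spec,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))
```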
@@ -14,10 +14,10 @@ topic_path = publisher.topic_path(project_id, topic_name)
# Construct a BigQuery client object.
client = bigquery.Client()
query = """SELECT t1.*,t2.* EXCEPT(cc_num)
- FROM `qp-fraud-detection.cc_data.demographics` t1
- RIGHT JOIN `qp-fraud-detection.cc_data.simulation_data`t2
+ FROM `qp-fraud-detection.cc_data.simulation_data` t1
+ LEFT JOIN `qp-fraud-detection.cc_data.demographics` t2
ON t1.cc_num =t2.cc_num
- order by trans_date_trans_time asc limit 10000"""
+ order by trans_date_trans_time asc limit 1000"""
print('Fetching simulated transactions from BigQuery')
query_job = client.query(query)
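As a hedged sketch (assuming the rest of this script publishes each fetched row to the Pub/Sub topic built at the top of the hunk; the project and topic names below are placeholders), the fetch-and-publish loop might look roughly like this:

```python
import json
from google.cloud import bigquery, pubsub_v1

project_id = "your-project"            # placeholder
topic_name = "simulated_transactions"  # placeholder

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
client = bigquery.Client()

# Same shape as the query in the diff: keep every simulation row, attach
# demographics where available, oldest transactions first, capped at 1000.
query = """SELECT t1.*, t2.* EXCEPT(cc_num)
FROM `qp-fraud-detection.cc_data.simulation_data` t1
LEFT JOIN `qp-fraud-detection.cc_data.demographics` t2
ON t1.cc_num = t2.cc_num
ORDER BY trans_date_trans_time ASC LIMIT 1000"""

print('Fetching simulated transactions from BigQuery')
for row in client.query(query).result():
    # Pub/Sub payloads must be bytes; default=str serializes TIMESTAMP values.
    publisher.publish(topic_path, json.dumps(dict(row), default=str).encode("utf-8"))
```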
@@ -15,7 +15,7 @@ query = '''WITH t1 AS (SELECT cc_num, amt, trans_date_trans_time FROM `qp-fraud-
UNION ALL
SELECT cc_num, amt, trans_date_trans_time FROM `qp-fraud-detection.cc_data.test_raw`),
t2 AS (SELECT * FROM t1 WHERE trans_date_trans_time > TIMESTAMP('2020-11-01 00:00:00 UTC'))
- SELECT cc_num, ARRAY_AGG(STRUCT(amt, trans_date_trans_time) ORDER BY trans_date_trans_time ) AS trans_details FROM t2 GROUP BY cc_num
+ SELECT cc_num, ARRAY_AGG(STRUCT(amt, trans_date_trans_time) ORDER BY trans_date_trans_time desc) AS trans_details FROM t2 GROUP BY cc_num
'''
print('Fetching transaction history from BigQuery')
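With the aggregation now ordered `desc`, each card's `trans_details` array starts with its most recent transaction. A hedged sketch of how a consumer could exploit that ordering to compute a rolling one-week spend (an assumed helper, not code from this repo; timestamps are assumed to be timezone-aware datetimes):

```python
from datetime import datetime, timedelta, timezone

def spend_last_week(trans_details, now=None):
    """Assumed consumer of the DESC-ordered trans_details array."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=7)
    total = 0.0
    for txn in trans_details:                      # newest first
        if txn["trans_date_trans_time"] < cutoff:  # everything after is older
            break
        total += txn["amt"]
    return total
```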