From 2b73540462c8350513fd3a14549ab6c1b299798b Mon Sep 17 00:00:00 2001 From: user_client2024 Date: Wed, 9 Oct 2024 06:51:46 +0000 Subject: [PATCH] System save at 09/10/2024 12:21 by user_client2024 --- .ipynb_checkpoints/main-checkpoint.ipynb | 335 +++++++++++++++++------ main.ipynb | 335 +++++++++++++++++------ main.py | 307 +++++++++++++++------ 3 files changed, 719 insertions(+), 258 deletions(-) diff --git a/.ipynb_checkpoints/main-checkpoint.ipynb b/.ipynb_checkpoints/main-checkpoint.ipynb index a8cf969..9d1d0a7 100644 --- a/.ipynb_checkpoints/main-checkpoint.ipynb +++ b/.ipynb_checkpoints/main-checkpoint.ipynb @@ -2,116 +2,277 @@ "cells": [ { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "id": "8faa31bf-9b17-4b96-88c0-973636321b97", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4c41b476-0dd0-456c-a68e-79740a3bd820", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "735e6ac9-688f-47a0-98c2-11499dceeaba", + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a050c95f-4104-461e-afa4-e824639eedb5", "metadata": {}, "outputs": [], "source": [ - "from datetime import datetime, timedelta\n", - "import pandas as pd\n", "from tms_data_interface import SQLQueryInterface\n", - "\n", + "seq = SQLQueryInterface(schema=\"transactionschema\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "291c2098-902b-4a99-a3d6-df42d0749528", + "metadata": {}, + "outputs": [], + "source": [ + "seq.execute_raw(\"show tables\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "42bdf4f8-ad1f-49d8-a0f1-b084d3434d58", + "metadata": {}, + "outputs": [], + "source": [ "query = \"\"\"\n", - "WITH time_windows AS (\n", - " SELECT\n", - " -- End time is the current trade time\n", - " date_time AS end_time,\n", "\n", - " -- Subtract seconds from the end_time using date_add() with negative integer interval\n", - " date_add('second', -{time_window_s}, date_time) AS start_time,\n", + " select final.CUSTOMER_NUMBER_main as Focal_id,\n", "\n", - " -- Trade details\n", - " trade_price,\n", - " trade_volume,\n", - " trader_id,\n", + " final.Credit_transaction_amount,\n", "\n", - " -- Calculate minimum price within the time window\n", - " MIN(trade_price) OVER (\n", - " ORDER BY date_time \n", - " RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW\n", - " ) AS min_price,\n", + " final.Total_no_of_credit_transactions,\n", "\n", - " -- Calculate maximum price within the time window\n", - " MAX(trade_price) OVER (\n", - " ORDER BY date_time \n", - " RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW\n", - " ) AS max_price,\n", + " final.Debit_transaction_amount,\n", "\n", - " -- Calculate total trade volume within the time window\n", - " SUM(trade_volume) OVER ( \n", - " ORDER BY date_time \n", - " RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW\n", - " ) AS total_volume,\n", + " final.Total_no_of_debit_transactions,\n", "\n", - " -- Calculate participant's trade volume within the time window\n", - " SUM(CASE WHEN trader_id = trader_id THEN trade_volume ELSE 0 END) OVER (\n", - " PARTITION BY trader_id \n", - " ORDER BY date_time \n", - " RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW\n", - " ) AS participant_volume\n", - " FROM\n", - " {trade_data_1b}\n", - ")\n", - "SELECT\n", - " -- Select the time window details\n", - " start_time,\n", - " end_time,\n", + " final.Wash_Ratio,\n", "\n", - " -- Select the participant (trader) ID\n", - " trader_id AS \"Participant\",\n", + " final.SEGMENT,\n", "\n", - " -- Select the calculated min and max prices\n", - " min_price,\n", - " max_price,\n", + " final.RISK,\n", "\n", - " -- Calculate the price change percentage\n", - " (max_price - min_price) / NULLIF(min_price, 0) * 100 AS \"Price Change (%)\",\n", + " final.SAR_FLAG\n", "\n", - " -- Calculate the participant's volume as a percentage of total volume\n", - " (participant_volume / NULLIF(total_volume, 0)) * 100 AS \"Volume (%)\",\n", + " from \n", "\n", - " -- Participant volume\n", - " participant_volume,\n", + " (\n", + "\n", + " (\n", + "\n", + " select subquery.CUSTOMER_NUMBER_1 as CUSTOMER_NUMBER_main,\n", + "\n", + " subquery.Credit_transaction_amount,\n", + "\n", + " subquery.Total_no_of_credit_transactions,\n", + "\n", + " case\n", + "\n", + " when subquery.Debit_transaction_amount is NULL then 0\n", + "\n", + " else Debit_transaction_amount\n", + "\n", + " end as Debit_transaction_amount,\n", + "\n", + " case\n", + "\n", + " when subquery.Total_no_of_debit_transactions is NULL then 0\n", + "\n", + " else Total_no_of_debit_transactions\n", + "\n", + " end as Total_no_of_debit_transactions,\n", + "\n", + " case\n", + "\n", + " when subquery.Debit_transaction_amount = 0\n", + "\n", + " or subquery.Debit_transaction_amount is NULL then 0\n", + "\n", + " else subquery.Credit_transaction_amount / subquery.Debit_transaction_amount\n", + "\n", + " end as Wash_Ratio\n", + "\n", + " from \n", + "\n", + " (\n", + "\n", + " (\n", + "\n", + " select customer_number as CUSTOMER_NUMBER_1, \n", + "\n", + " sum(transaction_amount) as Credit_transaction_amount, \n", + "\n", + " count(*) as Total_no_of_credit_transactions\n", + "\n", + " from \n", + "\n", + " (\n", + "\n", + " select * \n", + "\n", + " from {trans_data} as trans_table left join {acc_data} as acc_table\n", + "\n", + " on trans_table.benef_account_number = acc_table.account_number\n", + "\n", + " )\n", + "\n", + " where account_number not in ('None')\n", + "\n", + " group by 1\n", + "\n", + " ) credit left join\n", + "\n", + " (\n", + "\n", + " select customer_number as CUSTOMER_NUMBER_2, \n", + "\n", + " sum(transaction_amount) as Debit_transaction_amount, \n", + "\n", + " count(*) as Total_no_of_debit_transactions\n", + "\n", + " from \n", + "\n", + " (\n", + "\n", + " select * \n", + "\n", + " from {trans_data} as trans_table left join {acc_data} as acc_table\n", + "\n", + " on trans_table.orig_account_number = acc_table.account_number\n", + "\n", + " )\n", + "\n", + " where account_number not in ('None')\n", + "\n", + " group by 1\n", + "\n", + " ) debit on credit.CUSTOMER_NUMBER_1 = debit.CUSTOMER_NUMBER_2 \n", + "\n", + " ) subquery\n", + "\n", + " ) main left join \n", + "\n", + " (\n", + "\n", + " select subquery.CUSTOMER_NUMBER_3 as CUSTOMER_NUMBER_cust,\n", + "\n", + " subquery.SEGMENT,\n", + "\n", + " subquery.RISK,\n", + "\n", + " case\n", + "\n", + " when subquery.SAR_FLAG is NULL then 'N'\n", + "\n", + " else subquery.SAR_FLAG\n", + "\n", + " end as SAR_FLAG \n", + "\n", + " from\n", + "\n", + " (\n", + "\n", + " (\n", + "\n", + " select customer_number as CUSTOMER_NUMBER_3, \n", + "\n", + " business_segment as SEGMENT,\n", + "\n", + " case\n", + "\n", + " when RISK_CLASSIFICATION = 1 then 'Low Risk'\n", + "\n", + " when RISK_CLASSIFICATION = 2 then 'Medium Risk'\n", + "\n", + " when RISK_CLASSIFICATION = 3 then 'High Risk'\n", + "\n", + " else 'Unknown Risk'\n", + "\n", + " end AS RISK\n", + "\n", + " from {cust_data}\n", + "\n", + " ) cd left join\n", + "\n", + " (\n", + "\n", + " select customer_number as CUSTOMER_NUMBER_4, \n", + "\n", + " sar_flag as SAR_FLAG\n", + "\n", + " from {alert_data}\n", + "\n", + " ) ad on cd.CUSTOMER_NUMBER_3 = ad.CUSTOMER_NUMBER_4\n", + "\n", + " ) subquery\n", + "\n", + " ) cust_alert on cust_alert.CUSTOMER_NUMBER_cust = main.CUSTOMER_NUMBER_main\n", + "\n", + " ) final\n", "\n", - " -- Select the total volume within the window\n", - " total_volume AS \"Total Volume\"\n", - "FROM\n", - " time_windows\n", "\"\"\"\n", - "\n", - "\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "98b5784d-2738-4962-b966-ae76593d7a1e", + "metadata": {}, + "outputs": [], + "source": [ "from tms_data_interface import SQLQueryInterface\n", - "\n", + " \n", "class Scenario:\n", - " seq = SQLQueryInterface(schema=\"test_schema_test\")\n", + "\n", + " seq = SQLQueryInterface(schema=\"transactionschema\")\n", + " \n", " def logic(self, **kwargs):\n", - " validation_window = kwargs.get('validation_window')\n", - " time_window_s = int(validation_window/1000)\n", - " query_start_time = datetime.now()\n", - " print(\"Query start time :\",query_start_time)\n", - " row_list = self.seq.execute_raw(query.format(trade_data_1b=\"test_table_test1\",\n", - " time_window_s = time_window_s)\n", + "\n", + " row_list = self.seq.execute_raw(query.format(trans_data=\"transaction10m\",\n", + "\n", + " cust_data=\"customer_data_v1\",\n", + "\n", + " acc_data=\"account_data_v1\",\n", + "\n", + " alert_data=\"alert_data_v1\")\n", + "\n", " )\n", - " cols = [\n", - " 'START_DATE_TIME',\n", - " 'END_DATE_TIME',\n", - " 'Focal_id',\n", - " 'MIN_PRICE',\n", - " 'MAX_PRICE',\n", - " 'PRICE_CHANGE_PCT',\n", - " 'PARTICIPANT_VOLUME_PCT',\n", - " 'PARTICIPANT_VOLUME',\n", - " 'TOTAL_VOLUME',\n", - " ]\n", - " final_scenario_df = pd.DataFrame(row_list, columns = cols)\n", - " final_scenario_df['PARTICIPANT_VOLUME_PCT'] = final_scenario_df['PARTICIPANT_VOLUME']/\\\n", - " final_scenario_df['TOTAL_VOLUME'] * 100\n", - " final_scenario_df['Segment'] = 'Default'\n", - " final_scenario_df['SAR_FLAG'] = 'N'\n", - " final_scenario_df['Risk'] = 'Low Risk'\n", - " final_scenario_df.dropna(inplace=True)\n", - " # final_scenario_df['RUN_DATE'] = final_scenario_df['END_DATE']\n", - " return final_scenario_df" + "\n", + " cols = [\"Focal_id\", \"Credit_transaction_amount\",\n", + "\n", + " \"Total_no_of_credit_transactions\",\n", + "\n", + " \"Debit_transaction_amount\", \"Total_no_of_debit_transactions\",\n", + "\n", + " \"Wash_Ratio\", \"Segment\", \"Risk\", \"SAR_FLAG\"]\n", + "\n", + " df = pd.DataFrame(row_list, columns = cols)\n", + "\n", + " return df\n", + " " ] } ], diff --git a/main.ipynb b/main.ipynb index a8cf969..9d1d0a7 100644 --- a/main.ipynb +++ b/main.ipynb @@ -2,116 +2,277 @@ "cells": [ { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "id": "8faa31bf-9b17-4b96-88c0-973636321b97", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4c41b476-0dd0-456c-a68e-79740a3bd820", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "735e6ac9-688f-47a0-98c2-11499dceeaba", + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a050c95f-4104-461e-afa4-e824639eedb5", "metadata": {}, "outputs": [], "source": [ - "from datetime import datetime, timedelta\n", - "import pandas as pd\n", "from tms_data_interface import SQLQueryInterface\n", - "\n", + "seq = SQLQueryInterface(schema=\"transactionschema\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "291c2098-902b-4a99-a3d6-df42d0749528", + "metadata": {}, + "outputs": [], + "source": [ + "seq.execute_raw(\"show tables\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "42bdf4f8-ad1f-49d8-a0f1-b084d3434d58", + "metadata": {}, + "outputs": [], + "source": [ "query = \"\"\"\n", - "WITH time_windows AS (\n", - " SELECT\n", - " -- End time is the current trade time\n", - " date_time AS end_time,\n", "\n", - " -- Subtract seconds from the end_time using date_add() with negative integer interval\n", - " date_add('second', -{time_window_s}, date_time) AS start_time,\n", + " select final.CUSTOMER_NUMBER_main as Focal_id,\n", "\n", - " -- Trade details\n", - " trade_price,\n", - " trade_volume,\n", - " trader_id,\n", + " final.Credit_transaction_amount,\n", "\n", - " -- Calculate minimum price within the time window\n", - " MIN(trade_price) OVER (\n", - " ORDER BY date_time \n", - " RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW\n", - " ) AS min_price,\n", + " final.Total_no_of_credit_transactions,\n", "\n", - " -- Calculate maximum price within the time window\n", - " MAX(trade_price) OVER (\n", - " ORDER BY date_time \n", - " RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW\n", - " ) AS max_price,\n", + " final.Debit_transaction_amount,\n", "\n", - " -- Calculate total trade volume within the time window\n", - " SUM(trade_volume) OVER ( \n", - " ORDER BY date_time \n", - " RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW\n", - " ) AS total_volume,\n", + " final.Total_no_of_debit_transactions,\n", "\n", - " -- Calculate participant's trade volume within the time window\n", - " SUM(CASE WHEN trader_id = trader_id THEN trade_volume ELSE 0 END) OVER (\n", - " PARTITION BY trader_id \n", - " ORDER BY date_time \n", - " RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW\n", - " ) AS participant_volume\n", - " FROM\n", - " {trade_data_1b}\n", - ")\n", - "SELECT\n", - " -- Select the time window details\n", - " start_time,\n", - " end_time,\n", + " final.Wash_Ratio,\n", "\n", - " -- Select the participant (trader) ID\n", - " trader_id AS \"Participant\",\n", + " final.SEGMENT,\n", "\n", - " -- Select the calculated min and max prices\n", - " min_price,\n", - " max_price,\n", + " final.RISK,\n", "\n", - " -- Calculate the price change percentage\n", - " (max_price - min_price) / NULLIF(min_price, 0) * 100 AS \"Price Change (%)\",\n", + " final.SAR_FLAG\n", "\n", - " -- Calculate the participant's volume as a percentage of total volume\n", - " (participant_volume / NULLIF(total_volume, 0)) * 100 AS \"Volume (%)\",\n", + " from \n", "\n", - " -- Participant volume\n", - " participant_volume,\n", + " (\n", + "\n", + " (\n", + "\n", + " select subquery.CUSTOMER_NUMBER_1 as CUSTOMER_NUMBER_main,\n", + "\n", + " subquery.Credit_transaction_amount,\n", + "\n", + " subquery.Total_no_of_credit_transactions,\n", + "\n", + " case\n", + "\n", + " when subquery.Debit_transaction_amount is NULL then 0\n", + "\n", + " else Debit_transaction_amount\n", + "\n", + " end as Debit_transaction_amount,\n", + "\n", + " case\n", + "\n", + " when subquery.Total_no_of_debit_transactions is NULL then 0\n", + "\n", + " else Total_no_of_debit_transactions\n", + "\n", + " end as Total_no_of_debit_transactions,\n", + "\n", + " case\n", + "\n", + " when subquery.Debit_transaction_amount = 0\n", + "\n", + " or subquery.Debit_transaction_amount is NULL then 0\n", + "\n", + " else subquery.Credit_transaction_amount / subquery.Debit_transaction_amount\n", + "\n", + " end as Wash_Ratio\n", + "\n", + " from \n", + "\n", + " (\n", + "\n", + " (\n", + "\n", + " select customer_number as CUSTOMER_NUMBER_1, \n", + "\n", + " sum(transaction_amount) as Credit_transaction_amount, \n", + "\n", + " count(*) as Total_no_of_credit_transactions\n", + "\n", + " from \n", + "\n", + " (\n", + "\n", + " select * \n", + "\n", + " from {trans_data} as trans_table left join {acc_data} as acc_table\n", + "\n", + " on trans_table.benef_account_number = acc_table.account_number\n", + "\n", + " )\n", + "\n", + " where account_number not in ('None')\n", + "\n", + " group by 1\n", + "\n", + " ) credit left join\n", + "\n", + " (\n", + "\n", + " select customer_number as CUSTOMER_NUMBER_2, \n", + "\n", + " sum(transaction_amount) as Debit_transaction_amount, \n", + "\n", + " count(*) as Total_no_of_debit_transactions\n", + "\n", + " from \n", + "\n", + " (\n", + "\n", + " select * \n", + "\n", + " from {trans_data} as trans_table left join {acc_data} as acc_table\n", + "\n", + " on trans_table.orig_account_number = acc_table.account_number\n", + "\n", + " )\n", + "\n", + " where account_number not in ('None')\n", + "\n", + " group by 1\n", + "\n", + " ) debit on credit.CUSTOMER_NUMBER_1 = debit.CUSTOMER_NUMBER_2 \n", + "\n", + " ) subquery\n", + "\n", + " ) main left join \n", + "\n", + " (\n", + "\n", + " select subquery.CUSTOMER_NUMBER_3 as CUSTOMER_NUMBER_cust,\n", + "\n", + " subquery.SEGMENT,\n", + "\n", + " subquery.RISK,\n", + "\n", + " case\n", + "\n", + " when subquery.SAR_FLAG is NULL then 'N'\n", + "\n", + " else subquery.SAR_FLAG\n", + "\n", + " end as SAR_FLAG \n", + "\n", + " from\n", + "\n", + " (\n", + "\n", + " (\n", + "\n", + " select customer_number as CUSTOMER_NUMBER_3, \n", + "\n", + " business_segment as SEGMENT,\n", + "\n", + " case\n", + "\n", + " when RISK_CLASSIFICATION = 1 then 'Low Risk'\n", + "\n", + " when RISK_CLASSIFICATION = 2 then 'Medium Risk'\n", + "\n", + " when RISK_CLASSIFICATION = 3 then 'High Risk'\n", + "\n", + " else 'Unknown Risk'\n", + "\n", + " end AS RISK\n", + "\n", + " from {cust_data}\n", + "\n", + " ) cd left join\n", + "\n", + " (\n", + "\n", + " select customer_number as CUSTOMER_NUMBER_4, \n", + "\n", + " sar_flag as SAR_FLAG\n", + "\n", + " from {alert_data}\n", + "\n", + " ) ad on cd.CUSTOMER_NUMBER_3 = ad.CUSTOMER_NUMBER_4\n", + "\n", + " ) subquery\n", + "\n", + " ) cust_alert on cust_alert.CUSTOMER_NUMBER_cust = main.CUSTOMER_NUMBER_main\n", + "\n", + " ) final\n", "\n", - " -- Select the total volume within the window\n", - " total_volume AS \"Total Volume\"\n", - "FROM\n", - " time_windows\n", "\"\"\"\n", - "\n", - "\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "98b5784d-2738-4962-b966-ae76593d7a1e", + "metadata": {}, + "outputs": [], + "source": [ "from tms_data_interface import SQLQueryInterface\n", - "\n", + " \n", "class Scenario:\n", - " seq = SQLQueryInterface(schema=\"test_schema_test\")\n", + "\n", + " seq = SQLQueryInterface(schema=\"transactionschema\")\n", + " \n", " def logic(self, **kwargs):\n", - " validation_window = kwargs.get('validation_window')\n", - " time_window_s = int(validation_window/1000)\n", - " query_start_time = datetime.now()\n", - " print(\"Query start time :\",query_start_time)\n", - " row_list = self.seq.execute_raw(query.format(trade_data_1b=\"test_table_test1\",\n", - " time_window_s = time_window_s)\n", + "\n", + " row_list = self.seq.execute_raw(query.format(trans_data=\"transaction10m\",\n", + "\n", + " cust_data=\"customer_data_v1\",\n", + "\n", + " acc_data=\"account_data_v1\",\n", + "\n", + " alert_data=\"alert_data_v1\")\n", + "\n", " )\n", - " cols = [\n", - " 'START_DATE_TIME',\n", - " 'END_DATE_TIME',\n", - " 'Focal_id',\n", - " 'MIN_PRICE',\n", - " 'MAX_PRICE',\n", - " 'PRICE_CHANGE_PCT',\n", - " 'PARTICIPANT_VOLUME_PCT',\n", - " 'PARTICIPANT_VOLUME',\n", - " 'TOTAL_VOLUME',\n", - " ]\n", - " final_scenario_df = pd.DataFrame(row_list, columns = cols)\n", - " final_scenario_df['PARTICIPANT_VOLUME_PCT'] = final_scenario_df['PARTICIPANT_VOLUME']/\\\n", - " final_scenario_df['TOTAL_VOLUME'] * 100\n", - " final_scenario_df['Segment'] = 'Default'\n", - " final_scenario_df['SAR_FLAG'] = 'N'\n", - " final_scenario_df['Risk'] = 'Low Risk'\n", - " final_scenario_df.dropna(inplace=True)\n", - " # final_scenario_df['RUN_DATE'] = final_scenario_df['END_DATE']\n", - " return final_scenario_df" + "\n", + " cols = [\"Focal_id\", \"Credit_transaction_amount\",\n", + "\n", + " \"Total_no_of_credit_transactions\",\n", + "\n", + " \"Debit_transaction_amount\", \"Total_no_of_debit_transactions\",\n", + "\n", + " \"Wash_Ratio\", \"Segment\", \"Risk\", \"SAR_FLAG\"]\n", + "\n", + " df = pd.DataFrame(row_list, columns = cols)\n", + "\n", + " return df\n", + " " ] } ], diff --git a/main.py b/main.py index 27248a6..20b19ca 100644 --- a/main.py +++ b/main.py @@ -1,112 +1,251 @@ #!/usr/bin/env python # coding: utf-8 +# In[1]: + + + + + +# In[ ]: + + + + + # In[ ]: -from datetime import datetime, timedelta import pandas as pd + + +# In[ ]: + + from tms_data_interface import SQLQueryInterface +seq = SQLQueryInterface(schema="transactionschema") + + +# In[ ]: + + +seq.execute_raw("show tables") + + +# In[ ]: + query = """ -WITH time_windows AS ( - SELECT - -- End time is the current trade time - date_time AS end_time, - -- Subtract seconds from the end_time using date_add() with negative integer interval - date_add('second', -{time_window_s}, date_time) AS start_time, + select final.CUSTOMER_NUMBER_main as Focal_id, - -- Trade details - trade_price, - trade_volume, - trader_id, + final.Credit_transaction_amount, - -- Calculate minimum price within the time window - MIN(trade_price) OVER ( - ORDER BY date_time - RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW - ) AS min_price, + final.Total_no_of_credit_transactions, - -- Calculate maximum price within the time window - MAX(trade_price) OVER ( - ORDER BY date_time - RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW - ) AS max_price, + final.Debit_transaction_amount, - -- Calculate total trade volume within the time window - SUM(trade_volume) OVER ( - ORDER BY date_time - RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW - ) AS total_volume, + final.Total_no_of_debit_transactions, - -- Calculate participant's trade volume within the time window - SUM(CASE WHEN trader_id = trader_id THEN trade_volume ELSE 0 END) OVER ( - PARTITION BY trader_id - ORDER BY date_time - RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW - ) AS participant_volume - FROM - {trade_data_1b} -) -SELECT - -- Select the time window details - start_time, - end_time, + final.Wash_Ratio, - -- Select the participant (trader) ID - trader_id AS "Participant", + final.SEGMENT, - -- Select the calculated min and max prices - min_price, - max_price, + final.RISK, - -- Calculate the price change percentage - (max_price - min_price) / NULLIF(min_price, 0) * 100 AS "Price Change (%)", + final.SAR_FLAG - -- Calculate the participant's volume as a percentage of total volume - (participant_volume / NULLIF(total_volume, 0)) * 100 AS "Volume (%)", + from - -- Participant volume - participant_volume, + ( + + ( + + select subquery.CUSTOMER_NUMBER_1 as CUSTOMER_NUMBER_main, + + subquery.Credit_transaction_amount, + + subquery.Total_no_of_credit_transactions, + + case + + when subquery.Debit_transaction_amount is NULL then 0 + + else Debit_transaction_amount + + end as Debit_transaction_amount, + + case + + when subquery.Total_no_of_debit_transactions is NULL then 0 + + else Total_no_of_debit_transactions + + end as Total_no_of_debit_transactions, + + case + + when subquery.Debit_transaction_amount = 0 + + or subquery.Debit_transaction_amount is NULL then 0 + + else subquery.Credit_transaction_amount / subquery.Debit_transaction_amount + + end as Wash_Ratio + + from + + ( + + ( + + select customer_number as CUSTOMER_NUMBER_1, + + sum(transaction_amount) as Credit_transaction_amount, + + count(*) as Total_no_of_credit_transactions + + from + + ( + + select * + + from {trans_data} as trans_table left join {acc_data} as acc_table + + on trans_table.benef_account_number = acc_table.account_number + + ) + + where account_number not in ('None') + + group by 1 + + ) credit left join + + ( + + select customer_number as CUSTOMER_NUMBER_2, + + sum(transaction_amount) as Debit_transaction_amount, + + count(*) as Total_no_of_debit_transactions + + from + + ( + + select * + + from {trans_data} as trans_table left join {acc_data} as acc_table + + on trans_table.orig_account_number = acc_table.account_number + + ) + + where account_number not in ('None') + + group by 1 + + ) debit on credit.CUSTOMER_NUMBER_1 = debit.CUSTOMER_NUMBER_2 + + ) subquery + + ) main left join + + ( + + select subquery.CUSTOMER_NUMBER_3 as CUSTOMER_NUMBER_cust, + + subquery.SEGMENT, + + subquery.RISK, + + case + + when subquery.SAR_FLAG is NULL then 'N' + + else subquery.SAR_FLAG + + end as SAR_FLAG + + from + + ( + + ( + + select customer_number as CUSTOMER_NUMBER_3, + + business_segment as SEGMENT, + + case + + when RISK_CLASSIFICATION = 1 then 'Low Risk' + + when RISK_CLASSIFICATION = 2 then 'Medium Risk' + + when RISK_CLASSIFICATION = 3 then 'High Risk' + + else 'Unknown Risk' + + end AS RISK + + from {cust_data} + + ) cd left join + + ( + + select customer_number as CUSTOMER_NUMBER_4, + + sar_flag as SAR_FLAG + + from {alert_data} + + ) ad on cd.CUSTOMER_NUMBER_3 = ad.CUSTOMER_NUMBER_4 + + ) subquery + + ) cust_alert on cust_alert.CUSTOMER_NUMBER_cust = main.CUSTOMER_NUMBER_main + + ) final - -- Select the total volume within the window - total_volume AS "Total Volume" -FROM - time_windows """ + + + +# In[ ]: from tms_data_interface import SQLQueryInterface - + class Scenario: - seq = SQLQueryInterface(schema="test_schema_test") - def logic(self, **kwargs): - validation_window = kwargs.get('validation_window') - time_window_s = int(validation_window/1000) - query_start_time = datetime.now() - print("Query start time :",query_start_time) - row_list = self.seq.execute_raw(query.format(trade_data_1b="test_table_test1", - time_window_s = time_window_s) - ) - cols = [ - 'START_DATE_TIME', - 'END_DATE_TIME', - 'Focal_id', - 'MIN_PRICE', - 'MAX_PRICE', - 'PRICE_CHANGE_PCT', - 'PARTICIPANT_VOLUME_PCT', - 'PARTICIPANT_VOLUME', - 'TOTAL_VOLUME', - ] - final_scenario_df = pd.DataFrame(row_list, columns = cols) - final_scenario_df['PARTICIPANT_VOLUME_PCT'] = final_scenario_df['PARTICIPANT_VOLUME']/\ - final_scenario_df['TOTAL_VOLUME'] * 100 - final_scenario_df['Segment'] = 'Default' - final_scenario_df['SAR_FLAG'] = 'N' - final_scenario_df['Risk'] = 'Low Risk' - final_scenario_df.dropna(inplace=True) - # final_scenario_df['RUN_DATE'] = final_scenario_df['END_DATE'] - return final_scenario_df + + seq = SQLQueryInterface(schema="transactionschema") + + def logic(self, **kwargs): + + row_list = self.seq.execute_raw(query.format(trans_data="transaction10m", + + cust_data="customer_data_v1", + + acc_data="account_data_v1", + + alert_data="alert_data_v1") + + ) + + cols = ["Focal_id", "Credit_transaction_amount", + + "Total_no_of_credit_transactions", + + "Debit_transaction_amount", "Total_no_of_debit_transactions", + + "Wash_Ratio", "Segment", "Risk", "SAR_FLAG"] + + df = pd.DataFrame(row_list, columns = cols) + + return df +