diff --git a/.ipynb_checkpoints/main-checkpoint.ipynb b/.ipynb_checkpoints/main-checkpoint.ipynb index 1413d17..71e2dcd 100644 --- a/.ipynb_checkpoints/main-checkpoint.ipynb +++ b/.ipynb_checkpoints/main-checkpoint.ipynb @@ -3,149 +3,115 @@ { "cell_type": "code", "execution_count": null, - "id": "393f184e-6bc6-44d0-bf4f-89610f05c1dc", - "metadata": {}, - "outputs": [], - "source": [ - "import pandas as pd" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "1599a219-9bf4-49d5-aed4-db58f6810c26", + "id": "a4895b40-8a5a-41d8-b3dc-d2590fc04c3c", "metadata": {}, "outputs": [], "source": [ + "from datetime import datetime, timedelta\n", + "import pandas as pd\n", "from tms_data_interface import SQLQueryInterface\n", - "seq = SQLQueryInterface(schema = \"qa_schema\")\n", - "seq.execute_raw(\"show tables\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f7df9028-84c1-47dd-82e6-161238532686", - "metadata": {}, - "outputs": [], - "source": [ + "\n", "query = \"\"\"\n", - " select final.CUSTOMER_NUMBER_main as Focal_id,\n", - " final.Credit_transaction_amount,\n", - " final.Total_no_of_credit_transactions,\n", - " final.Debit_transaction_amount,\n", - " final.Total_no_of_debit_transactions,\n", - " final.Wash_Ratio,\n", - " final.SEGMENT,\n", - " final.RISK,\n", - " final.SAR_FLAG\n", - " from \n", - " (\n", - " (\n", - " select subquery.CUSTOMER_NUMBER_1 as CUSTOMER_NUMBER_main,\n", - " subquery.Credit_transaction_amount,\n", - " subquery.Total_no_of_credit_transactions,\n", - " case\n", - " when subquery.Debit_transaction_amount is NULL then 0\n", - " else Debit_transaction_amount\n", - " end as Debit_transaction_amount,\n", - " case\n", - " when subquery.Total_no_of_debit_transactions is NULL then 0\n", - " else Total_no_of_debit_transactions\n", - " end as Total_no_of_debit_transactions,\n", - " case\n", - " when subquery.Debit_transaction_amount = 0\n", - " or subquery.Debit_transaction_amount is NULL then 0\n", - " else subquery.Credit_transaction_amount / subquery.Debit_transaction_amount\n", - " end as Wash_Ratio\n", - " from \n", - " (\n", - " (\n", - " select customer_number as CUSTOMER_NUMBER_1, \n", - " sum(transaction_amount) as Credit_transaction_amount, \n", - " count(*) as Total_no_of_credit_transactions\n", - " from \n", - " (\n", - " select * \n", - " from {trans_data} as trans_table left join {acc_data} as acc_table\n", - " on trans_table.benef_account_number = acc_table.account_number\n", - " )\n", - " where account_number not in ('None')\n", - " group by 1\n", - " ) credit left join\n", - " (\n", - " select customer_number as CUSTOMER_NUMBER_2, \n", - " sum(transaction_amount) as Debit_transaction_amount, \n", - " count(*) as Total_no_of_debit_transactions\n", - " from \n", - " (\n", - " select * \n", - " from {trans_data} as trans_table left join {acc_data} as acc_table\n", - " on trans_table.orig_account_number = acc_table.account_number\n", - " )\n", - " where account_number not in ('None')\n", - " group by 1\n", - " ) debit on credit.CUSTOMER_NUMBER_1 = debit.CUSTOMER_NUMBER_2 \n", - " ) subquery\n", - " ) main left join \n", - " (\n", - " select subquery.CUSTOMER_NUMBER_3 as CUSTOMER_NUMBER_cust,\n", - " subquery.SEGMENT,\n", - " subquery.RISK,\n", - " case\n", - " when subquery.SAR_FLAG is NULL then 'N'\n", - " else subquery.SAR_FLAG\n", - " end as SAR_FLAG \n", - " from\n", - " (\n", - " (\n", - " select customer_number as CUSTOMER_NUMBER_3, \n", - " business_segment as SEGMENT,\n", - " case\n", - " when RISK_CLASSIFICATION = 1 then 'Low Risk'\n", - " when RISK_CLASSIFICATION = 2 then 'Medium Risk'\n", - " when RISK_CLASSIFICATION = 3 then 'High Risk'\n", - " else 'Unknown Risk'\n", - " end AS RISK\n", - " from {cust_data}\n", - " ) cd left join\n", - " (\n", - " select customer_number as CUSTOMER_NUMBER_4, \n", - " sar_flag as SAR_FLAG\n", - " from {alert_data}\n", - " ) ad on cd.CUSTOMER_NUMBER_3 = ad.CUSTOMER_NUMBER_4\n", - " ) subquery\n", - " ) cust_alert on cust_alert.CUSTOMER_NUMBER_cust = main.CUSTOMER_NUMBER_main\n", - " ) final\n", + "WITH time_windows AS (\n", + " SELECT\n", + " -- End time is the current trade time\n", + " date_time AS end_time,\n", + "\n", + " -- Subtract seconds from the end_time using date_add() with negative integer interval\n", + " date_add('second', -{time_window_s}, date_time) AS start_time,\n", + "\n", + " -- Trade details\n", + " trade_price,\n", + " trade_volume,\n", + " trader_id,\n", + "\n", + " -- Calculate minimum price within the time window\n", + " MIN(trade_price) OVER (\n", + " ORDER BY date_time \n", + " RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW\n", + " ) AS min_price,\n", + "\n", + " -- Calculate maximum price within the time window\n", + " MAX(trade_price) OVER (\n", + " ORDER BY date_time \n", + " RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW\n", + " ) AS max_price,\n", + "\n", + " -- Calculate total trade volume within the time window\n", + " SUM(trade_volume) OVER ( \n", + " ORDER BY date_time \n", + " RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW\n", + " ) AS total_volume,\n", + "\n", + " -- Calculate participant's trade volume within the time window\n", + " SUM(CASE WHEN trader_id = trader_id THEN trade_volume ELSE 0 END) OVER (\n", + " PARTITION BY trader_id \n", + " ORDER BY date_time \n", + " RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW\n", + " ) AS participant_volume\n", + " FROM\n", + " {trade_data_1b}\n", + ")\n", + "SELECT\n", + " -- Select the time window details\n", + " start_time,\n", + " end_time,\n", + "\n", + " -- Select the participant (trader) ID\n", + " trader_id AS \"Participant\",\n", + "\n", + " -- Select the calculated min and max prices\n", + " min_price,\n", + " max_price,\n", + "\n", + " -- Calculate the price change percentage\n", + " (max_price - min_price) / NULLIF(min_price, 0) * 100 AS \"Price Change (%)\",\n", + "\n", + " -- Calculate the participant's volume as a percentage of total volume\n", + " (participant_volume / NULLIF(total_volume, 0)) * 100 AS \"Volume (%)\",\n", + "\n", + " -- Participant volume\n", + " participant_volume,\n", + "\n", + " -- Select the total volume within the window\n", + " total_volume AS \"Total Volume\"\n", + "FROM\n", + " time_windows\n", "\"\"\"\n", - "\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "361ae64c-dc41-456d-93e4-f7d8b211dce4", - "metadata": {}, - "outputs": [], - "source": [ + "\n", + "\n", "from tms_data_interface import SQLQueryInterface\n", "\n", "class Scenario:\n", - " seq = SQLQueryInterface(schema =\"qa_schema\")\n", - "\n", + " seq = SQLQueryInterface(schema=\"trade_schema\")\n", " def logic(self, **kwargs):\n", - " row_list = self.seq.execute_raw(query.format(trans_data=\"qa_table_test\",\n", - " cust_data=\"customer_v1\",\n", - " acc_data=\"account_v1\",\n", - " alert_data=\"alert_v1\")\n", + " validation_window = kwargs.get('validation_window')\n", + " time_window_s = int(validation_window/1000)\n", + " query_start_time = datetime.now()\n", + " print(\"Query start time :\",query_start_time)\n", + " row_list = self.seq.execute_raw(query.format(trade_data_1b=\"trade_10m_v3\",\n", + " time_window_s = time_window_s)\n", " )\n", - " cols = [\"Focal_id\", \"Credit_transaction_amount\",\n", - " \"Total_no_of_credit_transactions\",\n", - " \"Debit_transaction_amount\", \"Total_no_of_debit_transactions\",\n", - " \"Wash_Ratio\",\n", - " \"Segment\", \"Risk\", \"SAR_FLAG\"]\n", - " df = pd.DataFrame(row_list, columns = cols)\n", - " return df" + " cols = [\n", + " 'START_DATE_TIME',\n", + " 'END_DATE_TIME',\n", + " 'Focal_id',\n", + " 'MIN_PRICE',\n", + " 'MAX_PRICE',\n", + " 'PRICE_CHANGE_PCT',\n", + " 'PARTICIPANT_VOLUME_PCT',\n", + " 'PARTICIPANT_VOLUME',\n", + " 'TOTAL_VOLUME',\n", + " ]\n", + " final_scenario_df = pd.DataFrame(row_list, columns = cols)\n", + " final_scenario_df['PARTICIPANT_VOLUME_PCT'] = final_scenario_df['PARTICIPANT_VOLUME']/\\\n", + " final_scenario_df['TOTAL_VOLUME'] * 100\n", + " final_scenario_df['Segment'] = 'Default'\n", + " final_scenario_df['SAR_FLAG'] = 'N'\n", + " final_scenario_df['Risk'] = 'Low Risk'\n", + " final_scenario_df.dropna(inplace=True)\n", + " # final_scenario_df['RUN_DATE'] = final_scenario_df['END_DATE']\n", + " return final_scenario_df" ] } ], diff --git a/main.ipynb b/main.ipynb index 1413d17..71e2dcd 100644 --- a/main.ipynb +++ b/main.ipynb @@ -3,149 +3,115 @@ { "cell_type": "code", "execution_count": null, - "id": "393f184e-6bc6-44d0-bf4f-89610f05c1dc", - "metadata": {}, - "outputs": [], - "source": [ - "import pandas as pd" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "1599a219-9bf4-49d5-aed4-db58f6810c26", + "id": "a4895b40-8a5a-41d8-b3dc-d2590fc04c3c", "metadata": {}, "outputs": [], "source": [ + "from datetime import datetime, timedelta\n", + "import pandas as pd\n", "from tms_data_interface import SQLQueryInterface\n", - "seq = SQLQueryInterface(schema = \"qa_schema\")\n", - "seq.execute_raw(\"show tables\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f7df9028-84c1-47dd-82e6-161238532686", - "metadata": {}, - "outputs": [], - "source": [ + "\n", "query = \"\"\"\n", - " select final.CUSTOMER_NUMBER_main as Focal_id,\n", - " final.Credit_transaction_amount,\n", - " final.Total_no_of_credit_transactions,\n", - " final.Debit_transaction_amount,\n", - " final.Total_no_of_debit_transactions,\n", - " final.Wash_Ratio,\n", - " final.SEGMENT,\n", - " final.RISK,\n", - " final.SAR_FLAG\n", - " from \n", - " (\n", - " (\n", - " select subquery.CUSTOMER_NUMBER_1 as CUSTOMER_NUMBER_main,\n", - " subquery.Credit_transaction_amount,\n", - " subquery.Total_no_of_credit_transactions,\n", - " case\n", - " when subquery.Debit_transaction_amount is NULL then 0\n", - " else Debit_transaction_amount\n", - " end as Debit_transaction_amount,\n", - " case\n", - " when subquery.Total_no_of_debit_transactions is NULL then 0\n", - " else Total_no_of_debit_transactions\n", - " end as Total_no_of_debit_transactions,\n", - " case\n", - " when subquery.Debit_transaction_amount = 0\n", - " or subquery.Debit_transaction_amount is NULL then 0\n", - " else subquery.Credit_transaction_amount / subquery.Debit_transaction_amount\n", - " end as Wash_Ratio\n", - " from \n", - " (\n", - " (\n", - " select customer_number as CUSTOMER_NUMBER_1, \n", - " sum(transaction_amount) as Credit_transaction_amount, \n", - " count(*) as Total_no_of_credit_transactions\n", - " from \n", - " (\n", - " select * \n", - " from {trans_data} as trans_table left join {acc_data} as acc_table\n", - " on trans_table.benef_account_number = acc_table.account_number\n", - " )\n", - " where account_number not in ('None')\n", - " group by 1\n", - " ) credit left join\n", - " (\n", - " select customer_number as CUSTOMER_NUMBER_2, \n", - " sum(transaction_amount) as Debit_transaction_amount, \n", - " count(*) as Total_no_of_debit_transactions\n", - " from \n", - " (\n", - " select * \n", - " from {trans_data} as trans_table left join {acc_data} as acc_table\n", - " on trans_table.orig_account_number = acc_table.account_number\n", - " )\n", - " where account_number not in ('None')\n", - " group by 1\n", - " ) debit on credit.CUSTOMER_NUMBER_1 = debit.CUSTOMER_NUMBER_2 \n", - " ) subquery\n", - " ) main left join \n", - " (\n", - " select subquery.CUSTOMER_NUMBER_3 as CUSTOMER_NUMBER_cust,\n", - " subquery.SEGMENT,\n", - " subquery.RISK,\n", - " case\n", - " when subquery.SAR_FLAG is NULL then 'N'\n", - " else subquery.SAR_FLAG\n", - " end as SAR_FLAG \n", - " from\n", - " (\n", - " (\n", - " select customer_number as CUSTOMER_NUMBER_3, \n", - " business_segment as SEGMENT,\n", - " case\n", - " when RISK_CLASSIFICATION = 1 then 'Low Risk'\n", - " when RISK_CLASSIFICATION = 2 then 'Medium Risk'\n", - " when RISK_CLASSIFICATION = 3 then 'High Risk'\n", - " else 'Unknown Risk'\n", - " end AS RISK\n", - " from {cust_data}\n", - " ) cd left join\n", - " (\n", - " select customer_number as CUSTOMER_NUMBER_4, \n", - " sar_flag as SAR_FLAG\n", - " from {alert_data}\n", - " ) ad on cd.CUSTOMER_NUMBER_3 = ad.CUSTOMER_NUMBER_4\n", - " ) subquery\n", - " ) cust_alert on cust_alert.CUSTOMER_NUMBER_cust = main.CUSTOMER_NUMBER_main\n", - " ) final\n", + "WITH time_windows AS (\n", + " SELECT\n", + " -- End time is the current trade time\n", + " date_time AS end_time,\n", + "\n", + " -- Subtract seconds from the end_time using date_add() with negative integer interval\n", + " date_add('second', -{time_window_s}, date_time) AS start_time,\n", + "\n", + " -- Trade details\n", + " trade_price,\n", + " trade_volume,\n", + " trader_id,\n", + "\n", + " -- Calculate minimum price within the time window\n", + " MIN(trade_price) OVER (\n", + " ORDER BY date_time \n", + " RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW\n", + " ) AS min_price,\n", + "\n", + " -- Calculate maximum price within the time window\n", + " MAX(trade_price) OVER (\n", + " ORDER BY date_time \n", + " RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW\n", + " ) AS max_price,\n", + "\n", + " -- Calculate total trade volume within the time window\n", + " SUM(trade_volume) OVER ( \n", + " ORDER BY date_time \n", + " RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW\n", + " ) AS total_volume,\n", + "\n", + " -- Calculate participant's trade volume within the time window\n", + " SUM(CASE WHEN trader_id = trader_id THEN trade_volume ELSE 0 END) OVER (\n", + " PARTITION BY trader_id \n", + " ORDER BY date_time \n", + " RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW\n", + " ) AS participant_volume\n", + " FROM\n", + " {trade_data_1b}\n", + ")\n", + "SELECT\n", + " -- Select the time window details\n", + " start_time,\n", + " end_time,\n", + "\n", + " -- Select the participant (trader) ID\n", + " trader_id AS \"Participant\",\n", + "\n", + " -- Select the calculated min and max prices\n", + " min_price,\n", + " max_price,\n", + "\n", + " -- Calculate the price change percentage\n", + " (max_price - min_price) / NULLIF(min_price, 0) * 100 AS \"Price Change (%)\",\n", + "\n", + " -- Calculate the participant's volume as a percentage of total volume\n", + " (participant_volume / NULLIF(total_volume, 0)) * 100 AS \"Volume (%)\",\n", + "\n", + " -- Participant volume\n", + " participant_volume,\n", + "\n", + " -- Select the total volume within the window\n", + " total_volume AS \"Total Volume\"\n", + "FROM\n", + " time_windows\n", "\"\"\"\n", - "\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "361ae64c-dc41-456d-93e4-f7d8b211dce4", - "metadata": {}, - "outputs": [], - "source": [ + "\n", + "\n", "from tms_data_interface import SQLQueryInterface\n", "\n", "class Scenario:\n", - " seq = SQLQueryInterface(schema =\"qa_schema\")\n", - "\n", + " seq = SQLQueryInterface(schema=\"trade_schema\")\n", " def logic(self, **kwargs):\n", - " row_list = self.seq.execute_raw(query.format(trans_data=\"qa_table_test\",\n", - " cust_data=\"customer_v1\",\n", - " acc_data=\"account_v1\",\n", - " alert_data=\"alert_v1\")\n", + " validation_window = kwargs.get('validation_window')\n", + " time_window_s = int(validation_window/1000)\n", + " query_start_time = datetime.now()\n", + " print(\"Query start time :\",query_start_time)\n", + " row_list = self.seq.execute_raw(query.format(trade_data_1b=\"trade_10m_v3\",\n", + " time_window_s = time_window_s)\n", " )\n", - " cols = [\"Focal_id\", \"Credit_transaction_amount\",\n", - " \"Total_no_of_credit_transactions\",\n", - " \"Debit_transaction_amount\", \"Total_no_of_debit_transactions\",\n", - " \"Wash_Ratio\",\n", - " \"Segment\", \"Risk\", \"SAR_FLAG\"]\n", - " df = pd.DataFrame(row_list, columns = cols)\n", - " return df" + " cols = [\n", + " 'START_DATE_TIME',\n", + " 'END_DATE_TIME',\n", + " 'Focal_id',\n", + " 'MIN_PRICE',\n", + " 'MAX_PRICE',\n", + " 'PRICE_CHANGE_PCT',\n", + " 'PARTICIPANT_VOLUME_PCT',\n", + " 'PARTICIPANT_VOLUME',\n", + " 'TOTAL_VOLUME',\n", + " ]\n", + " final_scenario_df = pd.DataFrame(row_list, columns = cols)\n", + " final_scenario_df['PARTICIPANT_VOLUME_PCT'] = final_scenario_df['PARTICIPANT_VOLUME']/\\\n", + " final_scenario_df['TOTAL_VOLUME'] * 100\n", + " final_scenario_df['Segment'] = 'Default'\n", + " final_scenario_df['SAR_FLAG'] = 'N'\n", + " final_scenario_df['Risk'] = 'Low Risk'\n", + " final_scenario_df.dropna(inplace=True)\n", + " # final_scenario_df['RUN_DATE'] = final_scenario_df['END_DATE']\n", + " return final_scenario_df" ] } ], diff --git a/main.py b/main.py index 183331f..04fdca1 100644 --- a/main.py +++ b/main.py @@ -4,131 +4,109 @@ # In[ ]: +from datetime import datetime, timedelta import pandas as pd - - -# In[ ]: - - from tms_data_interface import SQLQueryInterface -seq = SQLQueryInterface(schema = "qa_schema") -seq.execute_raw("show tables") - - -# In[ ]: - query = """ - select final.CUSTOMER_NUMBER_main as Focal_id, - final.Credit_transaction_amount, - final.Total_no_of_credit_transactions, - final.Debit_transaction_amount, - final.Total_no_of_debit_transactions, - final.Wash_Ratio, - final.SEGMENT, - final.RISK, - final.SAR_FLAG - from - ( - ( - select subquery.CUSTOMER_NUMBER_1 as CUSTOMER_NUMBER_main, - subquery.Credit_transaction_amount, - subquery.Total_no_of_credit_transactions, - case - when subquery.Debit_transaction_amount is NULL then 0 - else Debit_transaction_amount - end as Debit_transaction_amount, - case - when subquery.Total_no_of_debit_transactions is NULL then 0 - else Total_no_of_debit_transactions - end as Total_no_of_debit_transactions, - case - when subquery.Debit_transaction_amount = 0 - or subquery.Debit_transaction_amount is NULL then 0 - else subquery.Credit_transaction_amount / subquery.Debit_transaction_amount - end as Wash_Ratio - from - ( - ( - select customer_number as CUSTOMER_NUMBER_1, - sum(transaction_amount) as Credit_transaction_amount, - count(*) as Total_no_of_credit_transactions - from - ( - select * - from {trans_data} as trans_table left join {acc_data} as acc_table - on trans_table.benef_account_number = acc_table.account_number - ) - where account_number not in ('None') - group by 1 - ) credit left join - ( - select customer_number as CUSTOMER_NUMBER_2, - sum(transaction_amount) as Debit_transaction_amount, - count(*) as Total_no_of_debit_transactions - from - ( - select * - from {trans_data} as trans_table left join {acc_data} as acc_table - on trans_table.orig_account_number = acc_table.account_number - ) - where account_number not in ('None') - group by 1 - ) debit on credit.CUSTOMER_NUMBER_1 = debit.CUSTOMER_NUMBER_2 - ) subquery - ) main left join - ( - select subquery.CUSTOMER_NUMBER_3 as CUSTOMER_NUMBER_cust, - subquery.SEGMENT, - subquery.RISK, - case - when subquery.SAR_FLAG is NULL then 'N' - else subquery.SAR_FLAG - end as SAR_FLAG - from - ( - ( - select customer_number as CUSTOMER_NUMBER_3, - business_segment as SEGMENT, - case - when RISK_CLASSIFICATION = 1 then 'Low Risk' - when RISK_CLASSIFICATION = 2 then 'Medium Risk' - when RISK_CLASSIFICATION = 3 then 'High Risk' - else 'Unknown Risk' - end AS RISK - from {cust_data} - ) cd left join - ( - select customer_number as CUSTOMER_NUMBER_4, - sar_flag as SAR_FLAG - from {alert_data} - ) ad on cd.CUSTOMER_NUMBER_3 = ad.CUSTOMER_NUMBER_4 - ) subquery - ) cust_alert on cust_alert.CUSTOMER_NUMBER_cust = main.CUSTOMER_NUMBER_main - ) final +WITH time_windows AS ( + SELECT + -- End time is the current trade time + date_time AS end_time, + + -- Subtract seconds from the end_time using date_add() with negative integer interval + date_add('second', -{time_window_s}, date_time) AS start_time, + + -- Trade details + trade_price, + trade_volume, + trader_id, + + -- Calculate minimum price within the time window + MIN(trade_price) OVER ( + ORDER BY date_time + RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW + ) AS min_price, + + -- Calculate maximum price within the time window + MAX(trade_price) OVER ( + ORDER BY date_time + RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW + ) AS max_price, + + -- Calculate total trade volume within the time window + SUM(trade_volume) OVER ( + ORDER BY date_time + RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW + ) AS total_volume, + + -- Calculate participant's trade volume within the time window + SUM(CASE WHEN trader_id = trader_id THEN trade_volume ELSE 0 END) OVER ( + PARTITION BY trader_id + ORDER BY date_time + RANGE BETWEEN INTERVAL '{time_window_s}' SECOND PRECEDING AND CURRENT ROW + ) AS participant_volume + FROM + {trade_data_1b} +) +SELECT + -- Select the time window details + start_time, + end_time, + + -- Select the participant (trader) ID + trader_id AS "Participant", + + -- Select the calculated min and max prices + min_price, + max_price, + + -- Calculate the price change percentage + (max_price - min_price) / NULLIF(min_price, 0) * 100 AS "Price Change (%)", + + -- Calculate the participant's volume as a percentage of total volume + (participant_volume / NULLIF(total_volume, 0)) * 100 AS "Volume (%)", + + -- Participant volume + participant_volume, + + -- Select the total volume within the window + total_volume AS "Total Volume" +FROM + time_windows """ - -# In[ ]: - - from tms_data_interface import SQLQueryInterface class Scenario: - seq = SQLQueryInterface(schema ="qa_schema") - + seq = SQLQueryInterface(schema="trade_schema") def logic(self, **kwargs): - row_list = self.seq.execute_raw(query.format(trans_data="qa_table_test", - cust_data="customer_v1", - acc_data="account_v1", - alert_data="alert_v1") + validation_window = kwargs.get('validation_window') + time_window_s = int(validation_window/1000) + query_start_time = datetime.now() + print("Query start time :",query_start_time) + row_list = self.seq.execute_raw(query.format(trade_data_1b="trade_10m_v3", + time_window_s = time_window_s) ) - cols = ["Focal_id", "Credit_transaction_amount", - "Total_no_of_credit_transactions", - "Debit_transaction_amount", "Total_no_of_debit_transactions", - "Wash_Ratio", - "Segment", "Risk", "SAR_FLAG"] - df = pd.DataFrame(row_list, columns = cols) - return df + cols = [ + 'START_DATE_TIME', + 'END_DATE_TIME', + 'Focal_id', + 'MIN_PRICE', + 'MAX_PRICE', + 'PRICE_CHANGE_PCT', + 'PARTICIPANT_VOLUME_PCT', + 'PARTICIPANT_VOLUME', + 'TOTAL_VOLUME', + ] + final_scenario_df = pd.DataFrame(row_list, columns = cols) + final_scenario_df['PARTICIPANT_VOLUME_PCT'] = final_scenario_df['PARTICIPANT_VOLUME']/\ + final_scenario_df['TOTAL_VOLUME'] * 100 + final_scenario_df['Segment'] = 'Default' + final_scenario_df['SAR_FLAG'] = 'N' + final_scenario_df['Risk'] = 'Low Risk' + final_scenario_df.dropna(inplace=True) + # final_scenario_df['RUN_DATE'] = final_scenario_df['END_DATE'] + return final_scenario_df