diff --git a/.ipynb_checkpoints/main-checkpoint.ipynb b/.ipynb_checkpoints/main-checkpoint.ipynb index a4d429e..df2260d 100644 --- a/.ipynb_checkpoints/main-checkpoint.ipynb +++ b/.ipynb_checkpoints/main-checkpoint.ipynb @@ -9,120 +9,149 @@ }, "outputs": [], "source": [ - "from datetime import datetime, timedelta\n", + "from datetime import datetime\n", "import pandas as pd\n", "from tms_data_interface import SQLQueryInterface\n", "\n", "query = \"\"\"\n", - "SELECT \n", - " n.TRADER_ID,\n", - " n.trade_time_window,\n", - " n.net_volume,\n", - " n.order_count, -- Include number of orders\n", - " COALESCE(t.total_trade_volume, 0) AS total_trade_volume,\n", + "WITH \n", + "-- Capture all orders and trades within the spoofing time window\n", + "trade_window AS (\n", + " SELECT\n", + " t.trade_id,\n", + " t.trader_id,\n", + " t.date_time AS trade_time,\n", + " t.trade_side,\n", + " t.trade_volume,\n", + " o.trader_id AS order_trader_id,\n", + " o.date_time AS order_time,\n", + " o.order_volume,\n", + " o.order_status,\n", + " o.order_price,\n", + " o.side AS order_side\n", + " FROM \n", + " {trade_data_1b} t\n", + " LEFT JOIN \n", + " order_10m o ON o.date_time BETWEEN t.date_time - INTERVAL '{spoofing_time_window_s}' SECOND \n", + " AND t.date_time\n", + " WHERE \n", + " o.side = '{spoofing_side}'\n", + "),\n", + "\n", + "-- Calculate net order volume for the specific trader\n", + "net_order_volume_cte AS (\n", + " SELECT \n", + " trader_id,\n", + " trade_id,\n", + " trade_time,\n", + " SUM(CASE \n", + " WHEN order_status = 'new' THEN order_volume \n", + " WHEN order_status = 'cancelled' THEN -order_volume \n", + " WHEN order_status = 'fulfilled' THEN -order_volume \n", + " ELSE 0 \n", + " END) AS net_order_volume,\n", + " COUNT(*) AS num_orders\n", + " FROM trade_window\n", + " WHERE order_trader_id = trader_id -- Filter by the trader who executed the trade\n", + " GROUP BY trader_id, trade_id, trade_time\n", + "),\n", + "\n", + "-- Calculate total net order volume for all traders (i.e., for spoofing side orders)\n", + "net_order_volume_all_cte AS (\n", + " SELECT \n", + " trade_id,\n", + " SUM(CASE \n", + " WHEN order_status = 'new' THEN order_volume \n", + " WHEN order_status = 'cancelled' THEN -order_volume \n", + " WHEN order_status = 'fulfilled' THEN -order_volume \n", + " ELSE 0 \n", + " END) AS net_order_volume_all\n", + " FROM trade_window\n", + " GROUP BY trade_id\n", + "),\n", + "\n", + "-- Calculate total trade volume on the opposite side (e.g., sell if spoofing is on buy)\n", + "opposite_trade_volume_cte AS (\n", + " SELECT \n", + " t.trader_id,\n", + " t.trade_id,\n", + " SUM(t.trade_volume) AS total_trade_volume\n", + " FROM {trade_data_1b} t\n", + " WHERE \n", + " t.date_time BETWEEN t.date_time - INTERVAL '{trade_time_window_s}' SECOND\n", + " AND t.date_time\n", + " AND t.trade_side = CASE WHEN '{spoofing_side}' = 'buy' THEN 'sell' ELSE 'buy' END\n", + " GROUP BY t.trader_id, t.trade_id\n", + ")\n", + "\n", + "-- Final result with calculated spoofing indicators\n", + "SELECT\n", + " n.trade_id,\n", + " n.trader_id,\n", + " n.trade_time,\n", + " n.num_orders,\n", + " n.net_order_volume,\n", " CASE \n", - " WHEN COALESCE(t.total_trade_volume, 0) > 0 THEN n.net_volume / t.total_trade_volume\n", - " ELSE 0 -- or another value to indicate no trades\n", + " WHEN o.total_trade_volume > 0 THEN n.net_order_volume / o.total_trade_volume\n", + " ELSE NULL\n", " END AS order_trade_ratio,\n", " CASE \n", - " WHEN net_volume_all.total_net_volume_all > 0 THEN \n", - " (n.net_volume / net_volume_all.total_net_volume_all) * 100 \n", - " ELSE 0 \n", - " END AS volume_percentage -- Calculate volume percentage\n", - "FROM (\n", - " -- Step 2: Subquery for net_order_volume\n", - " SELECT \n", - " o.TRADER_ID,\n", - " t.DATE_TIME AS trade_time_window,\n", - " SUM(CASE \n", - " WHEN o.ORDER_STATUS = 'New' THEN o.ORDER_VOLUME\n", - " WHEN o.ORDER_STATUS = 'Cancelled' THEN -o.ORDER_VOLUME\n", - " WHEN o.ORDER_STATUS = 'Fulfilled' THEN -o.ORDER_VOLUME\n", - " ELSE 0 END\n", - " ) AS net_volume,\n", - " COUNT(o.ORDER_ID) AS order_count -- Count the number of orders\n", - " FROM {order_10m} o\n", - " JOIN {trade_data_1b} t\n", - " ON o.TRADER_ID = t.TRADER_ID\n", - " WHERE o.SIDE = 'buy'\n", - " AND o.DATE_TIME BETWEEN t.DATE_TIME - INTERVAL '{time_window_s}' SECOND AND t.DATE_TIME\n", - " GROUP BY o.TRADER_ID, t.DATE_TIME\n", - ") AS n\n", - "LEFT JOIN (\n", - " -- Step 6: Subquery for total_trade_volume (opposite side trades after spoofing)\n", - " SELECT \n", - " t.TRADER_ID,\n", - " t.DATE_TIME,\n", - " SUM(t.TRADE_VOLUME) AS total_trade_volume\n", - " FROM (\n", - " -- Step 5: Subquery for relevant_trades\n", - " SELECT t1.*\n", - " FROM {trade_data_1b} t1\n", - " WHERE t1.TRADE_SIDE = 'buy'\n", - " AND EXISTS (\n", - " SELECT 1\n", - " FROM {trade_data_1b} t2\n", - " WHERE t2.TRADER_ID = t1.TRADER_ID\n", - " AND t2.DATE_TIME BETWEEN t1.DATE_TIME - INTERVAL '{time_window_s}' SECOND AND t1.DATE_TIME\n", - " )\n", - " ) AS t\n", - " GROUP BY t.DATE_TIME, t.TRADER_ID\n", - ") AS t \n", - "ON n.TRADER_ID = t.TRADER_ID AND n.trade_time_window = t.DATE_TIME\n", - "\n", - "-- New subquery for total net volume for all traders in the same time window\n", - "LEFT JOIN (\n", - " SELECT \n", - " t.DATE_TIME AS trade_time_window,\n", - " SUM(CASE \n", - " WHEN o.ORDER_STATUS = 'New' THEN o.ORDER_VOLUME\n", - " WHEN o.ORDER_STATUS = 'Cancelled' THEN -o.ORDER_VOLUME\n", - " WHEN o.ORDER_STATUS = 'Fulfilled' THEN -o.ORDER_VOLUME\n", - " ELSE 0 END\n", - " ) AS total_net_volume_all\n", - " FROM {order_10m} o\n", - " JOIN {trade_data_1b} t\n", - " ON o.TRADER_ID = t.TRADER_ID\n", - " WHERE o.SIDE = 'buy'\n", - " AND o.DATE_TIME BETWEEN t.DATE_TIME - INTERVAL '{time_window_s}' SECOND AND t.DATE_TIME\n", - " GROUP BY t.DATE_TIME\n", - ") AS net_volume_all\n", - "ON n.trade_time_window = net_volume_all.trade_time_window\n", - "\n", - "ORDER BY n.trade_time_window\n", + " WHEN a.net_order_volume_all > 0 THEN n.net_order_volume / a.net_order_volume_all\n", + " ELSE NULL\n", + " END AS volume_percentage\n", + "FROM \n", + " net_order_volume_cte n\n", + "LEFT JOIN \n", + " opposite_trade_volume_cte o ON n.trade_id = o.trade_id\n", + "LEFT JOIN \n", + " net_order_volume_all_cte a ON n.trade_id = a.trade_id\n", + "WHERE \n", + " n.net_order_volume > 0 -- Only consider positive net order volumes (potential spoofing);\n", + " limit 1000\n", "\"\"\"\n", "\n", - "\n", - "from tms_data_interface import SQLQueryInterface\n", - "\n", "class Scenario:\n", - " seq = SQLQueryInterface(schema=\"trade_schema\")\n", - " def logic(self, **kwargs):\n", - " validation_window = kwargs.get('validation_window')\n", - " spoofing_side = kwargs.get('buy')\n", - " time_window_s = int(validation_window/1000)\n", + " seq = SQLQueryInterface(schema=\"internal\")\n", + "\n", + " def logic(self, **params):\n", + " spoofing_time_window = params.get('spoofing_time_window', 300000) # default to 300,000 ms (5 minutes)\n", + " spoofing_side = params.get('spoofing_side', 'buy')\n", + " use_volume_for_order_trade_ratio = params.get('use_volume_for_order_trade_ratio', True)\n", + " trade_time_window = params.get('trade_time_window', 300000)\n", + " ignore_trade_after_spoofing = params.get('ignore_trade_after_spoofing', True)\n", + " ignore_price_improvement = params.get('ignore_price_improvement', True)\n", + "\n", + " # Convert time windows from milliseconds to seconds\n", + " spoofing_time_window_s = int(spoofing_time_window / 1000)\n", + " trade_time_window_s = int(trade_time_window / 1000)\n", + "\n", " query_start_time = datetime.now()\n", - " print(\"Query start time :\",query_start_time)\n", - " row_list = self.seq.execute_raw(query.format(trade_data_1b=\"trade_10m_v3\",\n", - " order_10m = 'order_10m',\n", - " time_window_s = time_window_s)\n", - " )\n", + " print(\"Query start time:\", query_start_time)\n", + "\n", + " # Execute the query with the parameters passed from `params`\n", + " row_list = self.seq.execute_raw(query.format(\n", + " trade_data_1b=\"trade_10m_v3\", # Replace with actual table name\n", + " spoofing_time_window_s=spoofing_time_window_s,\n", + " trade_time_window_s=trade_time_window_s,\n", + " spoofing_side=spoofing_side\n", + " ))\n", + "\n", + " # Define columns for the resulting DataFrame\n", " cols = [\n", - " 'Focal_id',\n", - " 'trade_time_window',\n", - " 'net_volume',\n", - " 'order_count',\n", - " 'total_trade_volume',\n", - " 'order_trade_ratio',\n", - " 'volume_percentage'\n", + " 'trade_id', 'focal_id', 'trade_time', 'num_orders', \n", + " 'net_order_volume', 'order_trade_ratio', 'volume_percentage'\n", " ]\n", - " final_scenario_df = pd.DataFrame(row_list, columns = cols)\n", - " final_scenario_df['Segment'] = 'Default'\n", - " final_scenario_df['SAR_FLAG'] = 'N'\n", - " final_scenario_df['Risk'] = 'Low Risk'\n", - " final_scenario_df.dropna(inplace=True)\n", - " # final_scenario_df['RUN_DATE'] = final_scenario_df['END_DATE']\n", - " return final_scenario_df\n" + "\n", + " # Create a DataFrame from the query result\n", + " final_scenario_df = pd.DataFrame(row_list, columns=cols)\n", + "\n", + "\n", + " # Adding additional columns\n", + " final_scenario_df['segment'] = 'Default'\n", + " final_scenario_df['sar_flag'] = 'N'\n", + " final_scenario_df['risk'] = 'Low Risk'\n", + "\n", + " return final_scenario_df" ] }, { @@ -371,8 +400,8 @@ } ], "source": [ - "# scenario = Scenario()\n", - "# scenario.logic(validation_window=300000)" + "scenario = Scenario()\n", + "scenario.logic(validation_window=300000)" ] }, { diff --git a/main.ipynb b/main.ipynb index a4d429e..df2260d 100644 --- a/main.ipynb +++ b/main.ipynb @@ -9,120 +9,149 @@ }, "outputs": [], "source": [ - "from datetime import datetime, timedelta\n", + "from datetime import datetime\n", "import pandas as pd\n", "from tms_data_interface import SQLQueryInterface\n", "\n", "query = \"\"\"\n", - "SELECT \n", - " n.TRADER_ID,\n", - " n.trade_time_window,\n", - " n.net_volume,\n", - " n.order_count, -- Include number of orders\n", - " COALESCE(t.total_trade_volume, 0) AS total_trade_volume,\n", + "WITH \n", + "-- Capture all orders and trades within the spoofing time window\n", + "trade_window AS (\n", + " SELECT\n", + " t.trade_id,\n", + " t.trader_id,\n", + " t.date_time AS trade_time,\n", + " t.trade_side,\n", + " t.trade_volume,\n", + " o.trader_id AS order_trader_id,\n", + " o.date_time AS order_time,\n", + " o.order_volume,\n", + " o.order_status,\n", + " o.order_price,\n", + " o.side AS order_side\n", + " FROM \n", + " {trade_data_1b} t\n", + " LEFT JOIN \n", + " order_10m o ON o.date_time BETWEEN t.date_time - INTERVAL '{spoofing_time_window_s}' SECOND \n", + " AND t.date_time\n", + " WHERE \n", + " o.side = '{spoofing_side}'\n", + "),\n", + "\n", + "-- Calculate net order volume for the specific trader\n", + "net_order_volume_cte AS (\n", + " SELECT \n", + " trader_id,\n", + " trade_id,\n", + " trade_time,\n", + " SUM(CASE \n", + " WHEN order_status = 'new' THEN order_volume \n", + " WHEN order_status = 'cancelled' THEN -order_volume \n", + " WHEN order_status = 'fulfilled' THEN -order_volume \n", + " ELSE 0 \n", + " END) AS net_order_volume,\n", + " COUNT(*) AS num_orders\n", + " FROM trade_window\n", + " WHERE order_trader_id = trader_id -- Filter by the trader who executed the trade\n", + " GROUP BY trader_id, trade_id, trade_time\n", + "),\n", + "\n", + "-- Calculate total net order volume for all traders (i.e., for spoofing side orders)\n", + "net_order_volume_all_cte AS (\n", + " SELECT \n", + " trade_id,\n", + " SUM(CASE \n", + " WHEN order_status = 'new' THEN order_volume \n", + " WHEN order_status = 'cancelled' THEN -order_volume \n", + " WHEN order_status = 'fulfilled' THEN -order_volume \n", + " ELSE 0 \n", + " END) AS net_order_volume_all\n", + " FROM trade_window\n", + " GROUP BY trade_id\n", + "),\n", + "\n", + "-- Calculate total trade volume on the opposite side (e.g., sell if spoofing is on buy)\n", + "opposite_trade_volume_cte AS (\n", + " SELECT \n", + " t.trader_id,\n", + " t.trade_id,\n", + " SUM(t.trade_volume) AS total_trade_volume\n", + " FROM {trade_data_1b} t\n", + " WHERE \n", + " t.date_time BETWEEN t.date_time - INTERVAL '{trade_time_window_s}' SECOND\n", + " AND t.date_time\n", + " AND t.trade_side = CASE WHEN '{spoofing_side}' = 'buy' THEN 'sell' ELSE 'buy' END\n", + " GROUP BY t.trader_id, t.trade_id\n", + ")\n", + "\n", + "-- Final result with calculated spoofing indicators\n", + "SELECT\n", + " n.trade_id,\n", + " n.trader_id,\n", + " n.trade_time,\n", + " n.num_orders,\n", + " n.net_order_volume,\n", " CASE \n", - " WHEN COALESCE(t.total_trade_volume, 0) > 0 THEN n.net_volume / t.total_trade_volume\n", - " ELSE 0 -- or another value to indicate no trades\n", + " WHEN o.total_trade_volume > 0 THEN n.net_order_volume / o.total_trade_volume\n", + " ELSE NULL\n", " END AS order_trade_ratio,\n", " CASE \n", - " WHEN net_volume_all.total_net_volume_all > 0 THEN \n", - " (n.net_volume / net_volume_all.total_net_volume_all) * 100 \n", - " ELSE 0 \n", - " END AS volume_percentage -- Calculate volume percentage\n", - "FROM (\n", - " -- Step 2: Subquery for net_order_volume\n", - " SELECT \n", - " o.TRADER_ID,\n", - " t.DATE_TIME AS trade_time_window,\n", - " SUM(CASE \n", - " WHEN o.ORDER_STATUS = 'New' THEN o.ORDER_VOLUME\n", - " WHEN o.ORDER_STATUS = 'Cancelled' THEN -o.ORDER_VOLUME\n", - " WHEN o.ORDER_STATUS = 'Fulfilled' THEN -o.ORDER_VOLUME\n", - " ELSE 0 END\n", - " ) AS net_volume,\n", - " COUNT(o.ORDER_ID) AS order_count -- Count the number of orders\n", - " FROM {order_10m} o\n", - " JOIN {trade_data_1b} t\n", - " ON o.TRADER_ID = t.TRADER_ID\n", - " WHERE o.SIDE = 'buy'\n", - " AND o.DATE_TIME BETWEEN t.DATE_TIME - INTERVAL '{time_window_s}' SECOND AND t.DATE_TIME\n", - " GROUP BY o.TRADER_ID, t.DATE_TIME\n", - ") AS n\n", - "LEFT JOIN (\n", - " -- Step 6: Subquery for total_trade_volume (opposite side trades after spoofing)\n", - " SELECT \n", - " t.TRADER_ID,\n", - " t.DATE_TIME,\n", - " SUM(t.TRADE_VOLUME) AS total_trade_volume\n", - " FROM (\n", - " -- Step 5: Subquery for relevant_trades\n", - " SELECT t1.*\n", - " FROM {trade_data_1b} t1\n", - " WHERE t1.TRADE_SIDE = 'buy'\n", - " AND EXISTS (\n", - " SELECT 1\n", - " FROM {trade_data_1b} t2\n", - " WHERE t2.TRADER_ID = t1.TRADER_ID\n", - " AND t2.DATE_TIME BETWEEN t1.DATE_TIME - INTERVAL '{time_window_s}' SECOND AND t1.DATE_TIME\n", - " )\n", - " ) AS t\n", - " GROUP BY t.DATE_TIME, t.TRADER_ID\n", - ") AS t \n", - "ON n.TRADER_ID = t.TRADER_ID AND n.trade_time_window = t.DATE_TIME\n", - "\n", - "-- New subquery for total net volume for all traders in the same time window\n", - "LEFT JOIN (\n", - " SELECT \n", - " t.DATE_TIME AS trade_time_window,\n", - " SUM(CASE \n", - " WHEN o.ORDER_STATUS = 'New' THEN o.ORDER_VOLUME\n", - " WHEN o.ORDER_STATUS = 'Cancelled' THEN -o.ORDER_VOLUME\n", - " WHEN o.ORDER_STATUS = 'Fulfilled' THEN -o.ORDER_VOLUME\n", - " ELSE 0 END\n", - " ) AS total_net_volume_all\n", - " FROM {order_10m} o\n", - " JOIN {trade_data_1b} t\n", - " ON o.TRADER_ID = t.TRADER_ID\n", - " WHERE o.SIDE = 'buy'\n", - " AND o.DATE_TIME BETWEEN t.DATE_TIME - INTERVAL '{time_window_s}' SECOND AND t.DATE_TIME\n", - " GROUP BY t.DATE_TIME\n", - ") AS net_volume_all\n", - "ON n.trade_time_window = net_volume_all.trade_time_window\n", - "\n", - "ORDER BY n.trade_time_window\n", + " WHEN a.net_order_volume_all > 0 THEN n.net_order_volume / a.net_order_volume_all\n", + " ELSE NULL\n", + " END AS volume_percentage\n", + "FROM \n", + " net_order_volume_cte n\n", + "LEFT JOIN \n", + " opposite_trade_volume_cte o ON n.trade_id = o.trade_id\n", + "LEFT JOIN \n", + " net_order_volume_all_cte a ON n.trade_id = a.trade_id\n", + "WHERE \n", + " n.net_order_volume > 0 -- Only consider positive net order volumes (potential spoofing);\n", + " limit 1000\n", "\"\"\"\n", "\n", - "\n", - "from tms_data_interface import SQLQueryInterface\n", - "\n", "class Scenario:\n", - " seq = SQLQueryInterface(schema=\"trade_schema\")\n", - " def logic(self, **kwargs):\n", - " validation_window = kwargs.get('validation_window')\n", - " spoofing_side = kwargs.get('buy')\n", - " time_window_s = int(validation_window/1000)\n", + " seq = SQLQueryInterface(schema=\"internal\")\n", + "\n", + " def logic(self, **params):\n", + " spoofing_time_window = params.get('spoofing_time_window', 300000) # default to 300,000 ms (5 minutes)\n", + " spoofing_side = params.get('spoofing_side', 'buy')\n", + " use_volume_for_order_trade_ratio = params.get('use_volume_for_order_trade_ratio', True)\n", + " trade_time_window = params.get('trade_time_window', 300000)\n", + " ignore_trade_after_spoofing = params.get('ignore_trade_after_spoofing', True)\n", + " ignore_price_improvement = params.get('ignore_price_improvement', True)\n", + "\n", + " # Convert time windows from milliseconds to seconds\n", + " spoofing_time_window_s = int(spoofing_time_window / 1000)\n", + " trade_time_window_s = int(trade_time_window / 1000)\n", + "\n", " query_start_time = datetime.now()\n", - " print(\"Query start time :\",query_start_time)\n", - " row_list = self.seq.execute_raw(query.format(trade_data_1b=\"trade_10m_v3\",\n", - " order_10m = 'order_10m',\n", - " time_window_s = time_window_s)\n", - " )\n", + " print(\"Query start time:\", query_start_time)\n", + "\n", + " # Execute the query with the parameters passed from `params`\n", + " row_list = self.seq.execute_raw(query.format(\n", + " trade_data_1b=\"trade_10m_v3\", # Replace with actual table name\n", + " spoofing_time_window_s=spoofing_time_window_s,\n", + " trade_time_window_s=trade_time_window_s,\n", + " spoofing_side=spoofing_side\n", + " ))\n", + "\n", + " # Define columns for the resulting DataFrame\n", " cols = [\n", - " 'Focal_id',\n", - " 'trade_time_window',\n", - " 'net_volume',\n", - " 'order_count',\n", - " 'total_trade_volume',\n", - " 'order_trade_ratio',\n", - " 'volume_percentage'\n", + " 'trade_id', 'focal_id', 'trade_time', 'num_orders', \n", + " 'net_order_volume', 'order_trade_ratio', 'volume_percentage'\n", " ]\n", - " final_scenario_df = pd.DataFrame(row_list, columns = cols)\n", - " final_scenario_df['Segment'] = 'Default'\n", - " final_scenario_df['SAR_FLAG'] = 'N'\n", - " final_scenario_df['Risk'] = 'Low Risk'\n", - " final_scenario_df.dropna(inplace=True)\n", - " # final_scenario_df['RUN_DATE'] = final_scenario_df['END_DATE']\n", - " return final_scenario_df\n" + "\n", + " # Create a DataFrame from the query result\n", + " final_scenario_df = pd.DataFrame(row_list, columns=cols)\n", + "\n", + "\n", + " # Adding additional columns\n", + " final_scenario_df['segment'] = 'Default'\n", + " final_scenario_df['sar_flag'] = 'N'\n", + " final_scenario_df['risk'] = 'Low Risk'\n", + "\n", + " return final_scenario_df" ] }, { @@ -371,8 +400,8 @@ } ], "source": [ - "# scenario = Scenario()\n", - "# scenario.logic(validation_window=300000)" + "scenario = Scenario()\n", + "scenario.logic(validation_window=300000)" ] }, { diff --git a/main.py b/main.py index 177e065..e41291c 100644 --- a/main.py +++ b/main.py @@ -4,127 +4,156 @@ # In[21]: -from datetime import datetime, timedelta +from datetime import datetime import pandas as pd from tms_data_interface import SQLQueryInterface query = """ -SELECT - n.TRADER_ID, - n.trade_time_window, - n.net_volume, - n.order_count, -- Include number of orders - COALESCE(t.total_trade_volume, 0) AS total_trade_volume, +WITH +-- Capture all orders and trades within the spoofing time window +trade_window AS ( + SELECT + t.trade_id, + t.trader_id, + t.date_time AS trade_time, + t.trade_side, + t.trade_volume, + o.trader_id AS order_trader_id, + o.date_time AS order_time, + o.order_volume, + o.order_status, + o.order_price, + o.side AS order_side + FROM + {trade_data_1b} t + LEFT JOIN + order_10m o ON o.date_time BETWEEN t.date_time - INTERVAL '{spoofing_time_window_s}' SECOND + AND t.date_time + WHERE + o.side = '{spoofing_side}' +), + +-- Calculate net order volume for the specific trader +net_order_volume_cte AS ( + SELECT + trader_id, + trade_id, + trade_time, + SUM(CASE + WHEN order_status = 'new' THEN order_volume + WHEN order_status = 'cancelled' THEN -order_volume + WHEN order_status = 'fulfilled' THEN -order_volume + ELSE 0 + END) AS net_order_volume, + COUNT(*) AS num_orders + FROM trade_window + WHERE order_trader_id = trader_id -- Filter by the trader who executed the trade + GROUP BY trader_id, trade_id, trade_time +), + +-- Calculate total net order volume for all traders (i.e., for spoofing side orders) +net_order_volume_all_cte AS ( + SELECT + trade_id, + SUM(CASE + WHEN order_status = 'new' THEN order_volume + WHEN order_status = 'cancelled' THEN -order_volume + WHEN order_status = 'fulfilled' THEN -order_volume + ELSE 0 + END) AS net_order_volume_all + FROM trade_window + GROUP BY trade_id +), + +-- Calculate total trade volume on the opposite side (e.g., sell if spoofing is on buy) +opposite_trade_volume_cte AS ( + SELECT + t.trader_id, + t.trade_id, + SUM(t.trade_volume) AS total_trade_volume + FROM {trade_data_1b} t + WHERE + t.date_time BETWEEN t.date_time - INTERVAL '{trade_time_window_s}' SECOND + AND t.date_time + AND t.trade_side = CASE WHEN '{spoofing_side}' = 'buy' THEN 'sell' ELSE 'buy' END + GROUP BY t.trader_id, t.trade_id +) + +-- Final result with calculated spoofing indicators +SELECT + n.trade_id, + n.trader_id, + n.trade_time, + n.num_orders, + n.net_order_volume, CASE - WHEN COALESCE(t.total_trade_volume, 0) > 0 THEN n.net_volume / t.total_trade_volume - ELSE 0 -- or another value to indicate no trades + WHEN o.total_trade_volume > 0 THEN n.net_order_volume / o.total_trade_volume + ELSE NULL END AS order_trade_ratio, CASE - WHEN net_volume_all.total_net_volume_all > 0 THEN - (n.net_volume / net_volume_all.total_net_volume_all) * 100 - ELSE 0 - END AS volume_percentage -- Calculate volume percentage -FROM ( - -- Step 2: Subquery for net_order_volume - SELECT - o.TRADER_ID, - t.DATE_TIME AS trade_time_window, - SUM(CASE - WHEN o.ORDER_STATUS = 'New' THEN o.ORDER_VOLUME - WHEN o.ORDER_STATUS = 'Cancelled' THEN -o.ORDER_VOLUME - WHEN o.ORDER_STATUS = 'Fulfilled' THEN -o.ORDER_VOLUME - ELSE 0 END - ) AS net_volume, - COUNT(o.ORDER_ID) AS order_count -- Count the number of orders - FROM {order_10m} o - JOIN {trade_data_1b} t - ON o.TRADER_ID = t.TRADER_ID - WHERE o.SIDE = 'buy' - AND o.DATE_TIME BETWEEN t.DATE_TIME - INTERVAL '{time_window_s}' SECOND AND t.DATE_TIME - GROUP BY o.TRADER_ID, t.DATE_TIME -) AS n -LEFT JOIN ( - -- Step 6: Subquery for total_trade_volume (opposite side trades after spoofing) - SELECT - t.TRADER_ID, - t.DATE_TIME, - SUM(t.TRADE_VOLUME) AS total_trade_volume - FROM ( - -- Step 5: Subquery for relevant_trades - SELECT t1.* - FROM {trade_data_1b} t1 - WHERE t1.TRADE_SIDE = 'buy' - AND EXISTS ( - SELECT 1 - FROM {trade_data_1b} t2 - WHERE t2.TRADER_ID = t1.TRADER_ID - AND t2.DATE_TIME BETWEEN t1.DATE_TIME - INTERVAL '{time_window_s}' SECOND AND t1.DATE_TIME - ) - ) AS t - GROUP BY t.DATE_TIME, t.TRADER_ID -) AS t -ON n.TRADER_ID = t.TRADER_ID AND n.trade_time_window = t.DATE_TIME - --- New subquery for total net volume for all traders in the same time window -LEFT JOIN ( - SELECT - t.DATE_TIME AS trade_time_window, - SUM(CASE - WHEN o.ORDER_STATUS = 'New' THEN o.ORDER_VOLUME - WHEN o.ORDER_STATUS = 'Cancelled' THEN -o.ORDER_VOLUME - WHEN o.ORDER_STATUS = 'Fulfilled' THEN -o.ORDER_VOLUME - ELSE 0 END - ) AS total_net_volume_all - FROM {order_10m} o - JOIN {trade_data_1b} t - ON o.TRADER_ID = t.TRADER_ID - WHERE o.SIDE = 'buy' - AND o.DATE_TIME BETWEEN t.DATE_TIME - INTERVAL '{time_window_s}' SECOND AND t.DATE_TIME - GROUP BY t.DATE_TIME -) AS net_volume_all -ON n.trade_time_window = net_volume_all.trade_time_window - -ORDER BY n.trade_time_window + WHEN a.net_order_volume_all > 0 THEN n.net_order_volume / a.net_order_volume_all + ELSE NULL + END AS volume_percentage +FROM + net_order_volume_cte n +LEFT JOIN + opposite_trade_volume_cte o ON n.trade_id = o.trade_id +LEFT JOIN + net_order_volume_all_cte a ON n.trade_id = a.trade_id +WHERE + n.net_order_volume > 0 -- Only consider positive net order volumes (potential spoofing); + limit 1000 """ - -from tms_data_interface import SQLQueryInterface - class Scenario: - seq = SQLQueryInterface(schema="trade_schema") - def logic(self, **kwargs): - validation_window = kwargs.get('validation_window') - spoofing_side = kwargs.get('buy') - time_window_s = int(validation_window/1000) + seq = SQLQueryInterface(schema="internal") + + def logic(self, **params): + spoofing_time_window = params.get('spoofing_time_window', 300000) # default to 300,000 ms (5 minutes) + spoofing_side = params.get('spoofing_side', 'buy') + use_volume_for_order_trade_ratio = params.get('use_volume_for_order_trade_ratio', True) + trade_time_window = params.get('trade_time_window', 300000) + ignore_trade_after_spoofing = params.get('ignore_trade_after_spoofing', True) + ignore_price_improvement = params.get('ignore_price_improvement', True) + + # Convert time windows from milliseconds to seconds + spoofing_time_window_s = int(spoofing_time_window / 1000) + trade_time_window_s = int(trade_time_window / 1000) + query_start_time = datetime.now() - print("Query start time :",query_start_time) - row_list = self.seq.execute_raw(query.format(trade_data_1b="trade_10m_v3", - order_10m = 'order_10m', - time_window_s = time_window_s) - ) + print("Query start time:", query_start_time) + + # Execute the query with the parameters passed from `params` + row_list = self.seq.execute_raw(query.format( + trade_data_1b="trade_10m_v3", # Replace with actual table name + spoofing_time_window_s=spoofing_time_window_s, + trade_time_window_s=trade_time_window_s, + spoofing_side=spoofing_side + )) + + # Define columns for the resulting DataFrame cols = [ - 'Focal_id', - 'trade_time_window', - 'net_volume', - 'order_count', - 'total_trade_volume', - 'order_trade_ratio', - 'volume_percentage' + 'trade_id', 'focal_id', 'trade_time', 'num_orders', + 'net_order_volume', 'order_trade_ratio', 'volume_percentage' ] - final_scenario_df = pd.DataFrame(row_list, columns = cols) - final_scenario_df['Segment'] = 'Default' - final_scenario_df['SAR_FLAG'] = 'N' - final_scenario_df['Risk'] = 'Low Risk' - final_scenario_df.dropna(inplace=True) - # final_scenario_df['RUN_DATE'] = final_scenario_df['END_DATE'] + + # Create a DataFrame from the query result + final_scenario_df = pd.DataFrame(row_list, columns=cols) + + + # Adding additional columns + final_scenario_df['segment'] = 'Default' + final_scenario_df['sar_flag'] = 'N' + final_scenario_df['risk'] = 'Low Risk' + return final_scenario_df # In[22]: -# scenario = Scenario() -# scenario.logic(validation_window=300000) +scenario = Scenario() +scenario.logic(validation_window=300000) # In[ ]: