diff --git a/.ipynb_checkpoints/main-checkpoint.ipynb b/.ipynb_checkpoints/main-checkpoint.ipynb index cf81d87..5098552 100644 --- a/.ipynb_checkpoints/main-checkpoint.ipynb +++ b/.ipynb_checkpoints/main-checkpoint.ipynb @@ -2,29 +2,92 @@ "cells": [ { "cell_type": "code", - "execution_count": 4, + "execution_count": 1, "id": "90c70e46-71a0-44a6-8090-f53aad3193c3", "metadata": { "tags": [] }, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Query start time: 2024-10-13 18:13:45.509982\n", + "Query end time: 2024-10-13 18:13:45.944136\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
START_DATE_TIMEEND_DATE_TIMEFOCAL_IDMIN_PRICEMAX_PRICEPRICE_CHANGE (%)PARTICIPANT_VOLUMETOTAL_VOLUMEVOLUME (%)PARTICIPANT_VOLUME_PCTSegmentSAR_FLAGRisk
\n", + "
" + ], + "text/plain": [ + "Empty DataFrame\n", + "Columns: [START_DATE_TIME, END_DATE_TIME, FOCAL_ID, MIN_PRICE, MAX_PRICE, PRICE_CHANGE (%), PARTICIPANT_VOLUME, TOTAL_VOLUME, VOLUME (%), PARTICIPANT_VOLUME_PCT, Segment, SAR_FLAG, Risk]\n", + "Index: []" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "from datetime import datetime\n", "import pandas as pd\n", "from tms_data_interface import SQLQueryInterface\n", "\n", - "# SQL query to aggregate trade data and compute metrics\n", - "query = \"\"\"\n", + "# SQL query to aggregate trade data and compute metrics using ROWS with optimizations\n", + "query_template = \"\"\"\n", "WITH trade_data AS (\n", " SELECT \n", " trader_id,\n", " date_time,\n", " trade_price,\n", " trade_volume,\n", - " -- Create a time window for each trade\n", - " date_time - INTERVAL '1 second' * {time_window_s} AS window_start,\n", - " date_time AS window_end\n", + " -- Create a time window for each trade by subtracting time_window_s seconds\n", + " date_add('second', -{time_window_s}, date_time) AS window_start,\n", + " date_time AS window_end,\n", + " trade_side\n", " FROM {trade_10m_v3}\n", + " WHERE date_time BETWEEN date_add('day', -1, current_date) AND current_date -- Limit to the last 1 day of data\n", + " LIMIT 10000 -- Process only a subset of records for testing\n", "),\n", "\n", "aggregated_trades AS (\n", @@ -32,21 +95,21 @@ " td.trader_id,\n", " td.window_start,\n", " td.window_end,\n", - " SUM(CASE WHEN trade_side = 'buy' THEN trade_volume ELSE 0 END) \n", + " SUM(CASE WHEN td.trade_side = 'buy' THEN td.trade_volume ELSE 0 END) \n", " OVER (PARTITION BY td.trader_id ORDER BY td.date_time \n", - " RANGE BETWEEN {time_window_s} PRECEDING AND CURRENT ROW) AS buy_volume,\n", - " SUM(CASE WHEN trade_side = 'sell' THEN trade_volume ELSE 0 END) \n", + " ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS buy_volume,\n", + " SUM(CASE WHEN td.trade_side = 'sell' THEN td.trade_volume ELSE 0 END) \n", " OVER (PARTITION BY td.trader_id ORDER BY td.date_time \n", - " RANGE BETWEEN {time_window_s} PRECEDING AND CURRENT ROW) AS sell_volume,\n", - " SUM(trade_volume) OVER (ORDER BY td.date_time \n", - " RANGE BETWEEN {time_window_s} PRECEDING AND CURRENT ROW) AS total_volume,\n", - " MAX(trade_price) OVER (ORDER BY td.date_time \n", - " RANGE BETWEEN {time_window_s} PRECEDING AND CURRENT ROW) AS highest_price,\n", - " MIN(trade_price) OVER (ORDER BY td.date_time \n", - " RANGE BETWEEN {time_window_s} PRECEDING AND CURRENT ROW) AS lowest_price,\n", + " ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sell_volume,\n", + " SUM(td.trade_volume) OVER (ORDER BY td.date_time \n", + " ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS total_volume,\n", + " MAX(td.trade_price) OVER (ORDER BY td.date_time \n", + " ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS highest_price,\n", + " MIN(td.trade_price) OVER (ORDER BY td.date_time \n", + " ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lowest_price,\n", " COUNT(*) OVER (PARTITION BY td.trader_id ORDER BY td.date_time \n", - " RANGE BETWEEN {time_window_s} PRECEDING AND CURRENT ROW) AS number_of_trades\n", - " FROM {trade_10m_v3} td\n", + " ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS number_of_trades\n", + " FROM trade_data td\n", ")\n", "\n", "SELECT \n", @@ -68,16 +131,18 @@ "\n", " def logic(self, **kwargs):\n", " validation_window = kwargs.get('validation_window')\n", - " time_window_s = int(validation_window / 1000)\n", + " time_window_s = int(validation_window / 1000) # Convert milliseconds to seconds\n", " \n", " query_start_time = datetime.now()\n", " print(\"Query start time:\", query_start_time)\n", "\n", - " row_list = self.seq.execute_raw(query.format(\n", + " # Execute the optimized query using a time window and limit\n", + " row_list = self.seq.execute_raw(query_template.format(\n", " trade_10m_v3=\"trade_10m_v3\",\n", " time_window_s=time_window_s\n", " ))\n", "\n", + " # Define the columns for the resulting DataFrame\n", " cols = [\n", " 'START_DATE_TIME',\n", " 'END_DATE_TIME',\n", @@ -90,16 +155,25 @@ " 'VOLUME (%)',\n", " ]\n", "\n", + " # Create DataFrame from query results\n", " final_scenario_df = pd.DataFrame(row_list, columns=cols)\n", + " \n", + " # Calculate the participant's volume percentage\n", " final_scenario_df['PARTICIPANT_VOLUME_PCT'] = final_scenario_df['PARTICIPANT_VOLUME'] / \\\n", " final_scenario_df['TOTAL_VOLUME'] * 100\n", "\n", - " # Adding additional columns\n", + " # Add additional columns to the DataFrame\n", " final_scenario_df['Segment'] = 'Default'\n", " final_scenario_df['SAR_FLAG'] = 'N'\n", " final_scenario_df['Risk'] = 'Low Risk'\n", "\n", - " return final_scenario_df\n" + " print(\"Query end time:\", datetime.now())\n", + " return final_scenario_df\n", + "\n", + "\n", + "# Instantiate and execute logic\n", + "scenario = Scenario()\n", + "scenario.logic(validation_window=1000)\n" ] }, { @@ -122,8 +196,9 @@ } ], "source": [ - "#scenario = Scenario()\n", - "#scenario.logic()" + "# Instantiate and execute logic\n", + "scenario = Scenario()\n", + "scenario.logic(validation_window=1000)" ] } ], diff --git a/main.ipynb b/main.ipynb index cf81d87..db3c9d0 100644 --- a/main.ipynb +++ b/main.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 4, + "execution_count": 3, "id": "90c70e46-71a0-44a6-8090-f53aad3193c3", "metadata": { "tags": [] @@ -13,18 +13,21 @@ "import pandas as pd\n", "from tms_data_interface import SQLQueryInterface\n", "\n", - "# SQL query to aggregate trade data and compute metrics\n", - "query = \"\"\"\n", + "# SQL query to aggregate trade data and compute metrics using ROWS with optimizations\n", + "query_template = \"\"\"\n", "WITH trade_data AS (\n", " SELECT \n", " trader_id,\n", " date_time,\n", " trade_price,\n", " trade_volume,\n", - " -- Create a time window for each trade\n", - " date_time - INTERVAL '1 second' * {time_window_s} AS window_start,\n", - " date_time AS window_end\n", + " -- Create a time window for each trade by subtracting time_window_s seconds\n", + " date_add('second', -{time_window_s}, date_time) AS window_start,\n", + " date_time AS window_end,\n", + " trade_side\n", " FROM {trade_10m_v3}\n", + " WHERE date_time BETWEEN date_add('day', -1, current_date) AND current_date -- Limit to the last 1 day of data\n", + " LIMIT 10000 -- Process only a subset of records for testing\n", "),\n", "\n", "aggregated_trades AS (\n", @@ -32,21 +35,21 @@ " td.trader_id,\n", " td.window_start,\n", " td.window_end,\n", - " SUM(CASE WHEN trade_side = 'buy' THEN trade_volume ELSE 0 END) \n", + " SUM(CASE WHEN td.trade_side = 'buy' THEN td.trade_volume ELSE 0 END) \n", " OVER (PARTITION BY td.trader_id ORDER BY td.date_time \n", - " RANGE BETWEEN {time_window_s} PRECEDING AND CURRENT ROW) AS buy_volume,\n", - " SUM(CASE WHEN trade_side = 'sell' THEN trade_volume ELSE 0 END) \n", + " ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS buy_volume,\n", + " SUM(CASE WHEN td.trade_side = 'sell' THEN td.trade_volume ELSE 0 END) \n", " OVER (PARTITION BY td.trader_id ORDER BY td.date_time \n", - " RANGE BETWEEN {time_window_s} PRECEDING AND CURRENT ROW) AS sell_volume,\n", - " SUM(trade_volume) OVER (ORDER BY td.date_time \n", - " RANGE BETWEEN {time_window_s} PRECEDING AND CURRENT ROW) AS total_volume,\n", - " MAX(trade_price) OVER (ORDER BY td.date_time \n", - " RANGE BETWEEN {time_window_s} PRECEDING AND CURRENT ROW) AS highest_price,\n", - " MIN(trade_price) OVER (ORDER BY td.date_time \n", - " RANGE BETWEEN {time_window_s} PRECEDING AND CURRENT ROW) AS lowest_price,\n", + " ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sell_volume,\n", + " SUM(td.trade_volume) OVER (ORDER BY td.date_time \n", + " ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS total_volume,\n", + " MAX(td.trade_price) OVER (ORDER BY td.date_time \n", + " ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS highest_price,\n", + " MIN(td.trade_price) OVER (ORDER BY td.date_time \n", + " ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lowest_price,\n", " COUNT(*) OVER (PARTITION BY td.trader_id ORDER BY td.date_time \n", - " RANGE BETWEEN {time_window_s} PRECEDING AND CURRENT ROW) AS number_of_trades\n", - " FROM {trade_10m_v3} td\n", + " ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS number_of_trades\n", + " FROM trade_data td\n", ")\n", "\n", "SELECT \n", @@ -68,16 +71,18 @@ "\n", " def logic(self, **kwargs):\n", " validation_window = kwargs.get('validation_window')\n", - " time_window_s = int(validation_window / 1000)\n", + " time_window_s = int(validation_window / 1000) # Convert milliseconds to seconds\n", " \n", " query_start_time = datetime.now()\n", " print(\"Query start time:\", query_start_time)\n", "\n", - " row_list = self.seq.execute_raw(query.format(\n", + " # Execute the optimized query using a time window and limit\n", + " row_list = self.seq.execute_raw(query_template.format(\n", " trade_10m_v3=\"trade_10m_v3\",\n", " time_window_s=time_window_s\n", " ))\n", "\n", + " # Define the columns for the resulting DataFrame\n", " cols = [\n", " 'START_DATE_TIME',\n", " 'END_DATE_TIME',\n", @@ -90,41 +95,104 @@ " 'VOLUME (%)',\n", " ]\n", "\n", + " # Create DataFrame from query results\n", " final_scenario_df = pd.DataFrame(row_list, columns=cols)\n", + " \n", + " # Calculate the participant's volume percentage\n", " final_scenario_df['PARTICIPANT_VOLUME_PCT'] = final_scenario_df['PARTICIPANT_VOLUME'] / \\\n", " final_scenario_df['TOTAL_VOLUME'] * 100\n", "\n", - " # Adding additional columns\n", + " # Add additional columns to the DataFrame\n", " final_scenario_df['Segment'] = 'Default'\n", " final_scenario_df['SAR_FLAG'] = 'N'\n", " final_scenario_df['Risk'] = 'Low Risk'\n", "\n", - " return final_scenario_df\n" + " print(\"Query end time:\", datetime.now())\n", + " return final_scenario_df\n", + "\n", + "\n", + "\n" ] }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 2, "id": "caee5554-5254-4388-bf24-029281d77890", "metadata": {}, "outputs": [ { - "ename": "TypeError", - "evalue": "unsupported operand type(s) for /: 'NoneType' and 'int'", - "output_type": "error", - "traceback": [ - "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", - "\u001b[0;31mTypeError\u001b[0m Traceback (most recent call last)", - "Cell \u001b[0;32mIn[6], line 2\u001b[0m\n\u001b[1;32m 1\u001b[0m scenario \u001b[38;5;241m=\u001b[39m Scenario()\n\u001b[0;32m----> 2\u001b[0m \u001b[43mscenario\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mlogic\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n", - "Cell \u001b[0;32mIn[4], line 60\u001b[0m, in \u001b[0;36mScenario.logic\u001b[0;34m(self, **kwargs)\u001b[0m\n\u001b[1;32m 58\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mlogic\u001b[39m(\u001b[38;5;28mself\u001b[39m, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwargs):\n\u001b[1;32m 59\u001b[0m validation_window \u001b[38;5;241m=\u001b[39m kwargs\u001b[38;5;241m.\u001b[39mget(\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mvalidation_window\u001b[39m\u001b[38;5;124m'\u001b[39m)\n\u001b[0;32m---> 60\u001b[0m time_window_s \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mint\u001b[39m(\u001b[43mvalidation_window\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m/\u001b[39;49m\u001b[43m \u001b[49m\u001b[38;5;241;43m1000\u001b[39;49m)\n\u001b[1;32m 62\u001b[0m query_start_time \u001b[38;5;241m=\u001b[39m datetime\u001b[38;5;241m.\u001b[39mnow()\n\u001b[1;32m 63\u001b[0m \u001b[38;5;28mprint\u001b[39m(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mQuery start time:\u001b[39m\u001b[38;5;124m\"\u001b[39m, query_start_time)\n", - "\u001b[0;31mTypeError\u001b[0m: unsupported operand type(s) for /: 'NoneType' and 'int'" + "name": "stdout", + "output_type": "stream", + "text": [ + "Query start time: 2024-10-13 18:14:29.793521\n", + "Query end time: 2024-10-13 18:14:29.933772\n" ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
START_DATE_TIMEEND_DATE_TIMEFOCAL_IDMIN_PRICEMAX_PRICEPRICE_CHANGE (%)PARTICIPANT_VOLUMETOTAL_VOLUMEVOLUME (%)PARTICIPANT_VOLUME_PCTSegmentSAR_FLAGRisk
\n", + "
" + ], + "text/plain": [ + "Empty DataFrame\n", + "Columns: [START_DATE_TIME, END_DATE_TIME, FOCAL_ID, MIN_PRICE, MAX_PRICE, PRICE_CHANGE (%), PARTICIPANT_VOLUME, TOTAL_VOLUME, VOLUME (%), PARTICIPANT_VOLUME_PCT, Segment, SAR_FLAG, Risk]\n", + "Index: []" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" } ], "source": [ - "#scenario = Scenario()\n", - "#scenario.logic()" + "# Instantiate and execute logic\n", + "scenario = Scenario()\n", + "scenario.logic(validation_window=1000)" ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1feaf267-88d1-41b8-a8b9-f150b7ff16cd", + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { diff --git a/main.py b/main.py index 068be16..621c750 100644 --- a/main.py +++ b/main.py @@ -1,25 +1,28 @@ #!/usr/bin/env python # coding: utf-8 -# In[4]: +# In[3]: from datetime import datetime import pandas as pd from tms_data_interface import SQLQueryInterface -# SQL query to aggregate trade data and compute metrics -query = """ +# SQL query to aggregate trade data and compute metrics using ROWS with optimizations +query_template = """ WITH trade_data AS ( SELECT trader_id, date_time, trade_price, trade_volume, - -- Create a time window for each trade - date_time - INTERVAL '1 second' * {time_window_s} AS window_start, - date_time AS window_end + -- Create a time window for each trade by subtracting time_window_s seconds + date_add('second', -{time_window_s}, date_time) AS window_start, + date_time AS window_end, + trade_side FROM {trade_10m_v3} + WHERE date_time BETWEEN date_add('day', -1, current_date) AND current_date -- Limit to the last 1 day of data + LIMIT 10000 -- Process only a subset of records for testing ), aggregated_trades AS ( @@ -27,21 +30,21 @@ aggregated_trades AS ( td.trader_id, td.window_start, td.window_end, - SUM(CASE WHEN trade_side = 'buy' THEN trade_volume ELSE 0 END) + SUM(CASE WHEN td.trade_side = 'buy' THEN td.trade_volume ELSE 0 END) OVER (PARTITION BY td.trader_id ORDER BY td.date_time - RANGE BETWEEN {time_window_s} PRECEDING AND CURRENT ROW) AS buy_volume, - SUM(CASE WHEN trade_side = 'sell' THEN trade_volume ELSE 0 END) + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS buy_volume, + SUM(CASE WHEN td.trade_side = 'sell' THEN td.trade_volume ELSE 0 END) OVER (PARTITION BY td.trader_id ORDER BY td.date_time - RANGE BETWEEN {time_window_s} PRECEDING AND CURRENT ROW) AS sell_volume, - SUM(trade_volume) OVER (ORDER BY td.date_time - RANGE BETWEEN {time_window_s} PRECEDING AND CURRENT ROW) AS total_volume, - MAX(trade_price) OVER (ORDER BY td.date_time - RANGE BETWEEN {time_window_s} PRECEDING AND CURRENT ROW) AS highest_price, - MIN(trade_price) OVER (ORDER BY td.date_time - RANGE BETWEEN {time_window_s} PRECEDING AND CURRENT ROW) AS lowest_price, + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sell_volume, + SUM(td.trade_volume) OVER (ORDER BY td.date_time + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS total_volume, + MAX(td.trade_price) OVER (ORDER BY td.date_time + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS highest_price, + MIN(td.trade_price) OVER (ORDER BY td.date_time + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lowest_price, COUNT(*) OVER (PARTITION BY td.trader_id ORDER BY td.date_time - RANGE BETWEEN {time_window_s} PRECEDING AND CURRENT ROW) AS number_of_trades - FROM {trade_10m_v3} td + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS number_of_trades + FROM trade_data td ) SELECT @@ -63,16 +66,18 @@ class Scenario: def logic(self, **kwargs): validation_window = kwargs.get('validation_window') - time_window_s = int(validation_window / 1000) + time_window_s = int(validation_window / 1000) # Convert milliseconds to seconds query_start_time = datetime.now() print("Query start time:", query_start_time) - row_list = self.seq.execute_raw(query.format( + # Execute the optimized query using a time window and limit + row_list = self.seq.execute_raw(query_template.format( trade_10m_v3="trade_10m_v3", time_window_s=time_window_s )) + # Define the columns for the resulting DataFrame cols = [ 'START_DATE_TIME', 'END_DATE_TIME', @@ -85,21 +90,35 @@ class Scenario: 'VOLUME (%)', ] + # Create DataFrame from query results final_scenario_df = pd.DataFrame(row_list, columns=cols) + + # Calculate the participant's volume percentage final_scenario_df['PARTICIPANT_VOLUME_PCT'] = final_scenario_df['PARTICIPANT_VOLUME'] / \ final_scenario_df['TOTAL_VOLUME'] * 100 - # Adding additional columns + # Add additional columns to the DataFrame final_scenario_df['Segment'] = 'Default' final_scenario_df['SAR_FLAG'] = 'N' final_scenario_df['Risk'] = 'Low Risk' + print("Query end time:", datetime.now()) return final_scenario_df -# In[6]: -#scenario = Scenario() -#scenario.logic() + +# In[2]: + + +# Instantiate and execute logic +scenario = Scenario() +scenario.logic(validation_window=1000) + + +# In[ ]: + + +