from datetime import datetime

import pandas as pd
from tms_data_interface import SQLQueryInterface

# SQL template rendered by Scenario.logic via str.format().
# Placeholders:
#   {trade_10m_v3}  - name of the trade table to read from
#   {time_window_s} - look-back window in whole seconds (must be >= 0, because the
#                     template writes it as "-{time_window_s}"; a negative value
#                     would render as "--N", which SQL parses as a line comment)
#
# NOTE(review): window_start is computed and returned, but every window frame is
#   "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", so the aggregates are
#   running totals over all selected rows, not totals bounded by the time window.
#   Confirm whether a time-bounded frame was intended.
# NOTE(review): total_volume / highest_price / lowest_price have no PARTITION BY,
#   while buy_volume / sell_volume / number_of_trades are per trader_id.
#   Presumably market-wide on purpose - confirm.
# NOTE(review): if trade_volume / trade_price are integer columns, the percentage
#   expressions may be evaluated with integer division on some engines - verify.
# NOTE(review): the inner LIMIT has no ORDER BY, so which rows are sampled is up
#   to the engine.
query_template = """
WITH trade_data AS (
    SELECT
        trader_id,
        date_time,
        trade_price,
        trade_volume,
        -- Create a time window for each trade by subtracting time_window_s seconds
        date_add('second', -{time_window_s}, date_time) AS window_start,
        date_time AS window_end,
        trade_side
    FROM {trade_10m_v3}
    WHERE date_time BETWEEN date_add('day', -1, current_date) AND current_date -- Limit to the last 1 day of data
    LIMIT 10000 -- Process only a subset of records for testing
),

aggregated_trades AS (
    SELECT
        td.trader_id,
        td.window_start,
        td.window_end,
        SUM(CASE WHEN td.trade_side = 'buy' THEN td.trade_volume ELSE 0 END)
            OVER (PARTITION BY td.trader_id ORDER BY td.date_time
                  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS buy_volume,
        SUM(CASE WHEN td.trade_side = 'sell' THEN td.trade_volume ELSE 0 END)
            OVER (PARTITION BY td.trader_id ORDER BY td.date_time
                  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sell_volume,
        SUM(td.trade_volume) OVER (ORDER BY td.date_time
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS total_volume,
        MAX(td.trade_price) OVER (ORDER BY td.date_time
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS highest_price,
        MIN(td.trade_price) OVER (ORDER BY td.date_time
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lowest_price,
        COUNT(*) OVER (PARTITION BY td.trader_id ORDER BY td.date_time
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS number_of_trades
    FROM trade_data td
)

SELECT
    window_start AS start_time,
    window_end AS end_time,
    trader_id AS "Participant",
    lowest_price AS min_price,
    highest_price AS max_price,
    (highest_price - lowest_price) / NULLIF(lowest_price, 0) * 100 AS "Price Change (%)",
    buy_volume AS participant_volume,
    total_volume,
    (buy_volume / NULLIF(total_volume, 0)) * 100 AS "Volume (%)"
FROM aggregated_trades
WHERE buy_volume > 0 OR sell_volume > 0
limit 1000
"""


class Scenario:
    """Runs the trade-aggregation query and shapes the rows into a DataFrame."""

    # Query interface bound to the "trade_schema" schema; created once, when the
    # class body is executed, and shared by all instances.
    seq = SQLQueryInterface(schema="trade_schema")

    # DataFrame column names, applied positionally: the order must mirror the
    # column order of the final SELECT in query_template (the SQL aliases are
    # not used on the Python side).
    _RESULT_COLUMNS = (
        'START_DATE_TIME',
        'END_DATE_TIME',
        'FOCAL_ID',
        'MIN_PRICE',
        'MAX_PRICE',
        'PRICE_CHANGE (%)',
        'PARTICIPANT_VOLUME',
        'TOTAL_VOLUME',
        'VOLUME (%)',
    )

    def logic(self, **kwargs):
        """Execute the scenario query and return the result as a DataFrame.

        Keyword Args:
            validation_window: look-back window in milliseconds. It is truncated
                to whole seconds before being substituted into the query.

        Returns:
            pandas.DataFrame with the nine query columns listed in
            ``_RESULT_COLUMNS`` plus ``PARTICIPANT_VOLUME_PCT``, ``Segment``,
            ``SAR_FLAG`` and ``Risk``.

        Raises:
            TypeError: if ``validation_window`` is missing or not numeric.
            ValueError: if ``validation_window`` truncates to a negative number
                of seconds.
        """
        validation_window = kwargs.get('validation_window')
        if validation_window is None:
            # Previously this surfaced as an opaque "NoneType / int" TypeError.
            raise TypeError(
                "logic() requires a numeric 'validation_window' keyword argument (milliseconds)"
            )

        time_window_s = int(validation_window / 1000)  # Convert milliseconds to seconds
        if time_window_s < 0:
            # The template renders "-{time_window_s}"; a negative value would
            # become "--N", i.e. a SQL line comment that swallows the rest of
            # the line and corrupts the query.
            raise ValueError(
                f"validation_window must not be negative, got {validation_window!r}"
            )

        query_start_time = datetime.now()
        print("Query start time:", query_start_time)

        # Render the template and run it.
        # NOTE(review): execute_raw presumably returns a sequence of row tuples
        # in SELECT order - confirm against tms_data_interface.
        rendered_query = query_template.format(
            trade_10m_v3="trade_10m_v3",
            time_window_s=time_window_s,
        )
        row_list = self.seq.execute_raw(rendered_query)

        # Create DataFrame from query results
        final_scenario_df = pd.DataFrame(row_list, columns=list(self._RESULT_COLUMNS))

        # Participant share of total volume, recomputed client-side with the
        # same formula as the SQL "Volume (%)" column.
        # NOTE(review): unlike the SQL (NULLIF), there is no zero-denominator
        # guard here - confirm whether both columns are needed.
        final_scenario_df['PARTICIPANT_VOLUME_PCT'] = (
            final_scenario_df['PARTICIPANT_VOLUME'] / final_scenario_df['TOTAL_VOLUME'] * 100
        )

        # Constant classification columns added to every row.
        final_scenario_df['Segment'] = 'Default'
        final_scenario_df['SAR_FLAG'] = 'N'
        final_scenario_df['Risk'] = 'Low Risk'

        print("Query end time:", datetime.now())
        return final_scenario_df
"\n" ] }, { "cell_type": "code", "execution_count": 3, "id": "caee5554-5254-4388-bf24-029281d77890", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Query start time: 2024-10-14 05:44:12.616270\n", "Query end time: 2024-10-14 05:44:12.873749\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
START_DATE_TIMEEND_DATE_TIMEFOCAL_IDMIN_PRICEMAX_PRICEPRICE_CHANGE (%)PARTICIPANT_VOLUMETOTAL_VOLUMEVOLUME (%)PARTICIPANT_VOLUME_PCTSegmentSAR_FLAGRisk
\n", "
# Instantiate and execute logic.
# The window is given in milliseconds; Scenario.logic converts it to whole seconds.
VALIDATION_WINDOW_MS = 100_000  # 100 seconds

scenario = Scenario()
# Last expression of the cell, so the returned DataFrame is rendered as output.
scenario.logic(validation_window=VALIDATION_WINDOW_MS)