from datetime import datetime
import pandas as pd
from tms_data_interface import SQLQueryInterface

# SQL template that builds per-trade running metrics with ROWS window frames.
# Placeholders filled via str.format: trade_10m_v3 (source table name) and
# time_window_s (look-back in whole seconds, must be non-negative).
#
# NOTE(review): every frame below is cumulative (UNBOUNDED PRECEDING .. CURRENT ROW)
# and none of them references window_start, so time_window_s only affects the
# reported start_time, not which trades are aggregated -- confirm this is intended.
# NOTE(review): total_volume / highest_price / lowest_price have no PARTITION BY,
# so they run across all traders, while the buy/sell/count metrics are per trader.
# NOTE(review): both LIMIT clauses have no ORDER BY, so the sampled rows are not
# guaranteed to be the same between runs.
query_template = """
WITH trade_data AS (
    SELECT
        trader_id,
        date_time,
        trade_price,
        trade_volume,
        -- Create a time window for each trade by subtracting time_window_s seconds
        date_add('second', -{time_window_s}, date_time) AS window_start,
        date_time AS window_end,
        trade_side
    FROM {trade_10m_v3}
    WHERE date_time BETWEEN date_add('day', -1, current_date) AND current_date  -- Limit to the last 1 day of data
    LIMIT 10000  -- Process only a subset of records for testing
),

aggregated_trades AS (
    SELECT
        td.trader_id,
        td.window_start,
        td.window_end,
        SUM(CASE WHEN td.trade_side = 'buy' THEN td.trade_volume ELSE 0 END)
            OVER (PARTITION BY td.trader_id ORDER BY td.date_time
                  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS buy_volume,
        SUM(CASE WHEN td.trade_side = 'sell' THEN td.trade_volume ELSE 0 END)
            OVER (PARTITION BY td.trader_id ORDER BY td.date_time
                  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sell_volume,
        SUM(td.trade_volume) OVER (ORDER BY td.date_time
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS total_volume,
        MAX(td.trade_price) OVER (ORDER BY td.date_time
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS highest_price,
        MIN(td.trade_price) OVER (ORDER BY td.date_time
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lowest_price,
        COUNT(*) OVER (PARTITION BY td.trader_id ORDER BY td.date_time
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS number_of_trades
    FROM trade_data td
)

SELECT
    window_start AS start_time,
    window_end AS end_time,
    trader_id AS "Participant",
    lowest_price AS min_price,
    highest_price AS max_price,
    (highest_price - lowest_price) / NULLIF(lowest_price, 0) * 100 AS "Price Change (%)",
    buy_volume AS participant_volume,
    total_volume,
    -- Cast before dividing: if the volume columns are integers the engine would
    -- otherwise perform integer division and truncate the ratio to 0.
    CAST(buy_volume AS DOUBLE) / NULLIF(total_volume, 0) * 100 AS "Volume (%)"
FROM aggregated_trades
WHERE buy_volume > 0 OR sell_volume > 0
limit 1000
"""


class Scenario:
    """Scenario that runs the trade-aggregation query and returns it as a DataFrame."""

    # Query interface bound to the "trade_schema" schema; instantiated once,
    # when the class body is executed.
    seq = SQLQueryInterface(schema="trade_schema")

    def logic(self, **kwargs):
        """Execute the aggregation query and build the scenario DataFrame.

        Keyword Args:
            validation_window: Look-back window in milliseconds. It is truncated
                to whole seconds before being substituted into the SQL.

        Returns:
            pandas.DataFrame with the nine query columns, a recomputed
            ``PARTICIPANT_VOLUME_PCT`` column, and the constant columns
            ``Segment``, ``SAR_FLAG`` and ``Risk``.

        Raises:
            TypeError: If ``validation_window`` is missing or not numeric.
            ValueError: If ``validation_window`` converts to a negative number
                of seconds.
        """
        validation_window = kwargs.get('validation_window')
        if validation_window is None:
            # Previously this surfaced as an opaque "NoneType / int" TypeError.
            raise TypeError(
                "logic() requires a numeric 'validation_window' keyword argument (milliseconds)"
            )

        time_window_s = int(validation_window / 1000)  # Convert milliseconds to seconds
        if time_window_s < 0:
            # The template renders "-{time_window_s}"; a negative value would
            # produce "--N", which SQL parses as the start of a line comment.
            raise ValueError(
                "validation_window must be non-negative, got %r" % (validation_window,)
            )

        query_start_time = datetime.now()
        print("Query start time:", query_start_time)

        # Execute the query using a time window and limit.
        # presumably execute_raw returns an iterable of row tuples in SELECT
        # order -- verify against SQLQueryInterface.
        row_list = self.seq.execute_raw(query_template.format(
            trade_10m_v3="trade_10m_v3",
            time_window_s=time_window_s
        ))

        # Column labels, positionally matched to the query's SELECT list.
        cols = [
            'START_DATE_TIME',
            'END_DATE_TIME',
            'FOCAL_ID',
            'MIN_PRICE',
            'MAX_PRICE',
            'PRICE_CHANGE (%)',
            'PARTICIPANT_VOLUME',
            'TOTAL_VOLUME',
            'VOLUME (%)',
        ]

        # Create DataFrame from query results
        final_scenario_df = pd.DataFrame(row_list, columns=cols)

        # Calculate the participant's volume percentage on the client side
        final_scenario_df['PARTICIPANT_VOLUME_PCT'] = (
            final_scenario_df['PARTICIPANT_VOLUME'] / final_scenario_df['TOTAL_VOLUME'] * 100
        )

        # Constant classification columns attached to every row
        final_scenario_df['Segment'] = 'Default'
        final_scenario_df['SAR_FLAG'] = 'N'
        final_scenario_df['Risk'] = 'Low Risk'

        print("Query end time:", datetime.now())
        return final_scenario_df
"\n" ] }, { "cell_type": "code", "execution_count": 3, "id": "caee5554-5254-4388-bf24-029281d77890", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Query start time: 2024-10-14 05:44:12.616270\n", "Query end time: 2024-10-14 05:44:12.873749\n" ] }, { "data": { "text/html": [ "
| \n", " | START_DATE_TIME | \n", "END_DATE_TIME | \n", "FOCAL_ID | \n", "MIN_PRICE | \n", "MAX_PRICE | \n", "PRICE_CHANGE (%) | \n", "PARTICIPANT_VOLUME | \n", "TOTAL_VOLUME | \n", "VOLUME (%) | \n", "PARTICIPANT_VOLUME_PCT | \n", "Segment | \n", "SAR_FLAG | \n", "Risk | \n", "
|---|