#!/usr/bin/env python
# coding: utf-8

# In[1]:


from datetime import datetime

import pandas as pd

from tms_data_interface import SQLQueryInterface

# SQL query that pulls trades for the previous calendar day and computes
# running per-row trade metrics with ROWS-based window functions.
#
# Placeholders filled by ``str.format`` in ``Scenario.logic``:
#   {trade_10m_v3}  - source table name (hard-coded by the caller)
#   {time_window_s} - non-negative look-back in whole seconds; it is only used
#                     to compute ``window_start`` (see NOTE(review) below)
#
# The date_add('unit', n, ts) syntax suggests Presto/Trino/Athena - TODO confirm.
# Ratios are computed in DOUBLE because Presto/Trino truncate integer / integer.
#
# NOTE(review): every window below uses
# ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so the metrics are
# cumulative over the sampled rows rather than restricted to
# [window_start, window_end]. Also, total_volume, highest_price and
# lowest_price have no PARTITION BY, so they span all traders. Confirm both
# are intended.
query_template = """
WITH trade_data AS (
    SELECT
        trader_id,
        date_time,
        trade_price,
        trade_volume,
        -- Create a time window for each trade by subtracting time_window_s seconds
        date_add('second', -{time_window_s}, date_time) AS window_start,
        date_time AS window_end,
        trade_side
    FROM {trade_10m_v3}
    -- Previous calendar day: yesterday 00:00 through today 00:00 (inclusive)
    WHERE date_time BETWEEN date_add('day', -1, current_date) AND current_date
    -- Process only a subset of records for testing (no ORDER BY, so the subset is arbitrary)
    LIMIT 10000
),
aggregated_trades AS (
    SELECT
        td.trader_id,
        td.window_start,
        td.window_end,
        SUM(CASE WHEN td.trade_side = 'buy' THEN td.trade_volume ELSE 0 END)
            OVER (PARTITION BY td.trader_id ORDER BY td.date_time
                  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS buy_volume,
        SUM(CASE WHEN td.trade_side = 'sell' THEN td.trade_volume ELSE 0 END)
            OVER (PARTITION BY td.trader_id ORDER BY td.date_time
                  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sell_volume,
        SUM(td.trade_volume)
            OVER (ORDER BY td.date_time
                  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS total_volume,
        MAX(td.trade_price)
            OVER (ORDER BY td.date_time
                  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS highest_price,
        MIN(td.trade_price)
            OVER (ORDER BY td.date_time
                  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lowest_price,
        COUNT(*)
            OVER (PARTITION BY td.trader_id ORDER BY td.date_time
                  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS number_of_trades
    FROM trade_data td
)
SELECT
    window_start AS start_time,
    window_end AS end_time,
    trader_id AS "Participant",
    lowest_price AS min_price,
    highest_price AS max_price,
    CAST(highest_price - lowest_price AS DOUBLE)
        / NULLIF(CAST(lowest_price AS DOUBLE), 0) * 100 AS "Price Change (%)",
    -- NOTE(review): participant volume counts buy volume only; confirm intended
    buy_volume AS participant_volume,
    total_volume,
    CAST(buy_volume AS DOUBLE)
        / NULLIF(CAST(total_volume AS DOUBLE), 0) * 100 AS "Volume (%)"
FROM aggregated_trades
WHERE buy_volume > 0 OR sell_volume > 0
LIMIT 1000
"""


class Scenario:
    """Run the trade-volume scenario query and shape its output as a DataFrame."""

    # Created once, when the class body executes, and shared by all instances.
    seq = SQLQueryInterface(schema="trade_schema")

    def logic(self, **kwargs):
        """Execute the scenario query and return the resulting rows.

        Keyword Args:
            validation_window: Look-back window in milliseconds. Required and
                must be non-negative; it is truncated to whole seconds.

        Returns:
            pandas.DataFrame with the query columns (see ``cols``) plus
            PARTICIPANT_VOLUME_PCT, Segment, SAR_FLAG and Risk.

        Raises:
            TypeError: if ``validation_window`` is not supplied.
            ValueError: if ``validation_window`` is negative.
        """
        validation_window = kwargs.get('validation_window')
        if validation_window is None:
            raise TypeError(
                "logic() requires the 'validation_window' keyword argument (milliseconds)"
            )
        if validation_window < 0:
            # A negative value would render as "--N" in the SQL template, which
            # the database parses as the start of a line comment.
            raise ValueError(
                f"validation_window must be non-negative, got {validation_window!r}"
            )
        time_window_s = int(validation_window / 1000)  # Convert milliseconds to seconds

        query_start_time = datetime.now()
        print("Query start time:", query_start_time)

        # Execute the query using the time window and the built-in row limits.
        row_list = self.seq.execute_raw(query_template.format(
            trade_10m_v3="trade_10m_v3",
            time_window_s=time_window_s,
        ))

        # Column names for the resulting DataFrame, in SELECT order.
        cols = [
            'START_DATE_TIME',
            'END_DATE_TIME',
            'FOCAL_ID',
            'MIN_PRICE',
            'MAX_PRICE',
            'PRICE_CHANGE (%)',
            'PARTICIPANT_VOLUME',
            'TOTAL_VOLUME',
            'VOLUME (%)',
        ]

        # Create DataFrame from query results
        final_scenario_df = pd.DataFrame(row_list, columns=cols)

        # Participant's volume percentage. Convert to numeric first (drivers may
        # return Decimal objects) and mask zero totals so they yield NaN,
        # matching the SQL NULLIF, instead of inf or a Decimal division error.
        participant_volume = pd.to_numeric(final_scenario_df['PARTICIPANT_VOLUME'])
        total_volume = pd.to_numeric(final_scenario_df['TOTAL_VOLUME'])
        final_scenario_df['PARTICIPANT_VOLUME_PCT'] = (
            participant_volume / total_volume.where(total_volume != 0) * 100
        )

        # Add additional columns to the DataFrame
        final_scenario_df['Segment'] = 'Default'
        final_scenario_df['SAR_FLAG'] = 'N'
        final_scenario_df['Risk'] = 'Low Risk'

        print("Query end time:", datetime.now())
        return final_scenario_df


# In[3]:


# Instantiate and execute logic.
# NOTE(review): this also runs when the module is imported; consider wrapping
# it in an ``if __name__ == "__main__":`` guard.
scenario = Scenario()
scenario.logic(validation_window=100000)


# In[ ]: