#!/usr/bin/env python
# coding: utf-8
"""Per-participant rolling trade-window scenario.

For every trade, a trailing time window ending at that trade is built. Within
it the query aggregates the trading participant's buy/sell volume and trade
count, plus the market-wide (all participants) total volume and price range.
The rows are returned to the caller as a pandas DataFrame with fixed scenario
columns.
"""

from datetime import datetime

import pandas as pd

from tms_data_interface import SQLQueryInterface

# SQL query to aggregate trade data and compute metrics.
#
# Placeholders filled with ``str.format``:
#   {trade_data_1b}  - source trade table name; interpolated verbatim, so it
#                      must never come from untrusted input.
#   {time_window_s}  - trailing window length in whole seconds (an int).
#
# Why the query is shaped this way:
#   * trade_side is carried through the first CTE because the buy/sell
#     aggregates read it.
#   * date_time is combined with INTERVAL arithmetic, so the RANGE frame
#     offsets are INTERVALs as well (PostgreSQL rejects a bare integer offset
#     for a timestamp ORDER BY key).
#   * Percentages multiply by 100.0 first so integer volume/price columns are
#     not truncated by integer division.
# NOTE(review): written for a PostgreSQL-compatible engine (INTERVAL
# arithmetic, RANGE offset frames, named WINDOW clauses) - confirm against
# the target database.
query = """
WITH trade_data AS (
    SELECT
        trader_id,
        date_time,
        trade_price,
        trade_volume,
        trade_side,
        -- Create a time window for each trade
        date_time - INTERVAL '1 second' * {time_window_s} AS window_start,
        date_time AS window_end
    FROM {trade_data_1b}
),
aggregated_trades AS (
    SELECT
        td.trader_id,
        td.window_start,
        td.window_end,
        -- Per-participant aggregates over the trailing window
        SUM(CASE WHEN td.trade_side = 'buy' THEN td.trade_volume ELSE 0 END)
            OVER participant_window AS buy_volume,
        SUM(CASE WHEN td.trade_side = 'sell' THEN td.trade_volume ELSE 0 END)
            OVER participant_window AS sell_volume,
        COUNT(*) OVER participant_window AS number_of_trades,
        -- Market-wide aggregates (all participants) over the same trailing window
        SUM(td.trade_volume) OVER market_window AS total_volume,
        MAX(td.trade_price) OVER market_window AS highest_price,
        MIN(td.trade_price) OVER market_window AS lowest_price
    FROM trade_data td
    WINDOW
        participant_window AS (
            PARTITION BY td.trader_id
            ORDER BY td.date_time
            RANGE BETWEEN (INTERVAL '1 second' * {time_window_s}) PRECEDING AND CURRENT ROW
        ),
        market_window AS (
            ORDER BY td.date_time
            RANGE BETWEEN (INTERVAL '1 second' * {time_window_s}) PRECEDING AND CURRENT ROW
        )
)
SELECT
    window_start AS start_time,
    window_end AS end_time,
    trader_id AS "Participant",
    lowest_price AS min_price,
    highest_price AS max_price,
    100.0 * (highest_price - lowest_price) / NULLIF(lowest_price, 0) AS "Price Change (%)",
    -- NOTE(review): only buy-side volume is reported as the participant's
    -- volume, although rows with sell-only activity are kept - confirm intent.
    buy_volume AS participant_volume,
    total_volume,
    100.0 * buy_volume / NULLIF(total_volume, 0) AS "Volume (%)"
FROM aggregated_trades
WHERE buy_volume > 0 OR sell_volume > 0
"""

# Positional names for the nine columns produced by ``query``'s final SELECT.
_RESULT_COLUMNS = [
    'START_DATE_TIME',
    'END_DATE_TIME',
    'PARTICIPANT',
    'MIN_PRICE',
    'MAX_PRICE',
    'PRICE_CHANGE (%)',
    'PARTICIPANT_VOLUME',
    'TOTAL_VOLUME',
    'VOLUME (%)',
]


class Scenario:
    """Report each participant's rolling-window volume share and price range."""

    # Query interface bound to the "internal" schema; created once when the
    # class is defined and shared by all instances.
    seq = SQLQueryInterface(schema="internal")

    # Table substituted for {trade_data_1b}; a subclass may override it.
    source_table = "trade_data_2b"

    def logic(self, **kwargs):
        """Run the windowed trade query and return the scenario DataFrame.

        Prints the query start time before executing the query.

        Keyword Args:
            validation_window: trailing window length; it is divided by 1000
                and truncated to whole seconds (presumably milliseconds -
                confirm with the caller).

        Returns:
            pandas.DataFrame with the nine query columns, plus
            ``PARTICIPANT_VOLUME_PCT`` and the constant ``Segment``,
            ``SAR_FLAG`` and ``Risk`` columns.

        Raises:
            ValueError: if ``validation_window`` is not supplied.
        """
        time_window_s = self._window_seconds(kwargs.get('validation_window'))

        query_start_time = datetime.now()
        print("Query start time:", query_start_time)
        row_list = self.seq.execute_raw(query.format(
            trade_data_1b=self.source_table,
            time_window_s=time_window_s,
        ))
        return self._build_frame(row_list)

    @staticmethod
    def _window_seconds(validation_window):
        """Convert ``validation_window`` into whole seconds for the SQL template."""
        if validation_window is None:
            raise ValueError("validation_window is required")
        # int() also guarantees only a plain integer is interpolated into the SQL.
        return int(validation_window / 1000)

    @staticmethod
    def _build_frame(row_list):
        """Wrap raw query rows in a DataFrame and append the derived columns."""
        final_scenario_df = pd.DataFrame(row_list, columns=_RESULT_COLUMNS)

        # Coerce to numbers (the driver may return Decimal/None) and mask zero
        # totals so the share becomes NaN instead of inf or a division error.
        participant = pd.to_numeric(final_scenario_df['PARTICIPANT_VOLUME'], errors='coerce')
        total = pd.to_numeric(final_scenario_df['TOTAL_VOLUME'], errors='coerce')
        final_scenario_df['PARTICIPANT_VOLUME_PCT'] = participant / total.where(total != 0) * 100

        # Adding additional columns
        final_scenario_df['Segment'] = 'Default'
        final_scenario_df['SAR_FLAG'] = 'N'
        final_scenario_df['Risk'] = 'Low Risk'
        return final_scenario_df