import os
import time
import csv
import tempfile

from clickhouse_driver import Client
from clickhouse_driver.errors import Error as ClickHouseError
import gradio as gr

# Read the ClickHouse password from Colab secrets when available,
# otherwise fall back to an environment variable.
try:
    from google.colab import userdata
    DB_PASSWORD = userdata.get('FASHION_CH_PASS')
except Exception:
    DB_PASSWORD = os.environ['FASHION_CH_PASS']


class DBConnectionManager:
    def __init__(self, max_connection_age=3600):  # 1 hour max connection age
        self.client = None
        self.last_connection_time = None
        self.max_connection_age = max_connection_age

    def get_connection(self):
        if self.client is None or self._should_refresh_connection():
            self._create_connection()
        return self.client

    def _create_connection(self):
        if self.client:
            self.close_connection()
        self.client = Client(
            host='rc1d-a93v7vf0pjfr6e2o.mdb.yandexcloud.net',
            port=9440,
            user='user1',
            password=DB_PASSWORD,
            database='db1',
            secure=True,
            ca_certs='./RootCA.pem'
        )
        self.last_connection_time = time.time()

    def _should_refresh_connection(self):
        if self.last_connection_time is None:
            return True
        return (time.time() - self.last_connection_time) > self.max_connection_age

    def execute_query(self, query, params=None):
        max_retries = 3
        for attempt in range(max_retries):
            try:
                connection = self.get_connection()
                return connection.execute(query, params)
            except ClickHouseError as e:
                print(f"Database error on attempt {attempt + 1}: {e}")
                if attempt < max_retries - 1:
                    print("Retrying...")
                    self._create_connection()  # Force a new connection
                else:
                    raise

    def close_connection(self):
        if self.client:
            self.client.disconnect()
            self.client = None
            self.last_connection_time = None


db_manager = DBConnectionManager()


def get_category_a_options():
    query = "SELECT DISTINCT category_a FROM datamart_lamoda_grownups_mc_4 ORDER BY category_a"
    result = db_manager.execute_query(query)
    return [row[0] for row in result]


def update_category_3(category_a):
    print("entering update_category_3")
    try:
        query = """
            SELECT DISTINCT category_3
            FROM datamart_lamoda_grownups_mc_4
            WHERE category_a in %(category_a)s
            ORDER BY category_3
        """
        result = db_manager.execute_query(query, {'category_a': category_a})
        category_3_options = [row[0] for row in result]
        return gr.CheckboxGroup(choices=category_3_options, label="Category 3", interactive=True)
    except Exception as e:
        print(f"Error updating category 3: {e}")
        return gr.CheckboxGroup(choices=[], label="Category 3", interactive=True)


def update_category_4(category_a, category_3):
    print("entering update_category_4")
    try:
        query = """
            SELECT DISTINCT category_4
            FROM datamart_lamoda_grownups_mc_4
            WHERE 1=1
                and category_a in %(category_a)s
                and category_3 in %(category_3)s
            ORDER BY category_4
        """
        result = db_manager.execute_query(query, {'category_a': category_a, 'category_3': category_3})
        category_4_options = [row[0] for row in result]
        return gr.CheckboxGroup(choices=category_4_options, label="Category 4", interactive=True)
    except Exception as e:
        print(f"Error updating category 4: {e}")
        return gr.CheckboxGroup(choices=[], label="Category 4", interactive=True)


def generate_csv_report(category_a, category_3, category_4):
    query = """
        WITH sku_prices AS (
            SELECT
                id_product_money,
                category_a,
                category_3,
                category_4,
                avg(avg_price) AS sku_avg_price
            FROM datamart_lamoda_grownups_mc_4
            WHERE price_tier in ('Medium', 'High', 'Premium')
                AND category_a IN %(category_a)s
                AND category_3 IN %(category_3)s
                AND category_4 IN %(category_4)s
            GROUP BY all
        ),
        median_prices AS (
            SELECT
                category_a,
                category_3,
                category_4,
                median(sku_avg_price) AS median_price
            FROM sku_prices
            GROUP BY all
        ),
        main AS (
            SELECT
                d.category_a AS category_a,
                d.category_3 AS category_3,
                d.category_4 AS category_4,
                500*round(0.2*median_price/500) AS step,
                step*round(sku_avg_price/step) AS price_group,
                turnover,
                total_sales,
                d.id_product_money AS id_product_money,
                avg_price,
                season,
                brand_rank,
                stock_increase,
                initial_stock,
                days_in_stock,
                total_days_in_period,
                median_price,
                brand_name,
                is_first_week,
                week_start_date
            FROM datamart_lamoda_grownups_mc_4 d
            LEFT JOIN median_prices m USING (category_a, category_3, category_4)
            LEFT JOIN sku_prices s ON d.id_product_money = s.id_product_money
            WHERE price_tier in ('Medium', 'High', 'Premium')
                AND d.category_a IN %(category_a)s
                AND d.category_3 IN %(category_3)s
                AND d.category_4 IN %(category_4)s
        )
        SELECT
            year(week_start_date)::text AS YEAR,
            min(week_start_date)::text AS min_week_start_date,
            max(week_start_date)::text AS max_week_start_date,
            category_a,
            category_3,
            category_4,
            brand_name,
            price_group,
            sum(turnover) as sum_turnover,
            sum(total_sales) as count_sales,
            count(distinct id_product_money) as count_sku,
            round(avg(avg_price)) as avg_price1,
            round(sum_turnover/IF(count_sales=0,NULL,count_sales)) as avg_price2,
            uniqExactIf(id_product_money, season = 'AW 23') as count_sku_aw_23,
            uniqExactIf(id_product_money, season = 'SS 24') as count_sku_ss_24,
            sumIf(turnover, season = 'AW 23') as sum_turnover_aw_23,
            sumIf(turnover, season = 'SS 24') as sum_turnover_ss_24,
            round(sum_turnover/IF(count_sku=0,NULL,count_sku)) as turnover_per_sku,
            round(sum_turnover_aw_23/IF(count_sku_aw_23=0,NULL,count_sku_aw_23)) as turnover_per_sku_aw_23,
            round(sum_turnover_ss_24/IF(count_sku_ss_24=0,NULL,count_sku_ss_24)) as turnover_per_sku_ss_24,
            round(1.0*sumIf(turnover, brand_rank <= 10)/IF(sum_turnover=0,NULL,sum_turnover), 3) as top10_turnover_share,
            round(1.0*sumIf(turnover, brand_name = 'BLCV')/IF(sum_turnover=0,NULL,sum_turnover), 3) as blcv_turnover_share,
            round(1.0*count_sales/IF((sum(stock_increase) + max(initial_stock))=0,NULL,(sum(stock_increase) + sumIf(initial_stock, is_first_week=1))), 3) as sales_through_rate,
            round(1.0*uniqExactIf(id_product_money, total_sales > 0)/IF(count_sku=0,NULL,count_sku), 3) as sold_sku_share,
            round(1.0*sum(days_in_stock)/IF(sum(total_days_in_period)=0,NULL,sum(total_days_in_period)), 3) as availability_index,
            max(median_price) AS median_price
        FROM main
        GROUP BY all
        ORDER BY all
    """
    params = {
        'category_a': category_a,
        'category_3': category_3,
        'category_4': category_4
    }
    try:
        print('trying to run the query')
        result = db_manager.execute_query(query, params)

        # Write the result set to a temporary CSV file and return its path.
        # newline='' keeps the csv module from inserting blank lines on Windows.
        with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.csv',
                                         encoding='utf-8', newline='') as temp_file:
            csv_writer = csv.writer(temp_file)

            # Write header
            header = [
                "YEAR", "min_week_start_date", "max_week_start_date",
                "category_a", "category_3", "category_4", "brand_name",
                "price_group", "sum_turnover", "count_sales", "count_sku",
                "avg_price1", "avg_price2", "count_sku_aw_23", "count_sku_ss_24",
                "sum_turnover_aw_23", "sum_turnover_ss_24", "turnover_per_sku",
                "turnover_per_sku_aw_23", "turnover_per_sku_ss_24",
                "top10_turnover_share", "blcv_turnover_share",
                "sales_through_rate", "sold_sku_share", "availability_index",
                "median_price"
            ]
            csv_writer.writerow(header)

            # Write data rows
            csv_writer.writerows(result)

            return temp_file.name
    except Exception as e:
        print(f"Error generating CSV report: {e}")
        return None


def download_csv(category_a, category_3, category_4):
    csv_content = generate_csv_report(category_a, category_3, category_4)
    if csv_content:
        return csv_content
    else:
        raise gr.Error("Error generating CSV report. Please try again.")


def interface():
    with gr.Blocks() as demo:
        category_a_options = get_category_a_options()
        category_a = gr.CheckboxGroup(choices=category_a_options, label="Category A")
        category_3 = gr.CheckboxGroup(choices=[], label="Category 3", interactive=True)
        category_4 = gr.CheckboxGroup(choices=[], label="Category 4", interactive=True)
        download_button = gr.Button("Download CSV Report")
        csv_output = gr.File(label="CSV Report")

        category_a.change(
            fn=update_category_3,
            inputs=[category_a],
            outputs=[category_3]
        )
        category_3.change(
            fn=update_category_4,
            inputs=[category_a, category_3],
            outputs=[category_4]
        )
        download_button.click(
            fn=download_csv,
            inputs=[category_a, category_3, category_4],
            outputs=[csv_output]
        )
    return demo


def cleanup_temp_files():
    # Remove any leftover CSV report files from the system temp directory.
    temp_dir = tempfile.gettempdir()
    for filename in os.listdir(temp_dir):
        if filename.endswith('.csv'):
            file_path = os.path.join(temp_dir, filename)
            try:
                os.remove(file_path)
            except Exception as e:
                print(f"Error deleting temporary file {file_path}: {e}")


if __name__ == "__main__":
    demo = interface()
    demo.launch(debug=True)
    db_manager.close_connection()  # Close the connection when the app exits
    cleanup_temp_files()