import { Injectable } from '@angular/core';
// ─────────────────────────────────────────────────────────────────────────────
// Duck Db
import * as duckdb from '@duckdb/duckdb-wasm';
// ─────────────────────────────────────────────────────────────────────────────
// Interfaces
import { Chunk } from '../../interfaces/chunk/chunk.interface';
import { GeneralHelpers } from '../../helpers/general.helper';
import { RenderService } from '../render/render.service';

import {
  FILE_EXTENSIONS,
  RELOAD_WASM,
  REMOVE_DUCKDB_FILE,
  SYNC_DUCKDB_FILES,
} from '../../constants/general.constants';
import { MessageService } from '../message/message.service';
import { Subscription } from 'rxjs';
import { DuckDBDataProtocol } from '@duckdb/duckdb-wasm';
import { ArrowInsertOptions } from '@duckdb/duckdb-wasm/dist/types/src/bindings';
import { LocalforageService } from '../localforage/localforage.service';
import { tableFromIPC, tableToIPC } from 'apache-arrow';
import { FileModule } from '../../interfaces/file.interface';
import { FileService } from '../file/file.service';
import { Papa } from 'ngx-papaparse';
import { from } from 'arquero';
import { ChunkContext } from '../../interfaces/chunk/chunk-context.interface';
import { FileUnifiedService } from '../file-unified/file-unified.service';
import { ConvertService } from '../convert/convert.service';

@Injectable({
  providedIn: 'root',
})
export class DuckDbService {
  // DuckDB-WASM database instance; assigned in init().
  private database!: duckdb.AsyncDuckDB;
  // Most recently opened connection; reassigned on every openConnection() call.
  private connection!: duckdb.AsyncDuckDBConnection;
  // Web Worker handed to AsyncDuckDB; created in init().
  private worker!: Worker;
  // Logger handed to AsyncDuckDB; created in init().
  private logger!: duckdb.ConsoleLogger;

  // Names of file modules already processed by addModule(); used to skip duplicates.
  public fileList: string[] = [];

  // ─────────────────────────────────────────────────────────────────────
  // Subscription to MessageService messages, created in the constructor.
  private messageSubscription!: Subscription;

  /**
   * Injects collaborators and subscribes to the application message bus.
   *
   * Handled messages:
   * - `REMOVE_DUCKDB_FILE`: drops the file named in `message.data`.
   * - `RELOAD_WASM`: drops files, flushes files and resets the database, in
   *   that order, then clears `fileList`.
   * - `SYNC_DUCKDB_FILES`: runs `processGlobalModules()`.
   *
   * The reload and sync handlers are deferred with `setTimeout(…, 0)`. Every
   * asynchronous handler logs its own failures so that no rejection is left
   * unhandled.
   */
  constructor(
    private renderService: RenderService,
    private messageService: MessageService,
    private localforageService: LocalforageService,
    private fileService: FileService,
    private papa: Papa,
    private fileUnifiedService: FileUnifiedService,
    private convertService: ConvertService
  ) {
    this.messageSubscription = this.messageService
      .getMessage()
      .subscribe((message: any) => {
        if (!message) {
          return;
        }
        if (message.text === REMOVE_DUCKDB_FILE) {
          this.removeFileFromDatabase(message.data).catch((error: unknown) => {
            console.error('Error removing file from database:', error);
          });
        }
        if (message.text === RELOAD_WASM) {
          setTimeout(async () => {
            const steps: Array<() => Promise<void>> = [
              () => this.dropFiles(),
              () => this.flushFiles(),
              () => this.reset(),
            ];
            // Run the steps one after another; a failing step is logged and
            // the remaining steps are still attempted.
            for (const step of steps) {
              try {
                await step();
              } catch (error) {
                console.error('Error reloading database:', error);
              }
            }
            // Nothing is registered any more, so forget the cached names;
            // otherwise the next sync would skip every file. Cleared in place
            // because fileList is public and may be referenced elsewhere.
            this.fileList.length = 0;
          }, 0);
        }
        if (message.text === SYNC_DUCKDB_FILES) {
          setTimeout(() => {
            this.processGlobalModules().catch((error: unknown) => {
              console.error('Error syncing files with database:', error);
            });
          }, 0);
        }
      });
  }

  /**
   * Boots DuckDB-WASM: selects a bundle from `duckdb.getJsDelivrBundles()`,
   * starts its worker through a temporary blob URL, instantiates the
   * database and then runs `processGlobalModules()`.
   *
   * @throws Error when the selected bundle has no main worker script, or
   *   when instantiation fails.
   */
  public async init() {
    const bundle = await duckdb.selectBundle(duckdb.getJsDelivrBundles());
    if (!bundle.mainWorker) {
      throw new Error('DuckDB bundle does not provide a main worker script.');
    }

    const workerUrl = URL.createObjectURL(
      new Blob([`importScripts("${bundle.mainWorker}");`], {
        type: 'text/javascript',
      })
    );

    try {
      this.worker = new Worker(workerUrl);
      this.logger = new duckdb.ConsoleLogger();
      this.database = new duckdb.AsyncDuckDB(this.logger, this.worker);
      await this.database.instantiate(bundle.mainModule, bundle.pthreadWorker);
    } finally {
      // Release the blob URL even when instantiation fails.
      URL.revokeObjectURL(workerUrl);
    }

    try {
      await this.processGlobalModules();
    } catch (error) {
      // A failed initial sync must not make a working database look broken.
      console.error('Error processing file modules during init:', error);
    }
  }

  /**
   * Runs a SQL query and returns its result.
   *
   * @param args `[query]` or `[query, 'arrow']`; `query` must be a string.
   * @param context Chunk context used for busy state and user messages.
   * @returns The result rendered with `toString()`. When `args[1]` is
   *   `'arrow'` and that string is valid JSON, the rows are rebuilt as an
   *   Arrow table and returned as IPC bytes instead.
   * @throws Error on invalid arguments; rethrows any query error after
   *   reporting it through `context`.
   */
  public async dbQuery(args: any[], context: ChunkContext) {
    if (
      !Array.isArray(args) ||
      args.length < 1 ||
      typeof args[0] !== 'string'
    ) {
      const message = 'Invalid query format: The query must be a string.';
      context.addMessage(message, 'danger');
      throw new Error(message);
    }

    const query: string = args[0];
    const wantsArrow = args.length >= 2 && args[1] === 'arrow';

    // A connection private to this call: concurrent queries must not close
    // each other's connection through the shared `this.connection` field.
    let connection: duckdb.AsyncDuckDBConnection | undefined;
    context.setBusy(true, 'Executing query');

    try {
      connection = await this.database.connect();

      const result = await connection.query(query);
      const resultStr = result.toString();

      if (wantsArrow && resultStr && GeneralHelpers.isJSON(resultStr)) {
        // For arrow format, rebuild the rows as an Arrow table and serialise it.
        const arrowTable = from(JSON.parse(resultStr)).toArrow();
        return tableToIPC(arrowTable);
      }

      // Default to string result
      return resultStr;
    } catch (error: any) {
      console.error(`Error in dbQuery: ${error}`);
      context.addMessage(
        `Error executing query: ${error?.message || error}`,
        'danger'
      );
      throw error; // Rethrow to allow try/catch blocks to work
    } finally {
      context.setBusy(false);
      if (connection) {
        try {
          await connection.close();
        } catch (closeError) {
          console.error('Error closing connection:', closeError);
        }
      }
    }
  }

  /**
   * Executes the SQL held in a chunk and renders the result as a table.
   *
   * Does nothing when the chunk content is not a non-empty string. The
   * connection is always closed, including when the query or the rendering
   * throws.
   *
   * @param chunkData Chunk whose `content` is the SQL text.
   * @param context Chunk context forwarded to the render service.
   */
  public async doQuery(chunkData: Chunk, context: ChunkContext) {
    const { content } = chunkData;

    if (typeof content !== 'string' || content.length === 0) {
      return;
    }

    // Connection private to this call so concurrent callers cannot close it.
    const connection = await this.database.connect();
    try {
      const data = await connection.query(content);
      const dataString = data.toString();

      if (GeneralHelpers.isJSON(dataString)) {
        this.renderService.renderTable([dataString], context);
      } else {
        // Fall back to row objects when the string form is not JSON.
        this.renderService.renderTable([from(data).objects()], context);
      }
    } finally {
      try {
        await connection.close();
      } catch (closeError) {
        console.error('Error closing connection:', closeError);
      }
    }
  }

  // ----------------------------------------------
  // Control connection
  // ----------------------------------------------
  /**
   * Opens a new connection on the database and stores it in
   * `this.connection`, replacing whatever was stored there before.
   */
  private async openConnection() {
    const freshConnection = await this.database.connect();
    this.connection = freshConnection;
  }

  /**
   * Closes the connection stored in `this.connection`.
   *
   * Does nothing when no connection has been opened yet. Callers invoke this
   * from `finally` blocks, where a TypeError on an undefined connection would
   * otherwise replace the error that actually occurred.
   */
  private async closeConnection() {
    await this.connection?.close();
  }

  // ----------------------------------------------
  // End of connection control
  // ----------------------------------------------

  // ----------------------------------------------
  // Remove file from DB
  // ----------------------------------------------

  /**
   * Removes `name` from `fileList` (when present) and drops the file with
   * that name from the database.
   *
   * @param name File name as stored in `fileList`.
   */
  private async removeFileFromDatabase(name: string) {
    const index = this.fileList.indexOf(name);
    if (index !== -1) {
      this.fileList.splice(index, 1);
    }
    await this.database.dropFile(name);
  }

  // ----------------------------------------------
  // Control panel methods (PUBLIC)
  // ----------------------------------------------

  /** Delegates to `AsyncDuckDB.dropFiles()` and waits for it to finish. */
  public async dropFiles(): Promise<void> {
    const db = this.database;
    await db.dropFiles();
  }

  /** Delegates to `AsyncDuckDB.flushFiles()` and waits for it to finish. */
  public async flushFiles(): Promise<void> {
    const db = this.database;
    await db.flushFiles();
  }

  /** Delegates to `AsyncDuckDB.reset()` and waits for it to finish. */
  public async reset(): Promise<void> {
    const db = this.database;
    await db.reset();
  }

  /**
   * Registers a browser `File` with the database under the file's own name,
   * using the `BROWSER_FILEREADER` protocol.
   *
   * @param data File to register.
   */
  public async uploadFile(data: File) {
    const registeredName = data.name;
    const protocol = DuckDBDataProtocol.BROWSER_FILEREADER;
    // Fourth argument of registerFileHandle; kept as `true`.
    const directIO = true;
    await this.database.registerFileHandle(
      registeredName,
      data,
      protocol,
      directIO
    );
  }

  // ----------------------------------------------
  // End of control panel methods
  // ----------------------------------------------

  // ----------------------------------------------
  // Methods from QuickJS or Pyodide (PUBLIC)
  // ----------------------------------------------

  /**
   * Exports a whole table as a JSON file through `FileService.saveFile`.
   *
   * @param args `[tableName, fileName]`, both strings. `tableName` is placed
   *   into the SQL text as-is.
   * @param context Chunk context used for user messages.
   *
   * Failures are reported through `context` and logged; they are not
   * rethrown. When `fileName` does not have the JSON extension, `.json` is
   * appended.
   */
  public async getFileFromDatabase(args: any[], context: ChunkContext) {
    if (
      !Array.isArray(args) ||
      args.length < 2 ||
      typeof args[0] !== 'string' ||
      typeof args[1] !== 'string'
    ) {
      context.addMessage(
        'Invalid arguments. Expected [tableName, fileName].',
        'danger'
      );
      return;
    }

    const [tableName, fileName] = args;

    // Connection private to this call; closed in `finally` only if it opened.
    let connection: duckdb.AsyncDuckDBConnection | undefined;

    try {
      connection = await this.database.connect();

      // Query the table
      const result = await connection.query(`SELECT * FROM ${tableName}`);

      // Convert the result to a JSON string. JSON.stringify throws on bigint
      // values, so they are emitted as numbers when exactly representable
      // and as decimal strings otherwise.
      const jsonString = JSON.stringify(result.toArray(), (_key, value) => {
        if (typeof value !== 'bigint') {
          return value;
        }
        const asNumber = Number(value);
        return Number.isSafeInteger(asNumber) ? asNumber : value.toString();
      });

      // Determine file extension based on the fileName
      const fileExtension = GeneralHelpers.fileExtensionFromString(fileName);
      let finalFileName = fileName;

      // If the file extension is not specified or is not JSON, default to JSON
      if (fileExtension.toLowerCase() !== FILE_EXTENSIONS.json) {
        finalFileName = `${fileName}.${FILE_EXTENSIONS.json}`;
        context.addMessage(`File extension defaulted to .json`, 'info');
      }

      // Save the file
      await this.fileService.saveFile([finalFileName, jsonString], context);

      context.addMessage(
        `Table '${tableName}' successfully exported to '${finalFileName}'.`,
        'success'
      );
    } catch (error: any) {
      console.error(`Error in getFileFromDatabase: ${error}`);
      context.addMessage(
        `Error exporting table: ${error?.message || error}`,
        'danger'
      );
    } finally {
      if (connection) {
        try {
          await connection.close();
        } catch (closeError) {
          console.error('Error closing connection:', closeError);
        }
      }
    }
  }

  /**
   * Loads a stored file into a database table via `loadArrow`.
   *
   * @param args `[fileName, tableName]`, both strings.
   * @param context Chunk context used for busy state and user messages.
   * @throws Error on invalid arguments, when the file is missing or empty,
   *   or when loading fails. Each failure is reported to `context` exactly
   *   once before being rethrown.
   */
  public async addFileToDatabase(args: any[], context: ChunkContext) {
    context.setBusy(true, 'Adding file to database');

    if (
      !Array.isArray(args) ||
      args.length < 2 ||
      typeof args[0] !== 'string' ||
      typeof args[1] !== 'string'
    ) {
      const message = 'Invalid arguments. Expected [filename, tableName].';
      context.addMessage(message, 'danger');
      context.setBusy(false);
      throw new Error(message);
    }

    const [fileName, tableName] = args;
    // Tracks whether this call opened the connection, so `finally` never
    // closes a connection left over from an earlier call.
    let connectionOpened = false;

    try {
      await this.openConnection();
      connectionOpened = true;

      context.setBusy(true, 'Fetching file');
      const fileContent = await this.fileUnifiedService.getFile(
        [fileName],
        context
      );
      if (!fileContent) {
        // Reported to the user once, by the catch block below.
        throw new Error(`File '${fileName}' not found or empty.`);
      }

      context.setBusy(true, 'Loading file into database');
      await this.loadArrow(fileContent, tableName);

      context.addMessage(
        `File '${fileName}' successfully loaded into table '${tableName}'.`,
        'success'
      );
    } catch (error: any) {
      console.error(`Error in addFileToDatabase: ${error}`);
      context.addMessage(
        `Error loading file: ${error?.message || error}`,
        'danger'
      );
      throw error; // Rethrow to allow try/catch blocks to work
    } finally {
      // Ensure connection is closed even on error
      if (connectionOpened) {
        try {
          await this.closeConnection();
        } catch (closeError) {
          console.error('Error closing connection:', closeError);
        }
      }
      context.setBusy(false);
    }
  }

  /**
   * Inserts Arrow IPC data into a new table in schema `main`, using
   * `this.connection` (the caller must have opened it).
   *
   * @param content Arrow IPC bytes.
   * @param tableName Name of the table to create.
   * @throws Rethrows any decoding or insertion error after logging it.
   */
  private async loadArrow(content: Uint8Array, tableName: string) {
    try {
      console.log('Starting loadArrow process');

      // Decode and re-encode the table before inserting.
      // NOTE(review): presumably this normalises Arrow *file* input to the
      // IPC *stream* layout expected by insertArrowFromIPCStream — confirm.
      console.log('Inserting Arrow data into table');
      const ipc = tableToIPC(tableFromIPC(content));

      // Await the insert: callers close the connection and report success as
      // soon as this method resolves.
      await this.connection.insertArrowFromIPCStream(ipc, {
        name: tableName,
        schema: 'main',
        create: true,
      });
    } catch (error) {
      console.error(`Error in loadArrow: ${error}`);
      throw error;
    }
  }

  // private async checkTableExists(tableName: string): Promise<boolean> {
  //   try {
  //     const result = await this.connection.query(
  //       `SELECT name FROM sqlite_master WHERE type='table' AND name='${tableName}'`
  //     );
  //     return result.toArray().length > 0;
  //   } catch (error) {
  //     console.error(`Error checking if table exists: ${error}`);
  //     return false;
  //   }
  // }

  /**
   * Counts the rows of a table using `this.connection`.
   *
   * @param tableName Table name, placed into the SQL text as-is.
   * @returns The row count as a number, or `0` when the query fails or
   *   returns no row.
   */
  private async getTableRowCount(tableName: string): Promise<number> {
    try {
      const result = await this.connection.query(
        `SELECT COUNT(*) as count FROM ${tableName}`
      );
      const rows = result.toArray();
      if (rows.length === 0) {
        return 0;
      }
      // The count may arrive as a bigint; convert so the declared `number`
      // return type holds at runtime.
      return Number(rows[0].count);
    } catch (error) {
      console.error(`Error getting table row count: ${error}`);
      return 0;
    }
  }

  /**
   * Heuristic CSV check: the text must contain at least one comma and at
   * least one newline character.
   */
  private isCsvContent(content: string): boolean {
    const requiredMarkers = [',', '\n'];
    return requiredMarkers.every((marker) => content.includes(marker));
  }

  /**
   * Simplified Arrow check: the text is longer than 8 characters and begins
   * with the `ARROW1` magic. Returns `false` if inspecting the value throws.
   */
  private isArrowContent(content: string): boolean {
    const magic = 'ARROW1';
    try {
      if (content.length <= 8) {
        return false;
      }
      // The magic is plain ASCII, so comparing the leading characters gives
      // the same answer as comparing the leading encoded bytes.
      return content.startsWith(magic);
    } catch {
      return false;
    }
  }

  /**
   * Simplified Parquet check: the text is longer than 8 characters and
   * begins with the `PAR1` magic. Returns `false` if inspecting the value
   * throws.
   */
  private isParquetContent(content: string): boolean {
    const magic = 'PAR1';
    try {
      if (content.length <= 8) {
        return false;
      }
      // The magic is plain ASCII, so comparing the leading characters gives
      // the same answer as comparing the leading encoded bytes.
      return content.startsWith(magic);
    } catch {
      return false;
    }
  }

  /**
   * Parses CSV text and loads it into a new table using `this.connection`:
   * one `CREATE TABLE` followed by one multi-row `INSERT`.
   *
   * @param content CSV text whose first line is the header.
   * @param tableName Name of the table to create.
   * @throws Error when parsing reports an error or no rows are found;
   *   rethrows query errors after logging them.
   */
  private async loadCsv(content: string, tableName: string) {
    try {
      const parsedCsv = this.papa.parse(content, {
        header: true,
        dynamicTyping: true,
        // Without this, the usual trailing newline becomes a one-field row
        // and is reported as a field-count mismatch, failing the import.
        skipEmptyLines: true,
      });

      if (parsedCsv.errors.length > 0) {
        throw new Error('CSV parsing error: ' + parsedCsv.errors[0].message);
      }

      const rows = parsedCsv.data;
      await this.connection.query(this.generateCreateTableSQL(rows, tableName));
      await this.connection.query(this.generateInsertDataSQL(rows, tableName));
    } catch (error) {
      console.error(`Error in loadCsv: ${error}`);
      throw error;
    }
  }

  /**
   * Loads in-memory data into a table, replacing any existing table of the
   * same name.
   *
   * @param args `[data, tableName]`. `data` is either a string or an array of
   *   byte values; `tableName` is a string.
   * @param context Chunk context used for busy state and user messages.
   * @throws Error on invalid arguments, an unsupported format, or any
   *   conversion/loading failure. Each failure is reported to `context` once
   *   before being rethrown.
   *
   * The format is detected before the database is touched, so unsupported
   * input leaves an existing table intact. The Arrow and Parquet magic
   * prefixes are checked before the CSV heuristic, because binary payloads
   * can contain comma and newline bytes.
   */
  public async addDataToDatabase(args: any[], context: ChunkContext) {
    context.setBusy(true, 'Adding data to database');

    if (
      !Array.isArray(args) ||
      args.length < 2 ||
      typeof args[1] !== 'string' ||
      (typeof args[0] !== 'string' && !Array.isArray(args[0]))
    ) {
      const message = 'Invalid arguments. Expected [byteString, tableName].';
      context.addMessage(message, 'danger');
      context.setBusy(false);
      throw new Error(message);
    }

    const tableName: string = args[1];
    let byteString: string = typeof args[0] === 'string' ? args[0] : '';
    // Exact bytes, kept only when the caller passed an array of values that
    // all lie in 0..255.
    let rawBytes: Uint8Array | undefined;

    if (Array.isArray(args[0])) {
      try {
        const codes: number[] = args[0].map((x: any) => parseInt(x, 10));

        // Build the string in slices: a single fromCharCode.apply over a
        // large array exceeds the engine's argument limit.
        const CHUNK_SIZE = 0x8000;
        const parts: string[] = [];
        for (let start = 0; start < codes.length; start += CHUNK_SIZE) {
          parts.push(
            String.fromCharCode.apply(
              null,
              codes.slice(start, start + CHUNK_SIZE)
            )
          );
        }
        byteString = parts.join('');

        const allBytes = codes.every(
          (code) => Number.isInteger(code) && code >= 0 && code <= 0xff
        );
        if (allBytes) {
          rawBytes = Uint8Array.from(codes);
        }
      } catch {
        const message = 'Failed to parse byte array. Invalid format.';
        context.addMessage(message, 'danger');
        context.setBusy(false);
        throw new Error(message);
      }
    }

    // Tracks whether this call opened the connection, so `finally` never
    // closes a connection left over from an earlier call.
    let connectionOpened = false;

    try {
      // Detect file type based on the content, magic prefixes first.
      let format: 'arrow' | 'parquet' | 'csv';
      if (this.isArrowContent(byteString)) {
        format = 'arrow';
      } else if (this.isParquetContent(byteString)) {
        format = 'parquet';
      } else if (this.isCsvContent(byteString)) {
        format = 'csv';
      } else {
        throw new Error('Unsupported file format.');
      }

      await this.openConnection();
      connectionOpened = true;

      // First create or replace table
      context.setBusy(true, 'Creating or replacing table');
      await this.dropTableIfExists(tableName);

      if (format === 'csv') {
        context.addMessage('Detected CSV format', 'info');
        await this.loadCsv(byteString, tableName);
      } else if (format === 'arrow') {
        context.addMessage('Detected Arrow format', 'info');
        // Prefer the exact bytes; re-encoding the string as UTF-8 would
        // expand every byte above 0x7f into two bytes.
        const binaryData = rawBytes ?? this.stringToBinary(byteString);
        await this.loadArrow(binaryData, tableName);
      } else {
        context.addMessage('Detected Parquet format', 'info');
        // Convert parquet to arrow first
        let convertResult: any;
        try {
          convertResult = await this.convertService.parquetToArrow(
            [byteString],
            context
          );
        } catch (conversionError: any) {
          throw new Error(
            `Parquet conversion error: ${
              conversionError?.message || conversionError
            }`
          );
        }
        if (!convertResult) {
          throw new Error(
            'Parquet conversion error: Failed to convert parquet to arrow format'
          );
        }
        await this.loadArrow(convertResult, tableName);
      }

      context.addMessage(
        `Data successfully loaded into table '${tableName}'.`,
        'success'
      );
    } catch (error: any) {
      console.error(`Error in addDataToDatabase: ${error}`);
      context.addMessage(
        `Error loading data: ${error?.message || error}`,
        'danger'
      );
      throw error; // Rethrow to allow try/catch blocks to work
    } finally {
      // Ensure connection is closed even on error
      if (connectionOpened) {
        try {
          await this.closeConnection();
        } catch (closeError) {
          console.error('Error closing connection:', closeError);
        }
      }
      context.setBusy(false);
    }
  }

  /**
   * Converts a string to bytes.
   *
   * When `GeneralHelpers.canBeParsedToNumberArray` accepts the string, it is
   * split on commas and each piece becomes one byte value. Otherwise the
   * string is encoded with `TextEncoder`.
   */
  private stringToBinary(str: string): Uint8Array {
    const isNumberList = GeneralHelpers.canBeParsedToNumberArray(str);
    if (!isNumberList) {
      // Treat as text and encode
      return new TextEncoder().encode(str);
    }

    // Comma-separated list of numbers
    return Uint8Array.from(str.split(','), (piece) => Number(piece));
  }

  /**
   * Drops `tableName` when `sqlite_master` lists a table with that name,
   * using `this.connection`.
   *
   * The lookup escapes single quotes in the name and compares
   * case-insensitively, since DuckDB resolves identifiers without regard to
   * case. The `DROP` statement receives `tableName` as-is.
   *
   * @param tableName Name of the table to drop.
   */
  private async dropTableIfExists(tableName: string) {
    // Double any single quote so the name cannot end the string literal.
    const nameLiteral = tableName.replace(/'/g, "''");
    const checkTableQuery = `
      SELECT name 
      FROM sqlite_master 
      WHERE type='table' AND lower(name)=lower('${nameLiteral}');
    `;

    const result = await this.connection.query(checkTableQuery);

    if (result.toArray().length > 0) {
      await this.connection.query(`DROP TABLE IF EXISTS ${tableName};`);
    }
  }

  /**
   * Decodes CSV bytes and loads them into a new table using
   * `this.connection`: one `CREATE TABLE` followed by one multi-row `INSERT`.
   *
   * @param data CSV bytes, decoded with `TextDecoder`'s default encoding.
   * @param tableName Name of the table to create.
   * @throws Error when parsing reports an error or no rows are found.
   */
  private async processCSV(data: Uint8Array, tableName: string) {
    const csvString = new TextDecoder().decode(data);
    const parsedCsv = this.papa.parse(csvString, {
      header: true,
      dynamicTyping: true,
      // Without this, the usual trailing newline becomes a one-field row and
      // is reported as a field-count mismatch, failing the import.
      skipEmptyLines: true,
    });

    if (parsedCsv.errors.length > 0) {
      throw new Error('CSV parsing error: ' + parsedCsv.errors[0].message);
    }

    const rows = parsedCsv.data;
    await this.connection.query(this.generateCreateTableSQL(rows, tableName));
    await this.connection.query(this.generateInsertDataSQL(rows, tableName));
  }

  /**
   * Inserts Arrow IPC stream bytes into a new table in schema `main`, using
   * `this.connection`.
   *
   * @param data Arrow IPC stream bytes, passed through unchanged.
   * @param tableName Name of the table to create.
   */
  private async processArrow(data: Uint8Array, tableName: string) {
    const insertOptions = { name: tableName, schema: 'main', create: true };
    await this.connection.insertArrowFromIPCStream(data, insertOptions);
  }

  /**
   * Creates a table from Parquet bytes using `this.connection`: registers
   * the bytes as a temporary file, runs `CREATE TABLE … AS SELECT` over
   * `parquet_scan`, then drops the temporary file.
   *
   * @param data Parquet file bytes.
   * @param tableName Name of the table to create, placed into the SQL text
   *   as-is.
   */
  private async processParquet(data: Uint8Array, tableName: string) {
    const tempFileName = `temp_${tableName}.parquet`;
    // Double any single quote so the file name cannot end the string literal.
    const fileLiteral = tempFileName.replace(/'/g, "''");

    await this.database.registerFileBuffer(tempFileName, data);
    try {
      await this.connection.query(
        `CREATE TABLE ${tableName} AS SELECT * FROM parquet_scan('${fileLiteral}')`
      );
    } finally {
      // Drop the temporary file even when the query fails.
      await this.database.dropFile(tempFileName);
    }
  }

  /**
   * Builds a `CREATE TABLE` statement whose columns are the keys of the
   * first row.
   *
   * Column types are inferred from every non-null value in the column, not
   * only the first row:
   * - all booleans → `BOOLEAN`
   * - all numbers → `INTEGER` (32-bit range), `BIGINT` (larger integers) or
   *   `DOUBLE` (any non-integer)
   * - anything else, mixed kinds, or no non-null value → `VARCHAR`
   *
   * @param data Row objects; must not be empty.
   * @param tableName Table name, placed into the SQL text as-is.
   * @throws Error when `data` is empty.
   */
  private generateCreateTableSQL(data: any[], tableName: string): string {
    if (data.length === 0) {
      throw new Error('No data to create table');
    }

    // Widening order for numeric columns.
    const numericRank: Record<string, number> = {
      INTEGER: 0,
      BIGINT: 1,
      DOUBLE: 2,
    };

    const typeOfValue = (value: unknown): string => {
      if (typeof value === 'boolean') {
        return 'BOOLEAN';
      }
      if (typeof value !== 'number') {
        return 'VARCHAR';
      }
      if (!Number.isInteger(value)) {
        return 'DOUBLE';
      }
      const fitsInt32 = value >= -2147483648 && value <= 2147483647;
      return fitsInt32 ? 'INTEGER' : 'BIGINT';
    };

    const widen = (current: string | undefined, next: string): string => {
      if (current === undefined || current === next) {
        return next;
      }
      if (current in numericRank && next in numericRank) {
        return numericRank[current] >= numericRank[next] ? current : next;
      }
      // Mixed kinds can only be stored as text.
      return 'VARCHAR';
    };

    const columns = Object.keys(data[0]).map((key) => {
      let type: string | undefined;
      for (const row of data) {
        const value = row?.[key];
        if (value === null || value === undefined) {
          continue;
        }
        type = widen(type, typeOfValue(value));
        if (type === 'VARCHAR') {
          break; // cannot widen any further
        }
      }
      // Double any embedded quote so the header cannot end the identifier.
      return `"${key.replace(/"/g, '""')}" ${type ?? 'VARCHAR'}`;
    });

    return `CREATE TABLE ${tableName} (${columns.join(', ')})`;
  }

  /**
   * Builds a single multi-row `INSERT` statement for `data`, using the keys
   * of the first row as the column list.
   *
   * Value rendering:
   * - `null`/`undefined` (including missing keys) → `NULL`
   * - strings → single-quoted, embedded quotes doubled
   * - numbers, booleans, bigints → their plain text form
   * - dates → quoted ISO string; other objects → quoted JSON text
   *
   * @param data Row objects; must not be empty.
   * @param tableName Table name, placed into the SQL text as-is.
   * @throws Error when `data` is empty.
   */
  private generateInsertDataSQL(data: any[], tableName: string): string {
    if (!Array.isArray(data) || data.length === 0) {
      throw new Error('No data to insert');
    }

    const columns = Object.keys(data[0]);
    const quoteText = (text: string): string =>
      `'${text.replace(/'/g, "''")}'`;

    const toLiteral = (value: unknown): string => {
      if (value === null || value === undefined) {
        return 'NULL';
      }
      switch (typeof value) {
        case 'string':
          return quoteText(value);
        case 'number':
        case 'boolean':
        case 'bigint':
          return String(value);
        default:
          // Plain string conversion would yield "[object Object]".
          if (value instanceof Date) {
            return quoteText(value.toISOString());
          }
          return quoteText(JSON.stringify(value) ?? String(value));
      }
    };

    const tuples = data.map(
      (row) => `(${columns.map((col) => toLiteral(row?.[col])).join(', ')})`
    );
    const columnList = columns
      .map((c) => `"${c.replace(/"/g, '""')}"`)
      .join(', ');

    return `INSERT INTO ${tableName} (${columnList}) VALUES ${tuples.join(
      ', '
    )}`;
  }

  /**
   * Best-effort insert of an Arrow IPC stream on a connection private to
   * this call. Schema defaults to `main` unless `options` overrides it.
   *
   * Errors are logged and not rethrown.
   *
   * @param buffer Arrow IPC stream bytes.
   * @param options Insert options merged over `{ schema: 'main' }`.
   */
  private async insertArrowFromIPCStream(
    buffer: Uint8Array,
    options?: ArrowInsertOptions
  ) {
    // Local connection: a failed connect must not lead to closing some other
    // caller's connection through the shared `this.connection` field.
    let connection: duckdb.AsyncDuckDBConnection | undefined;
    try {
      connection = await this.database.connect();
      await connection.insertArrowFromIPCStream(buffer, {
        schema: 'main',
        ...options,
      } as ArrowInsertOptions);
    } catch (error) {
      console.error('Error inserting Arrow IPC stream:', error);
    } finally {
      if (connection) {
        try {
          await connection.close();
        } catch (closeError) {
          console.error('Error closing connection:', closeError);
        }
      }
    }
  }

  /**
   * Fetches the stored CSV, JSON, Parquet and Arrow file modules and passes
   * each one to `addModule`.
   *
   * Modules are handled one at a time so two runs cannot register the same
   * name concurrently. A failure on one module is logged and the remaining
   * modules are still processed.
   */
  private async processGlobalModules() {
    const list: FileModule[] = await this.fileService.getFileModuleList([
      FILE_EXTENSIONS.csv,
      FILE_EXTENSIONS.json,
      FILE_EXTENSIONS.parquet,
      FILE_EXTENSIONS.arrow,
    ]);
    if (!Array.isArray(list)) {
      return;
    }
    for (const element of list) {
      try {
        await this.addModule(element);
      } catch (error) {
        console.error('Error adding file module to database:', error);
      }
    }
  }

  /**
   * Registers one stored file module with the database, unless its name is
   * already in `fileList` or `checkFileByKey` finds it.
   *
   * - CSV: stored text is registered under the module name.
   * - JSON: stored data is converted with `papa.unparse` and registered
   *   under the module name with its extension replaced by `.csv`.
   * - Parquet: stored data is registered as a byte buffer.
   * - Arrow: stored data is inserted as a table named after the module.
   *
   * The name is added to `fileList` only after something was registered, so
   * a module whose stored data is missing is retried on the next sync.
   * Errors are logged and not rethrown.
   *
   * @param element Module descriptor providing `name` and `extension`.
   */
  private async addModule(element: FileModule) {
    const { name, extension } = element;
    if (this.fileList.includes(name)) {
      return;
    }
    try {
      const alreadyRegistered = await this.checkFileByKey(name);
      if (alreadyRegistered) {
        return;
      }

      let registered = false;
      switch (extension) {
        case FILE_EXTENSIONS.csv: {
          const csvData: unknown = await this.localforageService.getItem(name);
          if (csvData) {
            await this.database.registerFileText(name, csvData as string);
            registered = true;
          }
          break;
        }
        case FILE_EXTENSIONS.json: {
          const jsonData: any = await this.localforageService.getItem(name);
          if (jsonData) {
            const parsed = this.papa.unparse(jsonData);
            const dotIndex = name.lastIndexOf('.');
            const baseName = dotIndex > -1 ? name.slice(0, dotIndex) : name;
            await this.database.registerFileText(`${baseName}.csv`, parsed);
            registered = true;
          }
          break;
        }
        case FILE_EXTENSIONS.parquet: {
          const parquetData: any = await this.localforageService.getItem(name);
          if (parquetData) {
            await this.database.registerFileBuffer(
              name,
              new Uint8Array(parquetData)
            );
            registered = true;
          }
          break;
        }
        case FILE_EXTENSIONS.arrow: {
          const arrowData: any = await this.localforageService.getItem(name);
          if (arrowData) {
            const buffer = this.processArrowData(arrowData);
            await this.insertArrowFromIPCStream(buffer, {
              name: name,
              create: true,
            });
            registered = true;
          }
          break;
        }
        default:
          break;
      }

      // Re-check: another call may have added the name while this one was
      // awaiting.
      if (registered && !this.fileList.includes(name)) {
        this.fileList.push(name);
      }
    } catch (e) {
      console.error(e);
    }
  }

  /**
   * Decodes Arrow IPC data into a table and serialises that table back to
   * IPC bytes.
   *
   * @param arrowData Arrow IPC data.
   * @returns The re-serialised IPC bytes.
   */
  private processArrowData(arrowData: ArrayBuffer): Uint8Array {
    return tableToIPC(tableFromIPC(arrowData));
  }

  /**
   * Reports whether `globFiles(key)` returns at least one file.
   *
   * @param key Pattern or name passed to `globFiles`.
   */
  private async checkFileByKey(key: string) {
    const matches: duckdb.WebFile[] = await this.database.globFiles(key);
    return matches.length > 0;
  }

  /**
   * Drops every base table listed in `information_schema.tables`.
   *
   * The list comes from a catalog query because
   * `AsyncDuckDBConnection.getTableNames` takes SQL text and returns the
   * tables that text references; it does not enumerate tables. Schema and
   * table names are quoted so unusual names still drop correctly. The
   * connection is private to this call and always closed.
   */
  private async dropAllTables() {
    const connection = await this.database.connect();
    try {
      const tables = await connection.query(
        `SELECT table_schema, table_name FROM information_schema.tables WHERE table_type = 'BASE TABLE'`
      );
      const quoteIdentifier = (identifier: unknown): string =>
        `"${String(identifier).replace(/"/g, '""')}"`;

      for (const row of tables.toArray()) {
        const qualifiedName = `${quoteIdentifier(
          row.table_schema
        )}.${quoteIdentifier(row.table_name)}`;
        await connection.query(`DROP TABLE IF EXISTS ${qualifiedName}`);
      }
    } finally {
      await connection.close();
    }
  }

  /**
   * Reports whether `SHOW TABLES` lists a table named `tableName`.
   *
   * The whole name is compared, case-insensitively; a substring search over
   * the printed result would also match `users` when asked about `user`.
   * The connection is private to this call and always closed.
   *
   * @param tableName Unqualified table name to look for.
   */
  private async checkTableExists(tableName: string) {
    const connection = await this.database.connect();
    try {
      const tables = await connection.query('SHOW TABLES;');
      const wanted = tableName.toLowerCase();
      return tables
        .toArray()
        .some((row: any) => String(row?.name).toLowerCase() === wanted);
    } finally {
      await connection.close();
    }
  }
}
