import {
  BatchDataSourceType,
  DataSourceFCO,
  DataSourceFCOColumn,
  DataSourceFCOFields,
  DataSourceFCOType,
  DataSourceFileFormat,
  FCOFields,
  FCOKeyValuePair,
  FCOType,
  StreamingDataSourceType,
} from '../../core/types/fcoTypes';
import { Fco } from '../../types/tecton_proto/data/fco';
import { VirtualDataSource } from '../../types/tecton_proto/data/virtual_data_source';
import { dataTypeToString } from '../../utils/feature-view/feature-view-util';
import { getSharedFCOProperties } from './fcoUtils';
import { UserDefinedFunction } from '../../types/tecton_proto/args/user_defined_function';
import { SparkBatchDataSourceFunction } from '../../types/tecton_proto/data/batch_data_source';
import { SparkField } from '../../types/tecton_proto/common/spark_schema';
import { Column } from '../../types/tecton_proto/common/schema';
import { DataSourceType } from '../../types/tecton_proto/common/data_source_type';
import { Duration } from '../../types/google/protobuf/duration';

export const transformDataSourceProtoToDataSourceFCO: (fco: Fco) => DataSourceFCO = (fco) => {
  const protoDataSource: VirtualDataSource = fco.virtual_data_source!;
  const metadata = protoDataSource.fco_metadata!;

  const batchProperties = getBatchProperties(protoDataSource);
  const streamProperties = getStreamProperties(protoDataSource);

  const asDataSourceFco: DataSourceFCO = {
    ...getSharedFCOProperties(metadata, protoDataSource.virtual_data_source_id),
    fcoType: FCOType.DATA_SOURCE,
    dataSourceType: getDataSourceType(protoDataSource),
    streamProperties: streamProperties,
    columns: getColumns(protoDataSource),
    tectonSchemeColumns: getTectonSchemeColumns(protoDataSource),
    batchProperties: batchProperties,
    batchFullyQualifiedName: getFullyQualifiedName(batchProperties),
    rawBatchTranslator: getRawBatchTranslator(protoDataSource),
    rawStreamTranslator: getRawStreamTranslator(protoDataSource),
    batchDataSourceFunction: getBatchDataSourceFunction(protoDataSource),
    streamFullyQualifiedName: getFullyQualifiedName(streamProperties), // Based on stream properties and defined post instantiation,
    batchDataSource: protoDataSource.batch_data_source,
    fileFormat: getFileFormat(protoDataSource),
    streamDataSource: protoDataSource.stream_data_source,
    [DataSourceFCOFields.BATCH_SOURCE]: getUnderlyingBatchDataSourceType(protoDataSource),
    underlyingStreamDataSourceType: getUnderlyingStreamDataSourceType(protoDataSource),
    [DataSourceFCOFields.DEPENDENT_TRANSFORMATIONS]: [],
    [DataSourceFCOFields.DEPENDENT_FEATURE_VIEWS]: [],
    [DataSourceFCOFields.DEPENDENT_FEATURE_SERVICES]: [],
    [FCOFields.PREVENT_DESTROY]: protoDataSource.validation_args?.args?.prevent_destroy,
    [DataSourceFCOFields.BATCH_DATA_DELAY]: getBatchDelay(protoDataSource.batch_data_source?.data_delay),
  };

  return asDataSourceFco;
};

const getBatchDelay: (rawDuration: Duration | undefined) => Duration = (rawDuration) => {
  // This is a workaround for the fact that there's a bug where the values that we're getting for duration aren't actually of Duration type
  // Somewhere along the line, they're serialized into strings
  // Making this change here allows us to correctly type these values for usage down the line
  return {
    seconds: rawDuration ? rawDuration.toString().replace('s', '') : undefined,
  };
};

const getStreamProperties: (dataSource: VirtualDataSource) => FCOKeyValuePair[] = (dataSource) => {
  if (!dataSource.stream_data_source) {
    return [];
  }

  if (dataSource.stream_data_source.kinesis_data_source) {
    return [
      {
        name: 'streamName',
        value: dataSource.stream_data_source.kinesis_data_source.stream_name!,
      },
    ];
  }

  if (dataSource.stream_data_source.kafka_data_source) {
    return [
      {
        name: 'topics',
        value: dataSource.stream_data_source.kafka_data_source.topics!,
      },
    ];
  }

  if (dataSource.stream_data_source.spark_data_source_function) {
    return [
      {
        name: 'type',
        value: 'Data Source Function',
      },
    ];
  }

  return [
    {
      name: 'type',
      value: 'Unknown',
    },
  ];
};

const getDataSourceType: (dataSource: VirtualDataSource) => DataSourceFCOType = (dataSource) => {
  if (dataSource.data_source_type === 'STREAM_WITH_BATCH') {
    return DataSourceFCOType.STREAM;
  }
  if (dataSource.data_source_type === 'PUSH_NO_BATCH' || dataSource.data_source_type === 'PUSH_WITH_BATCH') {
    return DataSourceFCOType.STREAM;
  }

  return DataSourceFCOType.BATCH;
};

const getColumns: (dataSource: VirtualDataSource) => DataSourceFCOColumn[] = (dataSource) => {
  // Since the stream ds has a subset of columns, return stream's columns if present

  const schema = dataSource.stream_data_source
    ? dataSource.stream_data_source.spark_schema
    : dataSource.batch_data_source?.spark_schema;
  const columns: DataSourceFCOColumn[] = [];
  if (!schema || Object.keys(schema).length == 0) {
    if (dataSource?.schema?.tecton_schema?.columns) {
      dataSource?.schema?.tecton_schema?.columns.forEach((rawColumn) => {
        columns.push({
          name: rawColumn.name,
          type: rawColumn.offline_data_type?.type,
        });
      });
    }

    return columns;
  }

  schema.fields?.forEach((fld: SparkField) => {
    const fldSchema = JSON.parse(fld.structfield_json ? fld.structfield_json : '');
    let type = '-';
    if (typeof fldSchema['type'] === 'string') {
      type = fldSchema['type'];
    } else if (typeof fldSchema['type'] === 'object' && typeof fldSchema['type']['type'] === 'string') {
      type = fldSchema['type']['type'];
    }

    const member = { name: fld.name, type: type };
    if (member as DataSourceFCOColumn) {
      columns.push(member as DataSourceFCOColumn);
    }
  });

  return columns;
};

const getTectonSchemeColumns: (dataSource: VirtualDataSource) => DataSourceFCOColumn[] = (dataSource) => {
  const columns: DataSourceFCOColumn[] = [];
  const schemaColumns = dataSource.schema?.tecton_schema?.columns;

  if (getDataSourceType(dataSource).toUpperCase().includes('PUSH') && Array.isArray(schemaColumns)) {
    schemaColumns.forEach((field: Column) => {
      columns.push({
        name: field.name,
        type: dataTypeToString(field?.offline_data_type)?.toLowerCase(),
      });
    });
  }

  return columns;
};

const getBatchProperties: (dataSource: VirtualDataSource) => FCOKeyValuePair[] = (dataSource) => {
  if (!dataSource.batch_data_source) {
    return [];
  }

  if (dataSource.batch_data_source.hive_table) {
    return [
      {
        name: 'database',
        value: dataSource.batch_data_source.hive_table.database,
      },
      {
        name: 'table',
        value: dataSource.batch_data_source.hive_table.table,
      },
    ];
  }

  if (dataSource.batch_data_source.file) {
    return [
      {
        name: 'uri',
        value: dataSource.batch_data_source.file.uri,
      },
    ];
  }

  if (dataSource.batch_data_source.redshift_db) {
    return [
      {
        name: 'database',
        value: dataSource.batch_data_source.redshift_db.database,
      },
      {
        name: 'table',
        value: dataSource.batch_data_source.redshift_db.table,
      },
    ];
  }

  if (dataSource.batch_data_source.snowflake) {
    return [
      {
        name: 'database',
        value: dataSource.batch_data_source.snowflake.snowflakeArgs?.database,
      },
      {
        name: 'schema',
        value: dataSource.batch_data_source.snowflake.snowflakeArgs?.schema,
      },
      {
        name: 'table',
        value: dataSource.batch_data_source.snowflake.snowflakeArgs?.table,
      },
      {
        name: 'warehouse',
        value: dataSource.batch_data_source.snowflake.snowflakeArgs?.warehouse,
      },
    ];
  }

  if (dataSource.batch_data_source.spark_data_source_function) {
    return [
      {
        name: 'type',
        value: 'Data Source Function',
      },
    ];
  }

  if (dataSource.data_source_type === DataSourceType.PUSH_NO_BATCH) {
    return [];
  }

  return [
    {
      name: 'type',
      value: 'Unknown',
    },
  ];
};

const getFullyQualifiedName: (props: FCOKeyValuePair[]) => string = (props) => {
  return props.map((prop) => prop.value).join('.');
};

const getRawBatchTranslator: (dataSource: VirtualDataSource) => UserDefinedFunction | undefined = (dataSource) => {
  return dataSource.batch_data_source && dataSource.batch_data_source.raw_batch_translator
    ? dataSource.batch_data_source.raw_batch_translator
    : undefined;
};

const getRawStreamTranslator: (dataSource: VirtualDataSource) => UserDefinedFunction | undefined = (dataSource) => {
  return dataSource.stream_data_source && dataSource.stream_data_source.raw_stream_translator
    ? dataSource.stream_data_source.raw_stream_translator
    : undefined;
};

const getBatchDataSourceFunction: (dataSource: VirtualDataSource) => SparkBatchDataSourceFunction | undefined = (
  dataSource
) => {
  return dataSource.batch_data_source?.spark_data_source_function
    ? dataSource.batch_data_source?.spark_data_source_function
    : undefined;
};

const getFileFormat: (dataSource: VirtualDataSource) => DataSourceFileFormat | undefined = (dataSource) => {
  const file = dataSource.batch_data_source?.file;
  if (file && file.format) {
    if (file.format === 'FILE_DATA_SOURCE_FORMAT_JSON') {
      return DataSourceFileFormat.JSON;
    }
    if (file.format === 'FILE_DATA_SOURCE_FORMAT_PARQUET') {
      return DataSourceFileFormat.PARQUET;
    }
    if (file.format === 'FILE_DATA_SOURCE_FORMAT_CSV') {
      return DataSourceFileFormat.CSV;
    }
  }
  return undefined;
};

const getUnderlyingBatchDataSourceType: (dataSource: VirtualDataSource) => BatchDataSourceType | undefined = (
  dataSource
) => {
  if (dataSource.data_source_type === 'PUSH_NO_BATCH') {
    return undefined;
  }

  if (dataSource.batch_data_source?.snowflake) {
    return BatchDataSourceType.SNOWFLAKE;
  }

  if (dataSource.batch_data_source?.redshift_db) {
    return BatchDataSourceType.REDSHIFT;
  }

  if (dataSource.batch_data_source?.file) {
    return BatchDataSourceType.FILE;
  }

  if (dataSource.batch_data_source?.hive_table) {
    return BatchDataSourceType.HIVE;
  }

  if (dataSource.batch_data_source?.spark_data_source_function) {
    return BatchDataSourceType.DATA_SOURCE_FUNCTION;
  }

  if (dataSource.batch_data_source?.unity_table) {
    return BatchDataSourceType.UNITY;
  }

  if (dataSource.batch_data_source?.pandas_data_source_function) {
    return BatchDataSourceType.PANDAS;
  }

  return undefined;
};

const getUnderlyingStreamDataSourceType: (dataSource: VirtualDataSource) => StreamingDataSourceType | undefined = (
  dataSource
) => {
  if (dataSource.stream_data_source?.kinesis_data_source) {
    return StreamingDataSourceType.KINESIS;
  }

  if (dataSource.stream_data_source?.kafka_data_source) {
    return StreamingDataSourceType.KAFKA;
  }

  if (dataSource.stream_data_source?.spark_data_source_function) {
    return StreamingDataSourceType.DATA_SOURCE_FUNCTION;
  }

  if (dataSource.stream_data_source) {
    return StreamingDataSourceType.PUSH;
  }

  return undefined;
};
