
























































































import * as R from 'ramda';
import dayjs from 'dayjs';
import utc from 'dayjs/plugin/utc';
import { TwButton, AlertBanner, ConfirmModal } from '@/app/components';
import { ApolloTask, ExternalKafkaHarvesterConfiguration, TFile } from '@/modules/apollo/types';
import { computed, defineComponent, PropType, reactive, ref, watch } from '@vue/composition-api';
import { ValidationObserver } from 'vee-validate';
import { LoadingStreamingSampleModal } from '../../../components';
import { ChevronRightIcon } from '@vue-hero-icons/outline';
import { FileType, KafkaProcessingOptions } from '@/modules/apollo/constants';
import { ScheduleType } from '@/modules/workflow-designer/types';
import { useApolloTask, useHarvester } from '@/modules/apollo/composable';
import ConnectionDetails from './ConnectionDetails.vue';
import { SampleUpload, RetrieveUntilSettings, ProcessingOptions } from '../common';
import { useAxios } from '@vue-composable/axios';
import { KafkaAPI } from '@/modules/apollo/api';

dayjs.extend(utc);

export default defineComponent({
    name: 'SetupExternalKafkaHarvester',
    components: {
        AlertBanner,
        SampleUpload,
        ConfirmModal,
        ValidationObserver,
        TwButton,
        ChevronRightIcon,
        ConnectionDetails,
        ProcessingOptions,
        RetrieveUntilSettings,
        LoadingStreamingSampleModal,
    },
    model: {
        prop: 'task',
    },
    props: {
        task: {
            type: Object as PropType<ApolloTask<ExternalKafkaHarvesterConfiguration>>,
            required: true,
        },
        loading: {
            type: Boolean,
            default: false,
        },
        sample: {
            type: [Array, Object],
            required: false,
        },
        sampleFile: {
            type: File as PropType<TFile>,
            required: false,
        },
        schedules: {
            type: Array as PropType<ScheduleType[]>,
            default: () => [],
        },
    },
    setup(props, { emit, root }) {
        const { loading: loadingTest, exec } = useAxios(true);
        const { inDraftStatus, inUpdateStatus, inDeprecatedStatus, isFinalized, pipelineFinalized } = useApolloTask(
            computed(() => props.task),
        );
        const {
            limitResponse,
            parseXMLString,
            reduceSampleValues,
            checkMissingFields,
            cancelSelectionChange,
            confirmSelectionChange,
            convertSelectedItemsToArray,
            showPartialMatchModal,
            showNoMatchModal,
            isReset,
            separator,
        } = useHarvester(root, emit);

        const errorAlert: any = reactive({
            title: null,
            body: null,
            showIgnore: false,
        });
        const externalKafkaValidationRef = ref<any>(null);
        const emptySampleResponse = ref<boolean>(false);
        const initialConnectionDetails = ref<any>(R.clone(props.task.configuration.connectionDetails));

        const selection = computed(() =>
            props.task.configuration.response.selectedItems.filter((item: string) => item.includes(separator)),
        );

        const invalidNumSchedules = computed(
            () => props.task.configuration.processing === KafkaProcessingOptions.TimeBased && !props.schedules.length,
        );

        const connectionDetailsChanged = computed(
            () =>
                JSON.stringify(initialConnectionDetails.value) !==
                JSON.stringify(props.task.configuration.connectionDetails),
        );

        const sampleFileIsRequired = computed(
            () => !props.sampleFile && emptySampleResponse.value && !connectionDetailsChanged.value,
        );

        const maxEndDateForProcessing = computed(() =>
            props.task.configuration.retrieval?.endDate ? new Date(props.task.configuration.retrieval.endDate) : null,
        );

        const invalidSchedulesEndDate = computed(() => {
            const today = dayjs().utc();
            const invalidSchedules = props.schedules.filter((schedule: ScheduleType) =>
                dayjs.utc(schedule.endDate).add(1, 'day').isBefore(today),
            );
            return !!invalidSchedules.length;
        });

        const modifiedErrorMessage = (error: any) => {
            const startsWith = error ? error.message.split(' ').slice(0, 2).join(' ') : null;
            errorAlert.title = 'Failed action';
            if (error.message) {
                if (error.message.includes('group coordinator')) {
                    errorAlert.body = `Failed to find Group Id ${props.task.configuration.connectionDetails.groupId}. Please update the connection details to proceed.`;
                    return;
                }

                switch (startsWith) {
                    case 'SASL SCRAM':
                        errorAlert.body = `Authentication failed due to invalid credentials with SASL mechanism ${props.task.configuration.connectionDetails.saslMechanism}. Please update the connection details to proceed.`;
                        break;
                    case 'SASL NONE':
                        errorAlert.body = `Authentication failed. Please update the connection details to proceed.`;
                        break;
                    case 'Connection error:':
                    case 'Failed to':
                        errorAlert.body = `Connecting to ${props.task.configuration.connectionDetails.url} failed. Please update the connection details to proceed.`;
                        break;
                    case 'Not authorized':
                        errorAlert.body = `Not authorised to access topic ${props.task.configuration.connectionDetails.topic}. Please update the connection details to proceed.`;
                        break;
                    default:
                        errorAlert.body = error ? `Testing failed with message: ${error.message}` : null;
                }
            }
        };

        const resetErrorAlerts = () => {
            errorAlert.title = null;
            errorAlert.body = null;
            emptySampleResponse.value = false;
        };

        const testCredentialsAndCreateSample = () => {
            const connectionDetails = props.task.configuration.connectionDetails;
            const fileType = props.task.configuration.fileType;

            exec(KafkaAPI.testCredentialsAndCreateSample(connectionDetails, fileType))
                .then((res: any) => {
                    if (!res.data || (fileType === FileType.JSON && !res.data.length)) {
                        emptySampleResponse.value = true;
                        errorAlert.title = 'Failed to retrieve sample.';
                        errorAlert.body =
                            'Please upload a sample in the "Sample Streaming Data Upload" section below. Important note: the sample must be an exact match of the messages that will be published in the specific kafka topic.';
                    } else {
                        const data = fileType === FileType.XML ? parseXMLString(res.data.toString()) : res.data;
                        const sampleData = reduceSampleValues(limitResponse(data, 20));
                        emit('sample-uploaded', sampleData);
                        if ((!inUpdateStatus.value || isReset.value) && props.task.configuration.response.selectedItems)
                            emit('update-selected-items', []);
                        if (sampleData && inUpdateStatus.value && selection.value.length && !isReset.value) {
                            const items = convertSelectedItemsToArray(selection.value);
                            const missingFields = checkMissingFields(sampleData, items);
                            if (missingFields) return;
                            else
                                emit(
                                    'update-selected-items',
                                    convertSelectedItemsToArray(props.task.configuration.response.selectedItems),
                                );
                        }
                        emit('next-tab');
                    }
                })
                .catch((error) => {
                    if (error && error.response.data) modifiedErrorMessage(error.response.data);
                    (root as any).$toastr.e('PubSub mechanism connection failed', 'Error');
                });
        };

        const validateAndProceed = async () => {
            // Validate form (configuration)
            if (!externalKafkaValidationRef.value) return;
            const valid = await externalKafkaValidationRef.value.validate();
            if (!valid) return;

            // Check for invalid schedules (dates are in the past)
            const inclusiveDate = dayjs(props.task.configuration.retrieval.endDate).add(1, 'day'); // add 1 extra day in order to make it inclusive
            if (!pipelineFinalized.value && (inclusiveDate.isBefore(dayjs().utc()) || invalidSchedulesEndDate.value)) {
                const errorMessage =
                    props.task.configuration.processing === KafkaProcessingOptions.TimeBased
                        ? 'End Dates of Schedules are in the past. Please update them accordingly to continue.'
                        : 'Retrieve Until Date is in the past. Please update it accordingly to continue.';
                (root as any).$toastr.e(errorMessage, 'Invalid Retrieve Until Date');
                return;
            }

            resetErrorAlerts();

            if (!isFinalized.value && (connectionDetailsChanged.value || (!props.sample && !props.sampleFile))) {
                if (!inUpdateStatus.value || isReset.value) emit('update-selected-items', []);
                initialConnectionDetails.value = R.clone(props.task.configuration.connectionDetails);
                testCredentialsAndCreateSample();
            } else {
                emit('next-tab');
            }
        };

        const confirmSelection = (reset: boolean) => {
            confirmSelectionChange(props.sampleFile, props.sample, selection.value, reset);
        };

        const sampleUploaded = (parsedSample: any) => {
            if (parsedSample && inUpdateStatus.value && selection.value.length && !isReset.value)
                checkMissingFields(parsedSample, selection.value);
            emit('sample-uploaded', parsedSample);
        };

        watch(
            () => connectionDetailsChanged.value,
            async () => {
                resetErrorAlerts();
                emit('sample-file-changed', null);
            },
        );

        return {
            showPartialMatchModal,
            showNoMatchModal,
            externalKafkaValidationRef,
            errorAlert,
            loadingTest,
            KafkaProcessingOptions,
            inUpdateStatus,
            inDraftStatus,
            isFinalized,
            pipelineFinalized,
            inDeprecatedStatus,
            invalidNumSchedules,
            emptySampleResponse,
            sampleFileIsRequired,
            maxEndDateForProcessing,
            validateAndProceed,
            confirmSelection,
            cancelSelectionChange,
            sampleUploaded,
        };
    },
});
