import { setTimeout } from 'timers/promises';

import { MIN_SUPPORTED_SNAPSHOT_READS_WIRE_VERSION } from '../cmap/wire_protocol/constants';
import {
  isRetryableReadError,
  isRetryableWriteError,
  MongoCompatibilityError,
  MONGODB_ERROR_CODES,
  MongoError,
  MongoErrorLabel,
  MongoExpiredSessionError,
  MongoInvalidArgumentError,
  MongoNetworkError,
  MongoNotConnectedError,
  MongoRuntimeError,
  MongoServerError,
  MongoTransactionError,
  MongoUnexpectedServerResponseError
} from '../error';
import type { MongoClient } from '../mongo_client';
import { ReadPreference } from '../read_preference';
import { TopologyType } from '../sdam/common';
import {
  DeprioritizedServers,
  sameServerSelector,
  secondaryWritableServerSelector,
  type ServerSelector
} from '../sdam/server_selection';
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { TimeoutContext } from '../timeout';
import { abortable, maxWireVersion, supportsRetryableWrites } from '../utils';
import { AggregateOperation } from './aggregate';
import { AbstractOperation, Aspect } from './operation';
import { RunCommandOperation } from './run_command';

/**
 * Server error code that, when returned for a write operation, is reported to the caller
 * as "retryable writes are not supported" instead of being surfaced as-is.
 */
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
/** Text used for both `message` and `errmsg` of the error thrown in place of that server error. */
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
  'This MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string.';

/** The result type of an operation, i.e. whatever its `handleOk` method returns. */
type ResultTypeFromOperation<TOperation extends AbstractOperation> = ReturnType<
  TOperation['handleOk']
>;

/**
 * Single entry point for running an operation against a MongoClient.
 * @internal
 *
 * @remarks
 * Funnelling every operation through this function lets the driver apply cross-cutting
 * behaviour in one place, most notably the implicit sessions that the Driver Sessions
 * specification requires whenever the caller does not supply a ClientSession.
 *
 * In order, this function:
 * - Rejects anything that is not an AbstractOperation
 * - Connects the MongoClient when it has no topology yet, see {@link autoConnect}
 * - Starts an implicit session when the operation has none, otherwise validates the given one
 * - Rejects non-primary read preferences for reads and runCommand inside a transaction
 * - Unpins a pinned session whose transaction is committed, unless the operation opts out
 * - Runs the operation, retrying where allowed, see {@link executeOperationWithRetries}
 * - Ends the session afterwards if, and only if, it was created here
 *
 * @typeParam T - The operation's type
 * @typeParam TResult - The type of the operation's result, calculated from T
 *
 * @param client - The MongoClient to execute this operation with
 * @param operation - The operation to execute
 * @param timeoutContext - Timeout context to reuse; when omitted or null, one is created
 *   from the session, the client's timeout options and the operation's `timeoutMS`
 * @returns The result produced by {@link executeOperationWithRetries}
 */
export async function executeOperation<
  T extends AbstractOperation,
  TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T, timeoutContext?: TimeoutContext | null): Promise<TResult> {
  if (!(operation instanceof AbstractOperation)) {
    // TODO(NODE-3483): Extend MongoRuntimeError
    throw new MongoRuntimeError('This method requires a valid operation instance');
  }

  // Connect lazily; the connect attempt is wrapped in `abortable` with the operation's options.
  const topology = client.topology ?? (await abortable(autoConnect(client), operation.options));

  // Per the driver sessions spec, an operation without an explicit session gets an
  // implicit one. `implicitOwner` stays undefined when the caller supplied the session.
  let implicitOwner: symbol | undefined;
  let session = operation.session;

  if (session != null) {
    if (session.hasEnded) {
      throw new MongoExpiredSessionError('Use of expired sessions is not permitted');
    }

    if (
      session.snapshotEnabled &&
      maxWireVersion(topology) < MIN_SUPPORTED_SNAPSHOT_READS_WIRE_VERSION
    ) {
      throw new MongoCompatibilityError('Snapshot reads require MongoDB 5.0 or later');
    }

    if (session.client !== client) {
      throw new MongoInvalidArgumentError('ClientSession must be from the same MongoClient');
    }
  } else {
    implicitOwner = Symbol();
    session = client.startSession({ owner: implicitOwner, explicit: false });
  }

  operation.session ??= session;

  const readPreference = operation.readPreference ?? ReadPreference.primary;
  const inTransaction = Boolean(session.inTransaction());
  const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);

  // Inside a transaction, reads and runCommand may only target the primary.
  if (inTransaction && !readPreference.equals(ReadPreference.primary)) {
    if (hasReadAspect || operation.commandName === 'runCommand') {
      throw new MongoTransactionError(
        `Read preference in a transaction must be primary, not: ${readPreference.mode}`
      );
    }
  }

  const shouldUnpin =
    session.isPinned && session.transaction.isCommitted && !operation.bypassPinningCheck;
  if (shouldUnpin) {
    session.unpin();
  }

  const effectiveTimeoutContext =
    timeoutContext ??
    TimeoutContext.create({
      session,
      serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS,
      waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS,
      timeoutMS: operation.options.timeoutMS
    });

  try {
    return await executeOperationWithRetries(operation, {
      topology,
      timeoutContext: effectiveTimeoutContext,
      session,
      readPreference
    });
  } finally {
    // Only end the session when its owner is the symbol minted above.
    if (session.owner != null && session.owner === implicitOwner) {
      await session.endSession();
    }
  }
}

/**
 * Returns the client's topology, connecting the client first when it has none.
 * @internal
 *
 * @param client - The MongoClient whose topology is needed
 * @returns The client's topology
 * @throws MongoNotConnectedError if the client has no topology and has already been closed
 * @throws MongoRuntimeError if connecting completes without producing a topology
 */
export async function autoConnect(client: MongoClient): Promise<Topology> {
  const existingTopology = client.topology;
  if (existingTopology != null) {
    return existingTopology;
  }

  if (client.s.hasBeenClosed) {
    throw new MongoNotConnectedError('Client must be connected before running operations');
  }

  // The flag is set only for the duration of this connect call; the `finally`
  // removes it again whether connecting succeeds or throws.
  client.s.options.__skipPingOnConnect = true;
  try {
    await client.connect();

    if (client.topology == null) {
      throw new MongoRuntimeError(
        'client.connect did not create a topology but also did not throw'
      );
    }

    return client.topology;
  } finally {
    delete client.s.options.__skipPingOnConnect;
  }
}

/**
 * Execution context destructured by executeOperationWithRetries.
 * @internal
 */
type RetryOptions = {
  /** Session the operation runs under; `willRetry` is only ever true when this is non-null */
  session: ClientSession | undefined;
  /** Read preference; used as the server selector unless the operation needs a custom one */
  readPreference: ReadPreference;
  /** Topology that servers are selected from and whose options control retry behaviour */
  topology: Topology;
  /** Timeout context passed to the initial server selection and to every command attempt */
  timeoutContext: TimeoutContext;
};
/**
 * @internal The base backoff duration in milliseconds. When backing off after an overload
 * error it is multiplied by 2 to the power of the zero-based attempt index.
 */
const BASE_BACKOFF_MS = 100;
/**
 * @internal The maximum backoff duration in milliseconds. It caps the exponential term
 * before the random jitter factor is applied.
 */
const MAX_BACKOFF_MS = 10_000;

/**
 * Executes an operation and retries as appropriate
 * @internal
 *
 * @remarks
 * Implements behaviour described in the [Retryable Reads](https://github.com/mongodb/specifications/blob/master/source/retryable-reads/retryable-reads.md) and [Retryable
 * Writes](https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.md) specifications
 *
 * This function:
 * - performs initial server selection
 * - attempts to execute an operation
 * - retries the operation if it meets the criteria for a retryable read or a retryable write
 * - retries errors labelled SystemOverloadedError after a randomized, exponentially growing
 *   backoff, within an attempt budget derived from `maxAdaptiveRetries`
 *
 * When the operation ultimately fails, the error thrown is the first one recorded, unless a
 * later attempt failed with an error that does NOT carry the NoWritesPerformed label, in
 * which case that later error is thrown instead.
 *
 * @typeParam T - The operation's type
 * @typeParam TResult - The type of the operation's result, calculated from T
 *
 * @param operation - The operation to execute
 * @param options - The topology, timeout context, session and read preference to execute with
 * @returns Whatever `operation.handleOk` returns for a successful attempt, or whatever
 *   `operation.handleError` returns if it returns instead of throwing
 */
async function executeOperationWithRetries<
  T extends AbstractOperation,
  TResult = ResultTypeFromOperation<T>
>(
  operation: T,
  { topology, timeoutContext, session, readPreference }: RetryOptions
): Promise<TResult> {
  let selector: ReadPreference | ServerSelector;

  if (operation.hasAspect(Aspect.MUST_SELECT_SAME_SERVER)) {
    // GetMore and KillCursor operations must always select the same server, but run through
    // server selection to potentially force monitor checks if the server is
    // in an unknown state.
    selector = sameServerSelector(operation.server?.description);
  } else if (operation instanceof AggregateOperation && operation.hasWriteStage) {
    // If operation should try to write to secondary use the custom server selector
    // otherwise provide the read preference.
    selector = secondaryWritableServerSelector(topology.commonWireVersion, readPreference);
  } else {
    selector = readPreference;
  }

  // Initial selection: no attempt has failed yet, so a newly constructed
  // DeprioritizedServers is passed rather than the one used for retries below.
  let server = await topology.selectServer(selector, {
    session,
    operationName: operation.commandName,
    timeoutContext,
    signal: operation.options.signal,
    deprioritizedServers: new DeprioritizedServers()
  });

  const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
  const hasWriteAspect = operation.hasAspect(Aspect.WRITE_OPERATION);
  const inTransaction = session?.inTransaction() ?? false;

  // Reads are retried only when enabled on the topology, outside a transaction,
  // and when the operation itself reports that it can be retried.
  const willRetryRead = topology.s.options.retryReads && !inTransaction && operation.canRetryRead;

  // Writes additionally require that the initially selected server supports retryable writes.
  const willRetryWrite =
    topology.s.options.retryWrites &&
    !inTransaction &&
    supportsRetryableWrites(server) &&
    operation.canRetryWrite;

  const willRetry =
    operation.hasAspect(Aspect.RETRYABLE) &&
    session != null &&
    ((hasReadAspect && willRetryRead) || (hasWriteAspect && willRetryWrite));

  // Done once, before the first attempt: flag the operation as a retryable write and
  // advance the session's transaction number.
  if (hasWriteAspect && willRetryWrite && session != null) {
    operation.options.willRetryWrite = true;
    session.incrementTransactionNumber();
  }

  // Servers added here on failed attempts are handed to server selection on retries.
  const deprioritizedServers = new DeprioritizedServers();

  // Attempt budget: an explicit numeric operation.maxAttempts wins. Otherwise a retryable
  // operation gets unlimited attempts when CSOT is enabled, or 2 attempts when it is not;
  // a non-retryable operation gets a single attempt.
  // This value may be overwritten inside the loop when an overload error is seen.
  let maxAttempts =
    typeof operation.maxAttempts === 'number'
      ? operation.maxAttempts
      : willRetry
        ? timeoutContext.csotEnabled()
          ? Infinity
          : 2
        : 1;

  // The error that will be thrown if the operation ultimately fails.
  let error: MongoError | null = null;

  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    operation.attemptsMade = attempt + 1;
    operation.server = server;

    try {
      try {
        const result = await server.command(operation, timeoutContext);
        return operation.handleOk(result);
      } catch (error) {
        // Failures from the command or from handleOk are given to the operation first.
        // A value returned by handleError becomes the result; anything it throws is
        // handled by the outer catch. (`error` here shadows the outer variable.)
        return operation.handleError(error);
      }
    } catch (operationError) {
      // Should never happen but if it does - propagate the error.
      if (!(operationError instanceof MongoError)) throw operationError;

      // Preserve the original error once a write has been performed.
      // Only update to the latest error if no writes were performed.
      // Concretely: the first failure is always recorded; a later failure replaces it
      // unless that later failure carries the NoWritesPerformed label.
      if (error == null) {
        error = operationError;
      } else {
        if (!operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)) {
          error = operationError;
        }
      }

      // Reset timeouts
      timeoutContext.clear();

      // A write that failed with this code is reported as "retryable writes not
      // supported"; the server's error is kept as `originalError`.
      if (hasWriteAspect && operationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
        throw new MongoServerError({
          message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
          errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
          originalError: operationError
        });
      }

      // Retryability is judged on the latest failure, but the recorded error is thrown.
      if (!canRetry(operation, operationError)) {
        throw error;
      }

      // An overload error replaces the attempt budget with maxAdaptiveRetries + 1,
      // capped by operation.maxAttempts when the operation defines one. The new budget
      // stays in effect for the remainder of the loop.
      if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
        const maxOverloadAttempts = topology.s.options.maxAdaptiveRetries + 1;
        maxAttempts = Math.min(maxOverloadAttempts, operation.maxAttempts ?? maxOverloadAttempts);
      }

      // Budget exhausted: fail now instead of backing off and selecting a server
      // for an attempt that would never run.
      if (attempt + 1 >= maxAttempts) {
        throw error;
      }

      // A cursor-creating operation that hit a network error on a pinned session outside
      // a transaction is unpinned (with force and forceClear) before reselecting a server.
      if (
        operationError instanceof MongoNetworkError &&
        operation.hasAspect(Aspect.CURSOR_CREATING) &&
        session != null &&
        session.isPinned &&
        !session.inTransaction()
      ) {
        session.unpin({ force: true, forceClear: true });
      }

      // Same unpinning for overload errors, but without forceClear.
      if (
        operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError) &&
        operation.hasAspect(Aspect.CURSOR_CREATING) &&
        session != null &&
        session.isPinned &&
        !session.inTransaction()
      ) {
        session.unpin({ force: true });
      }

      if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
        // Random delay in [0, min(MAX_BACKOFF_MS, BASE_BACKOFF_MS * 2^attempt)),
        // where `attempt` is the zero-based index of the attempt that just failed.
        const backoffMS = Math.random() * Math.min(MAX_BACKOFF_MS, BASE_BACKOFF_MS * 2 ** attempt);

        // if the backoff would exhaust the CSOT timeout, short-circuit.
        if (timeoutContext.csotEnabled() && backoffMS > timeoutContext.remainingTimeMS) {
          throw error;
        }

        // NOTE(review): this wait is not tied to operation.options.signal - confirm
        // whether an abort during backoff is expected to interrupt it.
        await setTimeout(backoffMS);
      }

      // Deprioritize the server that just failed when the topology is sharded, or when
      // the failure was an overload error and overload retargeting is enabled.
      if (
        topology.description.type === TopologyType.Sharded ||
        (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError) &&
          topology.s.options.enableOverloadRetargeting)
      ) {
        deprioritizedServers.add(server.description);
      }

      // NOTE(review): unlike the initial selection, timeoutContext is not passed here -
      // confirm this is intentional.
      server = await topology.selectServer(selector, {
        session,
        operationName: operation.commandName,
        deprioritizedServers,
        signal: operation.options.signal
      });

      // A write retried for a reason other than overload must land on a server that
      // supports retryable writes.
      if (
        hasWriteAspect &&
        !supportsRetryableWrites(server) &&
        !operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
      ) {
        throw new MongoUnexpectedServerResponseError(
          'Selected server does not support retryable writes'
        );
      }

      // Batched operations must reset the batch before retry,
      // otherwise building a command will build the _next_ batch, not the current batch.
      if (operation.hasAspect(Aspect.COMMAND_BATCHING)) {
        operation.resetBatch();
      }
    }
  }

  // Reached only when the loop condition fails, e.g. when maxAttempts is not positive.
  throw (
    error ??
    new MongoRuntimeError(
      'Should never happen: operation execution loop terminated but no error was recorded.'
    )
  );

  /**
   * Decides whether a failed attempt may be retried.
   *
   * Declared after the final `throw` but hoisted, so the loop above can call it. Closes
   * over `topology`, `hasReadAspect`, `hasWriteAspect`, `willRetryRead` and `willRetryWrite`.
   *
   * @param operation - The operation that failed
   * @param error - The error produced by the latest attempt
   * @returns a truthy value when the operation may be attempted again
   */
  function canRetry(operation: AbstractOperation, error: MongoError) {
    // SystemOverloadedError is retryable, but must respect retryReads/retryWrites settings
    // Check topology options directly (not operation.canRetryRead/Write) because backpressure
    // expands retry support beyond traditional retryable reads/writes
    // NOTE: Unlike traditional retries, backpressure retries ARE allowed inside transactions
    // This branch requires BOTH the SystemOverloadedError and RetryableError labels.
    if (
      error.hasErrorLabel(MongoErrorLabel.SystemOverloadedError) &&
      error.hasErrorLabel(MongoErrorLabel.RetryableError)
    ) {
      // runCommand requires BOTH retryReads and retryWrites to be enabled (per spec step 2.4)
      if (operation instanceof RunCommandOperation) {
        return topology.s.options.retryReads && topology.s.options.retryWrites;
      }

      // Write-stage aggregates ($out/$merge) require retryWrites
      if (operation instanceof AggregateOperation && operation.hasWriteStage) {
        return topology.s.options.retryWrites;
      }

      // For other operations, check if retries are enabled based on operation type
      const canRetryAsRead = hasReadAspect && topology.s.options.retryReads;
      const canRetryAsWrite = hasWriteAspect && topology.s.options.retryWrites;
      return canRetryAsRead || canRetryAsWrite;
    }

    // run command is only retryable if we get retryable overload errors
    if (operation instanceof RunCommandOperation) {
      return false;
    }

    // batch operations are only retryable if the batch is retryable
    if (operation.hasAspect(Aspect.COMMAND_BATCHING)) {
      return operation.canRetryWrite && isRetryableWriteError(error);
    }

    // Traditional retryable reads/writes: the decision made before the first attempt
    // must allow it AND the error itself must be of a retryable kind.
    return (
      (hasWriteAspect && willRetryWrite && isRetryableWriteError(error)) ||
      (hasReadAspect && willRetryRead && isRetryableReadError(error))
    );
  }
}
