import {
  asyncFilter,
  asyncIter,
  Duration,
  enforce,
  enforce_nonnull,
  Logger,
  pause,
  same,
  Timestamp,
  unreachable,
} from "vscript";
import * as VAPI from "../artefacts/vapi/index.js";
import {
  BackwardsCompatibleInterfaceSpec,
  collect_rtp_interfaces,
  extract_port_indices,
  getHostingPort,
  resolve_interfaces_backwards_compatibly,
} from "./network.js";
import { RollbackStack } from "./rollback_stack.js";
import { split_sdp } from "./sdp.js";
import {
  delete_row,
  find_vm_and_count,
  ResourceCount,
  time_ref,
} from "./utils.js";
import { is_higher_than_3G } from "./video.js";

export interface RTPInterfaces {
  primary: VAPI.NetworkInterfaces.VirtualInterface[];
  secondary: VAPI.NetworkInterfaces.VirtualInterface[];
}

export async function await_read_delay(
  rcv: VAPI.Any.RTPReceiver.VideoReceiver | VAPI.Any.RTPReceiver.AudioReceiver,
  pars?: { timeout?: Duration; max_jitter_s?: number },
) {
  return enforce_nonnull(
    await rcv.generic.timing.read_delay.overall.wait_until(
      (rd) => rd !== null,
      {
        timeout: pars?.timeout ?? new Duration(10, "s"),
      },
    ),
  );
}

export function lock_to_genlock<
  R extends
    | VAPI.Any.RTPReceiver.AudioReceiver
    | VAPI.Any.RTPReceiver.VideoReceiver,
>(rcv: R): any /* :( */ {
  if (
    rcv instanceof VAPI.AT1101.RTPReceiver.AudioReceiver ||
    rcv instanceof VAPI.AT1101.RTPReceiver.VideoReceiver
  ) {
    return { variant: "LockToGenlock", value: {} };
  } else {
    // FIXME: ugly
    const vm = VAPI.VM.adopt(rcv.raw.backing_store);
    enforce(vm instanceof VAPI.AT1130.Root);
    enforce(!!vm.genlock);
    return {
      variant: "LockToGenlock",
      value: { genlock: vm.genlock.instances.row(0) },
    };
  }
}

export async function categorize_sessions<
  Ss extends
    | VAPI.AT1101.RTPReceiver.Session[]
    | VAPI.AT1130.RTPReceiver.Session[],
>(
  sessions: Ss,
): Promise<{
  audio: Ss[0][];
  video: Ss[0][];
  mixed: Ss[0][];
  medialess: Ss[0][];
}> {
  const audio: Ss[0][] = [];
  const video: Ss[0][] = [];
  const mixed: Ss[0][] = [];
  const medialess: Ss[0][] = [];
  await Promise.all(
    sessions.map(async (s: VAPI.Any.RTPReceiver.Session) => {
      const numAudio = (await s.audio_receivers.read()).length;
      const numVideo = (await s.video_receivers.read()).length;
      if (numAudio > 0 && numVideo > 0) {
        mixed.push(s);
      } else if (numAudio > 0) {
        audio.push(s);
      } else if (numVideo > 0) {
        video.push(s);
      } else {
        medialess.push(s);
      }
    }),
  );
  return { audio, video, mixed, medialess };
}

// FIXME: inject into vapi
export function all_tracks_are(t: VAPI.RTPReceiver.Track) {
  return (
    tracks:
      | VAPI.RTPReceiver.UsedSessionTracks
      | VAPI.RTPReceiver.UsedReceiverTracks,
  ) =>
    tracks.current_target === t &&
    tracks.next_target === t &&
    tracks.ingress === t &&
    tracks.egress === t;
}

export function max_video_capabilities(
  vm: VAPI.VM.Any,
): VAPI.Any.RTPReceiver.VideoCapabilities | null {
  if (!vm.r_t_p_receiver) return null;
  enforce(!!vm.genlock);
  return {
    read_speed:
      vm instanceof VAPI.AT1101.Root
        ? { variant: "LockToGenlock", value: {} }
        : {
            variant: "LockToGenlock",
            value: { genlock: vm.genlock.instances.row(0) },
          },
    jpeg_xs_caliber:
      vm.r_t_p_receiver.runtime_constants.max_jpeg_xs_video_receivers > 0
        ? "JPEG_XS_upto_3G" /* FIXME: use UHD */
        : null,
    st2042_2_caliber:
      vm.r_t_p_receiver.runtime_constants.max_st2042_2_video_receivers > 0
        ? "ST2042_2_singlelink_uhd"
        : null,
    supports_2022_6: true,
    supports_2110_40: true,
    supports_clean_switching: true,
    supports_uhd_sample_interleaved: false, //TODO: patch T715 supports_uhd_sample_interleaved is not supported
    st2110_20_caliber:
      vm.r_t_p_receiver.runtime_constants.max_st2110_20_uhd_video_receivers > 0
        ? "ST2110_singlelink_uhd"
        : vm.r_t_p_receiver.runtime_constants.max_native_video_receivers > 0
          ? "ST2110_upto_3G"
          : null,
  };
}

export async function create_video_receiver(
  vm: VAPI.VM.Any,
  caps?: Partial<VAPI.Any.RTPReceiver.VideoCapabilities>,
) {
  enforce(!!vm.r_t_p_receiver);
  const rs = new RollbackStack();
  try {
    const s = await vm.r_t_p_receiver?.sessions.create_row();
    const ifcs = await collect_rtp_interfaces(vm);
    await s.interfaces.command.write({
      primary: ifcs.primary[0],
      secondary: ifcs.secondary[0],
    });
    const v = await vm.r_t_p_receiver.video_receivers.create_row();
    rs.push(async () => {
      await s.delete();
    });
    rs.push(async () => {
      await v.delete();
    });
    const max_caps = enforce_nonnull(max_video_capabilities(vm));
    await v.media_specific.capabilities.command.write({
      ...max_caps,
      ...caps,
    } as any);
    //TODO parameterize this
    await v.generic.timing.target.command.write({
      variant: "TimeSource",
      value: {
        t_src: time_ref(vm.p_t_p_clock.output),
        use_rtp_timestamp: false,
      },
    });
    await v.generic.hosting_session.command.write(s);
    await s.switch_type.command.write({
      variant: "MakeBeforeBreak",
      value: { switch_time: new Timestamp(0) },
    });
    rs.push(async () => {
      await s.active.command.write(false);
    });
    await s.active.command.write(true);
    return v;
  } catch (_e) {
    await rs.rollback();
    throw new Error();
  }
}
export async function create_audio_receiver(vm: VAPI.VM.Any) {
  enforce(!!vm.r_t_p_receiver);
  const rs = new RollbackStack();
  try {
    const s = await vm.r_t_p_receiver?.sessions.create_row();
    const ifcs = await collect_rtp_interfaces(vm);
    await s.interfaces.command.write({
      primary: ifcs.primary[0],
      secondary: ifcs.secondary[0],
    });
    const a = await vm.r_t_p_receiver.audio_receivers.create_row();
    rs.push(async () => {
      await s.delete();
    });
    rs.push(async () => {
      await a.delete();
    });
    const caps: VAPI.Any.RTPReceiver.AudioCapabilities = {
      channel_capacity: 80,
      payload_limit: "AtMost1984Bytes",
      read_speed: lock_to_genlock(a),
      supports_clean_switching: true,
    };
    await a.media_specific.capabilities.command.write(caps);
    //TODO parameterize this
    await a.generic.timing.target.command.write({
      variant: "IngressPlusX",
      value: { read_delay: new Duration(10, "ms") },
    });
    await a.generic.hosting_session.command.write(s);
    await s.switch_type.command.write({
      variant: "MakeBeforeBreak",
      value: { switch_time: new Timestamp(0) },
    });
    rs.push(async () => {
      await s.active.command.write(false);
    });
    await s.active.command.write(true);
    return a;
  } catch (_e) {
    await rs.rollback();
    throw new Error();
  }
}

type ReceiverSessions =
  | VAPI.AT1130.RTPReceiver.SessionAsNamedTableRow
  | VAPI.AT1101.RTPReceiver.SessionAsNamedTableRow
  | VAPI.AT1130.RTPReceiver.Session
  | VAPI.AT1101.RTPReceiver.Session;

export async function switch_to(
  rxs: ReceiverSessions,
  sdp: string,
  pars?: {
    timeout?: Duration;
    ensure_state?: boolean;
    switch_type?: VAPI.RTPReceiver.SwitchType;
    log?: Logger;
  },
) {
  const params = {
    timeout: pars?.timeout ?? new Duration(5, "s"),
    ensure_state: pars?.ensure_state ?? true,
    switch_type:
      pars?.switch_type ??
      ((await rxs.supports_clean_switching.read())
        ? {
            variant: "MakeBeforeBreak",
            value: { switch_time: new Timestamp(1) },
          }
        : { variant: "Patch", value: {} }),
  };
  await rxs.switch_type.command.write(params.switch_type);
  const tr = await rxs.used_tracks.read();
  await rxs.set_sdp(tr?.next_target == "A" ? "B" : "A", sdp);
  if (params.ensure_state) {
    return async () => {
      await pause(params.timeout);
      let state = await rxs.used_tracks.read();
      if (state.egress != state.current_target) {
        pars?.log?.(
          "Receiver State seems improper; Forcing state via Patch",
          "Debug",
        );
        await rxs.switch_type.command.write({ variant: "Patch", value: {} });
        await rxs.set_sdp("A", sdp);
      }
      state = await rxs.used_tracks.read();
      return state.egress == state.current_target;
    };
  }
  return async () => {
    const state = await rxs.used_tracks.read();
    return state.egress == state.current_target;
  };
}

// matches precisely, i.e., sdp being a subset of some currently active other sdp will not return a match
export async function find_sessions_listening_to_sdp(
  vm: VAPI.VM.Any,
  sdp: string,
) {
  if (!vm.r_t_p_receiver)
    throw new Error(
      `${vm.raw.ip} doesn't even have rtp receivers, please check your system configuration`,
    );
  const close_enough = (sdp_a: string, sdp_b: string) => {
    const split_a = split_sdp(sdp_a);
    const split_b = split_sdp(sdp_b);
    if (split_a.media_descriptions.length !== split_b.media_descriptions.length)
      return false;
    for (let i = 0; i < split_a.media_descriptions.length; ++i) {
      const a = split_a.media_descriptions[i];
      const b = split_b.media_descriptions[i];
      // we assume that identical addresses designate the same flow,
      // even when used on different interfaces (not necessarily true
      // in general, but should be true in our e2e tests)
      if (a.media_type === b.media_type && a.src === b.src && a.dst === b.dst)
        return true;
    }
    return false;
  };
  return asyncFilter(
    await vm.r_t_p_receiver.sessions.rows(),
    async (session) => {
      const candidates = await (async () => {
        switch ((await session.used_tracks.read()).current_target) {
          case "Empty":
            return [];
          case "A":
            return [await session.sdp_a.status.read()];
          case "B":
            return [await session.sdp_b.status.read()];
          case "AB":
            return [
              await session.sdp_a.status.read(),
              await session.sdp_b.status.read(),
            ];
        }
      })();
      return (
        candidates.find((candidate) => close_enough(candidate, sdp)) !==
        undefined
      );
    },
  );
}

// don't use this with too many receivers, doesn't currently check allocation counts
export async function maximize_buffers(vm: VAPI.VM.Any): Promise<void> {
  enforce(!!vm.r_t_p_receiver);
  if (vm instanceof VAPI.AT1101.Root) {
    await vm.r_t_p_receiver.settings.buffer_sizes.command.write({
      for_2022_6: "UpTo64KiPackets",
      for_2042_2: "UpTo64KiPackets",
      for_2110_20_other: "UpTo64KiPackets",
      for_2110_20_uhd_singlelink: "UpTo64KiPackets",
      for_audio: "UpTo64KiPackets",
      for_jpeg_xs: "UpTo64KiPackets",
      for_passthrough: "UpTo64KiPackets",
    });
  } else {
    await vm.r_t_p_receiver.settings.buffer_sizes.command.write({
      for_2022_6: "UpTo96MB",
      for_2042_2: "UpTo96MB",
      for_2110_20_other: "UpTo96MB",
      for_2110_20_uhd_singlelink: "UpTo96MB",
      for_audio: "UpTo96MB",
      for_jpeg_xs: "UpTo96MB",
      for_passthrough: "UpTo96MB",
    });
  }
}

export function reserved_bandwidth_for_capabilities(
  cap:
    | GenlockLessVideoCapabilities
    | GenlockLessAudioCapabilities
    | VAPI.Any.RTPReceiver.VideoCapabilities
    | VAPI.Any.RTPReceiver.AudioCapabilities, // FIXME: clumsy types here
) {
  if ("st2110_20_caliber" in cap) {
    const vcap = cap as GenlockLessVideoCapabilities; // KLUDGE: not quite true; but we don't care about read_speed here
    let bits_per_sec = 0;
    const bump_to = (bps: number) =>
      (bits_per_sec = Math.max(bits_per_sec, bps));
    if (
      vcap.st2110_20_caliber === "ST2110_singlelink_uhd" ||
      ((!!vcap.st2110_20_caliber || vcap.supports_2022_6) &&
        vcap.supports_uhd_sample_interleaved)
    ) {
      bump_to(12e9);
    }
    if (vcap.supports_2022_6 || !!vcap.st2110_20_caliber) bump_to(3e9);
    if (!!vcap.jpeg_xs_caliber) {
      const min_compression_rate = 5; // FIXME: shouldn't hardcode this
      bump_to(
        (vcap.jpeg_xs_caliber === "JPEG_XS_upto_3G" ? 3e9 : 12e9) /
          min_compression_rate,
      );
    }
    if (!!vcap.st2042_2_caliber) {
      const min_compression_rate = 2.5;
      bump_to(
        (vcap.st2042_2_caliber === "ST2042_2_upto_3G" ? 3e9 : 12e9) /
          min_compression_rate,
      );
    }
    if (bits_per_sec === 0) debugger;
    return { bits_per_sec };
  } else {
    const max_pps = 8000;
    const max_bytes_per_packet = 1536;
    return { bits_per_sec: max_bytes_per_packet * max_pps * 8 };
  }
}

export async function reserved_bandwidth(
  rcv: VAPI.Any.RTPReceiver.VideoReceiver | VAPI.Any.RTPReceiver.AudioReceiver,
): Promise<{
  bits_per_sec: number;
  ports: Array<VAPI.Any.NetworkInterfaces.Port>;
}> {
  const session = await rcv.generic.hosting_session.status.read();
  const cap = await rcv.media_specific.capabilities.status.read();
  if (!cap || !session) return { bits_per_sec: 0, ports: [] };
  return {
    ports: await (async () => {
      const result: Array<VAPI.Any.NetworkInterfaces.Port> = [];
      const ifcs = await session.interfaces.status
        .read()
        .then((ifcs) => [ifcs.primary, ifcs.secondary]);
      for (const ifc of ifcs) {
        if (!ifc) continue;
        const port = await getHostingPort(ifc);
        if (result.find((other_port) => same(port, other_port)) === undefined)
          result.push(port);
      }
      return result;
    })(),
    bits_per_sec: reserved_bandwidth_for_capabilities(cap).bits_per_sec,
  };
}

export async function max_video_receivers(
  pars: {
    vm: VAPI.VM.Any;
    capabilities: GenlockLessVideoCapabilities;
    preexisting_receivers: "bulldoze" | "preserve";
    log: (msg: string) => void;
    standard?: VAPI.Video.Standard | VAPI.Video.Standard[];
    bandwidth_constraints?: "obey" /* the default */ | "ignore";
  } & BackwardsCompatibleInterfaceSpec,
) {
  if (!pars.vm.r_t_p_receiver) return 0;
  const rtp_receiver = pars.vm.r_t_p_receiver;
  const max_num_by_bandwidth = async () => {
    if ((pars.bandwidth_constraints ?? "obey") === "ignore") return Infinity;
    const rtp_ports = await asyncFilter(
      await pars.vm.network_interfaces.ports.rows(),
      async (port) => await port.supports_rtp.read(),
    );
    const bandwidth_left = await (async (): Promise<
      Map<VAPI.Any.NetworkInterfaces.Port, { bits_per_sec: number }>
    > => {
      let blocked_bandwidth: Map<
        VAPI.Any.NetworkInterfaces.Port,
        { bits_per_sec: number }
      > = new Map();
      if (pars.preexisting_receivers === "preserve") {
        await asyncIter(
          await rtp_receiver.video_receivers.rows(),
          async (tx) => {
            const { bits_per_sec, ports } = await reserved_bandwidth(tx);
            for (const port of ports) {
              blocked_bandwidth.set(port, {
                bits_per_sec:
                  bits_per_sec +
                  (blocked_bandwidth.get(port) ?? { bits_per_sec: 0 })
                    .bits_per_sec,
              });
            }
          },
        );
      }
      const result = new Map<
        VAPI.Any.NetworkInterfaces.Port,
        { bits_per_sec: number }
      >();
      await asyncIter(rtp_ports, async (port) => {
        const total_bandwidth = enforce_nonnull(
          await port.max_throughput.wait_until(
            (bandwidth) => bandwidth !== null,
          ),
        );
        result.set(port, {
          bits_per_sec:
            total_bandwidth - (blocked_bandwidth.get(port)?.bits_per_sec ?? 0),
        });
      });
      return result;
    })();
    const bits_per_sec_per_rcv = reserved_bandwidth_for_capabilities(
      pars.capabilities,
    ).bits_per_sec;
    enforce(bits_per_sec_per_rcv >= 0);
    if (bits_per_sec_per_rcv === 0) return Infinity;
    let max_num = 0;
    while (true) {
      for (const port_index of extract_port_indices(pars, max_num)) {
        const entry = enforce_nonnull(
          bandwidth_left.get(rtp_ports[port_index]),
        );
        if (entry.bits_per_sec < bits_per_sec_per_rcv) return max_num;
        bandwidth_left.set(rtp_ports[port_index], {
          bits_per_sec: entry.bits_per_sec - bits_per_sec_per_rcv,
        });
        ++max_num;
      }
    }
  };
  // FIXME: doesn't take backend packing constraints into account
  const blocked_receivers = {
    n_native: 0,
    n_jpegxs: 0,
    n_2042_uhd: 0,
    n_2042: 0,
    n_2110_20_uhd: 0,
    n_2110_20: 0,
    n_2110_40: 0,
  };
  let blocked_servos = 0;
  // const leftover_blocks_by_channel = Array.from(
  //   Array(rtp_receiver.runtime_constants.video_memory_channels).keys()
  // ).map((_) => rtp_receiver.runtime_constants.blocks_per_memory_channel);
  if (pars.preexisting_receivers === "preserve") {
    await asyncIter(await rtp_receiver.video_receivers.rows(), async (rcv) => {
      const cap = await rcv.media_specific.capabilities.status.read();
      if (!cap) return;
      // FIXME: we're not seeing the channel index -- should expose that via KW interface?
      if (!!cap.jpeg_xs_caliber) blocked_receivers.n_jpegxs++;
      if (cap.supports_uhd_sample_interleaved) blocked_receivers.n_native -= 4;
      else if (!!cap.supports_2022_6 || !!cap.st2110_20_caliber)
        blocked_receivers.n_native++;
      if (cap.supports_2110_40) blocked_receivers.n_2110_40++;
      if (cap.st2042_2_caliber === "ST2042_2_singlelink_uhd")
        blocked_receivers.n_2042_uhd++;
      if (!!cap.st2042_2_caliber) blocked_receivers.n_2042++;
      if (cap.read_speed.variant === "Adaptive") blocked_servos++;
    });
  }
  const leftover = (() => {
    let result = Infinity;
    if (!!pars.capabilities.jpeg_xs_caliber) {
      result = Math.min(
        result,
        rtp_receiver.runtime_constants.max_jpeg_xs_video_receivers -
          blocked_receivers.n_jpegxs,
      );
    }
    if (pars.capabilities.st2042_2_caliber === "ST2042_2_singlelink_uhd") {
      result = Math.min(
        result,
        rtp_receiver.runtime_constants.max_st2042_2_uhd_video_receivers -
          blocked_receivers.n_2042_uhd,
      );
    }
    if (!!pars.capabilities.st2042_2_caliber) {
      result = Math.min(
        result,
        rtp_receiver.runtime_constants.max_st2042_2_video_receivers -
          blocked_receivers.n_2042,
      );
    }
    if (pars.capabilities.st2110_20_caliber === "ST2110_singlelink_uhd") {
      result = Math.min(
        result,
        rtp_receiver.runtime_constants.max_st2110_20_uhd_video_receivers -
          blocked_receivers.n_2110_20_uhd,
      );
    }
    if (
      !!pars.capabilities.st2110_20_caliber ||
      !!pars.capabilities.supports_2022_6
    ) {
      result = Math.min(
        result,
        rtp_receiver.runtime_constants.max_native_video_receivers -
          blocked_receivers.n_native,
      );
    }
    if (pars.capabilities.supports_2110_40) {
      result = Math.min(
        result,
        rtp_receiver.runtime_constants.max_metadata_receivers -
          blocked_receivers.n_2110_40,
      );
    }
    if (pars.capabilities.supports_uhd_sample_interleaved) {
      result = Math.floor(result / 4);
    }

    const available_servos =
      pars.vm.r_t_p_receiver.runtime_constants.num_video_servos -
      blocked_servos;
    if (pars.capabilities.read_speed === "Adaptive")
      result = Math.min(result, available_servos);
    // FIXME: doesn't take buffer sizes into account
    return result;
  })();
  let max_by_memory = Infinity;
  // we'll just assume for now the AT1101 isn't ram-limited
  if (pars.vm instanceof VAPI.AT1130.Root) {
    const buffer_sizes =
      await pars.vm.r_t_p_receiver.settings.buffer_sizes.status.read();
    // FIXME: doesn't take 2110-40 buffers into account
    const req_nouhd_2110_20 = Math.max(
      // pars.capabilities.jpeg_xs_caliber ? to_blocks_at1130(buffer_sizes.for_jpeg_xs) : 0,
      pars.capabilities.st2042_2_caliber
        ? to_blocks_at1130(buffer_sizes.for_2042_2)
        : 0,
      !!pars.capabilities.st2110_20_caliber
        ? to_blocks_at1130(buffer_sizes.for_2110_20_other)
        : 0,
      pars.capabilities.supports_2022_6
        ? to_blocks_at1130(buffer_sizes.for_2022_6)
        : 0,
    );
    const required_blocks_per_receiver =
      (pars.capabilities.supports_clean_switching ? 2 : 1) *
      (Math.max(
        req_nouhd_2110_20,
        pars.capabilities.st2110_20_caliber === "ST2110_singlelink_uhd"
          ? to_blocks_at1130(buffer_sizes.for_2110_20_uhd_singlelink)
          : 0,
      ) +
        (pars.capabilities.supports_uhd_sample_interleaved
          ? 3 * req_nouhd_2110_20
          : 0));
    const blocks_leftover =
      await pars.vm.r_t_p_receiver.diagnostics.memory_channels.read();
    if (pars.preexisting_receivers === "bulldoze") {
      for (const channel of blocks_leftover) {
        channel.blocks_left =
          pars.vm.r_t_p_receiver.runtime_constants.blocks_per_memory_channel;
      }
    }
    // FIXME: doesn't respect channel boundaries -- one channel may have plenty of ram left but no decoders, or vice versa
    // (though that's rather unlikely)

    // FIXME: jpegxs/native receivers may or may not be sharing the same channels... for now we'll just
    // ignore jpegxs
    max_by_memory = blocks_leftover.reduce(
      (acc, channel) =>
        acc +
        (!!channel.connected_to.video_native.find((maybe_i) => maybe_i !== null)
          ? Math.floor(channel.blocks_left / required_blocks_per_receiver)
          : 0),
      0,
    );
  }
  return leftover > 0
    ? Math.min(leftover, await max_num_by_bandwidth(), max_by_memory)
    : leftover;
}

export function to_blocks_at1101(
  buffersize: VAPI.AT1101.RTPReceiver.BufferSize,
): number {
  switch (buffersize) {
    case "UpTo4KiPackets":
      return 1;
    case "UpTo8KiPackets":
      return 2;
    case "UpTo16KiPackets":
      return 4;
    case "UpTo32KiPackets":
      return 8;
    case "UpTo64KiPackets":
      return 16;
  }
}

export function to_blocks_at1130(
  buffersize: VAPI.AT1130.RTPReceiver.BufferSize,
): number {
  switch (buffersize) {
    case "UpTo4MB":
      return 1;
    case "UpTo8MB":
      return 2;
    case "UpTo12MB":
      return 3;
    case "UpTo16MB":
      return 4;
    case "UpTo24MB":
      return 6;
    case "UpTo32MB":
      return 8;
    case "UpTo48MB":
      return 12;
    case "UpTo96MB":
      return 24;
  }
}

export async function max_audio_receivers(
  pars: {
    vm: VAPI.VM.Any;
    capabilities: GenlockLessAudioCapabilities;
    preexisting_receivers: "bulldoze" | "preserve";
    log: (msg: string) => void;
    // NOTE: doesn't consider bandwidth constraints at all
    bandwidth_constraints?: "obey" /* the default */ | "ignore";
  } & BackwardsCompatibleInterfaceSpec,
) {
  if (!pars.vm.r_t_p_receiver) return 0;
  const rtp_receiver = pars.vm.r_t_p_receiver;
  const cap = to_full_audio_capabilities(pars.capabilities);
  const buffer_blocks_per_receiver =
    (await (async () => {
      if (pars.vm instanceof VAPI.AT1101.Root) {
        return to_blocks_at1101(
          enforce_nonnull(
            (await pars.vm.r_t_p_receiver?.settings.buffer_sizes.status.read())
              ?.for_audio,
          ),
        );
      } else {
        return to_blocks_at1130(
          enforce_nonnull(
            (await pars.vm.r_t_p_receiver?.settings.buffer_sizes.status.read())
              ?.for_audio,
          ),
        );
      }
    })()) * (cap.supports_clean_switching ? 2 : 1);
  const required_fifo_blocks =
    pars.vm instanceof VAPI.AT1101.Root
      ? async () => 0
      : async (
          cap: Pick<
            VAPI.AT1130.RTPReceiver.AudioCapabilities,
            "supports_clean_switching" | "payload_limit"
          >,
        ) => {
          switch (cap.payload_limit) {
            case "AtMost448Bytes":
              return 1;
            case "AtMost960Bytes":
              return 2;
            case "AtMost1984Bytes":
              return 4;
          }
          unreachable();
        };
  let max_by_memory = 0;
  if (pars.vm instanceof VAPI.AT1130.Root) {
    const fifo_buffer_blocks_per_receiver = await required_fifo_blocks(cap);
    const blocks_leftover =
      await pars.vm.r_t_p_receiver.diagnostics.memory_channels.read();
    if (pars.preexisting_receivers === "bulldoze") {
      for (const entry of blocks_leftover) {
        for (const x of entry.connected_to.audio) {
          if (x)
            x.fifo_blocks_left =
              pars.vm.r_t_p_receiver.runtime_constants.audio_fifo_blocks_per_memory_channel;
        }
        entry.blocks_left =
          pars.vm.r_t_p_receiver.runtime_constants.blocks_per_memory_channel;
      }
    }
    for (const entry of blocks_leftover) {
      // FIXME: doesn't correctly take decoder limitations into account on DS1128
      const max_by_blocks = Math.floor(
        entry.blocks_left / buffer_blocks_per_receiver,
      );
      const max_by_fifo = entry.connected_to.audio.reduce(
        (acc, x) =>
          acc +
          Math.floor(
            (x?.fifo_blocks_left ?? 0) / fifo_buffer_blocks_per_receiver,
          ),
        0,
      );
      max_by_memory += Math.min(max_by_blocks, max_by_fifo);
    }
  } else {
    // we'll just assume for now the AT1101 isn't ram-limited
    max_by_memory = Infinity;
  }
  let blocked_channels = 0;
  let blocked_backends = 0;
  let blocked_servos = 0;
  if (pars.preexisting_receivers === "preserve") {
    await asyncIter(await rtp_receiver.audio_receivers.rows(), async (rcv) => {
      const cap = await rcv.media_specific.capabilities.status.read();
      if (!cap) return;
      blocked_servos += cap.read_speed.variant === "Adaptive" ? 1 : 0;
      blocked_channels += ((cap.channel_capacity + 15) / 16) * 16;
      blocked_backends += 1;
    });
  }
  let leftover = Math.min(
    rtp_receiver.runtime_constants.max_audio_receivers - blocked_backends,
    Math.floor(
      (rtp_receiver.runtime_constants.total_2110_30_audio_slices * 16 -
        blocked_channels) /
        cap.channel_capacity,
    ),
    max_by_memory,
  );
  const available_servos =
    pars.vm.r_t_p_receiver.runtime_constants.num_audio_servos - blocked_servos;
  if (pars.capabilities.read_speed === "Adaptive")
    leftover = Math.min(leftover, available_servos);
  return leftover;
}

// KLUDGE: using AT1130 here since it's a strict superset of the AT1101 variant anyway
export type GenlockLessAudioCapabilities = Omit<
  VAPI.AT1130.RTPReceiver.AudioCapabilities,
  "read_speed"
> & {
  read_speed: "LockToGenlock" | "Adaptive";
};

function to_full_audio_capabilities(
  cap: Partial<GenlockLessAudioCapabilities>,
): GenlockLessAudioCapabilities {
  return {
    channel_capacity: 16,
    payload_limit: "AtMost448Bytes", // ?
    supports_clean_switching: true,
    read_speed: "LockToGenlock",
    ...cap,
  };
}

export type GenlockLessVideoCapabilities = Omit<
  VAPI.AT1130.RTPReceiver.VideoCapabilities,
  "read_speed"
> & {
  read_speed: "LockToGenlock" | "Adaptive";
};

function to_full_video_capabilities(
  cap: Partial<GenlockLessVideoCapabilities>,
): GenlockLessVideoCapabilities {
  return {
    st2110_20_caliber: "ST2110_upto_3G",
    jpeg_xs_caliber: null,
    st2042_2_caliber: null,
    supports_2022_6: true,
    supports_2110_40: true,
    supports_uhd_sample_interleaved: false,
    supports_clean_switching: true,
    read_speed: "LockToGenlock",
    ...cap,
  };
}

export function to_video_capabilities(
  tf: VAPI.Video.TransportFormat,
  std?: VAPI.Video.Standard,
): Partial<GenlockLessVideoCapabilities> {
  switch (tf) {
    case "ST2022_6":
      return { supports_2022_6: true };
    case "ST2042_raw":
      return {
        st2042_2_caliber:
          !std || is_higher_than_3G(std)
            ? "ST2042_2_singlelink_uhd"
            : "ST2042_2_upto_3G",
        supports_2110_40: true,
      };
    case "ST2110_22_JpegXS":
      return {
        jpeg_xs_caliber:
          !std || is_higher_than_3G(std)
            ? "JPEG_XS_singlelink_uhd"
            : "JPEG_XS_upto_3G",
        supports_2110_40: true,
      };
    case "ST2110_BPM":
    case "ST2110_GPM":
      return {
        st2110_20_caliber:
          !std || is_higher_than_3G(std)
            ? "ST2110_singlelink_uhd"
            : "ST2110_upto_3G",
        supports_2110_40: true,
      };
  }
}

export function has_caliber_set(
  cap: Partial<GenlockLessVideoCapabilities>,
): boolean {
  return (
    cap.jpeg_xs_caliber !== undefined ||
    cap.st2042_2_caliber !== undefined ||
    cap.st2110_20_caliber !== undefined ||
    cap.supports_2022_6 !== undefined
  );
}

export function video_capability_union(
  cap1: Partial<GenlockLessVideoCapabilities>,
  cap2: Partial<GenlockLessVideoCapabilities>,
): Partial<GenlockLessVideoCapabilities> {
  const result: Partial<GenlockLessVideoCapabilities> = {
    ...(cap1.supports_2022_6 !== undefined || cap2.supports_2022_6 !== undefined
      ? { supports_2022_6: cap1.supports_2022_6 || cap2.supports_2022_6 }
      : {}),
    ...(cap1.supports_2110_40 !== undefined ||
    cap2.supports_2110_40 !== undefined
      ? { supports_2110_40: cap1.supports_2110_40 || cap2.supports_2110_40 }
      : {}),
    ...(cap1.supports_clean_switching !== undefined ||
    cap2.supports_clean_switching !== undefined
      ? {
          supports_clean_switching:
            cap1.supports_clean_switching || cap2.supports_clean_switching,
        }
      : {}),
    ...(cap1.supports_uhd_sample_interleaved !== undefined ||
    cap2.supports_uhd_sample_interleaved !== undefined
      ? {
          supports_uhd_sample_interleaved:
            cap1.supports_uhd_sample_interleaved ||
            cap2.supports_uhd_sample_interleaved,
        }
      : {}),
    ...(!!cap1.jpeg_xs_caliber || !!cap2.jpeg_xs_caliber
      ? {
          jpeg_xs_caliber:
            cap1.jpeg_xs_caliber === "JPEG_XS_singlelink_uhd" ||
            cap2.jpeg_xs_caliber === "JPEG_XS_singlelink_uhd"
              ? "JPEG_XS_singlelink_uhd"
              : "JPEG_XS_upto_3G",
        }
      : {}),
    ...(!!cap1.st2042_2_caliber || !!cap2.st2042_2_caliber
      ? {
          st2042_2_caliber:
            cap1.st2042_2_caliber === "ST2042_2_singlelink_uhd" ||
            cap2.st2042_2_caliber === "ST2042_2_singlelink_uhd"
              ? "ST2042_2_singlelink_uhd"
              : "ST2042_2_upto_3G",
        }
      : {}),
    ...(!!cap1.st2110_20_caliber || !!cap2.st2110_20_caliber
      ? {
          st2110_20_caliber:
            cap1.st2110_20_caliber === "ST2110_singlelink_uhd" ||
            cap2.st2110_20_caliber === "ST2110_singlelink_uhd"
              ? "ST2110_singlelink_uhd"
              : "ST2110_upto_3G",
        }
      : {}),
  };
  const int_of_read_speed = (x?: "Adaptive" | "LockToGenlock") => {
    if (!x) return 0;
    return x === "LockToGenlock" ? 1 : 2;
  };
  for (const cap of [cap1, cap2]) {
    if (
      !!cap.read_speed &&
      int_of_read_speed(cap.read_speed) > int_of_read_speed(result.read_speed)
    ) {
      result.read_speed = cap.read_speed;
    }
  }
  return result;
}

type CommonSetupOptions = {
  count: ResourceCount;
  // will be tried in the specified order, though maximization will overrule vm order
  // in case of count === MAX
  vms: VAPI.VM.Any[];
  preexisting_receivers: "bulldoze" | "preserve";
  log: (msg: string) => void;
  bandwidth_constraints?: "obey" /* the default */ | "ignore";
  verbose?: boolean;
} & BackwardsCompatibleInterfaceSpec;

export async function setup_video_receivers(
  pars: CommonSetupOptions & {
    // non-rtp-capable ports will be skipped over, so on our 2-port builds these will be 0 or 1
    capabilities: Partial<GenlockLessVideoCapabilities>;
  },
): Promise<
  | VAPI.AT1101.RTPReceiver.VideoReceiverAsNamedTableRow[]
  | VAPI.AT1130.RTPReceiver.VideoReceiverAsNamedTableRow[]
> {
  const cap = to_full_video_capabilities(pars.capabilities);
  const [vm, count] = await find_vm_and_count(
    pars.vms,
    async (vm) => await max_video_receivers({ ...pars, capabilities: cap, vm }),
    pars.count,
    {
      resource: "video receiver",
      specs: JSON.stringify({
        capabilities: cap,
        bandwidth_constraints: pars.bandwidth_constraints,
        preexisting_receivers: pars.preexisting_receivers,
      }),
    },
  );
  if (count === 0) return [];
  enforce(!!vm && !!vm.r_t_p_receiver);
  if (pars.preexisting_receivers === "bulldoze") {
    await asyncIter(await vm.r_t_p_receiver.video_receivers.rows(), (rcv) =>
      abandon(rcv),
    );
    await vm.r_t_p_receiver.video_receivers.delete_all();
  }
  const receivers = await (async (): Promise<
    VAPI.Any.RTPReceiver.VideoReceiver[]
  > => {
    enforce(!!vm.r_t_p_receiver);
    switch (pars.preexisting_receivers) {
      case "bulldoze":
        return enforce_nonnull(
          vm.r_t_p_receiver,
        ).video_receivers.ensure_allocated(count, "exactly");
      case "preserve": {
        const result: Array<VAPI.Any.RTPReceiver.VideoReceiver> = [];
        while (result.length < count) {
          result.push(
            enforce_nonnull(
              await enforce_nonnull(
                vm.r_t_p_receiver,
              ).video_receivers.create_row(),
            ),
          );
        }
        return result;
      }
    }
  })();
  await asyncIter(receivers, async (rcv, i) => {
    if (vm instanceof VAPI.AT1101.Root) {
      enforce(rcv instanceof VAPI.AT1101.RTPReceiver.VideoReceiver);
      await rcv.media_specific.capabilities.command.write({
        ...cap,
        read_speed: { variant: cap.read_speed, value: {} },
      });
    } else {
      enforce(rcv instanceof VAPI.AT1130.RTPReceiver.VideoReceiver);
      await rcv.media_specific.capabilities.command.write({
        ...cap,
        read_speed:
          cap.read_speed === "LockToGenlock"
            ? {
                variant: "LockToGenlock",
                value: {
                  genlock: enforce_nonnull(vm.genlock).instances.row(0),
                },
              }
            : { variant: "Adaptive", value: {} },
      });
    }
    let hosting_session = await rcv.generic.hosting_session.status.read();
    if (hosting_session) await hosting_session.active.command.write(false);
    if (!hosting_session) {
      hosting_session = enforce_nonnull(
        await enforce_nonnull(vm.r_t_p_receiver).sessions.create_row(),
      );
      enforce(!!hosting_session);
      await rcv.generic.hosting_session.command.write(hosting_session);
      await hosting_session.interfaces.command.write(
        await resolve_interfaces_backwards_compatibly(pars, vm, i),
      );
    }
  });
  // FIXME: eliminate typecast
  return receivers as
    | VAPI.AT1101.RTPReceiver.VideoReceiverAsNamedTableRow[]
    | VAPI.AT1130.RTPReceiver.VideoReceiverAsNamedTableRow[];
}

async function abandon(
  rcv: VAPI.Any.RTPReceiver.AudioReceiver | VAPI.Any.RTPReceiver.VideoReceiver,
): Promise<void> {
  const session = await rcv.generic.hosting_session.status.read();
  if (!session) return;
  const old_active = await session.active.status.read();
  await session.active.command.write(false);
  const video_receivers = (await session.video_receivers.read()).map((rcv) =>
    enforce_nonnull(rcv),
  ) as VAPI.Any.RTPReceiver.VideoReceiver[];
  const audio_receivers = (await session.audio_receivers.read()).map((rcv) =>
    enforce_nonnull(rcv),
  ) as VAPI.Any.RTPReceiver.AudioReceiver[];
  const passthrough_receivers = (
    await session.passthrough_receivers.read()
  ).map((rcv) =>
    enforce_nonnull(rcv),
  ) as VAPI.Any.RTPReceiver.PassthroughReceiver[];
  if (
    !(
      !!video_receivers.find((rcv2) => !!rcv2 && !same(rcv, rcv2)) ||
      !!audio_receivers.find(
        (audio_rcv) => !!audio_rcv && !same(audio_rcv, rcv),
      ) ||
      !!passthrough_receivers.find((ptx) => !!ptx)
    )
  ) {
    await delete_row(session); // session would have been unused afterwards
  } else {
    await session.active.command.write(old_active);
  }
}

export async function setup_audio_receivers(
  pars: CommonSetupOptions & {
    capabilities: Partial<GenlockLessAudioCapabilities>;
  },
): Promise<
  | VAPI.AT1101.RTPReceiver.AudioReceiverAsNamedTableRow[]
  | VAPI.AT1130.RTPReceiver.AudioReceiverAsNamedTableRow[]
> {
  const cap = to_full_audio_capabilities(pars.capabilities);
  const [vm, count] = await find_vm_and_count(
    pars.vms,
    async (vm) => await max_audio_receivers({ ...pars, capabilities: cap, vm }),
    pars.count,
    {
      resource: "audio receiver",
      specs: JSON.stringify({
        capabilities: cap,
        preexisting_receivers: pars.preexisting_receivers,
      }),
    },
  );
  if (count === 0) return [];
  enforce(!!vm && !!vm.r_t_p_receiver);
  if (pars.preexisting_receivers === "bulldoze") {
    await asyncIter(await vm.r_t_p_receiver.audio_receivers.rows(), (rcv) =>
      abandon(rcv),
    );
    await vm.r_t_p_receiver.audio_receivers.delete_all();
  }
  const receivers = await (async (): Promise<
    VAPI.Any.RTPReceiver.AudioReceiver[]
  > => {
    enforce(!!vm.r_t_p_receiver);
    switch (pars.preexisting_receivers) {
      case "bulldoze":
        return enforce_nonnull(
          vm.r_t_p_receiver,
        ).audio_receivers.ensure_allocated(count, "exactly");
      case "preserve": {
        const result: Array<VAPI.Any.RTPReceiver.AudioReceiver> = [];
        while (result.length < count) {
          result.push(
            enforce_nonnull(
              await enforce_nonnull(
                vm.r_t_p_receiver,
              ).audio_receivers.create_row(),
            ),
          );
        }
        return result;
      }
    }
  })();
  // FIXME: deduplicate?
  await asyncIter(receivers, async (rcv, i) => {
    if (vm instanceof VAPI.AT1101.Root) {
      enforce(rcv instanceof VAPI.AT1101.RTPReceiver.AudioReceiver);
      await rcv.media_specific.capabilities.command.write({
        ...cap,
        read_speed:
          cap.read_speed === "LockToGenlock"
            ? { variant: "LockToGenlock", value: {} }
            : { variant: "Adaptive", value: {} },
      });
    } else {
      enforce(rcv instanceof VAPI.AT1130.RTPReceiver.AudioReceiver);
      await rcv.media_specific.capabilities.command.write({
        ...cap,
        read_speed:
          cap.read_speed === "LockToGenlock"
            ? {
                variant: "LockToGenlock",
                value: {
                  genlock: enforce_nonnull(vm.genlock).instances.row(0),
                },
              }
            : { variant: "Adaptive", value: {} },
      });
    }
    let hosting_session = await rcv.generic.hosting_session.status.read();
    if (hosting_session) await hosting_session.active.command.write(false);
    if (!hosting_session) {
      hosting_session = enforce_nonnull(
        await enforce_nonnull(vm.r_t_p_receiver).sessions.create_row(),
      );
      enforce(!!hosting_session);
      await rcv.generic.hosting_session.command.write(hosting_session);
      await hosting_session.interfaces.command.write(
        await resolve_interfaces_backwards_compatibly(pars, vm, i),
      );
    }
  });
  // FIXME: eliminate typecast
  return receivers as
    | VAPI.AT1101.RTPReceiver.AudioReceiverAsNamedTableRow[]
    | VAPI.AT1130.RTPReceiver.AudioReceiverAsNamedTableRow[];
}
